use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use ppoppo_clock::ArcClock;
use ppoppo_clock::native::WallClock;
use tokio::sync::RwLock;
use super::Cache;
/// Process-local cache of `i64` values keyed by string, with one TTL shared by
/// every entry.
///
/// Expiry is evaluated on lookup against the injected clock.
pub struct InProcessTtlCache {
    /// Key -> `(value, insertion timestamp)`; the timestamp is whatever
    /// `clock.now_unix_millis()` returned when the entry was written.
    inner: RwLock<HashMap<String, (i64, i64)>>,
    /// Age at which an entry stops being served.
    ttl: Duration,
    /// Time source used for both insertion stamps and expiry checks.
    clock: ArcClock,
}
impl InProcessTtlCache {
    /// Builds an empty cache whose entries are served for `ttl` after insertion.
    ///
    /// The cache starts out backed by a [`WallClock`]; use
    /// [`with_clock`](Self::with_clock) to substitute a different time source.
    #[must_use]
    pub fn new(ttl: Duration) -> Self {
        let inner = RwLock::new(HashMap::new());
        let clock: ArcClock = Arc::new(WallClock);
        Self { inner, ttl, clock }
    }

    /// Returns the cache with its time source replaced by `clock`.
    ///
    /// The stored entries and the TTL are carried over unchanged.
    #[must_use]
    pub fn with_clock(self, clock: ArcClock) -> Self {
        Self { clock, ..self }
    }
}
/// Debug output shows only the TTL; the remaining fields are elided and the
/// output is marked non-exhaustive.
impl std::fmt::Debug for InProcessTtlCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("InProcessTtlCache");
        out.field("ttl", &self.ttl);
        out.finish_non_exhaustive()
    }
}
#[async_trait]
impl Cache for InProcessTtlCache {
    /// Returns the value stored under `key` while the entry is younger than
    /// the cache-wide TTL.
    ///
    /// A missing key is a miss. An entry whose age has reached the TTL is
    /// removed from the map and also reported as a miss.
    async fn get(&self, key: &str) -> Option<i64> {
        let now_ms = self.clock.now_unix_millis();
        let ttl_ms = ttl_as_millis(self.ttl);

        // Fast path: hits and plain misses only need the shared lock.
        {
            let read = self.inner.read().await;
            let &(sv, inserted_at_ms) = read.get(key)?;
            if is_fresh(now_ms, inserted_at_ms, ttl_ms) {
                return Some(sv);
            }
        }

        // Slow path: the entry looked expired. The read lock was released
        // above, so another task may have replaced or removed the entry in the
        // meantime; re-check under the exclusive lock before evicting.
        let mut write = self.inner.write().await;
        match write.get(key).copied() {
            // Refreshed by a concurrent `set`: serve it instead of reporting a
            // spurious miss.
            Some((sv, inserted_at_ms)) if is_fresh(now_ms, inserted_at_ms, ttl_ms) => Some(sv),
            Some(_) => {
                write.remove(key);
                None
            }
            None => None,
        }
    }

    /// Stores `sv` under `key`, stamped with the clock's current time, and
    /// replaces any previous entry for that key.
    ///
    /// The per-call `_ttl` argument is not consulted: expiry is governed
    /// solely by the TTL the cache was constructed with.
    async fn set(&self, key: &str, sv: i64, _ttl: Duration) {
        let mut write = self.inner.write().await;
        write.insert(key.to_owned(), (sv, self.clock.now_unix_millis()));
    }
}

/// Converts `ttl` to whole milliseconds, clamped to `i64::MAX`.
///
/// `Duration::as_millis` yields a `u128`; a bare `as i64` keeps only the low
/// 64 bits, which turns very large TTLs (e.g. `Duration::MAX`) into a negative
/// number and makes every entry look expired.
fn ttl_as_millis(ttl: Duration) -> i64 {
    // Clamp first so the narrowing cast below is lossless.
    let clamped = ttl.as_millis().min(i64::MAX as u128);
    clamped as i64
}

/// Reports whether an entry stamped at `inserted_at_ms` is still younger than
/// `ttl_ms` at time `now_ms`.
///
/// The age is computed with a saturating subtraction so extreme clock values
/// cannot overflow.
fn is_fresh(now_ms: i64, inserted_at_ms: i64, ttl_ms: i64) -> bool {
    now_ms.saturating_sub(inserted_at_ms) < ttl_ms
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// TTL comfortably longer than any single test takes to run.
    const MINUTE: Duration = Duration::from_secs(60);

    /// Looking up a key that was never stored yields a miss.
    #[tokio::test]
    async fn miss_returns_none() {
        let store = InProcessTtlCache::new(MINUTE);
        let found = store.get("sv:nonexistent").await;
        assert_eq!(found, None);
    }

    /// A stored value is returned by a subsequent lookup.
    #[tokio::test]
    async fn hit_after_set() {
        let store = InProcessTtlCache::new(MINUTE);
        store.set("sv:abc", 7, MINUTE).await;
        let found = store.get("sv:abc").await;
        assert_eq!(found, Some(7));
    }

    /// An entry is served before the TTL elapses, misses afterwards, and the
    /// key can be stored again after that.
    #[tokio::test]
    async fn expired_entry_evicts_on_access() {
        let short = Duration::from_millis(10);
        let store = InProcessTtlCache::new(short);

        store.set("sv:abc", 5, short).await;
        assert_eq!(store.get("sv:abc").await, Some(5));

        // Wait well past the 10 ms TTL on the real clock.
        tokio::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(store.get("sv:abc").await, None);

        store.set("sv:abc", 11, MINUTE).await;
        assert_eq!(store.get("sv:abc").await, Some(11));
    }

    /// Writing the same key twice keeps only the latest value.
    #[tokio::test]
    async fn set_overwrites_existing_entry() {
        let store = InProcessTtlCache::new(MINUTE);
        for value in [7, 8] {
            store.set("sv:abc", value, MINUTE).await;
        }
        assert_eq!(store.get("sv:abc").await, Some(8));
    }

    /// Values stored under different keys are independent.
    #[tokio::test]
    async fn distinct_keys_dont_collide() {
        let store = InProcessTtlCache::new(MINUTE);
        store.set("sv:alpha", 1, MINUTE).await;
        store.set("sv:beta", 2, MINUTE).await;
        assert_eq!(store.get("sv:alpha").await, Some(1));
        assert_eq!(store.get("sv:beta").await, Some(2));
    }
}