use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use super::audit::FoldAuditSink;
use super::state::{EntryTransition, FoldIndex, FoldState};
use super::FoldKind;
use super::FoldMetrics;
/// Default sweep interval: 500 milliseconds.
// NOTE(review): not referenced in the visible part of this file; presumably the
// default `interval` handed to `spawn_expiry_task` — confirm against callers.
pub const DEFAULT_SWEEP_INTERVAL: Duration = Duration::from_millis(500);
/// Upper bound on the number of expired keys `sweep_expired` collects (under the
/// read lock) and then evicts (under the write locks) in a single pass of its loop.
const SWEEP_CHUNK_SIZE: usize = 1024;
/// Evicts every entry whose `expires_at` is at or before the moment this call
/// started, and returns how many entries were removed.
///
/// The sweep proceeds in rounds so that no single lock acquisition handles more
/// than `SWEEP_CHUNK_SIZE` keys:
///
/// 1. Under the state *read* lock, gather up to `SWEEP_CHUNK_SIZE` expired keys.
/// 2. Release it, then take the state and index *write* locks (in that order)
///    and evict each gathered key that is still present and still expired.
///
/// For each evicted entry, in order: the key is dropped from `by_node` (removing
/// the node's key set once it is empty), the index is told via `on_remove`, an
/// audit event is recorded if `audit_sink` is `Some` and `K::audit_event` yields
/// one, and `metrics.on_expire()` is called. Note that the audit sink is invoked
/// while both write locks are held.
///
/// The function returns once a round finds no expired keys.
pub(super) fn sweep_expired<K: FoldKind>(
    state_lock: &RwLock<FoldState<K>>,
    index_lock: &RwLock<K::Index>,
    metrics: &FoldMetrics,
    audit_sink: Option<&Arc<dyn FoldAuditSink>>,
) -> usize {
    // One fixed cutoff for the whole sweep; entries expiring later wait for the
    // next call.
    let cutoff = Instant::now();
    let mut evicted_count = 0usize;

    loop {
        // Round part 1: snapshot a bounded batch of expired keys under the read lock.
        let batch: Vec<K::Key> = {
            let snapshot = state_lock.read();
            snapshot
                .entries
                .iter()
                .filter_map(|(key, entry)| (entry.expires_at <= cutoff).then(|| key.clone()))
                .take(SWEEP_CHUNK_SIZE)
                .collect()
        };
        if batch.is_empty() {
            break;
        }

        // Round part 2: evict under the write locks (state first, then index).
        let mut state_guard = state_lock.write();
        let mut index_guard = index_lock.write();
        for key in batch {
            // The read lock was released in between, so the entry may be gone or
            // may no longer be expired; re-validate before touching it.
            let expired_now = matches!(
                state_guard.entries.get(&key),
                Some(entry) if entry.expires_at <= cutoff
            );
            if !expired_now {
                continue;
            }
            let Some(removed) = state_guard.entries.remove(&key) else {
                continue;
            };

            // Keep the per-node reverse mapping in sync, pruning empty key sets.
            let node_now_empty = state_guard
                .by_node
                .get_mut(&removed.node_id)
                .map(|node_keys| {
                    node_keys.remove(&key);
                    node_keys.is_empty()
                })
                .unwrap_or(false);
            if node_now_empty {
                state_guard.by_node.remove(&removed.node_id);
            }

            index_guard.on_remove(&key, &removed.payload);

            // The audit event is only built when there is a sink to receive it.
            if let Some(sink) = audit_sink {
                let expired = EntryTransition::Expired {
                    key: &key,
                    old: &removed,
                };
                if let Some(event) = K::audit_event(expired) {
                    sink.record(event);
                }
            }

            metrics.on_expire();
            evicted_count += 1;
        }
    }

    evicted_count
}
/// Spawns a Tokio task that calls `sweep_expired` once per `interval`.
///
/// The task only holds `Weak` handles. On every tick it tries to upgrade
/// `state`, `index` and `metrics`; if any of them has been dropped the task
/// exits. The strong references obtained for a sweep are released before the
/// next tick is awaited, so the task never keeps the owning structure alive
/// while idle.
///
/// `audit_sink` is optional at two levels: if the holder has been dropped, or
/// it currently contains `None`, the sweep runs without auditing. The sink
/// `Arc` is cloned out and the holder's lock released *before* sweeping, so the
/// sink lock is never held while the state/index write locks are taken.
///
/// The first sweep happens one full period after spawning (the interval's
/// immediate first tick is consumed up front). A zero `interval` is clamped to
/// one millisecond, because `tokio::time::interval` panics on a zero period.
///
/// Returns the `JoinHandle` of the spawned task.
pub(super) fn spawn_expiry_task<K: FoldKind>(
    state: Weak<RwLock<FoldState<K>>>,
    index: Weak<RwLock<K::Index>>,
    metrics: Weak<FoldMetrics>,
    audit_sink: Weak<parking_lot::RwLock<Option<Arc<dyn FoldAuditSink>>>>,
    interval: Duration,
) -> tokio::task::JoinHandle<()> {
    // `tokio::time::interval` panics when given a zero period, which would kill
    // the task on its first poll and silently disable expiry. Clamp instead.
    const MIN_SWEEP_INTERVAL: Duration = Duration::from_millis(1);
    let period = interval.max(MIN_SWEEP_INTERVAL);

    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        // If a sweep (or a runtime stall) overruns the period, do not fire a
        // burst of catch-up ticks; simply wait one period after the late tick.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // The first tick completes immediately; consume it so the first sweep
        // runs after one full period.
        ticker.tick().await;
        loop {
            ticker.tick().await;
            let (Some(state), Some(index), Some(metrics)) =
                (state.upgrade(), index.upgrade(), metrics.upgrade())
            else {
                // An owner has been dropped: nothing left to sweep.
                break;
            };
            // Clone the sink handle (a refcount bump) and release the holder's
            // read lock right away rather than holding it for the whole sweep.
            let sink: Option<Arc<dyn FoldAuditSink>> = match audit_sink.upgrade() {
                Some(holder) => {
                    let guard = holder.read();
                    guard.as_ref().map(Arc::clone)
                }
                None => None,
            };
            sweep_expired::<K>(&state, &index, &metrics, sink.as_ref());
        }
    })
}