use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use awaken_ext_observability::trace_store::TraceStore;
/// Settings that control how trace data is aged out.
#[derive(Debug, Clone)]
pub struct RetentionConfig {
    /// Age threshold for pruning: the cutoff handed to the store is "now minus `ttl`".
    pub ttl: Duration,
    /// Period of the timer that drives automatic pruning passes.
    pub interval: Duration,
}

impl Default for RetentionConfig {
    /// Returns a seven-day TTL combined with a 24-hour pruning period.
    fn default() -> Self {
        const SECS_PER_DAY: u64 = 24 * 60 * 60;

        let interval = Duration::from_secs(SECS_PER_DAY);
        let ttl = Duration::from_secs(7 * SECS_PER_DAY);
        Self { ttl, interval }
    }
}
/// Message understood by the retention task.
///
/// `None` asks for a pruning pass without acknowledgement; `Some(sender)` carries a
/// one-shot sender that the task signals once the pass has finished.
type TickRequest = Option<tokio::sync::oneshot::Sender<()>>;

/// Handle used to request out-of-schedule pruning passes from the retention task.
pub struct RetentionHandle {
    /// Sending half of the channel the retention task listens on.
    trigger: tokio::sync::mpsc::Sender<TickRequest>,
}

impl RetentionHandle {
    /// Queues a request for a pruning pass and returns without waiting for the pass.
    ///
    /// A failed send (the receiving side is gone) is deliberately ignored.
    #[doc(hidden)]
    pub async fn tick_now(&self) {
        let request: TickRequest = None;
        let _ = self.trigger.send(request).await;
    }

    /// Queues a request for a pruning pass and waits until the task acknowledges it.
    ///
    /// Returns immediately if the request cannot be delivered; an acknowledgement
    /// channel that is closed without a reply is treated the same as a reply.
    #[doc(hidden)]
    pub async fn tick_now_and_wait(&self) {
        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
        if self.trigger.send(Some(done_tx)).await.is_err() {
            // Nobody is listening any more, so no acknowledgement will ever arrive.
            return;
        }
        let _ = done_rx.await;
    }
}
pub fn spawn_retention_loop(
store: Arc<dyn TraceStore>,
config: RetentionConfig,
) -> RetentionHandle {
let (tx, mut rx) = tokio::sync::mpsc::channel::<TickRequest>(8);
tokio::spawn(async move {
let mut interval = tokio::time::interval(config.interval);
let referenced: HashSet<String> = HashSet::new();
loop {
let reply: Option<tokio::sync::oneshot::Sender<()>> = tokio::select! {
_ = interval.tick() => None,
msg = rx.recv() => match msg {
Some(reply) => reply,
None => break,
},
};
let cutoff = SystemTime::now()
.checked_sub(config.ttl)
.unwrap_or(SystemTime::UNIX_EPOCH);
if let Err(e) = store.prune(cutoff, &referenced) {
tracing::warn!(error = %e, "TraceStore prune failed");
}
if let Some(reply) = reply {
let _ = reply.send(());
}
}
});
RetentionHandle { trigger: tx }
}