use std::sync::Arc;
use std::time::Duration;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use super::metadata::EventMetadata;
use super::store::EventStore;
/// Default value for [`SweepConfig::retention_days`]: 90 days.
pub const DEFAULT_RETENTION_DAYS: u64 = 90;
/// Default value for [`SweepConfig::max_rows`]: 100,000 rows.
pub const DEFAULT_MAX_ROWS: usize = 100_000;
/// Default value for [`SweepConfig::interval_secs`]: six hours, expressed in seconds.
pub const DEFAULT_SWEEP_INTERVAL_SECS: u64 = 6 * 3600;
/// Tunables for the background retention sweep.
///
/// `retention_days` and `max_rows` are forwarded unchanged to
/// `EventStore::sweep_retention` on every pass; their exact pruning semantics
/// are defined by the store, not by this module.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SweepConfig {
    /// Retention window, in days, handed to the store on each sweep.
    pub retention_days: u64,
    /// Row limit handed to the store on each sweep.
    pub max_rows: usize,
    /// Seconds between periodic sweeps. `0` disables the periodic loop, so
    /// only the single eager sweep at spawn time runs.
    pub interval_secs: u64,
}
impl Default for SweepConfig {
    /// Builds a configuration from the module-level `DEFAULT_*` constants.
    fn default() -> Self {
        let retention_days = DEFAULT_RETENTION_DAYS;
        let max_rows = DEFAULT_MAX_ROWS;
        let interval_secs = DEFAULT_SWEEP_INTERVAL_SECS;
        Self {
            retention_days,
            max_rows,
            interval_secs,
        }
    }
}
/// Handle returned by [`spawn_sweep_loop`] for observing and stopping the
/// background sweep task.
pub struct SweepHandle {
/// Join handle of the spawned sweep task; awaiting it waits for the task to
/// finish.
pub task: JoinHandle<()>,
/// Token watched by the periodic sweep loop; cancelling it makes the loop
/// exit. A sweep that is already in progress is not interrupted.
pub shutdown: CancellationToken,
}
/// Spawns the background retention sweep for `store` on the current Tokio
/// runtime.
///
/// The spawned task sweeps once immediately and then, unless
/// `cfg.interval_secs` is `0`, keeps sweeping every `cfg.interval_secs`
/// seconds until the returned handle's `shutdown` token is cancelled.
///
/// # Returns
///
/// A [`SweepHandle`] holding the task's `JoinHandle` and the cancellation
/// token that stops the periodic loop.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime context, because it uses
/// `tokio::spawn`.
pub fn spawn_sweep_loop<T>(store: Arc<EventStore<T>>, cfg: SweepConfig) -> SweepHandle
where
    T: EventMetadata + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    let shutdown = CancellationToken::new();
    // `store` is already an owned `Arc`, so move it into the task instead of
    // cloning it and dropping the original at the end of this function.
    let task = tokio::spawn(sweep_loop(store, cfg, shutdown.clone()));
    SweepHandle { task, shutdown }
}
/// Body of the background retention sweep task.
///
/// Runs one sweep immediately (the "eager" pass). If `cfg.interval_secs` is
/// `0` the task then returns; otherwise it sweeps again every
/// `cfg.interval_secs` seconds until `shutdown` is cancelled.
///
/// Sweep failures are logged at `warn` level and never terminate the loop.
/// Cancellation is only observed while waiting for the next tick, so a sweep
/// that is already running is allowed to finish.
async fn sweep_loop<T>(store: Arc<EventStore<T>>, cfg: SweepConfig, shutdown: CancellationToken)
where
    T: EventMetadata + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    // Eager pass: sweep right away rather than waiting a full interval after
    // spawn. A pass that deleted nothing is intentionally not logged.
    match store
        .sweep_retention(cfg.retention_days, cfg.max_rows)
        .await
    {
        Ok(n) if n > 0 => {
            tracing::info!(deleted = n, "events sweep (eager) ran");
        }
        Ok(_) => {}
        Err(e) => {
            tracing::warn!(error = %e, "events sweep (eager) failed");
        }
    }

    // A zero interval means one-shot mode. It must be handled before building
    // the interval, because `tokio::time::interval` panics on a zero period.
    if cfg.interval_secs == 0 {
        return;
    }

    let mut tick = tokio::time::interval(Duration::from_secs(cfg.interval_secs));
    // The default `Burst` behaviour fires missed ticks back-to-back, which
    // would run several sweeps in a row after a slow sweep or a stalled
    // runtime. `Delay` instead schedules the next tick one full period after
    // the late one.
    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // The first tick of an interval completes immediately; consume it so the
    // eager pass above is not instantly followed by a second sweep.
    tick.tick().await;

    loop {
        tokio::select! {
            // Poll branches in order so a pending cancellation always wins
            // over a tick that became ready at the same time.
            biased;
            _ = shutdown.cancelled() => break,
            _ = tick.tick() => {
                match store.sweep_retention(cfg.retention_days, cfg.max_rows).await {
                    Ok(n) => tracing::info!(deleted = n, "events sweep ran"),
                    Err(e) => tracing::warn!(error = %e, "events sweep failed"),
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::store::{EventStore, ListFilter, DEFAULT_TABLE};
    use nexo_tool_meta::admin::agent_events::{AgentEventKind, TranscriptRole};
    use uuid::Uuid;

    /// Milliseconds in one day.
    const DAY_MS: u64 = 86_400 * 1000;

    /// Builds a user transcript event that differs only in sequence number,
    /// body and timestamp; every other field is fixed test data.
    fn transcript(seq: u64, body: &'static str, sent_at_ms: u64) -> AgentEventKind {
        AgentEventKind::TranscriptAppended {
            agent_id: "ana".into(),
            session_id: Uuid::nil(),
            seq,
            role: TranscriptRole::User,
            body: body.into(),
            sent_at_ms,
            sender_id: None,
            source_plugin: "whatsapp".into(),
            tenant_id: None,
        }
    }

    /// With `interval_secs == 0` the spawned task performs only the eager
    /// sweep and then finishes, leaving just the fresh row behind.
    #[tokio::test]
    async fn sweep_loop_eagerly_drains_on_spawn() {
        let store = Arc::new(EventStore::open_memory(DEFAULT_TABLE).await.unwrap());
        let now_ms = chrono::Utc::now().timestamp_millis() as u64;

        // Three rows stamped 15, 16 and 17 days ago: all older than the
        // 10-day retention configured below.
        for seq in 0..3u64 {
            let age_ms = (15 + seq) * DAY_MS;
            let stale = transcript(seq, "old", now_ms - age_ms);
            store.append(&stale).await.unwrap();
        }

        // One row stamped a second ago.
        let recent = transcript(3, "fresh", now_ms - 1_000);
        store.append(&recent).await.unwrap();

        // Zero interval: the task exits right after the eager sweep, so it
        // can simply be awaited.
        let one_shot = SweepConfig {
            retention_days: 10,
            max_rows: 1_000_000,
            interval_secs: 0,
        };
        let SweepHandle { task, .. } = spawn_sweep_loop(Arc::clone(&store), one_shot);
        task.await.unwrap();

        let filter = ListFilter {
            limit: 100,
            ..Default::default()
        };
        let after = store.list(&filter).await.unwrap();
        assert_eq!(after.len(), 1, "eager sweep should drain 3 old rows");
    }
}