use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use crate::offload::sweep_expired;
#[allow(clippy::cast_sign_loss)]
pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(crate::SECS_PER_DAY as u64);
pub const MAX_PER_RUN: usize = 1000;
pub const SLEEP_BETWEEN_DELETES: Duration = Duration::from_millis(10);
#[must_use]
pub fn spawn<T>(state: Arc<Mutex<T>>, interval: Duration) -> JoinHandle<()>
where
T: SweepAdapter + Send + 'static,
{
tokio::spawn(async move {
loop {
tokio::time::sleep(interval).await;
let now_unix = chrono::Utc::now().timestamp();
let lock = state.lock().await;
match lock.run_sweep(now_unix) {
Ok(0) => {}
Ok(n) => tracing::info!(
target: "offload.ttl_sweep",
"TTL sweep removed {n} expired offloaded blob(s)"
),
Err(e) => tracing::warn!(
target: "offload.ttl_sweep",
"TTL sweep failed: {e}"
),
}
}
})
}
pub trait SweepAdapter {
fn run_sweep(&self, now_unix: i64) -> anyhow::Result<usize>;
}
impl SweepAdapter
for (
rusqlite::Connection,
std::path::PathBuf,
crate::config::ResolvedTtl,
bool,
)
{
fn run_sweep(&self, now_unix: i64) -> anyhow::Result<usize> {
sweep_expired(&self.0, now_unix, MAX_PER_RUN, SLEEP_BETWEEN_DELETES)
}
}
#[cfg(test)]
mod tests {
use super::*;
struct ConnAdapter(rusqlite::Connection);
impl SweepAdapter for ConnAdapter {
fn run_sweep(&self, now_unix: i64) -> anyhow::Result<usize> {
sweep_expired(&self.0, now_unix, MAX_PER_RUN, Duration::ZERO)
}
}
#[test]
fn run_sweep_is_idempotent_on_empty_table() {
let conn = crate::storage::open(std::path::Path::new(":memory:")).unwrap();
let adapter = ConnAdapter(conn);
let n = adapter.run_sweep(0).unwrap();
assert_eq!(n, 0);
let n2 = adapter.run_sweep(0).unwrap();
assert_eq!(n2, 0);
}
#[test]
fn run_sweep_removes_expired_row() {
let conn = crate::storage::open(std::path::Path::new(":memory:")).unwrap();
let off = crate::offload::ContextOffloader::new(
&conn,
None,
crate::offload::OffloadConfig::default(),
);
let r = off.offload("expiring", "ns", Some(1), "ai:alice").unwrap();
let adapter = ConnAdapter(conn);
let n = adapter.run_sweep(r.stored_at + 60).unwrap();
assert_eq!(n, 1);
}
#[test]
fn tunable_defaults_are_documented_values() {
assert_eq!(
DEFAULT_INTERVAL,
Duration::from_secs(crate::SECS_PER_DAY as u64)
);
assert_eq!(MAX_PER_RUN, 1000);
assert_eq!(SLEEP_BETWEEN_DELETES, Duration::from_millis(10));
}
struct ErrAdapter;
impl SweepAdapter for ErrAdapter {
fn run_sweep(&self, _now_unix: i64) -> anyhow::Result<usize> {
anyhow::bail!("synthetic sweep failure")
}
}
struct CountingAdapter {
calls: std::sync::atomic::AtomicUsize,
}
impl SweepAdapter for CountingAdapter {
fn run_sweep(&self, _now_unix: i64) -> anyhow::Result<usize> {
let n = self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(usize::from(n == 0))
}
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn spawn_loop_drives_run_sweep_across_arms() {
let adapter = Arc::new(Mutex::new(CountingAdapter {
calls: std::sync::atomic::AtomicUsize::new(0),
}));
let handle = spawn(Arc::clone(&adapter), Duration::from_millis(1));
for _ in 0..5 {
tokio::time::advance(Duration::from_millis(1)).await;
tokio::task::yield_now().await;
}
handle.abort();
let _ = handle.await;
assert!(
adapter
.lock()
.await
.calls
.load(std::sync::atomic::Ordering::SeqCst)
>= 2,
"spawn loop should have ticked run_sweep at least twice"
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn spawn_loop_tolerates_sweep_errors() {
let handle = spawn(Arc::new(Mutex::new(ErrAdapter)), Duration::from_millis(1));
for _ in 0..3 {
tokio::time::advance(Duration::from_millis(1)).await;
tokio::task::yield_now().await;
}
assert!(!handle.is_finished());
handle.abort();
let _ = handle.await;
}
#[test]
fn production_tuple_adapter_runs_sweep_expired() {
let conn = crate::storage::open(std::path::Path::new(":memory:")).unwrap();
let off = crate::offload::ContextOffloader::new(
&conn,
None,
crate::offload::OffloadConfig::default(),
);
let r = off.offload("expiring2", "ns", Some(1), "ai:bob").unwrap();
let tuple = (
conn,
std::path::PathBuf::from(":memory:"),
crate::config::ResolvedTtl::default(),
true,
);
assert_eq!(tuple.run_sweep(r.stored_at).unwrap(), 0);
assert_eq!(tuple.run_sweep(r.stored_at + 60).unwrap(), 1);
}
}