// Source path: ai_memory/background/offload_ttl_sweep.rs
// Copyright 2026 AlphaOne LLC
// SPDX-License-Identifier: Apache-2.0

//! v0.7.0 QW-3 — daily TTL sweep for `offloaded_blobs`.
//!
//! The sweep removes rows where `stored_at + ttl_seconds < now`,
//! bounded at [`MAX_PER_RUN`] deletions per pass with a
//! [`SLEEP_BETWEEN_DELETES`] gap between deletes (matching the K2
//! pending-actions sweeper discipline). The cap bounds how long a single
//! pass can run. Note that the daemon state lock is held for the whole
//! pass, sleeps included, so the gap does not shorten that lock window.
//!
//! Spawned by `daemon_runtime::bootstrap_serve` alongside the GC and
//! transcript-lifecycle loops; aborted on shutdown.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use tokio::sync::Mutex;
19use tokio::task::JoinHandle;
20
21use crate::offload::sweep_expired;
22
23/// Cadence between sweeps. Daily — matches the prompt's "daily task"
24/// directive. Operators that want a shorter cadence (testing,
25/// disaster-recovery exercises) call [`spawn`] with an override.
26/// Sourced from the substrate-wide `SECS_PER_DAY` SSOT
27/// (`src/lib.rs`) so the canonical time-window magnitudes are pinned
28/// in one place and the vendor-literals gate's SECS_PER_*
29/// drift-block recipe applies here automatically.
30#[allow(clippy::cast_sign_loss)]
31pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(crate::SECS_PER_DAY as u64);
32
/// Cap on the number of rows a single sweep pass deletes.
///
/// Bounds the worst case when a large backlog has built up. 1,000 deletes
/// separated by the 10 ms [`SLEEP_BETWEEN_DELETES`] gap add up to roughly
/// 10 s of wall clock per pass, far below the daily cadence. Expired rows
/// beyond the cap are left for a later pass.
pub const MAX_PER_RUN: usize = 1_000;
38
/// Pause between consecutive deletes within one sweep pass.
///
/// The gap is meant to give other writers room between the
/// SELECT-then-DELETE pairs that make up the sweep body. This matters for
/// writers on other SQLite connections, assuming each delete commits on
/// its own (TODO: confirm in `sweep_expired`).
///
/// It does NOT release the daemon state lock. [`spawn`] holds that lock for
/// the entire pass, sleeps included, so in-process callers waiting on the
/// lock still wait for the whole pass.
pub const SLEEP_BETWEEN_DELETES: Duration = Duration::from_millis(10);
43
44/// Spawn the daily sweep loop. Returns a [`JoinHandle`] the caller
45/// aborts on shutdown.
46///
47/// The state lock is held only for the duration of each pass; the
48/// in-pass `std::thread::sleep` between deletes happens INSIDE that
49/// lock window, which is acceptable because the sweep is a single
50/// background thread and not a hot data-plane path. v0.8.0 may
51/// move the per-row sleep outside the lock if the offload write
52/// volume grows.
53#[must_use]
54pub fn spawn<T>(state: Arc<Mutex<T>>, interval: Duration) -> JoinHandle<()>
55where
56    T: SweepAdapter + Send + 'static,
57{
58    tokio::spawn(async move {
59        loop {
60            tokio::time::sleep(interval).await;
61            let now_unix = chrono::Utc::now().timestamp();
62            let lock = state.lock().await;
63            match lock.run_sweep(now_unix) {
64                Ok(0) => {}
65                Ok(n) => tracing::info!(
66                    target: "offload.ttl_sweep",
67                    "TTL sweep removed {n} expired offloaded blob(s)"
68                ),
69                Err(e) => tracing::warn!(
70                    target: "offload.ttl_sweep",
71                    "TTL sweep failed: {e}"
72                ),
73            }
74        }
75    })
76}
77
78/// Trait wrapping the daemon's `(Connection, ...)` tuple so the
79/// sweep is testable without depending on the full daemon state
80/// shape. The concrete production `Db` (alias for the daemon's
81/// `(Connection, PathBuf, ResolvedTtl, bool)` tuple) implements
82/// this via the blanket impl below.
83pub trait SweepAdapter {
84    fn run_sweep(&self, now_unix: i64) -> anyhow::Result<usize>;
85}
86
87impl SweepAdapter
88    for (
89        rusqlite::Connection,
90        std::path::PathBuf,
91        crate::config::ResolvedTtl,
92        bool,
93    )
94{
95    fn run_sweep(&self, now_unix: i64) -> anyhow::Result<usize> {
96        sweep_expired(&self.0, now_unix, MAX_PER_RUN, SLEEP_BETWEEN_DELETES)
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal test adapter. It uses a bare connection so the sweep
    /// surface is exercised end-to-end without standing up the full
    /// daemon `Db` shape. It passes `Duration::ZERO` as the inter-delete
    /// sleep so tests do not wait on it.
    struct ConnAdapter(rusqlite::Connection);
    impl SweepAdapter for ConnAdapter {
        fn run_sweep(&self, now_unix: i64) -> anyhow::Result<usize> {
            sweep_expired(&self.0, now_unix, MAX_PER_RUN, Duration::ZERO)
        }
    }

    #[test]
    fn run_sweep_is_idempotent_on_empty_table() {
        let conn = crate::storage::open(std::path::Path::new(":memory:")).unwrap();
        let adapter = ConnAdapter(conn);
        let n = adapter.run_sweep(0).unwrap();
        assert_eq!(n, 0);
        let n2 = adapter.run_sweep(0).unwrap();
        assert_eq!(n2, 0);
    }

    #[test]
    fn run_sweep_removes_expired_row() {
        let conn = crate::storage::open(std::path::Path::new(":memory:")).unwrap();
        let off = crate::offload::ContextOffloader::new(
            &conn,
            None,
            crate::offload::OffloadConfig::default(),
        );
        let r = off.offload("expiring", "ns", Some(1), "ai:alice").unwrap();
        let adapter = ConnAdapter(conn);
        // Sweep at stored_at + 60s to guarantee expiry.
        let n = adapter.run_sweep(r.stored_at + 60).unwrap();
        assert_eq!(n, 1);
    }

    /// Default-const sanity: the cadence, per-run and per-delete tunables
    /// resolve to their documented values. The cadence is checked against
    /// the `SECS_PER_DAY` SSOT and also against the literal 86 400 s, so
    /// drift in either the SSOT or the magic-number consts trips here.
    #[test]
    fn tunable_defaults_are_documented_values() {
        let secs_per_day =
            u64::try_from(crate::SECS_PER_DAY).expect("SECS_PER_DAY must be non-negative");
        assert_eq!(DEFAULT_INTERVAL, Duration::from_secs(secs_per_day));
        assert_eq!(DEFAULT_INTERVAL, Duration::from_secs(86_400));
        assert_eq!(MAX_PER_RUN, 1000);
        assert_eq!(SLEEP_BETWEEN_DELETES, Duration::from_millis(10));
    }

    /// Adapter whose `run_sweep` always errors. It exercises the `Err`
    /// arm of the [`spawn`] loop's `match` (the `tracing::warn!` path).
    struct ErrAdapter;
    impl SweepAdapter for ErrAdapter {
        fn run_sweep(&self, _now_unix: i64) -> anyhow::Result<usize> {
            anyhow::bail!("synthetic sweep failure")
        }
    }

    /// Adapter that reports a deletion count, so the `Ok(n)` info-log arm
    /// of the loop fires. It also counts how many times the loop ticked, so
    /// the test can prove the spawned task actually ran the body.
    struct CountingAdapter {
        calls: std::sync::atomic::AtomicUsize,
    }
    impl SweepAdapter for CountingAdapter {
        fn run_sweep(&self, _now_unix: i64) -> anyhow::Result<usize> {
            // Report a non-zero deletion on the first tick (Ok(n) arm) and
            // zero thereafter (Ok(0) arm), so both loop arms get covered.
            let n = self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Ok(usize::from(n == 0))
        }
    }

    /// Drive the real [`spawn`] tokio loop with a 1 ms interval under
    /// paused time, so the loop body (sleep → lock → run_sweep → match
    /// arms) executes several times, then abort the handle. Covers the
    /// `Ok(0)` and `Ok(n)` arms and the loop cadence.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn spawn_loop_drives_run_sweep_across_arms() {
        let adapter = Arc::new(Mutex::new(CountingAdapter {
            calls: std::sync::atomic::AtomicUsize::new(0),
        }));
        let handle = spawn(Arc::clone(&adapter), Duration::from_millis(1));
        // With paused time we advance the clock deterministically so the
        // loop wakes for several iterations.
        for _ in 0..5 {
            tokio::time::advance(Duration::from_millis(1)).await;
            tokio::task::yield_now().await;
        }
        handle.abort();
        let _ = handle.await;
        assert!(
            adapter
                .lock()
                .await
                .calls
                .load(std::sync::atomic::Ordering::SeqCst)
                >= 2,
            "spawn loop should have ticked run_sweep at least twice"
        );
    }

    /// Drive the [`spawn`] loop against an always-erroring adapter, so the
    /// `Err(e)` warn-log arm of the loop `match` is exercised.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn spawn_loop_tolerates_sweep_errors() {
        let handle = spawn(Arc::new(Mutex::new(ErrAdapter)), Duration::from_millis(1));
        for _ in 0..3 {
            tokio::time::advance(Duration::from_millis(1)).await;
            tokio::task::yield_now().await;
        }
        // The loop must keep running (not panic) through the error arm.
        assert!(!handle.is_finished());
        handle.abort();
        let _ = handle.await;
    }

    /// The production `impl SweepAdapter for (Connection, …)` tuple
    /// delegates to [`sweep_expired`]. Drive it end-to-end so the
    /// production impl body is covered, not just the test `ConnAdapter`.
    #[test]
    fn production_tuple_adapter_runs_sweep_expired() {
        let conn = crate::storage::open(std::path::Path::new(":memory:")).unwrap();
        let off = crate::offload::ContextOffloader::new(
            &conn,
            None,
            crate::offload::OffloadConfig::default(),
        );
        let r = off.offload("expiring2", "ns", Some(1), "ai:bob").unwrap();
        let tuple = (
            conn,
            std::path::PathBuf::from(":memory:"),
            crate::config::ResolvedTtl::default(),
            true,
        );
        // Before expiry: nothing removed.
        assert_eq!(tuple.run_sweep(r.stored_at).unwrap(), 0);
        // After expiry: the row is swept.
        assert_eq!(tuple.run_sweep(r.stored_at + 60).unwrap(), 1);
    }
}