ai_memory/background/offload_ttl_sweep.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 QW-3 — daily TTL sweep for `offloaded_blobs`.
5//!
//! The sweep removes rows where `stored_at + ttl_seconds < now`,
//! bounded at [`MAX_PER_RUN`] deletions per pass with a [`SLEEP_BETWEEN_DELETES`]
//! pause between deletes so other writers get a window between
//! individual statements under contended writes (matches the K2
//! pending-actions sweeper discipline). The daemon's in-process state
//! mutex is not released during those pauses: [`spawn`] holds it for
//! the whole pass.
11//!
12//! Spawned by `daemon_runtime::bootstrap_serve` alongside the GC and
13//! transcript-lifecycle loops; aborted on shutdown.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use tokio::sync::Mutex;
19use tokio::task::JoinHandle;
20
21use crate::offload::sweep_expired;
22
23/// Cadence between sweeps. Daily — matches the prompt's "daily task"
24/// directive. Operators that want a shorter cadence (testing,
25/// disaster-recovery exercises) call [`spawn`] with an override.
26/// Sourced from the substrate-wide `SECS_PER_DAY` SSOT
27/// (`src/lib.rs`) so the canonical time-window magnitudes are pinned
28/// in one place and the vendor-literals gate's SECS_PER_*
29/// drift-block recipe applies here automatically.
30#[allow(clippy::cast_sign_loss)]
31pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(crate::SECS_PER_DAY as u64);
32
/// Upper bound on the number of rows a single sweep pass may delete.
///
/// Capping the pass keeps a pathological backlog from turning one sweep
/// into an unbounded loop; expired rows beyond the cap are simply left
/// for a later pass. With the default [`SLEEP_BETWEEN_DELETES`] of
/// 10 ms, a full pass costs roughly 1_000 × 10 ms = 10 s of wall-clock
/// time, which is small next to the daily cadence.
pub const MAX_PER_RUN: usize = 1_000;
38
/// Pause inserted between consecutive deletes within one sweep pass.
/// The production [`SweepAdapter`] impl forwards it to
/// `crate::offload::sweep_expired`.
///
/// The pause is intended to give other writers to the database a window
/// between individual deletes. It does **not** release the daemon's
/// in-process state mutex. [`spawn`] holds that lock for the entire
/// pass, and the pause is a blocking `std::thread::sleep` inside it.
/// A full pass of [`MAX_PER_RUN`] deletes can therefore hold the lock
/// for roughly `MAX_PER_RUN × 10 ms` ≈ 10 s.
pub const SLEEP_BETWEEN_DELETES: Duration = Duration::from_millis(10);
43
44/// Spawn the daily sweep loop. Returns a [`JoinHandle`] the caller
45/// aborts on shutdown.
46///
47/// The state lock is held only for the duration of each pass; the
48/// in-pass `std::thread::sleep` between deletes happens INSIDE that
49/// lock window, which is acceptable because the sweep is a single
50/// background thread and not a hot data-plane path. v0.8.0 may
51/// move the per-row sleep outside the lock if the offload write
52/// volume grows.
53#[must_use]
54pub fn spawn<T>(state: Arc<Mutex<T>>, interval: Duration) -> JoinHandle<()>
55where
56 T: SweepAdapter + Send + 'static,
57{
58 tokio::spawn(async move {
59 loop {
60 tokio::time::sleep(interval).await;
61 let now_unix = chrono::Utc::now().timestamp();
62 let lock = state.lock().await;
63 match lock.run_sweep(now_unix) {
64 Ok(0) => {}
65 Ok(n) => tracing::info!(
66 target: "offload.ttl_sweep",
67 "TTL sweep removed {n} expired offloaded blob(s)"
68 ),
69 Err(e) => tracing::warn!(
70 target: "offload.ttl_sweep",
71 "TTL sweep failed: {e}"
72 ),
73 }
74 }
75 })
76}
77
78/// Trait wrapping the daemon's `(Connection, ...)` tuple so the
79/// sweep is testable without depending on the full daemon state
80/// shape. The concrete production `Db` (alias for the daemon's
81/// `(Connection, PathBuf, ResolvedTtl, bool)` tuple) implements
82/// this via the blanket impl below.
83pub trait SweepAdapter {
84 fn run_sweep(&self, now_unix: i64) -> anyhow::Result<usize>;
85}
86
87impl SweepAdapter
88 for (
89 rusqlite::Connection,
90 std::path::PathBuf,
91 crate::config::ResolvedTtl,
92 bool,
93 )
94{
95 fn run_sweep(&self, now_unix: i64) -> anyhow::Result<usize> {
96 sweep_expired(&self.0, now_unix, MAX_PER_RUN, SLEEP_BETWEEN_DELETES)
97 }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal test adapter. It wraps a bare connection so the sweep
    /// surface is exercised end-to-end without standing up the full
    /// daemon `Db` shape. It passes a zero inter-delete pause so the
    /// tests never sleep.
    struct ConnAdapter(rusqlite::Connection);
    impl SweepAdapter for ConnAdapter {
        fn run_sweep(&self, now_unix: i64) -> anyhow::Result<usize> {
            sweep_expired(&self.0, now_unix, MAX_PER_RUN, Duration::ZERO)
        }
    }

    /// Sweeping an empty table deletes nothing, and repeating the sweep
    /// is equally a no-op.
    #[test]
    fn run_sweep_is_idempotent_on_empty_table() {
        let conn = crate::storage::open(std::path::Path::new(":memory:")).unwrap();
        let adapter = ConnAdapter(conn);
        let n = adapter.run_sweep(0).unwrap();
        assert_eq!(n, 0);
        let n2 = adapter.run_sweep(0).unwrap();
        assert_eq!(n2, 0);
    }

    /// A row offloaded with a short TTL (`Some(1)`) is removed once the
    /// sweep clock is well past its expiry.
    #[test]
    fn run_sweep_removes_expired_row() {
        let conn = crate::storage::open(std::path::Path::new(":memory:")).unwrap();
        let off = crate::offload::ContextOffloader::new(
            &conn,
            None,
            crate::offload::OffloadConfig::default(),
        );
        let r = off.offload("expiring", "ns", Some(1), "ai:alice").unwrap();
        let adapter = ConnAdapter(conn);
        // Sweep at stored_at + 60s to guarantee expiry.
        let n = adapter.run_sweep(r.stored_at + 60).unwrap();
        assert_eq!(n, 1);
    }

    /// Default-const sanity: the cadence, per-run and per-delete
    /// tunables resolve to the documented values. A drift in the
    /// `SECS_PER_DAY` SSOT or in the magic-number consts trips here.
    #[test]
    fn tunable_defaults_are_documented_values() {
        // Checked conversion instead of a sign-losing `as` cast, so a
        // negative SSOT fails loudly here rather than wrapping.
        let secs_per_day = u64::try_from(crate::SECS_PER_DAY)
            .expect("SECS_PER_DAY must be a non-negative second count");
        assert_eq!(DEFAULT_INTERVAL, Duration::from_secs(secs_per_day));
        assert_eq!(MAX_PER_RUN, 1000);
        assert_eq!(SLEEP_BETWEEN_DELETES, Duration::from_millis(10));
    }

    /// Adapter whose `run_sweep` always errors. It exercises the `Err`
    /// arm of the [`spawn`] loop's `match` (the `tracing::warn!` path).
    struct ErrAdapter;
    impl SweepAdapter for ErrAdapter {
        fn run_sweep(&self, _now_unix: i64) -> anyhow::Result<usize> {
            anyhow::bail!("synthetic sweep failure")
        }
    }

    /// Adapter that reports a deletion count and counts how many times
    /// the loop ticked.
    ///
    /// The deletion count makes the `Ok(n)` info-log arm of the loop
    /// fire. The tick count lets the test prove the spawned task
    /// actually ran the body.
    struct CountingAdapter {
        calls: std::sync::atomic::AtomicUsize,
    }
    impl SweepAdapter for CountingAdapter {
        fn run_sweep(&self, _now_unix: i64) -> anyhow::Result<usize> {
            // Report a non-zero deletion on the first tick (Ok(n) arm),
            // zero thereafter (Ok(0) arm) — both loop arms get covered.
            let n = self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Ok(usize::from(n == 0))
        }
    }

    /// Drive the real [`spawn`] tokio loop with a 1 ms interval on a
    /// paused clock, then abort the handle.
    ///
    /// The loop body (sleep → lock → run_sweep → match arms) executes
    /// several times, covering the `Ok(0)`, `Ok(n)` and loop-cadence
    /// paths.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn spawn_loop_drives_run_sweep_across_arms() {
        let adapter = Arc::new(Mutex::new(CountingAdapter {
            calls: std::sync::atomic::AtomicUsize::new(0),
        }));
        let handle = spawn(Arc::clone(&adapter), Duration::from_millis(1));
        // With paused time we advance the clock deterministically so the
        // loop wakes for several iterations.
        for _ in 0..5 {
            tokio::time::advance(Duration::from_millis(1)).await;
            tokio::task::yield_now().await;
        }
        handle.abort();
        let _ = handle.await;
        assert!(
            adapter
                .lock()
                .await
                .calls
                .load(std::sync::atomic::Ordering::SeqCst)
                >= 2,
            "spawn loop should have ticked run_sweep at least twice"
        );
    }

    /// Drive the [`spawn`] loop against an always-erroring adapter so the
    /// `Err(e)` warn-log arm of the loop `match` is exercised.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn spawn_loop_tolerates_sweep_errors() {
        let handle = spawn(Arc::new(Mutex::new(ErrAdapter)), Duration::from_millis(1));
        for _ in 0..3 {
            tokio::time::advance(Duration::from_millis(1)).await;
            tokio::task::yield_now().await;
        }
        // The loop must keep running (not panic) through the error arm.
        assert!(!handle.is_finished());
        handle.abort();
        let _ = handle.await;
    }

    /// The production `impl SweepAdapter for (Connection, …)` tuple
    /// delegates to [`sweep_expired`]. Drive it end-to-end so that impl's
    /// body is covered, not just the test-only `ConnAdapter`.
    #[test]
    fn production_tuple_adapter_runs_sweep_expired() {
        let conn = crate::storage::open(std::path::Path::new(":memory:")).unwrap();
        let off = crate::offload::ContextOffloader::new(
            &conn,
            None,
            crate::offload::OffloadConfig::default(),
        );
        let r = off.offload("expiring2", "ns", Some(1), "ai:bob").unwrap();
        let tuple = (
            conn,
            std::path::PathBuf::from(":memory:"),
            crate::config::ResolvedTtl::default(),
            true,
        );
        // Before expiry: nothing removed.
        assert_eq!(tuple.run_sweep(r.stored_at).unwrap(), 0);
        // After expiry: the row is swept.
        assert_eq!(tuple.run_sweep(r.stored_at + 60).unwrap(), 1);
    }
}