ff_backend_sqlite/scanner_supervisor.rs
1//! SQLite scanner supervisor (RFC-023 Phase 3.5).
2//!
3//! SQLite collapses the Postgres per-partition fan-out to N=1
4//! (§4.1): one process, one writer, one logical partition. The
5//! supervisor is therefore just a tokio task per reconciler type
6//! running a `tokio::time::interval` tick loop with a `watch`
7//! shutdown channel — no partition iteration.
8//!
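//! Two reconcilers are wired today: `budget_reset` (RFC-023 Phase 3.5)
//! and, when the `core` feature is enabled, `worker_registry_ttl_sweep`
//! (RFC-025 Phase 4). Future reconcilers (if the §4.1 A3 single-writer
//! envelope ever needs them) drop in as additional `spawn_reconciler`
//! calls here.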
12//!
13//! # Shutdown
14//!
//! [`SqliteScannerHandle::shutdown`] flips the `watch` channel, awaits
//! the underlying [`JoinSet`] for up to `grace`, and aborts any task
//! still running once `grace` elapses. Tasks are registered
//! synchronously during [`spawn_scanners`] (before the handle is
//! returned), so the drain contract holds even for an immediate shutdown
//! after `with_scanners`. This mirrors the shape of
//! `PostgresScannerHandle::shutdown`, so the drain logic in `ff-server`'s
//! `Server::shutdown` stays backend-agnostic.
22
23use std::sync::Arc;
24use std::time::Duration;
25
26use sqlx::SqlitePool;
27use tokio::sync::{Mutex, watch};
28use tokio::task::JoinSet;
29
30use crate::reconcilers;
31use crate::tx_util::now_ms;
32
/// Subset of `EngineConfig`'s interval knobs that the SQLite
/// reconcilers honour.
///
/// Two cadences are wired today: `budget_reset_interval` and
/// `worker_registry_ttl_interval`. The latter only takes effect when the
/// crate is built with the `core` feature. Kept as a struct rather than
/// bare `Duration`s so further cadences are additive, matching
/// [`ff_backend_postgres::PostgresScannerConfig`].
///
/// For every field, a zero `Duration` disables the corresponding
/// reconciler: it is not spawned at all. This also sidesteps
/// `tokio::time::interval`'s panic on a zero period.
#[derive(Clone, Debug)]
pub struct SqliteScannerConfig {
    /// RFC-020 Wave 9 Standalone-1 cadence for the `budget_reset`
    /// reconciler. Matches the Valkey side's
    /// `ff-server::config::budget_reset_interval` knob so one env value
    /// drives all three backends.
    ///
    /// Zero (`FF_BUDGET_RESET_INTERVAL_S=0`) disables the reconciler.
    pub budget_reset_interval: Duration,
    /// RFC-025 Phase 4 cadence for `worker_registry_ttl_sweep`. Mirrors
    /// PG's `worker_registry_ttl_interval` (default 30s).
    ///
    /// Zero disables the sweep, the same contract as
    /// `budget_reset_interval`. Ignored unless the `core` feature is
    /// enabled.
    pub worker_registry_ttl_interval: Duration,
}
53
54/// Spawned scanner supervisor. Holding this handle keeps the
55/// reconciler tasks alive; drop-on-backend or explicit shutdown
56/// drains them. Returned by [`spawn_scanners`]; stored inside
57/// `SqliteBackendInner` so `EngineBackend::shutdown_prepare` can
58/// drain on server shutdown.
59pub struct SqliteScannerHandle {
60 shutdown_tx: watch::Sender<bool>,
61 join_set: Arc<Mutex<JoinSet<()>>>,
62}
63
64impl SqliteScannerHandle {
65 /// Signal shutdown and await drain up to `grace`. Returns the
66 /// number of tasks that did not exit cleanly within the grace
67 /// window (for operator logging). Subsequent calls are no-ops.
68 pub async fn shutdown(&self, grace: Duration) -> usize {
69 let _ = self.shutdown_tx.send(true);
70 let mut js = self.join_set.lock().await;
71 let deadline = tokio::time::Instant::now() + grace;
72 let mut timed_out = 0usize;
73 while !js.is_empty() {
74 let remaining = deadline
75 .checked_duration_since(tokio::time::Instant::now())
76 .unwrap_or(Duration::ZERO);
77 if remaining.is_zero() {
78 timed_out = js.len();
79 js.abort_all();
80 break;
81 }
82 match tokio::time::timeout(remaining, js.join_next()).await {
83 Ok(Some(_res)) => continue,
84 Ok(None) => break,
85 Err(_) => {
86 timed_out = js.len();
87 js.abort_all();
88 break;
89 }
90 }
91 }
92 timed_out
93 }
94}
95
96impl Drop for SqliteScannerHandle {
97 /// Best-effort signal on drop. Tasks exit at their next tick; if
98 /// the caller wants a bounded drain they must call
99 /// [`Self::shutdown`] explicitly (per PG parity).
100 fn drop(&mut self) {
101 let _ = self.shutdown_tx.send(true);
102 }
103}
104
105/// Spawn all SQLite reconcilers as long-lived tick loops. Phase 3.5
106/// ships one: `budget_reset`. Tasks are registered into the
107/// `JoinSet` synchronously — the handle returned is always drainable
108/// via [`SqliteScannerHandle::shutdown`].
109pub fn spawn_scanners(pool: SqlitePool, cfg: SqliteScannerConfig) -> SqliteScannerHandle {
110 let (tx, rx) = watch::channel(false);
111 let mut js = JoinSet::new();
112
113 // Zero-interval guard: treat as disabled rather than panicking
114 // in `tokio::time::interval`. Matches `FF_BUDGET_RESET_INTERVAL_S=0`
115 // as an opt-out.
116 if !cfg.budget_reset_interval.is_zero() {
117 spawn_reconciler(
118 &mut js,
119 rx.clone(),
120 pool.clone(),
121 cfg.budget_reset_interval,
122 "sqlite.budget_reset",
123 |pool| {
124 Box::pin(async move {
125 reconcilers::budget_reset::scan_tick(&pool, now_ms())
126 .await
127 .map(|_| ())
128 })
129 },
130 );
131 }
132
133 // ── RFC-025 Phase 4: worker_registry_ttl_sweep ──
134 // Single-writer: no partition fan-out. One tick deletes every
135 // expired row in one transaction.
136 #[cfg(feature = "core")]
137 if !cfg.worker_registry_ttl_interval.is_zero() {
138 spawn_reconciler(
139 &mut js,
140 rx.clone(),
141 pool.clone(),
142 cfg.worker_registry_ttl_interval,
143 "sqlite.worker_registry_ttl_sweep",
144 |pool| {
145 Box::pin(async move {
146 crate::worker_registry::ttl_sweep_tick(&pool)
147 .await
148 .map(|_| ())
149 })
150 },
151 );
152 }
153
154 let scanners = js.len();
155 tracing::info!(
156 scanners,
157 "sqlite scanner supervisor spawned (RFC-023 Phase 3.5 budget_reset + RFC-025 Phase 4 worker_registry_ttl_sweep)"
158 );
159
160 SqliteScannerHandle {
161 shutdown_tx: tx,
162 join_set: Arc::new(Mutex::new(js)),
163 }
164}
165
166type TickFut = std::pin::Pin<
167 Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>,
168>;
169
170/// Register one reconciler tick task synchronously into the
171/// supervisor's [`JoinSet`]. Tasks exit at the next `select!` once
172/// `shutdown` fires (or the watch channel is closed).
173fn spawn_reconciler<F>(
174 js: &mut JoinSet<()>,
175 mut shutdown: watch::Receiver<bool>,
176 pool: SqlitePool,
177 interval: Duration,
178 name: &'static str,
179 tick: F,
180) where
181 F: Fn(SqlitePool) -> TickFut + Send + Sync + 'static,
182{
183 js.spawn(async move {
184 let mut tk = tokio::time::interval(interval);
185 tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
186 // `tokio::time::interval` yields an immediate first tick;
187 // drain it so the first observable reconciler run happens
188 // `interval` after spawn. Intentional SQLite startup-timing
189 // difference from the Postgres supervisor, which lets the
190 // first immediate tick fire. Safe: the watch shutdown signal
191 // is only delivered via `changed()`, so skipping this
192 // pre-drain tick has no shutdown-observability cost.
193 tk.tick().await;
194 loop {
195 tokio::select! {
196 res = shutdown.changed() => {
197 // Channel closed (sender dropped) OR shutdown=true → exit.
198 if res.is_err() || *shutdown.borrow() {
199 return;
200 }
201 }
202 _ = tk.tick() => {
203 if *shutdown.borrow() {
204 return;
205 }
206 if let Err(e) = tick(pool.clone()).await {
207 tracing::warn!(
208 scanner = name,
209 error = %e,
210 "sqlite reconciler tick failed"
211 );
212 }
213 }
214 }
215 }
216 });
217}