ff_backend_sqlite/scanner_supervisor.rs
1//! SQLite scanner supervisor (RFC-023 Phase 3.5).
2//!
3//! SQLite collapses the Postgres per-partition fan-out to N=1
4//! (§4.1): one process, one writer, one logical partition. The
5//! supervisor is therefore just a tokio task per reconciler type
6//! running a `tokio::time::interval` tick loop with a `watch`
7//! shutdown channel — no partition iteration.
8//!
9//! Phase 3.5 ships only `budget_reset`. Future reconcilers (if the
10//! §4.1 A3 single-writer envelope ever needs them) drop in as
11//! additional `spawn_reconciler` calls here.
12//!
13//! # Shutdown
14//!
15//! [`SqliteScannerHandle::shutdown`] flips the `watch` channel and
16//! awaits the underlying [`JoinSet`] up to `grace`. Tasks are
17//! registered synchronously during [`spawn_scanners`] (before the
18//! handle is returned), so the drain contract holds even for an
19//! immediate shutdown after `with_scanners`. Mirrors the
20//! `PostgresScannerHandle::shutdown` shape so `ff-server`'s
21//! `Server::shutdown` drain logic stays backend-agnostic.
22
23use std::sync::Arc;
24use std::time::Duration;
25
26use sqlx::SqlitePool;
27use tokio::sync::{Mutex, watch};
28use tokio::task::JoinSet;
29
30use crate::reconcilers;
31use crate::tx_util::now_ms;
32
/// The interval knobs from `EngineConfig` that the SQLite reconcilers read.
///
/// Only `budget_reset_interval` exists today. This stays a struct rather
/// than a bare `Duration` so further cadences can be added without breaking
/// callers, in the same shape as [`ff_backend_postgres::PostgresScannerConfig`].
#[derive(Clone, Debug)]
pub struct SqliteScannerConfig {
    /// Tick cadence of the budget-reset reconciler (RFC-020 Wave 9
    /// Standalone-1). It is driven by the same env value as the Valkey side's
    /// `ff-server::config::budget_reset_interval` knob, so one setting covers
    /// all three backends.
    ///
    /// A zero value (`FF_BUDGET_RESET_INTERVAL_S=0`) disables the reconciler:
    /// it is never spawned. Skipping it also avoids the panic that
    /// `tokio::time::interval` raises for a zero period.
    pub budget_reset_interval: Duration,
}
48
49/// Spawned scanner supervisor. Holding this handle keeps the
50/// reconciler tasks alive; drop-on-backend or explicit shutdown
51/// drains them. Returned by [`spawn_scanners`]; stored inside
52/// `SqliteBackendInner` so `EngineBackend::shutdown_prepare` can
53/// drain on server shutdown.
54pub struct SqliteScannerHandle {
55 shutdown_tx: watch::Sender<bool>,
56 join_set: Arc<Mutex<JoinSet<()>>>,
57}
58
59impl SqliteScannerHandle {
60 /// Signal shutdown and await drain up to `grace`. Returns the
61 /// number of tasks that did not exit cleanly within the grace
62 /// window (for operator logging). Subsequent calls are no-ops.
63 pub async fn shutdown(&self, grace: Duration) -> usize {
64 let _ = self.shutdown_tx.send(true);
65 let mut js = self.join_set.lock().await;
66 let deadline = tokio::time::Instant::now() + grace;
67 let mut timed_out = 0usize;
68 while !js.is_empty() {
69 let remaining = deadline
70 .checked_duration_since(tokio::time::Instant::now())
71 .unwrap_or(Duration::ZERO);
72 if remaining.is_zero() {
73 timed_out = js.len();
74 js.abort_all();
75 break;
76 }
77 match tokio::time::timeout(remaining, js.join_next()).await {
78 Ok(Some(_res)) => continue,
79 Ok(None) => break,
80 Err(_) => {
81 timed_out = js.len();
82 js.abort_all();
83 break;
84 }
85 }
86 }
87 timed_out
88 }
89}
90
91impl Drop for SqliteScannerHandle {
92 /// Best-effort signal on drop. Tasks exit at their next tick; if
93 /// the caller wants a bounded drain they must call
94 /// [`Self::shutdown`] explicitly (per PG parity).
95 fn drop(&mut self) {
96 let _ = self.shutdown_tx.send(true);
97 }
98}
99
100/// Spawn all SQLite reconcilers as long-lived tick loops. Phase 3.5
101/// ships one: `budget_reset`. Tasks are registered into the
102/// `JoinSet` synchronously — the handle returned is always drainable
103/// via [`SqliteScannerHandle::shutdown`].
104pub fn spawn_scanners(pool: SqlitePool, cfg: SqliteScannerConfig) -> SqliteScannerHandle {
105 let (tx, rx) = watch::channel(false);
106 let mut js = JoinSet::new();
107
108 // Zero-interval guard: treat as disabled rather than panicking
109 // in `tokio::time::interval`. Matches `FF_BUDGET_RESET_INTERVAL_S=0`
110 // as an opt-out.
111 if !cfg.budget_reset_interval.is_zero() {
112 spawn_reconciler(
113 &mut js,
114 rx.clone(),
115 pool.clone(),
116 cfg.budget_reset_interval,
117 "sqlite.budget_reset",
118 |pool| {
119 Box::pin(async move {
120 reconcilers::budget_reset::scan_tick(&pool, now_ms())
121 .await
122 .map(|_| ())
123 })
124 },
125 );
126 }
127
128 let scanners = js.len();
129 tracing::info!(
130 scanners,
131 "sqlite scanner supervisor spawned (RFC-023 Phase 3.5 — budget_reset N=1)"
132 );
133
134 SqliteScannerHandle {
135 shutdown_tx: tx,
136 join_set: Arc::new(Mutex::new(js)),
137 }
138}
139
140type TickFut = std::pin::Pin<
141 Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>,
142>;
143
144/// Register one reconciler tick task synchronously into the
145/// supervisor's [`JoinSet`]. Tasks exit at the next `select!` once
146/// `shutdown` fires (or the watch channel is closed).
147fn spawn_reconciler<F>(
148 js: &mut JoinSet<()>,
149 mut shutdown: watch::Receiver<bool>,
150 pool: SqlitePool,
151 interval: Duration,
152 name: &'static str,
153 tick: F,
154) where
155 F: Fn(SqlitePool) -> TickFut + Send + Sync + 'static,
156{
157 js.spawn(async move {
158 let mut tk = tokio::time::interval(interval);
159 tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
160 // `tokio::time::interval` yields an immediate first tick;
161 // drain it so the first observable reconciler run happens
162 // `interval` after spawn. Intentional SQLite startup-timing
163 // difference from the Postgres supervisor, which lets the
164 // first immediate tick fire. Safe: the watch shutdown signal
165 // is only delivered via `changed()`, so skipping this
166 // pre-drain tick has no shutdown-observability cost.
167 tk.tick().await;
168 loop {
169 tokio::select! {
170 res = shutdown.changed() => {
171 // Channel closed (sender dropped) OR shutdown=true → exit.
172 if res.is_err() || *shutdown.borrow() {
173 return;
174 }
175 }
176 _ = tk.tick() => {
177 if *shutdown.borrow() {
178 return;
179 }
180 if let Err(e) = tick(pool.clone()).await {
181 tracing::warn!(
182 scanner = name,
183 error = %e,
184 "sqlite reconciler tick failed"
185 );
186 }
187 }
188 }
189 }
190 });
191}