Skip to main content

ff_backend_sqlite/
scanner_supervisor.rs

//! SQLite scanner supervisor (RFC-023 Phase 3.5).
//!
//! SQLite collapses the Postgres per-partition fan-out to N=1
//! (§4.1): one process, one writer, one logical partition. The
//! supervisor is therefore just one tokio task per reconciler type,
//! each running a `tokio::time::interval` tick loop with a `watch`
//! shutdown channel — no partition iteration.
//!
//! Phase 3.5 shipped `budget_reset`; RFC-025 Phase 4 added
//! `worker_registry_ttl_sweep` (compiled in only with the `core`
//! feature). Future reconcilers (if the §4.1 A3 single-writer
//! envelope ever needs them) drop in as additional
//! `spawn_reconciler` calls here.
//!
//! # Shutdown
//!
//! [`SqliteScannerHandle::shutdown`] flips the `watch` channel and
//! awaits the underlying [`JoinSet`] for up to `grace`. Tasks are
//! registered synchronously during [`spawn_scanners`] (before the
//! handle is returned), so the drain contract holds even for an
//! immediate shutdown after `with_scanners`. This mirrors the
//! `PostgresScannerHandle::shutdown` shape so that `ff-server`'s
//! `Server::shutdown` drain logic stays backend-agnostic.
22
23use std::sync::Arc;
24use std::time::Duration;
25
26use sqlx::SqlitePool;
27use tokio::sync::{Mutex, watch};
28use tokio::task::JoinSet;
29
30use crate::reconcilers;
31use crate::tx_util::now_ms;
32
/// Interval knobs (a subset of `EngineConfig`) consumed by the SQLite
/// scanner supervisor. Modelled as a struct rather than a bare
/// `Duration` so further cadences can be added without breaking
/// callers, matching [`ff_backend_postgres::PostgresScannerConfig`].
#[derive(Debug, Clone)]
pub struct SqliteScannerConfig {
    /// Tick cadence of the `budget_reset` reconciler (RFC-020 Wave 9
    /// Standalone-1). Matches the Valkey side's
    /// `ff-server::config::budget_reset_interval` knob so one env
    /// value drives all three backends.
    ///
    /// A zero duration (`FF_BUDGET_RESET_INTERVAL_S=0`) disables the
    /// reconciler: it is simply not spawned, which also sidesteps the
    /// panic `tokio::time::interval` raises for a zero period.
    pub budget_reset_interval: Duration,
    /// Tick cadence of `worker_registry_ttl_sweep` (RFC-025 Phase 4).
    /// Mirrors PG's `worker_registry_ttl_interval` (default 30s).
    /// Zero disables the scanner — the same contract as
    /// `budget_reset_interval`.
    pub worker_registry_ttl_interval: Duration,
}
53
54/// Spawned scanner supervisor. Holding this handle keeps the
55/// reconciler tasks alive; drop-on-backend or explicit shutdown
56/// drains them. Returned by [`spawn_scanners`]; stored inside
57/// `SqliteBackendInner` so `EngineBackend::shutdown_prepare` can
58/// drain on server shutdown.
59pub struct SqliteScannerHandle {
60    shutdown_tx: watch::Sender<bool>,
61    join_set: Arc<Mutex<JoinSet<()>>>,
62}
63
64impl SqliteScannerHandle {
65    /// Signal shutdown and await drain up to `grace`. Returns the
66    /// number of tasks that did not exit cleanly within the grace
67    /// window (for operator logging). Subsequent calls are no-ops.
68    pub async fn shutdown(&self, grace: Duration) -> usize {
69        let _ = self.shutdown_tx.send(true);
70        let mut js = self.join_set.lock().await;
71        let deadline = tokio::time::Instant::now() + grace;
72        let mut timed_out = 0usize;
73        while !js.is_empty() {
74            let remaining = deadline
75                .checked_duration_since(tokio::time::Instant::now())
76                .unwrap_or(Duration::ZERO);
77            if remaining.is_zero() {
78                timed_out = js.len();
79                js.abort_all();
80                break;
81            }
82            match tokio::time::timeout(remaining, js.join_next()).await {
83                Ok(Some(_res)) => continue,
84                Ok(None) => break,
85                Err(_) => {
86                    timed_out = js.len();
87                    js.abort_all();
88                    break;
89                }
90            }
91        }
92        timed_out
93    }
94}
95
96impl Drop for SqliteScannerHandle {
97    /// Best-effort signal on drop. Tasks exit at their next tick; if
98    /// the caller wants a bounded drain they must call
99    /// [`Self::shutdown`] explicitly (per PG parity).
100    fn drop(&mut self) {
101        let _ = self.shutdown_tx.send(true);
102    }
103}
104
105/// Spawn all SQLite reconcilers as long-lived tick loops. Phase 3.5
106/// ships one: `budget_reset`. Tasks are registered into the
107/// `JoinSet` synchronously — the handle returned is always drainable
108/// via [`SqliteScannerHandle::shutdown`].
109pub fn spawn_scanners(pool: SqlitePool, cfg: SqliteScannerConfig) -> SqliteScannerHandle {
110    let (tx, rx) = watch::channel(false);
111    let mut js = JoinSet::new();
112
113    // Zero-interval guard: treat as disabled rather than panicking
114    // in `tokio::time::interval`. Matches `FF_BUDGET_RESET_INTERVAL_S=0`
115    // as an opt-out.
116    if !cfg.budget_reset_interval.is_zero() {
117        spawn_reconciler(
118            &mut js,
119            rx.clone(),
120            pool.clone(),
121            cfg.budget_reset_interval,
122            "sqlite.budget_reset",
123            |pool| {
124                Box::pin(async move {
125                    reconcilers::budget_reset::scan_tick(&pool, now_ms())
126                        .await
127                        .map(|_| ())
128                })
129            },
130        );
131    }
132
133    // ── RFC-025 Phase 4: worker_registry_ttl_sweep ──
134    // Single-writer: no partition fan-out. One tick deletes every
135    // expired row in one transaction.
136    #[cfg(feature = "core")]
137    if !cfg.worker_registry_ttl_interval.is_zero() {
138        spawn_reconciler(
139            &mut js,
140            rx.clone(),
141            pool.clone(),
142            cfg.worker_registry_ttl_interval,
143            "sqlite.worker_registry_ttl_sweep",
144            |pool| {
145                Box::pin(async move {
146                    crate::worker_registry::ttl_sweep_tick(&pool)
147                        .await
148                        .map(|_| ())
149                })
150            },
151        );
152    }
153
154    let scanners = js.len();
155    tracing::info!(
156        scanners,
157        "sqlite scanner supervisor spawned (RFC-023 Phase 3.5 budget_reset + RFC-025 Phase 4 worker_registry_ttl_sweep)"
158    );
159
160    SqliteScannerHandle {
161        shutdown_tx: tx,
162        join_set: Arc::new(Mutex::new(js)),
163    }
164}
165
166type TickFut = std::pin::Pin<
167    Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>,
168>;
169
170/// Register one reconciler tick task synchronously into the
171/// supervisor's [`JoinSet`]. Tasks exit at the next `select!` once
172/// `shutdown` fires (or the watch channel is closed).
173fn spawn_reconciler<F>(
174    js: &mut JoinSet<()>,
175    mut shutdown: watch::Receiver<bool>,
176    pool: SqlitePool,
177    interval: Duration,
178    name: &'static str,
179    tick: F,
180) where
181    F: Fn(SqlitePool) -> TickFut + Send + Sync + 'static,
182{
183    js.spawn(async move {
184        let mut tk = tokio::time::interval(interval);
185        tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
186        // `tokio::time::interval` yields an immediate first tick;
187        // drain it so the first observable reconciler run happens
188        // `interval` after spawn. Intentional SQLite startup-timing
189        // difference from the Postgres supervisor, which lets the
190        // first immediate tick fire. Safe: the watch shutdown signal
191        // is only delivered via `changed()`, so skipping this
192        // pre-drain tick has no shutdown-observability cost.
193        tk.tick().await;
194        loop {
195            tokio::select! {
196                res = shutdown.changed() => {
197                    // Channel closed (sender dropped) OR shutdown=true → exit.
198                    if res.is_err() || *shutdown.borrow() {
199                        return;
200                    }
201                }
202                _ = tk.tick() => {
203                    if *shutdown.borrow() {
204                        return;
205                    }
206                    if let Err(e) = tick(pool.clone()).await {
207                        tracing::warn!(
208                            scanner = name,
209                            error = %e,
210                            "sqlite reconciler tick failed"
211                        );
212                    }
213                }
214            }
215        }
216    });
217}