// ff_backend_postgres/scanner_supervisor.rs
//! Postgres scanner supervisor (RFC-017 Wave 8 Stage E3).
//!
//! Owns a tokio [`JoinSet`] of reconciler-tick tasks — the Postgres
//! twin of `ff-engine`'s Valkey scanner supervisor. Each reconciler
//! runs on its configured interval; ticks iterate over a partition
//! space (for the partition-scoped reconcilers: `attempt_timeout`,
//! `lease_expiry`, `suspension_timeout`, plus `budget_reset`, which
//! walks the budget partition space) or run once per tick (for
//! the global-scan reconcilers: `dependency`, `edge_cancel_dispatcher`,
//! `edge_cancel_reconciler`). A per-tick error is logged at `warn`
//! and does not abort the task — matches the Valkey scanner
//! semantics where a bad partition does not poison the scanner.
//!
//! # Shutdown
//!
//! [`PostgresScannerHandle::shutdown`] flips a `watch` channel;
//! running tasks observe it at the next tick boundary and exit.
//! The `grace` timeout bounds the wait on the underlying JoinSet.
use std::sync::Arc;
use std::time::Duration;

use ff_core::backend::ScannerFilter;
use ff_core::partition::PartitionConfig;
use sqlx::PgPool;
use tokio::sync::{watch, Mutex};
use tokio::task::JoinSet;

use crate::reconcilers;
/// Subset of `EngineConfig`'s interval knobs that the Postgres
/// reconcilers honour. Mirrors the Valkey engine's per-scanner
/// interval fields so `ServerConfig::engine_config` can thread the
/// same environment values through to both backends.
#[derive(Clone, Debug)]
pub struct PostgresScannerConfig {
    // Tick cadence for each partition-scoped reconciler.
    pub attempt_timeout_interval: Duration,
    pub lease_expiry_interval: Duration,
    pub suspension_timeout_interval: Duration,
    // Tick cadence for each global-scan reconciler.
    pub dependency_reconciler_interval: Duration,
    pub edge_cancel_dispatcher_interval: Duration,
    pub edge_cancel_reconciler_interval: Duration,
    /// RFC-020 Wave 9 Standalone-1: cadence for the `budget_reset`
    /// reconciler. Matches the Valkey side's
    /// `ff-server::config::budget_reset_interval` knob so the same
    /// env value drives both backends.
    pub budget_reset_interval: Duration,
    /// Stale-threshold for the dependency reconciler (ms). Matches
    /// the Valkey scanner's `stale_threshold_ms` knob.
    pub dependency_stale_threshold_ms: i64,
    /// Namespace/instance filter threaded to every reconciler tick
    /// (the `budget_reset` tick accepts but ignores it — see
    /// `spawn_scanners`).
    pub scanner_filter: ScannerFilter,
    /// Source of the flow and budget partition counts the spawned
    /// tasks iterate over.
    pub partition_config: PartitionConfig,
}
53
impl PostgresScannerConfig {
    /// Default `dependency_stale_threshold_ms` value. Mirrors the
    /// Valkey dep reconciler (15s — a full scan cycle).
    pub const DEFAULT_DEP_STALE_MS: i64 = 15_000;
}
59
/// Spawned scanner supervisor. Holding this handle keeps the scanner
/// tasks alive; dropping it leaves them running until shutdown. Call
/// [`shutdown`](Self::shutdown) with a bounded `grace` to drain.
pub struct PostgresScannerHandle {
    // Flipping this to `true` tells every scanner task to exit at its
    // next tick boundary.
    shutdown_tx: watch::Sender<bool>,
    // Shared with the spawner tasks in `spawn_scanners`, which register
    // each scanner loop into this set; `shutdown` drains it.
    join_set: Arc<Mutex<JoinSet<()>>>,
}
67
68impl PostgresScannerHandle {
69    /// Signal shutdown and await drain up to `grace`. Returns the
70    /// number of tasks that did not exit cleanly within `grace` (for
71    /// operator logging). Subsequent calls are no-ops.
72    pub async fn shutdown(&self, grace: Duration) -> usize {
73        let _ = self.shutdown_tx.send(true);
74        let mut js = self.join_set.lock().await;
75        let deadline = tokio::time::Instant::now() + grace;
76        let mut timed_out = 0usize;
77        while !js.is_empty() {
78            let remaining = deadline
79                .checked_duration_since(tokio::time::Instant::now())
80                .unwrap_or(Duration::ZERO);
81            if remaining.is_zero() {
82                timed_out = js.len();
83                js.abort_all();
84                break;
85            }
86            match tokio::time::timeout(remaining, js.join_next()).await {
87                Ok(Some(_res)) => continue,
88                Ok(None) => break,
89                Err(_) => {
90                    timed_out = js.len();
91                    js.abort_all();
92                    break;
93                }
94            }
95        }
96        timed_out
97    }
98}
99
100/// Spawn all six Postgres reconcilers as long-lived tick loops.
101///
102/// Per-task shape:
103///   1. Build a `tokio::time::interval(cfg.<reconciler>_interval)`.
104///   2. On each tick (or on shutdown signal), either run the
105///      reconciler tick body or exit.
106///   3. Log per-tick failures at `warn` and continue — matches the
107///      Valkey scanner's "don't poison the scanner on one bad
108///      partition" semantic.
109pub fn spawn_scanners(pool: PgPool, cfg: PostgresScannerConfig) -> PostgresScannerHandle {
110    let (tx, rx) = watch::channel(false);
111    let js = Arc::new(Mutex::new(JoinSet::new()));
112
113    // Capture partition counts up-front so each task doesn't need the
114    // full PartitionConfig reference. `budget_reset` iterates over the
115    // budget partition space (`num_budget_partitions`), distinct from
116    // the execution/flow partition space the other partition-scoped
117    // reconcilers walk.
118    let num_partitions: i16 = cfg.partition_config.num_flow_partitions as i16;
119    let num_budget_partitions: i16 = cfg.partition_config.num_budget_partitions as i16;
120    let filter = cfg.scanner_filter.clone();
121
122    // ── Partition-scoped reconcilers ──
123    spawn_partition_scan(
124        &js,
125        &tx,
126        rx.clone(),
127        pool.clone(),
128        cfg.attempt_timeout_interval,
129        num_partitions,
130        filter.clone(),
131        "pg.attempt_timeout",
132        |pool, part, filter| Box::pin(async move {
133            reconcilers::attempt_timeout::scan_tick(&pool, part, &filter)
134                .await
135                .map(|_| ())
136        }),
137    );
138    spawn_partition_scan(
139        &js,
140        &tx,
141        rx.clone(),
142        pool.clone(),
143        cfg.lease_expiry_interval,
144        num_partitions,
145        filter.clone(),
146        "pg.lease_expiry",
147        |pool, part, filter| Box::pin(async move {
148            reconcilers::lease_expiry::scan_tick(&pool, part, &filter)
149                .await
150                .map(|_| ())
151        }),
152    );
153    spawn_partition_scan(
154        &js,
155        &tx,
156        rx.clone(),
157        pool.clone(),
158        cfg.suspension_timeout_interval,
159        num_partitions,
160        filter.clone(),
161        "pg.suspension_timeout",
162        |pool, part, filter| Box::pin(async move {
163            reconcilers::suspension_timeout::scan_tick(&pool, part, &filter)
164                .await
165                .map(|_| ())
166        }),
167    );
168
169    // ── `budget_reset` (RFC-020 Wave 9 Standalone-1) ──
170    //
171    // Walks the budget partition space, not the flow/exec space, so
172    // it uses `num_budget_partitions`. Filter is ignored — budget IDs
173    // are not namespace/instance-tagged (mirrors the Valkey
174    // `BudgetResetScanner` which accepts the filter "for uniform API"
175    // and does not apply it, issue #122).
176    spawn_partition_scan(
177        &js,
178        &tx,
179        rx.clone(),
180        pool.clone(),
181        cfg.budget_reset_interval,
182        num_budget_partitions,
183        filter.clone(),
184        "pg.budget_reset",
185        |pool, part, _filter| {
186            Box::pin(async move {
187                reconcilers::budget_reset::scan_tick(&pool, part)
188                    .await
189                    .map(|_| ())
190            })
191        },
192    );
193
194    // ── Global (non-partition-scoped) reconcilers ──
195    let dep_stale = cfg.dependency_stale_threshold_ms;
196    spawn_global_scan(
197        &js,
198        &tx,
199        rx.clone(),
200        pool.clone(),
201        cfg.dependency_reconciler_interval,
202        filter.clone(),
203        "pg.dependency",
204        move |pool, filter| {
205            Box::pin(async move {
206                reconcilers::dependency::reconcile_tick(&pool, &filter, dep_stale)
207                    .await
208                    .map(|_| ())
209            })
210        },
211    );
212    spawn_global_scan(
213        &js,
214        &tx,
215        rx.clone(),
216        pool.clone(),
217        cfg.edge_cancel_dispatcher_interval,
218        filter.clone(),
219        "pg.edge_cancel_dispatcher",
220        |pool, filter| {
221            Box::pin(async move {
222                reconcilers::edge_cancel_dispatcher::dispatcher_tick(&pool, &filter)
223                    .await
224                    .map(|_| ())
225            })
226        },
227    );
228    spawn_global_scan(
229        &js,
230        &tx,
231        rx,
232        pool,
233        cfg.edge_cancel_reconciler_interval,
234        filter,
235        "pg.edge_cancel_reconciler",
236        |pool, filter| {
237            Box::pin(async move {
238                reconcilers::edge_cancel_reconciler::reconciler_tick(&pool, &filter)
239                    .await
240                    .map(|_| ())
241            })
242        },
243    );
244
245    tracing::info!(
246        scanners = 7,
247        num_partitions,
248        num_budget_partitions,
249        "postgres scanner supervisor spawned (RFC-017 Stage E3 + RFC-020 Wave 9 budget_reset)"
250    );
251
252    PostgresScannerHandle {
253        shutdown_tx: tx,
254        join_set: js,
255    }
256}
257
/// Pinned, boxed, `Send` future produced by one reconciler tick
/// closure; resolves to `Ok(())` on success or an `EngineError` that
/// the scanner loop logs at `warn` and swallows.
type TickFut = std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>>;
259
260#[allow(clippy::too_many_arguments)]
261fn spawn_partition_scan<F>(
262    js: &Arc<Mutex<JoinSet<()>>>,
263    _tx: &watch::Sender<bool>,
264    mut shutdown: watch::Receiver<bool>,
265    pool: PgPool,
266    interval: Duration,
267    num_partitions: i16,
268    filter: ScannerFilter,
269    name: &'static str,
270    tick: F,
271) where
272    F: Fn(PgPool, i16, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
273{
274    let js_clone = js.clone();
275    tokio::spawn(async move {
276        // Use try_lock / block_on-free lock via Mutex::lock — JoinSet
277        // itself is !Send across tokio spawn boundaries only via its
278        // internal handles, but the Arc<Mutex<>> holds it cleanly.
279        let mut guard = js_clone.lock().await;
280        guard.spawn(async move {
281            let mut tk = tokio::time::interval(interval);
282            tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
283            loop {
284                tokio::select! {
285                    _ = shutdown.changed() => {
286                        if *shutdown.borrow() {
287                            return;
288                        }
289                    }
290                    _ = tk.tick() => {
291                        for part in 0..num_partitions {
292                            if *shutdown.borrow() {
293                                return;
294                            }
295                            if let Err(e) = tick(pool.clone(), part, filter.clone()).await {
296                                tracing::warn!(
297                                    scanner = name,
298                                    partition = part,
299                                    error = %e,
300                                    "postgres reconciler tick failed"
301                                );
302                            }
303                        }
304                    }
305                }
306            }
307        });
308    });
309}
310
311#[allow(clippy::too_many_arguments)]
312fn spawn_global_scan<F>(
313    js: &Arc<Mutex<JoinSet<()>>>,
314    _tx: &watch::Sender<bool>,
315    mut shutdown: watch::Receiver<bool>,
316    pool: PgPool,
317    interval: Duration,
318    filter: ScannerFilter,
319    name: &'static str,
320    tick: F,
321) where
322    F: Fn(PgPool, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
323{
324    let js_clone = js.clone();
325    tokio::spawn(async move {
326        let mut guard = js_clone.lock().await;
327        guard.spawn(async move {
328            let mut tk = tokio::time::interval(interval);
329            tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
330            loop {
331                tokio::select! {
332                    _ = shutdown.changed() => {
333                        if *shutdown.borrow() {
334                            return;
335                        }
336                    }
337                    _ = tk.tick() => {
338                        if *shutdown.borrow() {
339                            return;
340                        }
341                        if let Err(e) = tick(pool.clone(), filter.clone()).await {
342                            tracing::warn!(
343                                scanner = name,
344                                error = %e,
345                                "postgres reconciler tick failed"
346                            );
347                        }
348                    }
349                }
350            }
351        });
352    });
353}