Skip to main content

ff_backend_postgres/
scanner_supervisor.rs

//! Postgres scanner supervisor (RFC-017 Wave 8 Stage E3).
//!
//! Owns a tokio [`JoinSet`] of reconciler-tick tasks — the Postgres
//! twin of `ff-engine`'s Valkey scanner supervisor. Each reconciler
//! runs on its configured interval; ticks iterate over a partition
//! space (for the partition-scoped reconcilers: `attempt_timeout`,
//! `lease_expiry`, `suspension_timeout`, `budget_reset`,
//! `worker_registry_ttl_sweep`) or run once per tick (for the
//! global-scan reconcilers: `dependency`, `edge_cancel_dispatcher`,
//! `edge_cancel_reconciler`). A per-tick error is logged at `warn`
//! and does not abort the task — matches the Valkey scanner
//! semantics where a bad partition does not poison the scanner.
//!
//! # Shutdown
//!
//! [`PostgresScannerHandle::shutdown`] flips a `watch` channel;
//! running tasks observe it while waiting for their next tick (or
//! between partitions of an in-flight tick) and exit.
//! The `grace` timeout bounds the wait on the underlying JoinSet.
18
19use std::sync::Arc;
20use std::time::Duration;
21
22use ff_core::backend::ScannerFilter;
23use ff_core::partition::PartitionConfig;
24use sqlx::PgPool;
25use tokio::sync::{watch, Mutex};
26use tokio::task::JoinSet;
27
28use crate::reconcilers;
29
30/// Subset of `EngineConfig`'s interval knobs that the Postgres
31/// reconcilers honour. Mirrors the Valkey engine's per-scanner
32/// interval fields so `ServerConfig::engine_config` can thread the
33/// same environment values through to both backends.
34#[derive(Clone, Debug)]
35pub struct PostgresScannerConfig {
36    pub attempt_timeout_interval: Duration,
37    pub lease_expiry_interval: Duration,
38    pub suspension_timeout_interval: Duration,
39    pub dependency_reconciler_interval: Duration,
40    pub edge_cancel_dispatcher_interval: Duration,
41    pub edge_cancel_reconciler_interval: Duration,
42    /// RFC-020 Wave 9 Standalone-1: cadence for the `budget_reset`
43    /// reconciler. Matches the Valkey side's
44    /// `ff-server::config::budget_reset_interval` knob so the same
45    /// env value drives both backends.
46    pub budget_reset_interval: Duration,
47    /// RFC-025 Phase 3: cadence for the `worker_registry_ttl_sweep`
48    /// reconciler. Mirrors Valkey's native PEXPIRE cleanup for
49    /// worker-liveness keys on PG (Valkey doesn't need a scanner).
50    /// Defaults to 30s, matching `budget_reset_interval` convention.
51    pub worker_registry_ttl_interval: Duration,
52    /// Stale-threshold for the dependency reconciler (ms). Matches
53    /// the Valkey scanner's `stale_threshold_ms` knob.
54    pub dependency_stale_threshold_ms: i64,
55    pub scanner_filter: ScannerFilter,
56    pub partition_config: PartitionConfig,
57}
58
59impl PostgresScannerConfig {
60    /// Default threshold mirrors the Valkey dep reconciler (15s — a
61    /// full scan cycle).
62    pub const DEFAULT_DEP_STALE_MS: i64 = 15_000;
63}
64
65/// Spawned scanner supervisor. Holding this handle keeps the scanner
66/// tasks alive; dropping it leaves them running until shutdown. Call
67/// [`shutdown`](Self::shutdown) with a bounded `grace` to drain.
68pub struct PostgresScannerHandle {
69    shutdown_tx: watch::Sender<bool>,
70    join_set: Arc<Mutex<JoinSet<()>>>,
71}
72
73impl PostgresScannerHandle {
74    /// Signal shutdown and await drain up to `grace`. Returns the
75    /// number of tasks that did not exit cleanly within `grace` (for
76    /// operator logging). Subsequent calls are no-ops.
77    pub async fn shutdown(&self, grace: Duration) -> usize {
78        let _ = self.shutdown_tx.send(true);
79        let mut js = self.join_set.lock().await;
80        let deadline = tokio::time::Instant::now() + grace;
81        let mut timed_out = 0usize;
82        while !js.is_empty() {
83            let remaining = deadline
84                .checked_duration_since(tokio::time::Instant::now())
85                .unwrap_or(Duration::ZERO);
86            if remaining.is_zero() {
87                timed_out = js.len();
88                js.abort_all();
89                break;
90            }
91            match tokio::time::timeout(remaining, js.join_next()).await {
92                Ok(Some(_res)) => continue,
93                Ok(None) => break,
94                Err(_) => {
95                    timed_out = js.len();
96                    js.abort_all();
97                    break;
98                }
99            }
100        }
101        timed_out
102    }
103}
104
105/// Spawn all six Postgres reconcilers as long-lived tick loops.
106///
107/// Per-task shape:
108///   1. Build a `tokio::time::interval(cfg.<reconciler>_interval)`.
109///   2. On each tick (or on shutdown signal), either run the
110///      reconciler tick body or exit.
111///   3. Log per-tick failures at `warn` and continue — matches the
112///      Valkey scanner's "don't poison the scanner on one bad
113///      partition" semantic.
114pub fn spawn_scanners(pool: PgPool, cfg: PostgresScannerConfig) -> PostgresScannerHandle {
115    let (tx, rx) = watch::channel(false);
116    let js = Arc::new(Mutex::new(JoinSet::new()));
117
118    // Capture partition counts up-front so each task doesn't need the
119    // full PartitionConfig reference. `budget_reset` iterates over the
120    // budget partition space (`num_budget_partitions`), distinct from
121    // the execution/flow partition space the other partition-scoped
122    // reconcilers walk.
123    let num_partitions: i16 = cfg.partition_config.num_flow_partitions as i16;
124    let num_budget_partitions: i16 = cfg.partition_config.num_budget_partitions as i16;
125    let filter = cfg.scanner_filter.clone();
126
127    // ── Partition-scoped reconcilers ──
128    spawn_partition_scan(
129        &js,
130        &tx,
131        rx.clone(),
132        pool.clone(),
133        cfg.attempt_timeout_interval,
134        num_partitions,
135        filter.clone(),
136        "pg.attempt_timeout",
137        |pool, part, filter| Box::pin(async move {
138            reconcilers::attempt_timeout::scan_tick(&pool, part, &filter)
139                .await
140                .map(|_| ())
141        }),
142    );
143    spawn_partition_scan(
144        &js,
145        &tx,
146        rx.clone(),
147        pool.clone(),
148        cfg.lease_expiry_interval,
149        num_partitions,
150        filter.clone(),
151        "pg.lease_expiry",
152        |pool, part, filter| Box::pin(async move {
153            reconcilers::lease_expiry::scan_tick(&pool, part, &filter)
154                .await
155                .map(|_| ())
156        }),
157    );
158    spawn_partition_scan(
159        &js,
160        &tx,
161        rx.clone(),
162        pool.clone(),
163        cfg.suspension_timeout_interval,
164        num_partitions,
165        filter.clone(),
166        "pg.suspension_timeout",
167        |pool, part, filter| Box::pin(async move {
168            reconcilers::suspension_timeout::scan_tick(&pool, part, &filter)
169                .await
170                .map(|_| ())
171        }),
172    );
173
174    // ── `budget_reset` (RFC-020 Wave 9 Standalone-1) ──
175    //
176    // Walks the budget partition space, not the flow/exec space, so
177    // it uses `num_budget_partitions`. Filter is ignored — budget IDs
178    // are not namespace/instance-tagged (mirrors the Valkey
179    // `BudgetResetScanner` which accepts the filter "for uniform API"
180    // and does not apply it, issue #122).
181    spawn_partition_scan(
182        &js,
183        &tx,
184        rx.clone(),
185        pool.clone(),
186        cfg.budget_reset_interval,
187        num_budget_partitions,
188        filter.clone(),
189        "pg.budget_reset",
190        |pool, part, _filter| {
191            Box::pin(async move {
192                reconcilers::budget_reset::scan_tick(&pool, part)
193                    .await
194                    .map(|_| ())
195            })
196        },
197    );
198
199    // ── RFC-025 Phase 3 — `worker_registry_ttl_sweep` ──
200    //
201    // Walks the 256-partition worker-registry space (keyed on
202    // `fnv1a_u64(worker_instance_id) % 256`, not the flow/exec
203    // partition family). 30s tick mirrors Valkey's native PEXPIRE
204    // cadence closely enough for operator-tooling latency.
205    spawn_partition_scan(
206        &js,
207        &tx,
208        rx.clone(),
209        pool.clone(),
210        cfg.worker_registry_ttl_interval,
211        256,
212        filter.clone(),
213        "pg.worker_registry_ttl_sweep",
214        |pool, part, _filter| {
215            Box::pin(async move {
216                crate::worker_registry::ttl_sweep_tick(&pool, part)
217                    .await
218                    .map(|_| ())
219            })
220        },
221    );
222
223    // ── Global (non-partition-scoped) reconcilers ──
224    let dep_stale = cfg.dependency_stale_threshold_ms;
225    spawn_global_scan(
226        &js,
227        &tx,
228        rx.clone(),
229        pool.clone(),
230        cfg.dependency_reconciler_interval,
231        filter.clone(),
232        "pg.dependency",
233        move |pool, filter| {
234            Box::pin(async move {
235                reconcilers::dependency::reconcile_tick(&pool, &filter, dep_stale)
236                    .await
237                    .map(|_| ())
238            })
239        },
240    );
241    spawn_global_scan(
242        &js,
243        &tx,
244        rx.clone(),
245        pool.clone(),
246        cfg.edge_cancel_dispatcher_interval,
247        filter.clone(),
248        "pg.edge_cancel_dispatcher",
249        |pool, filter| {
250            Box::pin(async move {
251                reconcilers::edge_cancel_dispatcher::dispatcher_tick(&pool, &filter)
252                    .await
253                    .map(|_| ())
254            })
255        },
256    );
257    spawn_global_scan(
258        &js,
259        &tx,
260        rx,
261        pool,
262        cfg.edge_cancel_reconciler_interval,
263        filter,
264        "pg.edge_cancel_reconciler",
265        |pool, filter| {
266            Box::pin(async move {
267                reconcilers::edge_cancel_reconciler::reconciler_tick(&pool, &filter)
268                    .await
269                    .map(|_| ())
270            })
271        },
272    );
273
274    tracing::info!(
275        scanners = 8,
276        num_partitions,
277        num_budget_partitions,
278        "postgres scanner supervisor spawned (RFC-017 Stage E3 + RFC-020 Wave 9 budget_reset + RFC-025 Phase 3 worker_registry_ttl_sweep)"
279    );
280
281    PostgresScannerHandle {
282        shutdown_tx: tx,
283        join_set: js,
284    }
285}
286
287type TickFut = std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>>;
288
289#[allow(clippy::too_many_arguments)]
290fn spawn_partition_scan<F>(
291    js: &Arc<Mutex<JoinSet<()>>>,
292    _tx: &watch::Sender<bool>,
293    mut shutdown: watch::Receiver<bool>,
294    pool: PgPool,
295    interval: Duration,
296    num_partitions: i16,
297    filter: ScannerFilter,
298    name: &'static str,
299    tick: F,
300) where
301    F: Fn(PgPool, i16, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
302{
303    let js_clone = js.clone();
304    tokio::spawn(async move {
305        // Use try_lock / block_on-free lock via Mutex::lock — JoinSet
306        // itself is !Send across tokio spawn boundaries only via its
307        // internal handles, but the Arc<Mutex<>> holds it cleanly.
308        let mut guard = js_clone.lock().await;
309        guard.spawn(async move {
310            let mut tk = tokio::time::interval(interval);
311            tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
312            loop {
313                tokio::select! {
314                    _ = shutdown.changed() => {
315                        if *shutdown.borrow() {
316                            return;
317                        }
318                    }
319                    _ = tk.tick() => {
320                        for part in 0..num_partitions {
321                            if *shutdown.borrow() {
322                                return;
323                            }
324                            if let Err(e) = tick(pool.clone(), part, filter.clone()).await {
325                                tracing::warn!(
326                                    scanner = name,
327                                    partition = part,
328                                    error = %e,
329                                    "postgres reconciler tick failed"
330                                );
331                            }
332                        }
333                    }
334                }
335            }
336        });
337    });
338}
339
340#[allow(clippy::too_many_arguments)]
341fn spawn_global_scan<F>(
342    js: &Arc<Mutex<JoinSet<()>>>,
343    _tx: &watch::Sender<bool>,
344    mut shutdown: watch::Receiver<bool>,
345    pool: PgPool,
346    interval: Duration,
347    filter: ScannerFilter,
348    name: &'static str,
349    tick: F,
350) where
351    F: Fn(PgPool, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
352{
353    let js_clone = js.clone();
354    tokio::spawn(async move {
355        let mut guard = js_clone.lock().await;
356        guard.spawn(async move {
357            let mut tk = tokio::time::interval(interval);
358            tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
359            loop {
360                tokio::select! {
361                    _ = shutdown.changed() => {
362                        if *shutdown.borrow() {
363                            return;
364                        }
365                    }
366                    _ = tk.tick() => {
367                        if *shutdown.borrow() {
368                            return;
369                        }
370                        if let Err(e) = tick(pool.clone(), filter.clone()).await {
371                            tracing::warn!(
372                                scanner = name,
373                                error = %e,
374                                "postgres reconciler tick failed"
375                            );
376                        }
377                    }
378                }
379            }
380        });
381    });
382}