Skip to main content

ff_backend_postgres/
scanner_supervisor.rs

1//! Postgres scanner supervisor (RFC-017 Wave 8 Stage E3).
2//!
3//! Owns a tokio [`JoinSet`] of reconciler-tick tasks — the Postgres
4//! twin of `ff-engine`'s Valkey scanner supervisor. Each reconciler
5//! runs on its configured interval; ticks iterate over the partition
6//! space (for the partition-scoped reconcilers: `attempt_timeout`,
7//! `lease_expiry`, `suspension_timeout`) or run once per tick (for
8//! the global-scan reconcilers: `dependency`, `edge_cancel_dispatcher`,
9//! `edge_cancel_reconciler`). A per-tick error is logged at `warn`
10//! and does not abort the task — matches the Valkey scanner
11//! semantics where a bad partition does not poison the scanner.
12//!
13//! # Shutdown
14//!
15//! [`PostgresScannerHandle::shutdown`] flips a `watch` channel;
16//! running tasks observe it at the next tick boundary and exit.
17//! The `grace` timeout bounds the wait on the underlying JoinSet.
18
19use std::sync::Arc;
20use std::time::Duration;
21
22use ff_core::backend::ScannerFilter;
23use ff_core::partition::PartitionConfig;
24use sqlx::PgPool;
25use tokio::sync::{watch, Mutex};
26use tokio::task::JoinSet;
27
28use crate::reconcilers;
29
30/// Subset of `EngineConfig`'s interval knobs that the Postgres
31/// reconcilers honour. Mirrors the Valkey engine's per-scanner
32/// interval fields so `ServerConfig::engine_config` can thread the
33/// same environment values through to both backends.
#[derive(Clone, Debug)]
pub struct PostgresScannerConfig {
    /// Tick interval for the partition-scoped attempt-timeout reconciler.
    pub attempt_timeout_interval: Duration,
    /// Tick interval for the partition-scoped lease-expiry reconciler.
    pub lease_expiry_interval: Duration,
    /// Tick interval for the partition-scoped suspension-timeout reconciler.
    pub suspension_timeout_interval: Duration,
    /// Tick interval for the global dependency reconciler.
    pub dependency_reconciler_interval: Duration,
    /// Tick interval for the global edge-cancel dispatcher.
    pub edge_cancel_dispatcher_interval: Duration,
    /// Tick interval for the global edge-cancel reconciler.
    pub edge_cancel_reconciler_interval: Duration,
    /// Stale-threshold for the dependency reconciler (ms). Matches
    /// the Valkey scanner's `stale_threshold_ms` knob.
    pub dependency_stale_threshold_ms: i64,
    /// Filter threaded into every reconciler tick.
    pub scanner_filter: ScannerFilter,
    /// Source of the partition count (`num_flow_partitions`) swept by
    /// the partition-scoped reconcilers.
    pub partition_config: PartitionConfig,
}
48
impl PostgresScannerConfig {
    /// Default threshold mirrors the Valkey dep reconciler (15s — a
    /// full scan cycle).
    ///
    /// NOTE(review): nothing in this file applies this default —
    /// presumably callers seed `dependency_stale_threshold_ms` with it
    /// when building the config; confirm at the call site.
    pub const DEFAULT_DEP_STALE_MS: i64 = 15_000;
}
54
/// Spawned scanner supervisor. Holding this handle keeps the shutdown
/// channel open; call [`shutdown`](Self::shutdown) with a bounded
/// `grace` to signal the scanner tasks and drain them.
///
/// NOTE(review): dropping the handle without calling `shutdown` drops
/// the `watch` sender and closes the channel — confirm the tick tasks
/// behave sensibly once `changed()` starts returning `Err` on a
/// closed channel (it completes immediately on every poll).
pub struct PostgresScannerHandle {
    // Flipped to `true` (at most once meaningfully) by `shutdown`;
    // every scanner task holds a receiver and observes it at the next
    // tick boundary.
    shutdown_tx: watch::Sender<bool>,
    // Shared with the registration path in `spawn_scanners`; `shutdown`
    // locks it to join/abort the tasks.
    join_set: Arc<Mutex<JoinSet<()>>>,
}
62
63impl PostgresScannerHandle {
64    /// Signal shutdown and await drain up to `grace`. Returns the
65    /// number of tasks that did not exit cleanly within `grace` (for
66    /// operator logging). Subsequent calls are no-ops.
67    pub async fn shutdown(&self, grace: Duration) -> usize {
68        let _ = self.shutdown_tx.send(true);
69        let mut js = self.join_set.lock().await;
70        let deadline = tokio::time::Instant::now() + grace;
71        let mut timed_out = 0usize;
72        while !js.is_empty() {
73            let remaining = deadline
74                .checked_duration_since(tokio::time::Instant::now())
75                .unwrap_or(Duration::ZERO);
76            if remaining.is_zero() {
77                timed_out = js.len();
78                js.abort_all();
79                break;
80            }
81            match tokio::time::timeout(remaining, js.join_next()).await {
82                Ok(Some(_res)) => continue,
83                Ok(None) => break,
84                Err(_) => {
85                    timed_out = js.len();
86                    js.abort_all();
87                    break;
88                }
89            }
90        }
91        timed_out
92    }
93}
94
95/// Spawn all six Postgres reconcilers as long-lived tick loops.
96///
97/// Per-task shape:
98///   1. Build a `tokio::time::interval(cfg.<reconciler>_interval)`.
99///   2. On each tick (or on shutdown signal), either run the
100///      reconciler tick body or exit.
101///   3. Log per-tick failures at `warn` and continue — matches the
102///      Valkey scanner's "don't poison the scanner on one bad
103///      partition" semantic.
104pub fn spawn_scanners(pool: PgPool, cfg: PostgresScannerConfig) -> PostgresScannerHandle {
105    let (tx, rx) = watch::channel(false);
106    let js = Arc::new(Mutex::new(JoinSet::new()));
107
108    // Capture partition count up-front so each task doesn't need the
109    // full PartitionConfig reference.
110    let num_partitions: i16 = cfg.partition_config.num_flow_partitions as i16;
111    let filter = cfg.scanner_filter.clone();
112
113    // ── Partition-scoped reconcilers ──
114    spawn_partition_scan(
115        &js,
116        &tx,
117        rx.clone(),
118        pool.clone(),
119        cfg.attempt_timeout_interval,
120        num_partitions,
121        filter.clone(),
122        "pg.attempt_timeout",
123        |pool, part, filter| Box::pin(async move {
124            reconcilers::attempt_timeout::scan_tick(&pool, part, &filter)
125                .await
126                .map(|_| ())
127        }),
128    );
129    spawn_partition_scan(
130        &js,
131        &tx,
132        rx.clone(),
133        pool.clone(),
134        cfg.lease_expiry_interval,
135        num_partitions,
136        filter.clone(),
137        "pg.lease_expiry",
138        |pool, part, filter| Box::pin(async move {
139            reconcilers::lease_expiry::scan_tick(&pool, part, &filter)
140                .await
141                .map(|_| ())
142        }),
143    );
144    spawn_partition_scan(
145        &js,
146        &tx,
147        rx.clone(),
148        pool.clone(),
149        cfg.suspension_timeout_interval,
150        num_partitions,
151        filter.clone(),
152        "pg.suspension_timeout",
153        |pool, part, filter| Box::pin(async move {
154            reconcilers::suspension_timeout::scan_tick(&pool, part, &filter)
155                .await
156                .map(|_| ())
157        }),
158    );
159
160    // ── Global (non-partition-scoped) reconcilers ──
161    let dep_stale = cfg.dependency_stale_threshold_ms;
162    spawn_global_scan(
163        &js,
164        &tx,
165        rx.clone(),
166        pool.clone(),
167        cfg.dependency_reconciler_interval,
168        filter.clone(),
169        "pg.dependency",
170        move |pool, filter| {
171            Box::pin(async move {
172                reconcilers::dependency::reconcile_tick(&pool, &filter, dep_stale)
173                    .await
174                    .map(|_| ())
175            })
176        },
177    );
178    spawn_global_scan(
179        &js,
180        &tx,
181        rx.clone(),
182        pool.clone(),
183        cfg.edge_cancel_dispatcher_interval,
184        filter.clone(),
185        "pg.edge_cancel_dispatcher",
186        |pool, filter| {
187            Box::pin(async move {
188                reconcilers::edge_cancel_dispatcher::dispatcher_tick(&pool, &filter)
189                    .await
190                    .map(|_| ())
191            })
192        },
193    );
194    spawn_global_scan(
195        &js,
196        &tx,
197        rx,
198        pool,
199        cfg.edge_cancel_reconciler_interval,
200        filter,
201        "pg.edge_cancel_reconciler",
202        |pool, filter| {
203            Box::pin(async move {
204                reconcilers::edge_cancel_reconciler::reconciler_tick(&pool, &filter)
205                    .await
206                    .map(|_| ())
207            })
208        },
209    );
210
211    tracing::info!(
212        scanners = 6,
213        num_partitions,
214        "postgres scanner supervisor spawned (RFC-017 Stage E3)"
215    );
216
217    PostgresScannerHandle {
218        shutdown_tx: tx,
219        join_set: js,
220    }
221}
222
/// Boxed, `Send` future returned by a reconciler tick closure — boxed
/// so the two spawn helpers can take the closures as plain `Fn` values.
type TickFut = std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>>;
224
225#[allow(clippy::too_many_arguments)]
226fn spawn_partition_scan<F>(
227    js: &Arc<Mutex<JoinSet<()>>>,
228    _tx: &watch::Sender<bool>,
229    mut shutdown: watch::Receiver<bool>,
230    pool: PgPool,
231    interval: Duration,
232    num_partitions: i16,
233    filter: ScannerFilter,
234    name: &'static str,
235    tick: F,
236) where
237    F: Fn(PgPool, i16, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
238{
239    let js_clone = js.clone();
240    tokio::spawn(async move {
241        // Use try_lock / block_on-free lock via Mutex::lock — JoinSet
242        // itself is !Send across tokio spawn boundaries only via its
243        // internal handles, but the Arc<Mutex<>> holds it cleanly.
244        let mut guard = js_clone.lock().await;
245        guard.spawn(async move {
246            let mut tk = tokio::time::interval(interval);
247            tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
248            loop {
249                tokio::select! {
250                    _ = shutdown.changed() => {
251                        if *shutdown.borrow() {
252                            return;
253                        }
254                    }
255                    _ = tk.tick() => {
256                        for part in 0..num_partitions {
257                            if *shutdown.borrow() {
258                                return;
259                            }
260                            if let Err(e) = tick(pool.clone(), part, filter.clone()).await {
261                                tracing::warn!(
262                                    scanner = name,
263                                    partition = part,
264                                    error = %e,
265                                    "postgres reconciler tick failed"
266                                );
267                            }
268                        }
269                    }
270                }
271            }
272        });
273    });
274}
275
276#[allow(clippy::too_many_arguments)]
277fn spawn_global_scan<F>(
278    js: &Arc<Mutex<JoinSet<()>>>,
279    _tx: &watch::Sender<bool>,
280    mut shutdown: watch::Receiver<bool>,
281    pool: PgPool,
282    interval: Duration,
283    filter: ScannerFilter,
284    name: &'static str,
285    tick: F,
286) where
287    F: Fn(PgPool, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
288{
289    let js_clone = js.clone();
290    tokio::spawn(async move {
291        let mut guard = js_clone.lock().await;
292        guard.spawn(async move {
293            let mut tk = tokio::time::interval(interval);
294            tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
295            loop {
296                tokio::select! {
297                    _ = shutdown.changed() => {
298                        if *shutdown.borrow() {
299                            return;
300                        }
301                    }
302                    _ = tk.tick() => {
303                        if *shutdown.borrow() {
304                            return;
305                        }
306                        if let Err(e) = tick(pool.clone(), filter.clone()).await {
307                            tracing::warn!(
308                                scanner = name,
309                                error = %e,
310                                "postgres reconciler tick failed"
311                            );
312                        }
313                    }
314                }
315            }
316        });
317    });
318}