1use std::sync::Arc;
20use std::time::Duration;
21
22use ff_core::backend::ScannerFilter;
23use ff_core::partition::PartitionConfig;
24use sqlx::PgPool;
25use tokio::sync::{watch, Mutex};
26use tokio::task::JoinSet;
27
28use crate::reconcilers;
29
/// Configuration for the Postgres scanner supervisor started by `spawn_scanners`.
///
/// Each `*_interval` field is the tick period of exactly one scanner task.
#[derive(Clone, Debug)]
pub struct PostgresScannerConfig {
    /// Tick period of the `pg.attempt_timeout` scanner (swept per flow partition).
    pub attempt_timeout_interval: Duration,
    /// Tick period of the `pg.lease_expiry` scanner (swept per flow partition).
    pub lease_expiry_interval: Duration,
    /// Tick period of the `pg.suspension_timeout` scanner (swept per flow partition).
    pub suspension_timeout_interval: Duration,
    /// Tick period of the global (unpartitioned) `pg.dependency` reconciler.
    pub dependency_reconciler_interval: Duration,
    /// Tick period of the global `pg.edge_cancel_dispatcher` scanner.
    pub edge_cancel_dispatcher_interval: Duration,
    /// Tick period of the global `pg.edge_cancel_reconciler` scanner.
    pub edge_cancel_reconciler_interval: Duration,
    /// Tick period of the `pg.budget_reset` scanner (swept per budget partition).
    pub budget_reset_interval: Duration,
    /// Tick period of the `pg.worker_registry_ttl_sweep` scanner.
    pub worker_registry_ttl_interval: Duration,
    /// Staleness threshold, in milliseconds, passed unchanged to the dependency
    /// reconciler on every `pg.dependency` tick.
    pub dependency_stale_threshold_ms: i64,
    /// Filter cloned into every scanner tick; the budget-reset and worker-registry
    /// TTL sweeps receive it but ignore it.
    pub scanner_filter: ScannerFilter,
    /// Source of the flow- and budget-partition counts (`num_flow_partitions`,
    /// `num_budget_partitions`) that the partitioned scanners iterate over.
    pub partition_config: PartitionConfig,
}
58
59impl PostgresScannerConfig {
60 pub const DEFAULT_DEP_STALE_MS: i64 = 15_000;
63}
64
/// Handle to the scanner tasks started by `spawn_scanners`; call `shutdown` to stop them.
///
/// Fields drop in declaration order: dropping the handle drops `shutdown_tx` first and
/// then this handle's reference to the join set. Per Tokio's `JoinSet` semantics, dropping
/// the last reference to the set aborts any tasks still in it.
pub struct PostgresScannerHandle {
    /// Broadcasts the stop signal; scanners exit once it carries `true`.
    shutdown_tx: watch::Sender<bool>,
    /// Every spawned scanner task. It sits behind a Tokio mutex so `shutdown(&self)` can
    /// drain it through a shared reference.
    join_set: Arc<Mutex<JoinSet<()>>>,
}
72
73impl PostgresScannerHandle {
74 pub async fn shutdown(&self, grace: Duration) -> usize {
78 let _ = self.shutdown_tx.send(true);
79 let mut js = self.join_set.lock().await;
80 let deadline = tokio::time::Instant::now() + grace;
81 let mut timed_out = 0usize;
82 while !js.is_empty() {
83 let remaining = deadline
84 .checked_duration_since(tokio::time::Instant::now())
85 .unwrap_or(Duration::ZERO);
86 if remaining.is_zero() {
87 timed_out = js.len();
88 js.abort_all();
89 break;
90 }
91 match tokio::time::timeout(remaining, js.join_next()).await {
92 Ok(Some(_res)) => continue,
93 Ok(None) => break,
94 Err(_) => {
95 timed_out = js.len();
96 js.abort_all();
97 break;
98 }
99 }
100 }
101 timed_out
102 }
103}
104
105pub fn spawn_scanners(pool: PgPool, cfg: PostgresScannerConfig) -> PostgresScannerHandle {
115 let (tx, rx) = watch::channel(false);
116 let js = Arc::new(Mutex::new(JoinSet::new()));
117
118 let num_partitions: i16 = cfg.partition_config.num_flow_partitions as i16;
124 let num_budget_partitions: i16 = cfg.partition_config.num_budget_partitions as i16;
125 let filter = cfg.scanner_filter.clone();
126
127 spawn_partition_scan(
129 &js,
130 &tx,
131 rx.clone(),
132 pool.clone(),
133 cfg.attempt_timeout_interval,
134 num_partitions,
135 filter.clone(),
136 "pg.attempt_timeout",
137 |pool, part, filter| Box::pin(async move {
138 reconcilers::attempt_timeout::scan_tick(&pool, part, &filter)
139 .await
140 .map(|_| ())
141 }),
142 );
143 spawn_partition_scan(
144 &js,
145 &tx,
146 rx.clone(),
147 pool.clone(),
148 cfg.lease_expiry_interval,
149 num_partitions,
150 filter.clone(),
151 "pg.lease_expiry",
152 |pool, part, filter| Box::pin(async move {
153 reconcilers::lease_expiry::scan_tick(&pool, part, &filter)
154 .await
155 .map(|_| ())
156 }),
157 );
158 spawn_partition_scan(
159 &js,
160 &tx,
161 rx.clone(),
162 pool.clone(),
163 cfg.suspension_timeout_interval,
164 num_partitions,
165 filter.clone(),
166 "pg.suspension_timeout",
167 |pool, part, filter| Box::pin(async move {
168 reconcilers::suspension_timeout::scan_tick(&pool, part, &filter)
169 .await
170 .map(|_| ())
171 }),
172 );
173
174 spawn_partition_scan(
182 &js,
183 &tx,
184 rx.clone(),
185 pool.clone(),
186 cfg.budget_reset_interval,
187 num_budget_partitions,
188 filter.clone(),
189 "pg.budget_reset",
190 |pool, part, _filter| {
191 Box::pin(async move {
192 reconcilers::budget_reset::scan_tick(&pool, part)
193 .await
194 .map(|_| ())
195 })
196 },
197 );
198
199 spawn_partition_scan(
206 &js,
207 &tx,
208 rx.clone(),
209 pool.clone(),
210 cfg.worker_registry_ttl_interval,
211 256,
212 filter.clone(),
213 "pg.worker_registry_ttl_sweep",
214 |pool, part, _filter| {
215 Box::pin(async move {
216 crate::worker_registry::ttl_sweep_tick(&pool, part)
217 .await
218 .map(|_| ())
219 })
220 },
221 );
222
223 let dep_stale = cfg.dependency_stale_threshold_ms;
225 spawn_global_scan(
226 &js,
227 &tx,
228 rx.clone(),
229 pool.clone(),
230 cfg.dependency_reconciler_interval,
231 filter.clone(),
232 "pg.dependency",
233 move |pool, filter| {
234 Box::pin(async move {
235 reconcilers::dependency::reconcile_tick(&pool, &filter, dep_stale)
236 .await
237 .map(|_| ())
238 })
239 },
240 );
241 spawn_global_scan(
242 &js,
243 &tx,
244 rx.clone(),
245 pool.clone(),
246 cfg.edge_cancel_dispatcher_interval,
247 filter.clone(),
248 "pg.edge_cancel_dispatcher",
249 |pool, filter| {
250 Box::pin(async move {
251 reconcilers::edge_cancel_dispatcher::dispatcher_tick(&pool, &filter)
252 .await
253 .map(|_| ())
254 })
255 },
256 );
257 spawn_global_scan(
258 &js,
259 &tx,
260 rx,
261 pool,
262 cfg.edge_cancel_reconciler_interval,
263 filter,
264 "pg.edge_cancel_reconciler",
265 |pool, filter| {
266 Box::pin(async move {
267 reconcilers::edge_cancel_reconciler::reconciler_tick(&pool, &filter)
268 .await
269 .map(|_| ())
270 })
271 },
272 );
273
274 tracing::info!(
275 scanners = 8,
276 num_partitions,
277 num_budget_partitions,
278 "postgres scanner supervisor spawned (RFC-017 Stage E3 + RFC-020 Wave 9 budget_reset + RFC-025 Phase 3 worker_registry_ttl_sweep)"
279 );
280
281 PostgresScannerHandle {
282 shutdown_tx: tx,
283 join_set: js,
284 }
285}
286
287type TickFut = std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>>;
288
289#[allow(clippy::too_many_arguments)]
290fn spawn_partition_scan<F>(
291 js: &Arc<Mutex<JoinSet<()>>>,
292 _tx: &watch::Sender<bool>,
293 mut shutdown: watch::Receiver<bool>,
294 pool: PgPool,
295 interval: Duration,
296 num_partitions: i16,
297 filter: ScannerFilter,
298 name: &'static str,
299 tick: F,
300) where
301 F: Fn(PgPool, i16, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
302{
303 let js_clone = js.clone();
304 tokio::spawn(async move {
305 let mut guard = js_clone.lock().await;
309 guard.spawn(async move {
310 let mut tk = tokio::time::interval(interval);
311 tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
312 loop {
313 tokio::select! {
314 _ = shutdown.changed() => {
315 if *shutdown.borrow() {
316 return;
317 }
318 }
319 _ = tk.tick() => {
320 for part in 0..num_partitions {
321 if *shutdown.borrow() {
322 return;
323 }
324 if let Err(e) = tick(pool.clone(), part, filter.clone()).await {
325 tracing::warn!(
326 scanner = name,
327 partition = part,
328 error = %e,
329 "postgres reconciler tick failed"
330 );
331 }
332 }
333 }
334 }
335 }
336 });
337 });
338}
339
340#[allow(clippy::too_many_arguments)]
341fn spawn_global_scan<F>(
342 js: &Arc<Mutex<JoinSet<()>>>,
343 _tx: &watch::Sender<bool>,
344 mut shutdown: watch::Receiver<bool>,
345 pool: PgPool,
346 interval: Duration,
347 filter: ScannerFilter,
348 name: &'static str,
349 tick: F,
350) where
351 F: Fn(PgPool, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
352{
353 let js_clone = js.clone();
354 tokio::spawn(async move {
355 let mut guard = js_clone.lock().await;
356 guard.spawn(async move {
357 let mut tk = tokio::time::interval(interval);
358 tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
359 loop {
360 tokio::select! {
361 _ = shutdown.changed() => {
362 if *shutdown.borrow() {
363 return;
364 }
365 }
366 _ = tk.tick() => {
367 if *shutdown.borrow() {
368 return;
369 }
370 if let Err(e) = tick(pool.clone(), filter.clone()).await {
371 tracing::warn!(
372 scanner = name,
373 error = %e,
374 "postgres reconciler tick failed"
375 );
376 }
377 }
378 }
379 }
380 });
381 });
382}