1use std::sync::Arc;
20use std::time::Duration;
21
22use ff_core::backend::ScannerFilter;
23use ff_core::partition::PartitionConfig;
24use sqlx::PgPool;
25use tokio::sync::{watch, Mutex};
26use tokio::task::JoinSet;
27
28use crate::reconcilers;
29
/// Configuration for the Postgres reconciler scanners started by `spawn_scanners`.
///
/// Every `*_interval` field is the tick period of one scanner's loop.
#[derive(Clone, Debug)]
pub struct PostgresScannerConfig {
    /// Tick period of the `pg.attempt_timeout` scanner (runs once per flow partition per tick).
    pub attempt_timeout_interval: Duration,
    /// Tick period of the `pg.lease_expiry` scanner (runs once per flow partition per tick).
    pub lease_expiry_interval: Duration,
    /// Tick period of the `pg.suspension_timeout` scanner (runs once per flow partition per tick).
    pub suspension_timeout_interval: Duration,
    /// Tick period of the global `pg.dependency` reconciler.
    pub dependency_reconciler_interval: Duration,
    /// Tick period of the global `pg.edge_cancel_dispatcher`.
    pub edge_cancel_dispatcher_interval: Duration,
    /// Tick period of the global `pg.edge_cancel_reconciler`.
    pub edge_cancel_reconciler_interval: Duration,
    /// Tick period of the `pg.budget_reset` scanner (runs once per budget partition per tick).
    pub budget_reset_interval: Duration,
    /// Staleness threshold forwarded verbatim to `reconcilers::dependency::reconcile_tick`.
    /// Presumably milliseconds given the `_ms` suffix — TODO confirm against the reconciler.
    /// See `PostgresScannerConfig::DEFAULT_DEP_STALE_MS` for a stock value.
    pub dependency_stale_threshold_ms: i64,
    /// Filter cloned into every scanner tick (the `pg.budget_reset` tick ignores it).
    pub scanner_filter: ScannerFilter,
    /// Supplies the flow and budget partition counts the per-partition scanners iterate over.
    pub partition_config: PartitionConfig,
}
53
54impl PostgresScannerConfig {
55 pub const DEFAULT_DEP_STALE_MS: i64 = 15_000;
58}
59
/// Handle to the scanner tasks started by `spawn_scanners`.
///
/// Call [`PostgresScannerHandle::shutdown`] to signal the tasks and wait for them to exit.
pub struct PostgresScannerHandle {
    /// Shutdown flag broadcast to every scanner; `shutdown` sends `true` on it.
    shutdown_tx: watch::Sender<bool>,
    /// All scanner tasks. Wrapped in `Arc<tokio::sync::Mutex<_>>` because the spawn
    /// helpers may register tasks from separately spawned tasks, and `shutdown`
    /// holds the lock across `.await` while draining the set.
    join_set: Arc<Mutex<JoinSet<()>>>,
}
67
68impl PostgresScannerHandle {
69 pub async fn shutdown(&self, grace: Duration) -> usize {
73 let _ = self.shutdown_tx.send(true);
74 let mut js = self.join_set.lock().await;
75 let deadline = tokio::time::Instant::now() + grace;
76 let mut timed_out = 0usize;
77 while !js.is_empty() {
78 let remaining = deadline
79 .checked_duration_since(tokio::time::Instant::now())
80 .unwrap_or(Duration::ZERO);
81 if remaining.is_zero() {
82 timed_out = js.len();
83 js.abort_all();
84 break;
85 }
86 match tokio::time::timeout(remaining, js.join_next()).await {
87 Ok(Some(_res)) => continue,
88 Ok(None) => break,
89 Err(_) => {
90 timed_out = js.len();
91 js.abort_all();
92 break;
93 }
94 }
95 }
96 timed_out
97 }
98}
99
100pub fn spawn_scanners(pool: PgPool, cfg: PostgresScannerConfig) -> PostgresScannerHandle {
110 let (tx, rx) = watch::channel(false);
111 let js = Arc::new(Mutex::new(JoinSet::new()));
112
113 let num_partitions: i16 = cfg.partition_config.num_flow_partitions as i16;
119 let num_budget_partitions: i16 = cfg.partition_config.num_budget_partitions as i16;
120 let filter = cfg.scanner_filter.clone();
121
122 spawn_partition_scan(
124 &js,
125 &tx,
126 rx.clone(),
127 pool.clone(),
128 cfg.attempt_timeout_interval,
129 num_partitions,
130 filter.clone(),
131 "pg.attempt_timeout",
132 |pool, part, filter| Box::pin(async move {
133 reconcilers::attempt_timeout::scan_tick(&pool, part, &filter)
134 .await
135 .map(|_| ())
136 }),
137 );
138 spawn_partition_scan(
139 &js,
140 &tx,
141 rx.clone(),
142 pool.clone(),
143 cfg.lease_expiry_interval,
144 num_partitions,
145 filter.clone(),
146 "pg.lease_expiry",
147 |pool, part, filter| Box::pin(async move {
148 reconcilers::lease_expiry::scan_tick(&pool, part, &filter)
149 .await
150 .map(|_| ())
151 }),
152 );
153 spawn_partition_scan(
154 &js,
155 &tx,
156 rx.clone(),
157 pool.clone(),
158 cfg.suspension_timeout_interval,
159 num_partitions,
160 filter.clone(),
161 "pg.suspension_timeout",
162 |pool, part, filter| Box::pin(async move {
163 reconcilers::suspension_timeout::scan_tick(&pool, part, &filter)
164 .await
165 .map(|_| ())
166 }),
167 );
168
169 spawn_partition_scan(
177 &js,
178 &tx,
179 rx.clone(),
180 pool.clone(),
181 cfg.budget_reset_interval,
182 num_budget_partitions,
183 filter.clone(),
184 "pg.budget_reset",
185 |pool, part, _filter| {
186 Box::pin(async move {
187 reconcilers::budget_reset::scan_tick(&pool, part)
188 .await
189 .map(|_| ())
190 })
191 },
192 );
193
194 let dep_stale = cfg.dependency_stale_threshold_ms;
196 spawn_global_scan(
197 &js,
198 &tx,
199 rx.clone(),
200 pool.clone(),
201 cfg.dependency_reconciler_interval,
202 filter.clone(),
203 "pg.dependency",
204 move |pool, filter| {
205 Box::pin(async move {
206 reconcilers::dependency::reconcile_tick(&pool, &filter, dep_stale)
207 .await
208 .map(|_| ())
209 })
210 },
211 );
212 spawn_global_scan(
213 &js,
214 &tx,
215 rx.clone(),
216 pool.clone(),
217 cfg.edge_cancel_dispatcher_interval,
218 filter.clone(),
219 "pg.edge_cancel_dispatcher",
220 |pool, filter| {
221 Box::pin(async move {
222 reconcilers::edge_cancel_dispatcher::dispatcher_tick(&pool, &filter)
223 .await
224 .map(|_| ())
225 })
226 },
227 );
228 spawn_global_scan(
229 &js,
230 &tx,
231 rx,
232 pool,
233 cfg.edge_cancel_reconciler_interval,
234 filter,
235 "pg.edge_cancel_reconciler",
236 |pool, filter| {
237 Box::pin(async move {
238 reconcilers::edge_cancel_reconciler::reconciler_tick(&pool, &filter)
239 .await
240 .map(|_| ())
241 })
242 },
243 );
244
245 tracing::info!(
246 scanners = 7,
247 num_partitions,
248 num_budget_partitions,
249 "postgres scanner supervisor spawned (RFC-017 Stage E3 + RFC-020 Wave 9 budget_reset)"
250 );
251
252 PostgresScannerHandle {
253 shutdown_tx: tx,
254 join_set: js,
255 }
256}
257
258type TickFut = std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>>;
259
260#[allow(clippy::too_many_arguments)]
261fn spawn_partition_scan<F>(
262 js: &Arc<Mutex<JoinSet<()>>>,
263 _tx: &watch::Sender<bool>,
264 mut shutdown: watch::Receiver<bool>,
265 pool: PgPool,
266 interval: Duration,
267 num_partitions: i16,
268 filter: ScannerFilter,
269 name: &'static str,
270 tick: F,
271) where
272 F: Fn(PgPool, i16, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
273{
274 let js_clone = js.clone();
275 tokio::spawn(async move {
276 let mut guard = js_clone.lock().await;
280 guard.spawn(async move {
281 let mut tk = tokio::time::interval(interval);
282 tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
283 loop {
284 tokio::select! {
285 _ = shutdown.changed() => {
286 if *shutdown.borrow() {
287 return;
288 }
289 }
290 _ = tk.tick() => {
291 for part in 0..num_partitions {
292 if *shutdown.borrow() {
293 return;
294 }
295 if let Err(e) = tick(pool.clone(), part, filter.clone()).await {
296 tracing::warn!(
297 scanner = name,
298 partition = part,
299 error = %e,
300 "postgres reconciler tick failed"
301 );
302 }
303 }
304 }
305 }
306 }
307 });
308 });
309}
310
311#[allow(clippy::too_many_arguments)]
312fn spawn_global_scan<F>(
313 js: &Arc<Mutex<JoinSet<()>>>,
314 _tx: &watch::Sender<bool>,
315 mut shutdown: watch::Receiver<bool>,
316 pool: PgPool,
317 interval: Duration,
318 filter: ScannerFilter,
319 name: &'static str,
320 tick: F,
321) where
322 F: Fn(PgPool, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
323{
324 let js_clone = js.clone();
325 tokio::spawn(async move {
326 let mut guard = js_clone.lock().await;
327 guard.spawn(async move {
328 let mut tk = tokio::time::interval(interval);
329 tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
330 loop {
331 tokio::select! {
332 _ = shutdown.changed() => {
333 if *shutdown.borrow() {
334 return;
335 }
336 }
337 _ = tk.tick() => {
338 if *shutdown.borrow() {
339 return;
340 }
341 if let Err(e) = tick(pool.clone(), filter.clone()).await {
342 tracing::warn!(
343 scanner = name,
344 error = %e,
345 "postgres reconciler tick failed"
346 );
347 }
348 }
349 }
350 }
351 });
352 });
353}