1use std::sync::Arc;
20use std::time::Duration;
21
22use ff_core::backend::ScannerFilter;
23use ff_core::partition::PartitionConfig;
24use sqlx::PgPool;
25use tokio::sync::{watch, Mutex};
26use tokio::task::JoinSet;
27
28use crate::reconcilers;
29
/// Configuration for the Postgres scanner supervisor started by [`spawn_scanners`].
///
/// Each `*_interval` field is the tick period (passed to `tokio::time::interval`)
/// of exactly one scanner task. The per-partition scanners take their partition
/// range from `partition_config`.
#[derive(Clone, Debug)]
pub struct PostgresScannerConfig {
    /// Tick period of the `pg.attempt_timeout` scanner; every tick visits each partition.
    pub attempt_timeout_interval: Duration,
    /// Tick period of the `pg.lease_expiry` scanner; every tick visits each partition.
    pub lease_expiry_interval: Duration,
    /// Tick period of the `pg.suspension_timeout` scanner; every tick visits each partition.
    pub suspension_timeout_interval: Duration,
    /// Tick period of the global `pg.dependency` reconciler.
    pub dependency_reconciler_interval: Duration,
    /// Tick period of the global `pg.edge_cancel_dispatcher` scanner.
    pub edge_cancel_dispatcher_interval: Duration,
    /// Tick period of the global `pg.edge_cancel_reconciler` scanner.
    pub edge_cancel_reconciler_interval: Duration,
    /// Passed unchanged to `reconcilers::dependency::reconcile_tick` on every tick.
    /// NOTE(review): presumably a staleness threshold in milliseconds (compare
    /// `DEFAULT_DEP_STALE_MS`) — confirm against that reconciler.
    pub dependency_stale_threshold_ms: i64,
    /// Filter cloned into, and handed to, every scanner tick.
    pub scanner_filter: ScannerFilter,
    /// Supplies `num_flow_partitions`, which bounds the partition range
    /// (`0..num_flow_partitions`) walked by the per-partition scanners.
    pub partition_config: PartitionConfig,
}
48
49impl PostgresScannerConfig {
50 pub const DEFAULT_DEP_STALE_MS: i64 = 15_000;
53}
54
/// Handle to the scanner tasks started by [`spawn_scanners`].
///
/// Call [`PostgresScannerHandle::shutdown`] to stop the scanners gracefully.
pub struct PostgresScannerHandle {
    /// Shutdown signal observed by every scanner; `shutdown()` sends `true` on it.
    shutdown_tx: watch::Sender<bool>,
    /// The scanner tasks. This is shared (`Arc<Mutex<_>>`) with the spawn helpers,
    /// which insert the tasks into it. Per tokio `JoinSet` semantics, dropping the
    /// last reference to the set aborts any tasks still in it.
    join_set: Arc<Mutex<JoinSet<()>>>,
}
62
63impl PostgresScannerHandle {
64 pub async fn shutdown(&self, grace: Duration) -> usize {
68 let _ = self.shutdown_tx.send(true);
69 let mut js = self.join_set.lock().await;
70 let deadline = tokio::time::Instant::now() + grace;
71 let mut timed_out = 0usize;
72 while !js.is_empty() {
73 let remaining = deadline
74 .checked_duration_since(tokio::time::Instant::now())
75 .unwrap_or(Duration::ZERO);
76 if remaining.is_zero() {
77 timed_out = js.len();
78 js.abort_all();
79 break;
80 }
81 match tokio::time::timeout(remaining, js.join_next()).await {
82 Ok(Some(_res)) => continue,
83 Ok(None) => break,
84 Err(_) => {
85 timed_out = js.len();
86 js.abort_all();
87 break;
88 }
89 }
90 }
91 timed_out
92 }
93}
94
95pub fn spawn_scanners(pool: PgPool, cfg: PostgresScannerConfig) -> PostgresScannerHandle {
105 let (tx, rx) = watch::channel(false);
106 let js = Arc::new(Mutex::new(JoinSet::new()));
107
108 let num_partitions: i16 = cfg.partition_config.num_flow_partitions as i16;
111 let filter = cfg.scanner_filter.clone();
112
113 spawn_partition_scan(
115 &js,
116 &tx,
117 rx.clone(),
118 pool.clone(),
119 cfg.attempt_timeout_interval,
120 num_partitions,
121 filter.clone(),
122 "pg.attempt_timeout",
123 |pool, part, filter| Box::pin(async move {
124 reconcilers::attempt_timeout::scan_tick(&pool, part, &filter)
125 .await
126 .map(|_| ())
127 }),
128 );
129 spawn_partition_scan(
130 &js,
131 &tx,
132 rx.clone(),
133 pool.clone(),
134 cfg.lease_expiry_interval,
135 num_partitions,
136 filter.clone(),
137 "pg.lease_expiry",
138 |pool, part, filter| Box::pin(async move {
139 reconcilers::lease_expiry::scan_tick(&pool, part, &filter)
140 .await
141 .map(|_| ())
142 }),
143 );
144 spawn_partition_scan(
145 &js,
146 &tx,
147 rx.clone(),
148 pool.clone(),
149 cfg.suspension_timeout_interval,
150 num_partitions,
151 filter.clone(),
152 "pg.suspension_timeout",
153 |pool, part, filter| Box::pin(async move {
154 reconcilers::suspension_timeout::scan_tick(&pool, part, &filter)
155 .await
156 .map(|_| ())
157 }),
158 );
159
160 let dep_stale = cfg.dependency_stale_threshold_ms;
162 spawn_global_scan(
163 &js,
164 &tx,
165 rx.clone(),
166 pool.clone(),
167 cfg.dependency_reconciler_interval,
168 filter.clone(),
169 "pg.dependency",
170 move |pool, filter| {
171 Box::pin(async move {
172 reconcilers::dependency::reconcile_tick(&pool, &filter, dep_stale)
173 .await
174 .map(|_| ())
175 })
176 },
177 );
178 spawn_global_scan(
179 &js,
180 &tx,
181 rx.clone(),
182 pool.clone(),
183 cfg.edge_cancel_dispatcher_interval,
184 filter.clone(),
185 "pg.edge_cancel_dispatcher",
186 |pool, filter| {
187 Box::pin(async move {
188 reconcilers::edge_cancel_dispatcher::dispatcher_tick(&pool, &filter)
189 .await
190 .map(|_| ())
191 })
192 },
193 );
194 spawn_global_scan(
195 &js,
196 &tx,
197 rx,
198 pool,
199 cfg.edge_cancel_reconciler_interval,
200 filter,
201 "pg.edge_cancel_reconciler",
202 |pool, filter| {
203 Box::pin(async move {
204 reconcilers::edge_cancel_reconciler::reconciler_tick(&pool, &filter)
205 .await
206 .map(|_| ())
207 })
208 },
209 );
210
211 tracing::info!(
212 scanners = 6,
213 num_partitions,
214 "postgres scanner supervisor spawned (RFC-017 Stage E3)"
215 );
216
217 PostgresScannerHandle {
218 shutdown_tx: tx,
219 join_set: js,
220 }
221}
222
/// Boxed `Send` future produced by one scanner tick. It resolves to `Ok(())` or to
/// the `EngineError` returned by the reconciler.
///
/// Boxing lets every tick closure passed to the spawn helpers return its own
/// concrete reconciler future (via `Box::pin`) through a single nameable type.
type TickFut = std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>>;
224
225#[allow(clippy::too_many_arguments)]
226fn spawn_partition_scan<F>(
227 js: &Arc<Mutex<JoinSet<()>>>,
228 _tx: &watch::Sender<bool>,
229 mut shutdown: watch::Receiver<bool>,
230 pool: PgPool,
231 interval: Duration,
232 num_partitions: i16,
233 filter: ScannerFilter,
234 name: &'static str,
235 tick: F,
236) where
237 F: Fn(PgPool, i16, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
238{
239 let js_clone = js.clone();
240 tokio::spawn(async move {
241 let mut guard = js_clone.lock().await;
245 guard.spawn(async move {
246 let mut tk = tokio::time::interval(interval);
247 tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
248 loop {
249 tokio::select! {
250 _ = shutdown.changed() => {
251 if *shutdown.borrow() {
252 return;
253 }
254 }
255 _ = tk.tick() => {
256 for part in 0..num_partitions {
257 if *shutdown.borrow() {
258 return;
259 }
260 if let Err(e) = tick(pool.clone(), part, filter.clone()).await {
261 tracing::warn!(
262 scanner = name,
263 partition = part,
264 error = %e,
265 "postgres reconciler tick failed"
266 );
267 }
268 }
269 }
270 }
271 }
272 });
273 });
274}
275
276#[allow(clippy::too_many_arguments)]
277fn spawn_global_scan<F>(
278 js: &Arc<Mutex<JoinSet<()>>>,
279 _tx: &watch::Sender<bool>,
280 mut shutdown: watch::Receiver<bool>,
281 pool: PgPool,
282 interval: Duration,
283 filter: ScannerFilter,
284 name: &'static str,
285 tick: F,
286) where
287 F: Fn(PgPool, ScannerFilter) -> TickFut + Send + Sync + 'static + Clone,
288{
289 let js_clone = js.clone();
290 tokio::spawn(async move {
291 let mut guard = js_clone.lock().await;
292 guard.spawn(async move {
293 let mut tk = tokio::time::interval(interval);
294 tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
295 loop {
296 tokio::select! {
297 _ = shutdown.changed() => {
298 if *shutdown.borrow() {
299 return;
300 }
301 }
302 _ = tk.tick() => {
303 if *shutdown.borrow() {
304 return;
305 }
306 if let Err(e) = tick(pool.clone(), filter.clone()).await {
307 tracing::warn!(
308 scanner = name,
309 error = %e,
310 "postgres reconciler tick failed"
311 );
312 }
313 }
314 }
315 }
316 });
317 });
318}