1pub mod budget;
4pub mod completion_listener;
5pub mod partition_router;
6pub mod scanner;
7pub mod supervisor;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::completion_backend::CompletionStream;
14use ff_core::partition::PartitionConfig;
15use ff_core::types::LaneId;
16use tokio::sync::watch;
17use tokio::task::JoinHandle;
18
19use partition_router::PartitionRouter;
20use supervisor::supervised_spawn;
21use scanner::attempt_timeout::AttemptTimeoutScanner;
22use scanner::execution_deadline::ExecutionDeadlineScanner;
23use scanner::budget_reconciler::BudgetReconciler;
24use scanner::budget_reset::BudgetResetScanner;
25use scanner::delayed_promoter::DelayedPromoter;
26use scanner::dependency_reconciler::DependencyReconciler;
27use scanner::index_reconciler::IndexReconciler;
28use scanner::lease_expiry::LeaseExpiryScanner;
29use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
30use scanner::quota_reconciler::QuotaReconciler;
31use scanner::retention_trimmer::RetentionTrimmer;
32use scanner::suspension_timeout::SuspensionTimeoutScanner;
33use scanner::flow_projector::FlowProjector;
34use scanner::unblock::UnblockScanner;
35
/// Configuration consumed by [`Engine`] at startup: partition layout, lanes,
/// one interval per background scanner, and a filter shared by all scanners.
///
/// Each `*_interval` field is passed to the matching scanner's `with_filter`
/// constructor when the engine starts. Presumably it is the delay between scan
/// passes — confirm against the individual scanner implementations.
pub struct EngineConfig {
    /// Partition layout.
    ///
    /// It is used in four ways:
    /// - It builds the engine's `PartitionRouter`.
    /// - It is passed to the unblock, dependency-reconciler, flow-projector and
    ///   cancel-reconciler scanners.
    /// - It supplies the partition count each scanner is spawned over:
    ///   `num_budget_partitions` for the budget scanners, `num_quota_partitions`
    ///   for the quota reconciler, and `num_flow_partitions` for everything else.
    pub partition_config: PartitionConfig,
    /// Lanes handed to the lane-aware scanners: delayed promoter, index
    /// reconciler, attempt timeout, retention trimmer, unblock, dependency
    /// reconciler and execution deadline.
    pub lanes: Vec<LaneId>,
    /// Interval for `LeaseExpiryScanner`.
    pub lease_expiry_interval: Duration,
    /// Interval for `DelayedPromoter`.
    pub delayed_promoter_interval: Duration,
    /// Interval for `IndexReconciler`.
    pub index_reconciler_interval: Duration,
    /// Interval for `AttemptTimeoutScanner`.
    pub attempt_timeout_interval: Duration,
    /// Interval for `SuspensionTimeoutScanner`.
    pub suspension_timeout_interval: Duration,
    /// Interval for `PendingWaitpointExpiryScanner`.
    pub pending_wp_expiry_interval: Duration,
    /// Interval for `RetentionTrimmer`.
    pub retention_trimmer_interval: Duration,
    /// Interval for `BudgetResetScanner` (spawned over the budget partitions).
    pub budget_reset_interval: Duration,
    /// Interval for `BudgetReconciler` (spawned over the budget partitions).
    pub budget_reconciler_interval: Duration,
    /// Interval for `QuotaReconciler` (spawned over the quota partitions).
    pub quota_reconciler_interval: Duration,
    /// Interval for `UnblockScanner`.
    pub unblock_interval: Duration,
    /// Interval for `DependencyReconciler`.
    pub dependency_reconciler_interval: Duration,
    /// Interval for `FlowProjector`.
    pub flow_projector_interval: Duration,
    /// Interval for `ExecutionDeadlineScanner`.
    pub execution_deadline_interval: Duration,

    /// Interval for `CancelReconciler`.
    pub cancel_reconciler_interval: Duration,

    /// Filter cloned into every scanner's `with_filter` constructor.
    pub scanner_filter: ScannerFilter,
}
123
124impl Default for EngineConfig {
125 fn default() -> Self {
126 Self {
127 partition_config: PartitionConfig::default(),
128 lanes: vec![LaneId::new("default")],
129 lease_expiry_interval: Duration::from_millis(1500),
130 delayed_promoter_interval: Duration::from_millis(750),
131 index_reconciler_interval: Duration::from_secs(45),
132 attempt_timeout_interval: Duration::from_secs(2),
133 suspension_timeout_interval: Duration::from_secs(2),
134 pending_wp_expiry_interval: Duration::from_secs(5),
135 retention_trimmer_interval: Duration::from_secs(60),
136 budget_reset_interval: Duration::from_secs(15),
137 budget_reconciler_interval: Duration::from_secs(30),
138 quota_reconciler_interval: Duration::from_secs(30),
139 unblock_interval: Duration::from_secs(5),
140 dependency_reconciler_interval: Duration::from_secs(15),
141 flow_projector_interval: Duration::from_secs(15),
142 execution_deadline_interval: Duration::from_secs(5),
143 cancel_reconciler_interval: Duration::from_secs(15),
144 scanner_filter: ScannerFilter::default(),
145 }
146 }
147}
148
/// A running engine: the background scanner tasks (plus an optional completion
/// dispatch loop) started by [`Engine::start`] and its sibling constructors.
///
/// Call [`Engine::shutdown`] to signal every task and wait, with a timeout,
/// for them to finish.
pub struct Engine {
    /// Router built from `EngineConfig::partition_config`. A clone is handed to
    /// the completion dispatch loop when one is started.
    pub router: Arc<PartitionRouter>,
    /// Sending side of the shutdown signal. Every spawned task receives a clone
    /// of the paired receiver, and `shutdown` sends `true`.
    shutdown_tx: watch::Sender<bool>,
    /// Join handles for every spawned task, awaited by `shutdown`.
    handles: Vec<JoinHandle<()>>,
}
155
156impl Engine {
157 pub fn start(config: EngineConfig, client: ferriskey::Client) -> Self {
161 Self::start_with_metrics(config, client, Arc::new(ff_observability::Metrics::new()))
171 }
172
173 pub fn start_with_metrics(
181 config: EngineConfig,
182 client: ferriskey::Client,
183 metrics: Arc<ff_observability::Metrics>,
184 ) -> Self {
185 Self::start_internal(config, client, metrics, None)
186 }
187
188 pub fn start_with_completions(
199 config: EngineConfig,
200 client: ferriskey::Client,
201 metrics: Arc<ff_observability::Metrics>,
202 completions: CompletionStream,
203 ) -> Self {
204 Self::start_internal(config, client, metrics, Some(completions))
205 }
206
207 fn start_internal(
208 config: EngineConfig,
209 client: ferriskey::Client,
210 metrics: Arc<ff_observability::Metrics>,
211 completions: Option<CompletionStream>,
212 ) -> Self {
213 let (shutdown_tx, shutdown_rx) = watch::channel(false);
214 let num_partitions = config.partition_config.num_flow_partitions;
215 let router = Arc::new(PartitionRouter::new(config.partition_config));
216
217 let mut handles = Vec::new();
218
219 let scanner_filter = config.scanner_filter.clone();
220
221 let lease_scanner = Arc::new(LeaseExpiryScanner::with_filter(
223 config.lease_expiry_interval,
224 scanner_filter.clone(),
225 ));
226 handles.push(supervised_spawn(
227 lease_scanner,
228 client.clone(),
229 num_partitions,
230 shutdown_rx.clone(),
231 metrics.clone(),
232 ));
233
234 let delayed_scanner = Arc::new(DelayedPromoter::with_filter(
236 config.delayed_promoter_interval,
237 config.lanes.clone(),
238 scanner_filter.clone(),
239 ));
240 handles.push(supervised_spawn(
241 delayed_scanner,
242 client.clone(),
243 num_partitions,
244 shutdown_rx.clone(),
245 metrics.clone(),
246 ));
247
248 let reconciler = Arc::new(IndexReconciler::with_filter(
250 config.index_reconciler_interval,
251 config.lanes.clone(),
252 scanner_filter.clone(),
253 ));
254 handles.push(supervised_spawn(
255 reconciler,
256 client.clone(),
257 num_partitions,
258 shutdown_rx.clone(),
259 metrics.clone(),
260 ));
261
262 let timeout_scanner = Arc::new(AttemptTimeoutScanner::with_filter(
264 config.attempt_timeout_interval,
265 config.lanes.clone(),
266 scanner_filter.clone(),
267 ));
268 handles.push(supervised_spawn(
269 timeout_scanner,
270 client.clone(),
271 num_partitions,
272 shutdown_rx.clone(),
273 metrics.clone(),
274 ));
275
276 let suspension_scanner = Arc::new(SuspensionTimeoutScanner::with_filter(
278 config.suspension_timeout_interval,
279 scanner_filter.clone(),
280 ));
281 handles.push(supervised_spawn(
282 suspension_scanner,
283 client.clone(),
284 num_partitions,
285 shutdown_rx.clone(),
286 metrics.clone(),
287 ));
288
289 let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::with_filter(
291 config.pending_wp_expiry_interval,
292 scanner_filter.clone(),
293 ));
294 handles.push(supervised_spawn(
295 pending_wp_scanner,
296 client.clone(),
297 num_partitions,
298 shutdown_rx.clone(),
299 metrics.clone(),
300 ));
301
302 let retention_scanner = Arc::new(RetentionTrimmer::with_filter(
304 config.retention_trimmer_interval,
305 config.lanes.clone(),
306 scanner_filter.clone(),
307 ));
308 handles.push(supervised_spawn(
309 retention_scanner,
310 client.clone(),
311 num_partitions,
312 shutdown_rx.clone(),
313 metrics.clone(),
314 ));
315
316 let budget_reset = Arc::new(BudgetResetScanner::with_filter(
320 config.budget_reset_interval,
321 scanner_filter.clone(),
322 ));
323 handles.push(supervised_spawn(
324 budget_reset,
325 client.clone(),
326 config.partition_config.num_budget_partitions,
327 shutdown_rx.clone(),
328 metrics.clone(),
329 ));
330
331 let budget_reconciler = Arc::new(BudgetReconciler::with_filter(
335 config.budget_reconciler_interval,
336 scanner_filter.clone(),
337 ));
338 handles.push(supervised_spawn(
339 budget_reconciler,
340 client.clone(),
341 config.partition_config.num_budget_partitions,
342 shutdown_rx.clone(),
343 metrics.clone(),
344 ));
345
346 let unblock_scanner = Arc::new(UnblockScanner::with_filter(
348 config.unblock_interval,
349 config.lanes.clone(),
350 config.partition_config,
351 scanner_filter.clone(),
352 ));
353 handles.push(supervised_spawn(
354 unblock_scanner,
355 client.clone(),
356 num_partitions,
357 shutdown_rx.clone(),
358 metrics.clone(),
359 ));
360
361 let dep_reconciler = Arc::new(DependencyReconciler::with_filter(
363 config.dependency_reconciler_interval,
364 config.lanes.clone(),
365 config.partition_config,
366 scanner_filter.clone(),
367 ));
368 handles.push(supervised_spawn(
369 dep_reconciler,
370 client.clone(),
371 num_partitions,
372 shutdown_rx.clone(),
373 metrics.clone(),
374 ));
375
376 let quota_reconciler = Arc::new(QuotaReconciler::with_filter(
380 config.quota_reconciler_interval,
381 scanner_filter.clone(),
382 ));
383 handles.push(supervised_spawn(
384 quota_reconciler,
385 client.clone(),
386 config.partition_config.num_quota_partitions,
387 shutdown_rx.clone(),
388 metrics.clone(),
389 ));
390
391 let flow_projector = Arc::new(FlowProjector::with_filter(
395 config.flow_projector_interval,
396 config.partition_config,
397 scanner_filter.clone(),
398 ));
399 handles.push(supervised_spawn(
400 flow_projector,
401 client.clone(),
402 config.partition_config.num_flow_partitions,
403 shutdown_rx.clone(),
404 metrics.clone(),
405 ));
406
407 let cancel_reconciler = Arc::new(scanner::cancel_reconciler::CancelReconciler::with_filter(
411 config.cancel_reconciler_interval,
412 config.partition_config,
413 scanner_filter.clone(),
414 ));
415 handles.push(supervised_spawn(
416 cancel_reconciler,
417 client.clone(),
418 config.partition_config.num_flow_partitions,
419 shutdown_rx.clone(),
420 metrics.clone(),
421 ));
422
423 let deadline_scanner = Arc::new(ExecutionDeadlineScanner::with_filter(
425 config.execution_deadline_interval,
426 config.lanes,
427 scanner_filter,
428 ));
429 handles.push(supervised_spawn(
430 deadline_scanner,
431 client.clone(),
432 num_partitions,
433 shutdown_rx.clone(),
434 metrics.clone(),
435 ));
436
437 let listener_enabled = completions.is_some();
443 if let Some(stream) = completions {
444 handles.push(completion_listener::spawn_dispatch_loop(
445 router.clone(),
446 client,
447 stream,
448 shutdown_rx,
449 ));
450 }
451
452 let scanner_count = if listener_enabled { "15 scanners + completion dispatch" } else { "15 scanners" };
453 tracing::info!(
454 num_partitions,
455 budget_partitions = config.partition_config.num_budget_partitions,
456 quota_partitions = config.partition_config.num_quota_partitions,
457 flow_partitions = config.partition_config.num_flow_partitions,
458 "engine started with {scanner_count}"
459 );
460
461 Self {
462 router,
463 shutdown_tx,
464 handles,
465 }
466 }
467
468 pub async fn shutdown(self) {
474 let _ = self.shutdown_tx.send(true);
475 let join_all = async {
476 for handle in self.handles {
477 let _ = handle.await;
478 }
479 };
480 match tokio::time::timeout(Duration::from_secs(15), join_all).await {
481 Ok(()) => tracing::info!("engine shutdown complete"),
482 Err(_) => tracing::warn!(
483 "engine shutdown timed out after 15s, abandoning remaining scanners"
484 ),
485 }
486 }
487}