ff_engine/lib.rs
1//! ff-engine: cross-partition dispatch and background scanners.
2
3pub mod budget;
4pub mod completion_listener;
5pub mod partition_router;
6pub mod scanner;
7pub mod supervisor;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::completion_backend::CompletionStream;
14use ff_core::engine_backend::EngineBackend;
15use ff_core::partition::PartitionConfig;
16use ff_core::types::LaneId;
17use tokio::sync::watch;
18use tokio::task::JoinHandle;
19
20use partition_router::PartitionRouter;
21use supervisor::supervised_spawn;
22use scanner::attempt_timeout::AttemptTimeoutScanner;
23use scanner::execution_deadline::ExecutionDeadlineScanner;
24use scanner::budget_reconciler::BudgetReconciler;
25use scanner::budget_reset::BudgetResetScanner;
26use scanner::delayed_promoter::DelayedPromoter;
27use scanner::dependency_reconciler::DependencyReconciler;
28use scanner::index_reconciler::IndexReconciler;
29use scanner::lease_expiry::LeaseExpiryScanner;
30use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
31use scanner::quota_reconciler::QuotaReconciler;
32use scanner::retention_trimmer::RetentionTrimmer;
33use scanner::suspension_timeout::SuspensionTimeoutScanner;
34use scanner::flow_projector::FlowProjector;
35use scanner::unblock::UnblockScanner;
36
37/// Engine configuration.
38pub struct EngineConfig {
39 pub partition_config: PartitionConfig,
40 /// Lanes to scan for delayed/index operations. Phase 1: `["default"]`.
41 pub lanes: Vec<LaneId>,
42 /// Lease expiry scan interval. Default: 1.5s.
43 pub lease_expiry_interval: Duration,
44 /// Delayed promoter scan interval. Default: 750ms.
45 pub delayed_promoter_interval: Duration,
46 /// Index reconciler scan interval. Default: 45s.
47 pub index_reconciler_interval: Duration,
48 /// Attempt timeout scan interval. Default: 2s.
49 pub attempt_timeout_interval: Duration,
50 /// Suspension timeout scan interval. Default: 2s.
51 pub suspension_timeout_interval: Duration,
52 /// Pending waitpoint expiry scan interval. Default: 5s.
53 pub pending_wp_expiry_interval: Duration,
54 /// Retention trimmer scan interval. Default: 60s.
55 pub retention_trimmer_interval: Duration,
56 /// Budget reset scan interval. Default: 15s.
57 pub budget_reset_interval: Duration,
58 /// Budget reconciler scan interval. Default: 30s.
59 pub budget_reconciler_interval: Duration,
60 /// Quota reconciler scan interval. Default: 30s.
61 pub quota_reconciler_interval: Duration,
62 /// Unblock scanner interval. Default: 5s.
63 pub unblock_interval: Duration,
64 /// Dependency reconciler interval. Default: 15s.
65 ///
66 /// Post-Batch-C this scanner is a **safety net**, not the primary
67 /// promotion path. When a [`CompletionStream`] is handed to
68 /// `start_with_completions`, push-based dispatch drives DAG
69 /// promotion synchronously with each completion — under normal
70 /// operation DAG latency is `~RTT × levels`, not `interval × levels`.
71 ///
72 /// The reconciler still runs as a catch-all for:
73 /// - messages missed during subscriber restart or reconnect;
74 /// - pre-Batch-C executions without `core.flow_id` stamped;
75 /// - operator-driven edge mutation that doesn't pass through
76 /// the terminal-transition publish path.
77 ///
78 /// 15s idle-scan cost is minimal. If the push dispatch loop is
79 /// disabled (engine started via `start`/`start_with_metrics`
80 /// without a stream), drop this to 1s to preserve pre-Batch-C
81 /// DAG latency behavior.
82 pub dependency_reconciler_interval: Duration,
83 /// Flow summary projector interval. Default: 15s.
84 ///
85 /// Separate observability projection path — maintains the flow
86 /// summary view, NOT on the DAG-completion latency path. Kept at
87 /// 15s in this config; a change to that cadence is unrelated to
88 /// dependency resolution.
89 pub flow_projector_interval: Duration,
90 /// Execution deadline scanner interval. Default: 5s.
91 pub execution_deadline_interval: Duration,
92
93 /// Cancel reconciler scanner interval. Default: 15s.
94 ///
95 /// Drains `ff_cancel_flow`'s per-partition `cancel_backlog` ZSET of
96 /// flows owing async member cancels. Each cancelled flow gets a
97 /// grace window (30s by default, set by ff-server) before the
98 /// reconciler picks it up, so the live in-process dispatch isn't
99 /// fought on the happy path.
100 pub cancel_reconciler_interval: Duration,
101
102 /// RFC-016 Stage C sibling-cancel dispatcher interval. Default: 1s.
103 ///
104 /// Drains the per-flow-partition `pending_cancel_groups` SET,
105 /// populated by `ff_resolve_dependency` whenever an AnyOf/Quorum
106 /// edge group fires terminal under `OnSatisfied::CancelRemaining`.
107 /// For each indexed group the dispatcher issues per-sibling
108 /// `ff_cancel_execution` with `FailureReason::sibling_quorum_{
109 /// satisfied,impossible}`, then atomically SREM+clear via
110 /// `ff_drain_sibling_cancel_group`.
111 ///
112 /// A short default (1s) minimises the window between quorum
113 /// satisfaction and sibling termination — this is the user-facing
114 /// latency floor for "kill the losers" workflows. Bump only if a
115 /// deployment's steady-state pending-set depth is observed to
116 /// backlog under the 1s cadence; Stage C's §4.2 benchmark gates
117 /// the release against the p99 ≤ 500 ms SLO at n=100 (§4.2 of
118 /// the RFC).
119 pub edge_cancel_dispatcher_interval: Duration,
120
121 /// RFC-016 Stage D sibling-cancel reconciler interval. Default: 10s.
122 ///
123 /// Safety-net scanner for Invariant Q6: if the engine crashed
124 /// between `ff_resolve_dependency`'s SADD to `pending_cancel_groups`
125 /// and the dispatcher's `ff_drain_sibling_cancel_group`, this
126 /// reconciler detects the orphan tuple and finalises via
127 /// `ff_reconcile_sibling_cancel_group`. It runs at a deliberately
128 /// slower cadence than the dispatcher (10s vs 1s) so the dispatcher
129 /// owns the happy path and the reconciler only cleans up
130 /// crash-recovery residue. The reconciler MUST NOT fight the
131 /// dispatcher — it no-ops whenever siblings are still non-terminal.
132 pub edge_cancel_reconciler_interval: Duration,
133
134 /// Per-consumer scanner filter (issue #122).
135 ///
136 /// Applied by every execution-shaped scanner (lease_expiry,
137 /// attempt_timeout, execution_deadline, suspension_timeout,
138 /// pending_wp_expiry, delayed_promoter, dependency_reconciler,
139 /// cancel_reconciler, unblock, index_reconciler,
140 /// retention_trimmer) to restrict the candidate set to
141 /// executions owned by this consumer. The four non-execution
142 /// scanners (budget_reconciler, budget_reset, quota_reconciler,
143 /// flow_projector) accept the filter for API uniformity but do
144 /// not apply it — their domains are not per-execution.
145 ///
146 /// Default: [`ScannerFilter::default`] — no filtering,
147 /// pre-#122 behaviour. Multi-tenant deployments that share a
148 /// single Valkey keyspace across two FlowFabric instances set
149 /// this (paired with
150 /// [`CompletionBackend::subscribe_completions_filtered`]) for
151 /// mutual isolation.
152 ///
153 /// [`CompletionBackend::subscribe_completions_filtered`]: ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered
154 pub scanner_filter: ScannerFilter,
155}
156
157impl Default for EngineConfig {
158 fn default() -> Self {
159 Self {
160 partition_config: PartitionConfig::default(),
161 lanes: vec![LaneId::new("default")],
162 lease_expiry_interval: Duration::from_millis(1500),
163 delayed_promoter_interval: Duration::from_millis(750),
164 index_reconciler_interval: Duration::from_secs(45),
165 attempt_timeout_interval: Duration::from_secs(2),
166 suspension_timeout_interval: Duration::from_secs(2),
167 pending_wp_expiry_interval: Duration::from_secs(5),
168 retention_trimmer_interval: Duration::from_secs(60),
169 budget_reset_interval: Duration::from_secs(15),
170 budget_reconciler_interval: Duration::from_secs(30),
171 quota_reconciler_interval: Duration::from_secs(30),
172 unblock_interval: Duration::from_secs(5),
173 dependency_reconciler_interval: Duration::from_secs(15),
174 flow_projector_interval: Duration::from_secs(15),
175 execution_deadline_interval: Duration::from_secs(5),
176 cancel_reconciler_interval: Duration::from_secs(15),
177 edge_cancel_dispatcher_interval: Duration::from_secs(1),
178 edge_cancel_reconciler_interval: Duration::from_secs(10),
179 scanner_filter: ScannerFilter::default(),
180 }
181 }
182}
183
184/// The FlowFabric engine: partition routing + background scanners.
185pub struct Engine {
186 pub router: Arc<PartitionRouter>,
187 shutdown_tx: watch::Sender<bool>,
188 handles: Vec<JoinHandle<()>>,
189}
190
191impl Engine {
192 /// Start the engine with the given config and backend.
193 ///
194 /// Spawns background scanner tasks. Returns immediately.
195 ///
196 /// `backend` is an `Arc<dyn EngineBackend>`. When the underlying
197 /// concrete type is [`ff_backend_valkey::ValkeyBackend`], the
198 /// engine spawns its in-tree Valkey scanner supervisors. For
199 /// non-Valkey backends (Postgres, SQLite, custom impls), the
200 /// reconciler supervisor lives inside the backend itself (spawned
201 /// during `connect_with_*`) and this constructor only wires the
202 /// completion dispatch loop + partition router. Cairn #436: the
203 /// runtime `downcast`-or-panic from v0.12 PR-7a is gone.
204 pub fn start(config: EngineConfig, backend: Arc<dyn EngineBackend>) -> Self {
205 // Construct a fresh metrics handle here so direct callers
206 // (examples, tests) don't need to. Under the default build
207 // (`observability` feature off) this is the no-op shim. With
208 // the feature on the handle is a real OTEL registry but — by
209 // design — one that nothing else shares; it's only useful for
210 // tests that want to exercise scanner cycle recording in
211 // isolation. Production code uses
212 // [`Self::start_with_metrics`] to plumb the same handle
213 // through the HTTP /metrics route.
214 Self::start_with_metrics(config, backend, Arc::new(ff_observability::Metrics::new()))
215 }
216
217 /// PR-94: start the engine with a shared observability registry.
218 ///
219 /// Used by `ff-server` so scanner cycle metrics funnel into the
220 /// same Prometheus registry exposed at `/metrics`. Under the
221 /// `observability` feature (flipped via the same feature on
222 /// `ff-server` / `ff-engine`), the handle records into an OTEL
223 /// `MeterProvider`; otherwise the shim no-ops.
224 pub fn start_with_metrics(
225 config: EngineConfig,
226 backend: Arc<dyn EngineBackend>,
227 metrics: Arc<ff_observability::Metrics>,
228 ) -> Self {
229 Self::start_internal(config, backend, metrics, None)
230 }
231
232 /// Start the engine with a shared observability registry and a
233 /// completion stream for push-based DAG promotion (issue #90).
234 ///
235 /// The stream is typically produced by
236 /// [`ff_core::completion_backend::CompletionBackend::subscribe_completions`].
237 /// The engine spawns a dispatch loop that drains the stream and
238 /// fires `ff_resolve_dependency` per completion, reducing DAG
239 /// latency from `interval × levels` to `~RTT × levels`. The
240 /// `dependency_reconciler` scanner remains as a safety net for
241 /// completions missed during subscriber reconnect windows.
242 ///
243 /// # Backend parameter
244 ///
245 /// `backend` is an `Arc<dyn EngineBackend>`. Works uniformly
246 /// across Valkey / Postgres / SQLite (and any other impl): the
247 /// engine attempts a Valkey downcast and, when successful, spawns
248 /// the in-tree ferriskey-speaking scanner supervisors. Otherwise
249 /// it trusts the backend to own its reconciler supervisor (cairn
250 /// #436 / PR-7b). No runtime panic path.
251 ///
252 /// # Example
253 ///
254 /// ```no_run
255 /// # use std::sync::Arc;
256 /// # use ff_core::engine_backend::EngineBackend;
257 /// # async fn ex(
258 /// # cfg: ff_engine::EngineConfig,
259 /// # metrics: Arc<ff_observability::Metrics>,
260 /// # stream: ff_core::completion_backend::CompletionStream,
261 /// # backend: Arc<dyn EngineBackend>, // e.g. from ValkeyBackend::connect
262 /// # ) {
263 /// // Any `EngineBackend` impl works (cairn #436).
264 /// let engine = ff_engine::Engine::start_with_completions(
265 /// cfg, backend, metrics, stream,
266 /// );
267 /// # let _ = engine;
268 /// # }
269 /// ```
270 pub fn start_with_completions(
271 config: EngineConfig,
272 backend: Arc<dyn EngineBackend>,
273 metrics: Arc<ff_observability::Metrics>,
274 completions: CompletionStream,
275 ) -> Self {
276 Self::start_internal(config, backend, metrics, Some(completions))
277 }
278
279 fn start_internal(
280 config: EngineConfig,
281 backend: Arc<dyn EngineBackend>,
282 metrics: Arc<ff_observability::Metrics>,
283 completions: Option<CompletionStream>,
284 ) -> Self {
285 // PR-7b (cairn #436): dispatch scanner spawn by backend family.
286 // Valkey scanners live inside ff-engine and still speak
287 // ferriskey directly, so they need the embedded client.
288 // Non-Valkey backends (Postgres, SQLite) self-spawn their own
289 // reconciler supervisor during connect; the engine only
290 // contributes the router + optional completion dispatch loop.
291 // Unknown backends get the same treatment as non-Valkey ones —
292 // the engine never panics on a foreign `EngineBackend` impl.
293 let valkey_client = backend
294 .as_any()
295 .downcast_ref::<ff_backend_valkey::ValkeyBackend>()
296 .map(|vb| vb.client().clone());
297 let (shutdown_tx, shutdown_rx) = watch::channel(false);
298 let num_partitions = config.partition_config.num_flow_partitions;
299 let router = Arc::new(PartitionRouter::new(config.partition_config));
300
301 let mut handles = Vec::new();
302
303 let scanner_filter = config.scanner_filter.clone();
304
305 // Short-circuit the Valkey-only scanner block for non-Valkey
306 // backends. Their reconcilers are already running under each
307 // backend's own `spawn_scanners` supervisor.
308 if valkey_client.is_none() {
309 tracing::info!(
310 backend_label = backend.backend_label(),
311 "engine started without in-tree scanner spawn; \
312 backend owns its reconciler supervisor"
313 );
314 let listener_enabled = completions.is_some();
315 if let Some(stream) = completions {
316 handles.push(completion_listener::spawn_dispatch_loop(
317 backend.clone(),
318 stream,
319 shutdown_rx,
320 ));
321 }
322 if listener_enabled {
323 tracing::info!("engine dispatch loop spawned (completion-driven DAG)");
324 }
325 return Self {
326 router,
327 shutdown_tx,
328 handles,
329 };
330 }
331 // Unwrap safe: `valkey_client.is_none()` returned above.
332 let client = valkey_client.expect("Valkey client present on Valkey path");
333
334 // Lease expiry scanner
335 let lease_scanner = Arc::new(LeaseExpiryScanner::with_filter_and_backend(
336 config.lease_expiry_interval,
337 scanner_filter.clone(),
338 backend.clone(),
339 ));
340 handles.push(supervised_spawn(
341 lease_scanner,
342 client.clone(),
343 num_partitions,
344 shutdown_rx.clone(),
345 metrics.clone(),
346 ));
347
348 // Delayed promoter
349 let delayed_scanner = Arc::new(DelayedPromoter::with_filter_and_backend(
350 config.delayed_promoter_interval,
351 config.lanes.clone(),
352 scanner_filter.clone(),
353 backend.clone(),
354 ));
355 handles.push(supervised_spawn(
356 delayed_scanner,
357 client.clone(),
358 num_partitions,
359 shutdown_rx.clone(),
360 metrics.clone(),
361 ));
362
363 // Index reconciler
364 let reconciler = Arc::new(IndexReconciler::with_filter_and_backend(
365 config.index_reconciler_interval,
366 config.lanes.clone(),
367 scanner_filter.clone(),
368 backend.clone(),
369 ));
370 handles.push(supervised_spawn(
371 reconciler,
372 client.clone(),
373 num_partitions,
374 shutdown_rx.clone(),
375 metrics.clone(),
376 ));
377
378 // Attempt timeout scanner
379 let timeout_scanner = Arc::new(AttemptTimeoutScanner::with_filter_and_backend(
380 config.attempt_timeout_interval,
381 config.lanes.clone(),
382 scanner_filter.clone(),
383 backend.clone(),
384 ));
385 handles.push(supervised_spawn(
386 timeout_scanner,
387 client.clone(),
388 num_partitions,
389 shutdown_rx.clone(),
390 metrics.clone(),
391 ));
392
393 // Suspension timeout scanner
394 let suspension_scanner = Arc::new(SuspensionTimeoutScanner::with_filter_and_backend(
395 config.suspension_timeout_interval,
396 scanner_filter.clone(),
397 backend.clone(),
398 ));
399 handles.push(supervised_spawn(
400 suspension_scanner,
401 client.clone(),
402 num_partitions,
403 shutdown_rx.clone(),
404 metrics.clone(),
405 ));
406
407 // Pending waitpoint expiry scanner
408 let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::with_filter_and_backend(
409 config.pending_wp_expiry_interval,
410 scanner_filter.clone(),
411 backend.clone(),
412 ));
413 handles.push(supervised_spawn(
414 pending_wp_scanner,
415 client.clone(),
416 num_partitions,
417 shutdown_rx.clone(),
418 metrics.clone(),
419 ));
420
421 // Retention trimmer
422 let retention_scanner = Arc::new(RetentionTrimmer::with_filter_and_backend(
423 config.retention_trimmer_interval,
424 config.lanes.clone(),
425 scanner_filter.clone(),
426 backend.clone(),
427 ));
428 handles.push(supervised_spawn(
429 retention_scanner,
430 client.clone(),
431 num_partitions,
432 shutdown_rx.clone(),
433 metrics.clone(),
434 ));
435
436 // Budget reset scanner (iterates budget partitions).
437 // Filter is accepted but not applied (budget partitions don't
438 // carry the per-execution namespace / instance_tag shape).
439 let budget_reset = Arc::new(BudgetResetScanner::with_filter_and_backend(
440 config.budget_reset_interval,
441 scanner_filter.clone(),
442 backend.clone(),
443 ));
444 handles.push(supervised_spawn(
445 budget_reset,
446 client.clone(),
447 config.partition_config.num_budget_partitions,
448 shutdown_rx.clone(),
449 metrics.clone(),
450 ));
451
452 // Budget reconciler (iterates budget partitions). Filter
453 // accepted but not applied — see BudgetReconciler::with_filter
454 // rustdoc. PR-7b Cluster 2b-A: routed through the trait.
455 let budget_reconciler = Arc::new(BudgetReconciler::with_filter_and_backend(
456 config.budget_reconciler_interval,
457 scanner_filter.clone(),
458 backend.clone(),
459 ));
460 handles.push(supervised_spawn(
461 budget_reconciler,
462 client.clone(),
463 config.partition_config.num_budget_partitions,
464 shutdown_rx.clone(),
465 metrics.clone(),
466 ));
467
468 // Unblock scanner (iterates execution partitions, re-evaluates blocked)
469 let unblock_scanner = Arc::new(UnblockScanner::with_filter_and_backend(
470 config.unblock_interval,
471 config.lanes.clone(),
472 config.partition_config,
473 scanner_filter.clone(),
474 backend.clone(),
475 ));
476 handles.push(supervised_spawn(
477 unblock_scanner,
478 client.clone(),
479 num_partitions,
480 shutdown_rx.clone(),
481 metrics.clone(),
482 ));
483
484 // Dependency reconciler (iterates execution partitions)
485 let dep_reconciler = Arc::new(DependencyReconciler::with_filter_and_backend(
486 config.dependency_reconciler_interval,
487 config.lanes.clone(),
488 config.partition_config,
489 scanner_filter.clone(),
490 backend.clone(),
491 ));
492 handles.push(supervised_spawn(
493 dep_reconciler,
494 client.clone(),
495 num_partitions,
496 shutdown_rx.clone(),
497 metrics.clone(),
498 ));
499
500 // Quota reconciler (iterates quota partitions). Filter
501 // accepted but not applied — see QuotaReconciler::with_filter
502 // rustdoc. PR-7b Cluster 2b-A: routed through the trait.
503 let quota_reconciler = Arc::new(QuotaReconciler::with_filter_and_backend(
504 config.quota_reconciler_interval,
505 scanner_filter.clone(),
506 backend.clone(),
507 ));
508 handles.push(supervised_spawn(
509 quota_reconciler,
510 client.clone(),
511 config.partition_config.num_quota_partitions,
512 shutdown_rx.clone(),
513 metrics.clone(),
514 ));
515
516 // Flow summary projector (iterates flow partitions). Filter
517 // accepted but not applied — see FlowProjector::with_filter
518 // rustdoc. Backend wiring (PR-7b Cluster 2b-B) routes the
519 // per-flow projection through `EngineBackend::project_flow_summary`.
520 let flow_projector = Arc::new(FlowProjector::with_filter_and_backend(
521 config.flow_projector_interval,
522 config.partition_config,
523 scanner_filter.clone(),
524 backend.clone(),
525 ));
526 handles.push(supervised_spawn(
527 flow_projector,
528 client.clone(),
529 config.partition_config.num_flow_partitions,
530 shutdown_rx.clone(),
531 metrics.clone(),
532 ));
533
534 // Cancel reconciler (iterates flow partitions). Drains
535 // cancel_backlog entries whose grace window has elapsed so a
536 // process crash mid-dispatch can't leave flow members un-cancelled.
537 let cancel_reconciler = Arc::new(scanner::cancel_reconciler::CancelReconciler::with_filter_and_backend(
538 config.cancel_reconciler_interval,
539 config.partition_config,
540 scanner_filter.clone(),
541 backend.clone(),
542 ));
543 handles.push(supervised_spawn(
544 cancel_reconciler,
545 client.clone(),
546 config.partition_config.num_flow_partitions,
547 shutdown_rx.clone(),
548 metrics.clone(),
549 ));
550
551 // RFC-016 Stage C: sibling-cancel dispatcher. Iterates flow
552 // partitions, drains `pending_cancel_groups` SET via
553 // `ff_drain_sibling_cancel_group`, issues per-sibling
554 // `ff_cancel_execution` with sibling_quorum reasons.
555 let edge_cancel_dispatcher = Arc::new(
556 scanner::edge_cancel_dispatcher::EdgeCancelDispatcher::with_filter_metrics_and_backend(
557 config.edge_cancel_dispatcher_interval,
558 config.partition_config,
559 scanner_filter.clone(),
560 metrics.clone(),
561 backend.clone(),
562 ),
563 );
564 handles.push(supervised_spawn(
565 edge_cancel_dispatcher,
566 client.clone(),
567 config.partition_config.num_flow_partitions,
568 shutdown_rx.clone(),
569 metrics.clone(),
570 ));
571
572 // RFC-016 Stage D: sibling-cancel reconciler. Crash-recovery
573 // safety net for Invariant Q6 — finalises tuples in
574 // `pending_cancel_groups` whose dispatcher drain was interrupted
575 // by an engine crash. Runs at a slower cadence than the
576 // dispatcher so it never fights the happy path.
577 let edge_cancel_reconciler = Arc::new(
578 scanner::edge_cancel_reconciler::EdgeCancelReconciler::with_filter_metrics_and_backend(
579 config.edge_cancel_reconciler_interval,
580 scanner_filter.clone(),
581 metrics.clone(),
582 backend.clone(),
583 ),
584 );
585 handles.push(supervised_spawn(
586 edge_cancel_reconciler,
587 client.clone(),
588 config.partition_config.num_flow_partitions,
589 shutdown_rx.clone(),
590 metrics.clone(),
591 ));
592
593 // Execution deadline scanner (iterates execution partitions)
594 let deadline_scanner = Arc::new(ExecutionDeadlineScanner::with_filter_and_backend(
595 config.execution_deadline_interval,
596 config.lanes,
597 scanner_filter,
598 backend.clone(),
599 ));
600 handles.push(supervised_spawn(
601 deadline_scanner,
602 client.clone(),
603 num_partitions,
604 shutdown_rx.clone(),
605 metrics.clone(),
606 ));
607
608 // Completion dispatch loop (Batch C item 6 — push-based DAG
609 // promotion; backend-agnostic since issue #90). Optional: when
610 // a stream is provided, spawn a task that drains
611 // `CompletionPayload`s and fires dependency resolution per
612 // completion. See `completion_listener` module docs.
613 let listener_enabled = completions.is_some();
614 if let Some(stream) = completions {
615 handles.push(completion_listener::spawn_dispatch_loop(
616 backend.clone(),
617 stream,
618 shutdown_rx,
619 ));
620 }
621
622 let scanner_count = if listener_enabled { "17 scanners + completion dispatch" } else { "17 scanners" };
623 tracing::info!(
624 num_partitions,
625 budget_partitions = config.partition_config.num_budget_partitions,
626 quota_partitions = config.partition_config.num_quota_partitions,
627 flow_partitions = config.partition_config.num_flow_partitions,
628 "engine started with {scanner_count}"
629 );
630
631 Self {
632 router,
633 shutdown_tx,
634 handles,
635 }
636 }
637
638 /// Signal all scanners to stop and wait for them to finish.
639 ///
640 /// Waits up to 15 seconds for scanners to drain. If any scanner is
641 /// blocked on a hung Valkey command, the timeout prevents shutdown
642 /// from hanging indefinitely (Kubernetes SIGKILL safety).
643 pub async fn shutdown(self) {
644 let _ = self.shutdown_tx.send(true);
645 let join_all = async {
646 for handle in self.handles {
647 let _ = handle.await;
648 }
649 };
650 match tokio::time::timeout(Duration::from_secs(15), join_all).await {
651 Ok(()) => tracing::info!("engine shutdown complete"),
652 Err(_) => tracing::warn!(
653 "engine shutdown timed out after 15s, abandoning remaining scanners"
654 ),
655 }
656 }
657}