Skip to main content

ff_engine/
lib.rs

1//! ff-engine: cross-partition dispatch and background scanners.
2
3pub mod budget;
4pub mod completion_listener;
5pub mod partition_router;
6pub mod scanner;
7pub mod supervisor;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::completion_backend::CompletionStream;
14use ff_core::engine_backend::EngineBackend;
15use ff_core::partition::PartitionConfig;
16use ff_core::types::LaneId;
17use tokio::sync::watch;
18use tokio::task::JoinHandle;
19
20use partition_router::PartitionRouter;
21use supervisor::supervised_spawn;
22use scanner::attempt_timeout::AttemptTimeoutScanner;
23use scanner::execution_deadline::ExecutionDeadlineScanner;
24use scanner::budget_reconciler::BudgetReconciler;
25use scanner::budget_reset::BudgetResetScanner;
26use scanner::delayed_promoter::DelayedPromoter;
27use scanner::dependency_reconciler::DependencyReconciler;
28use scanner::index_reconciler::IndexReconciler;
29use scanner::lease_expiry::LeaseExpiryScanner;
30use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
31use scanner::quota_reconciler::QuotaReconciler;
32use scanner::retention_trimmer::RetentionTrimmer;
33use scanner::suspension_timeout::SuspensionTimeoutScanner;
34use scanner::flow_projector::FlowProjector;
35use scanner::unblock::UnblockScanner;
36
37/// Engine configuration.
38pub struct EngineConfig {
39    pub partition_config: PartitionConfig,
40    /// Lanes to scan for delayed/index operations. Phase 1: `["default"]`.
41    pub lanes: Vec<LaneId>,
42    /// Lease expiry scan interval. Default: 1.5s.
43    pub lease_expiry_interval: Duration,
44    /// Delayed promoter scan interval. Default: 750ms.
45    pub delayed_promoter_interval: Duration,
46    /// Index reconciler scan interval. Default: 45s.
47    pub index_reconciler_interval: Duration,
48    /// Attempt timeout scan interval. Default: 2s.
49    pub attempt_timeout_interval: Duration,
50    /// Suspension timeout scan interval. Default: 2s.
51    pub suspension_timeout_interval: Duration,
52    /// Pending waitpoint expiry scan interval. Default: 5s.
53    pub pending_wp_expiry_interval: Duration,
54    /// Retention trimmer scan interval. Default: 60s.
55    pub retention_trimmer_interval: Duration,
56    /// Budget reset scan interval. Default: 15s.
57    pub budget_reset_interval: Duration,
58    /// Budget reconciler scan interval. Default: 30s.
59    pub budget_reconciler_interval: Duration,
60    /// Quota reconciler scan interval. Default: 30s.
61    pub quota_reconciler_interval: Duration,
62    /// Unblock scanner interval. Default: 5s.
63    pub unblock_interval: Duration,
64    /// Dependency reconciler interval. Default: 15s.
65    ///
66    /// Post-Batch-C this scanner is a **safety net**, not the primary
67    /// promotion path. When a [`CompletionStream`] is handed to
68    /// `start_with_completions`, push-based dispatch drives DAG
69    /// promotion synchronously with each completion — under normal
70    /// operation DAG latency is `~RTT × levels`, not `interval × levels`.
71    ///
72    /// The reconciler still runs as a catch-all for:
73    ///   - messages missed during subscriber restart or reconnect;
74    ///   - pre-Batch-C executions without `core.flow_id` stamped;
75    ///   - operator-driven edge mutation that doesn't pass through
76    ///     the terminal-transition publish path.
77    ///
78    /// 15s idle-scan cost is minimal. If the push dispatch loop is
79    /// disabled (engine started via `start`/`start_with_metrics`
80    /// without a stream), drop this to 1s to preserve pre-Batch-C
81    /// DAG latency behavior.
82    pub dependency_reconciler_interval: Duration,
83    /// Flow summary projector interval. Default: 15s.
84    ///
85    /// Separate observability projection path — maintains the flow
86    /// summary view, NOT on the DAG-completion latency path. Kept at
87    /// 15s in this config; a change to that cadence is unrelated to
88    /// dependency resolution.
89    pub flow_projector_interval: Duration,
90    /// Execution deadline scanner interval. Default: 5s.
91    pub execution_deadline_interval: Duration,
92
93    /// Cancel reconciler scanner interval. Default: 15s.
94    ///
95    /// Drains `ff_cancel_flow`'s per-partition `cancel_backlog` ZSET of
96    /// flows owing async member cancels. Each cancelled flow gets a
97    /// grace window (30s by default, set by ff-server) before the
98    /// reconciler picks it up, so the live in-process dispatch isn't
99    /// fought on the happy path.
100    pub cancel_reconciler_interval: Duration,
101
102    /// RFC-016 Stage C sibling-cancel dispatcher interval. Default: 1s.
103    ///
104    /// Drains the per-flow-partition `pending_cancel_groups` SET,
105    /// populated by `ff_resolve_dependency` whenever an AnyOf/Quorum
106    /// edge group fires terminal under `OnSatisfied::CancelRemaining`.
107    /// For each indexed group the dispatcher issues per-sibling
108    /// `ff_cancel_execution` with `FailureReason::sibling_quorum_{
109    /// satisfied,impossible}`, then atomically SREM+clear via
110    /// `ff_drain_sibling_cancel_group`.
111    ///
112    /// A short default (1s) minimises the window between quorum
113    /// satisfaction and sibling termination — this is the user-facing
114    /// latency floor for "kill the losers" workflows. Bump only if a
115    /// deployment's steady-state pending-set depth is observed to
116    /// backlog under the 1s cadence; Stage C's §4.2 benchmark gates
117    /// the release against the p99 ≤ 500 ms SLO at n=100 (§4.2 of
118    /// the RFC).
119    pub edge_cancel_dispatcher_interval: Duration,
120
121    /// RFC-016 Stage D sibling-cancel reconciler interval. Default: 10s.
122    ///
123    /// Safety-net scanner for Invariant Q6: if the engine crashed
124    /// between `ff_resolve_dependency`'s SADD to `pending_cancel_groups`
125    /// and the dispatcher's `ff_drain_sibling_cancel_group`, this
126    /// reconciler detects the orphan tuple and finalises via
127    /// `ff_reconcile_sibling_cancel_group`. It runs at a deliberately
128    /// slower cadence than the dispatcher (10s vs 1s) so the dispatcher
129    /// owns the happy path and the reconciler only cleans up
130    /// crash-recovery residue. The reconciler MUST NOT fight the
131    /// dispatcher — it no-ops whenever siblings are still non-terminal.
132    pub edge_cancel_reconciler_interval: Duration,
133
134    /// Per-consumer scanner filter (issue #122).
135    ///
136    /// Applied by every execution-shaped scanner (lease_expiry,
137    /// attempt_timeout, execution_deadline, suspension_timeout,
138    /// pending_wp_expiry, delayed_promoter, dependency_reconciler,
139    /// cancel_reconciler, unblock, index_reconciler,
140    /// retention_trimmer) to restrict the candidate set to
141    /// executions owned by this consumer. The four non-execution
142    /// scanners (budget_reconciler, budget_reset, quota_reconciler,
143    /// flow_projector) accept the filter for API uniformity but do
144    /// not apply it — their domains are not per-execution.
145    ///
146    /// Default: [`ScannerFilter::default`] — no filtering,
147    /// pre-#122 behaviour. Multi-tenant deployments that share a
148    /// single Valkey keyspace across two FlowFabric instances set
149    /// this (paired with
150    /// [`CompletionBackend::subscribe_completions_filtered`]) for
151    /// mutual isolation.
152    ///
153    /// [`CompletionBackend::subscribe_completions_filtered`]: ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered
154    pub scanner_filter: ScannerFilter,
155}
156
157impl Default for EngineConfig {
158    fn default() -> Self {
159        Self {
160            partition_config: PartitionConfig::default(),
161            lanes: vec![LaneId::new("default")],
162            lease_expiry_interval: Duration::from_millis(1500),
163            delayed_promoter_interval: Duration::from_millis(750),
164            index_reconciler_interval: Duration::from_secs(45),
165            attempt_timeout_interval: Duration::from_secs(2),
166            suspension_timeout_interval: Duration::from_secs(2),
167            pending_wp_expiry_interval: Duration::from_secs(5),
168            retention_trimmer_interval: Duration::from_secs(60),
169            budget_reset_interval: Duration::from_secs(15),
170            budget_reconciler_interval: Duration::from_secs(30),
171            quota_reconciler_interval: Duration::from_secs(30),
172            unblock_interval: Duration::from_secs(5),
173            dependency_reconciler_interval: Duration::from_secs(15),
174            flow_projector_interval: Duration::from_secs(15),
175            execution_deadline_interval: Duration::from_secs(5),
176            cancel_reconciler_interval: Duration::from_secs(15),
177            edge_cancel_dispatcher_interval: Duration::from_secs(1),
178            edge_cancel_reconciler_interval: Duration::from_secs(10),
179            scanner_filter: ScannerFilter::default(),
180        }
181    }
182}
183
184/// The FlowFabric engine: partition routing + background scanners.
/// The FlowFabric engine: partition routing + background scanners.
///
/// Constructed by [`Engine::start`] (or one of its `start_with_*`
/// siblings) and stopped with [`Engine::shutdown`].
pub struct Engine {
    /// Partition router built from the config's `partition_config`.
    pub router: Arc<PartitionRouter>,
    /// Shutdown signal shared with every spawned task; `shutdown` sends
    /// `true` on it.
    shutdown_tx: watch::Sender<bool>,
    /// Join handles for every spawned scanner supervisor and, when a
    /// completion stream was supplied, the completion dispatch loop.
    handles: Vec<JoinHandle<()>>,
}
190
191impl Engine {
192    /// Start the engine with the given config and backend.
193    ///
194    /// Spawns background scanner tasks. Returns immediately.
195    ///
196    /// `backend` is an `Arc<dyn EngineBackend>`. When the underlying
197    /// concrete type is [`ff_backend_valkey::ValkeyBackend`], the
198    /// engine spawns its in-tree Valkey scanner supervisors. For
199    /// non-Valkey backends (Postgres, SQLite, custom impls), the
200    /// reconciler supervisor lives inside the backend itself (spawned
201    /// during `connect_with_*`) and this constructor only wires the
202    /// completion dispatch loop + partition router. Cairn #436: the
203    /// runtime `downcast`-or-panic from v0.12 PR-7a is gone.
204    pub fn start(config: EngineConfig, backend: Arc<dyn EngineBackend>) -> Self {
205        // Construct a fresh metrics handle here so direct callers
206        // (examples, tests) don't need to. Under the default build
207        // (`observability` feature off) this is the no-op shim. With
208        // the feature on the handle is a real OTEL registry but — by
209        // design — one that nothing else shares; it's only useful for
210        // tests that want to exercise scanner cycle recording in
211        // isolation. Production code uses
212        // [`Self::start_with_metrics`] to plumb the same handle
213        // through the HTTP /metrics route.
214        Self::start_with_metrics(config, backend, Arc::new(ff_observability::Metrics::new()))
215    }
216
217    /// PR-94: start the engine with a shared observability registry.
218    ///
219    /// Used by `ff-server` so scanner cycle metrics funnel into the
220    /// same Prometheus registry exposed at `/metrics`. Under the
221    /// `observability` feature (flipped via the same feature on
222    /// `ff-server` / `ff-engine`), the handle records into an OTEL
223    /// `MeterProvider`; otherwise the shim no-ops.
224    pub fn start_with_metrics(
225        config: EngineConfig,
226        backend: Arc<dyn EngineBackend>,
227        metrics: Arc<ff_observability::Metrics>,
228    ) -> Self {
229        Self::start_internal(config, backend, metrics, None)
230    }
231
232    /// Start the engine with a shared observability registry and a
233    /// completion stream for push-based DAG promotion (issue #90).
234    ///
235    /// The stream is typically produced by
236    /// [`ff_core::completion_backend::CompletionBackend::subscribe_completions`].
237    /// The engine spawns a dispatch loop that drains the stream and
238    /// fires `ff_resolve_dependency` per completion, reducing DAG
239    /// latency from `interval × levels` to `~RTT × levels`. The
240    /// `dependency_reconciler` scanner remains as a safety net for
241    /// completions missed during subscriber reconnect windows.
242    ///
243    /// # Backend parameter
244    ///
245    /// `backend` is an `Arc<dyn EngineBackend>`. Works uniformly
246    /// across Valkey / Postgres / SQLite (and any other impl): the
247    /// engine attempts a Valkey downcast and, when successful, spawns
248    /// the in-tree ferriskey-speaking scanner supervisors. Otherwise
249    /// it trusts the backend to own its reconciler supervisor (cairn
250    /// #436 / PR-7b). No runtime panic path.
251    ///
252    /// # Example
253    ///
254    /// ```no_run
255    /// # use std::sync::Arc;
256    /// # use ff_core::engine_backend::EngineBackend;
257    /// # async fn ex(
258    /// #     cfg: ff_engine::EngineConfig,
259    /// #     metrics: Arc<ff_observability::Metrics>,
260    /// #     stream: ff_core::completion_backend::CompletionStream,
261    /// #     backend: Arc<dyn EngineBackend>, // e.g. from ValkeyBackend::connect
262    /// # ) {
263    /// // Any `EngineBackend` impl works (cairn #436).
264    /// let engine = ff_engine::Engine::start_with_completions(
265    ///     cfg, backend, metrics, stream,
266    /// );
267    /// # let _ = engine;
268    /// # }
269    /// ```
270    pub fn start_with_completions(
271        config: EngineConfig,
272        backend: Arc<dyn EngineBackend>,
273        metrics: Arc<ff_observability::Metrics>,
274        completions: CompletionStream,
275    ) -> Self {
276        Self::start_internal(config, backend, metrics, Some(completions))
277    }
278
279    fn start_internal(
280        config: EngineConfig,
281        backend: Arc<dyn EngineBackend>,
282        metrics: Arc<ff_observability::Metrics>,
283        completions: Option<CompletionStream>,
284    ) -> Self {
285        // PR-7b (cairn #436): dispatch scanner spawn by backend family.
286        // Valkey scanners live inside ff-engine and still speak
287        // ferriskey directly, so they need the embedded client.
288        // Non-Valkey backends (Postgres, SQLite) self-spawn their own
289        // reconciler supervisor during connect; the engine only
290        // contributes the router + optional completion dispatch loop.
291        // Unknown backends get the same treatment as non-Valkey ones —
292        // the engine never panics on a foreign `EngineBackend` impl.
293        let valkey_client = backend
294            .as_any()
295            .downcast_ref::<ff_backend_valkey::ValkeyBackend>()
296            .map(|vb| vb.client().clone());
297        let (shutdown_tx, shutdown_rx) = watch::channel(false);
298        let num_partitions = config.partition_config.num_flow_partitions;
299        let router = Arc::new(PartitionRouter::new(config.partition_config));
300
301        let mut handles = Vec::new();
302
303        let scanner_filter = config.scanner_filter.clone();
304
305        // Short-circuit the Valkey-only scanner block for non-Valkey
306        // backends. Their reconcilers are already running under each
307        // backend's own `spawn_scanners` supervisor.
308        if valkey_client.is_none() {
309            tracing::info!(
310                backend_label = backend.backend_label(),
311                "engine started without in-tree scanner spawn; \
312                 backend owns its reconciler supervisor"
313            );
314            let listener_enabled = completions.is_some();
315            if let Some(stream) = completions {
316                handles.push(completion_listener::spawn_dispatch_loop(
317                    backend.clone(),
318                    stream,
319                    shutdown_rx,
320                ));
321            }
322            if listener_enabled {
323                tracing::info!("engine dispatch loop spawned (completion-driven DAG)");
324            }
325            return Self {
326                router,
327                shutdown_tx,
328                handles,
329            };
330        }
331        // Unwrap safe: `valkey_client.is_none()` returned above.
332        let client = valkey_client.expect("Valkey client present on Valkey path");
333
334        // Lease expiry scanner
335        let lease_scanner = Arc::new(LeaseExpiryScanner::with_filter_and_backend(
336            config.lease_expiry_interval,
337            scanner_filter.clone(),
338            backend.clone(),
339        ));
340        handles.push(supervised_spawn(
341            lease_scanner,
342            client.clone(),
343            num_partitions,
344            shutdown_rx.clone(),
345            metrics.clone(),
346        ));
347
348        // Delayed promoter
349        let delayed_scanner = Arc::new(DelayedPromoter::with_filter_and_backend(
350            config.delayed_promoter_interval,
351            config.lanes.clone(),
352            scanner_filter.clone(),
353            backend.clone(),
354        ));
355        handles.push(supervised_spawn(
356            delayed_scanner,
357            client.clone(),
358            num_partitions,
359            shutdown_rx.clone(),
360            metrics.clone(),
361        ));
362
363        // Index reconciler
364        let reconciler = Arc::new(IndexReconciler::with_filter_and_backend(
365            config.index_reconciler_interval,
366            config.lanes.clone(),
367            scanner_filter.clone(),
368            backend.clone(),
369        ));
370        handles.push(supervised_spawn(
371            reconciler,
372            client.clone(),
373            num_partitions,
374            shutdown_rx.clone(),
375            metrics.clone(),
376        ));
377
378        // Attempt timeout scanner
379        let timeout_scanner = Arc::new(AttemptTimeoutScanner::with_filter_and_backend(
380            config.attempt_timeout_interval,
381            config.lanes.clone(),
382            scanner_filter.clone(),
383            backend.clone(),
384        ));
385        handles.push(supervised_spawn(
386            timeout_scanner,
387            client.clone(),
388            num_partitions,
389            shutdown_rx.clone(),
390            metrics.clone(),
391        ));
392
393        // Suspension timeout scanner
394        let suspension_scanner = Arc::new(SuspensionTimeoutScanner::with_filter_and_backend(
395            config.suspension_timeout_interval,
396            scanner_filter.clone(),
397            backend.clone(),
398        ));
399        handles.push(supervised_spawn(
400            suspension_scanner,
401            client.clone(),
402            num_partitions,
403            shutdown_rx.clone(),
404            metrics.clone(),
405        ));
406
407        // Pending waitpoint expiry scanner
408        let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::with_filter_and_backend(
409            config.pending_wp_expiry_interval,
410            scanner_filter.clone(),
411            backend.clone(),
412        ));
413        handles.push(supervised_spawn(
414            pending_wp_scanner,
415            client.clone(),
416            num_partitions,
417            shutdown_rx.clone(),
418            metrics.clone(),
419        ));
420
421        // Retention trimmer
422        let retention_scanner = Arc::new(RetentionTrimmer::with_filter_and_backend(
423            config.retention_trimmer_interval,
424            config.lanes.clone(),
425            scanner_filter.clone(),
426            backend.clone(),
427        ));
428        handles.push(supervised_spawn(
429            retention_scanner,
430            client.clone(),
431            num_partitions,
432            shutdown_rx.clone(),
433            metrics.clone(),
434        ));
435
436        // Budget reset scanner (iterates budget partitions).
437        // Filter is accepted but not applied (budget partitions don't
438        // carry the per-execution namespace / instance_tag shape).
439        let budget_reset = Arc::new(BudgetResetScanner::with_filter_and_backend(
440            config.budget_reset_interval,
441            scanner_filter.clone(),
442            backend.clone(),
443        ));
444        handles.push(supervised_spawn(
445            budget_reset,
446            client.clone(),
447            config.partition_config.num_budget_partitions,
448            shutdown_rx.clone(),
449            metrics.clone(),
450        ));
451
452        // Budget reconciler (iterates budget partitions). Filter
453        // accepted but not applied — see BudgetReconciler::with_filter
454        // rustdoc. PR-7b Cluster 2b-A: routed through the trait.
455        let budget_reconciler = Arc::new(BudgetReconciler::with_filter_and_backend(
456            config.budget_reconciler_interval,
457            scanner_filter.clone(),
458            backend.clone(),
459        ));
460        handles.push(supervised_spawn(
461            budget_reconciler,
462            client.clone(),
463            config.partition_config.num_budget_partitions,
464            shutdown_rx.clone(),
465            metrics.clone(),
466        ));
467
468        // Unblock scanner (iterates execution partitions, re-evaluates blocked)
469        let unblock_scanner = Arc::new(UnblockScanner::with_filter_and_backend(
470            config.unblock_interval,
471            config.lanes.clone(),
472            config.partition_config,
473            scanner_filter.clone(),
474            backend.clone(),
475        ));
476        handles.push(supervised_spawn(
477            unblock_scanner,
478            client.clone(),
479            num_partitions,
480            shutdown_rx.clone(),
481            metrics.clone(),
482        ));
483
484        // Dependency reconciler (iterates execution partitions)
485        let dep_reconciler = Arc::new(DependencyReconciler::with_filter_and_backend(
486            config.dependency_reconciler_interval,
487            config.lanes.clone(),
488            config.partition_config,
489            scanner_filter.clone(),
490            backend.clone(),
491        ));
492        handles.push(supervised_spawn(
493            dep_reconciler,
494            client.clone(),
495            num_partitions,
496            shutdown_rx.clone(),
497            metrics.clone(),
498        ));
499
500        // Quota reconciler (iterates quota partitions). Filter
501        // accepted but not applied — see QuotaReconciler::with_filter
502        // rustdoc. PR-7b Cluster 2b-A: routed through the trait.
503        let quota_reconciler = Arc::new(QuotaReconciler::with_filter_and_backend(
504            config.quota_reconciler_interval,
505            scanner_filter.clone(),
506            backend.clone(),
507        ));
508        handles.push(supervised_spawn(
509            quota_reconciler,
510            client.clone(),
511            config.partition_config.num_quota_partitions,
512            shutdown_rx.clone(),
513            metrics.clone(),
514        ));
515
516        // Flow summary projector (iterates flow partitions). Filter
517        // accepted but not applied — see FlowProjector::with_filter
518        // rustdoc. Backend wiring (PR-7b Cluster 2b-B) routes the
519        // per-flow projection through `EngineBackend::project_flow_summary`.
520        let flow_projector = Arc::new(FlowProjector::with_filter_and_backend(
521            config.flow_projector_interval,
522            config.partition_config,
523            scanner_filter.clone(),
524            backend.clone(),
525        ));
526        handles.push(supervised_spawn(
527            flow_projector,
528            client.clone(),
529            config.partition_config.num_flow_partitions,
530            shutdown_rx.clone(),
531            metrics.clone(),
532        ));
533
534        // Cancel reconciler (iterates flow partitions). Drains
535        // cancel_backlog entries whose grace window has elapsed so a
536        // process crash mid-dispatch can't leave flow members un-cancelled.
537        let cancel_reconciler = Arc::new(scanner::cancel_reconciler::CancelReconciler::with_filter_and_backend(
538            config.cancel_reconciler_interval,
539            config.partition_config,
540            scanner_filter.clone(),
541            backend.clone(),
542        ));
543        handles.push(supervised_spawn(
544            cancel_reconciler,
545            client.clone(),
546            config.partition_config.num_flow_partitions,
547            shutdown_rx.clone(),
548            metrics.clone(),
549        ));
550
551        // RFC-016 Stage C: sibling-cancel dispatcher. Iterates flow
552        // partitions, drains `pending_cancel_groups` SET via
553        // `ff_drain_sibling_cancel_group`, issues per-sibling
554        // `ff_cancel_execution` with sibling_quorum reasons.
555        let edge_cancel_dispatcher = Arc::new(
556            scanner::edge_cancel_dispatcher::EdgeCancelDispatcher::with_filter_metrics_and_backend(
557                config.edge_cancel_dispatcher_interval,
558                config.partition_config,
559                scanner_filter.clone(),
560                metrics.clone(),
561                backend.clone(),
562            ),
563        );
564        handles.push(supervised_spawn(
565            edge_cancel_dispatcher,
566            client.clone(),
567            config.partition_config.num_flow_partitions,
568            shutdown_rx.clone(),
569            metrics.clone(),
570        ));
571
572        // RFC-016 Stage D: sibling-cancel reconciler. Crash-recovery
573        // safety net for Invariant Q6 — finalises tuples in
574        // `pending_cancel_groups` whose dispatcher drain was interrupted
575        // by an engine crash. Runs at a slower cadence than the
576        // dispatcher so it never fights the happy path.
577        let edge_cancel_reconciler = Arc::new(
578            scanner::edge_cancel_reconciler::EdgeCancelReconciler::with_filter_metrics_and_backend(
579                config.edge_cancel_reconciler_interval,
580                scanner_filter.clone(),
581                metrics.clone(),
582                backend.clone(),
583            ),
584        );
585        handles.push(supervised_spawn(
586            edge_cancel_reconciler,
587            client.clone(),
588            config.partition_config.num_flow_partitions,
589            shutdown_rx.clone(),
590            metrics.clone(),
591        ));
592
593        // Execution deadline scanner (iterates execution partitions)
594        let deadline_scanner = Arc::new(ExecutionDeadlineScanner::with_filter_and_backend(
595            config.execution_deadline_interval,
596            config.lanes,
597            scanner_filter,
598            backend.clone(),
599        ));
600        handles.push(supervised_spawn(
601            deadline_scanner,
602            client.clone(),
603            num_partitions,
604            shutdown_rx.clone(),
605            metrics.clone(),
606        ));
607
608        // Completion dispatch loop (Batch C item 6 — push-based DAG
609        // promotion; backend-agnostic since issue #90). Optional: when
610        // a stream is provided, spawn a task that drains
611        // `CompletionPayload`s and fires dependency resolution per
612        // completion. See `completion_listener` module docs.
613        let listener_enabled = completions.is_some();
614        if let Some(stream) = completions {
615            handles.push(completion_listener::spawn_dispatch_loop(
616                backend.clone(),
617                stream,
618                shutdown_rx,
619            ));
620        }
621
622        let scanner_count = if listener_enabled { "17 scanners + completion dispatch" } else { "17 scanners" };
623        tracing::info!(
624            num_partitions,
625            budget_partitions = config.partition_config.num_budget_partitions,
626            quota_partitions = config.partition_config.num_quota_partitions,
627            flow_partitions = config.partition_config.num_flow_partitions,
628            "engine started with {scanner_count}"
629        );
630
631        Self {
632            router,
633            shutdown_tx,
634            handles,
635        }
636    }
637
638    /// Signal all scanners to stop and wait for them to finish.
639    ///
640    /// Waits up to 15 seconds for scanners to drain. If any scanner is
641    /// blocked on a hung Valkey command, the timeout prevents shutdown
642    /// from hanging indefinitely (Kubernetes SIGKILL safety).
643    pub async fn shutdown(self) {
644        let _ = self.shutdown_tx.send(true);
645        let join_all = async {
646            for handle in self.handles {
647                let _ = handle.await;
648            }
649        };
650        match tokio::time::timeout(Duration::from_secs(15), join_all).await {
651            Ok(()) => tracing::info!("engine shutdown complete"),
652            Err(_) => tracing::warn!(
653                "engine shutdown timed out after 15s, abandoning remaining scanners"
654            ),
655        }
656    }
657}