// ff_engine/lib.rs — crate root.
1//! ff-engine: cross-partition dispatch and background scanners.
2
3pub mod budget;
4pub mod completion_listener;
5pub mod partition_router;
6pub mod scanner;
7pub mod supervisor;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::completion_backend::CompletionStream;
14use ff_core::partition::PartitionConfig;
15use ff_core::types::LaneId;
16use tokio::sync::watch;
17use tokio::task::JoinHandle;
18
19use partition_router::PartitionRouter;
20use supervisor::supervised_spawn;
21use scanner::attempt_timeout::AttemptTimeoutScanner;
22use scanner::execution_deadline::ExecutionDeadlineScanner;
23use scanner::budget_reconciler::BudgetReconciler;
24use scanner::budget_reset::BudgetResetScanner;
25use scanner::delayed_promoter::DelayedPromoter;
26use scanner::dependency_reconciler::DependencyReconciler;
27use scanner::index_reconciler::IndexReconciler;
28use scanner::lease_expiry::LeaseExpiryScanner;
29use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
30use scanner::quota_reconciler::QuotaReconciler;
31use scanner::retention_trimmer::RetentionTrimmer;
32use scanner::suspension_timeout::SuspensionTimeoutScanner;
33use scanner::flow_projector::FlowProjector;
34use scanner::unblock::UnblockScanner;
35
/// Engine configuration.
pub struct EngineConfig {
    /// Partition topology shared with the router and scanners
    /// (flow / budget / quota partition counts are all read from it
    /// in `Engine::start_internal`).
    pub partition_config: PartitionConfig,
    /// Lanes to scan for delayed/index operations. Phase 1: `["default"]`.
    pub lanes: Vec<LaneId>,
    /// Lease expiry scan interval. Default: 1.5s.
    pub lease_expiry_interval: Duration,
    /// Delayed promoter scan interval. Default: 750ms.
    pub delayed_promoter_interval: Duration,
    /// Index reconciler scan interval. Default: 45s.
    pub index_reconciler_interval: Duration,
    /// Attempt timeout scan interval. Default: 2s.
    pub attempt_timeout_interval: Duration,
    /// Suspension timeout scan interval. Default: 2s.
    pub suspension_timeout_interval: Duration,
    /// Pending waitpoint expiry scan interval. Default: 5s.
    pub pending_wp_expiry_interval: Duration,
    /// Retention trimmer scan interval. Default: 60s.
    pub retention_trimmer_interval: Duration,
    /// Budget reset scan interval. Default: 15s.
    pub budget_reset_interval: Duration,
    /// Budget reconciler scan interval. Default: 30s.
    pub budget_reconciler_interval: Duration,
    /// Quota reconciler scan interval. Default: 30s.
    pub quota_reconciler_interval: Duration,
    /// Unblock scanner interval. Default: 5s.
    pub unblock_interval: Duration,
    /// Dependency reconciler interval. Default: 15s.
    ///
    /// Post-Batch-C this scanner is a **safety net**, not the primary
    /// promotion path. When a [`CompletionStream`] is handed to
    /// `start_with_completions`, push-based dispatch drives DAG
    /// promotion synchronously with each completion — under normal
    /// operation DAG latency is `~RTT × levels`, not `interval × levels`.
    ///
    /// The reconciler still runs as a catch-all for:
    ///   - messages missed during subscriber restart or reconnect;
    ///   - pre-Batch-C executions without `core.flow_id` stamped;
    ///   - operator-driven edge mutation that doesn't pass through
    ///     the terminal-transition publish path.
    ///
    /// 15s idle-scan cost is minimal. If the push dispatch loop is
    /// disabled (engine started via `start`/`start_with_metrics`
    /// without a stream), drop this to 1s to preserve pre-Batch-C
    /// DAG latency behavior.
    pub dependency_reconciler_interval: Duration,
    /// Flow summary projector interval. Default: 15s.
    ///
    /// Separate observability projection path — maintains the flow
    /// summary view, NOT on the DAG-completion latency path. Kept at
    /// 15s in this config; a change to that cadence is unrelated to
    /// dependency resolution.
    pub flow_projector_interval: Duration,
    /// Execution deadline scanner interval. Default: 5s.
    pub execution_deadline_interval: Duration,

    /// Cancel reconciler scanner interval. Default: 15s.
    ///
    /// Drains `ff_cancel_flow`'s per-partition `cancel_backlog` ZSET of
    /// flows owing async member cancels. Each cancelled flow gets a
    /// grace window (30s by default, set by ff-server) before the
    /// reconciler picks it up, so the live in-process dispatch isn't
    /// fought on the happy path.
    pub cancel_reconciler_interval: Duration,

    /// RFC-016 Stage C sibling-cancel dispatcher interval. Default: 1s.
    ///
    /// Drains the per-flow-partition `pending_cancel_groups` SET,
    /// populated by `ff_resolve_dependency` whenever an AnyOf/Quorum
    /// edge group fires terminal under `OnSatisfied::CancelRemaining`.
    /// For each indexed group the dispatcher issues per-sibling
    /// `ff_cancel_execution` with `FailureReason::sibling_quorum_{
    /// satisfied,impossible}`, then atomically SREM+clear via
    /// `ff_drain_sibling_cancel_group`.
    ///
    /// A short default (1s) minimises the window between quorum
    /// satisfaction and sibling termination — this is the user-facing
    /// latency floor for "kill the losers" workflows. Bump only if a
    /// deployment's steady-state pending-set depth is observed to
    /// backlog under the 1s cadence; Stage C's §4.2 benchmark gates
    /// the release against the p99 ≤ 500 ms SLO at n=100 (§4.2 of
    /// the RFC).
    pub edge_cancel_dispatcher_interval: Duration,

    /// RFC-016 Stage D sibling-cancel reconciler interval. Default: 10s.
    ///
    /// Safety-net scanner for Invariant Q6: if the engine crashed
    /// between `ff_resolve_dependency`'s SADD to `pending_cancel_groups`
    /// and the dispatcher's `ff_drain_sibling_cancel_group`, this
    /// reconciler detects the orphan tuple and finalises via
    /// `ff_reconcile_sibling_cancel_group`. It runs at a deliberately
    /// slower cadence than the dispatcher (10s vs 1s) so the dispatcher
    /// owns the happy path and the reconciler only cleans up
    /// crash-recovery residue. The reconciler MUST NOT fight the
    /// dispatcher — it no-ops whenever siblings are still non-terminal.
    pub edge_cancel_reconciler_interval: Duration,

    /// Per-consumer scanner filter (issue #122).
    ///
    /// Applied by every execution-shaped scanner (lease_expiry,
    /// attempt_timeout, execution_deadline, suspension_timeout,
    /// pending_wp_expiry, delayed_promoter, dependency_reconciler,
    /// cancel_reconciler, unblock, index_reconciler,
    /// retention_trimmer) to restrict the candidate set to
    /// executions owned by this consumer. The four non-execution
    /// scanners (budget_reconciler, budget_reset, quota_reconciler,
    /// flow_projector) accept the filter for API uniformity but do
    /// not apply it — their domains are not per-execution.
    ///
    /// Default: [`ScannerFilter::default`] — no filtering,
    /// pre-#122 behaviour. Multi-tenant deployments that share a
    /// single Valkey keyspace across two FlowFabric instances set
    /// this (paired with
    /// [`CompletionBackend::subscribe_completions_filtered`]) for
    /// mutual isolation.
    ///
    /// [`CompletionBackend::subscribe_completions_filtered`]: ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered
    pub scanner_filter: ScannerFilter,
}
155
156impl Default for EngineConfig {
157    fn default() -> Self {
158        Self {
159            partition_config: PartitionConfig::default(),
160            lanes: vec![LaneId::new("default")],
161            lease_expiry_interval: Duration::from_millis(1500),
162            delayed_promoter_interval: Duration::from_millis(750),
163            index_reconciler_interval: Duration::from_secs(45),
164            attempt_timeout_interval: Duration::from_secs(2),
165            suspension_timeout_interval: Duration::from_secs(2),
166            pending_wp_expiry_interval: Duration::from_secs(5),
167            retention_trimmer_interval: Duration::from_secs(60),
168            budget_reset_interval: Duration::from_secs(15),
169            budget_reconciler_interval: Duration::from_secs(30),
170            quota_reconciler_interval: Duration::from_secs(30),
171            unblock_interval: Duration::from_secs(5),
172            dependency_reconciler_interval: Duration::from_secs(15),
173            flow_projector_interval: Duration::from_secs(15),
174            execution_deadline_interval: Duration::from_secs(5),
175            cancel_reconciler_interval: Duration::from_secs(15),
176            edge_cancel_dispatcher_interval: Duration::from_secs(1),
177            edge_cancel_reconciler_interval: Duration::from_secs(10),
178            scanner_filter: ScannerFilter::default(),
179        }
180    }
181}
182
/// The FlowFabric engine: partition routing + background scanners.
pub struct Engine {
    /// Shared partition router, exposed so callers can route work
    /// against the same partition layout the scanners use.
    pub router: Arc<PartitionRouter>,
    // Broadcasts `true` on shutdown; every spawned task holds a
    // `watch::Receiver` clone and exits when it observes the flip.
    shutdown_tx: watch::Sender<bool>,
    // Join handles for all spawned scanner/dispatch tasks; awaited
    // (with a timeout) by `Engine::shutdown`.
    handles: Vec<JoinHandle<()>>,
}
189
190impl Engine {
191    /// Start the engine with the given config and Valkey client.
192    ///
193    /// Spawns background scanner tasks. Returns immediately.
194    pub fn start(config: EngineConfig, client: ferriskey::Client) -> Self {
195        // Construct a fresh metrics handle here so direct callers
196        // (examples, tests) don't need to. Under the default build
197        // (`observability` feature off) this is the no-op shim. With
198        // the feature on the handle is a real OTEL registry but — by
199        // design — one that nothing else shares; it's only useful for
200        // tests that want to exercise scanner cycle recording in
201        // isolation. Production code uses
202        // [`Self::start_with_metrics`] to plumb the same handle
203        // through the HTTP /metrics route.
204        Self::start_with_metrics(config, client, Arc::new(ff_observability::Metrics::new()))
205    }
206
207    /// PR-94: start the engine with a shared observability registry.
208    ///
209    /// Used by `ff-server` so scanner cycle metrics funnel into the
210    /// same Prometheus registry exposed at `/metrics`. Under the
211    /// `observability` feature (flipped via the same feature on
212    /// `ff-server` / `ff-engine`), the handle records into an OTEL
213    /// `MeterProvider`; otherwise the shim no-ops.
214    pub fn start_with_metrics(
215        config: EngineConfig,
216        client: ferriskey::Client,
217        metrics: Arc<ff_observability::Metrics>,
218    ) -> Self {
219        Self::start_internal(config, client, metrics, None)
220    }
221
222    /// Start the engine with a shared observability registry and a
223    /// completion stream for push-based DAG promotion (issue #90).
224    ///
225    /// The stream is typically produced by
226    /// [`ff_core::completion_backend::CompletionBackend::subscribe_completions`].
227    /// The engine spawns a dispatch loop that drains the stream and
228    /// fires `ff_resolve_dependency` per completion, reducing DAG
229    /// latency from `interval × levels` to `~RTT × levels`. The
230    /// `dependency_reconciler` scanner remains as a safety net for
231    /// completions missed during subscriber reconnect windows.
232    pub fn start_with_completions(
233        config: EngineConfig,
234        client: ferriskey::Client,
235        metrics: Arc<ff_observability::Metrics>,
236        completions: CompletionStream,
237    ) -> Self {
238        Self::start_internal(config, client, metrics, Some(completions))
239    }
240
241    fn start_internal(
242        config: EngineConfig,
243        client: ferriskey::Client,
244        metrics: Arc<ff_observability::Metrics>,
245        completions: Option<CompletionStream>,
246    ) -> Self {
247        let (shutdown_tx, shutdown_rx) = watch::channel(false);
248        let num_partitions = config.partition_config.num_flow_partitions;
249        let router = Arc::new(PartitionRouter::new(config.partition_config));
250
251        let mut handles = Vec::new();
252
253        let scanner_filter = config.scanner_filter.clone();
254
255        // Lease expiry scanner
256        let lease_scanner = Arc::new(LeaseExpiryScanner::with_filter(
257            config.lease_expiry_interval,
258            scanner_filter.clone(),
259        ));
260        handles.push(supervised_spawn(
261            lease_scanner,
262            client.clone(),
263            num_partitions,
264            shutdown_rx.clone(),
265            metrics.clone(),
266        ));
267
268        // Delayed promoter
269        let delayed_scanner = Arc::new(DelayedPromoter::with_filter(
270            config.delayed_promoter_interval,
271            config.lanes.clone(),
272            scanner_filter.clone(),
273        ));
274        handles.push(supervised_spawn(
275            delayed_scanner,
276            client.clone(),
277            num_partitions,
278            shutdown_rx.clone(),
279            metrics.clone(),
280        ));
281
282        // Index reconciler
283        let reconciler = Arc::new(IndexReconciler::with_filter(
284            config.index_reconciler_interval,
285            config.lanes.clone(),
286            scanner_filter.clone(),
287        ));
288        handles.push(supervised_spawn(
289            reconciler,
290            client.clone(),
291            num_partitions,
292            shutdown_rx.clone(),
293            metrics.clone(),
294        ));
295
296        // Attempt timeout scanner
297        let timeout_scanner = Arc::new(AttemptTimeoutScanner::with_filter(
298            config.attempt_timeout_interval,
299            config.lanes.clone(),
300            scanner_filter.clone(),
301        ));
302        handles.push(supervised_spawn(
303            timeout_scanner,
304            client.clone(),
305            num_partitions,
306            shutdown_rx.clone(),
307            metrics.clone(),
308        ));
309
310        // Suspension timeout scanner
311        let suspension_scanner = Arc::new(SuspensionTimeoutScanner::with_filter(
312            config.suspension_timeout_interval,
313            scanner_filter.clone(),
314        ));
315        handles.push(supervised_spawn(
316            suspension_scanner,
317            client.clone(),
318            num_partitions,
319            shutdown_rx.clone(),
320            metrics.clone(),
321        ));
322
323        // Pending waitpoint expiry scanner
324        let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::with_filter(
325            config.pending_wp_expiry_interval,
326            scanner_filter.clone(),
327        ));
328        handles.push(supervised_spawn(
329            pending_wp_scanner,
330            client.clone(),
331            num_partitions,
332            shutdown_rx.clone(),
333            metrics.clone(),
334        ));
335
336        // Retention trimmer
337        let retention_scanner = Arc::new(RetentionTrimmer::with_filter(
338            config.retention_trimmer_interval,
339            config.lanes.clone(),
340            scanner_filter.clone(),
341        ));
342        handles.push(supervised_spawn(
343            retention_scanner,
344            client.clone(),
345            num_partitions,
346            shutdown_rx.clone(),
347            metrics.clone(),
348        ));
349
350        // Budget reset scanner (iterates budget partitions).
351        // Filter is accepted but not applied (budget partitions don't
352        // carry the per-execution namespace / instance_tag shape).
353        let budget_reset = Arc::new(BudgetResetScanner::with_filter(
354            config.budget_reset_interval,
355            scanner_filter.clone(),
356        ));
357        handles.push(supervised_spawn(
358            budget_reset,
359            client.clone(),
360            config.partition_config.num_budget_partitions,
361            shutdown_rx.clone(),
362            metrics.clone(),
363        ));
364
365        // Budget reconciler (iterates budget partitions). Filter
366        // accepted but not applied — see BudgetReconciler::with_filter
367        // rustdoc.
368        let budget_reconciler = Arc::new(BudgetReconciler::with_filter(
369            config.budget_reconciler_interval,
370            scanner_filter.clone(),
371        ));
372        handles.push(supervised_spawn(
373            budget_reconciler,
374            client.clone(),
375            config.partition_config.num_budget_partitions,
376            shutdown_rx.clone(),
377            metrics.clone(),
378        ));
379
380        // Unblock scanner (iterates execution partitions, re-evaluates blocked)
381        let unblock_scanner = Arc::new(UnblockScanner::with_filter(
382            config.unblock_interval,
383            config.lanes.clone(),
384            config.partition_config,
385            scanner_filter.clone(),
386        ));
387        handles.push(supervised_spawn(
388            unblock_scanner,
389            client.clone(),
390            num_partitions,
391            shutdown_rx.clone(),
392            metrics.clone(),
393        ));
394
395        // Dependency reconciler (iterates execution partitions)
396        let dep_reconciler = Arc::new(DependencyReconciler::with_filter(
397            config.dependency_reconciler_interval,
398            config.lanes.clone(),
399            config.partition_config,
400            scanner_filter.clone(),
401        ));
402        handles.push(supervised_spawn(
403            dep_reconciler,
404            client.clone(),
405            num_partitions,
406            shutdown_rx.clone(),
407            metrics.clone(),
408        ));
409
410        // Quota reconciler (iterates quota partitions). Filter
411        // accepted but not applied — see QuotaReconciler::with_filter
412        // rustdoc.
413        let quota_reconciler = Arc::new(QuotaReconciler::with_filter(
414            config.quota_reconciler_interval,
415            scanner_filter.clone(),
416        ));
417        handles.push(supervised_spawn(
418            quota_reconciler,
419            client.clone(),
420            config.partition_config.num_quota_partitions,
421            shutdown_rx.clone(),
422            metrics.clone(),
423        ));
424
425        // Flow summary projector (iterates flow partitions). Filter
426        // accepted but not applied — see FlowProjector::with_filter
427        // rustdoc.
428        let flow_projector = Arc::new(FlowProjector::with_filter(
429            config.flow_projector_interval,
430            config.partition_config,
431            scanner_filter.clone(),
432        ));
433        handles.push(supervised_spawn(
434            flow_projector,
435            client.clone(),
436            config.partition_config.num_flow_partitions,
437            shutdown_rx.clone(),
438            metrics.clone(),
439        ));
440
441        // Cancel reconciler (iterates flow partitions). Drains
442        // cancel_backlog entries whose grace window has elapsed so a
443        // process crash mid-dispatch can't leave flow members un-cancelled.
444        let cancel_reconciler = Arc::new(scanner::cancel_reconciler::CancelReconciler::with_filter(
445            config.cancel_reconciler_interval,
446            config.partition_config,
447            scanner_filter.clone(),
448        ));
449        handles.push(supervised_spawn(
450            cancel_reconciler,
451            client.clone(),
452            config.partition_config.num_flow_partitions,
453            shutdown_rx.clone(),
454            metrics.clone(),
455        ));
456
457        // RFC-016 Stage C: sibling-cancel dispatcher. Iterates flow
458        // partitions, drains `pending_cancel_groups` SET via
459        // `ff_drain_sibling_cancel_group`, issues per-sibling
460        // `ff_cancel_execution` with sibling_quorum reasons.
461        let edge_cancel_dispatcher = Arc::new(
462            scanner::edge_cancel_dispatcher::EdgeCancelDispatcher::with_filter_and_metrics(
463                config.edge_cancel_dispatcher_interval,
464                config.partition_config,
465                scanner_filter.clone(),
466                metrics.clone(),
467            ),
468        );
469        handles.push(supervised_spawn(
470            edge_cancel_dispatcher,
471            client.clone(),
472            config.partition_config.num_flow_partitions,
473            shutdown_rx.clone(),
474            metrics.clone(),
475        ));
476
477        // RFC-016 Stage D: sibling-cancel reconciler. Crash-recovery
478        // safety net for Invariant Q6 — finalises tuples in
479        // `pending_cancel_groups` whose dispatcher drain was interrupted
480        // by an engine crash. Runs at a slower cadence than the
481        // dispatcher so it never fights the happy path.
482        let edge_cancel_reconciler = Arc::new(
483            scanner::edge_cancel_reconciler::EdgeCancelReconciler::with_filter_and_metrics(
484                config.edge_cancel_reconciler_interval,
485                scanner_filter.clone(),
486                metrics.clone(),
487            ),
488        );
489        handles.push(supervised_spawn(
490            edge_cancel_reconciler,
491            client.clone(),
492            config.partition_config.num_flow_partitions,
493            shutdown_rx.clone(),
494            metrics.clone(),
495        ));
496
497        // Execution deadline scanner (iterates execution partitions)
498        let deadline_scanner = Arc::new(ExecutionDeadlineScanner::with_filter(
499            config.execution_deadline_interval,
500            config.lanes,
501            scanner_filter,
502        ));
503        handles.push(supervised_spawn(
504            deadline_scanner,
505            client.clone(),
506            num_partitions,
507            shutdown_rx.clone(),
508            metrics.clone(),
509        ));
510
511        // Completion dispatch loop (Batch C item 6 — push-based DAG
512        // promotion; backend-agnostic since issue #90). Optional: when
513        // a stream is provided, spawn a task that drains
514        // `CompletionPayload`s and fires dependency resolution per
515        // completion. See `completion_listener` module docs.
516        let listener_enabled = completions.is_some();
517        if let Some(stream) = completions {
518            handles.push(completion_listener::spawn_dispatch_loop(
519                router.clone(),
520                client,
521                stream,
522                shutdown_rx,
523            ));
524        }
525
526        let scanner_count = if listener_enabled { "17 scanners + completion dispatch" } else { "17 scanners" };
527        tracing::info!(
528            num_partitions,
529            budget_partitions = config.partition_config.num_budget_partitions,
530            quota_partitions = config.partition_config.num_quota_partitions,
531            flow_partitions = config.partition_config.num_flow_partitions,
532            "engine started with {scanner_count}"
533        );
534
535        Self {
536            router,
537            shutdown_tx,
538            handles,
539        }
540    }
541
542    /// Signal all scanners to stop and wait for them to finish.
543    ///
544    /// Waits up to 15 seconds for scanners to drain. If any scanner is
545    /// blocked on a hung Valkey command, the timeout prevents shutdown
546    /// from hanging indefinitely (Kubernetes SIGKILL safety).
547    pub async fn shutdown(self) {
548        let _ = self.shutdown_tx.send(true);
549        let join_all = async {
550            for handle in self.handles {
551                let _ = handle.await;
552            }
553        };
554        match tokio::time::timeout(Duration::from_secs(15), join_all).await {
555            Ok(()) => tracing::info!("engine shutdown complete"),
556            Err(_) => tracing::warn!(
557                "engine shutdown timed out after 15s, abandoning remaining scanners"
558            ),
559        }
560    }
561}