// ff_engine/lib.rs
1//! ff-engine: cross-partition dispatch and background scanners.
2
3pub mod budget;
4pub mod completion_listener;
5pub mod partition_router;
6pub mod scanner;
7pub mod supervisor;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::completion_backend::CompletionStream;
14use ff_core::partition::PartitionConfig;
15use ff_core::types::LaneId;
16use tokio::sync::watch;
17use tokio::task::JoinHandle;
18
19use partition_router::PartitionRouter;
20use supervisor::supervised_spawn;
21use scanner::attempt_timeout::AttemptTimeoutScanner;
22use scanner::execution_deadline::ExecutionDeadlineScanner;
23use scanner::budget_reconciler::BudgetReconciler;
24use scanner::budget_reset::BudgetResetScanner;
25use scanner::delayed_promoter::DelayedPromoter;
26use scanner::dependency_reconciler::DependencyReconciler;
27use scanner::index_reconciler::IndexReconciler;
28use scanner::lease_expiry::LeaseExpiryScanner;
29use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
30use scanner::quota_reconciler::QuotaReconciler;
31use scanner::retention_trimmer::RetentionTrimmer;
32use scanner::suspension_timeout::SuspensionTimeoutScanner;
33use scanner::flow_projector::FlowProjector;
34use scanner::unblock::UnblockScanner;
35
/// Engine configuration.
///
/// Interval defaults (see [`EngineConfig::default`]) put the hot
/// promotion/lease-recovery scanners at sub-second-to-2s cadence and
/// the reconcilers/trimmers at tens of seconds.
pub struct EngineConfig {
    /// Partition layout (flow / budget / quota partition counts).
    /// Consumed by the [`PartitionRouter`] and used to size each
    /// scanner's partition iteration.
    pub partition_config: PartitionConfig,
    /// Lanes to scan for delayed/index operations. Phase 1: `["default"]`.
    pub lanes: Vec<LaneId>,
    /// Lease expiry scan interval. Default: 1.5s.
    pub lease_expiry_interval: Duration,
    /// Delayed promoter scan interval. Default: 750ms.
    pub delayed_promoter_interval: Duration,
    /// Index reconciler scan interval. Default: 45s.
    pub index_reconciler_interval: Duration,
    /// Attempt timeout scan interval. Default: 2s.
    pub attempt_timeout_interval: Duration,
    /// Suspension timeout scan interval. Default: 2s.
    pub suspension_timeout_interval: Duration,
    /// Pending waitpoint expiry scan interval. Default: 5s.
    pub pending_wp_expiry_interval: Duration,
    /// Retention trimmer scan interval. Default: 60s.
    pub retention_trimmer_interval: Duration,
    /// Budget reset scan interval. Default: 15s.
    pub budget_reset_interval: Duration,
    /// Budget reconciler scan interval. Default: 30s.
    pub budget_reconciler_interval: Duration,
    /// Quota reconciler scan interval. Default: 30s.
    pub quota_reconciler_interval: Duration,
    /// Unblock scanner interval. Default: 5s.
    pub unblock_interval: Duration,
    /// Dependency reconciler interval. Default: 15s.
    ///
    /// Post-Batch-C this scanner is a **safety net**, not the primary
    /// promotion path. When a [`CompletionStream`] is handed to
    /// `start_with_completions`, push-based dispatch drives DAG
    /// promotion synchronously with each completion — under normal
    /// operation DAG latency is `~RTT × levels`, not `interval × levels`.
    ///
    /// The reconciler still runs as a catch-all for:
    ///   - messages missed during subscriber restart or reconnect;
    ///   - pre-Batch-C executions without `core.flow_id` stamped;
    ///   - operator-driven edge mutation that doesn't pass through
    ///     the terminal-transition publish path.
    ///
    /// 15s idle-scan cost is minimal. If the push dispatch loop is
    /// disabled (engine started via `start`/`start_with_metrics`
    /// without a stream), drop this to 1s to preserve pre-Batch-C
    /// DAG latency behavior.
    pub dependency_reconciler_interval: Duration,
    /// Flow summary projector interval. Default: 15s.
    ///
    /// Separate observability projection path — maintains the flow
    /// summary view, NOT on the DAG-completion latency path. Kept at
    /// 15s in this config; a change to that cadence is unrelated to
    /// dependency resolution.
    pub flow_projector_interval: Duration,
    /// Execution deadline scanner interval. Default: 5s.
    pub execution_deadline_interval: Duration,

    /// Cancel reconciler scanner interval. Default: 15s.
    ///
    /// Drains `ff_cancel_flow`'s per-partition `cancel_backlog` ZSET of
    /// flows owing async member cancels. Each cancelled flow gets a
    /// grace window (30s by default, set by ff-server) before the
    /// reconciler picks it up, so the live in-process dispatch isn't
    /// fought on the happy path.
    pub cancel_reconciler_interval: Duration,

    /// Per-consumer scanner filter (issue #122).
    ///
    /// Applied by every execution-shaped scanner (lease_expiry,
    /// attempt_timeout, execution_deadline, suspension_timeout,
    /// pending_wp_expiry, delayed_promoter, dependency_reconciler,
    /// cancel_reconciler, unblock, index_reconciler,
    /// retention_trimmer) to restrict the candidate set to
    /// executions owned by this consumer. The four non-execution
    /// scanners (budget_reconciler, budget_reset, quota_reconciler,
    /// flow_projector) accept the filter for API uniformity but do
    /// not apply it — their domains are not per-execution.
    ///
    /// Default: [`ScannerFilter::default`] — no filtering,
    /// pre-#122 behaviour. Multi-tenant deployments that share a
    /// single Valkey keyspace across two FlowFabric instances set
    /// this (paired with
    /// [`CompletionBackend::subscribe_completions_filtered`]) for
    /// mutual isolation.
    ///
    /// [`CompletionBackend::subscribe_completions_filtered`]: ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered
    pub scanner_filter: ScannerFilter,
}
123
124impl Default for EngineConfig {
125    fn default() -> Self {
126        Self {
127            partition_config: PartitionConfig::default(),
128            lanes: vec![LaneId::new("default")],
129            lease_expiry_interval: Duration::from_millis(1500),
130            delayed_promoter_interval: Duration::from_millis(750),
131            index_reconciler_interval: Duration::from_secs(45),
132            attempt_timeout_interval: Duration::from_secs(2),
133            suspension_timeout_interval: Duration::from_secs(2),
134            pending_wp_expiry_interval: Duration::from_secs(5),
135            retention_trimmer_interval: Duration::from_secs(60),
136            budget_reset_interval: Duration::from_secs(15),
137            budget_reconciler_interval: Duration::from_secs(30),
138            quota_reconciler_interval: Duration::from_secs(30),
139            unblock_interval: Duration::from_secs(5),
140            dependency_reconciler_interval: Duration::from_secs(15),
141            flow_projector_interval: Duration::from_secs(15),
142            execution_deadline_interval: Duration::from_secs(5),
143            cancel_reconciler_interval: Duration::from_secs(15),
144            scanner_filter: ScannerFilter::default(),
145        }
146    }
147}
148
/// The FlowFabric engine: partition routing + background scanners.
pub struct Engine {
    /// Partition router built from the config's `partition_config`;
    /// also handed to the completion dispatch loop when one is spawned.
    pub router: Arc<PartitionRouter>,
    /// Shutdown flag sender. [`Engine::shutdown`] sends `true`; every
    /// spawned task holds a clone of the matching receiver.
    shutdown_tx: watch::Sender<bool>,
    /// Join handles for all spawned tasks (scanners plus the optional
    /// completion dispatch loop), awaited during shutdown.
    handles: Vec<JoinHandle<()>>,
}
155
impl Engine {
    /// Start the engine with the given config and Valkey client.
    ///
    /// Spawns background scanner tasks. Returns immediately.
    pub fn start(config: EngineConfig, client: ferriskey::Client) -> Self {
        // Construct a fresh metrics handle here so direct callers
        // (examples, tests) don't need to. Under the default build
        // (`observability` feature off) this is the no-op shim. With
        // the feature on the handle is a real OTEL registry but — by
        // design — one that nothing else shares; it's only useful for
        // tests that want to exercise scanner cycle recording in
        // isolation. Production code uses
        // [`Self::start_with_metrics`] to plumb the same handle
        // through the HTTP /metrics route.
        Self::start_with_metrics(config, client, Arc::new(ff_observability::Metrics::new()))
    }

    /// PR-94: start the engine with a shared observability registry.
    ///
    /// Used by `ff-server` so scanner cycle metrics funnel into the
    /// same Prometheus registry exposed at `/metrics`. Under the
    /// `observability` feature (flipped via the same feature on
    /// `ff-server` / `ff-engine`), the handle records into an OTEL
    /// `MeterProvider`; otherwise the shim no-ops.
    pub fn start_with_metrics(
        config: EngineConfig,
        client: ferriskey::Client,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Self {
        // No completion stream → no push dispatch loop; DAG promotion
        // then falls back to the dependency_reconciler scanner alone
        // (see `EngineConfig::dependency_reconciler_interval` docs).
        Self::start_internal(config, client, metrics, None)
    }

    /// Start the engine with a shared observability registry and a
    /// completion stream for push-based DAG promotion (issue #90).
    ///
    /// The stream is typically produced by
    /// [`ff_core::completion_backend::CompletionBackend::subscribe_completions`].
    /// The engine spawns a dispatch loop that drains the stream and
    /// fires `ff_resolve_dependency` per completion, reducing DAG
    /// latency from `interval × levels` to `~RTT × levels`. The
    /// `dependency_reconciler` scanner remains as a safety net for
    /// completions missed during subscriber reconnect windows.
    pub fn start_with_completions(
        config: EngineConfig,
        client: ferriskey::Client,
        metrics: Arc<ff_observability::Metrics>,
        completions: CompletionStream,
    ) -> Self {
        Self::start_internal(config, client, metrics, Some(completions))
    }

    /// Shared startup path behind all three public constructors.
    ///
    /// Spawns one supervised task per scanner (15 in total) plus, when
    /// `completions` is `Some`, the completion dispatch loop. Every task
    /// gets a clone of one shutdown `watch` receiver; [`Self::shutdown`]
    /// flips the flag and joins the handles collected here.
    fn start_internal(
        config: EngineConfig,
        client: ferriskey::Client,
        metrics: Arc<ff_observability::Metrics>,
        completions: Option<CompletionStream>,
    ) -> Self {
        // `false` = keep running; flipped to `true` exactly once in
        // `shutdown`.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let num_partitions = config.partition_config.num_flow_partitions;
        // NOTE(review): `config.partition_config` is passed by value
        // here and again to several scanners below, so `PartitionConfig`
        // is presumably `Copy` — confirm in ff_core before refactoring.
        let router = Arc::new(PartitionRouter::new(config.partition_config));

        let mut handles = Vec::new();

        // Cloned into each scanner below; the final scanner (execution
        // deadline) takes it by move.
        let scanner_filter = config.scanner_filter.clone();

        // Lease expiry scanner
        let lease_scanner = Arc::new(LeaseExpiryScanner::with_filter(
            config.lease_expiry_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            lease_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Delayed promoter
        let delayed_scanner = Arc::new(DelayedPromoter::with_filter(
            config.delayed_promoter_interval,
            config.lanes.clone(),
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            delayed_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Index reconciler
        let reconciler = Arc::new(IndexReconciler::with_filter(
            config.index_reconciler_interval,
            config.lanes.clone(),
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            reconciler,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Attempt timeout scanner
        let timeout_scanner = Arc::new(AttemptTimeoutScanner::with_filter(
            config.attempt_timeout_interval,
            config.lanes.clone(),
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            timeout_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Suspension timeout scanner
        let suspension_scanner = Arc::new(SuspensionTimeoutScanner::with_filter(
            config.suspension_timeout_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            suspension_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Pending waitpoint expiry scanner
        let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::with_filter(
            config.pending_wp_expiry_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            pending_wp_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Retention trimmer
        let retention_scanner = Arc::new(RetentionTrimmer::with_filter(
            config.retention_trimmer_interval,
            config.lanes.clone(),
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            retention_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Budget reset scanner (iterates budget partitions).
        // Filter is accepted but not applied (budget partitions don't
        // carry the per-execution namespace / instance_tag shape).
        let budget_reset = Arc::new(BudgetResetScanner::with_filter(
            config.budget_reset_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            budget_reset,
            client.clone(),
            config.partition_config.num_budget_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Budget reconciler (iterates budget partitions). Filter
        // accepted but not applied — see BudgetReconciler::with_filter
        // rustdoc.
        let budget_reconciler = Arc::new(BudgetReconciler::with_filter(
            config.budget_reconciler_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            budget_reconciler,
            client.clone(),
            config.partition_config.num_budget_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Unblock scanner (iterates execution partitions, re-evaluates blocked)
        let unblock_scanner = Arc::new(UnblockScanner::with_filter(
            config.unblock_interval,
            config.lanes.clone(),
            config.partition_config,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            unblock_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Dependency reconciler (iterates execution partitions)
        let dep_reconciler = Arc::new(DependencyReconciler::with_filter(
            config.dependency_reconciler_interval,
            config.lanes.clone(),
            config.partition_config,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            dep_reconciler,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Quota reconciler (iterates quota partitions). Filter
        // accepted but not applied — see QuotaReconciler::with_filter
        // rustdoc.
        let quota_reconciler = Arc::new(QuotaReconciler::with_filter(
            config.quota_reconciler_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            quota_reconciler,
            client.clone(),
            config.partition_config.num_quota_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Flow summary projector (iterates flow partitions). Filter
        // accepted but not applied — see FlowProjector::with_filter
        // rustdoc.
        let flow_projector = Arc::new(FlowProjector::with_filter(
            config.flow_projector_interval,
            config.partition_config,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            flow_projector,
            client.clone(),
            config.partition_config.num_flow_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Cancel reconciler (iterates flow partitions). Drains
        // cancel_backlog entries whose grace window has elapsed so a
        // process crash mid-dispatch can't leave flow members un-cancelled.
        let cancel_reconciler = Arc::new(scanner::cancel_reconciler::CancelReconciler::with_filter(
            config.cancel_reconciler_interval,
            config.partition_config,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            cancel_reconciler,
            client.clone(),
            config.partition_config.num_flow_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Execution deadline scanner (iterates execution partitions)
        // NOTE(review): this is deliberately the last scanner — it takes
        // `config.lanes` and `scanner_filter` by move. If a scanner is
        // added after it, clone here instead.
        let deadline_scanner = Arc::new(ExecutionDeadlineScanner::with_filter(
            config.execution_deadline_interval,
            config.lanes,
            scanner_filter,
        ));
        handles.push(supervised_spawn(
            deadline_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Completion dispatch loop (Batch C item 6 — push-based DAG
        // promotion; backend-agnostic since issue #90). Optional: when
        // a stream is provided, spawn a task that drains
        // `CompletionPayload`s and fires dependency resolution per
        // completion. See `completion_listener` module docs.
        // `client` is moved (not cloned) into the loop here, so this
        // must remain after the last `client.clone()` above.
        let listener_enabled = completions.is_some();
        if let Some(stream) = completions {
            handles.push(completion_listener::spawn_dispatch_loop(
                router.clone(),
                client,
                stream,
                shutdown_rx,
            ));
        }

        // NOTE(review): the "15" is hardcoded — keep it in sync when
        // scanners are added or removed. `handles.len()` can't be used
        // directly because it also counts the dispatch task.
        let scanner_count = if listener_enabled { "15 scanners + completion dispatch" } else { "15 scanners" };
        tracing::info!(
            num_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            flow_partitions = config.partition_config.num_flow_partitions,
            "engine started with {scanner_count}"
        );

        Self {
            router,
            shutdown_tx,
            handles,
        }
    }

    /// Signal all scanners to stop and wait for them to finish.
    ///
    /// Waits up to 15 seconds for scanners to drain. If any scanner is
    /// blocked on a hung Valkey command, the timeout prevents shutdown
    /// from hanging indefinitely (Kubernetes SIGKILL safety).
    pub async fn shutdown(self) {
        // Send error is ignored: it only occurs when every receiver is
        // already dropped, i.e. all tasks have exited on their own.
        let _ = self.shutdown_tx.send(true);
        let join_all = async {
            // Sequential awaits are fine here — the shutdown flag is
            // already broadcast, so the tasks wind down concurrently
            // regardless of join order.
            for handle in self.handles {
                let _ = handle.await;
            }
        };
        match tokio::time::timeout(Duration::from_secs(15), join_all).await {
            Ok(()) => tracing::info!("engine shutdown complete"),
            // On timeout the remaining JoinHandles are dropped, which
            // detaches (does NOT abort) the tasks: they keep running
            // until the process exits.
            Err(_) => tracing::warn!(
                "engine shutdown timed out after 15s, abandoning remaining scanners"
            ),
        }
    }
}