// ff_engine/lib.rs
1//! ff-engine: cross-partition dispatch and background scanners.
2
3pub mod budget;
4pub mod completion_listener;
5pub mod partition_router;
6pub mod scanner;
7pub mod supervisor;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use ff_core::backend::ScannerFilter;
13use ff_core::completion_backend::CompletionStream;
14use ff_core::partition::PartitionConfig;
15use ff_core::types::LaneId;
16use tokio::sync::watch;
17use tokio::task::JoinHandle;
18
19use partition_router::PartitionRouter;
20use supervisor::supervised_spawn;
21use scanner::attempt_timeout::AttemptTimeoutScanner;
22use scanner::execution_deadline::ExecutionDeadlineScanner;
23use scanner::budget_reconciler::BudgetReconciler;
24use scanner::budget_reset::BudgetResetScanner;
25use scanner::delayed_promoter::DelayedPromoter;
26use scanner::dependency_reconciler::DependencyReconciler;
27use scanner::index_reconciler::IndexReconciler;
28use scanner::lease_expiry::LeaseExpiryScanner;
29use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
30use scanner::quota_reconciler::QuotaReconciler;
31use scanner::retention_trimmer::RetentionTrimmer;
32use scanner::suspension_timeout::SuspensionTimeoutScanner;
33use scanner::flow_projector::FlowProjector;
34use scanner::unblock::UnblockScanner;
35
/// Engine configuration.
///
/// Bundles partition topology, the lane set, and one scan interval per
/// background scanner. [`EngineConfig::default`] supplies the
/// production-tuned cadences noted on each field; shorter intervals
/// lower scanner latency at the cost of more backend round-trips.
pub struct EngineConfig {
    /// Partition topology. `start_internal` reads the flow, budget and
    /// quota partition counts from this to size each scanner's sweep.
    pub partition_config: PartitionConfig,
    /// Lanes to scan for delayed/index operations. Phase 1: `["default"]`.
    pub lanes: Vec<LaneId>,
    /// Lease expiry scan interval. Default: 1.5s.
    pub lease_expiry_interval: Duration,
    /// Delayed promoter scan interval. Default: 750ms.
    pub delayed_promoter_interval: Duration,
    /// Index reconciler scan interval. Default: 45s.
    pub index_reconciler_interval: Duration,
    /// Attempt timeout scan interval. Default: 2s.
    pub attempt_timeout_interval: Duration,
    /// Suspension timeout scan interval. Default: 2s.
    pub suspension_timeout_interval: Duration,
    /// Pending waitpoint expiry scan interval. Default: 5s.
    pub pending_wp_expiry_interval: Duration,
    /// Retention trimmer scan interval. Default: 60s.
    pub retention_trimmer_interval: Duration,
    /// Budget reset scan interval. Default: 15s.
    pub budget_reset_interval: Duration,
    /// Budget reconciler scan interval. Default: 30s.
    pub budget_reconciler_interval: Duration,
    /// Quota reconciler scan interval. Default: 30s.
    pub quota_reconciler_interval: Duration,
    /// Unblock scanner interval. Default: 5s.
    pub unblock_interval: Duration,
    /// Dependency reconciler interval. Default: 15s.
    ///
    /// Post-Batch-C this scanner is a **safety net**, not the primary
    /// promotion path. When a [`CompletionStream`] is handed to
    /// `start_with_completions`, push-based dispatch drives DAG
    /// promotion synchronously with each completion — under normal
    /// operation DAG latency is `~RTT × levels`, not `interval × levels`.
    ///
    /// The reconciler still runs as a catch-all for:
    /// - messages missed during subscriber restart or reconnect;
    /// - pre-Batch-C executions without `core.flow_id` stamped;
    /// - operator-driven edge mutation that doesn't pass through
    ///   the terminal-transition publish path.
    ///
    /// 15s idle-scan cost is minimal. If the push dispatch loop is
    /// disabled (engine started via `start`/`start_with_metrics`
    /// without a stream), drop this to 1s to preserve pre-Batch-C
    /// DAG latency behavior.
    pub dependency_reconciler_interval: Duration,
    /// Flow summary projector interval. Default: 15s.
    ///
    /// Separate observability projection path — maintains the flow
    /// summary view, NOT on the DAG-completion latency path. Kept at
    /// 15s in this config; a change to that cadence is unrelated to
    /// dependency resolution.
    pub flow_projector_interval: Duration,
    /// Execution deadline scanner interval. Default: 5s.
    pub execution_deadline_interval: Duration,

    /// Cancel reconciler scanner interval. Default: 15s.
    ///
    /// Drains `ff_cancel_flow`'s per-partition `cancel_backlog` ZSET of
    /// flows owing async member cancels. Each cancelled flow gets a
    /// grace window (30s by default, set by ff-server) before the
    /// reconciler picks it up, so the live in-process dispatch isn't
    /// fought on the happy path.
    pub cancel_reconciler_interval: Duration,

    /// RFC-016 Stage C sibling-cancel dispatcher interval. Default: 1s.
    ///
    /// Drains the per-flow-partition `pending_cancel_groups` SET,
    /// populated by `ff_resolve_dependency` whenever an AnyOf/Quorum
    /// edge group fires terminal under `OnSatisfied::CancelRemaining`.
    /// For each indexed group the dispatcher issues per-sibling
    /// `ff_cancel_execution` with `FailureReason::sibling_quorum_{
    /// satisfied,impossible}`, then atomically SREM+clear via
    /// `ff_drain_sibling_cancel_group`.
    ///
    /// A short default (1s) minimises the window between quorum
    /// satisfaction and sibling termination — this is the user-facing
    /// latency floor for "kill the losers" workflows. Bump only if a
    /// deployment's steady-state pending-set depth is observed to
    /// backlog under the 1s cadence; Stage C's §4.2 benchmark gates
    /// the release against the p99 ≤ 500 ms SLO at n=100 (§4.2 of
    /// the RFC).
    pub edge_cancel_dispatcher_interval: Duration,

    /// RFC-016 Stage D sibling-cancel reconciler interval. Default: 10s.
    ///
    /// Safety-net scanner for Invariant Q6: if the engine crashed
    /// between `ff_resolve_dependency`'s SADD to `pending_cancel_groups`
    /// and the dispatcher's `ff_drain_sibling_cancel_group`, this
    /// reconciler detects the orphan tuple and finalises via
    /// `ff_reconcile_sibling_cancel_group`. It runs at a deliberately
    /// slower cadence than the dispatcher (10s vs 1s) so the dispatcher
    /// owns the happy path and the reconciler only cleans up
    /// crash-recovery residue. The reconciler MUST NOT fight the
    /// dispatcher — it no-ops whenever siblings are still non-terminal.
    pub edge_cancel_reconciler_interval: Duration,

    /// Per-consumer scanner filter (issue #122).
    ///
    /// Applied by every execution-shaped scanner (lease_expiry,
    /// attempt_timeout, execution_deadline, suspension_timeout,
    /// pending_wp_expiry, delayed_promoter, dependency_reconciler,
    /// cancel_reconciler, unblock, index_reconciler,
    /// retention_trimmer) to restrict the candidate set to
    /// executions owned by this consumer. The four non-execution
    /// scanners (budget_reconciler, budget_reset, quota_reconciler,
    /// flow_projector) accept the filter for API uniformity but do
    /// not apply it — their domains are not per-execution.
    ///
    /// Default: [`ScannerFilter::default`] — no filtering,
    /// pre-#122 behaviour. Multi-tenant deployments that share a
    /// single Valkey keyspace across two FlowFabric instances set
    /// this (paired with
    /// [`CompletionBackend::subscribe_completions_filtered`]) for
    /// mutual isolation.
    ///
    /// [`CompletionBackend::subscribe_completions_filtered`]: ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered
    pub scanner_filter: ScannerFilter,
}
155
156impl Default for EngineConfig {
157 fn default() -> Self {
158 Self {
159 partition_config: PartitionConfig::default(),
160 lanes: vec![LaneId::new("default")],
161 lease_expiry_interval: Duration::from_millis(1500),
162 delayed_promoter_interval: Duration::from_millis(750),
163 index_reconciler_interval: Duration::from_secs(45),
164 attempt_timeout_interval: Duration::from_secs(2),
165 suspension_timeout_interval: Duration::from_secs(2),
166 pending_wp_expiry_interval: Duration::from_secs(5),
167 retention_trimmer_interval: Duration::from_secs(60),
168 budget_reset_interval: Duration::from_secs(15),
169 budget_reconciler_interval: Duration::from_secs(30),
170 quota_reconciler_interval: Duration::from_secs(30),
171 unblock_interval: Duration::from_secs(5),
172 dependency_reconciler_interval: Duration::from_secs(15),
173 flow_projector_interval: Duration::from_secs(15),
174 execution_deadline_interval: Duration::from_secs(5),
175 cancel_reconciler_interval: Duration::from_secs(15),
176 edge_cancel_dispatcher_interval: Duration::from_secs(1),
177 edge_cancel_reconciler_interval: Duration::from_secs(10),
178 scanner_filter: ScannerFilter::default(),
179 }
180 }
181}
182
183/// The FlowFabric engine: partition routing + background scanners.
184pub struct Engine {
185 pub router: Arc<PartitionRouter>,
186 shutdown_tx: watch::Sender<bool>,
187 handles: Vec<JoinHandle<()>>,
188}
189
190impl Engine {
191 /// Start the engine with the given config and Valkey client.
192 ///
193 /// Spawns background scanner tasks. Returns immediately.
194 pub fn start(config: EngineConfig, client: ferriskey::Client) -> Self {
195 // Construct a fresh metrics handle here so direct callers
196 // (examples, tests) don't need to. Under the default build
197 // (`observability` feature off) this is the no-op shim. With
198 // the feature on the handle is a real OTEL registry but — by
199 // design — one that nothing else shares; it's only useful for
200 // tests that want to exercise scanner cycle recording in
201 // isolation. Production code uses
202 // [`Self::start_with_metrics`] to plumb the same handle
203 // through the HTTP /metrics route.
204 Self::start_with_metrics(config, client, Arc::new(ff_observability::Metrics::new()))
205 }
206
207 /// PR-94: start the engine with a shared observability registry.
208 ///
209 /// Used by `ff-server` so scanner cycle metrics funnel into the
210 /// same Prometheus registry exposed at `/metrics`. Under the
211 /// `observability` feature (flipped via the same feature on
212 /// `ff-server` / `ff-engine`), the handle records into an OTEL
213 /// `MeterProvider`; otherwise the shim no-ops.
214 pub fn start_with_metrics(
215 config: EngineConfig,
216 client: ferriskey::Client,
217 metrics: Arc<ff_observability::Metrics>,
218 ) -> Self {
219 Self::start_internal(config, client, metrics, None)
220 }
221
222 /// Start the engine with a shared observability registry and a
223 /// completion stream for push-based DAG promotion (issue #90).
224 ///
225 /// The stream is typically produced by
226 /// [`ff_core::completion_backend::CompletionBackend::subscribe_completions`].
227 /// The engine spawns a dispatch loop that drains the stream and
228 /// fires `ff_resolve_dependency` per completion, reducing DAG
229 /// latency from `interval × levels` to `~RTT × levels`. The
230 /// `dependency_reconciler` scanner remains as a safety net for
231 /// completions missed during subscriber reconnect windows.
232 pub fn start_with_completions(
233 config: EngineConfig,
234 client: ferriskey::Client,
235 metrics: Arc<ff_observability::Metrics>,
236 completions: CompletionStream,
237 ) -> Self {
238 Self::start_internal(config, client, metrics, Some(completions))
239 }
240
241 fn start_internal(
242 config: EngineConfig,
243 client: ferriskey::Client,
244 metrics: Arc<ff_observability::Metrics>,
245 completions: Option<CompletionStream>,
246 ) -> Self {
247 let (shutdown_tx, shutdown_rx) = watch::channel(false);
248 let num_partitions = config.partition_config.num_flow_partitions;
249 let router = Arc::new(PartitionRouter::new(config.partition_config));
250
251 let mut handles = Vec::new();
252
253 let scanner_filter = config.scanner_filter.clone();
254
255 // Lease expiry scanner
256 let lease_scanner = Arc::new(LeaseExpiryScanner::with_filter(
257 config.lease_expiry_interval,
258 scanner_filter.clone(),
259 ));
260 handles.push(supervised_spawn(
261 lease_scanner,
262 client.clone(),
263 num_partitions,
264 shutdown_rx.clone(),
265 metrics.clone(),
266 ));
267
268 // Delayed promoter
269 let delayed_scanner = Arc::new(DelayedPromoter::with_filter(
270 config.delayed_promoter_interval,
271 config.lanes.clone(),
272 scanner_filter.clone(),
273 ));
274 handles.push(supervised_spawn(
275 delayed_scanner,
276 client.clone(),
277 num_partitions,
278 shutdown_rx.clone(),
279 metrics.clone(),
280 ));
281
282 // Index reconciler
283 let reconciler = Arc::new(IndexReconciler::with_filter(
284 config.index_reconciler_interval,
285 config.lanes.clone(),
286 scanner_filter.clone(),
287 ));
288 handles.push(supervised_spawn(
289 reconciler,
290 client.clone(),
291 num_partitions,
292 shutdown_rx.clone(),
293 metrics.clone(),
294 ));
295
296 // Attempt timeout scanner
297 let timeout_scanner = Arc::new(AttemptTimeoutScanner::with_filter(
298 config.attempt_timeout_interval,
299 config.lanes.clone(),
300 scanner_filter.clone(),
301 ));
302 handles.push(supervised_spawn(
303 timeout_scanner,
304 client.clone(),
305 num_partitions,
306 shutdown_rx.clone(),
307 metrics.clone(),
308 ));
309
310 // Suspension timeout scanner
311 let suspension_scanner = Arc::new(SuspensionTimeoutScanner::with_filter(
312 config.suspension_timeout_interval,
313 scanner_filter.clone(),
314 ));
315 handles.push(supervised_spawn(
316 suspension_scanner,
317 client.clone(),
318 num_partitions,
319 shutdown_rx.clone(),
320 metrics.clone(),
321 ));
322
323 // Pending waitpoint expiry scanner
324 let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::with_filter(
325 config.pending_wp_expiry_interval,
326 scanner_filter.clone(),
327 ));
328 handles.push(supervised_spawn(
329 pending_wp_scanner,
330 client.clone(),
331 num_partitions,
332 shutdown_rx.clone(),
333 metrics.clone(),
334 ));
335
336 // Retention trimmer
337 let retention_scanner = Arc::new(RetentionTrimmer::with_filter(
338 config.retention_trimmer_interval,
339 config.lanes.clone(),
340 scanner_filter.clone(),
341 ));
342 handles.push(supervised_spawn(
343 retention_scanner,
344 client.clone(),
345 num_partitions,
346 shutdown_rx.clone(),
347 metrics.clone(),
348 ));
349
350 // Budget reset scanner (iterates budget partitions).
351 // Filter is accepted but not applied (budget partitions don't
352 // carry the per-execution namespace / instance_tag shape).
353 let budget_reset = Arc::new(BudgetResetScanner::with_filter(
354 config.budget_reset_interval,
355 scanner_filter.clone(),
356 ));
357 handles.push(supervised_spawn(
358 budget_reset,
359 client.clone(),
360 config.partition_config.num_budget_partitions,
361 shutdown_rx.clone(),
362 metrics.clone(),
363 ));
364
365 // Budget reconciler (iterates budget partitions). Filter
366 // accepted but not applied — see BudgetReconciler::with_filter
367 // rustdoc.
368 let budget_reconciler = Arc::new(BudgetReconciler::with_filter(
369 config.budget_reconciler_interval,
370 scanner_filter.clone(),
371 ));
372 handles.push(supervised_spawn(
373 budget_reconciler,
374 client.clone(),
375 config.partition_config.num_budget_partitions,
376 shutdown_rx.clone(),
377 metrics.clone(),
378 ));
379
380 // Unblock scanner (iterates execution partitions, re-evaluates blocked)
381 let unblock_scanner = Arc::new(UnblockScanner::with_filter(
382 config.unblock_interval,
383 config.lanes.clone(),
384 config.partition_config,
385 scanner_filter.clone(),
386 ));
387 handles.push(supervised_spawn(
388 unblock_scanner,
389 client.clone(),
390 num_partitions,
391 shutdown_rx.clone(),
392 metrics.clone(),
393 ));
394
395 // Dependency reconciler (iterates execution partitions)
396 let dep_reconciler = Arc::new(DependencyReconciler::with_filter(
397 config.dependency_reconciler_interval,
398 config.lanes.clone(),
399 config.partition_config,
400 scanner_filter.clone(),
401 ));
402 handles.push(supervised_spawn(
403 dep_reconciler,
404 client.clone(),
405 num_partitions,
406 shutdown_rx.clone(),
407 metrics.clone(),
408 ));
409
410 // Quota reconciler (iterates quota partitions). Filter
411 // accepted but not applied — see QuotaReconciler::with_filter
412 // rustdoc.
413 let quota_reconciler = Arc::new(QuotaReconciler::with_filter(
414 config.quota_reconciler_interval,
415 scanner_filter.clone(),
416 ));
417 handles.push(supervised_spawn(
418 quota_reconciler,
419 client.clone(),
420 config.partition_config.num_quota_partitions,
421 shutdown_rx.clone(),
422 metrics.clone(),
423 ));
424
425 // Flow summary projector (iterates flow partitions). Filter
426 // accepted but not applied — see FlowProjector::with_filter
427 // rustdoc.
428 let flow_projector = Arc::new(FlowProjector::with_filter(
429 config.flow_projector_interval,
430 config.partition_config,
431 scanner_filter.clone(),
432 ));
433 handles.push(supervised_spawn(
434 flow_projector,
435 client.clone(),
436 config.partition_config.num_flow_partitions,
437 shutdown_rx.clone(),
438 metrics.clone(),
439 ));
440
441 // Cancel reconciler (iterates flow partitions). Drains
442 // cancel_backlog entries whose grace window has elapsed so a
443 // process crash mid-dispatch can't leave flow members un-cancelled.
444 let cancel_reconciler = Arc::new(scanner::cancel_reconciler::CancelReconciler::with_filter(
445 config.cancel_reconciler_interval,
446 config.partition_config,
447 scanner_filter.clone(),
448 ));
449 handles.push(supervised_spawn(
450 cancel_reconciler,
451 client.clone(),
452 config.partition_config.num_flow_partitions,
453 shutdown_rx.clone(),
454 metrics.clone(),
455 ));
456
457 // RFC-016 Stage C: sibling-cancel dispatcher. Iterates flow
458 // partitions, drains `pending_cancel_groups` SET via
459 // `ff_drain_sibling_cancel_group`, issues per-sibling
460 // `ff_cancel_execution` with sibling_quorum reasons.
461 let edge_cancel_dispatcher = Arc::new(
462 scanner::edge_cancel_dispatcher::EdgeCancelDispatcher::with_filter_and_metrics(
463 config.edge_cancel_dispatcher_interval,
464 config.partition_config,
465 scanner_filter.clone(),
466 metrics.clone(),
467 ),
468 );
469 handles.push(supervised_spawn(
470 edge_cancel_dispatcher,
471 client.clone(),
472 config.partition_config.num_flow_partitions,
473 shutdown_rx.clone(),
474 metrics.clone(),
475 ));
476
477 // RFC-016 Stage D: sibling-cancel reconciler. Crash-recovery
478 // safety net for Invariant Q6 — finalises tuples in
479 // `pending_cancel_groups` whose dispatcher drain was interrupted
480 // by an engine crash. Runs at a slower cadence than the
481 // dispatcher so it never fights the happy path.
482 let edge_cancel_reconciler = Arc::new(
483 scanner::edge_cancel_reconciler::EdgeCancelReconciler::with_filter_and_metrics(
484 config.edge_cancel_reconciler_interval,
485 scanner_filter.clone(),
486 metrics.clone(),
487 ),
488 );
489 handles.push(supervised_spawn(
490 edge_cancel_reconciler,
491 client.clone(),
492 config.partition_config.num_flow_partitions,
493 shutdown_rx.clone(),
494 metrics.clone(),
495 ));
496
497 // Execution deadline scanner (iterates execution partitions)
498 let deadline_scanner = Arc::new(ExecutionDeadlineScanner::with_filter(
499 config.execution_deadline_interval,
500 config.lanes,
501 scanner_filter,
502 ));
503 handles.push(supervised_spawn(
504 deadline_scanner,
505 client.clone(),
506 num_partitions,
507 shutdown_rx.clone(),
508 metrics.clone(),
509 ));
510
511 // Completion dispatch loop (Batch C item 6 — push-based DAG
512 // promotion; backend-agnostic since issue #90). Optional: when
513 // a stream is provided, spawn a task that drains
514 // `CompletionPayload`s and fires dependency resolution per
515 // completion. See `completion_listener` module docs.
516 let listener_enabled = completions.is_some();
517 if let Some(stream) = completions {
518 handles.push(completion_listener::spawn_dispatch_loop(
519 router.clone(),
520 client,
521 stream,
522 shutdown_rx,
523 ));
524 }
525
526 let scanner_count = if listener_enabled { "17 scanners + completion dispatch" } else { "17 scanners" };
527 tracing::info!(
528 num_partitions,
529 budget_partitions = config.partition_config.num_budget_partitions,
530 quota_partitions = config.partition_config.num_quota_partitions,
531 flow_partitions = config.partition_config.num_flow_partitions,
532 "engine started with {scanner_count}"
533 );
534
535 Self {
536 router,
537 shutdown_tx,
538 handles,
539 }
540 }
541
542 /// Signal all scanners to stop and wait for them to finish.
543 ///
544 /// Waits up to 15 seconds for scanners to drain. If any scanner is
545 /// blocked on a hung Valkey command, the timeout prevents shutdown
546 /// from hanging indefinitely (Kubernetes SIGKILL safety).
547 pub async fn shutdown(self) {
548 let _ = self.shutdown_tx.send(true);
549 let join_all = async {
550 for handle in self.handles {
551 let _ = handle.await;
552 }
553 };
554 match tokio::time::timeout(Duration::from_secs(15), join_all).await {
555 Ok(()) => tracing::info!("engine shutdown complete"),
556 Err(_) => tracing::warn!(
557 "engine shutdown timed out after 15s, abandoning remaining scanners"
558 ),
559 }
560 }
561}