//! ff-engine: cross-partition dispatch and background scanners.
pub mod budget;
pub mod completion_listener;
pub mod partition_router;
pub mod scanner;
pub mod supervisor;
use std::sync::Arc;
use std::time::Duration;
use ff_core::backend::ScannerFilter;
use ff_core::completion_backend::CompletionStream;
use ff_core::partition::PartitionConfig;
use ff_core::types::LaneId;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use partition_router::PartitionRouter;
use scanner::attempt_timeout::AttemptTimeoutScanner;
use scanner::budget_reconciler::BudgetReconciler;
use scanner::budget_reset::BudgetResetScanner;
use scanner::cancel_reconciler::CancelReconciler;
use scanner::delayed_promoter::DelayedPromoter;
use scanner::dependency_reconciler::DependencyReconciler;
use scanner::edge_cancel_dispatcher::EdgeCancelDispatcher;
use scanner::edge_cancel_reconciler::EdgeCancelReconciler;
use scanner::execution_deadline::ExecutionDeadlineScanner;
use scanner::flow_projector::FlowProjector;
use scanner::index_reconciler::IndexReconciler;
use scanner::lease_expiry::LeaseExpiryScanner;
use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
use scanner::quota_reconciler::QuotaReconciler;
use scanner::retention_trimmer::RetentionTrimmer;
use scanner::suspension_timeout::SuspensionTimeoutScanner;
use scanner::unblock::UnblockScanner;
use supervisor::supervised_spawn;
/// Engine configuration.
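///
/// A minimal sketch of tuning a couple of intervals while keeping the
/// remaining defaults. Assumes the crate is linked as `ff_engine`; the
/// `bulk` lane and the chosen values are illustrative, not
/// recommendations:
///
/// ```no_run
/// use std::time::Duration;
/// use ff_core::types::LaneId;
/// use ff_engine::EngineConfig;
///
/// let config = EngineConfig {
///     lanes: vec![LaneId::new("default"), LaneId::new("bulk")],
///     // Tighter lease sweep for latency-sensitive deployments.
///     lease_expiry_interval: Duration::from_millis(500),
///     ..EngineConfig::default()
/// };
/// # let _ = config;
/// ```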
pub struct EngineConfig {
pub partition_config: PartitionConfig,
/// Lanes to scan for delayed/index operations. Phase 1: `["default"]`.
pub lanes: Vec<LaneId>,
/// Lease expiry scan interval. Default: 1.5s.
pub lease_expiry_interval: Duration,
/// Delayed promoter scan interval. Default: 750ms.
pub delayed_promoter_interval: Duration,
/// Index reconciler scan interval. Default: 45s.
pub index_reconciler_interval: Duration,
/// Attempt timeout scan interval. Default: 2s.
pub attempt_timeout_interval: Duration,
/// Suspension timeout scan interval. Default: 2s.
pub suspension_timeout_interval: Duration,
/// Pending waitpoint expiry scan interval. Default: 5s.
pub pending_wp_expiry_interval: Duration,
/// Retention trimmer scan interval. Default: 60s.
pub retention_trimmer_interval: Duration,
/// Budget reset scan interval. Default: 15s.
pub budget_reset_interval: Duration,
/// Budget reconciler scan interval. Default: 30s.
pub budget_reconciler_interval: Duration,
/// Quota reconciler scan interval. Default: 30s.
pub quota_reconciler_interval: Duration,
/// Unblock scanner interval. Default: 5s.
pub unblock_interval: Duration,
/// Dependency reconciler interval. Default: 15s.
///
/// Post-Batch-C this scanner is a **safety net**, not the primary
/// promotion path. When a [`CompletionStream`] is handed to
/// `start_with_completions`, push-based dispatch drives DAG
/// promotion synchronously with each completion — under normal
/// operation DAG latency is `~RTT × levels`, not `interval × levels`.
///
/// The reconciler still runs as a catch-all for:
/// - messages missed during subscriber restart or reconnect;
/// - pre-Batch-C executions without `core.flow_id` stamped;
/// - operator-driven edge mutation that doesn't pass through
/// the terminal-transition publish path.
///
    /// The 15s idle-scan cost is minimal. If the push dispatch loop is
/// disabled (engine started via `start`/`start_with_metrics`
/// without a stream), drop this to 1s to preserve pre-Batch-C
/// DAG latency behavior.
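    ///
    /// As a worked example: a 4-level DAG promoted purely by this
    /// scanner at the 15s default can wait up to 4 × 15s = 60s end to
    /// end, while push dispatch settles it in roughly four round-trips.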
pub dependency_reconciler_interval: Duration,
/// Flow summary projector interval. Default: 15s.
///
    /// A separate observability projection path: it maintains the flow
    /// summary view and is NOT on the DAG-completion latency path. It
    /// stays at 15s in this config; changing that cadence is unrelated
    /// to dependency resolution.
pub flow_projector_interval: Duration,
/// Execution deadline scanner interval. Default: 5s.
pub execution_deadline_interval: Duration,
/// Cancel reconciler scanner interval. Default: 15s.
///
    /// Drains the per-partition `cancel_backlog` ZSET that
    /// `ff_cancel_flow` populates with flows still owing async member
    /// cancels. Each cancelled flow gets a grace window (30s by
    /// default, set by ff-server) before the reconciler picks it up,
    /// so the reconciler never contends with live in-process dispatch
    /// on the happy path.
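    /// (Worst case along this path, assuming live dispatch already
    /// missed the flow: ~30s grace plus up to one 15s scan interval,
    /// i.e. roughly 45s from flow cancel to member cancel.)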
pub cancel_reconciler_interval: Duration,
/// RFC-016 Stage C sibling-cancel dispatcher interval. Default: 1s.
///
/// Drains the per-flow-partition `pending_cancel_groups` SET,
/// populated by `ff_resolve_dependency` whenever an AnyOf/Quorum
/// edge group fires terminal under `OnSatisfied::CancelRemaining`.
/// For each indexed group the dispatcher issues per-sibling
    /// `ff_cancel_execution` with `FailureReason::sibling_quorum_satisfied`
    /// or `FailureReason::sibling_quorum_impossible`, then atomically
    /// SREM+clear via `ff_drain_sibling_cancel_group`.
///
/// A short default (1s) minimises the window between quorum
/// satisfaction and sibling termination — this is the user-facing
    /// latency floor for "kill the losers" workflows. Bump it only if
    /// a deployment's steady-state pending-set depth is observed to
    /// backlog under the 1s cadence; the Stage C benchmark gates the
    /// release against the p99 ≤ 500 ms SLO at n=100 (§4.2 of the
    /// RFC).
pub edge_cancel_dispatcher_interval: Duration,
/// RFC-016 Stage D sibling-cancel reconciler interval. Default: 10s.
///
/// Safety-net scanner for Invariant Q6: if the engine crashed
/// between `ff_resolve_dependency`'s SADD to `pending_cancel_groups`
/// and the dispatcher's `ff_drain_sibling_cancel_group`, this
/// reconciler detects the orphan tuple and finalises via
/// `ff_reconcile_sibling_cancel_group`. It runs at a deliberately
/// slower cadence than the dispatcher (10s vs 1s) so the dispatcher
/// owns the happy path and the reconciler only cleans up
/// crash-recovery residue. The reconciler MUST NOT fight the
/// dispatcher — it no-ops whenever siblings are still non-terminal.
pub edge_cancel_reconciler_interval: Duration,
/// Per-consumer scanner filter (issue #122).
///
/// Applied by every execution-shaped scanner (lease_expiry,
/// attempt_timeout, execution_deadline, suspension_timeout,
/// pending_wp_expiry, delayed_promoter, dependency_reconciler,
/// cancel_reconciler, unblock, index_reconciler,
/// retention_trimmer) to restrict the candidate set to
    /// executions owned by this consumer. The four non-execution
    /// scanners (budget_reconciler, budget_reset, quota_reconciler,
    /// flow_projector) accept the filter for API uniformity but do
    /// not apply it — their domains are not per-execution. (The
    /// RFC-016 sibling-cancel dispatcher and reconciler also receive
    /// the filter at construction; see `start_internal`.)
///
/// Default: [`ScannerFilter::default`] — no filtering,
/// pre-#122 behaviour. Multi-tenant deployments that share a
/// single Valkey keyspace across two FlowFabric instances set
/// this (paired with
/// [`CompletionBackend::subscribe_completions_filtered`]) for
/// mutual isolation.
///
/// [`CompletionBackend::subscribe_completions_filtered`]: ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered
pub scanner_filter: ScannerFilter,
}
impl Default for EngineConfig {
fn default() -> Self {
Self {
partition_config: PartitionConfig::default(),
lanes: vec![LaneId::new("default")],
lease_expiry_interval: Duration::from_millis(1500),
delayed_promoter_interval: Duration::from_millis(750),
index_reconciler_interval: Duration::from_secs(45),
attempt_timeout_interval: Duration::from_secs(2),
suspension_timeout_interval: Duration::from_secs(2),
pending_wp_expiry_interval: Duration::from_secs(5),
retention_trimmer_interval: Duration::from_secs(60),
budget_reset_interval: Duration::from_secs(15),
budget_reconciler_interval: Duration::from_secs(30),
quota_reconciler_interval: Duration::from_secs(30),
unblock_interval: Duration::from_secs(5),
dependency_reconciler_interval: Duration::from_secs(15),
flow_projector_interval: Duration::from_secs(15),
execution_deadline_interval: Duration::from_secs(5),
cancel_reconciler_interval: Duration::from_secs(15),
edge_cancel_dispatcher_interval: Duration::from_secs(1),
edge_cancel_reconciler_interval: Duration::from_secs(10),
scanner_filter: ScannerFilter::default(),
}
}
}
/// The FlowFabric engine: partition routing + background scanners.
pub struct Engine {
pub router: Arc<PartitionRouter>,
shutdown_tx: watch::Sender<bool>,
handles: Vec<JoinHandle<()>>,
}
impl Engine {
/// Start the engine with the given config and Valkey client.
///
/// Spawns background scanner tasks. Returns immediately.
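    ///
    /// A minimal sketch, assuming a connected `ferriskey::Client` is
    /// already in hand (the hidden `demo` wrapper exists only to
    /// supply one):
    ///
    /// ```no_run
    /// # async fn demo(client: ferriskey::Client) {
    /// let engine = ff_engine::Engine::start(ff_engine::EngineConfig::default(), client);
    /// // ... accept work until shutdown is requested ...
    /// engine.shutdown().await;
    /// # }
    /// ```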
pub fn start(config: EngineConfig, client: ferriskey::Client) -> Self {
// Construct a fresh metrics handle here so direct callers
// (examples, tests) don't need to. Under the default build
// (`observability` feature off) this is the no-op shim. With
// the feature on the handle is a real OTEL registry but — by
// design — one that nothing else shares; it's only useful for
// tests that want to exercise scanner cycle recording in
// isolation. Production code uses
// [`Self::start_with_metrics`] to plumb the same handle
// through the HTTP /metrics route.
Self::start_with_metrics(config, client, Arc::new(ff_observability::Metrics::new()))
}
/// PR-94: start the engine with a shared observability registry.
///
/// Used by `ff-server` so scanner cycle metrics funnel into the
/// same Prometheus registry exposed at `/metrics`. Under the
/// `observability` feature (flipped via the same feature on
/// `ff-server` / `ff-engine`), the handle records into an OTEL
/// `MeterProvider`; otherwise the shim no-ops.
pub fn start_with_metrics(
config: EngineConfig,
client: ferriskey::Client,
metrics: Arc<ff_observability::Metrics>,
) -> Self {
Self::start_internal(config, client, metrics, None)
}
/// Start the engine with a shared observability registry and a
/// completion stream for push-based DAG promotion (issue #90).
///
/// The stream is typically produced by
/// [`ff_core::completion_backend::CompletionBackend::subscribe_completions`].
/// The engine spawns a dispatch loop that drains the stream and
/// fires `ff_resolve_dependency` per completion, reducing DAG
/// latency from `interval × levels` to `~RTT × levels`. The
/// `dependency_reconciler` scanner remains as a safety net for
/// completions missed during subscriber reconnect windows.
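    ///
    /// A sketch of the wiring; how the stream is obtained from the
    /// backend is elided here, and the hidden `demo` wrapper only
    /// supplies the inputs:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use ff_core::completion_backend::CompletionStream;
    /// # async fn demo(client: ferriskey::Client, completions: CompletionStream) {
    /// let metrics = Arc::new(ff_observability::Metrics::new());
    /// let engine = ff_engine::Engine::start_with_completions(
    ///     ff_engine::EngineConfig::default(),
    ///     client,
    ///     metrics,
    ///     completions,
    /// );
    /// # let _ = engine;
    /// # }
    /// ```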
pub fn start_with_completions(
config: EngineConfig,
client: ferriskey::Client,
metrics: Arc<ff_observability::Metrics>,
completions: CompletionStream,
) -> Self {
Self::start_internal(config, client, metrics, Some(completions))
}
fn start_internal(
config: EngineConfig,
client: ferriskey::Client,
metrics: Arc<ff_observability::Metrics>,
completions: Option<CompletionStream>,
) -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let num_partitions = config.partition_config.num_flow_partitions;
let router = Arc::new(PartitionRouter::new(config.partition_config));
let mut handles = Vec::new();
let scanner_filter = config.scanner_filter.clone();
// Lease expiry scanner
let lease_scanner = Arc::new(LeaseExpiryScanner::with_filter(
config.lease_expiry_interval,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
lease_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Delayed promoter
let delayed_scanner = Arc::new(DelayedPromoter::with_filter(
config.delayed_promoter_interval,
config.lanes.clone(),
scanner_filter.clone(),
));
handles.push(supervised_spawn(
delayed_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Index reconciler
let reconciler = Arc::new(IndexReconciler::with_filter(
config.index_reconciler_interval,
config.lanes.clone(),
scanner_filter.clone(),
));
handles.push(supervised_spawn(
reconciler,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Attempt timeout scanner
let timeout_scanner = Arc::new(AttemptTimeoutScanner::with_filter(
config.attempt_timeout_interval,
config.lanes.clone(),
scanner_filter.clone(),
));
handles.push(supervised_spawn(
timeout_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Suspension timeout scanner
let suspension_scanner = Arc::new(SuspensionTimeoutScanner::with_filter(
config.suspension_timeout_interval,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
suspension_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Pending waitpoint expiry scanner
let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::with_filter(
config.pending_wp_expiry_interval,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
pending_wp_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Retention trimmer
let retention_scanner = Arc::new(RetentionTrimmer::with_filter(
config.retention_trimmer_interval,
config.lanes.clone(),
scanner_filter.clone(),
));
handles.push(supervised_spawn(
retention_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Budget reset scanner (iterates budget partitions).
// Filter is accepted but not applied (budget partitions don't
// carry the per-execution namespace / instance_tag shape).
let budget_reset = Arc::new(BudgetResetScanner::with_filter(
config.budget_reset_interval,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
budget_reset,
client.clone(),
config.partition_config.num_budget_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Budget reconciler (iterates budget partitions). Filter
// accepted but not applied — see BudgetReconciler::with_filter
// rustdoc.
let budget_reconciler = Arc::new(BudgetReconciler::with_filter(
config.budget_reconciler_interval,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
budget_reconciler,
client.clone(),
config.partition_config.num_budget_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Unblock scanner (iterates execution partitions, re-evaluates blocked)
let unblock_scanner = Arc::new(UnblockScanner::with_filter(
config.unblock_interval,
config.lanes.clone(),
config.partition_config,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
unblock_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Dependency reconciler (iterates execution partitions)
let dep_reconciler = Arc::new(DependencyReconciler::with_filter(
config.dependency_reconciler_interval,
config.lanes.clone(),
config.partition_config,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
dep_reconciler,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Quota reconciler (iterates quota partitions). Filter
// accepted but not applied — see QuotaReconciler::with_filter
// rustdoc.
let quota_reconciler = Arc::new(QuotaReconciler::with_filter(
config.quota_reconciler_interval,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
quota_reconciler,
client.clone(),
config.partition_config.num_quota_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Flow summary projector (iterates flow partitions). Filter
// accepted but not applied — see FlowProjector::with_filter
// rustdoc.
let flow_projector = Arc::new(FlowProjector::with_filter(
config.flow_projector_interval,
config.partition_config,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
flow_projector,
client.clone(),
config.partition_config.num_flow_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Cancel reconciler (iterates flow partitions). Drains
// cancel_backlog entries whose grace window has elapsed so a
// process crash mid-dispatch can't leave flow members un-cancelled.
        let cancel_reconciler = Arc::new(CancelReconciler::with_filter(
config.cancel_reconciler_interval,
config.partition_config,
scanner_filter.clone(),
));
handles.push(supervised_spawn(
cancel_reconciler,
client.clone(),
config.partition_config.num_flow_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// RFC-016 Stage C: sibling-cancel dispatcher. Iterates flow
// partitions, drains `pending_cancel_groups` SET via
// `ff_drain_sibling_cancel_group`, issues per-sibling
// `ff_cancel_execution` with sibling_quorum reasons.
        let edge_cancel_dispatcher = Arc::new(EdgeCancelDispatcher::with_filter_and_metrics(
            config.edge_cancel_dispatcher_interval,
            config.partition_config,
            scanner_filter.clone(),
            metrics.clone(),
        ));
handles.push(supervised_spawn(
edge_cancel_dispatcher,
client.clone(),
config.partition_config.num_flow_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// RFC-016 Stage D: sibling-cancel reconciler. Crash-recovery
// safety net for Invariant Q6 — finalises tuples in
// `pending_cancel_groups` whose dispatcher drain was interrupted
// by an engine crash. Runs at a slower cadence than the
// dispatcher so it never fights the happy path.
        let edge_cancel_reconciler = Arc::new(EdgeCancelReconciler::with_filter_and_metrics(
            config.edge_cancel_reconciler_interval,
            scanner_filter.clone(),
            metrics.clone(),
        ));
handles.push(supervised_spawn(
edge_cancel_reconciler,
client.clone(),
config.partition_config.num_flow_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Execution deadline scanner (iterates execution partitions)
let deadline_scanner = Arc::new(ExecutionDeadlineScanner::with_filter(
config.execution_deadline_interval,
config.lanes,
scanner_filter,
));
handles.push(supervised_spawn(
deadline_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
metrics.clone(),
));
// Completion dispatch loop (Batch C item 6 — push-based DAG
// promotion; backend-agnostic since issue #90). Optional: when
// a stream is provided, spawn a task that drains
// `CompletionPayload`s and fires dependency resolution per
// completion. See `completion_listener` module docs.
        // Count the supervised scanners before the optional dispatch
        // loop is pushed, so the startup log can't drift from the real
        // task count as scanners are added or removed.
        let scanner_count = handles.len();
        let listener_enabled = completions.is_some();
        if let Some(stream) = completions {
            handles.push(completion_listener::spawn_dispatch_loop(
                router.clone(),
                client,
                stream,
                shutdown_rx,
            ));
        }
        let summary = if listener_enabled {
            format!("{scanner_count} scanners + completion dispatch")
        } else {
            format!("{scanner_count} scanners")
        };
        tracing::info!(
            num_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            flow_partitions = config.partition_config.num_flow_partitions,
            "engine started with {summary}"
        );
Self {
router,
shutdown_tx,
handles,
}
}
/// Signal all scanners to stop and wait for them to finish.
///
/// Waits up to 15 seconds for scanners to drain. If any scanner is
/// blocked on a hung Valkey command, the timeout prevents shutdown
/// from hanging indefinitely (Kubernetes SIGKILL safety).
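    ///
    /// Typical use, assuming tokio's `signal` feature is enabled:
    ///
    /// ```no_run
    /// # async fn demo(engine: ff_engine::Engine) {
    /// tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
    /// engine.shutdown().await;
    /// # }
    /// ```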
pub async fn shutdown(self) {
let _ = self.shutdown_tx.send(true);
let join_all = async {
for handle in self.handles {
let _ = handle.await;
}
};
match tokio::time::timeout(Duration::from_secs(15), join_all).await {
Ok(()) => tracing::info!("engine shutdown complete"),
Err(_) => tracing::warn!(
"engine shutdown timed out after 15s, abandoning remaining scanners"
),
}
}
}