// ff-engine 0.3.1
// FlowFabric cross-partition dispatch and background scanners.
// (Documentation header; rendered line-number gutter removed.)
//! ff-engine: cross-partition dispatch and background scanners.

pub mod budget;
pub mod completion_listener;
pub mod partition_router;
pub mod scanner;
pub mod supervisor;

use std::sync::Arc;
use std::time::Duration;

use ff_core::backend::ScannerFilter;
use ff_core::completion_backend::CompletionStream;
use ff_core::partition::PartitionConfig;
use ff_core::types::LaneId;
use tokio::sync::watch;
use tokio::task::JoinHandle;

use partition_router::PartitionRouter;
use supervisor::supervised_spawn;
use scanner::attempt_timeout::AttemptTimeoutScanner;
use scanner::execution_deadline::ExecutionDeadlineScanner;
use scanner::budget_reconciler::BudgetReconciler;
use scanner::budget_reset::BudgetResetScanner;
use scanner::delayed_promoter::DelayedPromoter;
use scanner::dependency_reconciler::DependencyReconciler;
use scanner::index_reconciler::IndexReconciler;
use scanner::lease_expiry::LeaseExpiryScanner;
use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
use scanner::quota_reconciler::QuotaReconciler;
use scanner::retention_trimmer::RetentionTrimmer;
use scanner::suspension_timeout::SuspensionTimeoutScanner;
use scanner::flow_projector::FlowProjector;
use scanner::unblock::UnblockScanner;

/// Engine configuration.
pub struct EngineConfig {
    /// Partition counts (flow / budget / quota). Consumed by the
    /// [`PartitionRouter`] and used to size each scanner's iteration
    /// space in `start_internal`.
    pub partition_config: PartitionConfig,
    /// Lanes to scan for delayed/index operations. Phase 1: `["default"]`.
    pub lanes: Vec<LaneId>,
    /// Lease expiry scan interval. Default: 1.5s.
    pub lease_expiry_interval: Duration,
    /// Delayed promoter scan interval. Default: 750ms.
    pub delayed_promoter_interval: Duration,
    /// Index reconciler scan interval. Default: 45s.
    pub index_reconciler_interval: Duration,
    /// Attempt timeout scan interval. Default: 2s.
    pub attempt_timeout_interval: Duration,
    /// Suspension timeout scan interval. Default: 2s.
    pub suspension_timeout_interval: Duration,
    /// Pending waitpoint expiry scan interval. Default: 5s.
    pub pending_wp_expiry_interval: Duration,
    /// Retention trimmer scan interval. Default: 60s.
    pub retention_trimmer_interval: Duration,
    /// Budget reset scan interval. Default: 15s.
    pub budget_reset_interval: Duration,
    /// Budget reconciler scan interval. Default: 30s.
    pub budget_reconciler_interval: Duration,
    /// Quota reconciler scan interval. Default: 30s.
    pub quota_reconciler_interval: Duration,
    /// Unblock scanner interval. Default: 5s.
    pub unblock_interval: Duration,
    /// Dependency reconciler interval. Default: 15s.
    ///
    /// Post-Batch-C this scanner is a **safety net**, not the primary
    /// promotion path. When a [`CompletionStream`] is handed to
    /// `start_with_completions`, push-based dispatch drives DAG
    /// promotion synchronously with each completion — under normal
    /// operation DAG latency is `~RTT × levels`, not `interval × levels`.
    ///
    /// The reconciler still runs as a catch-all for:
    ///   - messages missed during subscriber restart or reconnect;
    ///   - pre-Batch-C executions without `core.flow_id` stamped;
    ///   - operator-driven edge mutation that doesn't pass through
    ///     the terminal-transition publish path.
    ///
    /// 15s idle-scan cost is minimal. If the push dispatch loop is
    /// disabled (engine started via `start`/`start_with_metrics`
    /// without a stream), drop this to 1s to preserve pre-Batch-C
    /// DAG latency behavior.
    pub dependency_reconciler_interval: Duration,
    /// Flow summary projector interval. Default: 15s.
    ///
    /// Separate observability projection path — maintains the flow
    /// summary view, NOT on the DAG-completion latency path. Kept at
    /// 15s in this config; a change to that cadence is unrelated to
    /// dependency resolution.
    pub flow_projector_interval: Duration,
    /// Execution deadline scanner interval. Default: 5s.
    pub execution_deadline_interval: Duration,

    /// Cancel reconciler scanner interval. Default: 15s.
    ///
    /// Drains `ff_cancel_flow`'s per-partition `cancel_backlog` ZSET of
    /// flows owing async member cancels. Each cancelled flow gets a
    /// grace window (30s by default, set by ff-server) before the
    /// reconciler picks it up, so the live in-process dispatch isn't
    /// fought on the happy path.
    pub cancel_reconciler_interval: Duration,

    /// Per-consumer scanner filter (issue #122).
    ///
    /// Applied by every execution-shaped scanner (lease_expiry,
    /// attempt_timeout, execution_deadline, suspension_timeout,
    /// pending_wp_expiry, delayed_promoter, dependency_reconciler,
    /// cancel_reconciler, unblock, index_reconciler,
    /// retention_trimmer) to restrict the candidate set to
    /// executions owned by this consumer. The four non-execution
    /// scanners (budget_reconciler, budget_reset, quota_reconciler,
    /// flow_projector) accept the filter for API uniformity but do
    /// not apply it — their domains are not per-execution.
    ///
    /// Default: [`ScannerFilter::default`] — no filtering,
    /// pre-#122 behavior. Multi-tenant deployments that share a
    /// single Valkey keyspace across two FlowFabric instances set
    /// this (paired with
    /// [`CompletionBackend::subscribe_completions_filtered`]) for
    /// mutual isolation.
    ///
    /// [`CompletionBackend::subscribe_completions_filtered`]: ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered
    pub scanner_filter: ScannerFilter,
}

impl Default for EngineConfig {
    /// Production-tuned defaults; each value matches the `Default:`
    /// note on the corresponding [`EngineConfig`] field.
    fn default() -> Self {
        // Several scanners share the same cadence — name the common
        // durations once so the grouping is visible at a glance.
        let five_secs = Duration::from_secs(5);
        let fifteen_secs = Duration::from_secs(15);
        let thirty_secs = Duration::from_secs(30);

        Self {
            partition_config: PartitionConfig::default(),
            lanes: vec![LaneId::new("default")],
            scanner_filter: ScannerFilter::default(),

            // Fast-cycle scanners (sub-5s): liveness-critical paths.
            lease_expiry_interval: Duration::from_millis(1500),
            delayed_promoter_interval: Duration::from_millis(750),
            attempt_timeout_interval: Duration::from_secs(2),
            suspension_timeout_interval: Duration::from_secs(2),

            // Medium-cycle scanners.
            pending_wp_expiry_interval: five_secs,
            unblock_interval: five_secs,
            execution_deadline_interval: five_secs,
            budget_reset_interval: fifteen_secs,
            dependency_reconciler_interval: fifteen_secs,
            flow_projector_interval: fifteen_secs,
            cancel_reconciler_interval: fifteen_secs,

            // Slow-cycle reconciliation / housekeeping.
            budget_reconciler_interval: thirty_secs,
            quota_reconciler_interval: thirty_secs,
            index_reconciler_interval: Duration::from_secs(45),
            retention_trimmer_interval: Duration::from_secs(60),
        }
    }
}

/// The FlowFabric engine: partition routing + background scanners.
pub struct Engine {
    /// Shared partition router; also handed to the completion
    /// dispatch loop when one is started.
    pub router: Arc<PartitionRouter>,
    // Broadcast flag flipped to `true` by `shutdown()` to stop every
    // spawned task watching a clone of the receiver.
    shutdown_tx: watch::Sender<bool>,
    // Join handles for all spawned scanner/dispatch tasks; awaited
    // (under a 15s timeout) during shutdown.
    handles: Vec<JoinHandle<()>>,
}

impl Engine {
    /// Start the engine with the given config and Valkey client.
    ///
    /// Spawns background scanner tasks. Returns immediately.
    pub fn start(config: EngineConfig, client: ferriskey::Client) -> Self {
        // A fresh metrics handle is created on behalf of direct
        // callers (examples, tests) so they don't have to. Under the
        // default build (`observability` feature off) this is the
        // no-op shim; with the feature on it is a real OTEL registry
        // that — by design — nothing else shares, useful only for
        // tests exercising scanner cycle recording in isolation.
        // Production code goes through [`Self::start_with_metrics`]
        // so the same handle also backs the HTTP /metrics route.
        let private_metrics = Arc::new(ff_observability::Metrics::new());
        Self::start_with_metrics(config, client, private_metrics)
    }

    /// PR-94: start the engine with a shared observability registry.
    ///
    /// `ff-server` uses this so scanner cycle metrics funnel into the
    /// same Prometheus registry exposed at `/metrics`. Under the
    /// `observability` feature (flipped via the same feature on
    /// `ff-server` / `ff-engine`) the handle records into an OTEL
    /// `MeterProvider`; otherwise the shim no-ops.
    pub fn start_with_metrics(
        config: EngineConfig,
        client: ferriskey::Client,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Self {
        // No completion stream here: push-based DAG promotion stays
        // off and the dependency_reconciler interval is the only
        // promotion path.
        let completions = None;
        Self::start_internal(config, client, metrics, completions)
    }

    /// Start the engine with a shared observability registry and a
    /// completion stream for push-based DAG promotion (issue #90).
    ///
    /// The stream is typically produced by
    /// [`ff_core::completion_backend::CompletionBackend::subscribe_completions`].
    /// The engine spawns a dispatch loop that drains the stream and
    /// fires `ff_resolve_dependency` per completion, reducing DAG
    /// latency from `interval × levels` to `~RTT × levels`. The
    /// `dependency_reconciler` scanner remains as a safety net for
    /// completions missed during subscriber reconnect windows.
    pub fn start_with_completions(
        config: EngineConfig,
        client: ferriskey::Client,
        metrics: Arc<ff_observability::Metrics>,
        completions: CompletionStream,
    ) -> Self {
        let stream = Some(completions);
        Self::start_internal(config, client, metrics, stream)
    }

    // Shared startup path for `start` / `start_with_metrics` /
    // `start_with_completions`: builds the router, spawns every
    // background scanner under the supervisor, and optionally spawns
    // the push-based completion dispatch loop.
    //
    // Each scanner gets a clone of the Valkey client, the shutdown
    // watch receiver, and the metrics handle; `supervised_spawn`
    // returns the task's JoinHandle, collected for shutdown().
    fn start_internal(
        config: EngineConfig,
        client: ferriskey::Client,
        metrics: Arc<ff_observability::Metrics>,
        completions: Option<CompletionStream>,
    ) -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        // NOTE(review): `num_partitions` is the *flow* partition count
        // but is passed below to the execution-shaped scanners —
        // presumably flow and execution partition counts coincide;
        // confirm against PartitionConfig.
        let num_partitions = config.partition_config.num_flow_partitions;
        // `config.partition_config` is used again after this call, so
        // PartitionConfig must be passed by copy here.
        let router = Arc::new(PartitionRouter::new(config.partition_config));

        let mut handles = Vec::new();

        let scanner_filter = config.scanner_filter.clone();

        // Lease expiry scanner
        let lease_scanner = Arc::new(LeaseExpiryScanner::with_filter(
            config.lease_expiry_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            lease_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Delayed promoter
        let delayed_scanner = Arc::new(DelayedPromoter::with_filter(
            config.delayed_promoter_interval,
            config.lanes.clone(),
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            delayed_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Index reconciler
        let reconciler = Arc::new(IndexReconciler::with_filter(
            config.index_reconciler_interval,
            config.lanes.clone(),
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            reconciler,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Attempt timeout scanner
        let timeout_scanner = Arc::new(AttemptTimeoutScanner::with_filter(
            config.attempt_timeout_interval,
            config.lanes.clone(),
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            timeout_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Suspension timeout scanner
        let suspension_scanner = Arc::new(SuspensionTimeoutScanner::with_filter(
            config.suspension_timeout_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            suspension_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Pending waitpoint expiry scanner
        let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::with_filter(
            config.pending_wp_expiry_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            pending_wp_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Retention trimmer
        let retention_scanner = Arc::new(RetentionTrimmer::with_filter(
            config.retention_trimmer_interval,
            config.lanes.clone(),
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            retention_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Budget reset scanner (iterates budget partitions).
        // Filter is accepted but not applied (budget partitions don't
        // carry the per-execution namespace / instance_tag shape).
        let budget_reset = Arc::new(BudgetResetScanner::with_filter(
            config.budget_reset_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            budget_reset,
            client.clone(),
            config.partition_config.num_budget_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Budget reconciler (iterates budget partitions). Filter
        // accepted but not applied — see BudgetReconciler::with_filter
        // rustdoc.
        let budget_reconciler = Arc::new(BudgetReconciler::with_filter(
            config.budget_reconciler_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            budget_reconciler,
            client.clone(),
            config.partition_config.num_budget_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Unblock scanner (iterates execution partitions, re-evaluates blocked)
        let unblock_scanner = Arc::new(UnblockScanner::with_filter(
            config.unblock_interval,
            config.lanes.clone(),
            config.partition_config,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            unblock_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Dependency reconciler (iterates execution partitions)
        let dep_reconciler = Arc::new(DependencyReconciler::with_filter(
            config.dependency_reconciler_interval,
            config.lanes.clone(),
            config.partition_config,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            dep_reconciler,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Quota reconciler (iterates quota partitions). Filter
        // accepted but not applied — see QuotaReconciler::with_filter
        // rustdoc.
        let quota_reconciler = Arc::new(QuotaReconciler::with_filter(
            config.quota_reconciler_interval,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            quota_reconciler,
            client.clone(),
            config.partition_config.num_quota_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Flow summary projector (iterates flow partitions). Filter
        // accepted but not applied — see FlowProjector::with_filter
        // rustdoc.
        let flow_projector = Arc::new(FlowProjector::with_filter(
            config.flow_projector_interval,
            config.partition_config,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            flow_projector,
            client.clone(),
            config.partition_config.num_flow_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Cancel reconciler (iterates flow partitions). Drains
        // cancel_backlog entries whose grace window has elapsed so a
        // process crash mid-dispatch can't leave flow members un-cancelled.
        let cancel_reconciler = Arc::new(scanner::cancel_reconciler::CancelReconciler::with_filter(
            config.cancel_reconciler_interval,
            config.partition_config,
            scanner_filter.clone(),
        ));
        handles.push(supervised_spawn(
            cancel_reconciler,
            client.clone(),
            config.partition_config.num_flow_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Execution deadline scanner (iterates execution partitions).
        // Last scanner: `config.lanes` and `scanner_filter` are moved
        // here rather than cloned.
        let deadline_scanner = Arc::new(ExecutionDeadlineScanner::with_filter(
            config.execution_deadline_interval,
            config.lanes,
            scanner_filter,
        ));
        handles.push(supervised_spawn(
            deadline_scanner,
            client.clone(),
            num_partitions,
            shutdown_rx.clone(),
            metrics.clone(),
        ));

        // Completion dispatch loop (Batch C item 6 — push-based DAG
        // promotion; backend-agnostic since issue #90). Optional: when
        // a stream is provided, spawn a task that drains
        // `CompletionPayload`s and fires dependency resolution per
        // completion. See `completion_listener` module docs.
        // Remembered before `completions` is consumed so the startup
        // log below can report the dispatch loop.
        let listener_enabled = completions.is_some();
        if let Some(stream) = completions {
            // Final use of `client` and `shutdown_rx` — both moved, not
            // cloned. Keep this block last if adding scanners above.
            handles.push(completion_listener::spawn_dispatch_loop(
                router.clone(),
                client,
                stream,
                shutdown_rx,
            ));
        }

        let scanner_count = if listener_enabled { "15 scanners + completion dispatch" } else { "15 scanners" };
        tracing::info!(
            num_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            flow_partitions = config.partition_config.num_flow_partitions,
            "engine started with {scanner_count}"
        );

        Self {
            router,
            shutdown_tx,
            handles,
        }
    }

    /// Signal all scanners to stop and wait for them to finish.
    ///
    /// Drains scanner tasks for at most 15 seconds. If any scanner is
    /// blocked on a hung Valkey command, the timeout keeps shutdown
    /// from hanging indefinitely (Kubernetes SIGKILL safety).
    pub async fn shutdown(self) {
        // Broadcast the stop flag. An Err only means every receiver is
        // already gone, which is fine — ignore it.
        let _ = self.shutdown_tx.send(true);

        // Await handles sequentially; total wall time is bounded by
        // the timeout wrapping the whole drain, not per-handle.
        let drain = async move {
            for handle in self.handles {
                // Panicked scanners were already reported by the
                // supervisor; the JoinError carries nothing new.
                let _ = handle.await;
            }
        };

        if tokio::time::timeout(Duration::from_secs(15), drain)
            .await
            .is_ok()
        {
            tracing::info!("engine shutdown complete");
        } else {
            tracing::warn!(
                "engine shutdown timed out after 15s, abandoning remaining scanners"
            );
        }
    }
}