Skip to main content

ff_engine/
lib.rs

1//! ff-engine: cross-partition dispatch and background scanners.
2
3pub mod budget;
4pub mod completion_listener;
5pub mod partition_router;
6pub mod scanner;
7pub mod supervisor;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use ff_core::partition::PartitionConfig;
13use ff_core::types::LaneId;
14use tokio::sync::watch;
15use tokio::task::JoinHandle;
16
17use partition_router::PartitionRouter;
18use supervisor::supervised_spawn;
19use scanner::attempt_timeout::AttemptTimeoutScanner;
20use scanner::execution_deadline::ExecutionDeadlineScanner;
21use scanner::budget_reconciler::BudgetReconciler;
22use scanner::budget_reset::BudgetResetScanner;
23use scanner::delayed_promoter::DelayedPromoter;
24use scanner::dependency_reconciler::DependencyReconciler;
25use scanner::index_reconciler::IndexReconciler;
26use scanner::lease_expiry::LeaseExpiryScanner;
27use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
28use scanner::quota_reconciler::QuotaReconciler;
29use scanner::retention_trimmer::RetentionTrimmer;
30use scanner::suspension_timeout::SuspensionTimeoutScanner;
31use scanner::flow_projector::FlowProjector;
32use scanner::unblock::UnblockScanner;
33
/// Settings for the completion listener's dedicated RESP3 connection.
///
/// These must point at the same deployment the dispatcher client
/// connects to; the listener opens its own client from them rather than
/// reusing the dispatcher's.
#[derive(Debug, Clone)]
pub struct CompletionListenerConfig {
    /// `(host, port)` seed addresses. Standalone deployments pass a single
    /// entry; cluster deployments pass every configured seed node.
    pub addresses: Vec<(String, u16)>,
    /// Turn TLS on. Mirrors `ClientBuilder::tls_insecure`: the listener
    /// uses insecure TLS because it only ever handles completion metadata,
    /// never user payloads. Plumb a dedicated flag if strict TLS is needed.
    pub tls: bool,
    /// Connect in cluster mode.
    pub cluster: bool,
}
49
50/// Engine configuration.
51pub struct EngineConfig {
52    pub partition_config: PartitionConfig,
53    /// Lanes to scan for delayed/index operations. Phase 1: `["default"]`.
54    pub lanes: Vec<LaneId>,
55    /// Lease expiry scan interval. Default: 1.5s.
56    pub lease_expiry_interval: Duration,
57    /// Delayed promoter scan interval. Default: 750ms.
58    pub delayed_promoter_interval: Duration,
59    /// Index reconciler scan interval. Default: 45s.
60    pub index_reconciler_interval: Duration,
61    /// Attempt timeout scan interval. Default: 2s.
62    pub attempt_timeout_interval: Duration,
63    /// Suspension timeout scan interval. Default: 2s.
64    pub suspension_timeout_interval: Duration,
65    /// Pending waitpoint expiry scan interval. Default: 5s.
66    pub pending_wp_expiry_interval: Duration,
67    /// Retention trimmer scan interval. Default: 60s.
68    pub retention_trimmer_interval: Duration,
69    /// Budget reset scan interval. Default: 15s.
70    pub budget_reset_interval: Duration,
71    /// Budget reconciler scan interval. Default: 30s.
72    pub budget_reconciler_interval: Duration,
73    /// Quota reconciler scan interval. Default: 30s.
74    pub quota_reconciler_interval: Duration,
75    /// Unblock scanner interval. Default: 5s.
76    pub unblock_interval: Duration,
77    /// Dependency reconciler interval. Default: 15s.
78    ///
79    /// Post-Batch-C this scanner is a **safety net**, not the primary
80    /// promotion path. The [`completion_listener`] SUBSCRIBEs to
81    /// `ff:dag:completions` and dispatches dependency resolution
82    /// synchronously with each completion — under normal operation,
83    /// DAG latency is `~RTT × levels`, not `interval × levels`.
84    ///
85    /// The reconciler still runs as a catch-all for:
86    ///   - messages missed during listener restart or reconnect;
87    ///   - pre-Batch-C executions without `core.flow_id` stamped;
88    ///   - operator-driven edge mutation that doesn't pass through
89    ///     the terminal-transition publish path.
90    ///
91    /// 15s idle-scan cost is minimal. If the listener is disabled
92    /// (`completion_listener` = None), drop this to 1s to preserve
93    /// pre-Batch-C DAG latency behavior.
94    ///
95    /// [`completion_listener`]: Self::completion_listener
96    pub dependency_reconciler_interval: Duration,
97
98    /// Optional push-based DAG promotion listener (Batch C item 6).
99    /// When `Some`, the engine spawns a [`completion_listener`] task
100    /// that SUBSCRIBEs to `ff:dag:completions` on a dedicated RESP3
101    /// client and dispatches dependency resolution per completion.
102    ///
103    /// `None` disables the listener entirely — the reconciler alone
104    /// promotes. Useful for lightweight single-node deployments or
105    /// test harnesses that don't care about DAG latency.
106    ///
107    /// [`completion_listener`]: crate::completion_listener
108    pub completion_listener: Option<CompletionListenerConfig>,
109    /// Flow summary projector interval. Default: 15s.
110    ///
111    /// Separate observability projection path — maintains the flow
112    /// summary view, NOT on the DAG-completion latency path. Kept at
113    /// 15s in this config; a change to that cadence is unrelated to
114    /// dependency resolution.
115    pub flow_projector_interval: Duration,
116    /// Execution deadline scanner interval. Default: 5s.
117    pub execution_deadline_interval: Duration,
118}
119
120impl Default for EngineConfig {
121    fn default() -> Self {
122        Self {
123            partition_config: PartitionConfig::default(),
124            lanes: vec![LaneId::new("default")],
125            lease_expiry_interval: Duration::from_millis(1500),
126            delayed_promoter_interval: Duration::from_millis(750),
127            index_reconciler_interval: Duration::from_secs(45),
128            attempt_timeout_interval: Duration::from_secs(2),
129            suspension_timeout_interval: Duration::from_secs(2),
130            pending_wp_expiry_interval: Duration::from_secs(5),
131            retention_trimmer_interval: Duration::from_secs(60),
132            budget_reset_interval: Duration::from_secs(15),
133            budget_reconciler_interval: Duration::from_secs(30),
134            quota_reconciler_interval: Duration::from_secs(30),
135            unblock_interval: Duration::from_secs(5),
136            dependency_reconciler_interval: Duration::from_secs(15),
137            completion_listener: None,
138            flow_projector_interval: Duration::from_secs(15),
139            execution_deadline_interval: Duration::from_secs(5),
140        }
141    }
142}
143
144/// The FlowFabric engine: partition routing + background scanners.
145pub struct Engine {
146    pub router: Arc<PartitionRouter>,
147    shutdown_tx: watch::Sender<bool>,
148    handles: Vec<JoinHandle<()>>,
149}
150
151impl Engine {
152    /// Start the engine with the given config and Valkey client.
153    ///
154    /// Spawns background scanner tasks. Returns immediately.
155    pub fn start(config: EngineConfig, client: ferriskey::Client) -> Self {
156        let (shutdown_tx, shutdown_rx) = watch::channel(false);
157        let num_partitions = config.partition_config.num_flow_partitions;
158        let router = Arc::new(PartitionRouter::new(config.partition_config));
159
160        let mut handles = Vec::new();
161
162        // Lease expiry scanner
163        let lease_scanner = Arc::new(LeaseExpiryScanner::new(config.lease_expiry_interval));
164        handles.push(supervised_spawn(
165            lease_scanner,
166            client.clone(),
167            num_partitions,
168            shutdown_rx.clone(),
169        ));
170
171        // Delayed promoter
172        let delayed_scanner = Arc::new(DelayedPromoter::new(
173            config.delayed_promoter_interval,
174            config.lanes.clone(),
175        ));
176        handles.push(supervised_spawn(
177            delayed_scanner,
178            client.clone(),
179            num_partitions,
180            shutdown_rx.clone(),
181        ));
182
183        // Index reconciler
184        let reconciler = Arc::new(IndexReconciler::new(
185            config.index_reconciler_interval,
186            config.lanes.clone(),
187        ));
188        handles.push(supervised_spawn(
189            reconciler,
190            client.clone(),
191            num_partitions,
192            shutdown_rx.clone(),
193        ));
194
195        // Attempt timeout scanner
196        let timeout_scanner = Arc::new(AttemptTimeoutScanner::new(
197            config.attempt_timeout_interval,
198            config.lanes.clone(),
199        ));
200        handles.push(supervised_spawn(
201            timeout_scanner,
202            client.clone(),
203            num_partitions,
204            shutdown_rx.clone(),
205        ));
206
207        // Suspension timeout scanner
208        let suspension_scanner = Arc::new(SuspensionTimeoutScanner::new(
209            config.suspension_timeout_interval,
210        ));
211        handles.push(supervised_spawn(
212            suspension_scanner,
213            client.clone(),
214            num_partitions,
215            shutdown_rx.clone(),
216        ));
217
218        // Pending waitpoint expiry scanner
219        let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::new(
220            config.pending_wp_expiry_interval,
221        ));
222        handles.push(supervised_spawn(
223            pending_wp_scanner,
224            client.clone(),
225            num_partitions,
226            shutdown_rx.clone(),
227        ));
228
229        // Retention trimmer
230        let retention_scanner = Arc::new(RetentionTrimmer::new(
231            config.retention_trimmer_interval,
232            config.lanes.clone(),
233        ));
234        handles.push(supervised_spawn(
235            retention_scanner,
236            client.clone(),
237            num_partitions,
238            shutdown_rx.clone(),
239        ));
240
241        // Budget reset scanner (iterates budget partitions)
242        let budget_reset = Arc::new(BudgetResetScanner::new(
243            config.budget_reset_interval,
244        ));
245        handles.push(supervised_spawn(
246            budget_reset,
247            client.clone(),
248            config.partition_config.num_budget_partitions,
249            shutdown_rx.clone(),
250        ));
251
252        // Budget reconciler (iterates budget partitions)
253        let budget_reconciler = Arc::new(BudgetReconciler::new(
254            config.budget_reconciler_interval,
255        ));
256        handles.push(supervised_spawn(
257            budget_reconciler,
258            client.clone(),
259            config.partition_config.num_budget_partitions,
260            shutdown_rx.clone(),
261        ));
262
263        // Unblock scanner (iterates execution partitions, re-evaluates blocked)
264        let unblock_scanner = Arc::new(UnblockScanner::new(
265            config.unblock_interval,
266            config.lanes.clone(),
267            config.partition_config,
268        ));
269        handles.push(supervised_spawn(
270            unblock_scanner,
271            client.clone(),
272            num_partitions,
273            shutdown_rx.clone(),
274        ));
275
276        // Dependency reconciler (iterates execution partitions)
277        let dep_reconciler = Arc::new(DependencyReconciler::new(
278            config.dependency_reconciler_interval,
279            config.lanes.clone(),
280            config.partition_config,
281        ));
282        handles.push(supervised_spawn(
283            dep_reconciler,
284            client.clone(),
285            num_partitions,
286            shutdown_rx.clone(),
287        ));
288
289        // Quota reconciler (iterates quota partitions)
290        let quota_reconciler = Arc::new(QuotaReconciler::new(
291            config.quota_reconciler_interval,
292        ));
293        handles.push(supervised_spawn(
294            quota_reconciler,
295            client.clone(),
296            config.partition_config.num_quota_partitions,
297            shutdown_rx.clone(),
298        ));
299
300        // Flow summary projector (iterates flow partitions)
301        let flow_projector = Arc::new(FlowProjector::new(
302            config.flow_projector_interval,
303            config.partition_config,
304        ));
305        handles.push(supervised_spawn(
306            flow_projector,
307            client.clone(),
308            config.partition_config.num_flow_partitions,
309            shutdown_rx.clone(),
310        ));
311
312        // Execution deadline scanner (iterates execution partitions)
313        let deadline_scanner = Arc::new(ExecutionDeadlineScanner::new(
314            config.execution_deadline_interval,
315            config.lanes,
316        ));
317        handles.push(supervised_spawn(
318            deadline_scanner,
319            client.clone(),
320            num_partitions,
321            shutdown_rx.clone(),
322        ));
323
324        // Completion listener (Batch C item 6 — push-based DAG promotion).
325        // Optional: when Some, spawns a dedicated RESP3 client that
326        // SUBSCRIBEs to ff:dag:completions and dispatches dependency
327        // resolution per completion. See `completion_listener` module
328        // docs for the design rationale.
329        let listener_enabled = config.completion_listener.is_some();
330        if let Some(listener_cfg) = config.completion_listener {
331            handles.push(completion_listener::spawn_completion_listener(
332                router.clone(),
333                client,
334                listener_cfg.addresses,
335                listener_cfg.tls,
336                listener_cfg.cluster,
337                shutdown_rx,
338            ));
339        }
340
341        let scanner_count = if listener_enabled { "14 scanners + completion listener" } else { "14 scanners" };
342        tracing::info!(
343            num_partitions,
344            budget_partitions = config.partition_config.num_budget_partitions,
345            quota_partitions = config.partition_config.num_quota_partitions,
346            flow_partitions = config.partition_config.num_flow_partitions,
347            "engine started with {scanner_count}"
348        );
349
350        Self {
351            router,
352            shutdown_tx,
353            handles,
354        }
355    }
356
357    /// Signal all scanners to stop and wait for them to finish.
358    ///
359    /// Waits up to 15 seconds for scanners to drain. If any scanner is
360    /// blocked on a hung Valkey command, the timeout prevents shutdown
361    /// from hanging indefinitely (Kubernetes SIGKILL safety).
362    pub async fn shutdown(self) {
363        let _ = self.shutdown_tx.send(true);
364        let join_all = async {
365            for handle in self.handles {
366                let _ = handle.await;
367            }
368        };
369        match tokio::time::timeout(Duration::from_secs(15), join_all).await {
370            Ok(()) => tracing::info!("engine shutdown complete"),
371            Err(_) => tracing::warn!(
372                "engine shutdown timed out after 15s, abandoning remaining scanners"
373            ),
374        }
375    }
376}