1pub mod budget;
4pub mod completion_listener;
5pub mod partition_router;
6pub mod scanner;
7pub mod supervisor;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use ff_core::partition::PartitionConfig;
13use ff_core::types::LaneId;
14use tokio::sync::watch;
15use tokio::task::JoinHandle;
16
17use partition_router::PartitionRouter;
18use supervisor::supervised_spawn;
19use scanner::attempt_timeout::AttemptTimeoutScanner;
20use scanner::execution_deadline::ExecutionDeadlineScanner;
21use scanner::budget_reconciler::BudgetReconciler;
22use scanner::budget_reset::BudgetResetScanner;
23use scanner::delayed_promoter::DelayedPromoter;
24use scanner::dependency_reconciler::DependencyReconciler;
25use scanner::index_reconciler::IndexReconciler;
26use scanner::lease_expiry::LeaseExpiryScanner;
27use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
28use scanner::quota_reconciler::QuotaReconciler;
29use scanner::retention_trimmer::RetentionTrimmer;
30use scanner::suspension_timeout::SuspensionTimeoutScanner;
31use scanner::flow_projector::FlowProjector;
32use scanner::unblock::UnblockScanner;
33
/// Connection settings for the optional completion listener.
///
/// `Engine::start` spawns the listener only when `EngineConfig::completion_listener`
/// is `Some`. It then forwards each field, unchanged, to
/// `completion_listener::spawn_completion_listener`.
#[derive(Clone, Debug)]
pub struct CompletionListenerConfig {
    /// Endpoints the listener connects to. The `(String, u16)` shape suggests
    /// `(host, port)` pairs — NOTE(review): confirm against the listener.
    pub addresses: Vec<(String, u16)>,
    /// Forwarded as the listener's `tls` flag; presumably enables TLS on its
    /// connection(s) — confirm in `completion_listener`.
    pub tls: bool,
    /// Forwarded as the listener's `cluster` flag; presumably selects a
    /// cluster-mode connection — confirm in `completion_listener`.
    pub cluster: bool,
}
49
/// Everything `Engine::start` needs to bring up the background scanners.
///
/// Each `*_interval` field is passed as the first argument to the constructor
/// of the scanner it is named after. How each scanner interprets it (sweep
/// period, back-off, ...) is up to that scanner.
pub struct EngineConfig {
    /// Partition layout. `Engine::start` builds the `PartitionRouter` from it.
    /// It also takes the flow/budget/quota partition counts handed to each
    /// scanner from here.
    pub partition_config: PartitionConfig,
    /// Lanes handed to the lane-aware scanners: delayed promoter, index
    /// reconciler, attempt timeout, retention trimmer, unblock, dependency
    /// reconciler and execution deadline.
    pub lanes: Vec<LaneId>,
    /// Interval for `LeaseExpiryScanner`.
    pub lease_expiry_interval: Duration,
    /// Interval for `DelayedPromoter`.
    pub delayed_promoter_interval: Duration,
    /// Interval for `IndexReconciler`.
    pub index_reconciler_interval: Duration,
    /// Interval for `AttemptTimeoutScanner`.
    pub attempt_timeout_interval: Duration,
    /// Interval for `SuspensionTimeoutScanner`.
    pub suspension_timeout_interval: Duration,
    /// Interval for `PendingWaitpointExpiryScanner`.
    pub pending_wp_expiry_interval: Duration,
    /// Interval for `RetentionTrimmer`.
    pub retention_trimmer_interval: Duration,
    /// Interval for `BudgetResetScanner` (runs over the budget partitions).
    pub budget_reset_interval: Duration,
    /// Interval for `BudgetReconciler` (runs over the budget partitions).
    pub budget_reconciler_interval: Duration,
    /// Interval for `QuotaReconciler` (runs over the quota partitions).
    pub quota_reconciler_interval: Duration,
    /// Interval for `UnblockScanner`.
    pub unblock_interval: Duration,
    /// Interval for `DependencyReconciler`.
    pub dependency_reconciler_interval: Duration,

    /// Optional completion listener. `None` means no listener task is spawned.
    pub completion_listener: Option<CompletionListenerConfig>,
    /// Interval for `FlowProjector` (runs over the flow partitions).
    pub flow_projector_interval: Duration,
    /// Interval for `ExecutionDeadlineScanner`.
    pub execution_deadline_interval: Duration,
}
119
120impl Default for EngineConfig {
121 fn default() -> Self {
122 Self {
123 partition_config: PartitionConfig::default(),
124 lanes: vec![LaneId::new("default")],
125 lease_expiry_interval: Duration::from_millis(1500),
126 delayed_promoter_interval: Duration::from_millis(750),
127 index_reconciler_interval: Duration::from_secs(45),
128 attempt_timeout_interval: Duration::from_secs(2),
129 suspension_timeout_interval: Duration::from_secs(2),
130 pending_wp_expiry_interval: Duration::from_secs(5),
131 retention_trimmer_interval: Duration::from_secs(60),
132 budget_reset_interval: Duration::from_secs(15),
133 budget_reconciler_interval: Duration::from_secs(30),
134 quota_reconciler_interval: Duration::from_secs(30),
135 unblock_interval: Duration::from_secs(5),
136 dependency_reconciler_interval: Duration::from_secs(15),
137 completion_listener: None,
138 flow_projector_interval: Duration::from_secs(15),
139 execution_deadline_interval: Duration::from_secs(5),
140 }
141 }
142}
143
/// A running engine: the partition router plus the tasks spawned by
/// `Engine::start`.
///
/// Call `Engine::shutdown` to stop it. NOTE(review): dropping an `Engine`
/// without `shutdown` drops `shutdown_tx` (closing the watch channel) and the
/// `JoinHandle`s (which detaches, not aborts, the tasks). Whether the tasks
/// then stop depends on how `supervised_spawn` reacts to a closed channel —
/// confirm.
pub struct Engine {
    /// Router built from `EngineConfig::partition_config`. A clone is also
    /// handed to the completion listener when one is configured.
    pub router: Arc<PartitionRouter>,
    /// Sending `true` here is the shutdown signal observed by every spawned
    /// task's receiver.
    shutdown_tx: watch::Sender<bool>,
    /// Join handles for every scanner and, if enabled, the completion listener.
    handles: Vec<JoinHandle<()>>,
}
150
151impl Engine {
152 pub fn start(config: EngineConfig, client: ferriskey::Client) -> Self {
156 let (shutdown_tx, shutdown_rx) = watch::channel(false);
157 let num_partitions = config.partition_config.num_flow_partitions;
158 let router = Arc::new(PartitionRouter::new(config.partition_config));
159
160 let mut handles = Vec::new();
161
162 let lease_scanner = Arc::new(LeaseExpiryScanner::new(config.lease_expiry_interval));
164 handles.push(supervised_spawn(
165 lease_scanner,
166 client.clone(),
167 num_partitions,
168 shutdown_rx.clone(),
169 ));
170
171 let delayed_scanner = Arc::new(DelayedPromoter::new(
173 config.delayed_promoter_interval,
174 config.lanes.clone(),
175 ));
176 handles.push(supervised_spawn(
177 delayed_scanner,
178 client.clone(),
179 num_partitions,
180 shutdown_rx.clone(),
181 ));
182
183 let reconciler = Arc::new(IndexReconciler::new(
185 config.index_reconciler_interval,
186 config.lanes.clone(),
187 ));
188 handles.push(supervised_spawn(
189 reconciler,
190 client.clone(),
191 num_partitions,
192 shutdown_rx.clone(),
193 ));
194
195 let timeout_scanner = Arc::new(AttemptTimeoutScanner::new(
197 config.attempt_timeout_interval,
198 config.lanes.clone(),
199 ));
200 handles.push(supervised_spawn(
201 timeout_scanner,
202 client.clone(),
203 num_partitions,
204 shutdown_rx.clone(),
205 ));
206
207 let suspension_scanner = Arc::new(SuspensionTimeoutScanner::new(
209 config.suspension_timeout_interval,
210 ));
211 handles.push(supervised_spawn(
212 suspension_scanner,
213 client.clone(),
214 num_partitions,
215 shutdown_rx.clone(),
216 ));
217
218 let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::new(
220 config.pending_wp_expiry_interval,
221 ));
222 handles.push(supervised_spawn(
223 pending_wp_scanner,
224 client.clone(),
225 num_partitions,
226 shutdown_rx.clone(),
227 ));
228
229 let retention_scanner = Arc::new(RetentionTrimmer::new(
231 config.retention_trimmer_interval,
232 config.lanes.clone(),
233 ));
234 handles.push(supervised_spawn(
235 retention_scanner,
236 client.clone(),
237 num_partitions,
238 shutdown_rx.clone(),
239 ));
240
241 let budget_reset = Arc::new(BudgetResetScanner::new(
243 config.budget_reset_interval,
244 ));
245 handles.push(supervised_spawn(
246 budget_reset,
247 client.clone(),
248 config.partition_config.num_budget_partitions,
249 shutdown_rx.clone(),
250 ));
251
252 let budget_reconciler = Arc::new(BudgetReconciler::new(
254 config.budget_reconciler_interval,
255 ));
256 handles.push(supervised_spawn(
257 budget_reconciler,
258 client.clone(),
259 config.partition_config.num_budget_partitions,
260 shutdown_rx.clone(),
261 ));
262
263 let unblock_scanner = Arc::new(UnblockScanner::new(
265 config.unblock_interval,
266 config.lanes.clone(),
267 config.partition_config,
268 ));
269 handles.push(supervised_spawn(
270 unblock_scanner,
271 client.clone(),
272 num_partitions,
273 shutdown_rx.clone(),
274 ));
275
276 let dep_reconciler = Arc::new(DependencyReconciler::new(
278 config.dependency_reconciler_interval,
279 config.lanes.clone(),
280 config.partition_config,
281 ));
282 handles.push(supervised_spawn(
283 dep_reconciler,
284 client.clone(),
285 num_partitions,
286 shutdown_rx.clone(),
287 ));
288
289 let quota_reconciler = Arc::new(QuotaReconciler::new(
291 config.quota_reconciler_interval,
292 ));
293 handles.push(supervised_spawn(
294 quota_reconciler,
295 client.clone(),
296 config.partition_config.num_quota_partitions,
297 shutdown_rx.clone(),
298 ));
299
300 let flow_projector = Arc::new(FlowProjector::new(
302 config.flow_projector_interval,
303 config.partition_config,
304 ));
305 handles.push(supervised_spawn(
306 flow_projector,
307 client.clone(),
308 config.partition_config.num_flow_partitions,
309 shutdown_rx.clone(),
310 ));
311
312 let deadline_scanner = Arc::new(ExecutionDeadlineScanner::new(
314 config.execution_deadline_interval,
315 config.lanes,
316 ));
317 handles.push(supervised_spawn(
318 deadline_scanner,
319 client.clone(),
320 num_partitions,
321 shutdown_rx.clone(),
322 ));
323
324 let listener_enabled = config.completion_listener.is_some();
330 if let Some(listener_cfg) = config.completion_listener {
331 handles.push(completion_listener::spawn_completion_listener(
332 router.clone(),
333 client,
334 listener_cfg.addresses,
335 listener_cfg.tls,
336 listener_cfg.cluster,
337 shutdown_rx,
338 ));
339 }
340
341 let scanner_count = if listener_enabled { "14 scanners + completion listener" } else { "14 scanners" };
342 tracing::info!(
343 num_partitions,
344 budget_partitions = config.partition_config.num_budget_partitions,
345 quota_partitions = config.partition_config.num_quota_partitions,
346 flow_partitions = config.partition_config.num_flow_partitions,
347 "engine started with {scanner_count}"
348 );
349
350 Self {
351 router,
352 shutdown_tx,
353 handles,
354 }
355 }
356
357 pub async fn shutdown(self) {
363 let _ = self.shutdown_tx.send(true);
364 let join_all = async {
365 for handle in self.handles {
366 let _ = handle.await;
367 }
368 };
369 match tokio::time::timeout(Duration::from_secs(15), join_all).await {
370 Ok(()) => tracing::info!("engine shutdown complete"),
371 Err(_) => tracing::warn!(
372 "engine shutdown timed out after 15s, abandoning remaining scanners"
373 ),
374 }
375 }
376}