pub mod budget;
pub mod completion_listener;
pub mod partition_router;
pub mod scanner;
pub mod supervisor;
use std::sync::Arc;
use std::time::Duration;
use ff_core::partition::PartitionConfig;
use ff_core::types::LaneId;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use partition_router::PartitionRouter;
use supervisor::supervised_spawn;
use scanner::attempt_timeout::AttemptTimeoutScanner;
use scanner::execution_deadline::ExecutionDeadlineScanner;
use scanner::budget_reconciler::BudgetReconciler;
use scanner::budget_reset::BudgetResetScanner;
use scanner::delayed_promoter::DelayedPromoter;
use scanner::dependency_reconciler::DependencyReconciler;
use scanner::index_reconciler::IndexReconciler;
use scanner::lease_expiry::LeaseExpiryScanner;
use scanner::pending_wp_expiry::PendingWaitpointExpiryScanner;
use scanner::quota_reconciler::QuotaReconciler;
use scanner::retention_trimmer::RetentionTrimmer;
use scanner::suspension_timeout::SuspensionTimeoutScanner;
use scanner::flow_projector::FlowProjector;
use scanner::unblock::UnblockScanner;
/// Settings for the optional completion listener task.
///
/// When [`EngineConfig::completion_listener`] is `Some`, [`Engine::start`]
/// forwards these three fields, unchanged, to
/// `completion_listener::spawn_completion_listener`.
#[derive(Clone, Debug)]
pub struct CompletionListenerConfig {
/// Endpoints handed to the listener.
/// NOTE(review): presumably `(host, port)` pairs — confirm against
/// `spawn_completion_listener`.
pub addresses: Vec<(String, u16)>,
/// Forwarded to the listener as-is.
/// NOTE(review): presumably enables TLS on the listener's connection — confirm.
pub tls: bool,
/// Forwarded to the listener as-is.
/// NOTE(review): presumably selects cluster-mode connections — confirm.
pub cluster: bool,
}
/// Startup configuration consumed by [`Engine::start`].
///
/// Each `*_interval` field is passed to the constructor of the scanner it is
/// named after; the defaults quoted below are the values produced by
/// `EngineConfig::default()`.
pub struct EngineConfig {
/// Partition layout. Used to build the `PartitionRouter`, passed to the
/// `UnblockScanner`, `DependencyReconciler` and `FlowProjector`, and read for
/// the flow / budget / quota partition counts given to `supervised_spawn`.
pub partition_config: PartitionConfig,
/// Lanes handed to the lane-aware scanners (`DelayedPromoter`,
/// `IndexReconciler`, `AttemptTimeoutScanner`, `RetentionTrimmer`,
/// `UnblockScanner`, `DependencyReconciler`, `ExecutionDeadlineScanner`).
/// Default: a single lane named `"default"`.
pub lanes: Vec<LaneId>,
/// Passed to `LeaseExpiryScanner::new`. Default: 1500 ms.
pub lease_expiry_interval: Duration,
/// Passed to `DelayedPromoter::new`. Default: 750 ms.
pub delayed_promoter_interval: Duration,
/// Passed to `IndexReconciler::new`. Default: 45 s.
pub index_reconciler_interval: Duration,
/// Passed to `AttemptTimeoutScanner::new`. Default: 2 s.
pub attempt_timeout_interval: Duration,
/// Passed to `SuspensionTimeoutScanner::new`. Default: 2 s.
pub suspension_timeout_interval: Duration,
/// Passed to `PendingWaitpointExpiryScanner::new`. Default: 5 s.
pub pending_wp_expiry_interval: Duration,
/// Passed to `RetentionTrimmer::new`. Default: 60 s.
pub retention_trimmer_interval: Duration,
/// Passed to `BudgetResetScanner::new`. Default: 15 s.
pub budget_reset_interval: Duration,
/// Passed to `BudgetReconciler::new`. Default: 30 s.
pub budget_reconciler_interval: Duration,
/// Passed to `QuotaReconciler::new`. Default: 30 s.
pub quota_reconciler_interval: Duration,
/// Passed to `UnblockScanner::new`. Default: 5 s.
pub unblock_interval: Duration,
/// Passed to `DependencyReconciler::new`. Default: 15 s.
pub dependency_reconciler_interval: Duration,
/// When `Some`, `Engine::start` additionally spawns the completion listener
/// with these settings. Default: `None` (no listener).
pub completion_listener: Option<CompletionListenerConfig>,
/// Passed to `FlowProjector::new`. Default: 15 s.
pub flow_projector_interval: Duration,
/// Passed to `ExecutionDeadlineScanner::new`. Default: 5 s.
pub execution_deadline_interval: Duration,
}
impl Default for EngineConfig {
    /// Builds the stock configuration: default partition layout, a single
    /// `"default"` lane, no completion listener, and the per-scanner
    /// durations listed below.
    fn default() -> Self {
        // Short aliases keep the table of durations easy to scan.
        let millis = Duration::from_millis;
        let secs = Duration::from_secs;

        Self {
            partition_config: PartitionConfig::default(),
            lanes: vec![LaneId::new("default")],
            completion_listener: None,

            // Sub-two-second cadences.
            delayed_promoter_interval: millis(750),
            lease_expiry_interval: millis(1500),

            // Two-second cadences.
            attempt_timeout_interval: secs(2),
            suspension_timeout_interval: secs(2),

            // Five-second cadences.
            pending_wp_expiry_interval: secs(5),
            unblock_interval: secs(5),
            execution_deadline_interval: secs(5),

            // Fifteen-second cadences.
            budget_reset_interval: secs(15),
            dependency_reconciler_interval: secs(15),
            flow_projector_interval: secs(15),

            // Thirty seconds and longer.
            budget_reconciler_interval: secs(30),
            quota_reconciler_interval: secs(30),
            index_reconciler_interval: secs(45),
            retention_trimmer_interval: secs(60),
        }
    }
}
/// Handle to the running set of background tasks created by [`Engine::start`].
///
/// Consumed by [`Engine::shutdown`], which signals the tasks and waits for them.
pub struct Engine {
/// Router built from the configured `PartitionConfig`; a clone of this `Arc`
/// is given to the completion listener when one is spawned.
pub router: Arc<PartitionRouter>,
/// Sending half of the shutdown `watch` channel (initial value `false`);
/// `shutdown` sends `true` on it.
shutdown_tx: watch::Sender<bool>,
/// Join handles of every spawned task, in spawn order; awaited by `shutdown`.
handles: Vec<JoinHandle<()>>,
}
impl Engine {
pub fn start(config: EngineConfig, client: ferriskey::Client) -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let num_partitions = config.partition_config.num_flow_partitions;
let router = Arc::new(PartitionRouter::new(config.partition_config));
let mut handles = Vec::new();
let lease_scanner = Arc::new(LeaseExpiryScanner::new(config.lease_expiry_interval));
handles.push(supervised_spawn(
lease_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let delayed_scanner = Arc::new(DelayedPromoter::new(
config.delayed_promoter_interval,
config.lanes.clone(),
));
handles.push(supervised_spawn(
delayed_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let reconciler = Arc::new(IndexReconciler::new(
config.index_reconciler_interval,
config.lanes.clone(),
));
handles.push(supervised_spawn(
reconciler,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let timeout_scanner = Arc::new(AttemptTimeoutScanner::new(
config.attempt_timeout_interval,
config.lanes.clone(),
));
handles.push(supervised_spawn(
timeout_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let suspension_scanner = Arc::new(SuspensionTimeoutScanner::new(
config.suspension_timeout_interval,
));
handles.push(supervised_spawn(
suspension_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let pending_wp_scanner = Arc::new(PendingWaitpointExpiryScanner::new(
config.pending_wp_expiry_interval,
));
handles.push(supervised_spawn(
pending_wp_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let retention_scanner = Arc::new(RetentionTrimmer::new(
config.retention_trimmer_interval,
config.lanes.clone(),
));
handles.push(supervised_spawn(
retention_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let budget_reset = Arc::new(BudgetResetScanner::new(
config.budget_reset_interval,
));
handles.push(supervised_spawn(
budget_reset,
client.clone(),
config.partition_config.num_budget_partitions,
shutdown_rx.clone(),
));
let budget_reconciler = Arc::new(BudgetReconciler::new(
config.budget_reconciler_interval,
));
handles.push(supervised_spawn(
budget_reconciler,
client.clone(),
config.partition_config.num_budget_partitions,
shutdown_rx.clone(),
));
let unblock_scanner = Arc::new(UnblockScanner::new(
config.unblock_interval,
config.lanes.clone(),
config.partition_config,
));
handles.push(supervised_spawn(
unblock_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let dep_reconciler = Arc::new(DependencyReconciler::new(
config.dependency_reconciler_interval,
config.lanes.clone(),
config.partition_config,
));
handles.push(supervised_spawn(
dep_reconciler,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let quota_reconciler = Arc::new(QuotaReconciler::new(
config.quota_reconciler_interval,
));
handles.push(supervised_spawn(
quota_reconciler,
client.clone(),
config.partition_config.num_quota_partitions,
shutdown_rx.clone(),
));
let flow_projector = Arc::new(FlowProjector::new(
config.flow_projector_interval,
config.partition_config,
));
handles.push(supervised_spawn(
flow_projector,
client.clone(),
config.partition_config.num_flow_partitions,
shutdown_rx.clone(),
));
let deadline_scanner = Arc::new(ExecutionDeadlineScanner::new(
config.execution_deadline_interval,
config.lanes,
));
handles.push(supervised_spawn(
deadline_scanner,
client.clone(),
num_partitions,
shutdown_rx.clone(),
));
let listener_enabled = config.completion_listener.is_some();
if let Some(listener_cfg) = config.completion_listener {
handles.push(completion_listener::spawn_completion_listener(
router.clone(),
client,
listener_cfg.addresses,
listener_cfg.tls,
listener_cfg.cluster,
shutdown_rx,
));
}
let scanner_count = if listener_enabled { "14 scanners + completion listener" } else { "14 scanners" };
tracing::info!(
num_partitions,
budget_partitions = config.partition_config.num_budget_partitions,
quota_partitions = config.partition_config.num_quota_partitions,
flow_partitions = config.partition_config.num_flow_partitions,
"engine started with {scanner_count}"
);
Self {
router,
shutdown_tx,
handles,
}
}
pub async fn shutdown(self) {
let _ = self.shutdown_tx.send(true);
let join_all = async {
for handle in self.handles {
let _ = handle.await;
}
};
match tokio::time::timeout(Duration::from_secs(15), join_all).await {
Ok(()) => tracing::info!("engine shutdown complete"),
Err(_) => tracing::warn!(
"engine shutdown timed out after 15s, abandoning remaining scanners"
),
}
}
}