use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tracing::debug;
/// Host-side operations driven by the periodic fork-maintenance tasks in this file.
///
/// The `Send + Sync + 'static` bounds allow an `Arc` of the implementor to be
/// moved into tasks started with `tokio::spawn`.
#[async_trait::async_trait]
pub trait ForkMaintenanceHost: Send + Sync + 'static {
/// Called on every tick of the task started by `spawn_sweeper`.
///
/// NOTE(review): presumably removes forks whose lifetime has expired; the
/// exact policy lives in the implementor — confirm there.
async fn sweep_expired_forks(&self);
/// Called on every tick of the task started by `spawn_index_builder`, with
/// the `threshold` value that was handed to that function.
///
/// NOTE(review): the meaning and units of `threshold` are not visible here —
/// confirm against the implementor.
async fn build_fork_local_indexes(&self, threshold: u64);
}
/// Spawns a background task that awaits `tick_fn()` once per `interval` until
/// a shutdown signal is observed.
///
/// # Parameters
/// * `interval` – period between ticks. A zero interval is treated like a
///   disabled task, because `tokio::time::interval` panics on a zero period.
/// * `disable` – when `true`, nothing is spawned.
/// * `task_label` – human-readable task name used in log messages.
/// * `shutdown_rx` – broadcast receiver; any outcome of `recv()` (a message,
///   a lagged receiver, or a closed channel) stops the loop.
/// * `tick_fn` – factory producing the future to await on each tick.
///
/// # Returns
/// `None` when the task is disabled or `interval` is zero, otherwise the
/// [`tokio::task::JoinHandle`] of the spawned task.
///
/// # Behavior notes
/// * Per the tokio documentation the first tick of an interval completes
///   immediately, so `tick_fn` runs once right after the task starts.
/// * Missed ticks are skipped rather than replayed in a burst.
/// * A `tick_fn` future that is already running is awaited to completion;
///   shutdown is only checked between ticks.
fn spawn_ticker<F, Fut>(
    interval: Duration,
    disable: bool,
    task_label: &'static str,
    mut shutdown_rx: broadcast::Receiver<()>,
    mut tick_fn: F,
) -> Option<tokio::task::JoinHandle<()>>
where
    F: FnMut() -> Fut + Send + 'static,
    Fut: std::future::Future<Output = ()> + Send,
{
    if disable {
        debug!("{task_label} disabled by config");
        return None;
    }
    // `tokio::time::interval` panics when given a zero period. Inside the
    // spawned task that panic would kill the task on its first poll and the
    // maintenance work would silently never run, so reject it up front.
    if interval.is_zero() {
        debug!("{task_label} disabled: tick interval is zero");
        return None;
    }
    let handle = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                // Poll branches top to bottom so that a pending shutdown always
                // wins over a ready tick; otherwise the random branch choice
                // could start one more maintenance pass after shutdown was
                // already requested.
                biased;
                _ = shutdown_rx.recv() => {
                    debug!("{task_label} received shutdown signal");
                    break;
                }
                _ = ticker.tick() => {
                    tick_fn().await;
                }
            }
        }
    });
    Some(handle)
}
/// Starts the periodic "fork sweeper" task.
///
/// On every tick the task awaits [`ForkMaintenanceHost::sweep_expired_forks`]
/// on `host`; it stops once `shutdown_rx` yields.
///
/// Returns `None` if no task was started (for example when `disable` is
/// `true`), otherwise the handle of the spawned task.
pub fn spawn_sweeper<H: ForkMaintenanceHost>(
    host: Arc<H>,
    interval: Duration,
    disable: bool,
    shutdown_rx: broadcast::Receiver<()>,
) -> Option<tokio::task::JoinHandle<()>> {
    // Each tick gets its own `Arc` handle so the returned future owns
    // everything it needs and does not borrow from the closure.
    let run_sweep = move || {
        let sweeper = Arc::clone(&host);
        async move { sweeper.sweep_expired_forks().await }
    };
    spawn_ticker(interval, disable, "fork sweeper", shutdown_rx, run_sweep)
}
/// Starts the periodic "fork index builder" task.
///
/// On every tick the task awaits
/// [`ForkMaintenanceHost::build_fork_local_indexes`] on `host`, forwarding
/// `threshold` unchanged; it stops once `shutdown_rx` yields.
///
/// Returns `None` if no task was started (for example when `disable` is
/// `true`), otherwise the handle of the spawned task.
pub fn spawn_index_builder<H: ForkMaintenanceHost>(
    host: Arc<H>,
    interval: Duration,
    threshold: u64,
    disable: bool,
    shutdown_rx: broadcast::Receiver<()>,
) -> Option<tokio::task::JoinHandle<()>> {
    // `threshold` is `Copy`, so the closure can hand it to every tick's future;
    // the host is shared through a fresh `Arc` handle per tick.
    let run_build = move || {
        let builder = Arc::clone(&host);
        async move { builder.build_fork_local_indexes(threshold).await }
    };
    spawn_ticker(interval, disable, "fork index builder", shutdown_rx, run_build)
}