use std::sync::Arc;
use tokio::time::{interval, Duration};
use tracing::{debug, error, info};
use crate::store::WorkflowStore;
pub const WORKFLOW_TASK_HEARTBEAT_TIMEOUT_SECS: f64 = 30.0;
const POLL_SECS: u64 = 1;
fn timeout_secs() -> f64 {
std::env::var("ASSAY_WF_DISPATCH_TIMEOUT_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(WORKFLOW_TASK_HEARTBEAT_TIMEOUT_SECS)
}
pub async fn run_dispatch_recovery<S: WorkflowStore>(store: Arc<S>) {
let mut tick = interval(Duration::from_secs(POLL_SECS));
let t = timeout_secs();
info!(
"Dispatch-recovery poller started (poll every {POLL_SECS}s, \
release stale leases older than {t}s)"
);
loop {
tick.tick().await;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
match store.release_stale_dispatch_leases(now, timeout_secs()).await {
Ok(0) => {}
Ok(n) => debug!("Released {n} stale workflow dispatch lease(s)"),
Err(e) => error!("Dispatch-recovery poller error: {e}"),
}
}
}