use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{info, warn};
use crate::config::RetentionConfig;
use crate::engine::metrics as metric_names;
use crate::engine::queue::QueueManager;
pub fn start_scheduler(
manager: Arc<QueueManager>,
tick_interval_ms: u64,
stall_timeout_ms: u64,
retention: RetentionConfig,
) -> tokio::task::JoinHandle<()> {
let interval = Duration::from_millis(tick_interval_ms);
tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
ticker.tick().await;
info!(
interval_ms = tick_interval_ms,
stall_timeout_ms, "Background scheduler started"
);
let mut tick_count: u64 = 0;
loop {
ticker.tick().await;
tick_count += 1;
let tick_start = Instant::now();
if let Err(e) = manager.promote_delayed_jobs().await {
warn!(error = %e, "Delayed job promotion failed");
}
if let Err(e) = manager.execute_schedules().await {
warn!(error = %e, "Schedule execution failed");
}
if let Err(e) = manager.check_timeouts().await {
warn!(error = %e, "Timeout check failed");
}
if let Err(e) = manager.detect_stalls(stall_timeout_ms).await {
warn!(error = %e, "Stall detection failed");
}
if let Ok(queues) = manager.list_queues().await {
for qi in &queues {
let q = qi.name.as_str();
metrics::gauge!(metric_names::QUEUE_WAITING_JOBS, "queue" => q.to_string())
.set(qi.counts.waiting as f64);
metrics::gauge!(metric_names::QUEUE_ACTIVE_JOBS, "queue" => q.to_string())
.set(qi.counts.active as f64);
metrics::gauge!(metric_names::QUEUE_DELAYED_JOBS, "queue" => q.to_string())
.set(qi.counts.delayed as f64);
metrics::gauge!(metric_names::QUEUE_DLQ_JOBS, "queue" => q.to_string())
.set(qi.counts.dlq as f64);
}
}
if tick_count % 10 == 0 {
if let Err(e) = manager.promote_orphaned_blocked_jobs().await {
warn!(error = %e, "Orphaned blocked job promotion failed");
}
}
if tick_count % 60 == 0 {
if let Err(e) = manager
.cleanup_expired_jobs(
&retention.completed_ttl,
&retention.failed_ttl,
&retention.dlq_ttl,
)
.await
{
warn!(error = %e, "Retention cleanup failed");
}
}
metrics::gauge!(metric_names::SCHEDULER_TICK_DURATION_SECONDS)
.set(tick_start.elapsed().as_secs_f64());
}
})
}