mod common;
use crate::common::{build_scheduler, job_exec_counter_result, job_exec_panic, setup_tracing};
use chrono::{Duration as ChronoDuration, Utc};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::Duration;
use turnkeeper::{job::TKJobRequest, scheduler::PriorityQueueType};
#[tokio::test]
async fn test_supervisor_respawns_worker_and_quarantines_job() {
setup_tracing();
let scheduler = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();
let healthy_job_counter = Arc::new(AtomicUsize::new(0));
let panicking_job_fn = job_exec_panic();
let healthy_job_fn = job_exec_counter_result(healthy_job_counter.clone(), Duration::ZERO, true);
let healthy_job_req = TKJobRequest::from_interval("Healthy Job", Duration::from_millis(500), 0);
scheduler.add_job_async(healthy_job_req, healthy_job_fn).await.unwrap();
let mut panicking_job_req = TKJobRequest::never("Panicking Job", 0);
panicking_job_req.with_initial_run_time(Utc::now() + ChronoDuration::milliseconds(100));
let panicking_job_id = scheduler
.add_job_async(panicking_job_req, panicking_job_fn)
.await
.unwrap();
tracing::info!("Waiting 2 seconds for supervisor to react and healthy job to run...");
tokio::time::sleep(Duration::from_secs(2)).await;
let healthy_run_count = healthy_job_counter.load(Ordering::Relaxed);
tracing::info!("Healthy job ran {} times.", healthy_run_count);
assert!(
healthy_run_count >= 3,
"Healthy job should have run at least 3 times on the new worker, but ran {}",
healthy_run_count
);
let details = scheduler.get_job_details(panicking_job_id).await.unwrap();
assert!(
details.is_cancelled,
"Panicking job should be marked as cancelled (quarantined)."
);
scheduler.shutdown_graceful(Some(Duration::from_secs(5))).await.unwrap();
}