mod common;
use crate::common::{build_scheduler, setup_tracing};
use chrono::{Duration as ChronoDuration, Utc};
use parking_lot::Mutex;
use std::time::{Duration as StdDuration, Instant};
use std::{
future::Future,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tracing::{info, warn};
use turnkeeper::{job::TKJobRequest, scheduler::PriorityQueueType};
/// Checks that `shutdown_graceful` does not return until a job that is
/// already running has completed.
///
/// Timeline: a one-shot job is scheduled 100ms in the future and sleeps for
/// 2s once started. The test waits 500ms (so the job should be mid-sleep),
/// then requests a graceful shutdown with a 5s timeout and verifies that:
/// - the job set its `executed` flag,
/// - the shutdown call took longer than 1s,
/// - the job's recorded finish instant lies inside the window covered by the
///   shutdown call (after it started, before it returned).
#[tokio::test]
async fn test_graceful_shutdown_waits_for_job() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();
    let executed = Arc::new(AtomicBool::new(false));
    let job_finish_time = Arc::new(Mutex::new(None::<Instant>));

    // Job factory: every invocation hands out a fresh boxed future that owns
    // its own clones of the shared flag and finish-time slot.
    let job_fn = {
        let flag_arc = executed.clone();
        let finish_time_arc = job_finish_time.clone();
        move || {
            let flag = flag_arc.clone();
            let finish_time = finish_time_arc.clone();
            let future = async move {
                info!("Graceful test job STARTING");
                tokio::time::sleep(StdDuration::from_secs(2)).await;
                flag.store(true, Ordering::SeqCst);
                *finish_time.lock() = Some(Instant::now());
                info!("Graceful test job FINISHED");
                true
            };
            Box::pin(future) as Pin<Box<dyn Future<Output = bool> + Send + 'static>>
        }
    };

    let req = TKJobRequest::from_once(
        "Graceful Wait",
        Utc::now() + ChronoDuration::milliseconds(100),
        0,
    );
    scheduler
        .add_job_async(req, job_fn)
        .await
        .expect("Add job failed");

    // Give the job time to start so the shutdown request arrives mid-run.
    tokio::time::sleep(StdDuration::from_millis(500)).await;

    info!("Initiating graceful shutdown while job running...");
    let shutdown_start = Instant::now();
    scheduler
        .shutdown_graceful(Some(StdDuration::from_secs(5)))
        .await
        .expect("Graceful shutdown failed");
    // Captured immediately after the shutdown call returns; every timing
    // assertion below is made against this fixed value.
    let shutdown_duration = shutdown_start.elapsed();
    info!("Graceful shutdown complete after {:?}", shutdown_duration);

    assert!(executed.load(Ordering::SeqCst), "Job should have executed");
    assert!(
        shutdown_duration > StdDuration::from_secs(1),
        "Shutdown seemed too fast"
    );

    // The guard is a temporary and is released at the end of this statement.
    let finish_time_opt: Option<Instant> = *job_finish_time.lock();
    let finish = finish_time_opt.expect("Job did not record finish time");

    assert!(
        finish >= shutdown_start,
        "Job finished before shutdown started?"
    );
    // Compare against the captured `shutdown_duration`, not a fresh
    // `elapsed()` reading: a fresh reading is taken after `finish` by
    // construction and would make this check always pass.
    assert!(
        shutdown_duration >= finish.duration_since(shutdown_start),
        "Shutdown didn't wait for job finish"
    );
}
/// Exercises `shutdown_force` while a long-running job is in flight.
///
/// A one-shot job is scheduled 100ms in the future and sleeps for 5s before
/// setting its completion flag. After 500ms the test requests a forced
/// shutdown with a 1s timeout, then asserts that the call came back in under
/// 3s and that the flag is still unset. Finally it queries the metrics
/// snapshot and logs whichever outcome it gets; that last step is
/// informational only and asserts nothing.
#[tokio::test]
async fn test_force_shutdown_interrupts() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();
    let executed = Arc::new(AtomicBool::new(false));

    // Job factory: each call yields a boxed future holding its own handle to
    // the shared completion flag.
    let completion_flag = executed.clone();
    let job_fn = move || {
        let done = completion_flag.clone();
        Box::pin(async move {
            info!("Force test job STARTING");
            tokio::time::sleep(StdDuration::from_secs(5)).await;
            warn!("Force test job AWOKE FROM SLEEP (should have been interrupted)");
            done.store(true, Ordering::SeqCst);
            true
        }) as Pin<Box<dyn Future<Output = bool> + Send + 'static>>
    };

    let req = TKJobRequest::from_once(
        "Force Interrupt",
        Utc::now() + ChronoDuration::milliseconds(100),
        0,
    );
    scheduler
        .add_job_async(req, job_fn)
        .await
        .expect("Add job failed");

    // Let the job get under way before pulling the plug.
    tokio::time::sleep(StdDuration::from_millis(500)).await;

    info!("Initiating force shutdown while job running...");
    let began = Instant::now();
    // The outcome is deliberately not asserted on; only the elapsed time and
    // the job flag are checked below.
    let _outcome = scheduler
        .shutdown_force(Some(StdDuration::from_secs(1)))
        .await;
    let took = began.elapsed();
    info!("Force shutdown complete/timed out after {:?}", took);

    assert!(
        took < StdDuration::from_secs(3),
        "Force shutdown took too long"
    );
    // NOTE(review): the job sleeps for 5s and the shutdown is bounded to <3s
    // above, so the flag cannot have been set yet at this point whether or
    // not the job was actually cancelled — confirm whether a later re-check
    // is wanted.
    let job_completed = executed.load(Ordering::SeqCst);
    assert!(
        !job_completed,
        "Job should have been interrupted before setting flag"
    );

    tokio::time::sleep(StdDuration::from_millis(50)).await;
    match scheduler.get_metrics_snapshot().await {
        Ok(snapshot) => warn!("Got metrics after shutdown?: {:?}", snapshot),
        Err(err) => info!("Metrics query failed after shutdown as expected: {:?}", err),
    };
}