mod common;
use crate::common::{build_scheduler, job_exec_counter_result, job_exec_flag, setup_tracing};
use chrono::{Duration as ChronoDuration, Utc};
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use std::time::Duration as StdDuration;
use turnkeeper::{
QueryError,
job::TKJobRequest,
scheduler::PriorityQueueType,
};
/// Checks that a one-shot job on a `BinaryHeap`-backed scheduler executes
/// exactly once when it is triggered manually ahead of its scheduled time.
///
/// The job is scheduled one second in the future and then triggered right
/// away. The shared counter must read 1 shortly after the trigger and must
/// still read 1 after the originally scheduled time has passed — the entry the
/// assertion message calls the "ghost instance" must not cause a second run.
#[tokio::test]
async fn test_binary_heap_ghost_is_ignored() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();

    let executions = Arc::new(AtomicUsize::new(0));
    let scheduled_for = Utc::now() + ChronoDuration::seconds(1);
    let request = TKJobRequest::from_once("Ghost Test", scheduled_for, 0);
    // NOTE(review): presumably this helper bumps the counter once per run —
    // confirm against the `common` test module.
    let job_fn = job_exec_counter_result(Arc::clone(&executions), StdDuration::ZERO, true);
    let job_id = scheduler
        .add_job_async(request, job_fn)
        .await
        .expect("Failed to add job");
    tracing::info!("Job scheduled for 1s in future.");

    tracing::info!("Triggering job now (preempting).");
    scheduler
        .trigger_job_now(job_id)
        .await
        .expect("Trigger failed");

    // Give the manually triggered run time to complete before sampling.
    let settle = StdDuration::from_millis(200);
    tokio::time::sleep(settle).await;
    let runs_after_trigger = executions.load(Ordering::SeqCst);
    assert_eq!(
        runs_after_trigger, 1,
        "Job should have run once (manual trigger)"
    );

    tracing::info!("Waiting for ghost instance time...");
    // 200ms + 1200ms carries the test past the original 1s schedule time.
    let past_schedule = StdDuration::from_millis(1200);
    tokio::time::sleep(past_schedule).await;
    let runs_after_schedule = executions.load(Ordering::SeqCst);
    assert_eq!(
        runs_after_schedule, 1,
        "Job should NOT run again when the ghost instance pops"
    );

    scheduler.shutdown_graceful(None).await.unwrap();
}
/// Checks that `trigger_job_now` is refused for a job that is already due.
///
/// A "Blocker" job that sleeps for one second is added first, followed by a
/// "Target" job that is also due immediately. After a 100ms pause the test
/// tries to trigger the target and expects
/// `QueryError::TriggerFailedJobScheduled`.
#[tokio::test]
async fn test_debounce_queued_job() {
    setup_tracing();
    // NOTE(review): the `1` is presumably the worker count, which would keep
    // the target waiting behind the blocker — confirm in `common`.
    let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();

    let blocker_done = Arc::new(AtomicBool::new(false));
    let target_done = Arc::new(AtomicBool::new(false));

    let blocker_request = TKJobRequest::from_once("Blocker", Utc::now(), 0);
    scheduler
        .add_job_async(blocker_request, move || {
            let done = Arc::clone(&blocker_done);
            Box::pin(async move {
                // Stay busy for a full second, then record completion.
                tokio::time::sleep(StdDuration::from_secs(1)).await;
                done.store(true, Ordering::SeqCst);
                true
            })
        })
        .await
        .unwrap();

    let target_request = TKJobRequest::from_once("Target", Utc::now(), 0);
    let target_id = scheduler
        .add_job_async(
            target_request,
            job_exec_flag(Arc::clone(&target_done), StdDuration::ZERO),
        )
        .await
        .unwrap();

    // Short pause so the scheduler can process both submissions.
    tokio::time::sleep(StdDuration::from_millis(100)).await;

    tracing::info!("Attempting to trigger queued job...");
    let outcome = scheduler.trigger_job_now(target_id).await;
    let was_rejected = matches!(outcome, Err(QueryError::TriggerFailedJobScheduled(_)));
    assert!(
        was_rejected,
        "Trigger should be rejected because job is already queued/ready. Got: {:?}",
        outcome
    );

    scheduler.shutdown_graceful(None).await.unwrap();
}
#[tokio::test]
async fn test_debounce_running_job() {
setup_tracing();
let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();
let start_latch = Arc::new(tokio::sync::Notify::new());
let finish_latch = Arc::new(tokio::sync::Notify::new());
let req = TKJobRequest::from_once("Running Job", Utc::now(), 0);
let start_clone = start_latch.clone();
let finish_clone = finish_latch.clone();
let job_id = scheduler
.add_job_async(req, move || {
let s = start_clone.clone();
let f = finish_clone.clone();
Box::pin(async move {
s.notify_one(); f.notified().await; true
})
})
.await
.unwrap();
start_latch.notified().await;
tracing::info!("Job is running.");
tracing::info!("Attempting to trigger running job...");
let result = scheduler.trigger_job_now(job_id).await;
assert!(
matches!(result, Err(QueryError::TriggerFailedJobScheduled(_))),
"Trigger should be rejected because job is running. Got: {:?}",
result
);
finish_latch.notify_one();
scheduler.shutdown_graceful(None).await.unwrap();
}
/// Checks that a job scheduled an hour ahead on a `HandleBased` scheduler can
/// be run immediately via `trigger_job_now`.
///
/// After the trigger and a 200ms pause, the shared counter must read 1.
#[tokio::test]
async fn test_handle_based_preemption() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();

    let executions = Arc::new(AtomicUsize::new(0));
    // Far enough ahead that only the manual trigger can run it within the test.
    let scheduled_for = Utc::now() + ChronoDuration::hours(1);
    let request = TKJobRequest::from_once("Preempt Me", scheduled_for, 0);
    // NOTE(review): presumably this helper bumps the counter once per run —
    // confirm against the `common` test module.
    let job_fn = job_exec_counter_result(Arc::clone(&executions), StdDuration::ZERO, true);
    let job_id = scheduler
        .add_job_async(request, job_fn)
        .await
        .expect("Failed to add job");

    scheduler
        .trigger_job_now(job_id)
        .await
        .expect("Trigger failed");

    // Give the triggered run time to complete before sampling the counter.
    tokio::time::sleep(StdDuration::from_millis(200)).await;
    let runs = executions.load(Ordering::SeqCst);
    assert_eq!(runs, 1);

    scheduler.shutdown_graceful(None).await.unwrap();
}