mod common;
use crate::common::{build_scheduler, job_exec_counter_result, job_exec_flag, setup_tracing};
use chrono::{Duration as ChronoDuration, Utc};
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use std::time::Duration as StdDuration;
use turnkeeper::{
QueryError,
job::{Schedule, TKJobRequest},
job_fn, scheduler::PriorityQueueType,
};
use uuid::Uuid;
/// A job registered with `Schedule::Never` must not execute until it is
/// triggered manually; after one trigger it must have run exactly once and
/// report neither a next run time nor a next run instance.
#[tokio::test]
async fn test_trigger_job_success() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();
    let counter = Arc::new(AtomicUsize::new(0));

    let request = TKJobRequest::new("Trigger Me", Schedule::Never, 0);
    let exec_fn = job_exec_counter_result(Arc::clone(&counter), StdDuration::ZERO, true);
    let job_id = scheduler
        .add_job_async(request, exec_fn)
        .await
        .expect("Add job failed");

    // Short grace period: the counter must stay untouched without a trigger.
    tokio::time::sleep(StdDuration::from_millis(50)).await;
    assert_eq!(counter.load(Ordering::SeqCst), 0);

    tracing::info!(%job_id, "Triggering job now.");
    scheduler
        .trigger_job_now(job_id)
        .await
        .expect("Trigger job failed");

    // Leave time for the triggered execution to finish before inspecting the counter.
    tokio::time::sleep(StdDuration::from_millis(500)).await;
    let runs_after_trigger = counter.load(Ordering::SeqCst);
    assert_eq!(
        runs_after_trigger, 1,
        "Job should have run once after trigger"
    );

    let details = scheduler.get_job_details(job_id).await.unwrap();
    let has_no_next_time = details.next_run_time.is_none();
    assert!(
        has_no_next_time,
        "Triggered job should have no next run time afterwards (unless it had its own recurring schedule)"
    );
    let has_no_next_instance = details.next_run_instance.is_none();
    assert!(
        has_no_next_instance,
        "Triggered job should have no instance afterwards"
    );

    scheduler.shutdown_graceful(None).await.unwrap();
}
/// Triggering an id that was never added to the scheduler must fail with
/// `QueryError::JobNotFound` carrying that same id.
#[tokio::test]
async fn test_trigger_job_not_found() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();

    // A freshly generated id that no job was registered under.
    let unknown_id = Uuid::new_v4();
    let outcome = scheduler.trigger_job_now(unknown_id).await;

    let is_not_found = match &outcome {
        Err(QueryError::JobNotFound(reported)) => *reported == unknown_id,
        _ => false,
    };
    assert!(is_not_found, "Expected JobNotFound error, got {:?}", outcome);

    scheduler.shutdown_graceful(None).await.unwrap();
}
/// Once a job has been cancelled, a manual trigger must be rejected with
/// `QueryError::TriggerFailedJobCancelled` for that job's id.
#[tokio::test]
async fn test_trigger_job_cancelled() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();

    let request = TKJobRequest::new("Trigger Cancelled", Schedule::Never, 0);
    let cancelled_id = scheduler
        .add_job_async(request, job_fn!({ true }))
        .await
        .expect("Add job failed");

    // The same short pause is used after adding and after cancelling the job.
    let settle = StdDuration::from_millis(50);
    tokio::time::sleep(settle).await;
    scheduler
        .cancel_job(cancelled_id)
        .await
        .expect("Cancel failed");
    tokio::time::sleep(settle).await;

    let outcome = scheduler.trigger_job_now(cancelled_id).await;
    let rejected_as_cancelled = match &outcome {
        Err(QueryError::TriggerFailedJobCancelled(reported)) => *reported == cancelled_id,
        _ => false,
    };
    assert!(
        rejected_as_cancelled,
        "Expected TriggerFailedJobCancelled error, got {:?}",
        outcome
    );

    scheduler.shutdown_graceful(None).await.unwrap();
}
/// A `Schedule::Once` job whose run time lies five seconds in the future must
/// accept a manual trigger and execute well before that scheduled time.
#[tokio::test]
async fn test_trigger_job_preempts_schedule() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();
    let executed = Arc::new(AtomicBool::new(false));

    // Scheduled far enough ahead that only a manual trigger can run it
    // within this test's observation window.
    let scheduled_for = Utc::now() + ChronoDuration::seconds(5);
    let request = TKJobRequest::new("Trigger Preempt", Schedule::Once(scheduled_for), 0);
    let exec_fn = job_exec_flag(Arc::clone(&executed), StdDuration::ZERO);
    let job_id = scheduler
        .add_job_async(request, exec_fn)
        .await
        .expect("Add job failed");

    tokio::time::sleep(StdDuration::from_millis(100)).await;
    tracing::info!("Triggering future job now...");
    let trigger_outcome = scheduler.trigger_job_now(job_id).await;
    let trigger_accepted = trigger_outcome.is_ok();
    assert!(
        trigger_accepted,
        "Triggering a scheduled job should succeed (Preemption). Got error: {:?}",
        trigger_outcome
    );

    tokio::time::sleep(StdDuration::from_millis(500)).await;
    let ran = executed.load(Ordering::SeqCst);
    assert!(
        ran,
        "Job should have run immediately due to preemption trigger"
    );

    scheduler.shutdown_graceful(None).await.unwrap();
}
/// Manually triggers a two-second interval job (first run set one second
/// ahead) shortly after registration, then checks that exactly two runs are
/// observed within the 3.5 s window and that the job remains scheduled.
#[tokio::test]
async fn test_trigger_job_interacts_with_schedule() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();
    let counter = Arc::new(AtomicUsize::new(0));

    let interval = StdDuration::from_secs(2);
    let mut job_req = TKJobRequest::from_interval("Trigger Interval", interval, 0);
    let first_run_at = Utc::now() + ChronoDuration::seconds(1);
    job_req.with_initial_run_time(first_run_at);

    let exec_fn =
        job_exec_counter_result(Arc::clone(&counter), StdDuration::from_millis(10), true);
    let job_id = scheduler
        .add_job_async(job_req, exec_fn)
        .await
        .expect("Add job failed");

    tokio::time::sleep(StdDuration::from_millis(50)).await;
    tracing::info!(%job_id, "Triggering interval job now.");
    scheduler
        .trigger_job_now(job_id)
        .await
        .expect("Trigger failed");

    // Observation window for the triggered run plus one scheduled run.
    tokio::time::sleep(StdDuration::from_millis(3500)).await;
    let final_count = counter.load(Ordering::SeqCst);
    assert_eq!(
        final_count, 2,
        "Expected 2 runs (1 preemption + 1 scheduled), got {}",
        final_count
    );

    let details = scheduler.get_job_details(job_id).await.unwrap();
    let still_scheduled = details.next_run_time.is_some();
    assert!(still_scheduled, "Interval job should still be scheduled");

    scheduler.shutdown_graceful(None).await.unwrap();
}
/// A job built with `TKJobRequest::never` must survive a manual trigger:
/// after the first run it is neither cancelled nor given a next run time,
/// and a second trigger executes it again.
#[tokio::test]
async fn test_never_job_triggered_multiple_times() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();
    let counter = Arc::new(AtomicUsize::new(0));

    let request = TKJobRequest::never("Persistent Utility", 0);
    let exec_fn =
        crate::common::job_exec_counter_result(Arc::clone(&counter), StdDuration::ZERO, true);
    let utility_id = scheduler.add_job_async(request, exec_fn).await.unwrap();
    tokio::time::sleep(StdDuration::from_millis(50)).await;

    // Pause applied after each manual trigger before reading the counter.
    let run_window = StdDuration::from_millis(200);

    // First trigger: one execution, job still present and unscheduled.
    scheduler.trigger_job_now(utility_id).await.unwrap();
    tokio::time::sleep(run_window).await;
    assert_eq!(counter.load(Ordering::SeqCst), 1);

    let details = scheduler.get_job_details(utility_id).await.unwrap();
    assert!(!details.is_cancelled);
    assert!(details.next_run_time.is_none());

    // Second trigger: the same job must run once more.
    scheduler.trigger_job_now(utility_id).await.unwrap();
    tokio::time::sleep(run_window).await;
    let total_runs = counter.load(Ordering::SeqCst);
    assert_eq!(
        total_runs, 2,
        "Never job should be triggerable multiple times"
    );

    scheduler.shutdown_graceful(None).await.unwrap();
}
/// Sanity check for one-shot jobs: a `from_once` job scheduled 100 ms ahead
/// must have run within the 500 ms wait, and a manual trigger issued
/// afterwards must fail with `QueryError::JobNotFound` for that job's id.
#[tokio::test]
async fn test_once_job_retains_automatic_archiving() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();
    let flag = Arc::new(AtomicBool::new(false));

    let run_time = Utc::now() + ChronoDuration::milliseconds(100);
    let req = TKJobRequest::from_once("Sanity Once", run_time, 0);
    let job_id = scheduler
        .add_job_async(req, job_exec_flag(flag.clone(), StdDuration::ZERO))
        .await
        .expect("Add job failed");

    // Wait past the scheduled run time so the single execution can complete.
    tokio::time::sleep(StdDuration::from_millis(500)).await;
    assert!(
        flag.load(Ordering::SeqCst),
        "Once job should have run at its scheduled time"
    );

    let trigger_res = scheduler.trigger_job_now(job_id).await;
    // Report the actual result on failure, matching the other trigger tests,
    // so a mismatch can be diagnosed from the test output alone.
    assert!(
        matches!(trigger_res, Err(QueryError::JobNotFound(id)) if id == job_id),
        "Once job should still be automatically archived and untriggerable, got {:?}",
        trigger_res
    );

    scheduler.shutdown_graceful(None).await.unwrap();
}