mod common;
use crate::common::{build_scheduler, job_exec_counter_result, setup_tracing};
use chrono::{Duration as ChronoDuration, Utc};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::Duration as StdDuration;
use turnkeeper::{
job::{RecurringJobRequest, Schedule},
scheduler::PriorityQueueType,
QueryError,
};
/// Checks that a job created from a cron expression keeps executing while the scheduler runs.
///
/// The test submits a request built with `RecurringJobRequest::from_cron`, waits five seconds,
/// and then asserts that the execution counter is 2 or 3, that the job still reports a next
/// run time, and that the stored schedule is `Schedule::Cron` with the submitted expression.
#[tokio::test]
async fn test_cron_schedule() {
    setup_tracing();
    let sched = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();
    let runs = Arc::new(AtomicUsize::new(0));

    // NOTE(review): presumably a seconds-resolution expression that fires every two
    // seconds; confirm against the cron parser the scheduler uses.
    let expression = "*/2 * * * * * *";
    let request = RecurringJobRequest::from_cron("Cron Test", expression, 0);
    let handler = job_exec_counter_result(Arc::clone(&runs), StdDuration::from_millis(50), true);
    let id = sched
        .add_job_async(request, handler)
        .await
        .expect("Failed to add job");
    tracing::info!("Cron job submitted: {}", id);

    // Let the scheduler run for a fixed window before sampling the counter.
    let observation_window = StdDuration::from_secs(5);
    tokio::time::sleep(observation_window).await;

    let observed = runs.load(Ordering::SeqCst);
    assert!(
        (2..=3).contains(&observed),
        "Cron job should have run 2 or 3 times (ran {})",
        observed
    );

    let snapshot = sched.get_job_details(id).await.unwrap();
    let still_scheduled = snapshot.next_run_time.is_some();
    assert!(still_scheduled, "Cron job should still be scheduled");

    let is_expected_cron =
        matches!(&snapshot.schedule, Schedule::Cron(expr) if expr == expression);
    assert!(
        is_expected_cron,
        "Schedule type should be Cron with correct expression"
    );

    sched.shutdown_graceful(None).await.unwrap();
}
/// Checks that a fixed-interval job keeps executing while the scheduler runs.
///
/// The request uses a 750 ms interval and an initial run time 100 ms after submission. After
/// sleeping 3100 ms the test asserts that the execution counter is 4 or 5, that the job still
/// reports a next run time, and that the stored schedule is `Schedule::FixedInterval` with the
/// submitted duration.
#[tokio::test]
async fn test_interval_schedule() {
    setup_tracing();
    let sched = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();
    let runs = Arc::new(AtomicUsize::new(0));

    let period = StdDuration::from_millis(750);
    let mut request = RecurringJobRequest::from_interval("Interval Test", period, 0);
    // The first run is requested shortly after submission rather than a full period later.
    let first_run = Utc::now() + ChronoDuration::milliseconds(100);
    request.with_initial_run_time(first_run);

    let handler = job_exec_counter_result(Arc::clone(&runs), StdDuration::from_millis(20), true);
    let id = sched
        .add_job_async(request, handler)
        .await
        .expect("Failed to add job");
    tracing::info!("Interval job submitted: {}", id);

    // Let the scheduler run for a fixed window before sampling the counter.
    let observation_window = StdDuration::from_millis(3100);
    tokio::time::sleep(observation_window).await;

    let observed = runs.load(Ordering::SeqCst);
    assert!(
        (4..=5).contains(&observed),
        "Interval job should have run 4 or 5 times (ran {})",
        observed
    );

    let snapshot = sched.get_job_details(id).await.unwrap();
    let still_scheduled = snapshot.next_run_time.is_some();
    assert!(still_scheduled, "Interval job should still be scheduled");

    let is_expected_interval =
        matches!(&snapshot.schedule, Schedule::FixedInterval(d) if *d == period);
    assert!(
        is_expected_interval,
        "Schedule type should be FixedInterval with correct duration"
    );

    sched.shutdown_graceful(None).await.unwrap();
}
/// Checks that a `never` job submitted without an initial run time does not execute.
///
/// After waiting one second the test asserts that the execution counter is still zero and that
/// looking the job up by its id yields `QueryError::JobNotFound` carrying that same id.
#[tokio::test]
async fn test_never_schedule_no_initial_time() {
    setup_tracing();
    let sched = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();
    let runs = Arc::new(AtomicUsize::new(0));

    let request = RecurringJobRequest::never("Never Run", 0);
    let handler = job_exec_counter_result(Arc::clone(&runs), StdDuration::ZERO, true);
    let id = sched
        .add_job_async(request, handler)
        .await
        .expect("Failed to add job");
    tracing::info!("'Never' job submitted (no initial time): {}", id);

    // Give the scheduler time to (not) run the job before sampling the counter.
    let observation_window = StdDuration::from_secs(1);
    tokio::time::sleep(observation_window).await;

    let observed = runs.load(Ordering::SeqCst);
    assert_eq!(
        observed, 0,
        "Job with Schedule::Never and no initial time should not run"
    );

    // The lookup is expected to fail; log the raw result first to aid debugging.
    let lookup = sched.get_job_details(id).await;
    tracing::info!(
        "Result of get_job_details for discarded job: {:?}",
        lookup
    );
    let is_not_found = matches!(lookup, Err(QueryError::JobNotFound(missing)) if missing == id);
    assert!(
        is_not_found,
        "Getting details for a discarded 'Never' job should result in JobNotFound"
    );

    sched.shutdown_graceful(None).await.unwrap();
}
/// Checks that a `never` job submitted with an initial run time executes exactly once.
///
/// The initial run time is set 150 ms after submission. After waiting one second the test
/// asserts that the execution counter is 1, that the job's details can still be fetched with
/// no next run time, and that the stored schedule is `Schedule::Never`.
#[tokio::test]
async fn test_never_schedule_with_initial_time() {
    setup_tracing();
    let sched = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();
    let runs = Arc::new(AtomicUsize::new(0));

    let mut request = RecurringJobRequest::never("Never Run Once", 0);
    // Only this single explicitly requested run is expected to happen.
    let first_run = Utc::now() + ChronoDuration::milliseconds(150);
    request.with_initial_run_time(first_run);

    let handler = job_exec_counter_result(Arc::clone(&runs), StdDuration::ZERO, true);
    let id = sched
        .add_job_async(request, handler)
        .await
        .expect("Failed to add job");
    tracing::info!("'Never' job submitted (with initial time): {}", id);

    // Wait well past the requested initial run time before sampling the counter.
    let observation_window = StdDuration::from_secs(1);
    tokio::time::sleep(observation_window).await;

    let observed = runs.load(Ordering::SeqCst);
    assert_eq!(
        observed, 1,
        "Job with Schedule::Never and initial time should run exactly once"
    );

    let snapshot = sched.get_job_details(id).await.unwrap();
    let nothing_pending = snapshot.next_run_time.is_none();
    assert!(
        nothing_pending,
        "'Never' job should have no next run time after completion"
    );

    let is_never = matches!(&snapshot.schedule, Schedule::Never);
    assert!(is_never, "Schedule type should be Never");

    sched.shutdown_graceful(None).await.unwrap();
}