mod common;
use crate::common::{build_scheduler, job_exec_counter_result, job_exec_panic, setup_tracing};
use chrono::{Duration as ChronoDuration, Utc};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::Duration as StdDuration;
use turnkeeper::{job::TKJobRequest, scheduler::PriorityQueueType, Schedule};
/// Checks that a failed first attempt results in a scheduled retry instead of an
/// immediate re-run.
///
/// The job is built with `TKJobRequest::never` plus an explicit initial run time
/// 50 ms in the future. Its closure reports failure until attempt number
/// `max_retries + 1`. After the first failure the test expects exactly one
/// execution, one recorded failure, one recorded retry, and a next run time more
/// than 50 s past the initial run time. A second observation window confirms that
/// nothing else ran in the meantime.
#[tokio::test]
async fn test_retry_scheduling_on_failure() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();
    let counter = Arc::new(AtomicUsize::new(0));
    let max_retries = 2;
    // The job only reports success once this many attempts have been made.
    let fail_until_attempt = max_retries as usize + 1;

    let attempts = Arc::clone(&counter);
    let job_fn = move || {
        let attempts = Arc::clone(&attempts);
        // The explicit type erases the concrete async block into a boxed future.
        let fut: std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send + 'static>> =
            Box::pin(async move {
                let attempt_index = attempts.fetch_add(1, Ordering::SeqCst);
                let should_succeed = attempt_index + 1 >= fail_until_attempt;
                tracing::info!(
                    "Retry test job executing (Attempt Index: {}, WillSucceed: {})",
                    attempt_index,
                    should_succeed
                );
                tokio::time::sleep(StdDuration::from_millis(10)).await;
                should_succeed
            });
        fut
    };

    let mut req = TKJobRequest::never("Retry Schedule Test", max_retries);
    let initial_run_time = Utc::now() + ChronoDuration::milliseconds(50);
    req.with_initial_run_time(initial_run_time);
    let job_id = scheduler
        .add_job_async(req, job_fn)
        .await
        .expect("Add job failed");

    // First window: long enough for the initial attempt (due at +50 ms) to run and fail.
    tokio::time::sleep(StdDuration::from_millis(200)).await;
    let metrics1 = scheduler.get_metrics_snapshot().await.unwrap();
    let details1 = scheduler.get_job_details(job_id).await.unwrap();
    let runs_after_first_window = counter.load(Ordering::SeqCst);
    assert_eq!(runs_after_first_window, 1, "Should have run once");
    assert_eq!(metrics1.jobs_executed_fail, 1, "Should fail once");
    assert_eq!(metrics1.jobs_retried, 1, "Should schedule 1 retry");
    assert_eq!(
        details1.retry_count, 1,
        "Retry count in definition should be 1 for next run"
    );
    assert!(
        details1.next_run_time.is_some(),
        "Should have a next run time scheduled"
    );
    let earliest_expected_retry = initial_run_time + ChronoDuration::seconds(50);
    assert!(
        details1.next_run_time.unwrap() > earliest_expected_retry,
        "Retry time should be >~60s out"
    );

    // Second window: the retry lies far in the future, so the state must not change.
    tokio::time::sleep(StdDuration::from_millis(500)).await;
    let metrics_final = scheduler.get_metrics_snapshot().await.unwrap();
    let details_final = scheduler.get_job_details(job_id).await.unwrap();
    let runs_after_second_window = counter.load(Ordering::SeqCst);
    assert_eq!(
        runs_after_second_window, 1,
        "Still should have only run once in test timeframe"
    );
    assert_eq!(metrics_final.jobs_executed_fail, 1);
    assert_eq!(metrics_final.jobs_retried, 1);
    assert_eq!(details_final.retry_count, 1, "Retry count remains 1");
    assert!(details_final.next_run_time.is_some());

    scheduler.shutdown_graceful(None).await.unwrap();
}
/// Checks the state after the first failure of a job that always fails and is
/// allowed a single retry.
///
/// Within the observed window the job must have executed once, failed once, and
/// had one retry scheduled more than 50 s past the initial run time. It must not
/// yet be counted as permanently failed.
#[tokio::test]
async fn test_permanent_failure_scheduling() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();
    let counter = Arc::new(AtomicUsize::new(0));
    let max_retries = 1;

    let mut req = TKJobRequest::never("Perm Failure Schedule", max_retries);
    let initial_run_time = Utc::now() + ChronoDuration::milliseconds(50);
    req.with_initial_run_time(initial_run_time);

    // Helper from the shared `common` module; the final `false` is the result flag.
    let always_failing_job = job_exec_counter_result(Arc::clone(&counter), StdDuration::ZERO, false);
    let job_id = scheduler
        .add_job_async(req, always_failing_job)
        .await
        .expect("Add job failed");

    // Give the first attempt (due at +50 ms) time to run and be processed.
    tokio::time::sleep(StdDuration::from_millis(200)).await;
    let metrics1 = scheduler.get_metrics_snapshot().await.unwrap();
    let details1 = scheduler.get_job_details(job_id).await.unwrap();
    let executions = counter.load(Ordering::SeqCst);
    assert_eq!(executions, 1, "Should have run once");
    assert_eq!(metrics1.jobs_executed_fail, 1, "Should fail once");
    assert_eq!(metrics1.jobs_retried, 1, "Should schedule 1 retry");
    assert_eq!(
        details1.retry_count, 1,
        "Retry count should be 1 for next run"
    );
    assert!(
        details1.next_run_time.is_some(),
        "Should have retry scheduled"
    );
    let earliest_expected_retry = initial_run_time + ChronoDuration::seconds(50);
    assert!(details1.next_run_time.unwrap() > earliest_expected_retry);
    // One retry is still pending, so the job is not permanently failed yet.
    assert_eq!(metrics1.jobs_permanently_failed, 0);

    scheduler.shutdown_graceful(None).await.unwrap();
}
/// Checks that a job configured with a fixed retry delay has its retry scheduled
/// roughly that delay after the failed attempt.
///
/// The job is created via `TKJobRequest::with_fixed_retry_delay` with a 5 s delay
/// and a one-shot schedule 50 ms in the future. After the first failure, the gap
/// between "now" (sampled after the 200 ms wait) and the scheduled retry must lie
/// between `delay - 300ms` and `delay - 10ms`. The retry must also fall within
/// 30 s of the initial run time.
#[tokio::test]
async fn test_fixed_retry_delay_scheduling() {
    setup_tracing();
    let scheduler = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();
    let counter = Arc::new(AtomicUsize::new(0));
    let max_retries = 2;
    let fixed_delay = StdDuration::from_secs(5);
    let initial_run_time = Utc::now() + ChronoDuration::milliseconds(50);

    let job_req = TKJobRequest::with_fixed_retry_delay(
        "Fixed Retry Delay Test",
        Schedule::Once(initial_run_time),
        max_retries,
        fixed_delay,
    );
    // Helper from the shared `common` module; the final `false` is the result flag.
    let failing_job =
        job_exec_counter_result(Arc::clone(&counter), StdDuration::from_millis(10), false);
    let job_id = scheduler
        .add_job_async(job_req, failing_job)
        .await
        .expect("Add job failed");

    // Give the first attempt (due at +50 ms) time to run and be processed.
    tokio::time::sleep(StdDuration::from_millis(200)).await;
    let time_after_run = Utc::now();
    let metrics1 = scheduler.get_metrics_snapshot().await.unwrap();
    let details1 = scheduler.get_job_details(job_id).await.unwrap();
    let executions = counter.load(Ordering::SeqCst);
    assert_eq!(executions, 1, "Should have run once");
    assert_eq!(metrics1.jobs_executed_fail, 1, "Should fail once");
    assert_eq!(metrics1.jobs_retried, 1, "Should schedule 1 retry");
    assert_eq!(
        details1.retry_count, 1,
        "Retry count in definition should be 1 for next run"
    );
    assert!(
        details1.next_run_time.is_some(),
        "Should have a next run time scheduled"
    );

    let next_run = details1.next_run_time.unwrap();
    let delta_duration = next_run.signed_duration_since(time_after_run);
    let expected_delay_chrono = ChronoDuration::from_std(fixed_delay).unwrap();
    // `time_after_run` is sampled after the job already ran, so the remaining gap
    // is expected to be a little shorter than the full configured delay.
    let lower_bound = expected_delay_chrono - ChronoDuration::milliseconds(300);
    let upper_bound = expected_delay_chrono - ChronoDuration::milliseconds(10);

    tracing::debug!("Time after run: {}", time_after_run);
    tracing::debug!("Scheduled next run: {}", next_run);
    tracing::debug!("Delta duration: {:?}", delta_duration);
    tracing::debug!("Expected Delay: {:?}", expected_delay_chrono);
    tracing::debug!("Lower bound delta: {:?}", lower_bound);
    tracing::debug!("Upper bound delta: {:?}", upper_bound);

    let within_expected_window = (lower_bound..=upper_bound).contains(&delta_duration);
    assert!(
        within_expected_window,
        "Delta duration ({:?}) not within expected range ({:?} to {:?}), expected ~ < {:?}",
        delta_duration,
        lower_bound,
        upper_bound,
        expected_delay_chrono
    );

    // Guard against a much longer backoff being applied instead of the fixed delay.
    let latest_acceptable_retry = initial_run_time + ChronoDuration::seconds(30);
    assert!(
        next_run < latest_acceptable_retry,
        "Retry time ({}) seems too long for fixed delay, maybe exponential?",
        next_run
    );

    scheduler.shutdown_graceful(None).await.unwrap();
}