use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use turnkeeper::job::{Schedule, TKJobRequest};
use turnkeeper::scheduler::PriorityQueueType;
use turnkeeper::TurnKeeper;
fn setup_tracing() {
let _ = tracing_subscriber::fmt()
.with_env_filter("warn,turnkeeper=trace") .with_test_writer()
.try_init();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_overlapping_interval_jobs() {
setup_tracing();
let total_test_duration = Duration::from_secs(10);
let max_workers = 4;
let jobs_to_run = vec![("Job A (fast)", 700), ("Job B (medium)", 1100), ("Job C (slow)", 1600)];
let execution_counts = Arc::new(Mutex::new(HashMap::<String, usize>::new()));
let scheduler = TurnKeeper::builder()
.max_workers(max_workers)
.priority_queue(PriorityQueueType::HandleBased) .build()
.expect("Failed to build scheduler");
for (name, interval_ms) in jobs_to_run.iter() {
let job_name = name.to_string();
let interval = Duration::from_millis(*interval_ms);
let request = TKJobRequest::from_interval(&job_name, interval, 3);
let tracker_clone = Arc::clone(&execution_counts);
let exec_fn = move || {
let job_name_clone = job_name.clone();
let tracker = Arc::clone(&tracker_clone);
let fut = async move {
tokio::time::sleep(Duration::from_millis(100)).await;
let mut counts = tracker.lock().unwrap();
*counts.entry(job_name_clone).or_insert(0) += 1;
true };
Box::pin(fut) as Pin<Box<dyn Future<Output = bool> + Send + 'static>>
};
scheduler
.add_job_async(request, exec_fn)
.await
.expect("Failed to add job");
}
println!(
"Scheduler running with {} jobs. Waiting for {} seconds...",
jobs_to_run.len(),
total_test_duration.as_secs()
);
tokio::time::sleep(total_test_duration).await;
println!("Test duration elapsed. Shutting down scheduler...");
scheduler
.shutdown_graceful(Some(Duration::from_secs(5)))
.await
.expect("Scheduler failed to shut down gracefully");
println!("Scheduler shutdown complete. Verifying results...");
let final_counts = execution_counts.lock().unwrap();
println!("Final Execution Counts: {:?}", final_counts);
for (name, interval_ms) in jobs_to_run.iter() {
let expected_runs: usize = (total_test_duration.as_millis() / (*interval_ms) as u128) as usize;
let actual_runs = *final_counts.get(*name).unwrap_or(&0);
println!(
"Job '{}': Expected ~{} runs, Actual: {} runs",
name, expected_runs, actual_runs
);
let is_close_enough = actual_runs == expected_runs || actual_runs == expected_runs.saturating_sub(1);
assert!(
is_close_enough,
"Job '{}' did not run the expected number of times. Expected around {}, got {}.",
name, expected_runs, actual_runs
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_overdue_job_triggers_zero_sleep() {
setup_tracing();
let scheduler = TurnKeeper::builder()
.max_workers(1) .build()
.expect("Failed to build scheduler");
let request = TKJobRequest::from_interval("Overdue Job", Duration::from_millis(50), 0);
let exec_fn = move || {
let fut = async move {
println!("JOB: Starting long work...");
tokio::time::sleep(Duration::from_millis(100)).await; println!("JOB: Finished long work.");
true
};
Box::pin(fut) as Pin<Box<dyn Future<Output = bool> + Send + 'static>>
};
scheduler.add_job_async(request, exec_fn).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
scheduler.shutdown_graceful(None).await.unwrap();
}