use chrono::{Duration as ChronoDuration, Utc};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration as StdDuration;
use tracing::{error, info};
use turnkeeper::{job::TKJobRequest, scheduler::PriorityQueueType, TurnKeeper};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let filter = tracing_subscriber::EnvFilter::try_new("warn,turnkeeper=info")
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
tracing_subscriber::fmt().with_env_filter(filter).init();
info!("Building scheduler...");
let scheduler = TurnKeeper::builder()
.max_workers(1) .priority_queue(PriorityQueueType::BinaryHeap) .build()?;
info!("Scheduler built.");
let job_executed_flag = Arc::new(AtomicBool::new(false));
let run_time = Utc::now() + ChronoDuration::seconds(2);
let job_req = TKJobRequest::from_once(
"One Time Job",
run_time,
0, );
info!("Job scheduled for one-time run at: {}", run_time);
let flag_clone = job_executed_flag.clone();
let job_fn = move || {
let flag = flag_clone.clone();
Box::pin(async move {
info!("*** One Time Job Executing! ***");
flag.store(true, Ordering::SeqCst);
tokio::time::sleep(StdDuration::from_millis(50)).await;
true }) as std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send>>
};
info!("Submitting one-time job...");
let job_id_res = scheduler.add_job_async(job_req, job_fn).await;
match job_id_res {
Ok(id) => info!("Job submitted successfully with ID: {}", id),
Err(e) => {
error!("Failed to submit job: {:?}", e);
return Err("Job submission failed".into()); }
}
info!("Waiting for job to execute (approx 4 seconds)...");
tokio::time::sleep(StdDuration::from_secs(4)).await;
match scheduler.get_metrics_snapshot().await {
Ok(metrics) => {
info!("Final Metrics: {:#?}", metrics);
assert_eq!(
metrics.jobs_executed_success, 1,
"Expected exactly one successful execution based on metrics"
);
}
Err(e) => error!("Failed to get metrics: {}", e),
}
match scheduler.list_all_jobs().await {
Ok(jobs) => {
info!("Current Jobs list:");
for job in jobs {
info!(
" - {}: ID={}, NextRun={:?}",
job.name, job.id, job.next_run
);
if job.name == "One Time Job" {
assert!(
job.next_run.is_none(),
"One time job should not have a next run time after completion"
);
}
}
}
Err(e) => error!("Failed to list jobs: {}", e),
}
info!("Requesting graceful shutdown...");
match scheduler
.shutdown_graceful(Some(StdDuration::from_secs(5)))
.await
{
Ok(()) => info!("Scheduler shut down successfully."),
Err(e) => error!("Shutdown failed: {}", e),
}
assert!(
job_executed_flag.load(Ordering::SeqCst),
"Job execution flag was not set!"
);
info!("Verified one-time job executed.");
Ok(())
}