use chrono::{Duration as ChronoDuration, Utc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration as StdDuration;
use tracing::{error, info};
use turnkeeper::{job::TKJobRequest, scheduler::PriorityQueueType, TurnKeeper};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let filter = tracing_subscriber::EnvFilter::try_new("warn,turnkeeper=info")
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
tracing_subscriber::fmt().with_env_filter(filter).init();
info!("Building scheduler...");
let scheduler = TurnKeeper::builder()
.max_workers(2)
.priority_queue(PriorityQueueType::HandleBased)
.build()?;
info!("Scheduler built.");
let execution_count = Arc::new(AtomicUsize::new(0));
let job_req = TKJobRequest::from_once(
"Simple RunOnce Job", Utc::now() + ChronoDuration::seconds(2),
1, );
let exec_count_clone = execution_count.clone();
let job_fn = move || {
let counter = exec_count_clone.clone();
Box::pin(async move {
let count = counter.fetch_add(1, Ordering::Relaxed) + 1;
info!("*** Simple Job Executing (Count: {}) ***", count);
tokio::time::sleep(StdDuration::from_millis(50)).await;
true }) as std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send>>
};
info!("Submitting job...");
match scheduler.add_job_async(job_req, job_fn).await {
Ok(job_id) => info!("Job submitted with ID: {}", job_id),
Err(e) => {
error!("Failed to submit job: {:?}", e); match e {
turnkeeper::error::SubmitError::StagingFull(_) => error!("Staging full"),
turnkeeper::error::SubmitError::ChannelClosed(_) => error!("Channel closed"),
}
return Err("Job submission failed".into());
}
}
info!("Waiting for job to run (approx 5 seconds)...");
tokio::time::sleep(StdDuration::from_secs(5)).await;
match scheduler.get_metrics_snapshot().await {
Ok(metrics) => {
info!("Final Metrics: {:#?}", metrics);
assert_eq!(
metrics.jobs_executed_success, 1,
"Job should have run exactly once"
);
}
Err(e) => error!("Failed to get metrics: {}", e),
}
info!("Requesting graceful shutdown...");
match scheduler
.shutdown_graceful(Some(StdDuration::from_secs(5)))
.await
{
Ok(()) => info!("Scheduler shut down successfully."),
Err(e) => error!("Shutdown failed: {}", e),
}
let final_count = execution_count.load(Ordering::Relaxed);
info!("Job executed {} times.", final_count);
assert_eq!(final_count, 1, "Job did not execute exactly once!");
Ok(())
}