use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration as StdDuration;
use tracing::{error, info};
use turnkeeper::{job::TKJobRequest, scheduler::PriorityQueueType, TurnKeeper};
#[cfg(feature = "job_context")]
use turnkeeper::{
job_context, try_get_current_job_context,
JobContext,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let filter = tracing_subscriber::EnvFilter::try_new(
"warn,turnkeeper=info,job_context=trace", )
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
tracing_subscriber::fmt().with_env_filter(filter).init();
#[cfg(not(feature = "job_context"))]
{
error!("This example requires the `job_context` feature to be enabled.");
error!("Build with `cargo run --example job_context --features job_context` or ensure it's in default features.");
return Err("job_context feature not enabled".into());
}
#[cfg(feature = "job_context")]
info!("`job_context` feature is enabled.");
info!("Building scheduler...");
let scheduler = TurnKeeper::builder()
.max_workers(1) .priority_queue(PriorityQueueType::BinaryHeap) .build()?;
info!("Scheduler built.");
let execution_count = Arc::new(AtomicUsize::new(0));
let job_req = TKJobRequest::from_interval(
"Context Aware Job",
StdDuration::from_secs(2), 1, );
let exec_count_clone = execution_count.clone();
#[cfg(feature = "job_context")]
let job_fn = move || {
let counter = exec_count_clone.clone();
Box::pin(async move {
let count = counter.fetch_add(1, Ordering::Relaxed) + 1;
info!("*** Context Aware Job Executing (Count: {}) ***", count);
if let Some(ctx) = try_get_current_job_context() {
info!(
" Context (safe access): Job ID {}, Instance ID {}",
ctx.tk_job_id, ctx.instance_id
);
} else {
error!(" Failed to get job context via try_get_current_job_context!");
}
let required_ctx: JobContext = job_context!(); info!(
" Context (macro access): Job ID {}, Instance ID {}",
required_ctx.tk_job_id, required_ctx.instance_id
);
tokio::time::sleep(StdDuration::from_millis(75)).await;
info!("*** Context Aware Job Finished ***");
true }) as std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send>>
};
#[cfg(not(feature = "job_context"))]
let job_fn = move || {
let counter = exec_count_clone.clone();
Box::pin(async move {
let count = counter.fetch_add(1, Ordering::Relaxed) + 1;
info!("*** Dummy Job Executing (Count: {}) ***", count);
info!(" (Job context feature not enabled)");
tokio::time::sleep(StdDuration::from_millis(75)).await;
true
}) as std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send>>
};
info!("Submitting context-aware job...");
let job_id = match scheduler.add_job_async(job_req, job_fn).await {
Ok(job_id) => {
info!("Job submitted with ID: {}", job_id);
job_id }
Err(e) => {
error!("Failed to submit job: {:?}", e);
return Err("Job submission failed".into());
}
};
info!("Waiting for job to run a few times (approx 7 seconds)...");
tokio::time::sleep(StdDuration::from_secs(7)).await;
match scheduler.get_job_details(job_id).await {
Ok(details) => info!("Job Details: {:#?}", details),
Err(e) => error!("Failed to get job details: {}", e),
}
info!("Requesting graceful shutdown...");
match scheduler
.shutdown_graceful(Some(StdDuration::from_secs(5)))
.await
{
Ok(()) => info!("Scheduler shut down successfully."),
Err(e) => error!("Shutdown failed: {}", e),
}
let final_count = execution_count.load(Ordering::Relaxed);
info!("Job executed {} times.", final_count);
assert!(final_count >= 2, "Job should have executed multiple times!");
Ok(())
}