#![cfg(feature = "job_context")]
mod common;
use crate::common::{build_scheduler, setup_tracing};
use std::sync::Arc;
use std::time::Duration as StdDuration;
use chrono::{Duration as ChronoDuration, Utc};
use parking_lot::Mutex;
use turnkeeper::{
job::TKJobRequest,
job_context, scheduler::PriorityQueueType,
try_get_current_job_context,
InstanceId, JobContext,
};
use uuid::Uuid;
#[tokio::test]
async fn test_job_context_access() {
setup_tracing();
let scheduler = build_scheduler(1, PriorityQueueType::BinaryHeap).unwrap();
let captured_context = Arc::new(Mutex::new(None::<JobContext>));
let job_fn = {
let context_capture_arc = captured_context.clone();
move || {
let capture = context_capture_arc.clone();
Box::pin(async move {
tracing::info!("Context test job executing...");
let ctx_option = try_get_current_job_context();
assert!(
ctx_option.is_some(),
"Context should be available via try_get"
);
let ctx_macro = job_context!();
tracing::info!(
"Context (macro): Job ID {}, Instance ID {}",
ctx_macro.tk_job_id,
ctx_macro.instance_id
);
assert_eq!(
ctx_option.unwrap().tk_job_id,
ctx_macro.tk_job_id
);
assert_eq!(ctx_option.unwrap().instance_id, ctx_macro.instance_id);
{
let mut locked_capture = capture.lock();
if locked_capture.is_none() {
*locked_capture = Some(ctx_macro); tracing::info!("Captured context from first run.");
} else {
tracing::info!("Context already captured, skipping store.");
}
}
tokio::time::sleep(StdDuration::from_millis(50)).await;
true }) as std::pin::Pin<Box<(dyn std::future::Future<Output = bool> + Send + 'static)>>
}
};
let mut req = TKJobRequest::never("Context Test Job", 0);
req.with_initial_run_time(Utc::now() + ChronoDuration::milliseconds(150));
tracing::info!("Submitting context test job...");
let expected_tk_id = scheduler
.add_job_async(req, job_fn)
.await
.expect("Failed to add job");
tracing::info!(
"Job submitted with expected TKJobId: {}",
expected_tk_id
);
tokio::time::sleep(StdDuration::from_secs(1)).await;
tracing::info!("Shutting down scheduler...");
scheduler.shutdown_graceful(None).await.unwrap();
tracing::info!("Verifying captured context...");
let final_captured_context = captured_context.lock();
match *final_captured_context {
Some(ctx) => {
assert_eq!(
ctx.tk_job_id, expected_tk_id,
"Captured TKJobId does not match expected"
);
assert_ne!(
ctx.instance_id,
Uuid::nil(),
"Captured InstanceId should not be nil"
);
tracing::info!(
"Verification successful: RecID={}, InstID={}",
ctx.tk_job_id,
ctx.instance_id
);
}
None => {
panic!("Context was not captured by the job function!");
}
}
}
#[tokio::test]
async fn test_job_context_differs_across_runs() {
setup_tracing();
let scheduler = build_scheduler(1, PriorityQueueType::HandleBased).unwrap();
let captured_contexts = Arc::new(Mutex::new(Vec::<JobContext>::new()));
let job_fn = {
let captures_arc = captured_contexts.clone();
move || {
let captures = captures_arc.clone();
Box::pin(async move {
let ctx = job_context!(); tracing::info!(
"TurnKeeper context test job: RecID={}, InstID={}",
ctx.tk_job_id,
ctx.instance_id
);
captures.lock().push(ctx); tokio::time::sleep(StdDuration::from_millis(50)).await;
true
}) as std::pin::Pin<Box<(dyn std::future::Future<Output = bool> + Send + 'static)>>
}
};
let interval = StdDuration::from_millis(500);
let mut req = TKJobRequest::from_interval("TurnKeeper Context Test", interval, 0);
req.with_initial_run_time(Utc::now() + ChronoDuration::milliseconds(100));
let expected_tk_id = scheduler
.add_job_async(req, job_fn)
.await
.expect("Failed to add TurnKeeper job");
tracing::info!("TurnKeeper job submitted: {}", expected_tk_id);
tokio::time::sleep(StdDuration::from_millis(1800)).await;
scheduler.shutdown_graceful(None).await.unwrap();
let final_contexts = captured_contexts.lock();
let run_count = final_contexts.len();
tracing::info!("TurnKeeper job ran {} times.", run_count);
assert!(
run_count >= 3,
"Expected TurnKeeper job to run at least 3 times (ran {})",
run_count
);
let mut previous_instance_id: Option<InstanceId> = None;
for (i, ctx) in final_contexts.iter().enumerate() {
assert_eq!(
ctx.tk_job_id,
expected_tk_id,
"TKJobId mismatch on run {}",
i + 1
);
assert_ne!(
ctx.instance_id,
Uuid::nil(),
"InstanceId was nil on run {}",
i + 1
);
if let Some(prev_id) = previous_instance_id {
assert_ne!(
ctx.instance_id,
prev_id,
"InstanceId should be different on run {} (was same as run {})",
i + 1,
i
);
}
previous_instance_id = Some(ctx.instance_id);
}
}