use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration as StdDuration;
use tracing_subscriber::fmt::TestWriter;
use turnkeeper::{
job::{BoxedExecFn, RecurringJobRequest},
scheduler::{PriorityQueueType, SchedulerBuilder},
TurnKeeper,
};
pub fn setup_tracing() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG) .with_writer(TestWriter::new()) .with_test_writer() .try_init();
}
pub fn build_scheduler(
max_workers: usize,
pq_type: PriorityQueueType,
) -> Result<TurnKeeper, turnkeeper::error::BuildError> {
TurnKeeper::builder()
.max_workers(max_workers)
.priority_queue(pq_type)
.build()
}
pub fn job_exec_counter_result(
counter: Arc<AtomicUsize>,
delay: StdDuration,
succeeds: bool,
) -> BoxedExecFn {
Box::new(move || {
let ctr = counter.clone();
Box::pin(async move {
let count = ctr.fetch_add(1, Ordering::SeqCst) + 1;
tracing::debug!(
"Counter job executing (Count: {}, WillSucceed: {})",
count,
succeeds
);
if delay > StdDuration::ZERO {
tokio::time::sleep(delay).await;
}
succeeds
})
})
}
pub fn job_exec_flag(flag: Arc<AtomicBool>, delay: StdDuration) -> BoxedExecFn {
Box::new(move || {
let flg = flag.clone();
Box::pin(async move {
tracing::debug!("Flag job executing");
if delay > StdDuration::ZERO {
tokio::time::sleep(delay).await;
}
flg.store(true, Ordering::SeqCst);
tracing::debug!("Flag job set flag to true");
true })
})
}
pub fn job_exec_panic() -> BoxedExecFn {
Box::new(move || {
Box::pin(async move {
tracing::debug!("Panic job executing...");
tokio::task::yield_now().await;
panic!("Job forced panic!");
#[allow(unreachable_code)]
true
})
})
}
pub fn job_exec_concurrency_tracker(
active_counter: Arc<AtomicUsize>,
max_observed_active: Arc<AtomicUsize>,
delay: StdDuration,
) -> BoxedExecFn {
Box::new(move || {
let active = active_counter.clone();
let max_obs = max_observed_active.clone();
Box::pin(async move {
let current_active = active.fetch_add(1, Ordering::SeqCst) + 1;
tracing::debug!("Concurrency job START (Active: {})", current_active);
max_obs.fetch_max(current_active, Ordering::SeqCst);
if delay > StdDuration::ZERO {
tokio::time::sleep(delay).await;
}
let current_active_after = active.fetch_sub(1, Ordering::SeqCst) - 1;
tracing::debug!("Concurrency job END (Active: {})", current_active_after);
true })
})
}