use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// A freshly constructed state machine must report `CronState::CheckEvents`
/// as its current state before `run()` is ever called.
#[test]
fn test_state_transitions() {
    let (_, receiver) = unbounded::<ScheduledTask>();
    let shutdown_flag = Arc::new(AtomicBool::new(false));
    let cron_stats = Arc::new(CronStats::default());

    let machine = CronStateMachine::new(
        receiver,
        Arc::clone(&shutdown_flag),
        Arc::clone(&cron_stats),
        100,
        None,
    );

    assert_eq!(machine.current_state(), CronState::CheckEvents);
}
/// Setting the shared termination flag before calling `run()` must leave the
/// state machine in `CronState::Terminated`.
///
/// The sender half of the channel is kept alive for the whole test so that
/// the flag is the only thing that can stop the machine.
#[test]
fn test_termination_from_any_state() {
    // A bare `_` pattern does not bind the sender, so it would be dropped at
    // the end of the `let` statement and the channel would already be
    // disconnected when `run()` starts. The disconnect tests in this file
    // suggest a disconnected channel can end the cron loop on its own, which
    // would let this test pass without the flag ever being consulted.
    let (_tx, rx) = unbounded::<ScheduledTask>();
    let terminating = Arc::new(AtomicBool::new(false));
    let stats = Arc::new(CronStats::default());
    let mut sm = CronStateMachine::new(rx, Arc::clone(&terminating), stats, 100, None);

    // Request termination before the machine takes its first step.
    terminating.store(true, AtomicOrdering::Release);
    sm.run();

    assert_eq!(sm.current_state(), CronState::Terminated);
}
/// Ten producer threads each submit 100 zero-delay one-shot tasks through
/// cloned handles; every task bumps a shared counter.
///
/// After shutdown the test expects the counter and `stats.tasks_executed` to
/// both equal 1000, and at least one state transition to have been recorded.
#[test]
fn test_concurrent_task_submission() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (handle, thread, stats, _ready) = spawn_cron(Arc::clone(&terminating));
    let counter = Arc::new(AtomicU64::new(0));

    let producers: Vec<_> = (0..10)
        .map(|_| {
            let producer_handle = handle.clone();
            let shared = Arc::clone(&counter);
            std::thread::spawn(move || {
                for _ in 0..100 {
                    // Each task closure needs its own owned reference to the counter.
                    let task_counter = Arc::clone(&shared);
                    producer_handle.schedule_after(0, TaskMetadata::OneShot, move || {
                        task_counter.fetch_add(1, Ordering::Relaxed);
                        true
                    });
                }
            })
        })
        .collect();
    for producer in producers {
        producer.join().expect("Thread panicked");
    }

    // Wait for the scheduler to drain the queue with a bounded poll instead of
    // a fixed sleep: a fast machine finishes early, a slow one gets more time.
    // If the deadline passes, the assertions below report the actual counts.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while counter.load(Ordering::Relaxed) < 1000 && std::time::Instant::now() < deadline {
        std::thread::sleep(Duration::from_millis(10));
    }

    handle.request_shutdown();
    // Joining first ensures the cron thread has finished all of its
    // bookkeeping before the statistics are read.
    thread.join().expect("Cron thread panicked");

    assert_eq!(counter.load(Ordering::Relaxed), 1000);
    assert_eq!(stats.tasks_executed.load(Ordering::Relaxed), 1000);
    assert!(stats.transitions.load(Ordering::Relaxed) > 0);
}
/// Schedules a recurring task via `schedule_recurring(0, 50, "counter", ..)`,
/// lets the scheduler run for 275 ms, then shuts it down and expects the task
/// to have run between 4 and 7 times (inclusive).
///
/// NOTE(review): the `50` is presumably the recurrence interval in
/// milliseconds and the `10` passed to `spawn_cron_with_interval` the poll
/// interval; confirm against the scheduler API.
#[test]
fn test_recurring_task() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (cron, worker, _stats, _ready) =
        spawn_cron_with_interval(Arc::clone(&terminating), 10);

    let executions = Arc::new(AtomicU64::new(0));
    let task_counter = Arc::clone(&executions);
    cron.schedule_recurring(0, 50, "counter", move || {
        task_counter.fetch_add(1, Ordering::Relaxed);
        true
    });

    std::thread::sleep(Duration::from_millis(275));
    cron.request_shutdown();
    worker.join().expect("Cron thread panicked");

    let observed = executions.load(Ordering::Relaxed);
    assert!(
        (4..=7).contains(&observed),
        "Expected 4-7 executions, got {}",
        observed
    );
}
/// A recurring task whose closure returns `false` on its third run is
/// expected to have executed exactly three times by the time the scheduler is
/// shut down 200 ms later.
#[test]
fn test_recurring_task_stops_on_false() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (cron, worker, _stats, _ready) =
        spawn_cron_with_interval(Arc::clone(&terminating), 10);

    let executions = Arc::new(AtomicU64::new(0));
    let task_counter = Arc::clone(&executions);
    cron.schedule_recurring(0, 20, "limited-counter", move || {
        // `fetch_add` yields the value before the increment, so `previous + 1`
        // is the number of runs including this one.
        let previous = task_counter.fetch_add(1, Ordering::Relaxed);
        previous + 1 < 3
    });

    std::thread::sleep(Duration::from_millis(200));
    cron.request_shutdown();
    worker.join().expect("Cron thread panicked");

    let observed = executions.load(Ordering::Relaxed);
    assert_eq!(observed, 3, "Expected exactly 3 executions, got {}", observed);
}
/// A task submitted with `schedule_once` is expected to have run exactly once
/// by the time the scheduler is shut down 100 ms after submission.
#[test]
fn test_one_shot_task() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (cron, worker, _stats, _ready) =
        spawn_cron_with_interval(Arc::clone(&terminating), 10);

    let executions = Arc::new(AtomicU64::new(0));
    {
        let task_counter = Arc::clone(&executions);
        cron.schedule_once(0, "one-shot", move || {
            task_counter.fetch_add(1, Ordering::Relaxed);
            true
        });
    }

    std::thread::sleep(Duration::from_millis(100));
    cron.request_shutdown();
    worker.join().expect("Cron thread panicked");

    let observed = executions.load(Ordering::Relaxed);
    assert_eq!(observed, 1, "Expected exactly 1 execution, got {}", observed);
}
/// A panicking task must not take the cron thread down with it.
///
/// The test waits for the ready signal, schedules one task that panics and
/// one ordinary task, polls (for up to 500 ms) until the ordinary task has
/// run, and then checks that the cron thread joins cleanly, the ordinary task
/// ran once, and `stats.tasks_panicked` is 1.
#[test]
fn test_panic_safety() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (cron, worker, stats, ready_rx) =
        spawn_cron_with_interval(Arc::clone(&terminating), 10);
    ready_rx.recv().expect("Cron thread failed to start");

    let executions = Arc::new(AtomicU64::new(0));
    let task_counter = Arc::clone(&executions);

    cron.schedule_once(0, "panicking", || {
        panic!("This task intentionally panics")
    });
    cron.schedule_once(50, "normal", move || {
        task_counter.fetch_add(1, Ordering::Relaxed);
        true
    });

    // Poll until the ordinary task has been observed or the deadline passes.
    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if executions.load(Ordering::Relaxed) != 0 {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("Timeout waiting for normal task to execute");
        }
        std::thread::sleep(Duration::from_millis(10));
    }

    cron.request_shutdown();
    worker
        .join()
        .expect("Cron thread should not panic from task panic");

    let observed = executions.load(Ordering::Relaxed);
    assert_eq!(observed, 1, "Normal task should have executed");
    assert_eq!(stats.tasks_panicked.load(Ordering::Relaxed), 1);
}
/// Dropping the only handle while no tasks are queued is expected to let the
/// cron thread finish on its own, having recorded at least one transition.
#[test]
fn test_channel_disconnect_empty_queue() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (cron, worker, stats, _ready) =
        spawn_cron_with_interval(Arc::clone(&terminating), 10);

    // No shutdown request is made; the handle is simply dropped.
    drop(cron);
    worker.join().expect("Cron thread panicked");

    let transitions = stats.transitions.load(Ordering::Relaxed);
    assert!(transitions >= 1);
}
/// Dropping the handle while a delayed task is still pending is expected to
/// let that task run before the cron thread exits.
///
/// The test polls `is_finished()` for up to 500 ms so that a hung cron thread
/// fails the test instead of blocking on `join()`.
#[test]
fn test_channel_disconnect_with_tasks() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (cron, worker, _stats, _ready) =
        spawn_cron_with_interval(Arc::clone(&terminating), 10);

    let executions = Arc::new(AtomicU64::new(0));
    let task_counter = Arc::clone(&executions);
    cron.schedule_once(100, "delayed", move || {
        task_counter.fetch_add(1, Ordering::Relaxed);
        true
    });
    drop(cron);

    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if worker.is_finished() {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("Timeout waiting for cron thread to terminate after draining tasks");
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    worker.join().expect("Cron thread panicked");

    let observed = executions.load(Ordering::Relaxed);
    assert_eq!(observed, 1, "Delayed task should have executed");
}
/// Schedules five tasks that return `true` and three that return `false`,
/// takes a stats snapshot 100 ms later (before shutdown), and expects it to
/// report 8 executed, 3 failed and 0 panicked tasks.
#[test]
fn test_stats_snapshot() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (cron, worker, stats, _ready) =
        spawn_cron_with_interval(Arc::clone(&terminating), 10);

    // (task name, closure result, number of submissions), submitted in order.
    for (label, outcome, repeats) in [("success", true, 5), ("failure", false, 3)] {
        for _ in 0..repeats {
            cron.schedule_once(0, label, move || outcome);
        }
    }

    std::thread::sleep(Duration::from_millis(100));
    let snap = stats.snapshot();

    cron.request_shutdown();
    worker.join().expect("Cron thread panicked");

    assert_eq!(snap.tasks_executed, 8);
    assert_eq!(snap.tasks_failed, 3);
    assert_eq!(snap.tasks_panicked, 0);
}
/// Checks `name()` and `recurrence_interval()` for each `TaskMetadata`
/// variant: `OneShot`, `Recurring`, and `Named` with and without an interval.
#[test]
fn test_task_metadata() {
    // (metadata under test, expected name, expected recurrence interval)
    let cases = [
        (TaskMetadata::OneShot, "one-shot", None),
        (
            TaskMetadata::Recurring { interval_ms: 1000 },
            "recurring",
            Some(1000),
        ),
        (
            TaskMetadata::Named {
                name: "custom".to_string(),
                recurring_interval_ms: None,
            },
            "custom",
            None,
        ),
        (
            TaskMetadata::Named {
                name: "checkpoint".to_string(),
                recurring_interval_ms: Some(5000),
            },
            "checkpoint",
            Some(5000),
        ),
    ];

    for (metadata, expected_name, expected_interval) in cases {
        assert_eq!(metadata.name(), expected_name);
        assert_eq!(metadata.recurrence_interval(), expected_interval);
    }
}
/// Tasks pushed into a `BinaryHeap` out of order are expected to pop in
/// ascending `scheduled_time_ms` order (earliest first).
#[test]
fn test_task_ordering() {
    let mut pending = BinaryHeap::new();
    for due_ms in [300, 100, 200] {
        pending.push(ScheduledTask {
            scheduled_time_ms: due_ms,
            metadata: TaskMetadata::OneShot,
            task: Box::new(|| true),
        });
    }

    for (label, expected_ms) in [("task 1", 100), ("task 2", 200), ("task 3", 300)] {
        assert_eq!(pending.pop().expect(label).scheduled_time_ms, expected_ms);
    }
}
/// Pushes three tasks straight into the machine's queue out of order and
/// expects three `execute_one_task()` calls to run them in ascending
/// `scheduled_time_ms` order.
#[test]
fn test_execute_one_task_uses_earliest_due_task() {
    let (_, receiver) = unbounded::<ScheduledTask>();
    let shutdown_flag = Arc::new(AtomicBool::new(false));
    let cron_stats = Arc::new(CronStats::default());
    let mut machine = CronStateMachine::new(receiver, shutdown_flag, cron_stats, 100, None);

    // Each task records its own due time here when it runs.
    let run_log = Arc::new(std::sync::Mutex::new(Vec::new()));
    for due_ms in [300, 100, 200] {
        let log = Arc::clone(&run_log);
        machine.queue.push(ScheduledTask {
            scheduled_time_ms: due_ms,
            metadata: TaskMetadata::OneShot,
            task: Box::new(move || {
                log.lock().expect("lock").push(due_ms);
                true
            }),
        });
    }

    for _ in 0..3 {
        machine.execute_one_task();
    }

    let observed = run_log.lock().expect("lock").clone();
    assert_eq!(observed, vec![100, 200, 300]);
}
/// Tasks submitted through two clones of the same handle are both expected to
/// run on the shared scheduler, leaving the counter at exactly 2.
#[test]
fn test_handle_cloning() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (handle, thread, _stats, _ready) = spawn_cron(Arc::clone(&terminating));
    let counter = Arc::new(AtomicU64::new(0));

    let handle1 = handle.clone();
    let handle2 = handle.clone();
    let c1 = Arc::clone(&counter);
    let c2 = Arc::clone(&counter);
    handle1.schedule_once(0, "from-handle1", move || {
        c1.fetch_add(1, Ordering::Relaxed);
        true
    });
    handle2.schedule_once(0, "from-handle2", move || {
        c2.fetch_add(1, Ordering::Relaxed);
        true
    });

    // Bounded poll instead of a fixed sleep, so the outcome does not depend
    // on how quickly the cron thread gets scheduled. If the deadline passes,
    // the assertion below reports the actual count.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while counter.load(Ordering::Relaxed) < 2 && std::time::Instant::now() < deadline {
        std::thread::sleep(Duration::from_millis(10));
    }

    handle.request_shutdown();
    thread.join().expect("Cron thread panicked");

    assert_eq!(counter.load(Ordering::Relaxed), 2);
}
/// `is_shutting_down()` is expected to be `false` on a fresh handle and
/// `true` immediately after `request_shutdown()`; the cron thread must then
/// join cleanly.
#[test]
fn test_shutdown_flag() {
    let terminating = Arc::new(AtomicBool::new(false));
    let (cron, worker, _stats, _ready) = spawn_cron(Arc::clone(&terminating));

    let before_request = cron.is_shutting_down();
    assert!(!before_request);

    cron.request_shutdown();
    let after_request = cron.is_shutting_down();
    assert!(after_request);

    worker.join().expect("Cron thread panicked");
}