use chrono::{TimeZone, Utc};
use es_entity::clock::{Clock, ClockHandle};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
/// Checks that a realtime clock's `now()` is bracketed by two
/// `chrono::Utc::now()` readings taken immediately before and after it.
#[tokio::test]
async fn test_realtime_now() {
    let realtime = ClockHandle::realtime();

    let lower_bound = chrono::Utc::now();
    let observed = realtime.now();
    let upper_bound = chrono::Utc::now();

    assert!(lower_bound <= observed);
    assert!(observed <= upper_bound);
}
/// Sleeps for 50 ms on a realtime clock and checks, via `std::time::Instant`,
/// that the measured delay lies in the window [40 ms, 150 ms).
#[tokio::test]
async fn test_realtime_sleep() {
    let requested = Duration::from_millis(50);
    let realtime = ClockHandle::realtime();

    let started_at = std::time::Instant::now();
    realtime.sleep(requested).await;
    let waited = started_at.elapsed();

    // The accepted window is wider than the requested 50 ms on both sides.
    assert!(waited >= Duration::from_millis(40));
    assert!(waited < Duration::from_millis(150));
}
/// Creates a manual clock pinned to 2020-01-01 00:00:00 UTC, asserts it
/// reports exactly that instant, then asserts that a 3600 s advance moves the
/// reading forward by one hour.
#[tokio::test]
async fn test_manual_at_starts_at_specified_time() {
    let epoch = Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap();
    let (clock, controller) = ClockHandle::manual_at(epoch);
    assert_eq!(clock.now(), epoch);

    controller.advance(Duration::from_secs(3600)).await;

    let one_hour_later = epoch + chrono::Duration::hours(1);
    assert_eq!(clock.now(), one_hour_later);
}
/// Asserts that a manual clock's reading is unchanged after 10 ms of real
/// (tokio) sleep when nothing advances it.
#[tokio::test]
async fn test_manual_time_stands_still() {
    // The controller is bound so it lives until the end of the test, but it
    // is deliberately never used.
    let (clock, _controller) = ClockHandle::manual();
    let frozen_at = clock.now();

    tokio::time::sleep(Duration::from_millis(10)).await;

    assert_eq!(clock.now(), frozen_at);
}
/// Advances a manual clock by 3600 s and asserts that its reading moved
/// forward by exactly one hour.
#[tokio::test]
async fn test_manual_advance() {
    let (clock, controller) = ClockHandle::manual();
    let origin = clock.now();

    controller.advance(Duration::from_secs(3600)).await;

    let expected = origin + chrono::Duration::hours(1);
    assert_eq!(clock.now(), expected);
}
/// Parks a task in a 60 s manual-clock sleep, advances the clock by 120 s,
/// and asserts that the task resumed exactly once and read `t0 + 60 s` from
/// the clock when it did.
#[tokio::test]
async fn test_manual_sleep_wakes_on_advance() {
    let (clock, controller) = ClockHandle::manual();
    let origin = clock.now();
    let wake_counter = Arc::new(AtomicUsize::new(0));

    let sleeper = tokio::spawn({
        let counter = wake_counter.clone();
        let clock = clock.clone();
        async move {
            clock.sleep(Duration::from_secs(60)).await;
            counter.fetch_add(1, Ordering::SeqCst);
            clock.now()
        }
    });

    // Let the spawned task run until it is parked in its sleep.
    tokio::task::yield_now().await;
    assert_eq!(controller.pending_wake_count(), 1);
    assert_eq!(wake_counter.load(Ordering::SeqCst), 0);

    controller.advance(Duration::from_secs(120)).await;

    let woke_at = sleeper.await.unwrap();
    assert_eq!(wake_counter.load(Ordering::SeqCst), 1);
    assert_eq!(woke_at, origin + chrono::Duration::seconds(60));
}
/// Registers three manual-clock sleeps (A = 30 s, B = 10 s, C = 20 s),
/// advances the clock by 60 s in a single step, and asserts that the tasks
/// recorded their wake-ups in the order B, C, A, each reading its own
/// deadline from the clock at the moment it resumed.
#[tokio::test]
async fn test_multiple_sleeps_wake_in_order() {
    let (clock, ctrl) = ClockHandle::manual();
    let t0 = clock.now();
    let wake_order = Arc::new(parking_lot::Mutex::new(Vec::new()));

    let wo = wake_order.clone();
    let c = clock.clone();
    let handle_a = tokio::spawn(async move {
        c.sleep(Duration::from_secs(30)).await;
        wo.lock().push(('A', c.now()));
    });

    let wo = wake_order.clone();
    let c = clock.clone();
    let handle_b = tokio::spawn(async move {
        c.sleep(Duration::from_secs(10)).await;
        wo.lock().push(('B', c.now()));
    });

    let wo = wake_order.clone();
    let c = clock.clone();
    let handle_c = tokio::spawn(async move {
        c.sleep(Duration::from_secs(20)).await;
        wo.lock().push(('C', c.now()));
    });

    // Let all three tasks run until they are parked in their sleeps.
    tokio::task::yield_now().await;
    assert_eq!(ctrl.pending_wake_count(), 3);

    ctrl.advance(Duration::from_secs(60)).await;

    // Surface a panicked or cancelled task directly instead of discarding the
    // join results and failing later on a less informative mismatch.
    let (res_a, res_b, res_c) = tokio::join!(handle_a, handle_b, handle_c);
    res_a.expect("task A should complete without panicking");
    res_b.expect("task B should complete without panicking");
    res_c.expect("task C should complete without panicking");

    // Comparing the whole sequence at once checks length, order and
    // timestamps, and prints the full recorded order on failure.
    let expected = vec![
        ('B', t0 + chrono::Duration::seconds(10)),
        ('C', t0 + chrono::Duration::seconds(20)),
        ('A', t0 + chrono::Duration::seconds(30)),
    ];
    assert_eq!(*wake_order.lock(), expected);
}
/// Parks one task in a 100 s manual-clock sleep and asserts that
/// `advance_to_next_wake` returns that deadline and leaves the clock reading
/// exactly it, and that a second call returns `None`.
#[tokio::test]
async fn test_advance_to_next_wake() {
    let (clock, ctrl) = ClockHandle::manual();
    let t0 = clock.now();
    let deadline = t0 + chrono::Duration::seconds(100);

    let c = clock.clone();
    let handle = tokio::spawn(async move {
        c.sleep(Duration::from_secs(100)).await;
    });
    // Let the spawned task run until it is parked in its sleep.
    tokio::task::yield_now().await;

    let wake_time = ctrl.advance_to_next_wake().await;
    assert_eq!(wake_time, Some(deadline));
    assert_eq!(clock.now(), deadline);

    let next = ctrl.advance_to_next_wake().await;
    assert_eq!(next, None);

    // The sleeper must have finished normally; discarding the join result
    // would hide a panic inside the task.
    handle
        .await
        .expect("sleeping task should complete without panicking");
}
/// Wraps an immediately-ready future yielding `42` in a 10 s manual-clock
/// timeout and asserts that the wrapped call resolves to `Ok(42)`.
#[tokio::test]
async fn test_timeout_success() {
    let (clock, controller) = ClockHandle::manual();

    let task = tokio::spawn({
        let clock = clock.clone();
        async move { clock.timeout(Duration::from_secs(10), async { 42 }).await }
    });

    tokio::task::yield_now().await;
    controller.advance(Duration::from_secs(1)).await;

    let outcome = task.await.unwrap();
    assert_eq!(outcome, Ok(42));
}
/// Wraps a never-completing future in a 5 s manual-clock timeout, advances
/// the clock by 10 s, and asserts that the timeout call returned an error.
#[tokio::test]
async fn test_timeout_elapsed() {
    let (clock, controller) = ClockHandle::manual();

    let task = tokio::spawn({
        let clock = clock.clone();
        async move {
            // `pending` never resolves, so only the timeout can end this call.
            let never = async { std::future::pending::<()>().await };
            clock.timeout(Duration::from_secs(5), never).await
        }
    });

    tokio::task::yield_now().await;
    controller.advance(Duration::from_secs(10)).await;

    let outcome = task.await.unwrap();
    assert!(outcome.is_err());
}
/// Clones a manual clock handle twice, advances the clock by 100 s through
/// the controller, and asserts that all three handles report `t0 + 100 s`.
#[tokio::test]
async fn test_cloned_handles_share_time() {
    let (first, controller) = ClockHandle::manual();
    let second = first.clone();
    let third = first.clone();
    let origin = first.now();

    controller.advance(Duration::from_secs(100)).await;

    for handle in [&first, &second, &third] {
        assert_eq!(handle.now(), origin + chrono::Duration::seconds(100));
    }
}
/// Parks a task in a 100 s manual-clock sleep, aborts the task, and asserts
/// that the controller's pending wake count drops from 1 back to 0.
#[tokio::test]
async fn test_cancelled_sleep_cleanup() {
    let (clock, controller) = ClockHandle::manual();

    let sleeper = tokio::spawn({
        let clock = clock.clone();
        async move {
            clock.sleep(Duration::from_secs(100)).await;
        }
    });

    tokio::task::yield_now().await;
    assert_eq!(controller.pending_wake_count(), 1);

    sleeper.abort();
    // The join result (presumably a cancellation error) is intentionally
    // ignored; awaiting only waits for the aborted task to be torn down.
    let _ = sleeper.await;
    tokio::task::yield_now().await;

    assert_eq!(controller.pending_wake_count(), 0);
}
/// Runs two looping tasks against one manual clock, one sleeping 3600 s per
/// iteration and one sleeping 1800 s, and asserts that a single 7200 s
/// advance yields two iterations of the former, four of the latter, and a
/// final clock reading of `t0 + 2 h`.
#[tokio::test]
async fn test_concurrent_system_coordination() {
    let (clock, controller) = ClockHandle::manual();
    let origin = clock.now();

    // Task looping on a 3600 s sleep.
    let hourly_runs = Arc::new(AtomicUsize::new(0));
    let _hourly_task = tokio::spawn({
        let runs = hourly_runs.clone();
        let clock = clock.clone();
        async move {
            loop {
                clock.sleep(Duration::from_secs(3600)).await;
                runs.fetch_add(1, Ordering::SeqCst);
            }
        }
    });

    // Task looping on an 1800 s sleep.
    let half_hourly_runs = Arc::new(AtomicUsize::new(0));
    let _half_hourly_task = tokio::spawn({
        let runs = half_hourly_runs.clone();
        let clock = clock.clone();
        async move {
            loop {
                clock.sleep(Duration::from_secs(1800)).await;
                runs.fetch_add(1, Ordering::SeqCst);
            }
        }
    });

    tokio::task::yield_now().await;
    controller.advance(Duration::from_secs(7200)).await;

    assert_eq!(hourly_runs.load(Ordering::SeqCst), 2);
    assert_eq!(half_hourly_runs.load(Ordering::SeqCst), 4);
    assert_eq!(clock.now(), origin + chrono::Duration::hours(2));
}
/// Spawns five tasks that all sleep 60 s on the same manual clock, advances
/// the clock by exactly 60 s, and asserts that all five resumed and that the
/// clock reads `t0 + 60 s`.
#[tokio::test]
async fn test_same_time_wakes() {
    let (clock, controller) = ClockHandle::manual();
    let origin = clock.now();
    let woken = Arc::new(AtomicUsize::new(0));

    for _ in 0..5 {
        tokio::spawn({
            let woken = woken.clone();
            let clock = clock.clone();
            async move {
                clock.sleep(Duration::from_secs(60)).await;
                woken.fetch_add(1, Ordering::SeqCst);
            }
        });
    }

    tokio::task::yield_now().await;
    assert_eq!(controller.pending_wake_count(), 5);

    controller.advance(Duration::from_secs(60)).await;

    assert_eq!(woken.load(Ordering::SeqCst), 5);
    assert_eq!(clock.now(), origin + chrono::Duration::seconds(60));
}
/// Asserts that the `Debug` rendering of a realtime handle contains
/// "Realtime", that of a manual handle contains "Manual", and that of the
/// manual controller contains "ClockController".
#[tokio::test]
async fn test_debug_output() {
    let realtime = ClockHandle::realtime();
    let realtime_repr = format!("{:?}", realtime);
    assert!(realtime_repr.contains("Realtime"));

    let (manual, controller) = ClockHandle::manual();
    let manual_repr = format!("{:?}", manual);
    assert!(manual_repr.contains("Manual"));

    let controller_repr = format!("{:?}", controller);
    assert!(controller_repr.contains("ClockController"));
}
/// Asserts that the controller's `now()` equals the handle's `now()` both
/// before and after a 100 s advance.
#[tokio::test]
async fn test_controller_now() {
    let (clock, controller) = ClockHandle::manual();
    assert_eq!(clock.now(), controller.now());

    controller.advance(Duration::from_secs(100)).await;

    assert_eq!(clock.now(), controller.now());
}
/// Clones the controller, advances the clock by 50 s through the original,
/// and asserts that both controllers report `t0 + 50 s` and the same pending
/// wake count.
#[tokio::test]
async fn test_controller_clone() {
    let (clock, primary) = ClockHandle::manual();
    let secondary = primary.clone();
    let origin = clock.now();

    primary.advance(Duration::from_secs(50)).await;

    let expected = origin + chrono::Duration::seconds(50);
    assert_eq!(primary.now(), expected);
    assert_eq!(secondary.now(), expected);
    assert_eq!(primary.pending_wake_count(), secondary.pending_wake_count());
}
/// Installs a manual clock through the global `Clock` API and asserts that
/// `Clock::is_manual()` is true, that `Clock::handle()` agrees with
/// `Clock::now()`, and that a 100 s advance on the returned controller is
/// visible through `Clock::now()`.
#[tokio::test]
async fn test_global_clock_api() {
    let controller = Clock::install_manual();
    let origin = Clock::now();
    assert!(Clock::is_manual());

    let global_handle = Clock::handle();
    assert_eq!(global_handle.now(), origin);

    controller
        .advance(std::time::Duration::from_secs(100))
        .await;

    assert_eq!(Clock::now(), origin + chrono::Duration::seconds(100));
}
/// Runs a task that loops on `sleep_coalesce(75 s)`, advances the manual
/// clock by 86400 s in a single step, and asserts that the loop body ran
/// exactly once.
#[tokio::test]
async fn test_sleep_coalesce_fires_once_at_end_of_advance() {
    let (clock, controller) = ClockHandle::manual();
    let iterations = Arc::new(AtomicUsize::new(0));

    let _looping_task = tokio::spawn({
        let iterations = iterations.clone();
        let clock = clock.clone();
        async move {
            loop {
                clock.sleep_coalesce(Duration::from_secs(75)).await;
                iterations.fetch_add(1, Ordering::SeqCst);
            }
        }
    });

    tokio::task::yield_now().await;
    assert_eq!(controller.pending_wake_count(), 1);

    controller.advance(Duration::from_secs(86400)).await;

    assert_eq!(iterations.load(Ordering::SeqCst), 1);
}
/// Mixes two regular sleeps (30 s and 60 s) with one `sleep_coalesce(10 s)`
/// on the same manual clock, advances by 120 s, and asserts that the regular
/// sleepers read `t0 + 30 s` and `t0 + 60 s` on waking while the coalescing
/// sleeper woke once and read `t0 + 120 s`.
#[tokio::test]
async fn test_sleep_coalesce_regular_wakes_still_fire_at_intermediate_points() {
    let (clock, controller) = ClockHandle::manual();
    let origin = clock.now();
    let regular_log = Arc::new(parking_lot::Mutex::new(Vec::new()));
    let coalesce_log = Arc::new(parking_lot::Mutex::new(Vec::new()));

    // Regular sleeper: 30 s.
    tokio::spawn({
        let log = regular_log.clone();
        let clock = clock.clone();
        async move {
            clock.sleep(Duration::from_secs(30)).await;
            log.lock().push(clock.now());
        }
    });

    // Regular sleeper: 60 s.
    tokio::spawn({
        let log = regular_log.clone();
        let clock = clock.clone();
        async move {
            clock.sleep(Duration::from_secs(60)).await;
            log.lock().push(clock.now());
        }
    });

    // Coalescing sleeper: 10 s.
    tokio::spawn({
        let log = coalesce_log.clone();
        let clock = clock.clone();
        async move {
            clock.sleep_coalesce(Duration::from_secs(10)).await;
            log.lock().push(clock.now());
        }
    });

    tokio::task::yield_now().await;
    controller.advance(Duration::from_secs(120)).await;

    let regular_seen = regular_log.lock();
    assert_eq!(regular_seen.len(), 2);
    assert_eq!(regular_seen[0], origin + chrono::Duration::seconds(30));
    assert_eq!(regular_seen[1], origin + chrono::Duration::seconds(60));

    let coalesce_seen = coalesce_log.lock();
    assert_eq!(coalesce_seen.len(), 1);
    assert_eq!(coalesce_seen[0], origin + chrono::Duration::seconds(120));
}
/// Parks one task in `sleep_coalesce(50 s)` and asserts that it counts as a
/// pending wake and that `advance_to_next_wake` returns `t0 + 50 s`.
#[tokio::test]
async fn test_advance_to_next_wake_considers_coalesce_wakes() {
    let (clock, ctrl) = ClockHandle::manual();
    let t0 = clock.now();

    let c = clock.clone();
    let handle = tokio::spawn(async move {
        c.sleep_coalesce(Duration::from_secs(50)).await;
    });
    // Let the spawned task run until it is parked in its sleep.
    tokio::task::yield_now().await;
    assert_eq!(ctrl.pending_wake_count(), 1);

    let wake_time = ctrl.advance_to_next_wake().await;
    assert_eq!(wake_time, Some(t0 + chrono::Duration::seconds(50)));

    // The sleeper must have finished normally; discarding the join result
    // would hide a panic inside the task.
    handle
        .await
        .expect("coalescing sleeper should complete without panicking");
}
/// Registers a regular 100 s sleep and a coalescing 50 s sleep, then asserts
/// that successive `advance_to_next_wake` calls return `t0 + 50 s` first and
/// `t0 + 100 s` second.
#[tokio::test]
async fn test_advance_to_next_wake_picks_earliest_across_both_lists() {
    let (clock, controller) = ClockHandle::manual();
    let origin = clock.now();

    // Regular sleeper: 100 s.
    tokio::spawn({
        let clock = clock.clone();
        async move {
            clock.sleep(Duration::from_secs(100)).await;
        }
    });

    // Coalescing sleeper: 50 s.
    tokio::spawn({
        let clock = clock.clone();
        async move {
            clock.sleep_coalesce(Duration::from_secs(50)).await;
        }
    });

    tokio::task::yield_now().await;
    assert_eq!(controller.pending_wake_count(), 2);

    let first_wake = controller.advance_to_next_wake().await;
    assert_eq!(first_wake, Some(origin + chrono::Duration::seconds(50)));

    let second_wake = controller.advance_to_next_wake().await;
    assert_eq!(second_wake, Some(origin + chrono::Duration::seconds(100)));
}
/// Parks a task in `sleep_coalesce(100 s)`, aborts the task, and asserts that
/// the controller's pending wake count drops from 1 back to 0.
#[tokio::test]
async fn test_cancelled_coalesce_sleep_cleanup() {
    let (clock, controller) = ClockHandle::manual();

    let sleeper = tokio::spawn({
        let clock = clock.clone();
        async move {
            clock.sleep_coalesce(Duration::from_secs(100)).await;
        }
    });

    tokio::task::yield_now().await;
    assert_eq!(controller.pending_wake_count(), 1);

    sleeper.abort();
    // The join result (presumably a cancellation error) is intentionally
    // ignored; awaiting only waits for the aborted task to be torn down.
    let _ = sleeper.await;
    tokio::task::yield_now().await;

    assert_eq!(controller.pending_wake_count(), 0);
}