#![warn(clippy::clone_on_ref_ptr)]
use std::{
any::Any,
cell::RefCell,
collections::{BTreeMap, BinaryHeap},
fmt::Debug,
ops::Deref,
time::Duration,
};
use ahash::AHashMap;
use chrono::{DateTime, Utc};
use nautilus_core::{
AtomicTime, UUID4, UnixNanos,
correctness::{check_positive_u64, check_predicate_true, check_valid_string_utf8},
datetime::NANOSECONDS_IN_SECOND,
string::formatting::Separable,
};
use ustr::Ustr;
use crate::timer::{
ScheduledTimeEvent, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler, Timer,
create_valid_interval,
};
pub trait Clock: Debug + Any {
fn utc_now(&self) -> DateTime<Utc> {
DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
}
fn timestamp_ns(&self) -> UnixNanos;
fn timestamp_us(&self) -> u64;
fn timestamp_ms(&self) -> u64;
fn timestamp(&self) -> f64;
fn timer_names(&self) -> Vec<&str>;
fn timer_count(&self) -> usize;
fn timer_exists(&self, name: &Ustr) -> bool;
fn register_default_handler(&mut self, callback: TimeEventCallback);
fn cancel_default_handler(&mut self);
fn cancel_callbacks(&mut self);
fn get_handler(&self, event: TimeEvent) -> TimeEventHandler;
fn set_time_alert(
&mut self,
name: &str,
alert_time: DateTime<Utc>,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
) -> anyhow::Result<()> {
self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
}
fn set_time_alert_ns(
&mut self,
name: &str,
alert_time_ns: UnixNanos,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
) -> anyhow::Result<()>;
#[expect(clippy::too_many_arguments)]
fn set_timer(
&mut self,
name: &str,
interval: Duration,
start_time: Option<DateTime<Utc>>,
stop_time: Option<DateTime<Utc>>,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
fire_immediately: Option<bool>,
) -> anyhow::Result<()> {
self.set_timer_ns(
name,
interval.as_nanos() as u64,
start_time.map(UnixNanos::from),
stop_time.map(UnixNanos::from),
callback,
allow_past,
fire_immediately,
)
}
#[expect(clippy::too_many_arguments)]
fn set_timer_ns(
&mut self,
name: &str,
interval_ns: u64,
start_time_ns: Option<UnixNanos>,
stop_time_ns: Option<UnixNanos>,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
fire_immediately: Option<bool>,
) -> anyhow::Result<()>;
fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
fn cancel_timer(&mut self, name: &str);
fn cancel_timers(&mut self);
fn reset(&mut self);
}
impl dyn Clock {
pub fn as_any(&self) -> &dyn std::any::Any {
self
}
pub fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}
#[derive(Debug)]
pub struct ClockApi<'a> {
backing: ClockApiBacking<'a>,
}
enum ClockApiBacking<'a> {
Native(&'a RefCell<dyn Clock>),
Handlers(ClockApiHandlers<'a>),
}
struct ClockApiHandlers<'a> {
timestamp_ns: Box<dyn Fn() -> UnixNanos + 'a>,
set_time_alert_ns: Box<SetTimeAlertNsHandler<'a>>,
set_timer_ns: Box<SetTimerNsHandler<'a>>,
timer_names: Box<dyn Fn() -> Vec<String> + 'a>,
timer_count: Box<dyn Fn() -> usize + 'a>,
timer_exists: Box<dyn Fn(&str) -> bool + 'a>,
next_time_ns: Box<NextTimeNsHandler<'a>>,
cancel_timer: Box<dyn Fn(&str) + 'a>,
cancel_timers: Box<dyn Fn() + 'a>,
}
impl Debug for ClockApiBacking<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Native(_) => f.write_str("Native"),
Self::Handlers(_) => f.write_str("Handlers"),
}
}
}
type SetTimeAlertNsHandler<'a> =
dyn Fn(&str, UnixNanos, Option<TimeEventCallback>, Option<bool>) -> anyhow::Result<()> + 'a;
type NextTimeNsHandler<'a> = dyn Fn(&str) -> Option<UnixNanos> + 'a;
type SetTimerNsHandler<'a> = dyn Fn(
&str,
u64,
Option<UnixNanos>,
Option<UnixNanos>,
Option<TimeEventCallback>,
Option<bool>,
Option<bool>,
) -> anyhow::Result<()>
+ 'a;
impl<'a> ClockApi<'a> {
pub(crate) fn new(clock: &'a RefCell<dyn Clock>) -> Self {
Self {
backing: ClockApiBacking::Native(clock),
}
}
#[doc(hidden)]
#[must_use]
#[expect(
clippy::too_many_arguments,
reason = "clock API backing mirrors the full ClockApi surface"
)]
pub fn from_handlers<
TimestampNs,
SetTimeAlertNs,
SetTimerNs,
TimerNames,
TimerCount,
TimerExists,
NextTimeNs,
CancelTimer,
CancelTimers,
>(
timestamp_ns: TimestampNs,
set_time_alert_ns: SetTimeAlertNs,
set_timer_ns: SetTimerNs,
timer_names: TimerNames,
timer_count: TimerCount,
timer_exists: TimerExists,
next_time_ns: NextTimeNs,
cancel_timer: CancelTimer,
cancel_timers: CancelTimers,
) -> Self
where
TimestampNs: Fn() -> UnixNanos + 'a,
SetTimeAlertNs:
Fn(&str, UnixNanos, Option<TimeEventCallback>, Option<bool>) -> anyhow::Result<()> + 'a,
SetTimerNs: Fn(
&str,
u64,
Option<UnixNanos>,
Option<UnixNanos>,
Option<TimeEventCallback>,
Option<bool>,
Option<bool>,
) -> anyhow::Result<()>
+ 'a,
TimerNames: Fn() -> Vec<String> + 'a,
TimerCount: Fn() -> usize + 'a,
TimerExists: Fn(&str) -> bool + 'a,
NextTimeNs: Fn(&str) -> Option<UnixNanos> + 'a,
CancelTimer: Fn(&str) + 'a,
CancelTimers: Fn() + 'a,
{
Self {
backing: ClockApiBacking::Handlers(ClockApiHandlers {
timestamp_ns: Box::new(timestamp_ns),
set_time_alert_ns: Box::new(set_time_alert_ns),
set_timer_ns: Box::new(set_timer_ns),
timer_names: Box::new(timer_names),
timer_count: Box::new(timer_count),
timer_exists: Box::new(timer_exists),
next_time_ns: Box::new(next_time_ns),
cancel_timer: Box::new(cancel_timer),
cancel_timers: Box::new(cancel_timers),
}),
}
}
#[must_use]
pub fn timestamp_ns(&self) -> UnixNanos {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow().timestamp_ns(),
ClockApiBacking::Handlers(handlers) => (handlers.timestamp_ns)(),
}
}
#[must_use]
pub fn timestamp_us(&self) -> u64 {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow().timestamp_us(),
ClockApiBacking::Handlers(handlers) => (handlers.timestamp_ns)().as_micros(),
}
}
#[must_use]
pub fn timestamp_ms(&self) -> u64 {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow().timestamp_ms(),
ClockApiBacking::Handlers(handlers) => (handlers.timestamp_ns)().as_millis(),
}
}
#[must_use]
pub fn timestamp(&self) -> f64 {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow().timestamp(),
ClockApiBacking::Handlers(handlers) => {
(handlers.timestamp_ns)().as_f64() / (NANOSECONDS_IN_SECOND as f64)
}
}
}
#[must_use]
pub fn utc_now(&self) -> DateTime<Utc> {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow().utc_now(),
ClockApiBacking::Handlers(handlers) => {
DateTime::from_timestamp_nanos((handlers.timestamp_ns)().as_i64())
}
}
}
pub fn set_time_alert(
&self,
name: &str,
alert_time: DateTime<Utc>,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
) -> anyhow::Result<()> {
match &self.backing {
ClockApiBacking::Native(clock) => clock
.borrow_mut()
.set_time_alert(name, alert_time, callback, allow_past),
ClockApiBacking::Handlers(handlers) => {
(handlers.set_time_alert_ns)(name, alert_time.into(), callback, allow_past)
}
}
}
pub fn set_time_alert_ns(
&self,
name: &str,
alert_time_ns: UnixNanos,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
) -> anyhow::Result<()> {
match &self.backing {
ClockApiBacking::Native(clock) => {
clock
.borrow_mut()
.set_time_alert_ns(name, alert_time_ns, callback, allow_past)
}
ClockApiBacking::Handlers(handlers) => {
(handlers.set_time_alert_ns)(name, alert_time_ns, callback, allow_past)
}
}
}
#[expect(clippy::too_many_arguments, reason = "timer scheduling mirrors Clock")]
pub fn set_timer(
&self,
name: &str,
interval: Duration,
start_time: Option<DateTime<Utc>>,
stop_time: Option<DateTime<Utc>>,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
fire_immediately: Option<bool>,
) -> anyhow::Result<()> {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow_mut().set_timer(
name,
interval,
start_time,
stop_time,
callback,
allow_past,
fire_immediately,
),
ClockApiBacking::Handlers(handlers) => (handlers.set_timer_ns)(
name,
interval.as_nanos() as u64,
start_time.map(UnixNanos::from),
stop_time.map(UnixNanos::from),
callback,
allow_past,
fire_immediately,
),
}
}
#[expect(clippy::too_many_arguments, reason = "timer scheduling mirrors Clock")]
pub fn set_timer_ns(
&self,
name: &str,
interval_ns: u64,
start_time_ns: Option<UnixNanos>,
stop_time_ns: Option<UnixNanos>,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
fire_immediately: Option<bool>,
) -> anyhow::Result<()> {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow_mut().set_timer_ns(
name,
interval_ns,
start_time_ns,
stop_time_ns,
callback,
allow_past,
fire_immediately,
),
ClockApiBacking::Handlers(handlers) => (handlers.set_timer_ns)(
name,
interval_ns,
start_time_ns,
stop_time_ns,
callback,
allow_past,
fire_immediately,
),
}
}
#[must_use]
pub fn timer_names(&self) -> Vec<String> {
match &self.backing {
ClockApiBacking::Native(clock) => clock
.borrow()
.timer_names()
.into_iter()
.map(str::to_string)
.collect(),
ClockApiBacking::Handlers(handlers) => (handlers.timer_names)(),
}
}
#[must_use]
pub fn timer_count(&self) -> usize {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow().timer_count(),
ClockApiBacking::Handlers(handlers) => (handlers.timer_count)(),
}
}
#[must_use]
pub fn timer_exists(&self, name: &str) -> bool {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow().timer_exists(&Ustr::from(name)),
ClockApiBacking::Handlers(handlers) => (handlers.timer_exists)(name),
}
}
#[must_use]
pub fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow().next_time_ns(name),
ClockApiBacking::Handlers(handlers) => (handlers.next_time_ns)(name),
}
}
pub fn cancel_timer(&self, name: &str) {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow_mut().cancel_timer(name),
ClockApiBacking::Handlers(handlers) => (handlers.cancel_timer)(name),
}
}
pub fn cancel_timers(&self) {
match &self.backing {
ClockApiBacking::Native(clock) => clock.borrow_mut().cancel_timers(),
ClockApiBacking::Handlers(handlers) => (handlers.cancel_timers)(),
}
}
}
#[derive(Debug, Default)]
pub struct CallbackRegistry {
default_callback: Option<TimeEventCallback>,
callbacks: AHashMap<Ustr, TimeEventCallback>,
}
impl CallbackRegistry {
#[must_use]
pub fn new() -> Self {
Self {
default_callback: None,
callbacks: AHashMap::new(),
}
}
pub fn register_default_handler(&mut self, callback: TimeEventCallback) {
self.default_callback = Some(callback);
}
pub fn cancel_default_handler(&mut self) {
self.default_callback = None;
}
pub fn register_callback(&mut self, name: Ustr, callback: TimeEventCallback) {
self.callbacks.insert(name, callback);
}
#[must_use]
pub fn has_any_callback(&self, name: &Ustr) -> bool {
self.callbacks.contains_key(name) || self.default_callback.is_some()
}
#[must_use]
pub fn get_callback(&self, name: &Ustr) -> Option<TimeEventCallback> {
self.callbacks
.get(name)
.cloned()
.or_else(|| self.default_callback.clone())
}
#[must_use]
pub fn get_handler(&self, event: TimeEvent) -> TimeEventHandler {
let callback = self
.get_callback(&event.name)
.unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
TimeEventHandler::new(event, callback)
}
pub fn clear(&mut self) {
self.callbacks.clear();
}
}
pub fn validate_and_prepare_time_alert(
name: &str,
mut alert_time_ns: UnixNanos,
allow_past: Option<bool>,
ts_now: UnixNanos,
) -> anyhow::Result<(Ustr, UnixNanos)> {
check_valid_string_utf8(name, stringify!(name))?;
let name = Ustr::from(name);
let allow_past = allow_past.unwrap_or(true);
if alert_time_ns < ts_now {
if allow_past {
alert_time_ns = ts_now;
log::warn!(
"Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
alert_time_ns.to_rfc3339(),
);
} else {
anyhow::bail!(
"Timer '{name}' alert time {} was in the past (current time is {ts_now})",
alert_time_ns.to_rfc3339(),
);
}
}
Ok((name, alert_time_ns))
}
pub fn validate_and_prepare_timer(
name: &str,
interval_ns: u64,
start_time_ns: Option<UnixNanos>,
stop_time_ns: Option<UnixNanos>,
allow_past: Option<bool>,
fire_immediately: Option<bool>,
ts_now: UnixNanos,
) -> anyhow::Result<(Ustr, UnixNanos, Option<UnixNanos>, bool, bool)> {
check_valid_string_utf8(name, stringify!(name))?;
check_positive_u64(interval_ns, stringify!(interval_ns))?;
let name = Ustr::from(name);
let allow_past = allow_past.unwrap_or(true);
let fire_immediately = fire_immediately.unwrap_or(false);
let mut start_time_ns = start_time_ns.unwrap_or_default();
if start_time_ns == 0 {
start_time_ns = ts_now;
} else if !allow_past {
let next_event_time = if fire_immediately {
start_time_ns
} else {
start_time_ns + interval_ns
};
if next_event_time < ts_now {
anyhow::bail!(
"Timer '{name}' next event time {} would be in the past (current time is {ts_now})",
next_event_time.to_rfc3339(),
);
}
}
if let Some(stop_time) = stop_time_ns {
if stop_time <= start_time_ns {
anyhow::bail!(
"Timer '{name}' stop time {} must be after start time {}",
stop_time.to_rfc3339(),
start_time_ns.to_rfc3339(),
);
}
if !allow_past && stop_time <= ts_now {
anyhow::bail!(
"Timer '{name}' stop time {} is in the past (current time is {ts_now})",
stop_time.to_rfc3339(),
);
}
}
Ok((
name,
start_time_ns,
stop_time_ns,
allow_past,
fire_immediately,
))
}
#[derive(Debug)]
pub struct TestClock {
time: AtomicTime,
timers: BTreeMap<Ustr, TestTimer>,
timer_queue: BinaryHeap<ScheduledTimeEvent>,
callbacks: CallbackRegistry,
}
impl TestClock {
#[must_use]
pub fn new() -> Self {
Self {
time: AtomicTime::new(false, UnixNanos::default()),
timers: BTreeMap::new(),
timer_queue: BinaryHeap::new(),
callbacks: CallbackRegistry::new(),
}
}
#[must_use]
pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
&self.timers
}
pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
let from_time_ns = self.time.get_time_ns();
assert!(
to_time_ns >= from_time_ns,
"Invariant: time must be non-decreasing, `to_time_ns` {to_time_ns} < `from_time_ns` {from_time_ns}"
);
if set_time {
self.time.set_time(to_time_ns);
}
let mut events: Vec<TimeEvent> = Vec::new();
while self
.timer_queue
.peek()
.is_some_and(|entry| entry.0.ts_event <= to_time_ns)
{
let entry = self
.timer_queue
.pop()
.expect("timer queue peeked Some but pop returned None");
let Some((event, next_event)) = self.advance_timer_from_entry(&entry.0) else {
continue;
};
events.push(event);
if let Some(next_event) = next_event {
self.timer_queue.push(next_event);
}
}
self.compact_timer_queue_if_needed();
if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
log::warn!(
"Allocated {} time events during clock advancement from {} to {}, \
consider stopping the timer between large time ranges with no data points",
events.len().separate_with_commas(),
from_time_ns,
to_time_ns
);
}
events.sort_by(|a, b| {
a.ts_event
.cmp(&b.ts_event)
.then_with(|| a.name.cmp(&b.name))
});
events
}
#[must_use]
pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandler> {
events
.into_iter()
.map(|event| self.callbacks.get_handler(event))
.collect()
}
fn replace_existing_timer_if_needed(&mut self, name: &Ustr) {
replace_existing_timer(&mut self.timers, name);
self.compact_timer_queue_if_needed();
}
fn insert_timer(&mut self, timer: TestTimer) {
self.timer_queue.push(Self::scheduled_event(&timer));
self.timers.insert(timer.name, timer);
self.compact_timer_queue_if_needed();
}
fn advance_timer_from_entry(
&mut self,
entry: &TimeEvent,
) -> Option<(TimeEvent, Option<ScheduledTimeEvent>)> {
let timer = self.timers.get_mut(&entry.name)?;
if timer.next_time_ns() != entry.ts_event {
return None;
}
let Some((event, _)) = timer.next() else {
self.timers.remove(&entry.name);
return None;
};
let next_entry = if timer.is_expired() {
self.timers.remove(&entry.name);
None
} else {
Some(Self::scheduled_event(timer))
};
Some((event, next_entry))
}
fn compact_timer_queue_if_needed(&mut self) {
if self.timer_queue.len() > self.timers.len().saturating_mul(2) {
self.compact_timer_queue();
}
}
fn compact_timer_queue(&mut self) {
self.timer_queue = self.timers.values().map(Self::scheduled_event).collect();
}
fn scheduled_event(timer: &TestTimer) -> ScheduledTimeEvent {
ScheduledTimeEvent::new(TimeEvent::new(
timer.name,
UUID4::new(),
timer.next_time_ns(),
timer.next_time_ns(),
))
}
}
impl Default for TestClock {
fn default() -> Self {
Self::new()
}
}
impl Deref for TestClock {
type Target = AtomicTime;
fn deref(&self) -> &Self::Target {
&self.time
}
}
impl Clock for TestClock {
fn timestamp_ns(&self) -> UnixNanos {
self.time.get_time_ns()
}
fn timestamp_us(&self) -> u64 {
self.time.get_time_us()
}
fn timestamp_ms(&self) -> u64 {
self.time.get_time_ms()
}
fn timestamp(&self) -> f64 {
self.time.get_time()
}
fn timer_names(&self) -> Vec<&str> {
self.timers
.iter()
.filter(|(_, timer)| !timer.is_expired())
.map(|(k, _)| k.as_str())
.collect()
}
fn timer_count(&self) -> usize {
self.timers
.iter()
.filter(|(_, timer)| !timer.is_expired())
.count()
}
fn timer_exists(&self, name: &Ustr) -> bool {
self.timers.contains_key(name)
}
fn register_default_handler(&mut self, callback: TimeEventCallback) {
self.callbacks.register_default_handler(callback);
}
fn cancel_default_handler(&mut self) {
self.callbacks.cancel_default_handler();
}
fn cancel_callbacks(&mut self) {
self.callbacks.clear();
}
fn get_handler(&self, event: TimeEvent) -> TimeEventHandler {
self.callbacks.get_handler(event)
}
fn set_time_alert_ns(
&mut self,
name: &str,
alert_time_ns: UnixNanos,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
) -> anyhow::Result<()> {
let ts_now = self.get_time_ns();
let (name, alert_time_ns) =
validate_and_prepare_time_alert(name, alert_time_ns, allow_past, ts_now)?;
self.replace_existing_timer_if_needed(&name);
check_predicate_true(
callback.is_some() | self.callbacks.has_any_callback(&name),
"No callbacks provided",
)?;
if let Some(callback) = callback {
self.callbacks.register_callback(name, callback);
}
let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
let fire_immediately = alert_time_ns == ts_now;
let timer = TestTimer::new(
name,
interval_ns,
ts_now,
Some(alert_time_ns),
fire_immediately,
);
self.insert_timer(timer);
Ok(())
}
fn set_timer_ns(
&mut self,
name: &str,
interval_ns: u64,
start_time_ns: Option<UnixNanos>,
stop_time_ns: Option<UnixNanos>,
callback: Option<TimeEventCallback>,
allow_past: Option<bool>,
fire_immediately: Option<bool>,
) -> anyhow::Result<()> {
let ts_now = self.get_time_ns();
let (name, start_time_ns, stop_time_ns, _allow_past, fire_immediately) =
validate_and_prepare_timer(
name,
interval_ns,
start_time_ns,
stop_time_ns,
allow_past,
fire_immediately,
ts_now,
)?;
check_predicate_true(
callback.is_some() | self.callbacks.has_any_callback(&name),
"No callbacks provided",
)?;
self.replace_existing_timer_if_needed(&name);
if let Some(callback) = callback {
self.callbacks.register_callback(name, callback);
}
let interval_ns = create_valid_interval(interval_ns);
let timer = TestTimer::new(
name,
interval_ns,
start_time_ns,
stop_time_ns,
fire_immediately,
);
self.insert_timer(timer);
Ok(())
}
fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
self.timers
.get(&Ustr::from(name))
.map(TestTimer::next_time_ns)
}
fn cancel_timer(&mut self, name: &str) {
let timer = self.timers.remove(&Ustr::from(name));
if let Some(mut timer) = timer {
timer.cancel();
}
self.compact_timer_queue_if_needed();
}
fn cancel_timers(&mut self) {
for timer in &mut self.timers.values_mut() {
timer.cancel();
}
self.timers.clear();
self.timer_queue.clear();
}
fn reset(&mut self) {
self.time = AtomicTime::new(false, UnixNanos::default());
self.timers = BTreeMap::new();
self.timer_queue = BinaryHeap::new();
self.callbacks.clear();
}
}
pub(crate) fn replace_existing_timer<T: Timer>(timers: &mut BTreeMap<Ustr, T>, name: &Ustr) {
let is_expired = timers.get(name).map(T::is_expired);
match is_expired {
Some(true) => {
timers.remove(name);
}
Some(false) => {
if let Some(mut timer) = timers.remove(name) {
timer.cancel();
}
log::warn!("Timer '{name}' replaced");
}
None => {}
}
}
#[cfg(test)]
mod tests {
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use nautilus_core::{MUTEX_POISONED, UnixNanos};
use rstest::{fixture, rstest};
use ustr::Ustr;
use super::*;
use crate::timer::{TimeEvent, TimeEventCallback};
#[derive(Debug, Default)]
struct TestCallback {
called: Arc<Mutex<bool>>,
}
impl TestCallback {
fn new(called: Arc<Mutex<bool>>) -> Self {
Self { called }
}
}
impl From<TestCallback> for TimeEventCallback {
fn from(callback: TestCallback) -> Self {
Self::from(move |_event: TimeEvent| {
if let Ok(mut called) = callback.called.lock() {
*called = true;
}
})
}
}
#[fixture]
pub fn test_clock() -> TestClock {
let mut clock = TestClock::new();
clock.register_default_handler(TestCallback::default().into());
clock
}
#[rstest]
fn test_time_monotonicity(mut test_clock: TestClock) {
let initial_time = test_clock.timestamp_ns();
test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
assert!(test_clock.timestamp_ns() > initial_time);
}
#[rstest]
fn test_timer_registration(mut test_clock: TestClock) {
test_clock
.set_time_alert_ns(
"test_timer",
(*test_clock.timestamp_ns() + 1000).into(),
None,
None,
)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
}
#[rstest]
fn test_timer_expiration(mut test_clock: TestClock) {
let alert_time = (*test_clock.timestamp_ns() + 1000).into();
test_clock
.set_time_alert_ns("test_timer", alert_time, None, None)
.unwrap();
let events = test_clock.advance_time(alert_time, true);
assert_eq!(events.len(), 1);
assert_eq!(events[0].name.as_str(), "test_timer");
}
#[rstest]
fn test_timer_cancellation(mut test_clock: TestClock) {
test_clock
.set_time_alert_ns(
"test_timer",
(*test_clock.timestamp_ns() + 1000).into(),
None,
None,
)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
test_clock.cancel_timer("test_timer");
assert_eq!(test_clock.timer_count(), 0);
}
#[rstest]
fn test_time_advancement(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
test_clock
.set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
.unwrap();
let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
assert_eq!(events.len(), 2);
assert_eq!(*events[0].ts_event, *start_time + 1000);
assert_eq!(*events[1].ts_event, *start_time + 2000);
}
#[rstest]
fn test_default_and_custom_callbacks() {
let mut clock = TestClock::new();
let default_called = Arc::new(Mutex::new(false));
let custom_called = Arc::new(Mutex::new(false));
let default_callback = TestCallback::new(Arc::clone(&default_called));
let custom_callback = TestCallback::new(Arc::clone(&custom_called));
clock.register_default_handler(TimeEventCallback::from(default_callback));
clock
.set_time_alert_ns(
"default_timer",
(*clock.timestamp_ns() + 1000).into(),
None,
None,
)
.unwrap();
clock
.set_time_alert_ns(
"custom_timer",
(*clock.timestamp_ns() + 1000).into(),
Some(TimeEventCallback::from(custom_callback)),
None,
)
.unwrap();
let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
let handlers = clock.match_handlers(events);
for handler in handlers {
handler.callback.call(handler.event);
}
assert!(*default_called.lock().expect(MUTEX_POISONED));
assert!(*custom_called.lock().expect(MUTEX_POISONED));
}
#[rstest]
fn test_timer_with_rust_local_callback() {
use std::{cell::RefCell, rc::Rc};
let mut clock = TestClock::new();
let call_count = Rc::new(RefCell::new(0_u32));
let call_count_clone = Rc::clone(&call_count);
let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event: TimeEvent| {
*call_count_clone.borrow_mut() += 1;
});
clock
.set_time_alert_ns(
"local_timer",
(*clock.timestamp_ns() + 1000).into(),
Some(TimeEventCallback::from(callback)),
None,
)
.unwrap();
let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
let handlers = clock.match_handlers(events);
for handler in handlers {
handler.callback.call(handler.event);
}
assert_eq!(*call_count.borrow(), 1);
}
#[rstest]
fn test_multiple_timers(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
test_clock
.set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
.unwrap();
test_clock
.set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
.unwrap();
let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
assert_eq!(events.len(), 3);
assert_eq!(events[0].name.as_str(), "timer1");
assert_eq!(events[1].name.as_str(), "timer1");
assert_eq!(events[2].name.as_str(), "timer2");
}
#[rstest]
fn test_allow_past_parameter_true(mut test_clock: TestClock) {
test_clock.set_time(UnixNanos::from(2000));
let current_time = test_clock.timestamp_ns();
let past_time = UnixNanos::from(current_time.as_u64() - 1000);
test_clock
.set_time_alert_ns("past_timer", past_time, None, Some(true))
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
let next_time = test_clock.next_time_ns("past_timer").unwrap();
assert!(next_time >= current_time);
}
#[rstest]
fn test_allow_past_parameter_false(mut test_clock: TestClock) {
test_clock.set_time(UnixNanos::from(2000));
let current_time = test_clock.timestamp_ns();
let past_time = current_time - 1000;
let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
assert!(result.is_err());
assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
assert_eq!(test_clock.timer_count(), 0);
assert!(test_clock.timer_names().is_empty());
}
#[rstest]
fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
test_clock.set_time(UnixNanos::from(2000));
let current_time = test_clock.timestamp_ns();
let start_time = current_time + 1000;
let stop_time = current_time + 500;
let result = test_clock.set_timer_ns(
"invalid_timer",
100,
Some(start_time),
Some(stop_time),
None,
None,
None,
);
assert!(result.is_err());
assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
assert_eq!(test_clock.timer_count(), 0);
}
#[rstest]
fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
let interval_ns = 1000;
test_clock
.set_timer_ns(
"fire_immediately_timer",
interval_ns,
Some(start_time),
None,
None,
None,
Some(true),
)
.unwrap();
let events = test_clock.advance_time(start_time + 2500, true);
assert_eq!(events.len(), 3);
assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
#[rstest]
fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
let interval_ns = 1000;
test_clock
.set_timer_ns(
"normal_timer",
interval_ns,
Some(start_time),
None,
None,
None,
Some(false),
)
.unwrap();
let events = test_clock.advance_time(start_time + 2500, true);
assert_eq!(events.len(), 2);
assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
#[rstest]
fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
let interval_ns = 1000;
test_clock
.set_timer_ns(
"default_timer",
interval_ns,
Some(start_time),
None,
None,
None,
None,
)
.unwrap();
let events = test_clock.advance_time(start_time + 1500, true);
assert_eq!(events.len(), 1);
assert_eq!(*events[0].ts_event, *start_time + 1000); }
#[rstest]
fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
test_clock.set_time(5000.into());
let interval_ns = 1000;
test_clock
.set_timer_ns(
"zero_start_timer",
interval_ns,
None,
None,
None,
None,
Some(true),
)
.unwrap();
let events = test_clock.advance_time(UnixNanos::from(7000), true);
assert_eq!(events.len(), 3);
assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
assert_eq!(*events[2].ts_event, 7000);
}
#[rstest]
fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
let interval_ns = 1000;
test_clock
.set_timer_ns(
"immediate_timer",
interval_ns,
Some(start_time),
None,
None,
None,
Some(true),
)
.unwrap();
test_clock
.set_timer_ns(
"normal_timer",
interval_ns,
Some(start_time),
None,
None,
None,
Some(false),
)
.unwrap();
let events = test_clock.advance_time(start_time + 1500, true);
assert_eq!(events.len(), 3);
let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
event_times.sort_unstable();
assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
#[rstest]
fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
test_clock
.set_timer_ns(
"collision_timer",
1000,
Some(start_time),
None,
None,
None,
None,
)
.unwrap();
let result = test_clock.set_timer_ns(
"collision_timer",
2000,
Some(start_time),
None,
None,
None,
None,
);
assert!(result.is_ok());
assert_eq!(test_clock.timer_count(), 1);
let next_time = test_clock.next_time_ns("collision_timer").unwrap();
assert_eq!(next_time, start_time + 2000);
}
#[rstest]
fn test_timer_zero_interval_error(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
let result =
test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
assert!(result.is_err());
assert_eq!(test_clock.timer_count(), 0);
}
#[rstest]
fn test_timer_empty_name_error(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
assert!(result.is_err());
assert_eq!(test_clock.timer_count(), 0);
}
#[rstest]
fn test_timer_exists(mut test_clock: TestClock) {
let name = Ustr::from("exists_timer");
assert!(!test_clock.timer_exists(&name));
test_clock
.set_time_alert_ns(
name.as_str(),
(*test_clock.timestamp_ns() + 1_000).into(),
None,
None,
)
.unwrap();
assert!(test_clock.timer_exists(&name));
}
#[rstest]
fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
test_clock.set_time(UnixNanos::from(10_000));
let current = test_clock.timestamp_ns();
let result = test_clock.set_timer_ns(
"past_stop",
10_000,
Some(current - 500),
Some(current - 100),
None,
Some(false),
None,
);
let err = result.expect_err("expected stop time validation error");
let err_msg = err.to_string();
assert!(err_msg.contains("stop time"));
assert!(err_msg.contains("in the past"));
}
#[rstest]
fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
let current = test_clock.timestamp_ns();
let result = test_clock.set_timer_ns(
"future_stop",
1_000,
Some(current),
Some(current + 10_000),
None,
Some(false),
None,
);
assert!(result.is_ok());
}
#[rstest]
fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
let interval_ns = 1000;
let stop_time = start_time + interval_ns;
test_clock
.set_timer_ns(
"exact_stop",
interval_ns,
Some(start_time),
Some(stop_time),
None,
None,
Some(true),
)
.unwrap();
let events = test_clock.advance_time(stop_time, true);
assert_eq!(events.len(), 2);
assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
#[rstest]
fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
let interval_ns = 1000;
test_clock
.set_timer_ns(
"exact_advance",
interval_ns,
Some(start_time),
None,
None,
None,
Some(false),
)
.unwrap();
let next_time = test_clock.next_time_ns("exact_advance").unwrap();
let events = test_clock.advance_time(next_time, true);
assert_eq!(events.len(), 1);
assert_eq!(*events[0].ts_event, *next_time);
}
#[rstest]
fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
test_clock.set_time(UnixNanos::from(100_500));
let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
let result = test_clock.set_timer_ns(
"bar_timer",
interval_ns,
Some(bar_start_time),
None,
None,
Some(false), Some(false), );
assert!(result.is_ok());
assert_eq!(test_clock.timer_count(), 1);
let next_time = test_clock.next_time_ns("bar_timer").unwrap();
assert_eq!(*next_time, 101_000);
}
#[rstest]
fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
test_clock.set_time(UnixNanos::from(102_000));
let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
let result = test_clock.set_timer_ns(
"past_event_timer",
interval_ns,
Some(past_start_time),
None,
None,
Some(false), Some(false), );
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("would be in the past")
);
}
#[rstest]
fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
test_clock.set_time(UnixNanos::from(100_500));
let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
let result = test_clock.set_timer_ns(
"immediate_past_timer",
interval_ns,
Some(past_start_time),
None,
None,
Some(false), Some(true), );
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("would be in the past")
);
}
#[rstest]
fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
test_clock
.set_timer_ns(
"cancel_test",
1000,
Some(start_time),
None,
None,
None,
None,
)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
test_clock.cancel_timer("cancel_test");
assert_eq!(test_clock.timer_count(), 0);
let events = test_clock.advance_time(start_time + 2000, true);
assert_eq!(events.len(), 0);
}
#[rstest]
fn test_cancelled_timer_queue_entry_is_skipped(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
test_clock
.set_time_alert_ns("cancelled", start_time + 1000, None, None)
.unwrap();
test_clock
.set_time_alert_ns("active", start_time + 2000, None, None)
.unwrap();
test_clock.cancel_timer("cancelled");
assert_eq!(test_clock.timer_count(), 1);
assert_eq!(test_clock.timer_queue.len(), 2);
let events = test_clock.advance_time(start_time + 1000, true);
assert!(events.is_empty());
assert_eq!(test_clock.timer_names(), vec!["active"]);
let events = test_clock.advance_time(start_time + 2000, true);
assert_eq!(events.len(), 1);
assert_eq!(events[0].name.as_str(), "active");
}
#[rstest]
fn test_timer_queue_compacts_stale_entries(mut test_clock: TestClock) {
let start_time = test_clock.timestamp_ns();
test_clock
.set_time_alert_ns("active", start_time + 1000, None, None)
.unwrap();
test_clock
.set_time_alert_ns("cancelled-1", start_time + 2000, None, None)
.unwrap();
test_clock
.set_time_alert_ns("cancelled-2", start_time + 3000, None, None)
.unwrap();
test_clock.cancel_timer("cancelled-1");
assert_eq!(test_clock.timer_queue.len(), 3);
test_clock.cancel_timer("cancelled-2");
assert_eq!(test_clock.timer_count(), 1);
assert_eq!(test_clock.timer_queue.len(), 1);
}
#[rstest]
fn test_cancel_all_timers(mut test_clock: TestClock) {
test_clock
.set_timer_ns("timer1", 1000, None, None, None, None, None)
.unwrap();
test_clock
.set_timer_ns("timer2", 1500, None, None, None, None, None)
.unwrap();
test_clock
.set_timer_ns("timer3", 2000, None, None, None, None, None)
.unwrap();
assert_eq!(test_clock.timer_count(), 3);
test_clock.cancel_timers();
assert_eq!(test_clock.timer_count(), 0);
let events = test_clock.advance_time(UnixNanos::from(5000), true);
assert_eq!(events.len(), 0);
}
#[rstest]
fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
test_clock
.set_timer_ns("reset_test", 1000, None, None, None, None, None)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
test_clock.reset();
assert_eq!(test_clock.timer_count(), 0);
assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
#[rstest]
fn test_cancel_default_handler_clears_default(mut test_clock: TestClock) {
test_clock.cancel_default_handler();
let alert_time: UnixNanos = (*test_clock.timestamp_ns() + 1000).into();
let err = test_clock
.set_time_alert_ns("alert", alert_time, None, None)
.unwrap_err();
assert!(
err.to_string().contains("No callbacks provided"),
"unexpected error: {err}"
);
}
#[rstest]
fn test_cancel_default_handler_is_idempotent_on_empty_registry() {
let mut clock = TestClock::new();
clock.cancel_default_handler();
clock.cancel_default_handler();
}
#[rstest]
fn test_cancel_callbacks_clears_named(mut test_clock: TestClock) {
let alert_time: UnixNanos = (*test_clock.timestamp_ns() + 1000).into();
let callback = TimeEventCallback::from(TestCallback::default());
test_clock
.set_time_alert_ns("named_alert", alert_time, Some(callback), None)
.unwrap();
test_clock.cancel_timer("named_alert");
test_clock.cancel_default_handler();
test_clock.cancel_callbacks();
let err = test_clock
.set_time_alert_ns("named_alert", alert_time, None, None)
.unwrap_err();
assert!(
err.to_string().contains("No callbacks provided"),
"unexpected error: {err}"
);
}
#[rstest]
fn test_cancel_default_handler_preserves_named_callbacks(mut test_clock: TestClock) {
let alert_time: UnixNanos = (*test_clock.timestamp_ns() + 1000).into();
let callback = TimeEventCallback::from(TestCallback::default());
test_clock
.set_time_alert_ns("alert", alert_time, Some(callback), None)
.unwrap();
test_clock.cancel_timer("alert");
test_clock.cancel_default_handler();
test_clock
.set_time_alert_ns("alert", alert_time, None, None)
.unwrap();
}
#[rstest]
fn test_cancel_callbacks_preserves_default_handler(mut test_clock: TestClock) {
test_clock.cancel_callbacks();
let alert_time: UnixNanos = (*test_clock.timestamp_ns() + 1000).into();
test_clock
.set_time_alert_ns("alert", alert_time, None, None)
.unwrap();
}
#[rstest]
fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
let current_time = test_clock.utc_now();
let alert_time = current_time + chrono::Duration::seconds(1);
test_clock
.set_time_alert("alert_test", alert_time, None, None)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
let expected_ns = UnixNanos::from(alert_time);
let next_time = test_clock.next_time_ns("alert_test").unwrap();
let diff = if next_time >= expected_ns {
next_time.as_u64() - expected_ns.as_u64()
} else {
expected_ns.as_u64() - next_time.as_u64()
};
assert!(
diff < 1000,
"Timer should be set within 1 microsecond of expected time"
);
}
#[rstest]
fn test_set_timer_default_impl(mut test_clock: TestClock) {
let current_time = test_clock.utc_now();
let start_time = current_time + chrono::Duration::seconds(1);
let interval = Duration::from_millis(500);
test_clock
.set_timer(
"timer_test",
interval,
Some(start_time),
None,
None,
None,
None,
)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
let start_ns = UnixNanos::from(start_time);
let interval_ns = interval.as_nanos() as u64;
let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
assert_eq!(events.len(), 3);
assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
}
#[rstest]
fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
let current_time = test_clock.utc_now();
let start_time = current_time + chrono::Duration::seconds(1);
let stop_time = current_time + chrono::Duration::seconds(3);
let interval = Duration::from_secs(1);
test_clock
.set_timer(
"timer_with_stop",
interval,
Some(start_time),
Some(stop_time),
None,
None,
None,
)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
let stop_ns = UnixNanos::from(stop_time);
let events = test_clock.advance_time(stop_ns + 1000, true);
assert_eq!(events.len(), 2);
let start_ns = UnixNanos::from(start_time);
let interval_ns = interval.as_nanos() as u64;
assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
}
#[rstest]
fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
let current_time = test_clock.utc_now();
let start_time = current_time + chrono::Duration::seconds(1);
let interval = Duration::from_millis(500);
test_clock
.set_timer(
"immediate_timer",
interval,
Some(start_time),
None,
None,
None,
Some(true),
)
.unwrap();
let start_ns = UnixNanos::from(start_time);
let interval_ns = interval.as_nanos() as u64;
let events = test_clock.advance_time(start_ns + interval_ns, true);
assert_eq!(events.len(), 2);
assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
#[rstest]
fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
let current_time = test_clock.timestamp_ns();
test_clock
.set_time_alert_ns("alert_at_current_time", current_time, None, None)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
let events = test_clock.advance_time(current_time, true);
assert_eq!(events.len(), 1);
assert_eq!(events[0].name.as_str(), "alert_at_current_time");
assert_eq!(*events[0].ts_event, *current_time);
}
#[rstest]
fn test_cancel_and_reschedule_same_name(mut test_clock: TestClock) {
let start = test_clock.timestamp_ns();
test_clock
.set_time_alert_ns("timer", UnixNanos::from(*start + 1000), None, None)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
test_clock.cancel_timer("timer");
assert_eq!(test_clock.timer_count(), 0);
test_clock
.set_time_alert_ns("timer", UnixNanos::from(*start + 2000), None, None)
.unwrap();
assert_eq!(test_clock.timer_count(), 1);
let events = test_clock.advance_time(UnixNanos::from(*start + 1500), true);
assert!(events.is_empty());
let events = test_clock.advance_time(UnixNanos::from(*start + 2000), true);
assert_eq!(events.len(), 1);
assert_eq!(*events[0].ts_event, *start + 2000);
}
#[rstest]
fn test_multiple_timers_same_timestamp_all_fire(mut test_clock: TestClock) {
let fire_time = UnixNanos::from(*test_clock.timestamp_ns() + 1000);
for i in 0..5 {
test_clock
.set_time_alert_ns(&format!("timer_{i}"), fire_time, None, None)
.unwrap();
}
assert_eq!(test_clock.timer_count(), 5);
let events = test_clock.advance_time(fire_time, true);
assert_eq!(events.len(), 5);
for event in &events {
assert_eq!(*event.ts_event, *fire_time);
}
}
#[rstest]
fn test_events_ordered_by_timestamp_after_advance() {
let mut clock = TestClock::new();
clock.register_default_handler(TestCallback::default().into());
let start = clock.timestamp_ns();
clock
.set_time_alert_ns("third", UnixNanos::from(*start + 300), None, None)
.unwrap();
clock
.set_time_alert_ns("first", UnixNanos::from(*start + 100), None, None)
.unwrap();
clock
.set_time_alert_ns("second", UnixNanos::from(*start + 200), None, None)
.unwrap();
let events = clock.advance_time(UnixNanos::from(*start + 400), true);
assert_eq!(events.len(), 3);
assert_eq!(events[0].name.as_str(), "first");
assert_eq!(events[1].name.as_str(), "second");
assert_eq!(events[2].name.as_str(), "third");
}
#[rstest]
fn test_large_interval_does_not_overflow(mut test_clock: TestClock) {
let start = test_clock.timestamp_ns();
let large_interval: u64 = 1_000_000_000 * 60 * 60 * 24 * 365;
test_clock
.set_timer_ns(
"large_interval",
large_interval,
Some(start),
None,
None,
None,
None,
)
.unwrap();
let events = test_clock.advance_time(UnixNanos::from(*start + large_interval), true);
assert_eq!(events.len(), 1);
assert_eq!(*events[0].ts_event, *start + large_interval);
}
#[rstest]
fn test_near_zero_interval_fires_correctly(mut test_clock: TestClock) {
let start = test_clock.timestamp_ns();
test_clock
.set_timer_ns("tiny", 1, Some(start), None, None, None, None)
.unwrap();
let events = test_clock.advance_time(UnixNanos::from(*start + 10), true);
assert_eq!(events.len(), 10);
for i in 1..events.len() {
assert!(events[i].ts_event >= events[i - 1].ts_event);
}
}
#[rstest]
fn test_repeated_advance_to_same_time_no_double_fire(mut test_clock: TestClock) {
let fire_time = UnixNanos::from(*test_clock.timestamp_ns() + 1000);
test_clock
.set_time_alert_ns("once", fire_time, None, None)
.unwrap();
let events1 = test_clock.advance_time(fire_time, true);
assert_eq!(events1.len(), 1);
let events2 = test_clock.advance_time(fire_time, true);
assert!(events2.is_empty());
}
#[rstest]
fn test_advance_with_no_timers(mut test_clock: TestClock) {
let start = test_clock.timestamp_ns();
let events = test_clock.advance_time(UnixNanos::from(*start + 1000), true);
assert!(events.is_empty());
assert_eq!(*test_clock.timestamp_ns(), *start + 1000);
}
#[rstest]
fn test_clock_api_handlers_back_full_surface() {
let alerts = Arc::new(Mutex::new(Vec::new()));
let timers = Arc::new(Mutex::new(Vec::new()));
let cancellations = Arc::new(Mutex::new(Vec::new()));
let cancel_all = Arc::new(Mutex::new(false));
let alerts_for_handler = Arc::clone(&alerts);
let timers_for_handler = Arc::clone(&timers);
let cancellations_for_handler = Arc::clone(&cancellations);
let cancel_all_for_handler = Arc::clone(&cancel_all);
let clock = ClockApi::from_handlers(
|| UnixNanos::from(1_700_000_000_123_456_789),
move |name, alert_time_ns, _callback, allow_past| {
alerts_for_handler.lock().expect(MUTEX_POISONED).push((
name.to_string(),
alert_time_ns,
allow_past,
));
Ok(())
},
move |name,
interval_ns,
start_time_ns,
stop_time_ns,
_callback,
allow_past,
fire_immediately| {
timers_for_handler.lock().expect(MUTEX_POISONED).push((
name.to_string(),
interval_ns,
start_time_ns,
stop_time_ns,
allow_past,
fire_immediately,
));
Ok(())
},
|| vec!["alpha".to_string(), "beta".to_string()],
|| 2,
|name| name == "alpha",
|name| (name == "alpha").then(|| UnixNanos::from(1_700_000_000_999_000_000)),
move |name| {
cancellations_for_handler
.lock()
.expect(MUTEX_POISONED)
.push(name.to_string());
},
move || {
*cancel_all_for_handler.lock().expect(MUTEX_POISONED) = true;
},
);
let alert_time = DateTime::from_timestamp_nanos(1_700_000_000_333_000_000);
let start_time = DateTime::from_timestamp_nanos(1_700_000_000_444_000_000);
let stop_time = DateTime::from_timestamp_nanos(1_700_000_001_444_000_000);
clock
.set_time_alert("alert", alert_time, None, Some(false))
.unwrap();
clock
.set_timer(
"timer",
Duration::from_millis(250),
Some(start_time),
Some(stop_time),
None,
Some(true),
Some(false),
)
.unwrap();
clock.cancel_timer("alpha");
clock.cancel_timers();
assert_eq!(
clock.timestamp_ns(),
UnixNanos::from(1_700_000_000_123_456_789)
);
assert_eq!(clock.timestamp_us(), 1_700_000_000_123_456);
assert_eq!(clock.timestamp_ms(), 1_700_000_000_123);
assert_eq!(clock.timestamp(), 1_700_000_000.123_456_7);
assert_eq!(
clock.utc_now(),
DateTime::from_timestamp_nanos(1_700_000_000_123_456_789)
);
assert_eq!(clock.timer_names(), vec!["alpha", "beta"]);
assert_eq!(clock.timer_count(), 2);
assert!(clock.timer_exists("alpha"));
assert!(!clock.timer_exists("gamma"));
assert_eq!(
clock.next_time_ns("alpha"),
Some(UnixNanos::from(1_700_000_000_999_000_000))
);
assert_eq!(
alerts.lock().expect(MUTEX_POISONED).as_slice(),
&[(
"alert".to_string(),
UnixNanos::from(1_700_000_000_333_000_000),
Some(false)
)]
);
assert_eq!(
timers.lock().expect(MUTEX_POISONED).as_slice(),
&[(
"timer".to_string(),
250_000_000,
Some(UnixNanos::from(1_700_000_000_444_000_000)),
Some(UnixNanos::from(1_700_000_001_444_000_000)),
Some(true),
Some(false)
)]
);
assert_eq!(
cancellations.lock().expect(MUTEX_POISONED).as_slice(),
&["alpha".to_string()]
);
assert!(*cancel_all.lock().expect(MUTEX_POISONED));
}
}