use std::{
cmp::Ordering,
fmt::{Debug, Display},
num::NonZeroU64,
rc::Rc,
sync::Arc,
};
#[cfg(feature = "python")]
use nautilus_core::python::IntoPyObjectNautilusExt;
use nautilus_core::{
UUID4, UnixNanos,
correctness::{FAILED, check_valid_string_utf8},
};
#[cfg(feature = "python")]
use pyo3::{Py, PyAny, PyResult, Python, types::PyCapsule};
use ustr::Ustr;
/// Converts a raw nanosecond interval into a [`NonZeroU64`].
///
/// A zero `interval_ns` is replaced by [`NonZeroU64::MIN`] (i.e. 1); every other
/// value is returned unchanged.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    let Some(interval) = NonZeroU64::new(interval_ns) else {
        // Zero is the only value `NonZeroU64::new` rejects.
        return NonZeroU64::MIN;
    };
    interval
}
/// A named, uniquely identified event carrying an event timestamp and an
/// initialization timestamp.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct TimeEvent {
/// The event name (timers in this file copy their own name into it).
pub name: Ustr,
/// The unique identifier of this event.
pub event_id: UUID4,
/// The timestamp at which the event occurs.
pub ts_event: UnixNanos,
/// The initialization timestamp supplied by whoever creates the event.
pub ts_init: UnixNanos,
}
impl TimeEvent {
    /// Creates a new [`TimeEvent`] from its four components.
    ///
    /// No validation is performed; the arguments are stored as given.
    #[must_use]
    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
        Self { name, event_id, ts_event, ts_init }
    }
}
impl Display for TimeEvent {
    /// Writes the event as `TimeEvent(name=…, event_id=…, ts_event=…, ts_init=…)`,
    /// using the `Display` form of each field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            name,
            event_id,
            ts_event,
            ts_init,
        } = self;
        let type_name = stringify!(TimeEvent);
        write!(
            f,
            "{}(name={}, event_id={}, ts_event={}, ts_init={})",
            type_name, name, event_id, ts_event, ts_init
        )
    }
}
/// Newtype wrapper around a [`TimeEvent`].
///
/// The wrapper exists to carry its own `Ord` implementation, which compares by
/// `ts_event` in reverse (an earlier event compares as greater).
#[repr(transparent)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct ScheduledTimeEvent(pub TimeEvent);
impl ScheduledTimeEvent {
#[must_use]
pub const fn new(event: TimeEvent) -> Self {
Self(event)
}
#[must_use]
pub fn into_inner(self) -> TimeEvent {
self.0
}
}
impl PartialOrd for ScheduledTimeEvent {
// Delegates to the total order defined by `Ord::cmp`, so the result is always `Some`.
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ScheduledTimeEvent {
    /// Orders by `ts_event` only, in reverse: the event with the earlier
    /// timestamp compares as the greater one.
    // NOTE(review): presumably reversed so a max-heap yields the earliest event first — confirm.
    fn cmp(&self, other: &Self) -> Ordering {
        let natural = self.0.ts_event.cmp(&other.0.ts_event);
        natural.reverse()
    }
}
/// Selects how a [`TimeEvent`] is handed to a Python callback.
#[cfg(feature = "python")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PythonTimeEventCallbackArg {
/// The callback is called with the `TimeEvent` itself as its single argument.
TimeEvent,
/// The callback is called with the event wrapped in a `PyCapsule`.
LegacyCapsule,
}
/// A Python callable paired with the argument-passing mode used to invoke it.
#[cfg(feature = "python")]
pub struct PythonTimeEventCallback {
// The Python object that gets called with the event.
callback: Py<PyAny>,
// How the event is passed to `callback` (directly or as a capsule).
arg: PythonTimeEventCallbackArg,
}
#[cfg(feature = "python")]
impl PythonTimeEventCallback {
    /// Creates a new wrapper around `callback` that will be invoked using the `arg` mode.
    #[must_use]
    pub const fn new(callback: Py<PyAny>, arg: PythonTimeEventCallbackArg) -> Self {
        Self { callback, arg }
    }

    /// Returns a reference to the wrapped Python callable.
    #[must_use]
    pub const fn callback(&self) -> &Py<PyAny> {
        &self.callback
    }

    /// Invokes the Python callable with `event`, passing it in the configured mode.
    ///
    /// An error returned by the call is logged at error level and not propagated.
    pub fn call(&self, event: TimeEvent) {
        Python::attach(|py| {
            let outcome = match self.arg {
                PythonTimeEventCallbackArg::LegacyCapsule => {
                    call_legacy_python_time_event_callback(py, event, &self.callback)
                }
                PythonTimeEventCallbackArg::TimeEvent => self.callback.call1(py, (event,)),
            };

            if let Err(e) = outcome {
                log::error!("Python time event callback raised exception: {e}");
            }
        });
    }
}
#[cfg(feature = "python")]
impl Debug for PythonTimeEventCallback {
    /// Shows only the `arg` field; the Python callable is omitted and the
    /// output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct(stringify!(PythonTimeEventCallback));
        repr.field("arg", &self.arg);
        repr.finish_non_exhaustive()
    }
}
/// A callback that receives a [`TimeEvent`], backed by either Python or Rust code.
pub enum TimeEventCallback {
/// A shared Python callable (only with the `python` feature).
#[cfg(feature = "python")]
Python(Arc<PythonTimeEventCallback>),
/// A shared Rust closure that is `Send + Sync`.
Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
/// A reference-counted Rust closure with no `Send`/`Sync` bound.
RustLocal(Rc<dyn Fn(TimeEvent)>),
}
impl Clone for TimeEventCallback {
fn clone(&self) -> Self {
match self {
#[cfg(feature = "python")]
Self::Python(callback) => Self::Python(callback.clone()),
Self::Rust(cb) => Self::Rust(cb.clone()),
Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
}
}
}
impl Debug for TimeEventCallback {
    /// Writes a fixed label naming the variant; the callable itself is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            #[cfg(feature = "python")]
            Self::Python(_) => "Python callback",
            Self::Rust(_) => "Rust callback (thread-safe)",
            Self::RustLocal(_) => "Rust callback (local)",
        };
        f.write_str(label)
    }
}
impl TimeEventCallback {
    /// Returns `true` only for the [`TimeEventCallback::Rust`] variant.
    #[must_use]
    pub const fn is_rust(&self) -> bool {
        matches!(self, Self::Rust(_))
    }

    /// Returns `true` only for the [`TimeEventCallback::RustLocal`] variant.
    #[must_use]
    pub const fn is_local(&self) -> bool {
        matches!(self, Self::RustLocal(_))
    }

    /// Invokes the underlying callable with `event`, whichever variant holds it.
    pub fn call(&self, event: TimeEvent) {
        match self {
            #[cfg(feature = "python")]
            Self::Python(py_callback) => py_callback.call(event),
            Self::Rust(shared_fn) => shared_fn(event),
            Self::RustLocal(local_fn) => local_fn(event),
        }
    }
}
impl<F> From<F> for TimeEventCallback
where
    F: Fn(TimeEvent) + Send + Sync + 'static,
{
    /// Wraps a thread-safe closure in an `Arc` and stores it as the `Rust` variant.
    fn from(value: F) -> Self {
        let shared: Arc<dyn Fn(TimeEvent) + Send + Sync> = Arc::new(value);
        Self::Rust(shared)
    }
}
impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
/// Stores an already shared thread-safe closure as the `Rust` variant without re-wrapping it.
fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
Self::Rust(value)
}
}
impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
/// Stores a reference-counted closure as the `RustLocal` variant.
fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
Self::RustLocal(value)
}
}
#[cfg(feature = "python")]
impl From<Py<PyAny>> for TimeEventCallback {
/// Wraps a Python callable via `from_python_time_event`, i.e. in the mode
/// where the callable receives the `TimeEvent` directly.
fn from(value: Py<PyAny>) -> Self {
Self::from_python_time_event(value)
}
}
#[cfg(feature = "python")]
impl TimeEventCallback {
    /// Creates a Python callback that is invoked with the `TimeEvent` itself.
    #[must_use]
    pub fn from_python_time_event(callback: Py<PyAny>) -> Self {
        let wrapped = PythonTimeEventCallback::new(callback, PythonTimeEventCallbackArg::TimeEvent);
        Self::Python(Arc::new(wrapped))
    }

    /// Creates a Python callback that is invoked with the event wrapped in a `PyCapsule`.
    #[must_use]
    pub fn from_python_legacy_capsule(callback: Py<PyAny>) -> Self {
        let wrapped =
            PythonTimeEventCallback::new(callback, PythonTimeEventCallbackArg::LegacyCapsule);
        Self::Python(Arc::new(wrapped))
    }
}
// SAFETY: NOTE(review) — the `Rust` variant is `Send + Sync` by its trait-object bounds, but the
// `RustLocal` variant wraps an `Rc<dyn Fn(TimeEvent)>`, which is neither `Send` nor `Sync`.
// These impls are therefore only sound if a `RustLocal` callback is never moved to, shared
// with, cloned on, or dropped on another thread. Nothing visible here enforces that invariant —
// TODO confirm it is upheld by all callers.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
// SAFETY: same invariant as the `Send` impl directly above — relies on `RustLocal` callbacks
// staying confined to a single thread (TODO confirm).
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
/// Calls `callback` with `event` wrapped in a `PyCapsule` as the single positional argument.
///
/// The capsule is created without a name and with a no-op destructor closure.
///
/// # Panics
///
/// Panics if the capsule cannot be created.
#[cfg(feature = "python")]
fn call_legacy_python_time_event_callback(
    py: Python<'_>,
    event: TimeEvent,
    callback: &Py<PyAny>,
) -> PyResult<Py<PyAny>> {
    let bound_capsule = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
        .expect("Error creating `PyCapsule`");
    let capsule: Py<PyAny> = bound_capsule.into_py_any_unwrap(py);
    let args = (capsule,);
    callback.call1(py, args)
}
/// Pairs a [`TimeEvent`] with the [`TimeEventCallback`] that should receive it.
///
/// Equality and ordering of handlers look at the event only; the callback is ignored.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandler {
/// The event to deliver.
pub event: TimeEvent,
/// The callback invoked with `event` when the handler is run.
pub callback: TimeEventCallback,
}
impl TimeEventHandler {
    /// Creates a new handler for `event` that will invoke `callback`.
    #[must_use]
    pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
        Self { event, callback }
    }

    /// Compares two handlers by their events only.
    ///
    /// Keys are checked in order: `ts_event`, then `name`, then `ts_init`, then the
    /// string form of `event_id`. The first key that differs decides the result.
    fn cmp_event(&self, other: &Self) -> Ordering {
        let lhs = &self.event;
        let rhs = &other.event;

        match lhs.ts_event.cmp(&rhs.ts_event) {
            Ordering::Equal => {}
            decided => return decided,
        }
        match lhs.name.cmp(&rhs.name) {
            Ordering::Equal => {}
            decided => return decided,
        }
        match lhs.ts_init.cmp(&rhs.ts_init) {
            Ordering::Equal => {}
            decided => return decided,
        }
        lhs.event_id.as_str().cmp(rhs.event_id.as_str())
    }

    /// Consumes the handler: first passes the event to
    /// `crate::msgbus::dispatch_tap_time_event`, then invokes the callback with it.
    pub fn run(self) {
        crate::msgbus::dispatch_tap_time_event(&self.event);
        self.callback.call(self.event);
    }
}
impl PartialOrd for TimeEventHandler {
// Delegates to the total order from `Ord::cmp`, so the result is always `Some`.
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for TimeEventHandler {
fn eq(&self, other: &Self) -> bool {
self.cmp_event(other).is_eq()
}
}
// Marker impl: equality is defined through `cmp_event` (see `PartialEq`).
impl Eq for TimeEventHandler {}
impl Ord for TimeEventHandler {
// Total order over handlers, defined entirely by `cmp_event` (event fields only).
fn cmp(&self, other: &Self) -> Ordering {
self.cmp_event(other)
}
}
/// Crate-internal interface shared by timer implementations.
pub(crate) trait Timer {
/// Returns whether the timer is expired.
fn is_expired(&self) -> bool;
/// Cancels the timer.
fn cancel(&mut self);
}
/// A timer that is driven manually: events are produced by iterating it or by
/// calling `advance`, not by a running clock.
#[derive(Clone, Debug)]
pub struct TestTimer {
/// The timer name; copied into every event the timer produces.
pub name: Ustr,
/// The spacing between consecutive events, in nanoseconds.
pub interval_ns: NonZeroU64,
/// The start time of the timer.
pub start_time_ns: UnixNanos,
/// The optional stop time; no event is produced with a timestamp after it.
pub stop_time_ns: Option<UnixNanos>,
/// When `true`, the first event is scheduled at `start_time_ns` rather than one interval later.
pub fire_immediately: bool,
// Timestamp of the next event to be produced.
next_time_ns: UnixNanos,
// Set once the timer is cancelled or has reached/passed its stop time.
is_expired: bool,
}
impl TestTimer {
    /// Creates a new [`TestTimer`] instance.
    ///
    /// The first event is scheduled at `start_time_ns` when `fire_immediately` is `true`,
    /// otherwise at `start_time_ns + interval_ns`.
    ///
    /// # Panics
    ///
    /// Panics if `name` is rejected by `check_valid_string_utf8`.
    #[must_use]
    pub fn new(
        name: Ustr,
        interval_ns: NonZeroU64,
        start_time_ns: UnixNanos,
        stop_time_ns: Option<UnixNanos>,
        fire_immediately: bool,
    ) -> Self {
        check_valid_string_utf8(name, stringify!(name)).expect(FAILED);

        let next_time_ns = if fire_immediately {
            start_time_ns
        } else {
            start_time_ns + interval_ns.get()
        };

        Self {
            name,
            interval_ns,
            start_time_ns,
            stop_time_ns,
            fire_immediately,
            next_time_ns,
            is_expired: false,
        }
    }

    /// Returns the timestamp of the next event the timer will produce.
    #[must_use]
    pub const fn next_time_ns(&self) -> UnixNanos {
        self.next_time_ns
    }

    /// Returns whether the timer is expired (cancelled, or stop time reached/passed).
    #[must_use]
    pub const fn is_expired(&self) -> bool {
        self.is_expired
    }

    /// Builds a [`TimeEvent`] for the current `next_time_ns` using the given
    /// `event_id` and `ts_init`.
    ///
    /// The timer state is not modified by this call.
    #[must_use]
    pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
        TimeEvent {
            name: self.name,
            event_id,
            ts_event: self.next_time_ns,
            ts_init,
        }
    }

    /// Returns an iterator over the events due up to and including `to_time_ns`.
    ///
    /// The iterator is lazy: the timer only moves forward as events are pulled from it.
    /// It yields nothing when `to_time_ns` is before the next scheduled event, and it
    /// ends early once the timer expires.
    pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
        // Number of interval boundaries in `[next_time_ns, to_time_ns]`.
        let advances = if self.next_time_ns <= to_time_ns {
            ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
                .saturating_add(1)
        } else {
            0
        };

        // Clamp instead of `as usize`: a plain cast would silently truncate the count
        // on targets where `usize` is narrower than 64 bits and drop due events.
        let limit = usize::try_from(advances).unwrap_or(usize::MAX);

        self.take(limit).map(|(event, _)| event)
    }

    /// Cancels the timer by marking it as expired; it produces no further events.
    pub const fn cancel(&mut self) {
        self.is_expired = true;
    }
}
impl Timer for TestTimer {
// Reports the internal expiry flag.
fn is_expired(&self) -> bool {
self.is_expired
}
// Cancelling simply sets the expiry flag; iteration then yields `None`.
fn cancel(&mut self) {
self.is_expired = true;
}
}
impl Iterator for TestTimer {
    type Item = (TimeEvent, UnixNanos);

    /// Produces the next event together with its timestamp, then schedules the
    /// following one an interval later.
    ///
    /// Returns `None` when the timer is expired. If the next scheduled time lies
    /// past the stop time the timer is marked expired and `None` is returned; an
    /// event exactly at the stop time is still produced and then expires the timer.
    fn next(&mut self) -> Option<Self::Item> {
        if self.is_expired {
            return None;
        }

        let fire_at = self.next_time_ns;

        if self.stop_time_ns.is_some_and(|stop| fire_at > stop) {
            self.is_expired = true;
            return None;
        }

        let event = TimeEvent {
            name: self.name,
            event_id: UUID4::new(),
            ts_event: fire_at,
            ts_init: fire_at,
        };

        // Firing exactly on the stop time is the final event.
        if self.stop_time_ns == Some(fire_at) {
            self.is_expired = true;
        }

        self.next_time_ns += self.interval_ns;
        Some((event, fire_at))
    }
}
#[cfg(test)]
mod tests {
use std::{cell::RefCell, num::NonZeroU64, rc::Rc};
use nautilus_core::{UUID4, UnixNanos};
#[cfg(feature = "python")]
use pyo3::{
Bound, PyResult, Python,
types::{
PyAnyMethods, PyCFunction, PyDict, PyList, PyListMethods, PyTuple, PyTupleMethods,
PyTypeMethods,
},
};
use rstest::*;
use ustr::Ustr;
use super::{TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler, create_valid_interval};
use crate::msgbus::{
BusTap, Endpoint, MStr, MessagingSwitchboard, Topic, clear_bus_tap, set_bus_tap,
};
/// Zero is mapped to 1; non-zero intervals are returned unchanged.
#[rstest]
#[case(0, 1)]
#[case(1, 1)]
#[case(25, 25)]
fn test_create_valid_interval(#[case] interval_ns: u64, #[case] expected: u64) {
    let interval = create_valid_interval(interval_ns);
    assert_eq!(interval.get(), expected);
}
/// Iterating yields events until the expiry flag is set, after which it yields `None`.
#[rstest]
fn test_test_timer_pop_event() {
    let name = Ustr::from("TEST_TIMER");
    let interval_ns = NonZeroU64::new(1).unwrap();
    let mut timer = TestTimer::new(name, interval_ns, UnixNanos::from(1), None, false);

    for _ in 0..2 {
        assert!(timer.next().is_some());
    }

    timer.is_expired = true;
    assert!(timer.next().is_none());
}
/// Advancing to times before the first scheduled event produces nothing and
/// leaves the schedule untouched.
#[rstest]
fn test_test_timer_advance_within_next_time_ns() {
    let name = Ustr::from("TEST_TIMER");
    let interval_ns = NonZeroU64::new(5).unwrap();
    let mut timer = TestTimer::new(name, interval_ns, UnixNanos::default(), None, false);

    for to_time in 1u64..=3 {
        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
    }

    let fired = timer.advance(UnixNanos::from(4)).count();
    assert_eq!(fired, 0);
    assert_eq!(timer.next_time_ns, 5);
    assert!(!timer.is_expired);
}
/// Advancing exactly to the next scheduled time fires one event and does not expire the timer.
#[rstest]
fn test_test_timer_advance_up_to_next_time_ns() {
    let name = Ustr::from("TEST_TIMER");
    let interval_ns = NonZeroU64::new(1).unwrap();
    let mut timer = TestTimer::new(name, interval_ns, UnixNanos::default(), None, false);

    let fired = timer.advance(UnixNanos::from(1)).count();

    assert_eq!(fired, 1);
    assert!(!timer.is_expired);
}
/// Advancing to the stop time fires every event up to it and expires the timer.
#[rstest]
fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
    let name = Ustr::from("TEST_TIMER");
    let interval_ns = NonZeroU64::new(1).unwrap();
    let stop_time_ns = Some(UnixNanos::from(2));
    let mut timer = TestTimer::new(name, interval_ns, UnixNanos::default(), stop_time_ns, false);

    let fired = timer.advance(UnixNanos::from(2)).count();

    assert_eq!(fired, 2);
    assert!(timer.is_expired);
}
/// Advancing across several intervals up to the stop time fires one event per interval.
#[rstest]
fn test_test_timer_advance_beyond_next_time_ns() {
    let name = Ustr::from("TEST_TIMER");
    let interval_ns = NonZeroU64::new(1).unwrap();
    let stop_time_ns = Some(UnixNanos::from(5));
    let mut timer = TestTimer::new(name, interval_ns, UnixNanos::default(), stop_time_ns, false);

    let fired = timer.advance(UnixNanos::from(5)).count();

    assert_eq!(fired, 5);
    assert!(timer.is_expired);
}
/// Advancing past the stop time fires only the events up to the stop time.
#[rstest]
fn test_test_timer_advance_beyond_stop_time() {
    let name = Ustr::from("TEST_TIMER");
    let interval_ns = NonZeroU64::new(1).unwrap();
    let stop_time_ns = Some(UnixNanos::from(5));
    let mut timer = TestTimer::new(name, interval_ns, UnixNanos::default(), stop_time_ns, false);

    let fired = timer.advance(UnixNanos::from(10)).count();

    assert_eq!(fired, 5);
    assert!(timer.is_expired);
}
/// Each advance that lands exactly on an interval boundary fires exactly one event.
#[rstest]
fn test_test_timer_advance_exact_boundary() {
    let name = Ustr::from("TEST_TIMER");
    let interval_ns = NonZeroU64::new(5).unwrap();
    let mut timer = TestTimer::new(name, interval_ns, UnixNanos::from(0), None, false);

    let fired_at_5 = timer.advance(UnixNanos::from(5)).count();
    assert_eq!(fired_at_5, 1, "Expected one event at the 5 ns boundary");

    let fired_at_10 = timer.advance(UnixNanos::from(10)).count();
    assert_eq!(fired_at_10, 1, "Expected one event at the 10 ns boundary");
}
/// With `fire_immediately`, the first event is at the start time and the next one an interval later.
#[rstest]
fn test_test_timer_fire_immediately_true() {
    let start_time_ns = UnixNanos::from(10);
    let mut timer = TestTimer::new(
        Ustr::from("TEST_TIMER"),
        NonZeroU64::new(5).unwrap(),
        start_time_ns,
        None,
        true,
    );
    assert_eq!(timer.next_time_ns(), UnixNanos::from(10));

    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();

    assert_eq!(events.len(), 1);
    assert_eq!(events[0].ts_event, UnixNanos::from(10));
    assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
}
/// Without `fire_immediately`, nothing fires at the start time; the first event is one interval later.
#[rstest]
fn test_test_timer_fire_immediately_false() {
    let start_time_ns = UnixNanos::from(10);
    let mut timer = TestTimer::new(
        Ustr::from("TEST_TIMER"),
        NonZeroU64::new(5).unwrap(),
        start_time_ns,
        None,
        false,
    );
    assert_eq!(timer.next_time_ns(), UnixNanos::from(15));

    let fired_at_start = timer.advance(UnixNanos::from(10)).count();
    assert_eq!(fired_at_start, 0);

    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].ts_event, UnixNanos::from(15));
}
/// With equal `ts_event`, handlers are ordered by name, then `ts_init`, then event id.
#[rstest]
fn test_time_event_handler_ordering_uses_tie_breakers() {
    let callback = TimeEventCallback::from(|_: TimeEvent| {});
    // All handlers share the same no-op callback; only the event fields vary.
    let handler = |name: &str, event_id: &str, ts_event: u64, ts_init: u64| {
        TimeEventHandler::new(
            TimeEvent::new(
                Ustr::from(name),
                UUID4::from(event_id),
                ts_event.into(),
                ts_init.into(),
            ),
            callback.clone(),
        )
    };

    let later_name = handler(
        "TIME_BAR_ESM4-2-MINUTE-ASK-INTERNAL",
        "00000000-0000-4000-8000-000000000003",
        100,
        100,
    );
    let earlier_name = handler(
        "SPREAD_QUOTE_ESM4",
        "00000000-0000-4000-8000-000000000002",
        100,
        100,
    );
    let later_init = handler(
        "SPREAD_QUOTE_ESM4",
        "00000000-0000-4000-8000-000000000004",
        100,
        101,
    );
    let later_id = handler(
        "SPREAD_QUOTE_ESM4",
        "00000000-0000-4000-8000-000000000005",
        100,
        100,
    );

    assert!(earlier_name < later_name);
    assert!(earlier_name < later_init);
    assert!(earlier_name < later_id);
    assert_ne!(earlier_name, later_id);
}
// Checks that the two Python callback modes hand the callable a `TimeEvent` and a
// `PyCapsule` respectively, by recording the type name of the argument each call receives.
#[cfg(feature = "python")]
#[rstest]
fn test_python_callback_modes_pass_expected_argument_types() {
Python::initialize();
Python::attach(|py| {
// Python list that collects the type name seen by each callback invocation.
let seen = PyList::empty(py);
let seen_obj = seen.clone().unbind().into_any();
let callback = PyCFunction::new_closure(
py,
None,
None,
move |args: &Bound<'_, PyTuple>,
_kwargs: Option<&Bound<'_, PyDict>>|
-> PyResult<()> {
// Record the Python type name of the first positional argument.
let arg = args.get_item(0)?;
let type_name = arg.get_type().name()?.to_string();
Python::attach(|py| seen_obj.call_method1(py, "append", (type_name,)))?;
Ok(())
},
)
.expect("callback should create")
.into_any()
.unbind();
let event = TimeEvent::new(
Ustr::from("PY_CALLBACK_MODE"),
UUID4::from("00000000-0000-4000-8000-000000000007"),
UnixNanos::from(100),
UnixNanos::from(99),
);
// First call passes the event directly, second call passes it as a capsule.
TimeEventCallback::from_python_time_event(callback.clone_ref(py)).call(event.clone());
TimeEventCallback::from_python_legacy_capsule(callback).call(event);
assert_eq!(
seen.get_item(0).unwrap().extract::<String>().unwrap(),
"TimeEvent"
);
assert_eq!(
seen.get_item(1).unwrap().extract::<String>().unwrap(),
"PyCapsule"
);
});
}
// Test bus tap that records every published `TimeEvent` together with its topic.
#[derive(Default)]
struct RecordingTimeEventTap {
// (topic, event) pairs in the order they were published.
time_events: RefCell<Vec<(String, TimeEvent)>>,
}
impl RecordingTimeEventTap {
    /// Returns a snapshot copy of the recorded `(topic, event)` pairs.
    fn time_events(&self) -> Vec<(String, TimeEvent)> {
        let recorded = self.time_events.borrow();
        recorded.to_vec()
    }
}
impl BusTap for RecordingTimeEventTap {
    /// Records the topic and a clone of the message when it is a `TimeEvent`;
    /// any other message type is ignored.
    fn on_publish(&self, topic: MStr<Topic>, message: &dyn std::any::Any) {
        let Some(event) = message.downcast_ref::<TimeEvent>() else {
            return;
        };
        let entry = (topic.to_string(), event.clone());
        self.time_events.borrow_mut().push(entry);
    }

    /// Sends are not recorded by this tap.
    fn on_send(&self, _endpoint: MStr<Endpoint>, _message: &dyn std::any::Any) {}
}
// Verifies that `TimeEventHandler::run` delivers the event to the bus tap before it
// invokes the callback, and that both see the same event exactly once.
#[rstest]
fn test_time_event_handler_run_dispatches_tap_before_callback() {
let event = TimeEvent::new(
Ustr::from("strategy.heartbeat"),
UUID4::from("00000000-0000-4000-8000-000000000006"),
UnixNanos::from(100),
UnixNanos::from(99),
);
let tap = Rc::new(RecordingTimeEventTap::default());
let callback_seen: Rc<RefCell<Vec<TimeEvent>>> = Rc::new(RefCell::new(Vec::new()));
let expected_topic = MessagingSwitchboard::time_event_topic().to_string();
// Clones moved into the callback closure below.
let callback_expected = event.clone();
let callback_expected_topic = expected_topic.clone();
let callback_tap = Rc::clone(&tap);
let callback_seen_ref = Rc::clone(&callback_seen);
let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |callback_event| {
// The ordering check: when the callback runs, the tap must already hold the event.
assert_eq!(
callback_tap.time_events(),
vec![(callback_expected_topic.clone(), callback_expected.clone())],
);
callback_seen_ref.borrow_mut().push(callback_event);
});
// Install the tap only around the call under test, then remove it again.
set_bus_tap(tap.clone());
TimeEventHandler::new(event.clone(), TimeEventCallback::from(callback)).run();
clear_bus_tap();
assert_eq!(tap.time_events(), vec![(expected_topic, event.clone())]);
assert_eq!(*callback_seen.borrow(), vec![event]);
}
use proptest::prelude::*;
// Operations the property test applies to a `TestTimer`.
#[derive(Clone, Debug)]
enum TimerOperation {
// Advance the current time by the given delta.
AdvanceTime(u64),
// Cancel the timer.
Cancel,
}
fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
prop_oneof![
8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
2 => Just(TimerOperation::Cancel),
]
}
/// Generates timer configurations as `(interval_ns, start_time_ns, stop_time_ns, fire_immediately)`.
///
/// The optional stop time is drawn from a range that lies above every possible start time.
fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
    let interval_ns = 1u64..=100u64;
    let start_time_ns = 0u64..=50u64;
    let stop_time_ns = prop::option::of(51u64..=200u64);
    let fire_immediately = prop::bool::ANY;

    (interval_ns, start_time_ns, stop_time_ns, fire_immediately)
}
/// Generates a sequence of 5 to 50 timer operations together with a timer configuration.
fn timer_test_strategy()
-> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
    let operations = prop::collection::vec(timer_operation_strategy(), 5..=50);
    let config = timer_config_strategy();

    (operations, config)
}
// Applies a sequence of operations to a freshly built `TestTimer` and checks invariants
// after each step: events are chronological, lie within [start, advance time], never
// exceed the stop time, and the next scheduled time respects the configured start offset.
// Stops processing operations as soon as the timer is expired.
#[expect(clippy::needless_collect)] fn test_timer_with_operations(
operations: Vec<TimerOperation>,
(interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
) {
let mut timer = TestTimer::new(
Ustr::from("PROP_TEST_TIMER"),
NonZeroU64::new(interval_ns).unwrap(),
UnixNanos::from(start_time_ns),
stop_time_ns.map(UnixNanos::from),
fire_immediately,
);
// Simulated clock, moved forward by each `AdvanceTime` operation.
let mut current_time = start_time_ns;
for operation in operations {
if timer.is_expired() {
break;
}
match operation {
TimerOperation::AdvanceTime(delta) => {
let to_time = current_time + delta;
let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
current_time = to_time;
for (i, event) in events.iter().enumerate() {
if i > 0 {
assert!(
event.ts_event >= events[i - 1].ts_event,
"Events should be in chronological order"
);
}
assert!(
event.ts_event.as_u64() >= start_time_ns,
"Event timestamp should not be before start time"
);
assert!(
event.ts_event.as_u64() <= to_time,
"Event timestamp should not be after advance time"
);
if let Some(stop_time_ns) = stop_time_ns {
assert!(
event.ts_event.as_u64() <= stop_time_ns,
"Event timestamp should not exceed stop time"
);
}
}
}
TimerOperation::Cancel => {
timer.cancel();
assert!(timer.is_expired(), "Timer should be expired after cancel");
}
}
if !timer.is_expired() {
// Lower bound on the next scheduled time, depending on `fire_immediately`.
let expected_interval_multiple = if fire_immediately {
timer.next_time_ns().as_u64() >= start_time_ns
} else {
timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
};
assert!(
expected_interval_multiple,
"Next time should respect interval spacing"
);
if let Some(stop_time_ns) = stop_time_ns
&& timer.next_time_ns().as_u64() > stop_time_ns
{
// Probe a clone so the timer under test is not advanced by this check.
let mut test_timer = timer.clone();
let events: Vec<TimeEvent> = test_timer
.advance(UnixNanos::from(stop_time_ns + 1))
.collect();
assert!(
events.is_empty() || test_timer.is_expired(),
"Timer should not generate events beyond stop time"
);
}
}
}
// Final check: a still-live timer with a stop time is advanced well past that stop time.
if !timer.is_expired()
&& let Some(stop_time_ns) = stop_time_ns
{
let events: Vec<TimeEvent> = timer
.advance(UnixNanos::from(stop_time_ns + 1000))
.collect();
assert!(
timer.is_expired() || events.is_empty(),
"Timer should eventually expire or stop generating events"
);
}
}
// Property tests for `TestTimer`.
proptest! {
// Runs generated operation sequences against generated timer configurations.
#[rstest]
fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
test_timer_with_operations(operations, config);
}
// Steps a timer without stop time one interval at a time; whenever an advance yields
// events, it must be exactly one event stamped with the advanced-to time.
#[rstest]
fn prop_timer_interval_consistency(
interval_ns in 1u64..=100u64,
start_time_ns in 0u64..=50u64,
fire_immediately in prop::bool::ANY,
advance_count in 1usize..=20usize,
) {
let mut timer = TestTimer::new(
Ustr::from("CONSISTENCY_TEST"),
NonZeroU64::new(interval_ns).unwrap(),
UnixNanos::from(start_time_ns),
None, fire_immediately,
);
// Time of the first expected event, mirroring the scheduling rule in `TestTimer::new`.
let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
for _ in 0..advance_count {
let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
if !events.is_empty() {
prop_assert_eq!(events.len(), 1);
prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
}
previous_event_time += interval_ns;
}
}
}
}