use std::{
cmp::Ordering,
fmt::{Debug, Display},
num::NonZeroU64,
rc::Rc,
sync::Arc,
};
use nautilus_core::{
UUID4, UnixNanos,
correctness::{FAILED, check_valid_string_utf8},
};
#[cfg(feature = "python")]
use pyo3::{Py, PyAny, Python};
use ustr::Ustr;
/// Converts a raw nanosecond interval into a [`NonZeroU64`], clamping zero up to one.
///
/// Any positive `interval_ns` is returned unchanged; an input of `0` yields an
/// interval of one nanosecond. This function never panics.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    // `NonZeroU64::new` is `None` only for zero, which maps to the minimum value (1).
    NonZeroU64::new(interval_ns).unwrap_or(NonZeroU64::MIN)
}
/// A named event stamped with a unique identifier and two timestamps.
///
/// The type is `#[repr(C)]`, so the order of the fields below is part of its layout.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct TimeEvent {
/// The event name (the timer in this file copies its own name into this field).
pub name: Ustr,
/// The identifier of this particular event.
pub event_id: UUID4,
/// The timestamp of the event itself (the timer in this file uses its scheduled fire time).
pub ts_event: UnixNanos,
/// NOTE(review): presumably the time at which the event object was initialized — confirm.
pub ts_init: UnixNanos,
}
impl TimeEvent {
    /// Creates a new [`TimeEvent`] from its four components.
    ///
    /// The arguments are stored exactly as given; no validation is performed.
    #[must_use]
    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
        Self {
            ts_init,
            ts_event,
            event_id,
            name,
        }
    }
}
impl Display for TimeEvent {
    /// Writes the event as `TimeEvent(name=.., event_id=.., ts_event=.., ts_init=..)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pull the fields out once so the argument list mirrors the format string.
        let Self {
            name,
            event_id,
            ts_event,
            ts_init,
        } = self;

        write!(
            f,
            "{}(name={}, event_id={}, ts_event={}, ts_init={})",
            stringify!(TimeEvent),
            name,
            event_id,
            ts_event,
            ts_init
        )
    }
}
/// Newtype wrapper around [`TimeEvent`] used to give events a scheduling order.
///
/// NOTE(review): the derived `PartialEq`/`Eq` compare the whole inner event, whereas the
/// `Ord` implementation for this type looks only at `ts_event`, so two values can compare
/// as `Ordering::Equal` without being `==` — confirm that this is intended.
#[repr(transparent)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct ScheduledTimeEvent(pub TimeEvent);
impl ScheduledTimeEvent {
#[must_use]
pub const fn new(event: TimeEvent) -> Self {
Self(event)
}
#[must_use]
pub fn into_inner(self) -> TimeEvent {
self.0
}
}
impl PartialOrd for ScheduledTimeEvent {
    /// Always returns `Some`, deferring to the total order defined by [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl Ord for ScheduledTimeEvent {
    /// Orders by `ts_event` in reverse: the event with the *earlier* timestamp is the
    /// *greater* value. Only `ts_event` takes part in the comparison.
    fn cmp(&self, other: &Self) -> Ordering {
        let (ours, theirs) = (&self.0.ts_event, &other.0.ts_event);
        // Natural order, flipped.
        ours.cmp(theirs).reverse()
    }
}
/// A callback that receives a [`TimeEvent`], backed by a Python object or a Rust closure.
pub enum TimeEventCallback {
/// A Python object; this variant only exists when the `python` feature is enabled.
#[cfg(feature = "python")]
Python(Py<PyAny>),
/// An `Arc`-shared Rust closure that is `Send + Sync`.
Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
/// An `Rc`-shared Rust closure with no `Send`/`Sync` bound.
RustLocal(Rc<dyn Fn(TimeEvent)>),
}
impl Clone for TimeEventCallback {
fn clone(&self) -> Self {
match self {
#[cfg(feature = "python")]
Self::Python(obj) => Self::Python(nautilus_core::python::clone_py_object(obj)),
Self::Rust(cb) => Self::Rust(cb.clone()),
Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
}
}
}
impl Debug for TimeEventCallback {
    /// Writes a fixed label naming the variant; the wrapped callable is not inspected.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            #[cfg(feature = "python")]
            Self::Python(_) => "Python callback",
            Self::Rust(_) => "Rust callback (thread-safe)",
            Self::RustLocal(_) => "Rust callback (local)",
        };
        f.write_str(label)
    }
}
impl TimeEventCallback {
    /// Returns `true` only for the `Rust` (`Arc`-backed) variant; `RustLocal` yields `false`.
    #[must_use]
    pub const fn is_rust(&self) -> bool {
        matches!(self, Self::Rust(_))
    }

    /// Returns `true` only for the `RustLocal` (`Rc`-backed) variant.
    #[must_use]
    pub const fn is_local(&self) -> bool {
        matches!(self, Self::RustLocal(_))
    }

    /// Invokes the callback with `event`.
    ///
    /// For the Python variant the call is made inside `Python::attach`; an exception
    /// raised by the Python callable is logged at error level and not propagated.
    pub fn call(&self, event: TimeEvent) {
        match self {
            #[cfg(feature = "python")]
            Self::Python(py_callback) => Python::attach(|py| {
                let outcome = py_callback.call1(py, (event,));
                if let Err(e) = outcome {
                    log::error!("Python time event callback raised exception: {e}");
                }
            }),
            Self::Rust(handler) => handler(event),
            Self::RustLocal(handler) => handler(event),
        }
    }
}
impl<F> From<F> for TimeEventCallback
where
    F: Fn(TimeEvent) + Send + Sync + 'static,
{
    /// Wraps a thread-safe closure in an `Arc` and stores it as the `Rust` variant.
    fn from(value: F) -> Self {
        let shared: Arc<dyn Fn(TimeEvent) + Send + Sync> = Arc::new(value);
        Self::Rust(shared)
    }
}
impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
/// Stores an already `Arc`-wrapped thread-safe closure as the `Rust` variant (no re-wrapping).
fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
Self::Rust(value)
}
}
impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
/// Stores an `Rc`-wrapped closure as the `RustLocal` variant.
fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
Self::RustLocal(value)
}
}
#[cfg(feature = "python")]
impl From<Py<PyAny>> for TimeEventCallback {
/// Stores a Python object as the `Python` variant; it is not checked for callability here.
fn from(value: Py<PyAny>) -> Self {
Self::Python(value)
}
}
// NOTE(review): the `RustLocal` variant wraps an `Rc`, which is neither `Send` nor `Sync`, so
// these manual impls assert more than the compiler can verify. Nothing visible in this file
// establishes the invariant that makes them sound (presumably `RustLocal` callbacks are
// never moved to, or shared with, another thread) — TODO confirm and record it as a
// `// SAFETY:` comment.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
// NOTE(review): same caveat as for `Send` above.
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
/// Pairs a [`TimeEvent`] with the [`TimeEventCallback`] that should receive it.
///
/// The type is `#[repr(C)]`, so the order of the fields below is part of its layout.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandler {
/// The event that `run` passes to the callback.
pub event: TimeEvent,
/// The callback that `run` invokes.
pub callback: TimeEventCallback,
}
impl TimeEventHandler {
    /// Creates a handler that will deliver `event` to `callback`.
    #[must_use]
    pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
        Self { callback, event }
    }

    /// Compares two handlers by their events only; the callback plays no part.
    ///
    /// Keys are compared lexicographically in this order: `ts_event`, `name`,
    /// `ts_init`, then the string form of `event_id`.
    fn cmp_event(&self, other: &Self) -> Ordering {
        let (lhs, rhs) = (&self.event, &other.event);
        let lhs_key = (
            &lhs.ts_event,
            &lhs.name,
            &lhs.ts_init,
            lhs.event_id.as_str(),
        );
        let rhs_key = (
            &rhs.ts_event,
            &rhs.name,
            &rhs.ts_init,
            rhs.event_id.as_str(),
        );
        // Tuple ordering is lexicographic, matching a chain of tie-breakers.
        lhs_key.cmp(&rhs_key)
    }

    /// Consumes the handler and invokes its callback with its event.
    pub fn run(self) {
        self.callback.call(self.event);
    }
}
impl PartialOrd for TimeEventHandler {
    /// Always returns `Some`, deferring to the total order defined by [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl PartialEq for TimeEventHandler {
fn eq(&self, other: &Self) -> bool {
self.cmp_event(other).is_eq()
}
}
// Marker impl: equality is defined by the event comparison in `PartialEq` above in this file.
impl Eq for TimeEventHandler {}
impl Ord for TimeEventHandler {
    /// Total order over handlers, defined entirely by `cmp_event` (event fields only).
    fn cmp(&self, other: &Self) -> Ordering {
        Self::cmp_event(self, other)
    }
}
/// Crate-internal interface shared by timer implementations.
pub(crate) trait Timer {
/// Reports whether the timer has expired.
fn is_expired(&self) -> bool;
/// Cancels the timer.
fn cancel(&mut self);
}
/// A timer that is driven manually: it produces [`TimeEvent`]s when iterated or advanced
/// to a given time, rather than by waiting on a clock.
#[derive(Clone, Debug)]
pub struct TestTimer {
/// The timer name, copied into every event it produces.
pub name: Ustr,
/// The spacing between consecutive events, in nanoseconds (never zero).
pub interval_ns: NonZeroU64,
/// The time from which the first event is scheduled.
pub start_time_ns: UnixNanos,
/// Optional stop time; no event is produced with a timestamp after it.
pub stop_time_ns: Option<UnixNanos>,
/// When `true` the first event is scheduled at `start_time_ns`, otherwise one interval later.
pub fire_immediately: bool,
// Timestamp of the next event to be produced.
next_time_ns: UnixNanos,
// Set once the timer is cancelled or has reached its stop time.
is_expired: bool,
}
impl TestTimer {
    /// Creates a new [`TestTimer`].
    ///
    /// The first event is scheduled at `start_time_ns` when `fire_immediately` is `true`,
    /// and at `start_time_ns + interval_ns` otherwise. The timer starts out not expired.
    ///
    /// # Panics
    ///
    /// Panics if `name` is rejected by `check_valid_string_utf8`.
    #[must_use]
    pub fn new(
        name: Ustr,
        interval_ns: NonZeroU64,
        start_time_ns: UnixNanos,
        stop_time_ns: Option<UnixNanos>,
        fire_immediately: bool,
    ) -> Self {
        check_valid_string_utf8(name, stringify!(name)).expect(FAILED);

        let next_time_ns = if fire_immediately {
            start_time_ns
        } else {
            start_time_ns + interval_ns.get()
        };

        Self {
            name,
            interval_ns,
            start_time_ns,
            stop_time_ns,
            fire_immediately,
            next_time_ns,
            is_expired: false,
        }
    }

    /// Returns the timestamp at which the next event is scheduled.
    #[must_use]
    pub const fn next_time_ns(&self) -> UnixNanos {
        self.next_time_ns
    }

    /// Returns `true` once the timer has been cancelled or has reached its stop time.
    #[must_use]
    pub const fn is_expired(&self) -> bool {
        self.is_expired
    }

    /// Builds a [`TimeEvent`] stamped with the currently scheduled time.
    ///
    /// The event carries the timer's name, the supplied `event_id` and `ts_init`, and
    /// `ts_event` equal to [`Self::next_time_ns`]. The timer itself is not modified.
    #[must_use]
    pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
        TimeEvent {
            name: self.name,
            event_id,
            ts_event: self.next_time_ns,
            ts_init,
        }
    }

    /// Returns an iterator over the events that fall due up to and including `to_time_ns`.
    ///
    /// The iterator is lazy: the timer only moves forward as events are pulled from it,
    /// so it must be consumed for the advance to take effect. Fewer events than the
    /// computed count are produced if the timer expires (stop time or cancel) first.
    pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
        // Number of interval boundaries in `[next_time_ns, to_time_ns]`, or zero when
        // the next event is still in the future.
        let advances = if self.next_time_ns <= to_time_ns {
            ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
                .saturating_add(1)
        } else {
            0
        };

        // Saturate instead of truncating on targets where `usize` is narrower than
        // `u64`; a wrapping cast could silently drop due events.
        let advances = usize::try_from(advances).unwrap_or(usize::MAX);

        self.take(advances).map(|(event, _)| event)
    }

    /// Marks the timer as expired so that it produces no further events.
    pub const fn cancel(&mut self) {
        self.is_expired = true;
    }
}
impl Timer for TestTimer {
/// Reads the timer's expired flag.
fn is_expired(&self) -> bool {
self.is_expired
}
/// Sets the expired flag, after which iteration yields no more events.
fn cancel(&mut self) {
self.is_expired = true;
}
}
impl Iterator for TestTimer {
    type Item = (TimeEvent, UnixNanos);

    /// Produces the next scheduled event together with its timestamp.
    ///
    /// Returns `None` once the timer is expired. A scheduled time past the stop time
    /// expires the timer without producing an event; a scheduled time exactly on the
    /// stop time produces one final event and then expires the timer.
    fn next(&mut self) -> Option<Self::Item> {
        if self.is_expired {
            return None;
        }

        let fire_time_ns = self.next_time_ns;

        // Scheduled time overshoots the stop time: expire without firing.
        if self
            .stop_time_ns
            .is_some_and(|stop_time_ns| fire_time_ns > stop_time_ns)
        {
            self.is_expired = true;
            return None;
        }

        let event = TimeEvent {
            name: self.name,
            event_id: UUID4::new(),
            ts_event: fire_time_ns,
            ts_init: fire_time_ns,
        };

        // Landing exactly on the stop time makes this the last event.
        if self.stop_time_ns == Some(fire_time_ns) {
            self.is_expired = true;
        }

        self.next_time_ns += self.interval_ns;
        Some((event, fire_time_ns))
    }
}
// Unit and property-based tests for `TestTimer` and for `TimeEventHandler` ordering.
#[cfg(test)]
mod tests {
use std::num::NonZeroU64;
use nautilus_core::{UUID4, UnixNanos};
use rstest::*;
use ustr::Ustr;
use super::{TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler};
// Despite its name this exercises `Iterator::next`: events are produced until the
// expired flag is set, after which `next` returns `None`.
#[rstest]
fn test_test_timer_pop_event() {
let mut timer = TestTimer::new(
Ustr::from("TEST_TIMER"),
NonZeroU64::new(1).unwrap(),
UnixNanos::from(1),
None,
false,
);
assert!(timer.next().is_some());
assert!(timer.next().is_some());
timer.is_expired = true;
assert!(timer.next().is_none());
}
// Advancing to times before the first scheduled event (5) produces nothing and
// leaves the schedule untouched.
#[rstest]
fn test_test_timer_advance_within_next_time_ns() {
let mut timer = TestTimer::new(
Ustr::from("TEST_TIMER"),
NonZeroU64::new(5).unwrap(),
UnixNanos::default(),
None,
false,
);
let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
assert_eq!(timer.next_time_ns, 5);
assert!(!timer.is_expired);
}
// Advancing exactly to the first scheduled time yields one event; without a stop
// time the timer stays active.
#[rstest]
fn test_test_timer_advance_up_to_next_time_ns() {
let mut timer = TestTimer::new(
Ustr::from("TEST_TIMER"),
NonZeroU64::new(1).unwrap(),
UnixNanos::default(),
None,
false,
);
assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
assert!(!timer.is_expired);
}
// With a stop time of 2 and an interval of 1, advancing to 2 yields the events at
// 1 and 2, and the event on the stop time expires the timer.
#[rstest]
fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
let mut timer = TestTimer::new(
Ustr::from("TEST_TIMER"),
NonZeroU64::new(1).unwrap(),
UnixNanos::default(),
Some(UnixNanos::from(2)),
false,
);
assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
assert!(timer.is_expired);
}
// Advancing across several intervals up to the stop time yields one event per interval.
#[rstest]
fn test_test_timer_advance_beyond_next_time_ns() {
let mut timer = TestTimer::new(
Ustr::from("TEST_TIMER"),
NonZeroU64::new(1).unwrap(),
UnixNanos::default(),
Some(UnixNanos::from(5)),
false,
);
assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
assert!(timer.is_expired);
}
// Advancing past the stop time still yields only the events up to the stop time.
#[rstest]
fn test_test_timer_advance_beyond_stop_time() {
let mut timer = TestTimer::new(
Ustr::from("TEST_TIMER"),
NonZeroU64::new(1).unwrap(),
UnixNanos::default(),
Some(UnixNanos::from(5)),
false,
);
assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
assert!(timer.is_expired);
}
// An advance that lands exactly on an interval boundary includes that boundary's event.
#[rstest]
fn test_test_timer_advance_exact_boundary() {
let mut timer = TestTimer::new(
Ustr::from("TEST_TIMER"),
NonZeroU64::new(5).unwrap(),
UnixNanos::from(0),
None,
false,
);
assert_eq!(
timer.advance(UnixNanos::from(5)).count(),
1,
"Expected one event at the 5 ns boundary"
);
assert_eq!(
timer.advance(UnixNanos::from(10)).count(),
1,
"Expected one event at the 10 ns boundary"
);
}
// `fire_immediately = true`: the first event is scheduled at the start time itself.
#[rstest]
fn test_test_timer_fire_immediately_true() {
let mut timer = TestTimer::new(
Ustr::from("TEST_TIMER"),
NonZeroU64::new(5).unwrap(),
UnixNanos::from(10),
None,
true, );
assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
assert_eq!(events.len(), 1);
assert_eq!(events[0].ts_event, UnixNanos::from(10));
assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
}
// `fire_immediately = false`: the first event is scheduled one interval after the start time.
#[rstest]
fn test_test_timer_fire_immediately_false() {
let mut timer = TestTimer::new(
Ustr::from("TEST_TIMER"),
NonZeroU64::new(5).unwrap(),
UnixNanos::from(10),
None,
false, );
assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);
let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
assert_eq!(events.len(), 1);
assert_eq!(events[0].ts_event, UnixNanos::from(15));
}
// Handlers sharing the same `ts_event` are ordered by name, then `ts_init`, then
// event id, and handlers that differ only in event id are not equal.
#[rstest]
fn test_time_event_handler_ordering_uses_tie_breakers() {
let callback = TimeEventCallback::from(|_: TimeEvent| {});
let later_name = TimeEventHandler::new(
TimeEvent::new(
Ustr::from("TIME_BAR_ESM4-2-MINUTE-ASK-INTERNAL"),
UUID4::from("00000000-0000-4000-8000-000000000003"),
100.into(),
100.into(),
),
callback.clone(),
);
let earlier_name = TimeEventHandler::new(
TimeEvent::new(
Ustr::from("SPREAD_QUOTE_ESM4"),
UUID4::from("00000000-0000-4000-8000-000000000002"),
100.into(),
100.into(),
),
callback.clone(),
);
let later_init = TimeEventHandler::new(
TimeEvent::new(
Ustr::from("SPREAD_QUOTE_ESM4"),
UUID4::from("00000000-0000-4000-8000-000000000004"),
100.into(),
101.into(),
),
callback.clone(),
);
let later_id = TimeEventHandler::new(
TimeEvent::new(
Ustr::from("SPREAD_QUOTE_ESM4"),
UUID4::from("00000000-0000-4000-8000-000000000005"),
100.into(),
100.into(),
),
callback,
);
assert!(earlier_name < later_name);
assert!(earlier_name < later_init);
assert!(earlier_name < later_id);
assert_ne!(earlier_name, later_id);
}
use proptest::prelude::*;
// One step of a randomly generated timer scenario.
#[derive(Clone, Debug)]
enum TimerOperation {
// Move time forward by the given number of nanoseconds.
AdvanceTime(u64),
// Cancel the timer.
Cancel,
}
// Generates operations weighted 8:2 towards advancing; deltas fall in `1..=1000`.
fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
prop_oneof![
8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
2 => Just(TimerOperation::Cancel),
]
}
// Generates `(interval_ns, start_time_ns, stop_time_ns, fire_immediately)`; any stop
// time generated (51..=200) lies after every possible start time (0..=50).
fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
(
1u64..=100u64, 0u64..=50u64, prop::option::of(51u64..=200u64), prop::bool::ANY, )
}
// Combines a sequence of 5 to 50 operations with a timer configuration.
fn timer_test_strategy()
-> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
(
prop::collection::vec(timer_operation_strategy(), 5..=50),
timer_config_strategy(),
)
}
// Replays `operations` against a timer built from the given configuration and checks
// invariants after every step: events are chronological, lie between the start time
// and the advance target, and never exceed the stop time.
#[expect(clippy::needless_collect)] fn test_timer_with_operations(
operations: Vec<TimerOperation>,
(interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
) {
let mut timer = TestTimer::new(
Ustr::from("PROP_TEST_TIMER"),
NonZeroU64::new(interval_ns).unwrap(),
UnixNanos::from(start_time_ns),
stop_time_ns.map(UnixNanos::from),
fire_immediately,
);
let mut current_time = start_time_ns;
for operation in operations {
// Nothing further can be observed once the timer has expired.
if timer.is_expired() {
break;
}
match operation {
TimerOperation::AdvanceTime(delta) => {
let to_time = current_time + delta;
let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
current_time = to_time;
for (i, event) in events.iter().enumerate() {
if i > 0 {
assert!(
event.ts_event >= events[i - 1].ts_event,
"Events should be in chronological order"
);
}
assert!(
event.ts_event.as_u64() >= start_time_ns,
"Event timestamp should not be before start time"
);
assert!(
event.ts_event.as_u64() <= to_time,
"Event timestamp should not be after advance time"
);
if let Some(stop_time_ns) = stop_time_ns {
assert!(
event.ts_event.as_u64() <= stop_time_ns,
"Event timestamp should not exceed stop time"
);
}
}
}
TimerOperation::Cancel => {
timer.cancel();
assert!(timer.is_expired(), "Timer should be expired after cancel");
}
}
if !timer.is_expired() {
// The next scheduled time can never precede the first scheduled time.
let expected_interval_multiple = if fire_immediately {
timer.next_time_ns().as_u64() >= start_time_ns
} else {
timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
};
assert!(
expected_interval_multiple,
"Next time should respect interval spacing"
);
if let Some(stop_time_ns) = stop_time_ns
&& timer.next_time_ns().as_u64() > stop_time_ns
{
// Probe a clone so the timer under test is left undisturbed.
let mut test_timer = timer.clone();
let events: Vec<TimeEvent> = test_timer
.advance(UnixNanos::from(stop_time_ns + 1))
.collect();
assert!(
events.is_empty() || test_timer.is_expired(),
"Timer should not generate events beyond stop time"
);
}
}
}
// A timer with a stop time must end up expired (or silent) once driven well past it.
if !timer.is_expired()
&& let Some(stop_time_ns) = stop_time_ns
{
let events: Vec<TimeEvent> = timer
.advance(UnixNanos::from(stop_time_ns + 1000))
.collect();
assert!(
timer.is_expired() || events.is_empty(),
"Timer should eventually expire or stop generating events"
);
}
}
proptest! {
// Runs random operation sequences through the invariant checks above.
#[rstest]
fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
test_timer_with_operations(operations, config);
}
// Stepping the clock exactly one interval at a time must yield at most one event per
// step, stamped with the time that was advanced to.
#[rstest]
fn prop_timer_interval_consistency(
interval_ns in 1u64..=100u64,
start_time_ns in 0u64..=50u64,
fire_immediately in prop::bool::ANY,
advance_count in 1usize..=20usize,
) {
let mut timer = TestTimer::new(
Ustr::from("CONSISTENCY_TEST"),
NonZeroU64::new(interval_ns).unwrap(),
UnixNanos::from(start_time_ns),
None, fire_immediately,
);
let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
for _ in 0..advance_count {
let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
if !events.is_empty() {
prop_assert_eq!(events.len(), 1);
prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
}
previous_event_time += interval_ns;
}
}
}
}