1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23 sync::Arc,
24};
25
26use nautilus_core::{
27 UUID4, UnixNanos,
28 correctness::{FAILED, check_valid_string_utf8},
29};
30#[cfg(feature = "python")]
31use pyo3::{Py, PyAny, Python};
32use ustr::Ustr;
33
/// Converts a raw nanosecond interval into a [`NonZeroU64`], promoting `0` to `1`.
///
/// Any value greater than zero is passed through unchanged.
#[must_use]
#[allow(clippy::missing_panics_doc)] // The value is clamped to >= 1 first, so `expect` cannot fail
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    let clamped = interval_ns.max(1);
    NonZeroU64::new(clamped).expect("`interval_ns` must be positive")
}
42
/// A named, uniquely identified event carrying two timestamps.
///
/// Exposed to Python as a `pyclass` when the `python` feature is enabled.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct TimeEvent {
    /// The event name.
    pub name: Ustr,
    /// The unique identifier of this event.
    pub event_id: UUID4,
    /// Timestamp at which the event fires (timers in this file set it to their scheduled time).
    pub ts_event: UnixNanos,
    /// NOTE(review): presumably the timestamp at which the event object was created — confirm.
    pub ts_init: UnixNanos,
}
67
68impl TimeEvent {
69 #[must_use]
71 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
72 Self {
73 name,
74 event_id,
75 ts_event,
76 ts_init,
77 }
78 }
79}
80
81impl Display for TimeEvent {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 write!(
84 f,
85 "{}(name={}, event_id={}, ts_event={}, ts_init={})",
86 stringify!(TimeEvent),
87 self.name,
88 self.event_id,
89 self.ts_event,
90 self.ts_init
91 )
92 }
93}
94
/// Newtype around [`TimeEvent`] that carries its own ordering.
///
/// The `Ord` implementation in this file compares `ts_event` in reverse, so the earliest
/// event is the greatest element.
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ScheduledTimeEvent(pub TimeEvent);
103
104impl ScheduledTimeEvent {
105 #[must_use]
107 pub const fn new(event: TimeEvent) -> Self {
108 Self(event)
109 }
110
111 #[must_use]
113 pub fn into_inner(self) -> TimeEvent {
114 self.0
115 }
116}
117
118impl PartialOrd for ScheduledTimeEvent {
119 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
120 Some(self.cmp(other))
121 }
122}
123
124impl Ord for ScheduledTimeEvent {
125 fn cmp(&self, other: &Self) -> Ordering {
126 other.0.ts_event.cmp(&self.0.ts_event)
128 }
129}
130
/// A callback invoked with a [`TimeEvent`].
///
/// Three flavours are supported: a Python object (only with the `python` feature), a
/// reference-counted thread-safe Rust closure, and a single-threaded Rust closure.
pub enum TimeEventCallback {
    /// A Python object that is called with the event as its single argument.
    #[cfg(feature = "python")]
    Python(Py<PyAny>),
    /// A Rust closure that is `Send + Sync`, shared through an `Arc`.
    Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
    /// A Rust closure without `Send`/`Sync` bounds, shared through an `Rc`.
    RustLocal(Rc<dyn Fn(TimeEvent)>),
}
167
168impl Clone for TimeEventCallback {
169 fn clone(&self) -> Self {
170 match self {
171 #[cfg(feature = "python")]
172 Self::Python(obj) => Self::Python(nautilus_core::python::clone_py_object(obj)),
173 Self::Rust(cb) => Self::Rust(cb.clone()),
174 Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
175 }
176 }
177}
178
179impl Debug for TimeEventCallback {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 match self {
182 #[cfg(feature = "python")]
183 Self::Python(_) => f.write_str("Python callback"),
184 Self::Rust(_) => f.write_str("Rust callback (thread-safe)"),
185 Self::RustLocal(_) => f.write_str("Rust callback (local)"),
186 }
187 }
188}
189
190impl TimeEventCallback {
191 #[must_use]
193 pub const fn is_rust(&self) -> bool {
194 matches!(self, Self::Rust(_))
195 }
196
197 #[must_use]
202 pub const fn is_local(&self) -> bool {
203 matches!(self, Self::RustLocal(_))
204 }
205
206 pub fn call(&self, event: TimeEvent) {
210 match self {
211 #[cfg(feature = "python")]
212 Self::Python(callback) => {
213 Python::attach(|py| {
214 if let Err(e) = callback.call1(py, (event,)) {
215 log::error!("Python time event callback raised exception: {e}");
216 }
217 });
218 }
219 Self::Rust(callback) => callback(event),
220 Self::RustLocal(callback) => callback(event),
221 }
222 }
223}
224
225impl<F> From<F> for TimeEventCallback
226where
227 F: Fn(TimeEvent) + Send + Sync + 'static,
228{
229 fn from(value: F) -> Self {
230 Self::Rust(Arc::new(value))
231 }
232}
233
234impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
235 fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
236 Self::Rust(value)
237 }
238}
239
240impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
241 fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
242 Self::RustLocal(value)
243 }
244}
245
#[cfg(feature = "python")]
impl From<Py<PyAny>> for TimeEventCallback {
    /// Stores a Python object as the `Python` variant.
    fn from(py_callable: Py<PyAny>) -> Self {
        TimeEventCallback::Python(py_callable)
    }
}
252
// SAFETY (review note): the `RustLocal` variant wraps an `Rc<dyn Fn(TimeEvent)>`, which is
// neither `Send` nor `Sync` on its own. These impls are therefore only sound if `RustLocal`
// callbacks are never moved to, or shared with, another thread — an invariant the compiler
// no longer checks once these impls exist. TODO confirm that every call site upholds it.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
272
/// Pairs a [`TimeEvent`] with the [`TimeEventCallback`] that should receive it.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandler {
    /// The event passed to the callback when the handler runs.
    pub event: TimeEvent,
    /// The callback invoked with `event`.
    pub callback: TimeEventCallback,
}
285
286impl TimeEventHandler {
287 #[must_use]
289 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
290 Self { event, callback }
291 }
292
293 fn cmp_event(&self, other: &Self) -> Ordering {
294 self.event
295 .ts_event
296 .cmp(&other.event.ts_event)
297 .then_with(|| self.event.name.cmp(&other.event.name))
298 .then_with(|| self.event.ts_init.cmp(&other.event.ts_init))
299 .then_with(|| {
300 self.event
301 .event_id
302 .as_str()
303 .cmp(other.event.event_id.as_str())
304 })
305 }
306
307 pub fn run(self) {
309 let Self { event, callback } = self;
310 callback.call(event);
311 }
312}
313
314impl PartialOrd for TimeEventHandler {
315 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
316 Some(self.cmp(other))
317 }
318}
319
320impl PartialEq for TimeEventHandler {
321 fn eq(&self, other: &Self) -> bool {
322 self.cmp_event(other).is_eq()
323 }
324}
325
// Marker impl: equality is defined by `PartialEq::eq`, which compares events via `cmp_event`.
impl Eq for TimeEventHandler {}
327
328impl Ord for TimeEventHandler {
329 fn cmp(&self, other: &Self) -> Ordering {
330 self.cmp_event(other)
331 }
332}
333
/// Crate-internal interface shared by timer implementations.
pub(crate) trait Timer {
    /// Returns whether the timer has expired.
    fn is_expired(&self) -> bool;
    /// Cancels the timer.
    fn cancel(&mut self);
}
338
/// A timer that is advanced manually and yields [`TimeEvent`]s through its `Iterator` impl.
///
/// NOTE(review): the name suggests use in tests/backtests rather than live runs — confirm.
#[derive(Clone, Debug)]
pub struct TestTimer {
    /// The timer name, copied into every event it produces.
    pub name: Ustr,
    /// The spacing between consecutive events, in nanoseconds.
    pub interval_ns: NonZeroU64,
    /// The time the timer was started at.
    pub start_time_ns: UnixNanos,
    /// Optional stop time; no event is produced with a timestamp after it.
    pub stop_time_ns: Option<UnixNanos>,
    /// When `true` the first event is scheduled at `start_time_ns` instead of one interval later.
    pub fire_immediately: bool,
    // Timestamp of the next event to be produced
    next_time_ns: UnixNanos,
    // Set once the timer is cancelled or has reached/passed its stop time
    is_expired: bool,
}
362
363impl TestTimer {
364 #[must_use]
370 pub fn new(
371 name: Ustr,
372 interval_ns: NonZeroU64,
373 start_time_ns: UnixNanos,
374 stop_time_ns: Option<UnixNanos>,
375 fire_immediately: bool,
376 ) -> Self {
377 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
378
379 let next_time_ns = if fire_immediately {
380 start_time_ns
381 } else {
382 start_time_ns + interval_ns.get()
383 };
384
385 Self {
386 name,
387 interval_ns,
388 start_time_ns,
389 stop_time_ns,
390 fire_immediately,
391 next_time_ns,
392 is_expired: false,
393 }
394 }
395
396 #[must_use]
398 pub const fn next_time_ns(&self) -> UnixNanos {
399 self.next_time_ns
400 }
401
402 #[must_use]
404 pub const fn is_expired(&self) -> bool {
405 self.is_expired
406 }
407
408 #[must_use]
409 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
410 TimeEvent {
411 name: self.name,
412 event_id,
413 ts_event: self.next_time_ns,
414 ts_init,
415 }
416 }
417
418 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
424 let advances = if self.next_time_ns <= to_time_ns {
426 ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
427 .saturating_add(1)
428 } else {
429 0
430 };
431 self.take(advances as usize).map(|(event, _)| event)
432 }
433
434 pub const fn cancel(&mut self) {
438 self.is_expired = true;
439 }
440}
441
442impl Timer for TestTimer {
443 fn is_expired(&self) -> bool {
444 self.is_expired
445 }
446
447 fn cancel(&mut self) {
448 self.is_expired = true;
449 }
450}
451
452impl Iterator for TestTimer {
453 type Item = (TimeEvent, UnixNanos);
454
455 fn next(&mut self) -> Option<Self::Item> {
456 if self.is_expired {
457 None
458 } else {
459 if let Some(stop_time_ns) = self.stop_time_ns
461 && self.next_time_ns > stop_time_ns
462 {
463 self.is_expired = true;
464 return None;
465 }
466
467 let item = (
468 TimeEvent {
469 name: self.name,
470 event_id: UUID4::new(),
471 ts_event: self.next_time_ns,
472 ts_init: self.next_time_ns,
473 },
474 self.next_time_ns,
475 );
476
477 if let Some(stop_time_ns) = self.stop_time_ns
479 && self.next_time_ns == stop_time_ns
480 {
481 self.is_expired = true;
482 }
483
484 self.next_time_ns += self.interval_ns;
485
486 Some(item)
487 }
488 }
489}
490
491#[cfg(test)]
492mod tests {
493 use std::num::NonZeroU64;
494
495 use nautilus_core::{UUID4, UnixNanos};
496 use rstest::*;
497 use ustr::Ustr;
498
499 use super::{TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler};
500
501 #[rstest]
502 fn test_test_timer_pop_event() {
503 let mut timer = TestTimer::new(
504 Ustr::from("TEST_TIMER"),
505 NonZeroU64::new(1).unwrap(),
506 UnixNanos::from(1),
507 None,
508 false,
509 );
510
511 assert!(timer.next().is_some());
512 assert!(timer.next().is_some());
513 timer.is_expired = true;
514 assert!(timer.next().is_none());
515 }
516
517 #[rstest]
518 fn test_test_timer_advance_within_next_time_ns() {
519 let mut timer = TestTimer::new(
520 Ustr::from("TEST_TIMER"),
521 NonZeroU64::new(5).unwrap(),
522 UnixNanos::default(),
523 None,
524 false,
525 );
526 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
527 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
528 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
529 assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
530 assert_eq!(timer.next_time_ns, 5);
531 assert!(!timer.is_expired);
532 }
533
534 #[rstest]
535 fn test_test_timer_advance_up_to_next_time_ns() {
536 let mut timer = TestTimer::new(
537 Ustr::from("TEST_TIMER"),
538 NonZeroU64::new(1).unwrap(),
539 UnixNanos::default(),
540 None,
541 false,
542 );
543 assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
544 assert!(!timer.is_expired);
545 }
546
547 #[rstest]
548 fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
549 let mut timer = TestTimer::new(
550 Ustr::from("TEST_TIMER"),
551 NonZeroU64::new(1).unwrap(),
552 UnixNanos::default(),
553 Some(UnixNanos::from(2)),
554 false,
555 );
556 assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
557 assert!(timer.is_expired);
558 }
559
560 #[rstest]
561 fn test_test_timer_advance_beyond_next_time_ns() {
562 let mut timer = TestTimer::new(
563 Ustr::from("TEST_TIMER"),
564 NonZeroU64::new(1).unwrap(),
565 UnixNanos::default(),
566 Some(UnixNanos::from(5)),
567 false,
568 );
569 assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
570 assert!(timer.is_expired);
571 }
572
573 #[rstest]
574 fn test_test_timer_advance_beyond_stop_time() {
575 let mut timer = TestTimer::new(
576 Ustr::from("TEST_TIMER"),
577 NonZeroU64::new(1).unwrap(),
578 UnixNanos::default(),
579 Some(UnixNanos::from(5)),
580 false,
581 );
582 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
583 assert!(timer.is_expired);
584 }
585
586 #[rstest]
587 fn test_test_timer_advance_exact_boundary() {
588 let mut timer = TestTimer::new(
589 Ustr::from("TEST_TIMER"),
590 NonZeroU64::new(5).unwrap(),
591 UnixNanos::from(0),
592 None,
593 false,
594 );
595 assert_eq!(
596 timer.advance(UnixNanos::from(5)).count(),
597 1,
598 "Expected one event at the 5 ns boundary"
599 );
600 assert_eq!(
601 timer.advance(UnixNanos::from(10)).count(),
602 1,
603 "Expected one event at the 10 ns boundary"
604 );
605 }
606
607 #[rstest]
608 fn test_test_timer_fire_immediately_true() {
609 let mut timer = TestTimer::new(
610 Ustr::from("TEST_TIMER"),
611 NonZeroU64::new(5).unwrap(),
612 UnixNanos::from(10),
613 None,
614 true, );
616
617 assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
619
620 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
622 assert_eq!(events.len(), 1);
623 assert_eq!(events[0].ts_event, UnixNanos::from(10));
624
625 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
627 }
628
629 #[rstest]
630 fn test_test_timer_fire_immediately_false() {
631 let mut timer = TestTimer::new(
632 Ustr::from("TEST_TIMER"),
633 NonZeroU64::new(5).unwrap(),
634 UnixNanos::from(10),
635 None,
636 false, );
638
639 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
641
642 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);
644
645 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
647 assert_eq!(events.len(), 1);
648 assert_eq!(events[0].ts_event, UnixNanos::from(15));
649 }
650
651 #[rstest]
652 fn test_time_event_handler_ordering_uses_tie_breakers() {
653 let callback = TimeEventCallback::from(|_: TimeEvent| {});
654
655 let later_name = TimeEventHandler::new(
656 TimeEvent::new(
657 Ustr::from("time_bar_ESM4-2-MINUTE-ASK-INTERNAL"),
658 UUID4::from("00000000-0000-4000-8000-000000000003"),
659 100.into(),
660 100.into(),
661 ),
662 callback.clone(),
663 );
664 let earlier_name = TimeEventHandler::new(
665 TimeEvent::new(
666 Ustr::from("spread_quote_ESM4"),
667 UUID4::from("00000000-0000-4000-8000-000000000002"),
668 100.into(),
669 100.into(),
670 ),
671 callback.clone(),
672 );
673 let later_init = TimeEventHandler::new(
674 TimeEvent::new(
675 Ustr::from("spread_quote_ESM4"),
676 UUID4::from("00000000-0000-4000-8000-000000000004"),
677 100.into(),
678 101.into(),
679 ),
680 callback.clone(),
681 );
682 let later_id = TimeEventHandler::new(
683 TimeEvent::new(
684 Ustr::from("spread_quote_ESM4"),
685 UUID4::from("00000000-0000-4000-8000-000000000005"),
686 100.into(),
687 100.into(),
688 ),
689 callback,
690 );
691
692 assert!(earlier_name < later_name);
693 assert!(earlier_name < later_init);
694 assert!(earlier_name < later_id);
695 assert_ne!(earlier_name, later_id);
696 }
697
698 use proptest::prelude::*;
703
704 #[derive(Clone, Debug)]
705 enum TimerOperation {
706 AdvanceTime(u64),
707 Cancel,
708 }
709
710 fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
711 prop_oneof![
712 8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
713 2 => Just(TimerOperation::Cancel),
714 ]
715 }
716
717 fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
718 (
719 1u64..=100u64, 0u64..=50u64, prop::option::of(51u64..=200u64), prop::bool::ANY, )
724 }
725
726 fn timer_test_strategy()
727 -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
728 (
729 prop::collection::vec(timer_operation_strategy(), 5..=50),
730 timer_config_strategy(),
731 )
732 }
733
734 #[allow(clippy::needless_collect)] fn test_timer_with_operations(
736 operations: Vec<TimerOperation>,
737 (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
738 ) {
739 let mut timer = TestTimer::new(
740 Ustr::from("PROP_TEST_TIMER"),
741 NonZeroU64::new(interval_ns).unwrap(),
742 UnixNanos::from(start_time_ns),
743 stop_time_ns.map(UnixNanos::from),
744 fire_immediately,
745 );
746
747 let mut current_time = start_time_ns;
748
749 for operation in operations {
750 if timer.is_expired() {
751 break;
752 }
753
754 match operation {
755 TimerOperation::AdvanceTime(delta) => {
756 let to_time = current_time + delta;
757 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
758 current_time = to_time;
759
760 for (i, event) in events.iter().enumerate() {
762 if i > 0 {
764 assert!(
765 event.ts_event >= events[i - 1].ts_event,
766 "Events should be in chronological order"
767 );
768 }
769
770 assert!(
772 event.ts_event.as_u64() >= start_time_ns,
773 "Event timestamp should not be before start time"
774 );
775
776 assert!(
777 event.ts_event.as_u64() <= to_time,
778 "Event timestamp should not be after advance time"
779 );
780
781 if let Some(stop_time_ns) = stop_time_ns {
783 assert!(
784 event.ts_event.as_u64() <= stop_time_ns,
785 "Event timestamp should not exceed stop time"
786 );
787 }
788 }
789 }
790 TimerOperation::Cancel => {
791 timer.cancel();
792 assert!(timer.is_expired(), "Timer should be expired after cancel");
793 }
794 }
795
796 if !timer.is_expired() {
798 let expected_interval_multiple = if fire_immediately {
800 timer.next_time_ns().as_u64() >= start_time_ns
801 } else {
802 timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
803 };
804 assert!(
805 expected_interval_multiple,
806 "Next time should respect interval spacing"
807 );
808
809 if let Some(stop_time_ns) = stop_time_ns
812 && timer.next_time_ns().as_u64() > stop_time_ns
813 {
814 let mut test_timer = timer.clone();
816 let events: Vec<TimeEvent> = test_timer
817 .advance(UnixNanos::from(stop_time_ns + 1))
818 .collect();
819 assert!(
820 events.is_empty() || test_timer.is_expired(),
821 "Timer should not generate events beyond stop time"
822 );
823 }
824 }
825 }
826
827 if !timer.is_expired()
830 && let Some(stop_time_ns) = stop_time_ns
831 {
832 let events: Vec<TimeEvent> = timer
833 .advance(UnixNanos::from(stop_time_ns + 1000))
834 .collect();
835 assert!(
836 timer.is_expired() || events.is_empty(),
837 "Timer should eventually expire or stop generating events"
838 );
839 }
840 }
841
842 proptest! {
843 #[rstest]
844 fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
845 test_timer_with_operations(operations, config);
846 }
847
848 #[rstest]
849 fn prop_timer_interval_consistency(
850 interval_ns in 1u64..=100u64,
851 start_time_ns in 0u64..=50u64,
852 fire_immediately in prop::bool::ANY,
853 advance_count in 1usize..=20usize,
854 ) {
855 let mut timer = TestTimer::new(
856 Ustr::from("CONSISTENCY_TEST"),
857 NonZeroU64::new(interval_ns).unwrap(),
858 UnixNanos::from(start_time_ns),
859 None, fire_immediately,
861 );
862
863 let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
864
865 for _ in 0..advance_count {
866 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
867
868 if !events.is_empty() {
869 prop_assert_eq!(events.len(), 1);
871 prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
872 }
873
874 previous_event_time += interval_ns;
875 }
876 }
877 }
878}