1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23 sync::Arc,
24};
25
26use nautilus_core::{
27 UUID4, UnixNanos,
28 correctness::{FAILED, check_valid_string_utf8},
29};
30#[cfg(feature = "python")]
31use pyo3::{Py, PyAny, Python};
32use ustr::Ustr;
33
/// Converts a raw nanosecond interval into a [`NonZeroU64`], clamping it to at least `1`.
///
/// An input of `0` yields `1`; every other value is returned unchanged, so the
/// conversion cannot fail.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    // `NonZeroU64::MIN` is `1`, which is exactly what `max(interval_ns, 1)` produces for zero.
    NonZeroU64::new(interval_ns).unwrap_or(NonZeroU64::MIN)
}
42
/// A named, timestamped event, such as the ones a timer in this module emits when it fires.
///
/// The struct is `#[repr(C)]`, and when the `python` feature is enabled it is also
/// exposed as a `pyo3` class.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
pub struct TimeEvent {
    /// The event name (timers in this module copy their own name into it).
    pub name: Ustr,
    /// The identifier of this particular event.
    pub event_id: UUID4,
    /// Timestamp at which the event occurred.
    pub ts_event: UnixNanos,
    /// Timestamp associated with creation of the event object
    /// (NOTE(review): exact semantics are defined by callers; confirm).
    pub ts_init: UnixNanos,
}
63
impl TimeEvent {
    /// Creates a new [`TimeEvent`] from its four fields.
    ///
    /// The arguments are stored as given; no validation is performed here.
    #[must_use]
    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
        Self {
            name,
            event_id,
            ts_event,
            ts_init,
        }
    }
}
76
77impl Display for TimeEvent {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(
80 f,
81 "{}(name={}, event_id={}, ts_event={}, ts_init={})",
82 stringify!(TimeEvent),
83 self.name,
84 self.event_id,
85 self.ts_event,
86 self.ts_init
87 )
88 }
89}
90
/// Newtype wrapper around [`TimeEvent`] with its own ordering.
///
/// Equality is derived and therefore compares the whole wrapped event, whereas the
/// `Ord` implementation in this file looks only at `ts_event` and reverses it, so the
/// wrapper with the earliest `ts_event` compares as the greatest.
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ScheduledTimeEvent(pub TimeEvent);
99
100impl ScheduledTimeEvent {
101 #[must_use]
103 pub const fn new(event: TimeEvent) -> Self {
104 Self(event)
105 }
106
107 #[must_use]
109 pub fn into_inner(self) -> TimeEvent {
110 self.0
111 }
112}
113
impl PartialOrd for ScheduledTimeEvent {
    /// Delegates to [`Ord::cmp`], so the comparison is always `Some(_)`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
119
120impl Ord for ScheduledTimeEvent {
121 fn cmp(&self, other: &Self) -> Ordering {
122 other.0.ts_event.cmp(&self.0.ts_event)
124 }
125}
126
/// A callback that can be invoked with a [`TimeEvent`].
///
/// The variants differ in where the callable lives and in the thread-safety
/// bounds its type carries.
pub enum TimeEventCallback {
    /// A Python callable; it is called with the event as its single positional argument.
    #[cfg(feature = "python")]
    Python(Py<PyAny>),
    /// A Rust closure behind an [`Arc`], required to be `Send + Sync`.
    Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
    /// A Rust closure behind an [`Rc`], with no `Send`/`Sync` bound on the closure.
    RustLocal(Rc<dyn Fn(TimeEvent)>),
}
163
164impl Clone for TimeEventCallback {
165 fn clone(&self) -> Self {
166 match self {
167 #[cfg(feature = "python")]
168 Self::Python(obj) => Self::Python(nautilus_core::python::clone_py_object(obj)),
169 Self::Rust(cb) => Self::Rust(cb.clone()),
170 Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
171 }
172 }
173}
174
175impl Debug for TimeEventCallback {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 match self {
178 #[cfg(feature = "python")]
179 Self::Python(_) => f.write_str("Python callback"),
180 Self::Rust(_) => f.write_str("Rust callback (thread-safe)"),
181 Self::RustLocal(_) => f.write_str("Rust callback (local)"),
182 }
183 }
184}
185
186impl TimeEventCallback {
187 #[must_use]
189 pub const fn is_rust(&self) -> bool {
190 matches!(self, Self::Rust(_))
191 }
192
193 #[must_use]
198 pub const fn is_local(&self) -> bool {
199 matches!(self, Self::RustLocal(_))
200 }
201
202 pub fn call(&self, event: TimeEvent) {
206 match self {
207 #[cfg(feature = "python")]
208 Self::Python(callback) => {
209 Python::attach(|py| {
210 if let Err(e) = callback.call1(py, (event,)) {
211 log::error!("Python time event callback raised exception: {e}");
212 }
213 });
214 }
215 Self::Rust(callback) => callback(event),
216 Self::RustLocal(callback) => callback(event),
217 }
218 }
219}
220
221impl<F> From<F> for TimeEventCallback
222where
223 F: Fn(TimeEvent) + Send + Sync + 'static,
224{
225 fn from(value: F) -> Self {
226 Self::Rust(Arc::new(value))
227 }
228}
229
/// An already shared thread-safe closure is stored as-is in the
/// [`TimeEventCallback::Rust`] variant.
impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
    fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
        Self::Rust(value)
    }
}
235
/// An `Rc`-held closure is stored as-is in the [`TimeEventCallback::RustLocal`] variant.
impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
    fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
        Self::RustLocal(value)
    }
}
241
/// A Python object is stored as-is in the [`TimeEventCallback::Python`] variant.
///
/// No check that the object is callable happens here; a failing call is only
/// detected (and logged) when the callback is invoked.
#[cfg(feature = "python")]
impl From<Py<PyAny>> for TimeEventCallback {
    fn from(value: Py<PyAny>) -> Self {
        Self::Python(value)
    }
}
248
// SAFETY: NOTE(review) — these impls are a manual promise; nothing in this file enforces it.
// The `Rust` variant is `Arc<dyn Fn(TimeEvent) + Send + Sync>` and is thread-safe by
// construction. The `RustLocal` variant holds an `Rc<dyn Fn(TimeEvent)>`: `Rc` is neither
// `Send` nor `Sync`, and the closure carries no thread-safety bound, so soundness relies on
// callers never moving, sharing, cloning or dropping a `RustLocal` callback on a thread
// other than the one that created it. Confirm that every user of `RustLocal` upholds this.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
// SAFETY: same invariant as the `Send` impl directly above.
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
268
/// Pairs a [`TimeEvent`] with the [`TimeEventCallback`] that should receive it.
///
/// Comparison and equality for this type (implemented in this file) look only at
/// `event.ts_event`, in ascending order.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandler {
    /// The event to deliver.
    pub event: TimeEvent,
    /// The callback invoked with `event`.
    pub callback: TimeEventCallback,
}
281
282impl TimeEventHandler {
283 #[must_use]
285 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
286 Self { event, callback }
287 }
288
289 pub fn run(self) {
291 let Self { event, callback } = self;
292 callback.call(event);
293 }
294}
295
impl PartialOrd for TimeEventHandler {
    /// Delegates to [`Ord::cmp`], so the comparison is always `Some(_)`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
301
302impl PartialEq for TimeEventHandler {
303 fn eq(&self, other: &Self) -> bool {
304 self.event.ts_event == other.event.ts_event
305 }
306}
307
308impl Eq for TimeEventHandler {}
309
310impl Ord for TimeEventHandler {
311 fn cmp(&self, other: &Self) -> Ordering {
312 self.event.ts_event.cmp(&other.event.ts_event)
313 }
314}
315
/// A timer that produces [`TimeEvent`]s at fixed intervals and is driven explicitly.
///
/// The timer never reads a clock: its schedule is pure arithmetic on `start_time_ns`
/// and `interval_ns`, and events are produced through `advance` or by iterating.
#[derive(Clone, Debug)]
pub struct TestTimer {
    /// The timer name, copied into every event it produces.
    pub name: Ustr,
    /// The spacing between consecutive events.
    pub interval_ns: NonZeroU64,
    /// The time the schedule is anchored to.
    pub start_time_ns: UnixNanos,
    /// Optional last time at which an event may be produced.
    pub stop_time_ns: Option<UnixNanos>,
    /// Whether the first event is scheduled at `start_time_ns` rather than one interval later.
    pub fire_immediately: bool,
    /// The time of the next event to be produced.
    next_time_ns: UnixNanos,
    /// Set once the timer has been cancelled or has reached/passed `stop_time_ns`.
    is_expired: bool,
}
339
340impl TestTimer {
341 #[must_use]
347 pub fn new(
348 name: Ustr,
349 interval_ns: NonZeroU64,
350 start_time_ns: UnixNanos,
351 stop_time_ns: Option<UnixNanos>,
352 fire_immediately: bool,
353 ) -> Self {
354 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
355
356 let next_time_ns = if fire_immediately {
357 start_time_ns
358 } else {
359 start_time_ns + interval_ns.get()
360 };
361
362 Self {
363 name,
364 interval_ns,
365 start_time_ns,
366 stop_time_ns,
367 fire_immediately,
368 next_time_ns,
369 is_expired: false,
370 }
371 }
372
373 #[must_use]
375 pub const fn next_time_ns(&self) -> UnixNanos {
376 self.next_time_ns
377 }
378
379 #[must_use]
381 pub const fn is_expired(&self) -> bool {
382 self.is_expired
383 }
384
385 #[must_use]
386 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
387 TimeEvent {
388 name: self.name,
389 event_id,
390 ts_event: self.next_time_ns,
391 ts_init,
392 }
393 }
394
395 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
401 let advances = if self.next_time_ns <= to_time_ns {
403 ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
404 .saturating_add(1)
405 } else {
406 0
407 };
408 self.take(advances as usize).map(|(event, _)| event)
409 }
410
411 pub const fn cancel(&mut self) {
415 self.is_expired = true;
416 }
417}
418
419impl Iterator for TestTimer {
420 type Item = (TimeEvent, UnixNanos);
421
422 fn next(&mut self) -> Option<Self::Item> {
423 if self.is_expired {
424 None
425 } else {
426 if let Some(stop_time_ns) = self.stop_time_ns
428 && self.next_time_ns > stop_time_ns
429 {
430 self.is_expired = true;
431 return None;
432 }
433
434 let item = (
435 TimeEvent {
436 name: self.name,
437 event_id: UUID4::new(),
438 ts_event: self.next_time_ns,
439 ts_init: self.next_time_ns,
440 },
441 self.next_time_ns,
442 );
443
444 if let Some(stop_time_ns) = self.stop_time_ns
446 && self.next_time_ns == stop_time_ns
447 {
448 self.is_expired = true;
449 }
450
451 self.next_time_ns += self.interval_ns;
452
453 Some(item)
454 }
455 }
456}
457
// Unit and property tests for `TestTimer`: iteration, `advance`, stop-time expiry,
// the `fire_immediately` flag and cancellation.
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use nautilus_core::UnixNanos;
    use rstest::*;
    use ustr::Ustr;

    use super::{TestTimer, TimeEvent};

    // Iterating yields events until the timer is flagged as expired.
    #[rstest]
    fn test_test_timer_pop_event() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::from(1),
            None,
            false,
        );

        assert!(timer.next().is_some());
        assert!(timer.next().is_some());
        timer.is_expired = true;
        assert!(timer.next().is_none());
    }

    // Advancing to times before the first scheduled event produces nothing and
    // leaves the schedule untouched.
    #[rstest]
    fn test_test_timer_advance_within_next_time_ns() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(5).unwrap(),
            UnixNanos::default(),
            None,
            false,
        );
        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
        assert_eq!(timer.next_time_ns, 5);
        assert!(!timer.is_expired);
    }

    // Advancing exactly to the scheduled time produces one event.
    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::default(),
            None,
            false,
        );
        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
        assert!(!timer.is_expired);
    }

    // Reaching the stop time produces the final event and expires the timer.
    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::default(),
            Some(UnixNanos::from(2)),
            false,
        );
        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
        assert!(timer.is_expired);
    }

    // A single advance across several intervals yields one event per interval.
    #[rstest]
    fn test_test_timer_advance_beyond_next_time_ns() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::default(),
            Some(UnixNanos::from(5)),
            false,
        );
        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
        assert!(timer.is_expired);
    }

    // Advancing past the stop time yields no events beyond it.
    #[rstest]
    fn test_test_timer_advance_beyond_stop_time() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::default(),
            Some(UnixNanos::from(5)),
            false,
        );
        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
        assert!(timer.is_expired);
    }

    // Each advance that lands exactly on an interval boundary yields exactly one event.
    #[rstest]
    fn test_test_timer_advance_exact_boundary() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(5).unwrap(),
            UnixNanos::from(0),
            None,
            false,
        );
        assert_eq!(
            timer.advance(UnixNanos::from(5)).count(),
            1,
            "Expected one event at the 5 ns boundary"
        );
        assert_eq!(
            timer.advance(UnixNanos::from(10)).count(),
            1,
            "Expected one event at the 10 ns boundary"
        );
    }

    #[rstest]
    fn test_test_timer_fire_immediately_true() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(5).unwrap(),
            UnixNanos::from(10),
            None,
            true, // fire_immediately
        );

        // The first event is scheduled at the start time itself.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(10));

        // Advancing to the start time fires that first event.
        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].ts_event, UnixNanos::from(10));

        // The schedule then moves on by one interval.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
    }

    #[rstest]
    fn test_test_timer_fire_immediately_false() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(5).unwrap(),
            UnixNanos::from(10),
            None,
            false, // fire_immediately
        );

        // The first event is scheduled one interval after the start time.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));

        // Nothing fires at the start time.
        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);

        // The first event fires once a full interval has elapsed.
        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].ts_event, UnixNanos::from(15));
    }

    use proptest::prelude::*;

    // One step of a randomly generated timer scenario.
    #[derive(Clone, Debug)]
    enum TimerOperation {
        // Move time forward by the given number of nanoseconds.
        AdvanceTime(u64),
        // Cancel the timer.
        Cancel,
    }

    // Generates operations weighted 8:2 towards advancing time; deltas fall in 1..=1000.
    fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
        prop_oneof![
            8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
            2 => Just(TimerOperation::Cancel),
        ]
    }

    // Generates `(interval_ns, start_time_ns, stop_time_ns, fire_immediately)`.
    fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
        (
            1u64..=100u64,                    // interval_ns
            0u64..=50u64,                     // start_time_ns
            prop::option::of(51u64..=200u64), // stop_time_ns, always after start_time_ns
            prop::bool::ANY,                  // fire_immediately
        )
    }

    // Combines 5 to 50 operations with a timer configuration.
    fn timer_test_strategy()
    -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
        (
            prop::collection::vec(timer_operation_strategy(), 5..=50),
            timer_config_strategy(),
        )
    }

    // Applies `operations` to a freshly built timer and checks its invariants after
    // every step. The events are collected so they can be indexed while asserting.
    #[allow(clippy::needless_collect)]
    fn test_timer_with_operations(
        operations: Vec<TimerOperation>,
        (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
    ) {
        let mut timer = TestTimer::new(
            Ustr::from("PROP_TEST_TIMER"),
            NonZeroU64::new(interval_ns).unwrap(),
            UnixNanos::from(start_time_ns),
            stop_time_ns.map(UnixNanos::from),
            fire_immediately,
        );

        let mut current_time = start_time_ns;

        for operation in operations {
            // Nothing more to check once the timer has expired.
            if timer.is_expired() {
                break;
            }

            match operation {
                TimerOperation::AdvanceTime(delta) => {
                    let to_time = current_time + delta;
                    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
                    current_time = to_time;

                    for (i, event) in events.iter().enumerate() {
                        // Events come out in non-decreasing time order.
                        if i > 0 {
                            assert!(
                                event.ts_event >= events[i - 1].ts_event,
                                "Events should be in chronological order"
                            );
                        }

                        // Events lie within [start_time_ns, to_time].
                        assert!(
                            event.ts_event.as_u64() >= start_time_ns,
                            "Event timestamp should not be before start time"
                        );

                        assert!(
                            event.ts_event.as_u64() <= to_time,
                            "Event timestamp should not be after advance time"
                        );

                        // Events never exceed the configured stop time.
                        if let Some(stop_time_ns) = stop_time_ns {
                            assert!(
                                event.ts_event.as_u64() <= stop_time_ns,
                                "Event timestamp should not exceed stop time"
                            );
                        }
                    }
                }
                TimerOperation::Cancel => {
                    timer.cancel();
                    assert!(timer.is_expired(), "Timer should be expired after cancel");
                }
            }

            if !timer.is_expired() {
                // Lower bound on the next scheduled time, depending on `fire_immediately`.
                let expected_interval_multiple = if fire_immediately {
                    timer.next_time_ns().as_u64() >= start_time_ns
                } else {
                    timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
                };
                assert!(
                    expected_interval_multiple,
                    "Next time should respect interval spacing"
                );

                // If the next scheduled time already lies beyond the stop time, probe a
                // clone so the real timer is left untouched.
                if let Some(stop_time_ns) = stop_time_ns
                    && timer.next_time_ns().as_u64() > stop_time_ns
                {
                    let mut test_timer = timer.clone();
                    let events: Vec<TimeEvent> = test_timer
                        .advance(UnixNanos::from(stop_time_ns + 1))
                        .collect();
                    assert!(
                        events.is_empty() || test_timer.is_expired(),
                        "Timer should not generate events beyond stop time"
                    );
                }
            }
        }

        // Final check: a timer with a stop time expires (or goes quiet) once time is
        // advanced well past that stop time.
        if !timer.is_expired()
            && let Some(stop_time_ns) = stop_time_ns
        {
            let events: Vec<TimeEvent> = timer
                .advance(UnixNanos::from(stop_time_ns + 1000))
                .collect();
            assert!(
                timer.is_expired() || events.is_empty(),
                "Timer should eventually expire or stop generating events"
            );
        }
    }

    proptest! {
        // Random operation sequences must uphold the invariants checked above.
        #[rstest]
        fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
            test_timer_with_operations(operations, config);
        }

        // Advancing one interval at a time yields exactly one event per step, stamped
        // with the expected time.
        #[rstest]
        fn prop_timer_interval_consistency(
            interval_ns in 1u64..=100u64,
            start_time_ns in 0u64..=50u64,
            fire_immediately in prop::bool::ANY,
            advance_count in 1usize..=20usize,
        ) {
            let mut timer = TestTimer::new(
                Ustr::from("CONSISTENCY_TEST"),
                NonZeroU64::new(interval_ns).unwrap(),
                UnixNanos::from(start_time_ns),
                None, // no stop time
                fire_immediately,
            );

            // Time of the first expected event.
            let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };

            for _ in 0..advance_count {
                let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();

                if !events.is_empty() {
                    prop_assert_eq!(events.len(), 1);
                    prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
                }

                previous_event_time += interval_ns;
            }
        }
    }
}