1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23 sync::Arc,
24};
25
26#[cfg(feature = "python")]
27use nautilus_core::python::IntoPyObjectNautilusExt;
28use nautilus_core::{
29 UUID4, UnixNanos,
30 correctness::{FAILED, check_valid_string_utf8},
31};
32#[cfg(feature = "python")]
33use pyo3::{Py, PyAny, PyResult, Python, types::PyCapsule};
34use ustr::Ustr;
35
36#[must_use]
40pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
41 NonZeroU64::new(interval_ns).unwrap_or(NonZeroU64::MIN)
42}
43
44#[repr(C)]
45#[derive(Clone, Debug, PartialEq, Eq)]
46#[cfg_attr(
47 feature = "python",
48 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
49)]
50#[cfg_attr(
51 feature = "python",
52 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
53)]
54pub struct TimeEvent {
59 pub name: Ustr,
61 pub event_id: UUID4,
63 pub ts_event: UnixNanos,
65 pub ts_init: UnixNanos,
67}
68
69impl TimeEvent {
70 #[must_use]
72 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
73 Self {
74 name,
75 event_id,
76 ts_event,
77 ts_init,
78 }
79 }
80}
81
82impl Display for TimeEvent {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 write!(
85 f,
86 "{}(name={}, event_id={}, ts_event={}, ts_init={})",
87 stringify!(TimeEvent),
88 self.name,
89 self.event_id,
90 self.ts_event,
91 self.ts_init
92 )
93 }
94}
95
96#[repr(transparent)] #[derive(Clone, Debug, PartialEq, Eq)]
103pub struct ScheduledTimeEvent(pub TimeEvent);
104
105impl ScheduledTimeEvent {
106 #[must_use]
108 pub const fn new(event: TimeEvent) -> Self {
109 Self(event)
110 }
111
112 #[must_use]
114 pub fn into_inner(self) -> TimeEvent {
115 self.0
116 }
117}
118
119impl PartialOrd for ScheduledTimeEvent {
120 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
121 Some(self.cmp(other))
122 }
123}
124
125impl Ord for ScheduledTimeEvent {
126 fn cmp(&self, other: &Self) -> Ordering {
127 other.0.ts_event.cmp(&self.0.ts_event)
129 }
130}
131
132#[cfg(feature = "python")]
133#[derive(Clone, Copy, Debug, PartialEq, Eq)]
134pub enum PythonTimeEventCallbackArg {
136 TimeEvent,
138 LegacyCapsule,
140}
141
142#[cfg(feature = "python")]
143pub struct PythonTimeEventCallback {
145 callback: Py<PyAny>,
146 arg: PythonTimeEventCallbackArg,
147}
148
149#[cfg(feature = "python")]
150impl PythonTimeEventCallback {
151 #[must_use]
153 pub const fn new(callback: Py<PyAny>, arg: PythonTimeEventCallbackArg) -> Self {
154 Self { callback, arg }
155 }
156
157 #[must_use]
159 pub const fn callback(&self) -> &Py<PyAny> {
160 &self.callback
161 }
162
163 pub fn call(&self, event: TimeEvent) {
165 Python::attach(|py| {
166 let result = match self.arg {
167 PythonTimeEventCallbackArg::TimeEvent => self.callback.call1(py, (event,)),
168 PythonTimeEventCallbackArg::LegacyCapsule => {
169 call_legacy_python_time_event_callback(py, event, &self.callback)
170 }
171 };
172
173 if let Err(e) = result {
174 log::error!("Python time event callback raised exception: {e}");
175 }
176 });
177 }
178}
179
180#[cfg(feature = "python")]
181impl Debug for PythonTimeEventCallback {
182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 f.debug_struct(stringify!(PythonTimeEventCallback))
184 .field("arg", &self.arg)
185 .finish_non_exhaustive()
186 }
187}
188
189pub enum TimeEventCallback {
216 #[cfg(feature = "python")]
218 Python(Arc<PythonTimeEventCallback>),
219 Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
221 RustLocal(Rc<dyn Fn(TimeEvent)>),
223}
224
225impl Clone for TimeEventCallback {
226 fn clone(&self) -> Self {
227 match self {
228 #[cfg(feature = "python")]
229 Self::Python(callback) => Self::Python(callback.clone()),
230 Self::Rust(cb) => Self::Rust(cb.clone()),
231 Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
232 }
233 }
234}
235
236impl Debug for TimeEventCallback {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 match self {
239 #[cfg(feature = "python")]
240 Self::Python(_) => f.write_str("Python callback"),
241 Self::Rust(_) => f.write_str("Rust callback (thread-safe)"),
242 Self::RustLocal(_) => f.write_str("Rust callback (local)"),
243 }
244 }
245}
246
247impl TimeEventCallback {
248 #[must_use]
250 pub const fn is_rust(&self) -> bool {
251 matches!(self, Self::Rust(_))
252 }
253
254 #[must_use]
259 pub const fn is_local(&self) -> bool {
260 matches!(self, Self::RustLocal(_))
261 }
262
263 pub fn call(&self, event: TimeEvent) {
267 match self {
268 #[cfg(feature = "python")]
269 Self::Python(callback) => callback.call(event),
270 Self::Rust(callback) => callback(event),
271 Self::RustLocal(callback) => callback(event),
272 }
273 }
274}
275
276impl<F> From<F> for TimeEventCallback
277where
278 F: Fn(TimeEvent) + Send + Sync + 'static,
279{
280 fn from(value: F) -> Self {
281 Self::Rust(Arc::new(value))
282 }
283}
284
285impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
286 fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
287 Self::Rust(value)
288 }
289}
290
291impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
292 fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
293 Self::RustLocal(value)
294 }
295}
296
297#[cfg(feature = "python")]
298impl From<Py<PyAny>> for TimeEventCallback {
299 fn from(value: Py<PyAny>) -> Self {
300 Self::from_python_time_event(value)
301 }
302}
303
304#[cfg(feature = "python")]
305impl TimeEventCallback {
306 #[must_use]
308 pub fn from_python_time_event(callback: Py<PyAny>) -> Self {
309 Self::Python(Arc::new(PythonTimeEventCallback::new(
310 callback,
311 PythonTimeEventCallbackArg::TimeEvent,
312 )))
313 }
314
315 #[must_use]
317 pub fn from_python_legacy_capsule(callback: Py<PyAny>) -> Self {
318 Self::Python(Arc::new(PythonTimeEventCallback::new(
319 callback,
320 PythonTimeEventCallbackArg::LegacyCapsule,
321 )))
322 }
323}
324
325#[allow(unsafe_code)]
341unsafe impl Send for TimeEventCallback {}
342#[allow(unsafe_code)]
343unsafe impl Sync for TimeEventCallback {}
344
345#[cfg(feature = "python")]
346fn call_legacy_python_time_event_callback(
347 py: Python<'_>,
348 event: TimeEvent,
349 callback: &Py<PyAny>,
350) -> PyResult<Py<PyAny>> {
351 #[allow(
352 deprecated,
353 reason = "unnamed capsules are required for legacy Cython time-event callbacks"
354 )]
355 let capsule: Py<PyAny> = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
356 .expect("Error creating `PyCapsule`")
357 .into_py_any_unwrap(py);
358
359 callback.call1(py, (capsule,))
360}
361
362#[repr(C)]
363#[derive(Clone, Debug)]
364pub struct TimeEventHandler {
369 pub event: TimeEvent,
371 pub callback: TimeEventCallback,
373}
374
375impl TimeEventHandler {
376 #[must_use]
378 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
379 Self { event, callback }
380 }
381
382 fn cmp_event(&self, other: &Self) -> Ordering {
383 self.event
384 .ts_event
385 .cmp(&other.event.ts_event)
386 .then_with(|| self.event.name.cmp(&other.event.name))
387 .then_with(|| self.event.ts_init.cmp(&other.event.ts_init))
388 .then_with(|| {
389 self.event
390 .event_id
391 .as_str()
392 .cmp(other.event.event_id.as_str())
393 })
394 }
395
396 pub fn run(self) {
398 let Self { event, callback } = self;
399 crate::msgbus::dispatch_tap_time_event(&event);
400 callback.call(event);
401 }
402}
403
404impl PartialOrd for TimeEventHandler {
405 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
406 Some(self.cmp(other))
407 }
408}
409
410impl PartialEq for TimeEventHandler {
411 fn eq(&self, other: &Self) -> bool {
412 self.cmp_event(other).is_eq()
413 }
414}
415
416impl Eq for TimeEventHandler {}
417
418impl Ord for TimeEventHandler {
419 fn cmp(&self, other: &Self) -> Ordering {
420 self.cmp_event(other)
421 }
422}
423
424pub(crate) trait Timer {
425 fn is_expired(&self) -> bool;
426 fn cancel(&mut self);
427}
428
429#[derive(Clone, Debug)]
438pub struct TestTimer {
439 pub name: Ustr,
441 pub interval_ns: NonZeroU64,
443 pub start_time_ns: UnixNanos,
445 pub stop_time_ns: Option<UnixNanos>,
447 pub fire_immediately: bool,
449 next_time_ns: UnixNanos,
450 is_expired: bool,
451}
452
453impl TestTimer {
454 #[must_use]
460 pub fn new(
461 name: Ustr,
462 interval_ns: NonZeroU64,
463 start_time_ns: UnixNanos,
464 stop_time_ns: Option<UnixNanos>,
465 fire_immediately: bool,
466 ) -> Self {
467 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
468
469 let next_time_ns = if fire_immediately {
470 start_time_ns
471 } else {
472 start_time_ns + interval_ns.get()
473 };
474
475 Self {
476 name,
477 interval_ns,
478 start_time_ns,
479 stop_time_ns,
480 fire_immediately,
481 next_time_ns,
482 is_expired: false,
483 }
484 }
485
486 #[must_use]
488 pub const fn next_time_ns(&self) -> UnixNanos {
489 self.next_time_ns
490 }
491
492 #[must_use]
494 pub const fn is_expired(&self) -> bool {
495 self.is_expired
496 }
497
498 #[must_use]
499 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
500 TimeEvent {
501 name: self.name,
502 event_id,
503 ts_event: self.next_time_ns,
504 ts_init,
505 }
506 }
507
508 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
514 let advances = if self.next_time_ns <= to_time_ns {
516 ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
517 .saturating_add(1)
518 } else {
519 0
520 };
521 self.take(advances as usize).map(|(event, _)| event)
522 }
523
524 pub const fn cancel(&mut self) {
528 self.is_expired = true;
529 }
530}
531
532impl Timer for TestTimer {
533 fn is_expired(&self) -> bool {
534 self.is_expired
535 }
536
537 fn cancel(&mut self) {
538 self.is_expired = true;
539 }
540}
541
542impl Iterator for TestTimer {
543 type Item = (TimeEvent, UnixNanos);
544
545 fn next(&mut self) -> Option<Self::Item> {
546 if self.is_expired {
547 None
548 } else {
549 if let Some(stop_time_ns) = self.stop_time_ns
551 && self.next_time_ns > stop_time_ns
552 {
553 self.is_expired = true;
554 return None;
555 }
556
557 let item = (
558 TimeEvent {
559 name: self.name,
560 event_id: UUID4::new(),
561 ts_event: self.next_time_ns,
562 ts_init: self.next_time_ns,
563 },
564 self.next_time_ns,
565 );
566
567 if let Some(stop_time_ns) = self.stop_time_ns
569 && self.next_time_ns == stop_time_ns
570 {
571 self.is_expired = true;
572 }
573
574 self.next_time_ns += self.interval_ns;
575
576 Some(item)
577 }
578 }
579}
580
581#[cfg(test)]
582mod tests {
583 use std::{cell::RefCell, num::NonZeroU64, rc::Rc};
584
585 use nautilus_core::{UUID4, UnixNanos};
586 #[cfg(feature = "python")]
587 use pyo3::{
588 Bound, PyResult, Python,
589 types::{
590 PyAnyMethods, PyCFunction, PyDict, PyList, PyListMethods, PyTuple, PyTupleMethods,
591 PyTypeMethods,
592 },
593 };
594 use rstest::*;
595 use ustr::Ustr;
596
597 use super::{TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler, create_valid_interval};
598 use crate::msgbus::{
599 BusTap, Endpoint, MStr, MessagingSwitchboard, Topic, clear_bus_tap, set_bus_tap,
600 };
601
602 #[rstest]
603 #[case(0, 1)]
604 #[case(1, 1)]
605 #[case(25, 25)]
606 fn test_create_valid_interval(#[case] interval_ns: u64, #[case] expected: u64) {
607 assert_eq!(create_valid_interval(interval_ns).get(), expected);
608 }
609
610 #[rstest]
611 fn test_test_timer_pop_event() {
612 let mut timer = TestTimer::new(
613 Ustr::from("TEST_TIMER"),
614 NonZeroU64::new(1).unwrap(),
615 UnixNanos::from(1),
616 None,
617 false,
618 );
619
620 assert!(timer.next().is_some());
621 assert!(timer.next().is_some());
622 timer.is_expired = true;
623 assert!(timer.next().is_none());
624 }
625
626 #[rstest]
627 fn test_test_timer_advance_within_next_time_ns() {
628 let mut timer = TestTimer::new(
629 Ustr::from("TEST_TIMER"),
630 NonZeroU64::new(5).unwrap(),
631 UnixNanos::default(),
632 None,
633 false,
634 );
635 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
636 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
637 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
638 assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
639 assert_eq!(timer.next_time_ns, 5);
640 assert!(!timer.is_expired);
641 }
642
643 #[rstest]
644 fn test_test_timer_advance_up_to_next_time_ns() {
645 let mut timer = TestTimer::new(
646 Ustr::from("TEST_TIMER"),
647 NonZeroU64::new(1).unwrap(),
648 UnixNanos::default(),
649 None,
650 false,
651 );
652 assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
653 assert!(!timer.is_expired);
654 }
655
656 #[rstest]
657 fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
658 let mut timer = TestTimer::new(
659 Ustr::from("TEST_TIMER"),
660 NonZeroU64::new(1).unwrap(),
661 UnixNanos::default(),
662 Some(UnixNanos::from(2)),
663 false,
664 );
665 assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
666 assert!(timer.is_expired);
667 }
668
669 #[rstest]
670 fn test_test_timer_advance_beyond_next_time_ns() {
671 let mut timer = TestTimer::new(
672 Ustr::from("TEST_TIMER"),
673 NonZeroU64::new(1).unwrap(),
674 UnixNanos::default(),
675 Some(UnixNanos::from(5)),
676 false,
677 );
678 assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
679 assert!(timer.is_expired);
680 }
681
682 #[rstest]
683 fn test_test_timer_advance_beyond_stop_time() {
684 let mut timer = TestTimer::new(
685 Ustr::from("TEST_TIMER"),
686 NonZeroU64::new(1).unwrap(),
687 UnixNanos::default(),
688 Some(UnixNanos::from(5)),
689 false,
690 );
691 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
692 assert!(timer.is_expired);
693 }
694
695 #[rstest]
696 fn test_test_timer_advance_exact_boundary() {
697 let mut timer = TestTimer::new(
698 Ustr::from("TEST_TIMER"),
699 NonZeroU64::new(5).unwrap(),
700 UnixNanos::from(0),
701 None,
702 false,
703 );
704 assert_eq!(
705 timer.advance(UnixNanos::from(5)).count(),
706 1,
707 "Expected one event at the 5 ns boundary"
708 );
709 assert_eq!(
710 timer.advance(UnixNanos::from(10)).count(),
711 1,
712 "Expected one event at the 10 ns boundary"
713 );
714 }
715
716 #[rstest]
717 fn test_test_timer_fire_immediately_true() {
718 let mut timer = TestTimer::new(
719 Ustr::from("TEST_TIMER"),
720 NonZeroU64::new(5).unwrap(),
721 UnixNanos::from(10),
722 None,
723 true, );
725
726 assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
728
729 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
731 assert_eq!(events.len(), 1);
732 assert_eq!(events[0].ts_event, UnixNanos::from(10));
733
734 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
736 }
737
738 #[rstest]
739 fn test_test_timer_fire_immediately_false() {
740 let mut timer = TestTimer::new(
741 Ustr::from("TEST_TIMER"),
742 NonZeroU64::new(5).unwrap(),
743 UnixNanos::from(10),
744 None,
745 false, );
747
748 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
750
751 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);
753
754 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
756 assert_eq!(events.len(), 1);
757 assert_eq!(events[0].ts_event, UnixNanos::from(15));
758 }
759
760 #[rstest]
761 fn test_time_event_handler_ordering_uses_tie_breakers() {
762 let callback = TimeEventCallback::from(|_: TimeEvent| {});
763
764 let later_name = TimeEventHandler::new(
765 TimeEvent::new(
766 Ustr::from("TIME_BAR_ESM4-2-MINUTE-ASK-INTERNAL"),
767 UUID4::from("00000000-0000-4000-8000-000000000003"),
768 100.into(),
769 100.into(),
770 ),
771 callback.clone(),
772 );
773 let earlier_name = TimeEventHandler::new(
774 TimeEvent::new(
775 Ustr::from("SPREAD_QUOTE_ESM4"),
776 UUID4::from("00000000-0000-4000-8000-000000000002"),
777 100.into(),
778 100.into(),
779 ),
780 callback.clone(),
781 );
782 let later_init = TimeEventHandler::new(
783 TimeEvent::new(
784 Ustr::from("SPREAD_QUOTE_ESM4"),
785 UUID4::from("00000000-0000-4000-8000-000000000004"),
786 100.into(),
787 101.into(),
788 ),
789 callback.clone(),
790 );
791 let later_id = TimeEventHandler::new(
792 TimeEvent::new(
793 Ustr::from("SPREAD_QUOTE_ESM4"),
794 UUID4::from("00000000-0000-4000-8000-000000000005"),
795 100.into(),
796 100.into(),
797 ),
798 callback,
799 );
800
801 assert!(earlier_name < later_name);
802 assert!(earlier_name < later_init);
803 assert!(earlier_name < later_id);
804 assert_ne!(earlier_name, later_id);
805 }
806
807 #[cfg(feature = "python")]
808 #[rstest]
809 fn test_python_callback_modes_pass_expected_argument_types() {
810 Python::initialize();
811
812 Python::attach(|py| {
813 let seen = PyList::empty(py);
814 let seen_obj = seen.clone().unbind().into_any();
815 let callback = new_sync_py_callback(
816 py,
817 move |args: &Bound<'_, PyTuple>,
818 _kwargs: Option<&Bound<'_, PyDict>>|
819 -> PyResult<()> {
820 let arg = args.get_item(0)?;
821 let type_name = arg.get_type().name()?.to_string();
822 Python::attach(|py| seen_obj.call_method1(py, "append", (type_name,)))?;
823 Ok(())
824 },
825 )
826 .expect("callback should create")
827 .into_any()
828 .unbind();
829
830 let event = TimeEvent::new(
831 Ustr::from("PY_CALLBACK_MODE"),
832 UUID4::from("00000000-0000-4000-8000-000000000007"),
833 UnixNanos::from(100),
834 UnixNanos::from(99),
835 );
836
837 TimeEventCallback::from_python_time_event(callback.clone_ref(py)).call(event.clone());
838 TimeEventCallback::from_python_legacy_capsule(callback).call(event);
839
840 assert_eq!(
841 seen.get_item(0).unwrap().extract::<String>().unwrap(),
842 "TimeEvent"
843 );
844 assert_eq!(
845 seen.get_item(1).unwrap().extract::<String>().unwrap(),
846 "PyCapsule"
847 );
848 });
849 }
850
851 #[cfg(feature = "python")]
852 fn new_sync_py_callback<F>(py: Python<'_>, closure: F) -> PyResult<Bound<'_, PyCFunction>>
853 where
854 F: Fn(&Bound<'_, PyTuple>, Option<&Bound<'_, PyDict>>) -> PyResult<()>
855 + Send
856 + Sync
857 + 'static,
858 {
859 PyCFunction::new_closure(py, None, None, closure)
860 }
861
862 #[derive(Default)]
863 struct RecordingTimeEventTap {
864 time_events: RefCell<Vec<(String, TimeEvent)>>,
865 }
866
867 impl RecordingTimeEventTap {
868 fn time_events(&self) -> Vec<(String, TimeEvent)> {
869 self.time_events.borrow().clone()
870 }
871 }
872
873 impl BusTap for RecordingTimeEventTap {
874 fn on_publish(&self, topic: MStr<Topic>, message: &dyn std::any::Any) {
875 if let Some(event) = message.downcast_ref::<TimeEvent>() {
876 self.time_events
877 .borrow_mut()
878 .push((topic.to_string(), event.clone()));
879 }
880 }
881
882 fn on_send(&self, _endpoint: MStr<Endpoint>, _message: &dyn std::any::Any) {}
883 }
884
885 #[rstest]
886 fn test_time_event_handler_run_dispatches_tap_before_callback() {
887 let event = TimeEvent::new(
888 Ustr::from("strategy.heartbeat"),
889 UUID4::from("00000000-0000-4000-8000-000000000006"),
890 UnixNanos::from(100),
891 UnixNanos::from(99),
892 );
893 let tap = Rc::new(RecordingTimeEventTap::default());
894 let callback_seen: Rc<RefCell<Vec<TimeEvent>>> = Rc::new(RefCell::new(Vec::new()));
895 let expected_topic = MessagingSwitchboard::time_event_topic().to_string();
896 let callback_expected = event.clone();
897 let callback_expected_topic = expected_topic.clone();
898 let callback_tap = Rc::clone(&tap);
899 let callback_seen_ref = Rc::clone(&callback_seen);
900 let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |callback_event| {
901 assert_eq!(
902 callback_tap.time_events(),
903 vec![(callback_expected_topic.clone(), callback_expected.clone())],
904 );
905 callback_seen_ref.borrow_mut().push(callback_event);
906 });
907
908 set_bus_tap(tap.clone());
909 TimeEventHandler::new(event.clone(), TimeEventCallback::from(callback)).run();
910 clear_bus_tap();
911
912 assert_eq!(tap.time_events(), vec![(expected_topic, event.clone())]);
913 assert_eq!(*callback_seen.borrow(), vec![event]);
914 }
915
916 use proptest::prelude::*;
921
922 #[derive(Clone, Debug)]
923 enum TimerOperation {
924 AdvanceTime(u64),
925 Cancel,
926 }
927
928 fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
929 prop_oneof![
930 8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
931 2 => Just(TimerOperation::Cancel),
932 ]
933 }
934
935 fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
936 (
937 1u64..=100u64, 0u64..=50u64, prop::option::of(51u64..=200u64), prop::bool::ANY, )
942 }
943
944 fn timer_test_strategy()
945 -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
946 (
947 prop::collection::vec(timer_operation_strategy(), 5..=50),
948 timer_config_strategy(),
949 )
950 }
951
952 #[expect(clippy::needless_collect)] fn test_timer_with_operations(
954 operations: Vec<TimerOperation>,
955 (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
956 ) {
957 let mut timer = TestTimer::new(
958 Ustr::from("PROP_TEST_TIMER"),
959 NonZeroU64::new(interval_ns).unwrap(),
960 UnixNanos::from(start_time_ns),
961 stop_time_ns.map(UnixNanos::from),
962 fire_immediately,
963 );
964
965 let mut current_time = start_time_ns;
966
967 for operation in operations {
968 if timer.is_expired() {
969 break;
970 }
971
972 match operation {
973 TimerOperation::AdvanceTime(delta) => {
974 let to_time = current_time + delta;
975 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
976 current_time = to_time;
977
978 for (i, event) in events.iter().enumerate() {
980 if i > 0 {
982 assert!(
983 event.ts_event >= events[i - 1].ts_event,
984 "Events should be in chronological order"
985 );
986 }
987
988 assert!(
990 event.ts_event.as_u64() >= start_time_ns,
991 "Event timestamp should not be before start time"
992 );
993
994 assert!(
995 event.ts_event.as_u64() <= to_time,
996 "Event timestamp should not be after advance time"
997 );
998
999 if let Some(stop_time_ns) = stop_time_ns {
1001 assert!(
1002 event.ts_event.as_u64() <= stop_time_ns,
1003 "Event timestamp should not exceed stop time"
1004 );
1005 }
1006 }
1007 }
1008 TimerOperation::Cancel => {
1009 timer.cancel();
1010 assert!(timer.is_expired(), "Timer should be expired after cancel");
1011 }
1012 }
1013
1014 if !timer.is_expired() {
1016 let expected_interval_multiple = if fire_immediately {
1018 timer.next_time_ns().as_u64() >= start_time_ns
1019 } else {
1020 timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
1021 };
1022 assert!(
1023 expected_interval_multiple,
1024 "Next time should respect interval spacing"
1025 );
1026
1027 if let Some(stop_time_ns) = stop_time_ns
1030 && timer.next_time_ns().as_u64() > stop_time_ns
1031 {
1032 let mut test_timer = timer.clone();
1034 let events: Vec<TimeEvent> = test_timer
1035 .advance(UnixNanos::from(stop_time_ns + 1))
1036 .collect();
1037 assert!(
1038 events.is_empty() || test_timer.is_expired(),
1039 "Timer should not generate events beyond stop time"
1040 );
1041 }
1042 }
1043 }
1044
1045 if !timer.is_expired()
1048 && let Some(stop_time_ns) = stop_time_ns
1049 {
1050 let events: Vec<TimeEvent> = timer
1051 .advance(UnixNanos::from(stop_time_ns + 1000))
1052 .collect();
1053 assert!(
1054 timer.is_expired() || events.is_empty(),
1055 "Timer should eventually expire or stop generating events"
1056 );
1057 }
1058 }
1059
1060 proptest! {
1061 #[rstest]
1062 fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
1063 test_timer_with_operations(operations, config);
1064 }
1065
1066 #[rstest]
1067 fn prop_timer_interval_consistency(
1068 interval_ns in 1u64..=100u64,
1069 start_time_ns in 0u64..=50u64,
1070 fire_immediately in prop::bool::ANY,
1071 advance_count in 1usize..=20usize,
1072 ) {
1073 let mut timer = TestTimer::new(
1074 Ustr::from("CONSISTENCY_TEST"),
1075 NonZeroU64::new(interval_ns).unwrap(),
1076 UnixNanos::from(start_time_ns),
1077 None, fire_immediately,
1079 );
1080
1081 let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
1082
1083 for _ in 0..advance_count {
1084 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
1085
1086 if !events.is_empty() {
1087 prop_assert_eq!(events.len(), 1);
1089 prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
1090 }
1091
1092 previous_event_time += interval_ns;
1093 }
1094 }
1095 }
1096}