1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23 sync::Arc,
24};
25
26#[cfg(feature = "python")]
27use nautilus_core::python::IntoPyObjectNautilusExt;
28use nautilus_core::{
29 UUID4, UnixNanos,
30 correctness::{FAILED, check_valid_string_utf8},
31};
32#[cfg(feature = "python")]
33use pyo3::{Py, PyAny, PyResult, Python, types::PyCapsule};
34use ustr::Ustr;
35
/// Converts a raw nanosecond interval into a [`NonZeroU64`].
///
/// A zero `interval_ns` is clamped up to `1`; every other value is returned unchanged.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    // Clamping first guarantees the value handed to `NonZeroU64::new` is never zero.
    let clamped = interval_ns.max(1);
    NonZeroU64::new(clamped).expect("value clamped to at least 1 is non-zero")
}
43
/// A named event carrying an identifier and two timestamps.
///
/// With the `python` feature enabled the type is additionally exposed as a PyO3 class.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct TimeEvent {
    /// The event name (timers in this file copy their own name into it).
    pub name: Ustr,
    /// The unique identifier of this event.
    pub event_id: UUID4,
    /// The timestamp at which the event occurred; timers in this file set it to the
    /// scheduled fire time. Presumably UNIX nanoseconds, per the `UnixNanos` type name.
    pub ts_event: UnixNanos,
    /// The timestamp at which the event object was initialized.
    /// Presumably UNIX nanoseconds, per the `UnixNanos` type name.
    pub ts_init: UnixNanos,
}
68
69impl TimeEvent {
70 #[must_use]
72 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
73 Self {
74 name,
75 event_id,
76 ts_event,
77 ts_init,
78 }
79 }
80}
81
82impl Display for TimeEvent {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 write!(
85 f,
86 "{}(name={}, event_id={}, ts_event={}, ts_init={})",
87 stringify!(TimeEvent),
88 self.name,
89 self.event_id,
90 self.ts_event,
91 self.ts_init
92 )
93 }
94}
95
/// Newtype wrapper around a [`TimeEvent`].
///
/// Equality is derived from the wrapped event (all fields), while the `Ord` implementation
/// in this file compares only `ts_event`, in reverse, so that earlier events order as
/// "greater" (presumably so a max-heap such as `BinaryHeap` pops the earliest event first).
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ScheduledTimeEvent(pub TimeEvent);
104
105impl ScheduledTimeEvent {
106 #[must_use]
108 pub const fn new(event: TimeEvent) -> Self {
109 Self(event)
110 }
111
112 #[must_use]
114 pub fn into_inner(self) -> TimeEvent {
115 self.0
116 }
117}
118
119impl PartialOrd for ScheduledTimeEvent {
120 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
121 Some(self.cmp(other))
122 }
123}
124
125impl Ord for ScheduledTimeEvent {
126 fn cmp(&self, other: &Self) -> Ordering {
127 other.0.ts_event.cmp(&self.0.ts_event)
129 }
130}
131
/// Selects how a [`TimeEvent`] is handed to a Python callback.
#[cfg(feature = "python")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PythonTimeEventCallbackArg {
    /// The callback receives the `TimeEvent` itself as its single positional argument.
    TimeEvent,
    /// The callback receives the `TimeEvent` wrapped in a `PyCapsule`.
    LegacyCapsule,
}
141
/// A Python callable paired with the argument convention used to invoke it.
#[cfg(feature = "python")]
pub struct PythonTimeEventCallback {
    /// The Python object that is called for each time event.
    callback: Py<PyAny>,
    /// How the event is passed to `callback`.
    arg: PythonTimeEventCallbackArg,
}
148
#[cfg(feature = "python")]
impl PythonTimeEventCallback {
    /// Creates a new [`PythonTimeEventCallback`] from a Python callable and the
    /// argument convention to use when calling it.
    #[must_use]
    pub const fn new(callback: Py<PyAny>, arg: PythonTimeEventCallbackArg) -> Self {
        Self { arg, callback }
    }

    /// Returns a reference to the wrapped Python callable.
    #[must_use]
    pub const fn callback(&self) -> &Py<PyAny> {
        &self.callback
    }

    /// Invokes the Python callable with `event`.
    ///
    /// The call runs inside `Python::attach`. An error returned by the Python side is
    /// logged and otherwise ignored; it is not propagated to the caller.
    pub fn call(&self, event: TimeEvent) {
        Python::attach(|py| {
            let target = &self.callback;

            let outcome = match self.arg {
                PythonTimeEventCallbackArg::LegacyCapsule => {
                    call_legacy_python_time_event_callback(py, event, target)
                }
                PythonTimeEventCallbackArg::TimeEvent => target.call1(py, (event,)),
            };

            if let Err(e) = outcome {
                log::error!("Python time event callback raised exception: {e}");
            }
        });
    }
}
179
#[cfg(feature = "python")]
impl Debug for PythonTimeEventCallback {
    /// Prints only the `arg` field; the Python callable itself is elided
    /// (rendered as a non-exhaustive struct).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct(stringify!(PythonTimeEventCallback));
        repr.field("arg", &self.arg);
        repr.finish_non_exhaustive()
    }
}
188
/// A callback invoked with a [`TimeEvent`].
///
/// The variants differ in where the callable lives and in which smart pointer holds it.
pub enum TimeEventCallback {
    /// A Python callable (only available with the `python` feature).
    #[cfg(feature = "python")]
    Python(Arc<PythonTimeEventCallback>),
    /// A Rust closure that is `Send + Sync`, shared through an `Arc`.
    Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
    /// A Rust closure without `Send`/`Sync` bounds, shared through an `Rc`.
    RustLocal(Rc<dyn Fn(TimeEvent)>),
}
224
225impl Clone for TimeEventCallback {
226 fn clone(&self) -> Self {
227 match self {
228 #[cfg(feature = "python")]
229 Self::Python(callback) => Self::Python(callback.clone()),
230 Self::Rust(cb) => Self::Rust(cb.clone()),
231 Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
232 }
233 }
234}
235
236impl Debug for TimeEventCallback {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 match self {
239 #[cfg(feature = "python")]
240 Self::Python(_) => f.write_str("Python callback"),
241 Self::Rust(_) => f.write_str("Rust callback (thread-safe)"),
242 Self::RustLocal(_) => f.write_str("Rust callback (local)"),
243 }
244 }
245}
246
247impl TimeEventCallback {
248 #[must_use]
250 pub const fn is_rust(&self) -> bool {
251 matches!(self, Self::Rust(_))
252 }
253
254 #[must_use]
259 pub const fn is_local(&self) -> bool {
260 matches!(self, Self::RustLocal(_))
261 }
262
263 pub fn call(&self, event: TimeEvent) {
267 match self {
268 #[cfg(feature = "python")]
269 Self::Python(callback) => callback.call(event),
270 Self::Rust(callback) => callback(event),
271 Self::RustLocal(callback) => callback(event),
272 }
273 }
274}
275
276impl<F> From<F> for TimeEventCallback
277where
278 F: Fn(TimeEvent) + Send + Sync + 'static,
279{
280 fn from(value: F) -> Self {
281 Self::Rust(Arc::new(value))
282 }
283}
284
285impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
286 fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
287 Self::Rust(value)
288 }
289}
290
291impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
292 fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
293 Self::RustLocal(value)
294 }
295}
296
/// Wraps a Python callable using the `TimeEvent` argument convention
/// (the callable receives the event object itself).
#[cfg(feature = "python")]
impl From<Py<PyAny>> for TimeEventCallback {
    fn from(value: Py<PyAny>) -> Self {
        let wrapped = PythonTimeEventCallback::new(value, PythonTimeEventCallbackArg::TimeEvent);
        Self::Python(Arc::new(wrapped))
    }
}
303
#[cfg(feature = "python")]
impl TimeEventCallback {
    /// Creates a Python callback that is called with the `TimeEvent` object itself.
    #[must_use]
    pub fn from_python_time_event(callback: Py<PyAny>) -> Self {
        Self::wrap_python(callback, PythonTimeEventCallbackArg::TimeEvent)
    }

    /// Creates a Python callback that is called with the event wrapped in a `PyCapsule`.
    #[must_use]
    pub fn from_python_legacy_capsule(callback: Py<PyAny>) -> Self {
        Self::wrap_python(callback, PythonTimeEventCallbackArg::LegacyCapsule)
    }

    /// Builds the `Python` variant for `callback` with the given argument convention.
    fn wrap_python(callback: Py<PyAny>, arg: PythonTimeEventCallbackArg) -> Self {
        let wrapped = PythonTimeEventCallback::new(callback, arg);
        Self::Python(Arc::new(wrapped))
    }
}
324
// SAFETY / NOTE(review): these impls assert thread-safety manually for every variant.
// - `Rust` holds an `Arc<dyn Fn(TimeEvent) + Send + Sync>`, which is `Send + Sync` on its own.
// - `RustLocal` holds an `Rc<dyn Fn(TimeEvent)>`, which is neither `Send` nor `Sync`; the
//   impls are therefore only sound if such callbacks are never cloned, called, or dropped
//   on a thread other than the one that created them. That invariant is not enforced or
//   shown in this file -- TODO confirm it is upheld by all callers.
// - `Python` holds a `Py<PyAny>` behind an `Arc`; presumably thread-safe because the
//   interpreter is attached before each call -- verify against the PyO3 version in use.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
344
/// Calls `callback` with `event` wrapped in a `PyCapsule` as its single positional argument.
///
/// The capsule is created with a no-op destructor closure.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
///
/// # Errors
///
/// Returns whatever error the Python call itself produces.
#[cfg(feature = "python")]
fn call_legacy_python_time_event_callback(
    py: Python<'_>,
    event: TimeEvent,
    callback: &Py<PyAny>,
) -> PyResult<Py<PyAny>> {
    let bound_capsule = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
        .expect("Error creating `PyCapsule`");
    let capsule: Py<PyAny> = bound_capsule.into_py_any_unwrap(py);

    let args = (capsule,);
    callback.call1(py, args)
}
357
/// Pairs a [`TimeEvent`] with the [`TimeEventCallback`] that should receive it.
///
/// Ordering and equality (implemented below in this file) consider only the event,
/// never the callback.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandler {
    /// The event to deliver.
    pub event: TimeEvent,
    /// The callback that is invoked with `event`.
    pub callback: TimeEventCallback,
}
370
371impl TimeEventHandler {
372 #[must_use]
374 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
375 Self { event, callback }
376 }
377
378 fn cmp_event(&self, other: &Self) -> Ordering {
379 self.event
380 .ts_event
381 .cmp(&other.event.ts_event)
382 .then_with(|| self.event.name.cmp(&other.event.name))
383 .then_with(|| self.event.ts_init.cmp(&other.event.ts_init))
384 .then_with(|| {
385 self.event
386 .event_id
387 .as_str()
388 .cmp(other.event.event_id.as_str())
389 })
390 }
391
392 pub fn run(self) {
394 let Self { event, callback } = self;
395 crate::msgbus::dispatch_tap_time_event(&event);
396 callback.call(event);
397 }
398}
399
400impl PartialOrd for TimeEventHandler {
401 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
402 Some(self.cmp(other))
403 }
404}
405
406impl PartialEq for TimeEventHandler {
407 fn eq(&self, other: &Self) -> bool {
408 self.cmp_event(other).is_eq()
409 }
410}
411
// Marker impl: the `PartialEq` above is derived from a total ordering of the event keys.
impl Eq for TimeEventHandler {}
413
414impl Ord for TimeEventHandler {
415 fn cmp(&self, other: &Self) -> Ordering {
416 self.cmp_event(other)
417 }
418}
419
/// Crate-internal interface shared by timer implementations.
pub(crate) trait Timer {
    /// Returns `true` once the timer has expired.
    fn is_expired(&self) -> bool;
    /// Cancels the timer.
    fn cancel(&mut self);
}
424
/// A timer that is advanced manually and yields [`TimeEvent`]s through its
/// `Iterator` implementation (presumably intended for tests/backtests, per its name).
#[derive(Clone, Debug)]
pub struct TestTimer {
    /// The timer name, copied into every event it produces.
    pub name: Ustr,
    /// The spacing between consecutive events (non-zero).
    pub interval_ns: NonZeroU64,
    /// The time the timer starts from.
    pub start_time_ns: UnixNanos,
    /// Optional stop time; no event with a later timestamp is produced.
    pub stop_time_ns: Option<UnixNanos>,
    /// If `true`, the first event is scheduled at `start_time_ns` instead of one interval later.
    pub fire_immediately: bool,
    /// The timestamp of the next event to be produced.
    next_time_ns: UnixNanos,
    /// Set once the timer has been cancelled or has reached/passed its stop time.
    is_expired: bool,
}
448
449impl TestTimer {
450 #[must_use]
456 pub fn new(
457 name: Ustr,
458 interval_ns: NonZeroU64,
459 start_time_ns: UnixNanos,
460 stop_time_ns: Option<UnixNanos>,
461 fire_immediately: bool,
462 ) -> Self {
463 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
464
465 let next_time_ns = if fire_immediately {
466 start_time_ns
467 } else {
468 start_time_ns + interval_ns.get()
469 };
470
471 Self {
472 name,
473 interval_ns,
474 start_time_ns,
475 stop_time_ns,
476 fire_immediately,
477 next_time_ns,
478 is_expired: false,
479 }
480 }
481
482 #[must_use]
484 pub const fn next_time_ns(&self) -> UnixNanos {
485 self.next_time_ns
486 }
487
488 #[must_use]
490 pub const fn is_expired(&self) -> bool {
491 self.is_expired
492 }
493
494 #[must_use]
495 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
496 TimeEvent {
497 name: self.name,
498 event_id,
499 ts_event: self.next_time_ns,
500 ts_init,
501 }
502 }
503
504 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
510 let advances = if self.next_time_ns <= to_time_ns {
512 ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
513 .saturating_add(1)
514 } else {
515 0
516 };
517 self.take(advances as usize).map(|(event, _)| event)
518 }
519
520 pub const fn cancel(&mut self) {
524 self.is_expired = true;
525 }
526}
527
528impl Timer for TestTimer {
529 fn is_expired(&self) -> bool {
530 self.is_expired
531 }
532
533 fn cancel(&mut self) {
534 self.is_expired = true;
535 }
536}
537
538impl Iterator for TestTimer {
539 type Item = (TimeEvent, UnixNanos);
540
541 fn next(&mut self) -> Option<Self::Item> {
542 if self.is_expired {
543 None
544 } else {
545 if let Some(stop_time_ns) = self.stop_time_ns
547 && self.next_time_ns > stop_time_ns
548 {
549 self.is_expired = true;
550 return None;
551 }
552
553 let item = (
554 TimeEvent {
555 name: self.name,
556 event_id: UUID4::new(),
557 ts_event: self.next_time_ns,
558 ts_init: self.next_time_ns,
559 },
560 self.next_time_ns,
561 );
562
563 if let Some(stop_time_ns) = self.stop_time_ns
565 && self.next_time_ns == stop_time_ns
566 {
567 self.is_expired = true;
568 }
569
570 self.next_time_ns += self.interval_ns;
571
572 Some(item)
573 }
574 }
575}
576
577#[cfg(test)]
578mod tests {
579 use std::{cell::RefCell, num::NonZeroU64, rc::Rc};
580
581 use nautilus_core::{UUID4, UnixNanos};
582 #[cfg(feature = "python")]
583 use pyo3::{
584 Bound, PyResult, Python,
585 types::{
586 PyAnyMethods, PyCFunction, PyDict, PyList, PyListMethods, PyTuple, PyTupleMethods,
587 PyTypeMethods,
588 },
589 };
590 use rstest::*;
591 use ustr::Ustr;
592
593 use super::{TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler, create_valid_interval};
594 use crate::msgbus::{
595 BusTap, Endpoint, MStr, MessagingSwitchboard, Topic, clear_bus_tap, set_bus_tap,
596 };
597
598 #[rstest]
599 #[case(0, 1)]
600 #[case(1, 1)]
601 #[case(25, 25)]
602 fn test_create_valid_interval(#[case] interval_ns: u64, #[case] expected: u64) {
603 assert_eq!(create_valid_interval(interval_ns).get(), expected);
604 }
605
606 #[rstest]
607 fn test_test_timer_pop_event() {
608 let mut timer = TestTimer::new(
609 Ustr::from("TEST_TIMER"),
610 NonZeroU64::new(1).unwrap(),
611 UnixNanos::from(1),
612 None,
613 false,
614 );
615
616 assert!(timer.next().is_some());
617 assert!(timer.next().is_some());
618 timer.is_expired = true;
619 assert!(timer.next().is_none());
620 }
621
622 #[rstest]
623 fn test_test_timer_advance_within_next_time_ns() {
624 let mut timer = TestTimer::new(
625 Ustr::from("TEST_TIMER"),
626 NonZeroU64::new(5).unwrap(),
627 UnixNanos::default(),
628 None,
629 false,
630 );
631 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
632 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
633 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
634 assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
635 assert_eq!(timer.next_time_ns, 5);
636 assert!(!timer.is_expired);
637 }
638
639 #[rstest]
640 fn test_test_timer_advance_up_to_next_time_ns() {
641 let mut timer = TestTimer::new(
642 Ustr::from("TEST_TIMER"),
643 NonZeroU64::new(1).unwrap(),
644 UnixNanos::default(),
645 None,
646 false,
647 );
648 assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
649 assert!(!timer.is_expired);
650 }
651
652 #[rstest]
653 fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
654 let mut timer = TestTimer::new(
655 Ustr::from("TEST_TIMER"),
656 NonZeroU64::new(1).unwrap(),
657 UnixNanos::default(),
658 Some(UnixNanos::from(2)),
659 false,
660 );
661 assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
662 assert!(timer.is_expired);
663 }
664
665 #[rstest]
666 fn test_test_timer_advance_beyond_next_time_ns() {
667 let mut timer = TestTimer::new(
668 Ustr::from("TEST_TIMER"),
669 NonZeroU64::new(1).unwrap(),
670 UnixNanos::default(),
671 Some(UnixNanos::from(5)),
672 false,
673 );
674 assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
675 assert!(timer.is_expired);
676 }
677
678 #[rstest]
679 fn test_test_timer_advance_beyond_stop_time() {
680 let mut timer = TestTimer::new(
681 Ustr::from("TEST_TIMER"),
682 NonZeroU64::new(1).unwrap(),
683 UnixNanos::default(),
684 Some(UnixNanos::from(5)),
685 false,
686 );
687 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
688 assert!(timer.is_expired);
689 }
690
691 #[rstest]
692 fn test_test_timer_advance_exact_boundary() {
693 let mut timer = TestTimer::new(
694 Ustr::from("TEST_TIMER"),
695 NonZeroU64::new(5).unwrap(),
696 UnixNanos::from(0),
697 None,
698 false,
699 );
700 assert_eq!(
701 timer.advance(UnixNanos::from(5)).count(),
702 1,
703 "Expected one event at the 5 ns boundary"
704 );
705 assert_eq!(
706 timer.advance(UnixNanos::from(10)).count(),
707 1,
708 "Expected one event at the 10 ns boundary"
709 );
710 }
711
712 #[rstest]
713 fn test_test_timer_fire_immediately_true() {
714 let mut timer = TestTimer::new(
715 Ustr::from("TEST_TIMER"),
716 NonZeroU64::new(5).unwrap(),
717 UnixNanos::from(10),
718 None,
719 true, );
721
722 assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
724
725 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
727 assert_eq!(events.len(), 1);
728 assert_eq!(events[0].ts_event, UnixNanos::from(10));
729
730 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
732 }
733
734 #[rstest]
735 fn test_test_timer_fire_immediately_false() {
736 let mut timer = TestTimer::new(
737 Ustr::from("TEST_TIMER"),
738 NonZeroU64::new(5).unwrap(),
739 UnixNanos::from(10),
740 None,
741 false, );
743
744 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
746
747 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);
749
750 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
752 assert_eq!(events.len(), 1);
753 assert_eq!(events[0].ts_event, UnixNanos::from(15));
754 }
755
756 #[rstest]
757 fn test_time_event_handler_ordering_uses_tie_breakers() {
758 let callback = TimeEventCallback::from(|_: TimeEvent| {});
759
760 let later_name = TimeEventHandler::new(
761 TimeEvent::new(
762 Ustr::from("TIME_BAR_ESM4-2-MINUTE-ASK-INTERNAL"),
763 UUID4::from("00000000-0000-4000-8000-000000000003"),
764 100.into(),
765 100.into(),
766 ),
767 callback.clone(),
768 );
769 let earlier_name = TimeEventHandler::new(
770 TimeEvent::new(
771 Ustr::from("SPREAD_QUOTE_ESM4"),
772 UUID4::from("00000000-0000-4000-8000-000000000002"),
773 100.into(),
774 100.into(),
775 ),
776 callback.clone(),
777 );
778 let later_init = TimeEventHandler::new(
779 TimeEvent::new(
780 Ustr::from("SPREAD_QUOTE_ESM4"),
781 UUID4::from("00000000-0000-4000-8000-000000000004"),
782 100.into(),
783 101.into(),
784 ),
785 callback.clone(),
786 );
787 let later_id = TimeEventHandler::new(
788 TimeEvent::new(
789 Ustr::from("SPREAD_QUOTE_ESM4"),
790 UUID4::from("00000000-0000-4000-8000-000000000005"),
791 100.into(),
792 100.into(),
793 ),
794 callback,
795 );
796
797 assert!(earlier_name < later_name);
798 assert!(earlier_name < later_init);
799 assert!(earlier_name < later_id);
800 assert_ne!(earlier_name, later_id);
801 }
802
803 #[cfg(feature = "python")]
804 #[rstest]
805 fn test_python_callback_modes_pass_expected_argument_types() {
806 Python::initialize();
807
808 Python::attach(|py| {
809 let seen = PyList::empty(py);
810 let seen_obj = seen.clone().unbind().into_any();
811 let callback = PyCFunction::new_closure(
812 py,
813 None,
814 None,
815 move |args: &Bound<'_, PyTuple>,
816 _kwargs: Option<&Bound<'_, PyDict>>|
817 -> PyResult<()> {
818 let arg = args.get_item(0)?;
819 let type_name = arg.get_type().name()?.to_string();
820 Python::attach(|py| seen_obj.call_method1(py, "append", (type_name,)))?;
821 Ok(())
822 },
823 )
824 .expect("callback should create")
825 .into_any()
826 .unbind();
827
828 let event = TimeEvent::new(
829 Ustr::from("PY_CALLBACK_MODE"),
830 UUID4::from("00000000-0000-4000-8000-000000000007"),
831 UnixNanos::from(100),
832 UnixNanos::from(99),
833 );
834
835 TimeEventCallback::from_python_time_event(callback.clone_ref(py)).call(event.clone());
836 TimeEventCallback::from_python_legacy_capsule(callback).call(event);
837
838 assert_eq!(
839 seen.get_item(0).unwrap().extract::<String>().unwrap(),
840 "TimeEvent"
841 );
842 assert_eq!(
843 seen.get_item(1).unwrap().extract::<String>().unwrap(),
844 "PyCapsule"
845 );
846 });
847 }
848
    /// Test bus tap that records every published `TimeEvent` together with its topic.
    #[derive(Default)]
    struct RecordingTimeEventTap {
        /// Recorded `(topic, event)` pairs in publish order.
        time_events: RefCell<Vec<(String, TimeEvent)>>,
    }
853
854 impl RecordingTimeEventTap {
855 fn time_events(&self) -> Vec<(String, TimeEvent)> {
856 self.time_events.borrow().clone()
857 }
858 }
859
860 impl BusTap for RecordingTimeEventTap {
861 fn on_publish(&self, topic: MStr<Topic>, message: &dyn std::any::Any) {
862 if let Some(event) = message.downcast_ref::<TimeEvent>() {
863 self.time_events
864 .borrow_mut()
865 .push((topic.to_string(), event.clone()));
866 }
867 }
868
869 fn on_send(&self, _endpoint: MStr<Endpoint>, _message: &dyn std::any::Any) {}
870 }
871
872 #[rstest]
873 fn test_time_event_handler_run_dispatches_tap_before_callback() {
874 let event = TimeEvent::new(
875 Ustr::from("strategy.heartbeat"),
876 UUID4::from("00000000-0000-4000-8000-000000000006"),
877 UnixNanos::from(100),
878 UnixNanos::from(99),
879 );
880 let tap = Rc::new(RecordingTimeEventTap::default());
881 let callback_seen: Rc<RefCell<Vec<TimeEvent>>> = Rc::new(RefCell::new(Vec::new()));
882 let expected_topic = MessagingSwitchboard::time_event_topic().to_string();
883 let callback_expected = event.clone();
884 let callback_expected_topic = expected_topic.clone();
885 let callback_tap = Rc::clone(&tap);
886 let callback_seen_ref = Rc::clone(&callback_seen);
887 let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |callback_event| {
888 assert_eq!(
889 callback_tap.time_events(),
890 vec![(callback_expected_topic.clone(), callback_expected.clone())],
891 );
892 callback_seen_ref.borrow_mut().push(callback_event);
893 });
894
895 set_bus_tap(tap.clone());
896 TimeEventHandler::new(event.clone(), TimeEventCallback::from(callback)).run();
897 clear_bus_tap();
898
899 assert_eq!(tap.time_events(), vec![(expected_topic, event.clone())]);
900 assert_eq!(*callback_seen.borrow(), vec![event]);
901 }
902
903 use proptest::prelude::*;
908
    /// An operation applied to a `TestTimer` by the property tests.
    #[derive(Clone, Debug)]
    enum TimerOperation {
        /// Advance the current time by the given delta.
        AdvanceTime(u64),
        /// Cancel the timer.
        Cancel,
    }
914
915 fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
916 prop_oneof![
917 8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
918 2 => Just(TimerOperation::Cancel),
919 ]
920 }
921
922 fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
923 (
924 1u64..=100u64, 0u64..=50u64, prop::option::of(51u64..=200u64), prop::bool::ANY, )
929 }
930
931 fn timer_test_strategy()
932 -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
933 (
934 prop::collection::vec(timer_operation_strategy(), 5..=50),
935 timer_config_strategy(),
936 )
937 }
938
    /// Drives a `TestTimer` built from the given configuration through `operations`,
    /// checking its invariants after every step.
    ///
    /// Checked invariants: events come out in chronological order; no event is earlier
    /// than the start time, later than the advance target, or later than the stop time;
    /// cancelling expires the timer; the next fire time never precedes the first
    /// scheduled fire time. Processing stops as soon as the timer is expired.
    // The lint is expected because events are collected before being inspected
    // (index-based access to the previous event is needed below).
    #[expect(clippy::needless_collect)]
    fn test_timer_with_operations(
        operations: Vec<TimerOperation>,
        (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
    ) {
        let mut timer = TestTimer::new(
            Ustr::from("PROP_TEST_TIMER"),
            NonZeroU64::new(interval_ns).unwrap(),
            UnixNanos::from(start_time_ns),
            stop_time_ns.map(UnixNanos::from),
            fire_immediately,
        );

        // Simulated clock, moved forward by each `AdvanceTime` operation.
        let mut current_time = start_time_ns;

        for operation in operations {
            // Nothing further to verify once the timer has expired.
            if timer.is_expired() {
                break;
            }

            match operation {
                TimerOperation::AdvanceTime(delta) => {
                    let to_time = current_time + delta;
                    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
                    current_time = to_time;

                    for (i, event) in events.iter().enumerate() {
                        // Events must be non-decreasing in time.
                        if i > 0 {
                            assert!(
                                event.ts_event >= events[i - 1].ts_event,
                                "Events should be in chronological order"
                            );
                        }

                        // Events must fall within [start_time_ns, to_time].
                        assert!(
                            event.ts_event.as_u64() >= start_time_ns,
                            "Event timestamp should not be before start time"
                        );

                        assert!(
                            event.ts_event.as_u64() <= to_time,
                            "Event timestamp should not be after advance time"
                        );

                        // Events must never exceed a configured stop time.
                        if let Some(stop_time_ns) = stop_time_ns {
                            assert!(
                                event.ts_event.as_u64() <= stop_time_ns,
                                "Event timestamp should not exceed stop time"
                            );
                        }
                    }
                }
                TimerOperation::Cancel => {
                    timer.cancel();
                    assert!(timer.is_expired(), "Timer should be expired after cancel");
                }
            }

            if !timer.is_expired() {
                // The next fire time can never precede the first scheduled fire time.
                let expected_interval_multiple = if fire_immediately {
                    timer.next_time_ns().as_u64() >= start_time_ns
                } else {
                    timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
                };
                assert!(
                    expected_interval_multiple,
                    "Next time should respect interval spacing"
                );

                // If the schedule has already moved past the stop time, advancing a clone
                // must produce no events (or expire the clone).
                if let Some(stop_time_ns) = stop_time_ns
                    && timer.next_time_ns().as_u64() > stop_time_ns
                {
                    let mut test_timer = timer.clone();
                    let events: Vec<TimeEvent> = test_timer
                        .advance(UnixNanos::from(stop_time_ns + 1))
                        .collect();
                    assert!(
                        events.is_empty() || test_timer.is_expired(),
                        "Timer should not generate events beyond stop time"
                    );
                }
            }
        }

        // A timer with a stop time that survived all operations must expire (or produce
        // nothing) once advanced well past that stop time.
        if !timer.is_expired()
            && let Some(stop_time_ns) = stop_time_ns
        {
            let events: Vec<TimeEvent> = timer
                .advance(UnixNanos::from(stop_time_ns + 1000))
                .collect();
            assert!(
                timer.is_expired() || events.is_empty(),
                "Timer should eventually expire or stop generating events"
            );
        }
    }
1046
1047 proptest! {
1048 #[rstest]
1049 fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
1050 test_timer_with_operations(operations, config);
1051 }
1052
1053 #[rstest]
1054 fn prop_timer_interval_consistency(
1055 interval_ns in 1u64..=100u64,
1056 start_time_ns in 0u64..=50u64,
1057 fire_immediately in prop::bool::ANY,
1058 advance_count in 1usize..=20usize,
1059 ) {
1060 let mut timer = TestTimer::new(
1061 Ustr::from("CONSISTENCY_TEST"),
1062 NonZeroU64::new(interval_ns).unwrap(),
1063 UnixNanos::from(start_time_ns),
1064 None, fire_immediately,
1066 );
1067
1068 let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
1069
1070 for _ in 0..advance_count {
1071 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
1072
1073 if !events.is_empty() {
1074 prop_assert_eq!(events.len(), 1);
1076 prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
1077 }
1078
1079 previous_event_time += interval_ns;
1080 }
1081 }
1082 }
1083}