Skip to main content

nautilus_common/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and test timers for use with `Clock` implementations.
17
18use std::{
19    cmp::Ordering,
20    fmt::{Debug, Display},
21    num::NonZeroU64,
22    rc::Rc,
23    sync::Arc,
24};
25
26#[cfg(feature = "python")]
27use nautilus_core::python::IntoPyObjectNautilusExt;
28use nautilus_core::{
29    UUID4, UnixNanos,
30    correctness::{FAILED, check_valid_string_utf8},
31};
32#[cfg(feature = "python")]
33use pyo3::{Py, PyAny, PyResult, Python, types::PyCapsule};
34use ustr::Ustr;
35
36/// Creates a valid nanoseconds interval that is guaranteed to be positive.
37///
38/// Coerces zero to one to ensure a valid `NonZeroU64`.
39#[must_use]
40pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
41    NonZeroU64::new(interval_ns).unwrap_or(NonZeroU64::MIN)
42}
43
44#[repr(C)]
45#[derive(Clone, Debug, PartialEq, Eq)]
46#[cfg_attr(
47    feature = "python",
48    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
49)]
50#[cfg_attr(
51    feature = "python",
52    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
53)]
54/// Represents a time event occurring at the event timestamp.
55///
56/// A `TimeEvent` carries metadata such as the event's name, a unique event ID,
57/// and timestamps indicating when the event was scheduled to occur and when it was initialized.
58pub struct TimeEvent {
59    /// The event name, identifying the nature or purpose of the event.
60    pub name: Ustr,
61    /// The unique identifier for the event.
62    pub event_id: UUID4,
63    /// UNIX timestamp (nanoseconds) when the event occurred.
64    pub ts_event: UnixNanos,
65    /// UNIX timestamp (nanoseconds) when the instance was created.
66    pub ts_init: UnixNanos,
67}
68
69impl TimeEvent {
70    /// Creates a new [`TimeEvent`] instance.
71    #[must_use]
72    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
73        Self {
74            name,
75            event_id,
76            ts_event,
77            ts_init,
78        }
79    }
80}
81
82impl Display for TimeEvent {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        write!(
85            f,
86            "{}(name={}, event_id={}, ts_event={}, ts_init={})",
87            stringify!(TimeEvent),
88            self.name,
89            self.event_id,
90            self.ts_event,
91            self.ts_init
92        )
93    }
94}
95
96/// Wrapper for [`TimeEvent`] that implements ordering by timestamp for heap scheduling.
97///
98/// This newtype allows time events to be ordered in a priority queue (max heap) by their
99/// timestamp while keeping [`TimeEvent`] itself clean with standard field-based equality.
100/// Events are ordered in reverse (earlier timestamps have higher priority).
101#[repr(transparent)] // Guarantees zero-cost abstraction with identical memory layout
102#[derive(Clone, Debug, PartialEq, Eq)]
103pub struct ScheduledTimeEvent(pub TimeEvent);
104
105impl ScheduledTimeEvent {
106    /// Creates a new scheduled time event.
107    #[must_use]
108    pub const fn new(event: TimeEvent) -> Self {
109        Self(event)
110    }
111
112    /// Extracts the inner time event.
113    #[must_use]
114    pub fn into_inner(self) -> TimeEvent {
115        self.0
116    }
117}
118
119impl PartialOrd for ScheduledTimeEvent {
120    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
121        Some(self.cmp(other))
122    }
123}
124
125impl Ord for ScheduledTimeEvent {
126    fn cmp(&self, other: &Self) -> Ordering {
127        // Reverse order for max heap: earlier timestamps have higher priority
128        other.0.ts_event.cmp(&self.0.ts_event)
129    }
130}
131
132#[cfg(feature = "python")]
133#[derive(Clone, Copy, Debug, PartialEq, Eq)]
134/// Python time event callback argument mode.
135pub enum PythonTimeEventCallbackArg {
136    /// Callbacks receive the PyO3 `TimeEvent` object.
137    TimeEvent,
138    /// Legacy Cython callbacks receive a `PyCapsule` containing the Rust event.
139    LegacyCapsule,
140}
141
142#[cfg(feature = "python")]
143/// Python callback for time events.
144pub struct PythonTimeEventCallback {
145    callback: Py<PyAny>,
146    arg: PythonTimeEventCallbackArg,
147}
148
149#[cfg(feature = "python")]
150impl PythonTimeEventCallback {
151    /// Creates a new [`PythonTimeEventCallback`] instance.
152    #[must_use]
153    pub const fn new(callback: Py<PyAny>, arg: PythonTimeEventCallbackArg) -> Self {
154        Self { callback, arg }
155    }
156
157    /// Returns the Python callable.
158    #[must_use]
159    pub const fn callback(&self) -> &Py<PyAny> {
160        &self.callback
161    }
162
163    /// Invokes the Python callback for the given `TimeEvent`.
164    pub fn call(&self, event: TimeEvent) {
165        Python::attach(|py| {
166            let result = match self.arg {
167                PythonTimeEventCallbackArg::TimeEvent => self.callback.call1(py, (event,)),
168                PythonTimeEventCallbackArg::LegacyCapsule => {
169                    call_legacy_python_time_event_callback(py, event, &self.callback)
170                }
171            };
172
173            if let Err(e) = result {
174                log::error!("Python time event callback raised exception: {e}");
175            }
176        });
177    }
178}
179
180#[cfg(feature = "python")]
181impl Debug for PythonTimeEventCallback {
182    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183        f.debug_struct(stringify!(PythonTimeEventCallback))
184            .field("arg", &self.arg)
185            .finish_non_exhaustive()
186    }
187}
188
189/// Callback type for time events.
190///
191/// # Variants
192///
193/// - `Python`: For Python callbacks (requires `python` feature).
194/// - `Rust`: Thread-safe callbacks using `Arc`. Use when the closure is `Send + Sync`.
195/// - `RustLocal`: Single-threaded callbacks using `Rc`. Use when capturing `Rc<RefCell<...>>`.
196///
197/// # Choosing Between `Rust` and `RustLocal`
198///
199/// Use `Rust` (thread-safe) when:
200/// - The callback doesn't capture `Rc<RefCell<...>>` or other non-`Send` types.
201/// - The closure is `Send + Sync` (most simple closures qualify).
202///
203/// Use `RustLocal` when:
204/// - The callback captures `Rc<RefCell<...>>` for shared mutable state.
205/// - Thread safety constraints prevent using `Arc`.
206///
207/// `RustLocal` works with `TestClock`. With `LiveClock`, use it only for existing
208/// single-threaded callback paths; live timer callback registry dispatch is still pending.
209///
210/// # Automatic Conversion
211///
212/// - Closures that are `Fn + Send + Sync + 'static` automatically convert to `Rust`.
213/// - `Rc<dyn Fn(TimeEvent)>` converts to `RustLocal`.
214/// - `Arc<dyn Fn(TimeEvent) + Send + Sync>` converts to `Rust`.
215pub enum TimeEventCallback {
216    /// Python callable for use from Python via PyO3.
217    #[cfg(feature = "python")]
218    Python(Arc<PythonTimeEventCallback>),
219    /// Thread-safe Rust callback using `Arc` (`Send + Sync`).
220    Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
221    /// Local Rust callback using `Rc` (not `Send`/`Sync`).
222    RustLocal(Rc<dyn Fn(TimeEvent)>),
223}
224
225impl Clone for TimeEventCallback {
226    fn clone(&self) -> Self {
227        match self {
228            #[cfg(feature = "python")]
229            Self::Python(callback) => Self::Python(callback.clone()),
230            Self::Rust(cb) => Self::Rust(cb.clone()),
231            Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
232        }
233    }
234}
235
236impl Debug for TimeEventCallback {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        match self {
239            #[cfg(feature = "python")]
240            Self::Python(_) => f.write_str("Python callback"),
241            Self::Rust(_) => f.write_str("Rust callback (thread-safe)"),
242            Self::RustLocal(_) => f.write_str("Rust callback (local)"),
243        }
244    }
245}
246
247impl TimeEventCallback {
248    /// Returns `true` if this is a thread-safe Rust callback.
249    #[must_use]
250    pub const fn is_rust(&self) -> bool {
251        matches!(self, Self::Rust(_))
252    }
253
254    /// Returns `true` if this is a local (non-thread-safe) Rust callback.
255    ///
256    /// Local callbacks use `Rc` internally and require creation, cloning, dropping,
257    /// and invocation to stay on the originating thread.
258    #[must_use]
259    pub const fn is_local(&self) -> bool {
260        matches!(self, Self::RustLocal(_))
261    }
262
263    /// Invokes the callback for the given `TimeEvent`.
264    ///
265    /// For Python callbacks, exceptions are logged as errors rather than panicking.
266    pub fn call(&self, event: TimeEvent) {
267        match self {
268            #[cfg(feature = "python")]
269            Self::Python(callback) => callback.call(event),
270            Self::Rust(callback) => callback(event),
271            Self::RustLocal(callback) => callback(event),
272        }
273    }
274}
275
276impl<F> From<F> for TimeEventCallback
277where
278    F: Fn(TimeEvent) + Send + Sync + 'static,
279{
280    fn from(value: F) -> Self {
281        Self::Rust(Arc::new(value))
282    }
283}
284
285impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
286    fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
287        Self::Rust(value)
288    }
289}
290
291impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
292    fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
293        Self::RustLocal(value)
294    }
295}
296
297#[cfg(feature = "python")]
298impl From<Py<PyAny>> for TimeEventCallback {
299    fn from(value: Py<PyAny>) -> Self {
300        Self::from_python_time_event(value)
301    }
302}
303
304#[cfg(feature = "python")]
305impl TimeEventCallback {
306    /// Creates a Python callback that receives a PyO3 `TimeEvent`.
307    #[must_use]
308    pub fn from_python_time_event(callback: Py<PyAny>) -> Self {
309        Self::Python(Arc::new(PythonTimeEventCallback::new(
310            callback,
311            PythonTimeEventCallbackArg::TimeEvent,
312        )))
313    }
314
315    /// Creates a legacy Python callback that receives a `PyCapsule`.
316    #[must_use]
317    pub fn from_python_legacy_capsule(callback: Py<PyAny>) -> Self {
318        Self::Python(Arc::new(PythonTimeEventCallback::new(
319            callback,
320            PythonTimeEventCallbackArg::LegacyCapsule,
321        )))
322    }
323}
324
325// SAFETY: TimeEventCallback is Send + Sync with the following invariants:
326//
327// - Python variant: Arc clone/drop does not require the GIL, and the callable is
328//   only invoked after acquiring the GIL.
329//
330// - Rust variant: Arc<dyn Fn + Send + Sync> is inherently Send + Sync.
331//
332// - RustLocal variant: Rc<dyn Fn> is not Send/Sync. This unsafe impl preserves
333//   existing API compatibility and relies on callers to keep RustLocal callbacks
334//   on the originating event-loop thread. LiveTimer logs a warning because its
335//   callback registry dispatch follow-up is still pending.
336//
337//   INVARIANT: RustLocal callbacks must only be cloned, dropped, or called from
338//   the thread that created them. Violating this invariant causes undefined behavior.
339//   Use the Rust variant with Arc if cross-thread execution is needed.
340#[allow(unsafe_code)]
341unsafe impl Send for TimeEventCallback {}
342#[allow(unsafe_code)]
343unsafe impl Sync for TimeEventCallback {}
344
345#[cfg(feature = "python")]
346fn call_legacy_python_time_event_callback(
347    py: Python<'_>,
348    event: TimeEvent,
349    callback: &Py<PyAny>,
350) -> PyResult<Py<PyAny>> {
351    let capsule: Py<PyAny> = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
352        .expect("Error creating `PyCapsule`")
353        .into_py_any_unwrap(py);
354
355    callback.call1(py, (capsule,))
356}
357
358#[repr(C)]
359#[derive(Clone, Debug)]
360/// Represents a time event and its associated handler.
361///
362/// `TimeEventHandler` associates a `TimeEvent` with a callback function that is triggered
363/// when the event's timestamp is reached.
364pub struct TimeEventHandler {
365    /// The time event.
366    pub event: TimeEvent,
367    /// The callable handler for the event.
368    pub callback: TimeEventCallback,
369}
370
371impl TimeEventHandler {
372    /// Creates a new [`TimeEventHandler`] instance.
373    #[must_use]
374    pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
375        Self { event, callback }
376    }
377
378    fn cmp_event(&self, other: &Self) -> Ordering {
379        self.event
380            .ts_event
381            .cmp(&other.event.ts_event)
382            .then_with(|| self.event.name.cmp(&other.event.name))
383            .then_with(|| self.event.ts_init.cmp(&other.event.ts_init))
384            .then_with(|| {
385                self.event
386                    .event_id
387                    .as_str()
388                    .cmp(other.event.event_id.as_str())
389            })
390    }
391
392    /// Executes the handler by invoking its callback for the associated event.
393    pub fn run(self) {
394        let Self { event, callback } = self;
395        crate::msgbus::dispatch_tap_time_event(&event);
396        callback.call(event);
397    }
398}
399
400impl PartialOrd for TimeEventHandler {
401    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
402        Some(self.cmp(other))
403    }
404}
405
406impl PartialEq for TimeEventHandler {
407    fn eq(&self, other: &Self) -> bool {
408        self.cmp_event(other).is_eq()
409    }
410}
411
412impl Eq for TimeEventHandler {}
413
414impl Ord for TimeEventHandler {
415    fn cmp(&self, other: &Self) -> Ordering {
416        self.cmp_event(other)
417    }
418}
419
420pub(crate) trait Timer {
421    fn is_expired(&self) -> bool;
422    fn cancel(&mut self);
423}
424
425/// A test timer for user with a `TestClock`.
426///
427/// `TestTimer` simulates time progression in a controlled environment,
428/// allowing for precise control over event generation in test scenarios.
429///
430/// # Threading
431///
432/// The timer mutates its internal state and should only be used from its owning thread.
433#[derive(Clone, Debug)]
434pub struct TestTimer {
435    /// The name of the timer.
436    pub name: Ustr,
437    /// The interval between timer events in nanoseconds.
438    pub interval_ns: NonZeroU64,
439    /// The start time of the timer in UNIX nanoseconds.
440    pub start_time_ns: UnixNanos,
441    /// The optional stop time of the timer in UNIX nanoseconds.
442    pub stop_time_ns: Option<UnixNanos>,
443    /// If the timer should fire immediately at start time.
444    pub fire_immediately: bool,
445    next_time_ns: UnixNanos,
446    is_expired: bool,
447}
448
449impl TestTimer {
450    /// Creates a new [`TestTimer`] instance.
451    ///
452    /// # Panics
453    ///
454    /// Panics if `name` is not a valid string.
455    #[must_use]
456    pub fn new(
457        name: Ustr,
458        interval_ns: NonZeroU64,
459        start_time_ns: UnixNanos,
460        stop_time_ns: Option<UnixNanos>,
461        fire_immediately: bool,
462    ) -> Self {
463        check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
464
465        let next_time_ns = if fire_immediately {
466            start_time_ns
467        } else {
468            start_time_ns + interval_ns.get()
469        };
470
471        Self {
472            name,
473            interval_ns,
474            start_time_ns,
475            stop_time_ns,
476            fire_immediately,
477            next_time_ns,
478            is_expired: false,
479        }
480    }
481
482    /// Returns the next time in UNIX nanoseconds when the timer will fire.
483    #[must_use]
484    pub const fn next_time_ns(&self) -> UnixNanos {
485        self.next_time_ns
486    }
487
488    /// Returns whether the timer is expired.
489    #[must_use]
490    pub const fn is_expired(&self) -> bool {
491        self.is_expired
492    }
493
494    #[must_use]
495    pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
496        TimeEvent {
497            name: self.name,
498            event_id,
499            ts_event: self.next_time_ns,
500            ts_init,
501        }
502    }
503
504    /// Advance the test timer forward to the given time, generating a sequence
505    /// of events. A [`TimeEvent`] is appended for each time a next event is
506    /// <= the given `to_time_ns`.
507    ///
508    /// This allows testing of multiple time intervals within a single step.
509    pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
510        // Calculate how many events should fire up to and including to_time_ns
511        let advances = if self.next_time_ns <= to_time_ns {
512            ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
513                .saturating_add(1)
514        } else {
515            0
516        };
517        self.take(advances as usize).map(|(event, _)| event)
518    }
519
520    /// Cancels the timer (the timer will not generate an event).
521    ///
522    /// Used to stop the timer before its scheduled stop time.
523    pub const fn cancel(&mut self) {
524        self.is_expired = true;
525    }
526}
527
528impl Timer for TestTimer {
529    fn is_expired(&self) -> bool {
530        self.is_expired
531    }
532
533    fn cancel(&mut self) {
534        self.is_expired = true;
535    }
536}
537
538impl Iterator for TestTimer {
539    type Item = (TimeEvent, UnixNanos);
540
541    fn next(&mut self) -> Option<Self::Item> {
542        if self.is_expired {
543            None
544        } else {
545            // Check if current event would exceed stop time before creating the event
546            if let Some(stop_time_ns) = self.stop_time_ns
547                && self.next_time_ns > stop_time_ns
548            {
549                self.is_expired = true;
550                return None;
551            }
552
553            let item = (
554                TimeEvent {
555                    name: self.name,
556                    event_id: UUID4::new(),
557                    ts_event: self.next_time_ns,
558                    ts_init: self.next_time_ns,
559                },
560                self.next_time_ns,
561            );
562
563            // Check if we should expire after this event (for repeating timers at stop boundary)
564            if let Some(stop_time_ns) = self.stop_time_ns
565                && self.next_time_ns == stop_time_ns
566            {
567                self.is_expired = true;
568            }
569
570            self.next_time_ns += self.interval_ns;
571
572            Some(item)
573        }
574    }
575}
576
577#[cfg(test)]
578mod tests {
579    use std::{cell::RefCell, num::NonZeroU64, rc::Rc};
580
581    use nautilus_core::{UUID4, UnixNanos};
582    #[cfg(feature = "python")]
583    use pyo3::{
584        Bound, PyResult, Python,
585        types::{
586            PyAnyMethods, PyCFunction, PyDict, PyList, PyListMethods, PyTuple, PyTupleMethods,
587            PyTypeMethods,
588        },
589    };
590    use rstest::*;
591    use ustr::Ustr;
592
593    use super::{TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler, create_valid_interval};
594    use crate::msgbus::{
595        BusTap, Endpoint, MStr, MessagingSwitchboard, Topic, clear_bus_tap, set_bus_tap,
596    };
597
598    #[rstest]
599    #[case(0, 1)]
600    #[case(1, 1)]
601    #[case(25, 25)]
602    fn test_create_valid_interval(#[case] interval_ns: u64, #[case] expected: u64) {
603        assert_eq!(create_valid_interval(interval_ns).get(), expected);
604    }
605
606    #[rstest]
607    fn test_test_timer_pop_event() {
608        let mut timer = TestTimer::new(
609            Ustr::from("TEST_TIMER"),
610            NonZeroU64::new(1).unwrap(),
611            UnixNanos::from(1),
612            None,
613            false,
614        );
615
616        assert!(timer.next().is_some());
617        assert!(timer.next().is_some());
618        timer.is_expired = true;
619        assert!(timer.next().is_none());
620    }
621
622    #[rstest]
623    fn test_test_timer_advance_within_next_time_ns() {
624        let mut timer = TestTimer::new(
625            Ustr::from("TEST_TIMER"),
626            NonZeroU64::new(5).unwrap(),
627            UnixNanos::default(),
628            None,
629            false,
630        );
631        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
632        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
633        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
634        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
635        assert_eq!(timer.next_time_ns, 5);
636        assert!(!timer.is_expired);
637    }
638
639    #[rstest]
640    fn test_test_timer_advance_up_to_next_time_ns() {
641        let mut timer = TestTimer::new(
642            Ustr::from("TEST_TIMER"),
643            NonZeroU64::new(1).unwrap(),
644            UnixNanos::default(),
645            None,
646            false,
647        );
648        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
649        assert!(!timer.is_expired);
650    }
651
652    #[rstest]
653    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
654        let mut timer = TestTimer::new(
655            Ustr::from("TEST_TIMER"),
656            NonZeroU64::new(1).unwrap(),
657            UnixNanos::default(),
658            Some(UnixNanos::from(2)),
659            false,
660        );
661        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
662        assert!(timer.is_expired);
663    }
664
665    #[rstest]
666    fn test_test_timer_advance_beyond_next_time_ns() {
667        let mut timer = TestTimer::new(
668            Ustr::from("TEST_TIMER"),
669            NonZeroU64::new(1).unwrap(),
670            UnixNanos::default(),
671            Some(UnixNanos::from(5)),
672            false,
673        );
674        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
675        assert!(timer.is_expired);
676    }
677
678    #[rstest]
679    fn test_test_timer_advance_beyond_stop_time() {
680        let mut timer = TestTimer::new(
681            Ustr::from("TEST_TIMER"),
682            NonZeroU64::new(1).unwrap(),
683            UnixNanos::default(),
684            Some(UnixNanos::from(5)),
685            false,
686        );
687        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
688        assert!(timer.is_expired);
689    }
690
691    #[rstest]
692    fn test_test_timer_advance_exact_boundary() {
693        let mut timer = TestTimer::new(
694            Ustr::from("TEST_TIMER"),
695            NonZeroU64::new(5).unwrap(),
696            UnixNanos::from(0),
697            None,
698            false,
699        );
700        assert_eq!(
701            timer.advance(UnixNanos::from(5)).count(),
702            1,
703            "Expected one event at the 5 ns boundary"
704        );
705        assert_eq!(
706            timer.advance(UnixNanos::from(10)).count(),
707            1,
708            "Expected one event at the 10 ns boundary"
709        );
710    }
711
712    #[rstest]
713    fn test_test_timer_fire_immediately_true() {
714        let mut timer = TestTimer::new(
715            Ustr::from("TEST_TIMER"),
716            NonZeroU64::new(5).unwrap(),
717            UnixNanos::from(10),
718            None,
719            true, // fire_immediately = true
720        );
721
722        // With fire_immediately=true, next_time_ns should be start_time_ns
723        assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
724
725        // Advance to start time should produce an event
726        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
727        assert_eq!(events.len(), 1);
728        assert_eq!(events[0].ts_event, UnixNanos::from(10));
729
730        // Next event should be at start_time + interval
731        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
732    }
733
734    #[rstest]
735    fn test_test_timer_fire_immediately_false() {
736        let mut timer = TestTimer::new(
737            Ustr::from("TEST_TIMER"),
738            NonZeroU64::new(5).unwrap(),
739            UnixNanos::from(10),
740            None,
741            false, // fire_immediately = false
742        );
743
744        // With fire_immediately=false, next_time_ns should be start_time_ns + interval
745        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
746
747        // Advance to start time should produce no events
748        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);
749
750        // Advance to first interval should produce an event
751        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
752        assert_eq!(events.len(), 1);
753        assert_eq!(events[0].ts_event, UnixNanos::from(15));
754    }
755
756    #[rstest]
757    fn test_time_event_handler_ordering_uses_tie_breakers() {
758        let callback = TimeEventCallback::from(|_: TimeEvent| {});
759
760        let later_name = TimeEventHandler::new(
761            TimeEvent::new(
762                Ustr::from("TIME_BAR_ESM4-2-MINUTE-ASK-INTERNAL"),
763                UUID4::from("00000000-0000-4000-8000-000000000003"),
764                100.into(),
765                100.into(),
766            ),
767            callback.clone(),
768        );
769        let earlier_name = TimeEventHandler::new(
770            TimeEvent::new(
771                Ustr::from("SPREAD_QUOTE_ESM4"),
772                UUID4::from("00000000-0000-4000-8000-000000000002"),
773                100.into(),
774                100.into(),
775            ),
776            callback.clone(),
777        );
778        let later_init = TimeEventHandler::new(
779            TimeEvent::new(
780                Ustr::from("SPREAD_QUOTE_ESM4"),
781                UUID4::from("00000000-0000-4000-8000-000000000004"),
782                100.into(),
783                101.into(),
784            ),
785            callback.clone(),
786        );
787        let later_id = TimeEventHandler::new(
788            TimeEvent::new(
789                Ustr::from("SPREAD_QUOTE_ESM4"),
790                UUID4::from("00000000-0000-4000-8000-000000000005"),
791                100.into(),
792                100.into(),
793            ),
794            callback,
795        );
796
797        assert!(earlier_name < later_name);
798        assert!(earlier_name < later_init);
799        assert!(earlier_name < later_id);
800        assert_ne!(earlier_name, later_id);
801    }
802
803    #[cfg(feature = "python")]
804    #[rstest]
805    fn test_python_callback_modes_pass_expected_argument_types() {
806        Python::initialize();
807
808        Python::attach(|py| {
809            let seen = PyList::empty(py);
810            let seen_obj = seen.clone().unbind().into_any();
811            let callback = PyCFunction::new_closure(
812                py,
813                None,
814                None,
815                move |args: &Bound<'_, PyTuple>,
816                      _kwargs: Option<&Bound<'_, PyDict>>|
817                      -> PyResult<()> {
818                    let arg = args.get_item(0)?;
819                    let type_name = arg.get_type().name()?.to_string();
820                    Python::attach(|py| seen_obj.call_method1(py, "append", (type_name,)))?;
821                    Ok(())
822                },
823            )
824            .expect("callback should create")
825            .into_any()
826            .unbind();
827
828            let event = TimeEvent::new(
829                Ustr::from("PY_CALLBACK_MODE"),
830                UUID4::from("00000000-0000-4000-8000-000000000007"),
831                UnixNanos::from(100),
832                UnixNanos::from(99),
833            );
834
835            TimeEventCallback::from_python_time_event(callback.clone_ref(py)).call(event.clone());
836            TimeEventCallback::from_python_legacy_capsule(callback).call(event);
837
838            assert_eq!(
839                seen.get_item(0).unwrap().extract::<String>().unwrap(),
840                "TimeEvent"
841            );
842            assert_eq!(
843                seen.get_item(1).unwrap().extract::<String>().unwrap(),
844                "PyCapsule"
845            );
846        });
847    }
848
849    #[derive(Default)]
850    struct RecordingTimeEventTap {
851        time_events: RefCell<Vec<(String, TimeEvent)>>,
852    }
853
854    impl RecordingTimeEventTap {
855        fn time_events(&self) -> Vec<(String, TimeEvent)> {
856            self.time_events.borrow().clone()
857        }
858    }
859
860    impl BusTap for RecordingTimeEventTap {
861        fn on_publish(&self, topic: MStr<Topic>, message: &dyn std::any::Any) {
862            if let Some(event) = message.downcast_ref::<TimeEvent>() {
863                self.time_events
864                    .borrow_mut()
865                    .push((topic.to_string(), event.clone()));
866            }
867        }
868
869        fn on_send(&self, _endpoint: MStr<Endpoint>, _message: &dyn std::any::Any) {}
870    }
871
872    #[rstest]
873    fn test_time_event_handler_run_dispatches_tap_before_callback() {
874        let event = TimeEvent::new(
875            Ustr::from("strategy.heartbeat"),
876            UUID4::from("00000000-0000-4000-8000-000000000006"),
877            UnixNanos::from(100),
878            UnixNanos::from(99),
879        );
880        let tap = Rc::new(RecordingTimeEventTap::default());
881        let callback_seen: Rc<RefCell<Vec<TimeEvent>>> = Rc::new(RefCell::new(Vec::new()));
882        let expected_topic = MessagingSwitchboard::time_event_topic().to_string();
883        let callback_expected = event.clone();
884        let callback_expected_topic = expected_topic.clone();
885        let callback_tap = Rc::clone(&tap);
886        let callback_seen_ref = Rc::clone(&callback_seen);
887        let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |callback_event| {
888            assert_eq!(
889                callback_tap.time_events(),
890                vec![(callback_expected_topic.clone(), callback_expected.clone())],
891            );
892            callback_seen_ref.borrow_mut().push(callback_event);
893        });
894
895        set_bus_tap(tap.clone());
896        TimeEventHandler::new(event.clone(), TimeEventCallback::from(callback)).run();
897        clear_bus_tap();
898
899        assert_eq!(tap.time_events(), vec![(expected_topic, event.clone())]);
900        assert_eq!(*callback_seen.borrow(), vec![event]);
901    }
902
903    ////////////////////////////////////////////////////////////////////////////////
904    // Property-based testing
905    ////////////////////////////////////////////////////////////////////////////////
906
907    use proptest::prelude::*;
908
909    #[derive(Clone, Debug)]
910    enum TimerOperation {
911        AdvanceTime(u64),
912        Cancel,
913    }
914
915    fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
916        prop_oneof![
917            8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
918            2 => Just(TimerOperation::Cancel),
919        ]
920    }
921
922    fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
923        (
924            1u64..=100u64,                    // interval_ns (1-100)
925            0u64..=50u64,                     // start_time_ns (0-50)
926            prop::option::of(51u64..=200u64), // stop_time_ns (51-200 or None)
927            prop::bool::ANY,                  // fire_immediately
928        )
929    }
930
931    fn timer_test_strategy()
932    -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
933        (
934            prop::collection::vec(timer_operation_strategy(), 5..=50),
935            timer_config_strategy(),
936        )
937    }
938
939    #[expect(clippy::needless_collect)] // Collect needed for indexing and .is_empty()
940    fn test_timer_with_operations(
941        operations: Vec<TimerOperation>,
942        (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
943    ) {
944        let mut timer = TestTimer::new(
945            Ustr::from("PROP_TEST_TIMER"),
946            NonZeroU64::new(interval_ns).unwrap(),
947            UnixNanos::from(start_time_ns),
948            stop_time_ns.map(UnixNanos::from),
949            fire_immediately,
950        );
951
952        let mut current_time = start_time_ns;
953
954        for operation in operations {
955            if timer.is_expired() {
956                break;
957            }
958
959            match operation {
960                TimerOperation::AdvanceTime(delta) => {
961                    let to_time = current_time + delta;
962                    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
963                    current_time = to_time;
964
965                    // Verify event ordering and timing
966                    for (i, event) in events.iter().enumerate() {
967                        // Event timestamps should be in order
968                        if i > 0 {
969                            assert!(
970                                event.ts_event >= events[i - 1].ts_event,
971                                "Events should be in chronological order"
972                            );
973                        }
974
975                        // Event timestamp should be within reasonable bounds
976                        assert!(
977                            event.ts_event.as_u64() >= start_time_ns,
978                            "Event timestamp should not be before start time"
979                        );
980
981                        assert!(
982                            event.ts_event.as_u64() <= to_time,
983                            "Event timestamp should not be after advance time"
984                        );
985
986                        // If there's a stop time, event should not exceed it
987                        if let Some(stop_time_ns) = stop_time_ns {
988                            assert!(
989                                event.ts_event.as_u64() <= stop_time_ns,
990                                "Event timestamp should not exceed stop time"
991                            );
992                        }
993                    }
994                }
995                TimerOperation::Cancel => {
996                    timer.cancel();
997                    assert!(timer.is_expired(), "Timer should be expired after cancel");
998                }
999            }
1000
1001            // Timer invariants
1002            if !timer.is_expired() {
1003                // Next time should be properly spaced
1004                let expected_interval_multiple = if fire_immediately {
1005                    timer.next_time_ns().as_u64() >= start_time_ns
1006                } else {
1007                    timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
1008                };
1009                assert!(
1010                    expected_interval_multiple,
1011                    "Next time should respect interval spacing"
1012                );
1013
1014                // If timer has stop time, check if it should be considered logically expired
1015                // Note: Timer only becomes actually expired when advance() or next() is called
1016                if let Some(stop_time_ns) = stop_time_ns
1017                    && timer.next_time_ns().as_u64() > stop_time_ns
1018                {
1019                    // The timer should expire on the next advance/iteration
1020                    let mut test_timer = timer.clone();
1021                    let events: Vec<TimeEvent> = test_timer
1022                        .advance(UnixNanos::from(stop_time_ns + 1))
1023                        .collect();
1024                    assert!(
1025                        events.is_empty() || test_timer.is_expired(),
1026                        "Timer should not generate events beyond stop time"
1027                    );
1028                }
1029            }
1030        }
1031
1032        // Final consistency check: if timer is not expired and we haven't hit stop time,
1033        // advancing far enough should eventually expire it
1034        if !timer.is_expired()
1035            && let Some(stop_time_ns) = stop_time_ns
1036        {
1037            let events: Vec<TimeEvent> = timer
1038                .advance(UnixNanos::from(stop_time_ns + 1000))
1039                .collect();
1040            assert!(
1041                timer.is_expired() || events.is_empty(),
1042                "Timer should eventually expire or stop generating events"
1043            );
1044        }
1045    }
1046
1047    proptest! {
1048        #[rstest]
1049        fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
1050            test_timer_with_operations(operations, config);
1051        }
1052
1053        #[rstest]
1054        fn prop_timer_interval_consistency(
1055            interval_ns in 1u64..=100u64,
1056            start_time_ns in 0u64..=50u64,
1057            fire_immediately in prop::bool::ANY,
1058            advance_count in 1usize..=20usize,
1059        ) {
1060            let mut timer = TestTimer::new(
1061                Ustr::from("CONSISTENCY_TEST"),
1062                NonZeroU64::new(interval_ns).unwrap(),
1063                UnixNanos::from(start_time_ns),
1064                None, // No stop time for this test
1065                fire_immediately,
1066            );
1067
1068            let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
1069
1070            for _ in 0..advance_count {
1071                let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
1072
1073                if !events.is_empty() {
1074                    // Should get exactly one event at the expected time
1075                    prop_assert_eq!(events.len(), 1);
1076                    prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
1077                }
1078
1079                previous_event_time += interval_ns;
1080            }
1081        }
1082    }
1083}