Skip to main content

qubit_clock/mock/
mock_timeline.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Shared monotonic timeline for mock time components.
11
12use parking_lot::{
13    Condvar,
14    Mutex,
15    MutexGuard,
16};
17use std::sync::Arc;
18use std::sync::atomic::{
19    AtomicU64,
20    Ordering,
21};
22use std::time::{
23    Duration,
24    Instant,
25};
26
27#[cfg(feature = "tokio")]
28use crate::sleep::AsyncSleepFuture;
29#[cfg(feature = "tokio")]
30use tokio::sync::watch;
31
32use crate::{
33    MockInstant,
34    MockTimeError,
35    MockWaiterKind,
36};
37
/// Next globally unique mock timeline id.
///
/// The counter starts at 1 and is advanced by `next_mock_timeline_id`, which
/// hands out the value it observed before the increment. Id 0 is therefore
/// never allocated.
static NEXT_MOCK_TIMELINE_ID: AtomicU64 = AtomicU64::new(1);
40
/// Shared monotonic time source for deterministic tests.
///
/// `MockTimeline` is the single authority for elapsed mock time. Cloned
/// timelines share the same internal state, so a clock, sleeper, and future
/// timeout-aware primitive can all observe the same logical time progression.
/// The timeline starts at elapsed zero and advances only when a test calls
/// [`advance`](Self::advance). It never follows wall-clock time by itself.
///
/// The timeline maintains two related notions of progress:
///
/// - **elapsed time** is the monotonic duration since the timeline origin.
///   [`MockClock`](crate::MockClock) and
///   [`crate::sleep::MockSleeper`] derive their behavior from this value.
/// - **event epoch** is a notification counter incremented when time advances,
///   when the timeline is [`reset`](Self::reset), or when
///   [`notify_external_change`](Self::notify_external_change) is called. It
///   lets monitor-like primitives wait for either a state change or a later
///   deadline without inventing a second mock clock.
///
/// Blocking waiters use condition variables, and async waiters use a Tokio
/// watch channel when the `tokio` feature is enabled. Waiter counts are tracked
/// by [`MockWaiterKind`] so tests can wait until a thread or future has really
/// entered a mock wait before advancing time. Reset is rejected while waiters
/// are active, because rewinding a timeline under a blocked waiter would make
/// deadline semantics ambiguous. Deadlines also carry the id of the timeline
/// that created them. Passing a deadline from one timeline into another
/// timeline is rejected with [`MockTimeError::MismatchedTimeline`].
///
/// `MockTimeline` uses non-poisoning synchronization primitives internally.
/// A panic in one test thread does not permanently poison the timeline for the
/// rest of the test process.
#[derive(Clone, Debug)]
pub struct MockTimeline {
    /// Globally unique id of this timeline; copied unchanged into every clone.
    id: u64,
    /// Mutex-protected state and condition variables shared by all clones.
    shared: Arc<MockTimelineShared>,
    /// Watch-channel sender used to publish the event epoch to async waiters.
    #[cfg(feature = "tokio")]
    async_event_sender: watch::Sender<u64>,
}
78
/// Shared state and condition variables for a mock timeline.
///
/// Both condition variables are always used together with the `state` mutex.
#[derive(Debug)]
struct MockTimelineShared {
    /// Mutable timeline state; every read and write goes through this lock.
    state: Mutex<MockTimelineState>,
    /// Notified after the event epoch changes; blocking deadline and event
    /// waiters sleep on it.
    event_changed: Condvar,
    /// Notified when a waiter is registered or removed, and after every event
    /// epoch change; `wait_for_blocked_waiters` sleeps on it.
    waiters_changed: Condvar,
}
86
/// Mutable mock timeline state.
///
/// All fields are protected by the mutex in `MockTimelineShared`.
#[derive(Debug)]
struct MockTimelineState {
    /// Nanoseconds elapsed since the timeline origin; saturates on overflow.
    elapsed_nanos: u128,
    /// Wrapping counter bumped whenever elapsed time is advanced or reset.
    time_epoch: u64,
    /// Wrapping counter bumped on advance, reset, and external notifications.
    event_epoch: u64,
    /// Number of currently registered `MockWaiterKind::Sleep` waiters.
    sleep_waiters: usize,
    /// Number of currently registered `MockWaiterKind::Deadline` waiters.
    deadline_waiters: usize,
}
96
/// Registration for a mock timeline waiter.
///
/// Creating a registration increments the waiter count of `kind` on
/// `timeline`; dropping it decrements the same count again, so the count
/// stays balanced even when the owning future is dropped before completing.
#[cfg(feature = "tokio")]
#[derive(Debug)]
struct MockTimelineWaiterRegistration {
    /// Timeline whose waiter count this registration contributes to.
    timeline: MockTimeline,
    /// Waiter group that was incremented and will be decremented on drop.
    kind: MockWaiterKind,
}
104
#[cfg(feature = "tokio")]
impl MockTimelineWaiterRegistration {
    /// Adds one waiter of `kind` to `timeline` and returns its guard object.
    ///
    /// # Parameters
    /// - `timeline`: Timeline whose waiter count is incremented.
    /// - `kind`: Waiter group the new waiter belongs to.
    ///
    /// # Returns
    /// A registration whose `Drop` implementation removes the waiter again.
    fn new(timeline: MockTimeline, kind: MockWaiterKind) -> Self {
        let mut guard = timeline.lock_state();
        MockTimeline::increment_waiter(&mut guard, kind);
        // Release the state lock before waking threads that observe the
        // waiter counts, so they can re-acquire it immediately.
        drop(guard);
        timeline.shared.waiters_changed.notify_all();
        Self { timeline, kind }
    }
}
124
#[cfg(feature = "tokio")]
impl Drop for MockTimelineWaiterRegistration {
    /// Unregisters the waiter and wakes threads observing waiter counts.
    fn drop(&mut self) {
        let mut guard = self.timeline.lock_state();
        MockTimeline::decrement_waiter(&mut guard, self.kind);
        // The notification is sent only after the state lock is released.
        drop(guard);
        self.timeline.shared.waiters_changed.notify_all();
    }
}
136
137impl MockTimeline {
138    /// Creates a new timeline at elapsed zero.
139    ///
140    /// # Returns
141    /// A mock timeline with no elapsed time.
142    #[must_use]
143    pub fn new() -> Self {
144        #[cfg(feature = "tokio")]
145        let (async_event_sender, _) = watch::channel(0);
146        Self {
147            id: next_mock_timeline_id(),
148            shared: Arc::new(MockTimelineShared {
149                state: Mutex::new(MockTimelineState {
150                    elapsed_nanos: 0,
151                    time_epoch: 0,
152                    event_epoch: 0,
153                    sleep_waiters: 0,
154                    deadline_waiters: 0,
155                }),
156                event_changed: Condvar::new(),
157                waiters_changed: Condvar::new(),
158            }),
159            #[cfg(feature = "tokio")]
160            async_event_sender,
161        }
162    }
163
164    /// Returns the globally unique id of this timeline.
165    ///
166    /// Clones of the same timeline return the same id. Independently created
167    /// timelines receive different ids, which lets deadline APIs reject
168    /// [`MockInstant`] values from the wrong timeline.
169    ///
170    /// # Returns
171    /// The timeline id.
172    #[inline]
173    pub const fn id(&self) -> u64 {
174        self.id
175    }
176
177    /// Returns elapsed mock time as a standard duration.
178    ///
179    /// # Returns
180    /// Elapsed monotonic time since the timeline origin.
181    #[inline]
182    pub fn elapsed(&self) -> Duration {
183        duration_from_nanos_saturating(self.elapsed_nanos())
184    }
185
186    /// Returns elapsed mock time in nanoseconds.
187    ///
188    /// # Returns
189    /// Elapsed monotonic nanoseconds since the timeline origin.
190    #[inline]
191    pub fn elapsed_nanos(&self) -> u128 {
192        self.lock_state().elapsed_nanos
193    }
194
195    /// Returns the current mock instant.
196    ///
197    /// # Returns
198    /// Current instant on this timeline.
199    #[inline]
200    pub fn now(&self) -> MockInstant {
201        MockInstant::from_nanos_since_origin(self.id, self.elapsed_nanos())
202    }
203
204    /// Returns the current event epoch.
205    ///
206    /// # Returns
207    /// Epoch incremented by time advances and external notifications.
208    #[inline]
209    pub fn event_epoch(&self) -> u64 {
210        self.lock_state().event_epoch
211    }
212
213    /// Advances mock time and wakes all timeline waiters.
214    ///
215    /// # Parameters
216    /// - `duration`: Non-negative duration to add.
217    pub fn advance(&self, duration: Duration) {
218        let event_epoch = {
219            let mut state = self.lock_state();
220            state.elapsed_nanos = state.elapsed_nanos.saturating_add(duration.as_nanos());
221            state.time_epoch = state.time_epoch.wrapping_add(1);
222            state.event_epoch = state.event_epoch.wrapping_add(1);
223            state.event_epoch
224        };
225        self.notify_waiters(event_epoch);
226    }
227
228    /// Resets the timeline to elapsed zero when no waiters are active.
229    ///
230    /// # Returns
231    /// `Ok(())` when reset succeeds.
232    ///
233    /// # Errors
234    /// Returns [`MockTimeError::ActiveWaiters`] when timeline waiters are active.
235    pub fn reset(&self) -> Result<(), MockTimeError> {
236        let event_epoch = {
237            let mut state = self.lock_state();
238            if state.sleep_waiters != 0 || state.deadline_waiters != 0 {
239                return Err(MockTimeError::ActiveWaiters);
240            }
241            state.elapsed_nanos = 0;
242            state.time_epoch = state.time_epoch.wrapping_add(1);
243            state.event_epoch = state.event_epoch.wrapping_add(1);
244            state.event_epoch
245        };
246        self.notify_waiters(event_epoch);
247        Ok(())
248    }
249
250    /// Wakes waiters without changing elapsed time.
251    ///
252    /// This is intended for synchronization primitives that combine state-change
253    /// notifications with timeout deadlines.
254    pub fn notify_external_change(&self) {
255        let event_epoch = {
256            let mut state = self.lock_state();
257            state.event_epoch = state.event_epoch.wrapping_add(1);
258            state.event_epoch
259        };
260        self.notify_waiters(event_epoch);
261    }
262
263    /// Blocks until the current mock instant reaches `deadline`.
264    ///
265    /// # Parameters
266    /// - `deadline`: Mock instant at which the wait should complete.
267    ///
268    /// # Returns
269    /// `Ok(())` when the wait completes.
270    ///
271    /// # Errors
272    /// Returns [`MockTimeError::MismatchedTimeline`] if `deadline` was created
273    /// by a different timeline.
274    #[inline]
275    pub fn wait_until(&self, deadline: MockInstant) -> Result<(), MockTimeError> {
276        self.wait_until_with_kind(deadline, MockWaiterKind::Deadline)
277    }
278
279    /// Blocks until `duration` has elapsed on the mock timeline.
280    ///
281    /// # Parameters
282    /// - `duration`: Relative mock duration to wait.
283    #[inline]
284    pub fn wait_for(&self, duration: Duration) {
285        self.wait_until(self.now().saturating_add(duration))
286            .expect("relative waits should create deadlines on the same timeline");
287    }
288
289    /// Blocks until the event epoch changes after `observed_epoch`.
290    ///
291    /// # Parameters
292    /// - `observed_epoch`: Event epoch already observed by the caller.
293    pub fn wait_for_event_after(&self, observed_epoch: u64) {
294        let mut state = self.lock_state();
295        while state.event_epoch == observed_epoch {
296            self.shared.event_changed.wait(&mut state);
297        }
298    }
299
300    /// Blocks until a registered waiter count is observed or real timeout expires.
301    ///
302    /// # Parameters
303    /// - `kind`: Waiter group to inspect.
304    /// - `count`: Minimum number of waiters expected.
305    /// - `real_timeout`: Real wall-clock limit used only to keep tests from
306    ///   hanging forever.
307    ///
308    /// # Returns
309    /// `true` when enough waiters are observed before the real timeout.
310    pub fn wait_for_blocked_waiters(&self, kind: MockWaiterKind, count: usize, real_timeout: Duration) -> bool {
311        let Some(deadline) = Instant::now().checked_add(real_timeout) else {
312            return false;
313        };
314        let mut state = self.lock_state();
315        while Self::waiter_count(&state, kind) < count {
316            let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
317                return false;
318            };
319            let wait_result = self.shared.waiters_changed.wait_for(&mut state, remaining);
320            if wait_result.timed_out() && Self::waiter_count(&state, kind) < count {
321                return false;
322            }
323        }
324        true
325    }
326
327    /// Blocks until a deadline with the specified waiter kind is reached.
328    ///
329    /// # Parameters
330    /// - `deadline`: Mock instant at which the wait should complete.
331    /// - `kind`: Waiter group used for test observability.
332    pub(crate) fn wait_until_with_kind(
333        &self,
334        deadline: MockInstant,
335        kind: MockWaiterKind,
336    ) -> Result<(), MockTimeError> {
337        self.ensure_own_instant(deadline)?;
338        let mut state = self.lock_state();
339        if state.elapsed_nanos >= deadline.nanos_since_origin() {
340            return Ok(());
341        }
342        Self::increment_waiter(&mut state, kind);
343        self.shared.waiters_changed.notify_all();
344        while state.elapsed_nanos < deadline.nanos_since_origin() {
345            self.shared.event_changed.wait(&mut state);
346        }
347        Self::decrement_waiter(&mut state, kind);
348        self.shared.waiters_changed.notify_all();
349        Ok(())
350    }
351
352    /// Returns a future that completes once the deadline is reached.
353    ///
354    /// # Parameters
355    /// - `deadline`: Mock instant at which the future should resolve.
356    /// - `kind`: Waiter group used for test observability.
357    ///
358    /// # Returns
359    /// A future resolving after the mock deadline is reached.
360    #[cfg(feature = "tokio")]
361    pub(crate) fn wait_until_async_with_kind<'a>(
362        &'a self,
363        deadline: MockInstant,
364        kind: MockWaiterKind,
365    ) -> Result<AsyncSleepFuture<'a>, MockTimeError> {
366        self.ensure_own_instant(deadline)?;
367        if self.elapsed_nanos() >= deadline.nanos_since_origin() {
368            return Ok(Box::pin(async {}));
369        }
370        let registration = MockTimelineWaiterRegistration::new(self.clone(), kind);
371        let mut event_receiver = self.async_event_sender.subscribe();
372        Ok(Box::pin(async move {
373            let _registration = registration;
374            loop {
375                if self.elapsed_nanos() >= deadline.nanos_since_origin() {
376                    return;
377                }
378                event_receiver
379                    .changed()
380                    .await
381                    .expect("mock timeline sender should live while timeline is borrowed");
382            }
383        }))
384    }
385
386    /// Ensures an instant belongs to this timeline.
387    ///
388    /// # Parameters
389    /// - `instant`: Instant to validate.
390    ///
391    /// # Returns
392    /// `Ok(())` when the instant belongs to this timeline.
393    ///
394    /// # Errors
395    /// Returns [`MockTimeError::MismatchedTimeline`] when the instant belongs
396    /// to a different timeline.
397    fn ensure_own_instant(&self, instant: MockInstant) -> Result<(), MockTimeError> {
398        if instant.timeline_id() == self.id {
399            Ok(())
400        } else {
401            Err(MockTimeError::MismatchedTimeline {
402                expected: self.id,
403                actual: instant.timeline_id(),
404            })
405        }
406    }
407
408    /// Locks timeline state.
409    ///
410    /// # Returns
411    /// A guard for timeline state.
412    #[inline]
413    fn lock_state(&self) -> MutexGuard<'_, MockTimelineState> {
414        self.shared.state.lock()
415    }
416
417    /// Wakes blocking and async waiters after an event-epoch change.
418    ///
419    /// # Parameters
420    /// - `event_epoch`: New event epoch to publish to async waiters.
421    fn notify_waiters(&self, event_epoch: u64) {
422        self.shared.event_changed.notify_all();
423        self.shared.waiters_changed.notify_all();
424        self.notify_async_waiters(event_epoch);
425    }
426
427    /// Publishes an event change to async waiters.
428    ///
429    /// # Parameters
430    /// - `event_epoch`: New event epoch.
431    #[cfg(feature = "tokio")]
432    #[inline]
433    fn notify_async_waiters(&self, event_epoch: u64) {
434        let _ = self.async_event_sender.send(event_epoch);
435    }
436
    /// No-op counterpart used when async support is disabled.
    ///
    /// It exists so that `notify_waiters` can call the same method regardless
    /// of whether the `tokio` feature is enabled.
    ///
    /// # Parameters
    /// - `_event_epoch`: New event epoch; unused in this configuration.
    #[cfg(not(feature = "tokio"))]
    #[inline]
    fn notify_async_waiters(&self, _event_epoch: u64) {}
444
445    /// Increments a waiter count.
446    ///
447    /// # Parameters
448    /// - `state`: Timeline state to mutate.
449    /// - `kind`: Waiter group to increment.
450    fn increment_waiter(state: &mut MockTimelineState, kind: MockWaiterKind) {
451        match kind {
452            MockWaiterKind::Sleep => {
453                state.sleep_waiters = state.sleep_waiters.saturating_add(1);
454            }
455            MockWaiterKind::Deadline => {
456                state.deadline_waiters = state.deadline_waiters.saturating_add(1);
457            }
458        }
459    }
460
461    /// Decrements a waiter count.
462    ///
463    /// # Parameters
464    /// - `state`: Timeline state to mutate.
465    /// - `kind`: Waiter group to decrement.
466    fn decrement_waiter(state: &mut MockTimelineState, kind: MockWaiterKind) {
467        match kind {
468            MockWaiterKind::Sleep => {
469                state.sleep_waiters = state.sleep_waiters.saturating_sub(1);
470            }
471            MockWaiterKind::Deadline => {
472                state.deadline_waiters = state.deadline_waiters.saturating_sub(1);
473            }
474        }
475    }
476
477    /// Returns the waiter count for a group.
478    ///
479    /// # Parameters
480    /// - `state`: Timeline state to inspect.
481    /// - `kind`: Waiter group to read.
482    ///
483    /// # Returns
484    /// Number of registered waiters in the group.
485    #[inline]
486    fn waiter_count(state: &MockTimelineState, kind: MockWaiterKind) -> usize {
487        match kind {
488            MockWaiterKind::Sleep => state.sleep_waiters,
489            MockWaiterKind::Deadline => state.deadline_waiters,
490        }
491    }
492}
493
494impl Default for MockTimeline {
495    /// Creates a zero-elapsed mock timeline.
496    #[inline]
497    fn default() -> Self {
498        Self::new()
499    }
500}
501
/// Turns a nanosecond count into a [`Duration`], clamping to `Duration::MAX`
/// when the whole-second part does not fit into `u64`.
///
/// # Parameters
/// - `nanos`: Nanoseconds to convert.
///
/// # Returns
/// The equivalent standard duration, or `Duration::MAX` on overflow.
fn duration_from_nanos_saturating(nanos: u128) -> Duration {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    let Ok(whole_secs) = u64::try_from(nanos / NANOS_PER_SEC) else {
        return Duration::MAX;
    };
    // The remainder is always below one billion, so it fits into `u32`.
    let subsec_nanos = (nanos % NANOS_PER_SEC) as u32;
    Duration::new(whole_secs, subsec_nanos)
}
518
519/// Allocates a new globally unique mock timeline id.
520///
521/// # Returns
522/// A non-zero timeline id.
523///
524/// # Panics
525/// Panics if all `u64` timeline ids have been exhausted.
526fn next_mock_timeline_id() -> u64 {
527    loop {
528        let current = NEXT_MOCK_TIMELINE_ID.load(Ordering::Relaxed);
529        assert_ne!(current, u64::MAX, "mock timeline id space exhausted");
530        if NEXT_MOCK_TIMELINE_ID
531            .compare_exchange_weak(current, current + 1, Ordering::Relaxed, Ordering::Relaxed)
532            .is_ok()
533        {
534            return current;
535        }
536    }
537}