Skip to main content

qubit_clock/mock/
mock_timeline.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Shared monotonic timeline for mock time components.
11
12use parking_lot::{
13    Condvar,
14    Mutex,
15    MutexGuard,
16};
17use std::sync::Arc;
18use std::sync::atomic::{
19    AtomicU64,
20    Ordering,
21};
22use std::time::{
23    Duration,
24    Instant,
25};
26
27#[cfg(feature = "tokio")]
28use crate::sleep::AsyncSleepFuture;
29#[cfg(feature = "tokio")]
30use tokio::sync::watch;
31
32use crate::{
33    MockInstant,
34    MockTimeError,
35    MockWaiterKind,
36};
37
/// Next globally unique mock timeline id.
///
/// The counter starts at 1 and is advanced by `next_mock_timeline_id` each
/// time a new timeline is created.
static NEXT_MOCK_TIMELINE_ID: AtomicU64 = AtomicU64::new(1);
40
/// Shared monotonic time source for deterministic tests.
///
/// `MockTimeline` is the single authority for elapsed mock time. Cloned
/// timelines share the same internal state, so a clock, sleeper, and future
/// timeout-aware primitive can all observe the same logical time progression.
/// The timeline starts at elapsed zero and advances only when a test calls
/// [`advance`](Self::advance). It never follows wall-clock time by itself.
///
/// The timeline maintains two related notions of progress:
///
/// - **elapsed time** is the monotonic duration since the timeline origin.
///   [`MockClock`](crate::MockClock) and
///   [`crate::sleep::MockSleeper`] derive their behavior from this value.
/// - **event epoch** is a notification counter incremented when time advances
///   or when [`notify_external_change`](Self::notify_external_change) is
///   called. It lets monitor-like primitives wait for either a state change or
///   a later deadline without inventing a second mock clock.
///
/// Blocking waiters use condition variables, and async waiters use a Tokio
/// watch channel when the `tokio` feature is enabled. Waiter counts are tracked
/// by [`MockWaiterKind`] so tests can wait until a thread or future has really
/// entered a mock wait before advancing time. Reset is rejected while waiters
/// are active, because rewinding a timeline under a blocked waiter would make
/// deadline semantics ambiguous. Deadlines also carry the id of the timeline
/// that created them. Passing a deadline from one timeline into another
/// timeline is rejected with [`MockTimeError::MismatchedTimeline`].
///
/// `MockTimeline` uses non-poisoning synchronization primitives internally.
/// A panic in one test thread does not permanently poison the timeline for the
/// rest of the test process.
#[derive(Clone, Debug)]
pub struct MockTimeline {
    /// Id allocated when the timeline is created; clones keep the same id and
    /// it is stamped into every [`MockInstant`] returned by `now`.
    id: u64,
    /// State and condition variables shared by every clone of this timeline.
    shared: Arc<MockTimelineShared>,
    /// Watch channel sender that publishes the latest event epoch to async
    /// waiters; each async wait subscribes its own receiver.
    #[cfg(feature = "tokio")]
    async_event_sender: watch::Sender<u64>,
}
78
/// Shared state and condition variables for a mock timeline.
///
/// One instance is held behind an `Arc` by every clone of a [`MockTimeline`].
#[derive(Debug)]
struct MockTimelineShared {
    /// Mutable timeline state; both condition variables below wait on this
    /// mutex.
    state: Mutex<MockTimelineState>,
    /// Notified whenever the event epoch changes (time advance, reset, or
    /// external notification). Blocking deadline and event waiters sleep here.
    event_changed: Condvar,
    /// Notified whenever a waiter is registered or removed, and also on every
    /// event-epoch change. `wait_for_blocked_waiters` sleeps here.
    waiters_changed: Condvar,
}
86
/// Mutable mock timeline state.
///
/// All fields are read and written only while the owning mutex is held.
#[derive(Debug)]
struct MockTimelineState {
    /// Nanoseconds elapsed since the timeline origin; grows by saturating
    /// addition in `advance` and is set back to zero by `reset`.
    elapsed_nanos: u128,
    /// Wrapping counter bumped by `advance` and `reset`.
    /// NOTE(review): nothing in the visible part of this file reads it.
    time_epoch: u64,
    /// Wrapping counter bumped by `advance`, `reset`, and
    /// `notify_external_change`.
    event_epoch: u64,
    /// Number of currently registered [`MockWaiterKind::Sleep`] waiters.
    sleep_waiters: usize,
    /// Number of currently registered [`MockWaiterKind::Deadline`] waiters.
    deadline_waiters: usize,
}
96
/// Registration for a mock timeline waiter.
///
/// Creating a registration increments the waiter count for `kind` on
/// `timeline`; dropping it decrements the same count again.
#[derive(Debug)]
struct MockTimelineWaiterRegistration {
    /// Timeline whose waiter count this registration contributes to.
    timeline: MockTimeline,
    /// Waiter group that was incremented and will be decremented on drop.
    kind: MockWaiterKind,
}
103
104impl MockTimelineWaiterRegistration {
105    /// Registers a waiter on a mock timeline.
106    ///
107    /// # Parameters
108    /// - `timeline`: Timeline that owns the waiter count.
109    /// - `kind`: Waiter group to increment.
110    ///
111    /// # Returns
112    /// A registration that decrements the waiter count when dropped.
113    fn new(timeline: MockTimeline, kind: MockWaiterKind) -> Self {
114        {
115            let mut state = timeline.lock_state();
116            MockTimeline::increment_waiter(&mut state, kind);
117        }
118        timeline.shared.waiters_changed.notify_all();
119        Self { timeline, kind }
120    }
121}
122
123impl Drop for MockTimelineWaiterRegistration {
124    /// Removes the registered waiter from the timeline.
125    fn drop(&mut self) {
126        {
127            let mut state = self.timeline.lock_state();
128            MockTimeline::decrement_waiter(&mut state, self.kind);
129        }
130        self.timeline.shared.waiters_changed.notify_all();
131    }
132}
133
134impl MockTimeline {
135    /// Creates a new timeline at elapsed zero.
136    ///
137    /// # Returns
138    /// A mock timeline with no elapsed time.
139    #[must_use]
140    pub fn new() -> Self {
141        #[cfg(feature = "tokio")]
142        let (async_event_sender, _) = watch::channel(0);
143        Self {
144            id: next_mock_timeline_id(),
145            shared: Arc::new(MockTimelineShared {
146                state: Mutex::new(MockTimelineState {
147                    elapsed_nanos: 0,
148                    time_epoch: 0,
149                    event_epoch: 0,
150                    sleep_waiters: 0,
151                    deadline_waiters: 0,
152                }),
153                event_changed: Condvar::new(),
154                waiters_changed: Condvar::new(),
155            }),
156            #[cfg(feature = "tokio")]
157            async_event_sender,
158        }
159    }
160
161    /// Returns the globally unique id of this timeline.
162    ///
163    /// Clones of the same timeline return the same id. Independently created
164    /// timelines receive different ids, which lets deadline APIs reject
165    /// [`MockInstant`] values from the wrong timeline.
166    ///
167    /// # Returns
168    /// The timeline id.
169    #[inline]
170    pub const fn id(&self) -> u64 {
171        self.id
172    }
173
174    /// Returns elapsed mock time as a standard duration.
175    ///
176    /// # Returns
177    /// Elapsed monotonic time since the timeline origin.
178    #[inline]
179    pub fn elapsed(&self) -> Duration {
180        duration_from_nanos_saturating(self.elapsed_nanos())
181    }
182
183    /// Returns elapsed mock time in nanoseconds.
184    ///
185    /// # Returns
186    /// Elapsed monotonic nanoseconds since the timeline origin.
187    #[inline]
188    pub fn elapsed_nanos(&self) -> u128 {
189        self.lock_state().elapsed_nanos
190    }
191
192    /// Returns the current mock instant.
193    ///
194    /// # Returns
195    /// Current instant on this timeline.
196    #[inline]
197    pub fn now(&self) -> MockInstant {
198        MockInstant::from_nanos_since_origin(self.id, self.elapsed_nanos())
199    }
200
201    /// Returns the current event epoch.
202    ///
203    /// # Returns
204    /// Epoch incremented by time advances and external notifications.
205    #[inline]
206    pub fn event_epoch(&self) -> u64 {
207        self.lock_state().event_epoch
208    }
209
210    /// Advances mock time and wakes all timeline waiters.
211    ///
212    /// # Parameters
213    /// - `duration`: Non-negative duration to add.
214    pub fn advance(&self, duration: Duration) {
215        let event_epoch = {
216            let mut state = self.lock_state();
217            state.elapsed_nanos = state.elapsed_nanos.saturating_add(duration.as_nanos());
218            state.time_epoch = state.time_epoch.wrapping_add(1);
219            state.event_epoch = state.event_epoch.wrapping_add(1);
220            state.event_epoch
221        };
222        self.notify_waiters(event_epoch);
223    }
224
225    /// Resets the timeline to elapsed zero when no waiters are active.
226    ///
227    /// # Returns
228    /// `Ok(())` when reset succeeds.
229    ///
230    /// # Errors
231    /// Returns [`MockTimeError::ActiveWaiters`] when timeline waiters are active.
232    pub fn reset(&self) -> Result<(), MockTimeError> {
233        let event_epoch = {
234            let mut state = self.lock_state();
235            if state.sleep_waiters != 0 || state.deadline_waiters != 0 {
236                return Err(MockTimeError::ActiveWaiters);
237            }
238            state.elapsed_nanos = 0;
239            state.time_epoch = state.time_epoch.wrapping_add(1);
240            state.event_epoch = state.event_epoch.wrapping_add(1);
241            state.event_epoch
242        };
243        self.notify_waiters(event_epoch);
244        Ok(())
245    }
246
247    /// Wakes waiters without changing elapsed time.
248    ///
249    /// This is intended for synchronization primitives that combine state-change
250    /// notifications with timeout deadlines.
251    pub fn notify_external_change(&self) {
252        let event_epoch = {
253            let mut state = self.lock_state();
254            state.event_epoch = state.event_epoch.wrapping_add(1);
255            state.event_epoch
256        };
257        self.notify_waiters(event_epoch);
258    }
259
260    /// Blocks until the current mock instant reaches `deadline`.
261    ///
262    /// # Parameters
263    /// - `deadline`: Mock instant at which the wait should complete.
264    ///
265    /// # Returns
266    /// `Ok(())` when the wait completes.
267    ///
268    /// # Errors
269    /// Returns [`MockTimeError::MismatchedTimeline`] if `deadline` was created
270    /// by a different timeline.
271    #[inline]
272    pub fn wait_until(&self, deadline: MockInstant) -> Result<(), MockTimeError> {
273        self.wait_until_with_kind(deadline, MockWaiterKind::Deadline)
274    }
275
276    /// Blocks until `duration` has elapsed on the mock timeline.
277    ///
278    /// # Parameters
279    /// - `duration`: Relative mock duration to wait.
280    #[inline]
281    pub fn wait_for(&self, duration: Duration) {
282        self.wait_until(self.now().saturating_add(duration))
283            .expect("relative waits should create deadlines on the same timeline");
284    }
285
286    /// Blocks until the event epoch changes after `observed_epoch`.
287    ///
288    /// # Parameters
289    /// - `observed_epoch`: Event epoch already observed by the caller.
290    pub fn wait_for_event_after(&self, observed_epoch: u64) {
291        let mut state = self.lock_state();
292        while state.event_epoch == observed_epoch {
293            self.shared.event_changed.wait(&mut state);
294        }
295    }
296
297    /// Blocks until a registered waiter count is observed or real timeout expires.
298    ///
299    /// # Parameters
300    /// - `kind`: Waiter group to inspect.
301    /// - `count`: Minimum number of waiters expected.
302    /// - `real_timeout`: Real wall-clock limit used only to keep tests from
303    ///   hanging forever.
304    ///
305    /// # Returns
306    /// `true` when enough waiters are observed before the real timeout.
307    pub fn wait_for_blocked_waiters(&self, kind: MockWaiterKind, count: usize, real_timeout: Duration) -> bool {
308        let Some(deadline) = Instant::now().checked_add(real_timeout) else {
309            return false;
310        };
311        let mut state = self.lock_state();
312        while Self::waiter_count(&state, kind) < count {
313            let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
314                return false;
315            };
316            let wait_result = self.shared.waiters_changed.wait_for(&mut state, remaining);
317            if wait_result.timed_out() && Self::waiter_count(&state, kind) < count {
318                return false;
319            }
320        }
321        true
322    }
323
324    /// Blocks until a deadline with the specified waiter kind is reached.
325    ///
326    /// # Parameters
327    /// - `deadline`: Mock instant at which the wait should complete.
328    /// - `kind`: Waiter group used for test observability.
329    pub(crate) fn wait_until_with_kind(
330        &self,
331        deadline: MockInstant,
332        kind: MockWaiterKind,
333    ) -> Result<(), MockTimeError> {
334        self.ensure_own_instant(deadline)?;
335        let mut state = self.lock_state();
336        if state.elapsed_nanos >= deadline.nanos_since_origin() {
337            return Ok(());
338        }
339        Self::increment_waiter(&mut state, kind);
340        self.shared.waiters_changed.notify_all();
341        while state.elapsed_nanos < deadline.nanos_since_origin() {
342            self.shared.event_changed.wait(&mut state);
343        }
344        Self::decrement_waiter(&mut state, kind);
345        self.shared.waiters_changed.notify_all();
346        Ok(())
347    }
348
349    /// Returns a future that completes once the deadline is reached.
350    ///
351    /// # Parameters
352    /// - `deadline`: Mock instant at which the future should resolve.
353    /// - `kind`: Waiter group used for test observability.
354    ///
355    /// # Returns
356    /// A future resolving after the mock deadline is reached.
357    #[cfg(feature = "tokio")]
358    pub(crate) fn wait_until_async_with_kind<'a>(
359        &'a self,
360        deadline: MockInstant,
361        kind: MockWaiterKind,
362    ) -> Result<AsyncSleepFuture<'a>, MockTimeError> {
363        self.ensure_own_instant(deadline)?;
364        if self.elapsed_nanos() >= deadline.nanos_since_origin() {
365            return Ok(Box::pin(async {}));
366        }
367        let registration = MockTimelineWaiterRegistration::new(self.clone(), kind);
368        let mut event_receiver = self.async_event_sender.subscribe();
369        Ok(Box::pin(async move {
370            let _registration = registration;
371            loop {
372                if self.elapsed_nanos() >= deadline.nanos_since_origin() {
373                    return;
374                }
375                event_receiver
376                    .changed()
377                    .await
378                    .expect("mock timeline sender should live while timeline is borrowed");
379            }
380        }))
381    }
382
383    /// Ensures an instant belongs to this timeline.
384    ///
385    /// # Parameters
386    /// - `instant`: Instant to validate.
387    ///
388    /// # Returns
389    /// `Ok(())` when the instant belongs to this timeline.
390    ///
391    /// # Errors
392    /// Returns [`MockTimeError::MismatchedTimeline`] when the instant belongs
393    /// to a different timeline.
394    fn ensure_own_instant(&self, instant: MockInstant) -> Result<(), MockTimeError> {
395        if instant.timeline_id() == self.id {
396            Ok(())
397        } else {
398            Err(MockTimeError::MismatchedTimeline {
399                expected: self.id,
400                actual: instant.timeline_id(),
401            })
402        }
403    }
404
405    /// Locks timeline state.
406    ///
407    /// # Returns
408    /// A guard for timeline state.
409    #[inline]
410    fn lock_state(&self) -> MutexGuard<'_, MockTimelineState> {
411        self.shared.state.lock()
412    }
413
414    /// Wakes blocking and async waiters after an event-epoch change.
415    ///
416    /// # Parameters
417    /// - `event_epoch`: New event epoch to publish to async waiters.
418    fn notify_waiters(&self, event_epoch: u64) {
419        self.shared.event_changed.notify_all();
420        self.shared.waiters_changed.notify_all();
421        self.notify_async_waiters(event_epoch);
422    }
423
424    /// Publishes an event change to async waiters.
425    ///
426    /// # Parameters
427    /// - `event_epoch`: New event epoch.
428    #[cfg(feature = "tokio")]
429    #[inline]
430    fn notify_async_waiters(&self, event_epoch: u64) {
431        let _ = self.async_event_sender.send(event_epoch);
432    }
433
434    /// No-op when async support is disabled.
435    ///
436    /// # Parameters
437    /// - `_event_epoch`: New event epoch.
438    #[cfg(not(feature = "tokio"))]
439    #[inline]
440    fn notify_async_waiters(&self, _event_epoch: u64) {}
441
442    /// Increments a waiter count.
443    ///
444    /// # Parameters
445    /// - `state`: Timeline state to mutate.
446    /// - `kind`: Waiter group to increment.
447    fn increment_waiter(state: &mut MockTimelineState, kind: MockWaiterKind) {
448        match kind {
449            MockWaiterKind::Sleep => {
450                state.sleep_waiters = state.sleep_waiters.saturating_add(1);
451            }
452            MockWaiterKind::Deadline => {
453                state.deadline_waiters = state.deadline_waiters.saturating_add(1);
454            }
455        }
456    }
457
458    /// Decrements a waiter count.
459    ///
460    /// # Parameters
461    /// - `state`: Timeline state to mutate.
462    /// - `kind`: Waiter group to decrement.
463    fn decrement_waiter(state: &mut MockTimelineState, kind: MockWaiterKind) {
464        match kind {
465            MockWaiterKind::Sleep => {
466                state.sleep_waiters = state.sleep_waiters.saturating_sub(1);
467            }
468            MockWaiterKind::Deadline => {
469                state.deadline_waiters = state.deadline_waiters.saturating_sub(1);
470            }
471        }
472    }
473
474    /// Returns the waiter count for a group.
475    ///
476    /// # Parameters
477    /// - `state`: Timeline state to inspect.
478    /// - `kind`: Waiter group to read.
479    ///
480    /// # Returns
481    /// Number of registered waiters in the group.
482    #[inline]
483    fn waiter_count(state: &MockTimelineState, kind: MockWaiterKind) -> usize {
484        match kind {
485            MockWaiterKind::Sleep => state.sleep_waiters,
486            MockWaiterKind::Deadline => state.deadline_waiters,
487        }
488    }
489}
490
491impl Default for MockTimeline {
492    /// Creates a zero-elapsed mock timeline.
493    #[inline]
494    fn default() -> Self {
495        Self::new()
496    }
497}
498
/// Turns a nanosecond count into a [`Duration`], clamping to `Duration::MAX`
/// when the whole-second part does not fit in a `u64`.
///
/// # Parameters
/// - `nanos`: Nanoseconds to convert.
///
/// # Returns
/// The equivalent duration, or `Duration::MAX` when `nanos` is too large.
fn duration_from_nanos_saturating(nanos: u128) -> Duration {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    let Ok(whole_secs) = u64::try_from(nanos / NANOS_PER_SEC) else {
        return Duration::MAX;
    };
    // The remainder is strictly below one billion, so it always fits in u32.
    let subsec_nanos = (nanos % NANOS_PER_SEC) as u32;
    Duration::new(whole_secs, subsec_nanos)
}
515
516/// Allocates a new globally unique mock timeline id.
517///
518/// # Returns
519/// A non-zero timeline id.
520///
521/// # Panics
522/// Panics if all `u64` timeline ids have been exhausted.
523fn next_mock_timeline_id() -> u64 {
524    loop {
525        let current = NEXT_MOCK_TIMELINE_ID.load(Ordering::Relaxed);
526        assert_ne!(current, u64::MAX, "mock timeline id space exhausted");
527        if NEXT_MOCK_TIMELINE_ID
528            .compare_exchange_weak(current, current + 1, Ordering::Relaxed, Ordering::Relaxed)
529            .is_ok()
530        {
531            return current;
532        }
533    }
534}