qubit_clock/mock/mock_timeline.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Shared monotonic timeline for mock time components.
11
12use parking_lot::{
13 Condvar,
14 Mutex,
15 MutexGuard,
16};
17use std::sync::Arc;
18use std::sync::atomic::{
19 AtomicU64,
20 Ordering,
21};
22use std::time::{
23 Duration,
24 Instant,
25};
26
27#[cfg(feature = "tokio")]
28use crate::sleep::AsyncSleepFuture;
29#[cfg(feature = "tokio")]
30use tokio::sync::watch;
31
32use crate::{
33 MockInstant,
34 MockTimeError,
35 MockWaiterKind,
36};
37
/// Process-wide counter holding the next unused mock timeline id.
///
/// The counter starts at `1` and is advanced by `next_mock_timeline_id`
/// every time a new timeline is created.
static NEXT_MOCK_TIMELINE_ID: AtomicU64 = AtomicU64::new(1);
40
41/// Shared monotonic time source for deterministic tests.
42///
43/// `MockTimeline` is the single authority for elapsed mock time. Cloned
44/// timelines share the same internal state, so a clock, sleeper, and future
45/// timeout-aware primitive can all observe the same logical time progression.
46/// The timeline starts at elapsed zero and advances only when a test calls
47/// [`advance`](Self::advance). It never follows wall-clock time by itself.
48///
49/// The timeline maintains two related notions of progress:
50///
51/// - **elapsed time** is the monotonic duration since the timeline origin.
52/// [`MockClock`](crate::MockClock) and
53/// [`crate::sleep::MockSleeper`] derive their behavior from this value.
54/// - **event epoch** is a notification counter incremented when time advances
55/// or when [`notify_external_change`](Self::notify_external_change) is
56/// called. It lets monitor-like primitives wait for either a state change or
57/// a later deadline without inventing a second mock clock.
58///
59/// Blocking waiters use condition variables, and async waiters use a Tokio
60/// watch channel when the `tokio` feature is enabled. Waiter counts are tracked
61/// by [`MockWaiterKind`] so tests can wait until a thread or future has really
62/// entered a mock wait before advancing time. Reset is rejected while waiters
63/// are active, because rewinding a timeline under a blocked waiter would make
64/// deadline semantics ambiguous. Deadlines also carry the id of the timeline
65/// that created them. Passing a deadline from one timeline into another
66/// timeline is rejected with [`MockTimeError::MismatchedTimeline`].
67///
68/// `MockTimeline` uses non-poisoning synchronization primitives internally.
69/// A panic in one test thread does not permanently poison the timeline for the
70/// rest of the test process.
71#[derive(Clone, Debug)]
72pub struct MockTimeline {
73 id: u64,
74 shared: Arc<MockTimelineShared>,
75 #[cfg(feature = "tokio")]
76 async_event_sender: watch::Sender<u64>,
77}
78
79/// Shared state and condition variables for a mock timeline.
80#[derive(Debug)]
81struct MockTimelineShared {
82 state: Mutex<MockTimelineState>,
83 event_changed: Condvar,
84 waiters_changed: Condvar,
85}
86
/// Mutable bookkeeping guarded by the timeline lock.
#[derive(Debug)]
struct MockTimelineState {
    /// Nanoseconds elapsed since the timeline origin.
    elapsed_nanos: u128,
    /// Counter bumped by `advance` and `reset`.
    time_epoch: u64,
    /// Counter bumped by `advance`, `reset`, and `notify_external_change`.
    event_epoch: u64,
    /// Number of currently registered `MockWaiterKind::Sleep` waiters.
    sleep_waiters: usize,
    /// Number of currently registered `MockWaiterKind::Deadline` waiters.
    deadline_waiters: usize,
}
96
97/// Registration for a mock timeline waiter.
98#[derive(Debug)]
99struct MockTimelineWaiterRegistration {
100 timeline: MockTimeline,
101 kind: MockWaiterKind,
102}
103
104impl MockTimelineWaiterRegistration {
105 /// Registers a waiter on a mock timeline.
106 ///
107 /// # Parameters
108 /// - `timeline`: Timeline that owns the waiter count.
109 /// - `kind`: Waiter group to increment.
110 ///
111 /// # Returns
112 /// A registration that decrements the waiter count when dropped.
113 fn new(timeline: MockTimeline, kind: MockWaiterKind) -> Self {
114 {
115 let mut state = timeline.lock_state();
116 MockTimeline::increment_waiter(&mut state, kind);
117 }
118 timeline.shared.waiters_changed.notify_all();
119 Self { timeline, kind }
120 }
121}
122
123impl Drop for MockTimelineWaiterRegistration {
124 /// Removes the registered waiter from the timeline.
125 fn drop(&mut self) {
126 {
127 let mut state = self.timeline.lock_state();
128 MockTimeline::decrement_waiter(&mut state, self.kind);
129 }
130 self.timeline.shared.waiters_changed.notify_all();
131 }
132}
133
134impl MockTimeline {
135 /// Creates a new timeline at elapsed zero.
136 ///
137 /// # Returns
138 /// A mock timeline with no elapsed time.
139 #[must_use]
140 pub fn new() -> Self {
141 #[cfg(feature = "tokio")]
142 let (async_event_sender, _) = watch::channel(0);
143 Self {
144 id: next_mock_timeline_id(),
145 shared: Arc::new(MockTimelineShared {
146 state: Mutex::new(MockTimelineState {
147 elapsed_nanos: 0,
148 time_epoch: 0,
149 event_epoch: 0,
150 sleep_waiters: 0,
151 deadline_waiters: 0,
152 }),
153 event_changed: Condvar::new(),
154 waiters_changed: Condvar::new(),
155 }),
156 #[cfg(feature = "tokio")]
157 async_event_sender,
158 }
159 }
160
161 /// Returns the globally unique id of this timeline.
162 ///
163 /// Clones of the same timeline return the same id. Independently created
164 /// timelines receive different ids, which lets deadline APIs reject
165 /// [`MockInstant`] values from the wrong timeline.
166 ///
167 /// # Returns
168 /// The timeline id.
169 #[inline]
170 pub const fn id(&self) -> u64 {
171 self.id
172 }
173
174 /// Returns elapsed mock time as a standard duration.
175 ///
176 /// # Returns
177 /// Elapsed monotonic time since the timeline origin.
178 #[inline]
179 pub fn elapsed(&self) -> Duration {
180 duration_from_nanos_saturating(self.elapsed_nanos())
181 }
182
183 /// Returns elapsed mock time in nanoseconds.
184 ///
185 /// # Returns
186 /// Elapsed monotonic nanoseconds since the timeline origin.
187 #[inline]
188 pub fn elapsed_nanos(&self) -> u128 {
189 self.lock_state().elapsed_nanos
190 }
191
192 /// Returns the current mock instant.
193 ///
194 /// # Returns
195 /// Current instant on this timeline.
196 #[inline]
197 pub fn now(&self) -> MockInstant {
198 MockInstant::from_nanos_since_origin(self.id, self.elapsed_nanos())
199 }
200
201 /// Returns the current event epoch.
202 ///
203 /// # Returns
204 /// Epoch incremented by time advances and external notifications.
205 #[inline]
206 pub fn event_epoch(&self) -> u64 {
207 self.lock_state().event_epoch
208 }
209
210 /// Advances mock time and wakes all timeline waiters.
211 ///
212 /// # Parameters
213 /// - `duration`: Non-negative duration to add.
214 pub fn advance(&self, duration: Duration) {
215 let event_epoch = {
216 let mut state = self.lock_state();
217 state.elapsed_nanos = state.elapsed_nanos.saturating_add(duration.as_nanos());
218 state.time_epoch = state.time_epoch.wrapping_add(1);
219 state.event_epoch = state.event_epoch.wrapping_add(1);
220 state.event_epoch
221 };
222 self.notify_waiters(event_epoch);
223 }
224
225 /// Resets the timeline to elapsed zero when no waiters are active.
226 ///
227 /// # Returns
228 /// `Ok(())` when reset succeeds.
229 ///
230 /// # Errors
231 /// Returns [`MockTimeError::ActiveWaiters`] when timeline waiters are active.
232 pub fn reset(&self) -> Result<(), MockTimeError> {
233 let event_epoch = {
234 let mut state = self.lock_state();
235 if state.sleep_waiters != 0 || state.deadline_waiters != 0 {
236 return Err(MockTimeError::ActiveWaiters);
237 }
238 state.elapsed_nanos = 0;
239 state.time_epoch = state.time_epoch.wrapping_add(1);
240 state.event_epoch = state.event_epoch.wrapping_add(1);
241 state.event_epoch
242 };
243 self.notify_waiters(event_epoch);
244 Ok(())
245 }
246
247 /// Wakes waiters without changing elapsed time.
248 ///
249 /// This is intended for synchronization primitives that combine state-change
250 /// notifications with timeout deadlines.
251 pub fn notify_external_change(&self) {
252 let event_epoch = {
253 let mut state = self.lock_state();
254 state.event_epoch = state.event_epoch.wrapping_add(1);
255 state.event_epoch
256 };
257 self.notify_waiters(event_epoch);
258 }
259
260 /// Blocks until the current mock instant reaches `deadline`.
261 ///
262 /// # Parameters
263 /// - `deadline`: Mock instant at which the wait should complete.
264 ///
265 /// # Returns
266 /// `Ok(())` when the wait completes.
267 ///
268 /// # Errors
269 /// Returns [`MockTimeError::MismatchedTimeline`] if `deadline` was created
270 /// by a different timeline.
271 #[inline]
272 pub fn wait_until(&self, deadline: MockInstant) -> Result<(), MockTimeError> {
273 self.wait_until_with_kind(deadline, MockWaiterKind::Deadline)
274 }
275
276 /// Blocks until `duration` has elapsed on the mock timeline.
277 ///
278 /// # Parameters
279 /// - `duration`: Relative mock duration to wait.
280 #[inline]
281 pub fn wait_for(&self, duration: Duration) {
282 self.wait_until(self.now().saturating_add(duration))
283 .expect("relative waits should create deadlines on the same timeline");
284 }
285
286 /// Blocks until the event epoch changes after `observed_epoch`.
287 ///
288 /// # Parameters
289 /// - `observed_epoch`: Event epoch already observed by the caller.
290 pub fn wait_for_event_after(&self, observed_epoch: u64) {
291 let mut state = self.lock_state();
292 while state.event_epoch == observed_epoch {
293 self.shared.event_changed.wait(&mut state);
294 }
295 }
296
297 /// Blocks until a registered waiter count is observed or real timeout expires.
298 ///
299 /// # Parameters
300 /// - `kind`: Waiter group to inspect.
301 /// - `count`: Minimum number of waiters expected.
302 /// - `real_timeout`: Real wall-clock limit used only to keep tests from
303 /// hanging forever.
304 ///
305 /// # Returns
306 /// `true` when enough waiters are observed before the real timeout.
307 pub fn wait_for_blocked_waiters(&self, kind: MockWaiterKind, count: usize, real_timeout: Duration) -> bool {
308 let Some(deadline) = Instant::now().checked_add(real_timeout) else {
309 return false;
310 };
311 let mut state = self.lock_state();
312 while Self::waiter_count(&state, kind) < count {
313 let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
314 return false;
315 };
316 let wait_result = self.shared.waiters_changed.wait_for(&mut state, remaining);
317 if wait_result.timed_out() && Self::waiter_count(&state, kind) < count {
318 return false;
319 }
320 }
321 true
322 }
323
324 /// Blocks until a deadline with the specified waiter kind is reached.
325 ///
326 /// # Parameters
327 /// - `deadline`: Mock instant at which the wait should complete.
328 /// - `kind`: Waiter group used for test observability.
329 pub(crate) fn wait_until_with_kind(
330 &self,
331 deadline: MockInstant,
332 kind: MockWaiterKind,
333 ) -> Result<(), MockTimeError> {
334 self.ensure_own_instant(deadline)?;
335 let mut state = self.lock_state();
336 if state.elapsed_nanos >= deadline.nanos_since_origin() {
337 return Ok(());
338 }
339 Self::increment_waiter(&mut state, kind);
340 self.shared.waiters_changed.notify_all();
341 while state.elapsed_nanos < deadline.nanos_since_origin() {
342 self.shared.event_changed.wait(&mut state);
343 }
344 Self::decrement_waiter(&mut state, kind);
345 self.shared.waiters_changed.notify_all();
346 Ok(())
347 }
348
349 /// Returns a future that completes once the deadline is reached.
350 ///
351 /// # Parameters
352 /// - `deadline`: Mock instant at which the future should resolve.
353 /// - `kind`: Waiter group used for test observability.
354 ///
355 /// # Returns
356 /// A future resolving after the mock deadline is reached.
357 #[cfg(feature = "tokio")]
358 pub(crate) fn wait_until_async_with_kind<'a>(
359 &'a self,
360 deadline: MockInstant,
361 kind: MockWaiterKind,
362 ) -> Result<AsyncSleepFuture<'a>, MockTimeError> {
363 self.ensure_own_instant(deadline)?;
364 if self.elapsed_nanos() >= deadline.nanos_since_origin() {
365 return Ok(Box::pin(async {}));
366 }
367 let registration = MockTimelineWaiterRegistration::new(self.clone(), kind);
368 let mut event_receiver = self.async_event_sender.subscribe();
369 Ok(Box::pin(async move {
370 let _registration = registration;
371 loop {
372 if self.elapsed_nanos() >= deadline.nanos_since_origin() {
373 return;
374 }
375 event_receiver
376 .changed()
377 .await
378 .expect("mock timeline sender should live while timeline is borrowed");
379 }
380 }))
381 }
382
383 /// Ensures an instant belongs to this timeline.
384 ///
385 /// # Parameters
386 /// - `instant`: Instant to validate.
387 ///
388 /// # Returns
389 /// `Ok(())` when the instant belongs to this timeline.
390 ///
391 /// # Errors
392 /// Returns [`MockTimeError::MismatchedTimeline`] when the instant belongs
393 /// to a different timeline.
394 fn ensure_own_instant(&self, instant: MockInstant) -> Result<(), MockTimeError> {
395 if instant.timeline_id() == self.id {
396 Ok(())
397 } else {
398 Err(MockTimeError::MismatchedTimeline {
399 expected: self.id,
400 actual: instant.timeline_id(),
401 })
402 }
403 }
404
405 /// Locks timeline state.
406 ///
407 /// # Returns
408 /// A guard for timeline state.
409 #[inline]
410 fn lock_state(&self) -> MutexGuard<'_, MockTimelineState> {
411 self.shared.state.lock()
412 }
413
414 /// Wakes blocking and async waiters after an event-epoch change.
415 ///
416 /// # Parameters
417 /// - `event_epoch`: New event epoch to publish to async waiters.
418 fn notify_waiters(&self, event_epoch: u64) {
419 self.shared.event_changed.notify_all();
420 self.shared.waiters_changed.notify_all();
421 self.notify_async_waiters(event_epoch);
422 }
423
424 /// Publishes an event change to async waiters.
425 ///
426 /// # Parameters
427 /// - `event_epoch`: New event epoch.
428 #[cfg(feature = "tokio")]
429 #[inline]
430 fn notify_async_waiters(&self, event_epoch: u64) {
431 let _ = self.async_event_sender.send(event_epoch);
432 }
433
434 /// No-op when async support is disabled.
435 ///
436 /// # Parameters
437 /// - `_event_epoch`: New event epoch.
438 #[cfg(not(feature = "tokio"))]
439 #[inline]
440 fn notify_async_waiters(&self, _event_epoch: u64) {}
441
442 /// Increments a waiter count.
443 ///
444 /// # Parameters
445 /// - `state`: Timeline state to mutate.
446 /// - `kind`: Waiter group to increment.
447 fn increment_waiter(state: &mut MockTimelineState, kind: MockWaiterKind) {
448 match kind {
449 MockWaiterKind::Sleep => {
450 state.sleep_waiters = state.sleep_waiters.saturating_add(1);
451 }
452 MockWaiterKind::Deadline => {
453 state.deadline_waiters = state.deadline_waiters.saturating_add(1);
454 }
455 }
456 }
457
458 /// Decrements a waiter count.
459 ///
460 /// # Parameters
461 /// - `state`: Timeline state to mutate.
462 /// - `kind`: Waiter group to decrement.
463 fn decrement_waiter(state: &mut MockTimelineState, kind: MockWaiterKind) {
464 match kind {
465 MockWaiterKind::Sleep => {
466 state.sleep_waiters = state.sleep_waiters.saturating_sub(1);
467 }
468 MockWaiterKind::Deadline => {
469 state.deadline_waiters = state.deadline_waiters.saturating_sub(1);
470 }
471 }
472 }
473
474 /// Returns the waiter count for a group.
475 ///
476 /// # Parameters
477 /// - `state`: Timeline state to inspect.
478 /// - `kind`: Waiter group to read.
479 ///
480 /// # Returns
481 /// Number of registered waiters in the group.
482 #[inline]
483 fn waiter_count(state: &MockTimelineState, kind: MockWaiterKind) -> usize {
484 match kind {
485 MockWaiterKind::Sleep => state.sleep_waiters,
486 MockWaiterKind::Deadline => state.deadline_waiters,
487 }
488 }
489}
490
491impl Default for MockTimeline {
492 /// Creates a zero-elapsed mock timeline.
493 #[inline]
494 fn default() -> Self {
495 Self::new()
496 }
497}
498
/// Turns a nanosecond count into a [`Duration`], clamping to `Duration::MAX`.
///
/// # Parameters
/// - `nanos`: Nanoseconds to convert.
///
/// # Returns
/// The equivalent duration, or `Duration::MAX` when the whole-second part does
/// not fit in a `u64`.
fn duration_from_nanos_saturating(nanos: u128) -> Duration {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    let whole_secs = nanos / NANOS_PER_SEC;
    match u64::try_from(whole_secs) {
        Ok(secs) => {
            // The remainder is always below one billion, so it fits in `u32`.
            let subsec_nanos = (nanos % NANOS_PER_SEC) as u32;
            Duration::new(secs, subsec_nanos)
        }
        Err(_) => Duration::MAX,
    }
}
515
516/// Allocates a new globally unique mock timeline id.
517///
518/// # Returns
519/// A non-zero timeline id.
520///
521/// # Panics
522/// Panics if all `u64` timeline ids have been exhausted.
523fn next_mock_timeline_id() -> u64 {
524 loop {
525 let current = NEXT_MOCK_TIMELINE_ID.load(Ordering::Relaxed);
526 assert_ne!(current, u64::MAX, "mock timeline id space exhausted");
527 if NEXT_MOCK_TIMELINE_ID
528 .compare_exchange_weak(current, current + 1, Ordering::Relaxed, Ordering::Relaxed)
529 .is_ok()
530 {
531 return current;
532 }
533 }
534}