qubit_clock/mock/mock_timeline.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Shared monotonic timeline for mock time components.
11
12use parking_lot::{
13 Condvar,
14 Mutex,
15 MutexGuard,
16};
17use std::sync::Arc;
18use std::sync::atomic::{
19 AtomicU64,
20 Ordering,
21};
22use std::time::{
23 Duration,
24 Instant,
25};
26
27#[cfg(feature = "tokio")]
28use crate::sleep::AsyncSleepFuture;
29#[cfg(feature = "tokio")]
30use tokio::sync::watch;
31
32use crate::{
33 MockInstant,
34 MockTimeError,
35 MockWaiterKind,
36};
37
/// Next globally unique mock timeline id.
///
/// Starts at 1, so zero is never handed out as a timeline id. The counter is
/// only read and advanced by `next_mock_timeline_id`.
static NEXT_MOCK_TIMELINE_ID: AtomicU64 = AtomicU64::new(1);
40
/// Shared monotonic time source for deterministic tests.
///
/// `MockTimeline` is the single authority for elapsed mock time. Cloned
/// timelines share the same internal state, so a clock, sleeper, and future
/// timeout-aware primitive can all observe the same logical time progression.
/// The timeline starts at elapsed zero and advances only when a test calls
/// [`advance`](Self::advance). It never follows wall-clock time by itself.
///
/// The timeline maintains two related notions of progress:
///
/// - **elapsed time** is the monotonic duration since the timeline origin.
///   [`MockClock`](crate::MockClock) and
///   [`crate::sleep::MockSleeper`] derive their behavior from this value.
/// - **event epoch** is a notification counter incremented when time advances
///   or when [`notify_external_change`](Self::notify_external_change) is
///   called. It lets monitor-like primitives wait for either a state change or
///   a later deadline without inventing a second mock clock.
///
/// Blocking waiters use condition variables, and async waiters use a Tokio
/// watch channel when the `tokio` feature is enabled. Waiter counts are tracked
/// by [`MockWaiterKind`] so tests can wait until a thread or future has really
/// entered a mock wait before advancing time. Reset is rejected while waiters
/// are active, because rewinding a timeline under a blocked waiter would make
/// deadline semantics ambiguous. Deadlines also carry the id of the timeline
/// that created them. Passing a deadline from one timeline into another
/// timeline is rejected with [`MockTimeError::MismatchedTimeline`].
///
/// `MockTimeline` uses non-poisoning synchronization primitives internally.
/// A panic in one test thread does not permanently poison the timeline for the
/// rest of the test process.
#[derive(Clone, Debug)]
pub struct MockTimeline {
    /// Globally unique id assigned at construction and copied into every clone.
    id: u64,
    /// State and condition variables shared by all clones of this timeline.
    shared: Arc<MockTimelineShared>,
    /// Watch sender used to publish the latest event epoch to async waiters.
    #[cfg(feature = "tokio")]
    async_event_sender: watch::Sender<u64>,
}
78
/// Shared state and condition variables for a mock timeline.
///
/// A single instance sits behind the `Arc` held by every clone of a
/// `MockTimeline`.
#[derive(Debug)]
struct MockTimelineShared {
    /// Mutable timeline state; both condition variables are used with this mutex.
    state: Mutex<MockTimelineState>,
    /// Signalled whenever the event epoch changes (advance, reset, or external
    /// notification). Blocking deadline and event waiters sleep on it.
    event_changed: Condvar,
    /// Signalled whenever a waiter count changes, and also on every event-epoch
    /// change. `wait_for_blocked_waiters` sleeps on it.
    waiters_changed: Condvar,
}
86
/// Mutable mock timeline state.
///
/// Always accessed through the mutex in `MockTimelineShared`.
#[derive(Debug)]
struct MockTimelineState {
    /// Nanoseconds elapsed since the timeline origin; saturates on advance.
    elapsed_nanos: u128,
    /// Wrapping counter bumped by `advance` and `reset` only.
    time_epoch: u64,
    /// Wrapping counter bumped by `advance`, `reset`, and
    /// `notify_external_change`.
    event_epoch: u64,
    /// Number of currently registered `MockWaiterKind::Sleep` waiters.
    sleep_waiters: usize,
    /// Number of currently registered `MockWaiterKind::Deadline` waiters.
    deadline_waiters: usize,
}
96
/// Registration for a mock timeline waiter.
///
/// Creating a registration increments the waiter count for `kind` on
/// `timeline`; dropping it decrements the same count again. Only compiled
/// with the `tokio` feature, where async waits hold one for their lifetime.
#[cfg(feature = "tokio")]
#[derive(Debug)]
struct MockTimelineWaiterRegistration {
    /// Timeline whose waiter count this registration contributes to.
    timeline: MockTimeline,
    /// Waiter group that was incremented and will be decremented on drop.
    kind: MockWaiterKind,
}
104
#[cfg(feature = "tokio")]
impl MockTimelineWaiterRegistration {
    /// Registers one waiter of the given kind on a mock timeline.
    ///
    /// The waiter count is bumped under the state lock, the lock is released,
    /// and only then are threads observing waiter counts woken up.
    ///
    /// # Parameters
    /// - `timeline`: Timeline that owns the waiter count.
    /// - `kind`: Waiter group to increment.
    ///
    /// # Returns
    /// A registration that decrements the waiter count when dropped.
    fn new(timeline: MockTimeline, kind: MockWaiterKind) -> Self {
        let mut guard = timeline.lock_state();
        MockTimeline::increment_waiter(&mut guard, kind);
        // Unlock before notifying so woken observers can take the lock at once.
        drop(guard);
        timeline.shared.waiters_changed.notify_all();
        Self { timeline, kind }
    }
}
124
#[cfg(feature = "tokio")]
impl Drop for MockTimelineWaiterRegistration {
    /// Unregisters the waiter and wakes threads observing waiter counts.
    fn drop(&mut self) {
        let mut guard = self.timeline.lock_state();
        MockTimeline::decrement_waiter(&mut guard, self.kind);
        // Unlock before notifying so woken observers can take the lock at once.
        drop(guard);
        self.timeline.shared.waiters_changed.notify_all();
    }
}
136
137impl MockTimeline {
138 /// Creates a new timeline at elapsed zero.
139 ///
140 /// # Returns
141 /// A mock timeline with no elapsed time.
142 #[must_use]
143 pub fn new() -> Self {
144 #[cfg(feature = "tokio")]
145 let (async_event_sender, _) = watch::channel(0);
146 Self {
147 id: next_mock_timeline_id(),
148 shared: Arc::new(MockTimelineShared {
149 state: Mutex::new(MockTimelineState {
150 elapsed_nanos: 0,
151 time_epoch: 0,
152 event_epoch: 0,
153 sleep_waiters: 0,
154 deadline_waiters: 0,
155 }),
156 event_changed: Condvar::new(),
157 waiters_changed: Condvar::new(),
158 }),
159 #[cfg(feature = "tokio")]
160 async_event_sender,
161 }
162 }
163
164 /// Returns the globally unique id of this timeline.
165 ///
166 /// Clones of the same timeline return the same id. Independently created
167 /// timelines receive different ids, which lets deadline APIs reject
168 /// [`MockInstant`] values from the wrong timeline.
169 ///
170 /// # Returns
171 /// The timeline id.
172 #[inline]
173 pub const fn id(&self) -> u64 {
174 self.id
175 }
176
177 /// Returns elapsed mock time as a standard duration.
178 ///
179 /// # Returns
180 /// Elapsed monotonic time since the timeline origin.
181 #[inline]
182 pub fn elapsed(&self) -> Duration {
183 duration_from_nanos_saturating(self.elapsed_nanos())
184 }
185
186 /// Returns elapsed mock time in nanoseconds.
187 ///
188 /// # Returns
189 /// Elapsed monotonic nanoseconds since the timeline origin.
190 #[inline]
191 pub fn elapsed_nanos(&self) -> u128 {
192 self.lock_state().elapsed_nanos
193 }
194
195 /// Returns the current mock instant.
196 ///
197 /// # Returns
198 /// Current instant on this timeline.
199 #[inline]
200 pub fn now(&self) -> MockInstant {
201 MockInstant::from_nanos_since_origin(self.id, self.elapsed_nanos())
202 }
203
204 /// Returns the current event epoch.
205 ///
206 /// # Returns
207 /// Epoch incremented by time advances and external notifications.
208 #[inline]
209 pub fn event_epoch(&self) -> u64 {
210 self.lock_state().event_epoch
211 }
212
213 /// Advances mock time and wakes all timeline waiters.
214 ///
215 /// # Parameters
216 /// - `duration`: Non-negative duration to add.
217 pub fn advance(&self, duration: Duration) {
218 let event_epoch = {
219 let mut state = self.lock_state();
220 state.elapsed_nanos = state.elapsed_nanos.saturating_add(duration.as_nanos());
221 state.time_epoch = state.time_epoch.wrapping_add(1);
222 state.event_epoch = state.event_epoch.wrapping_add(1);
223 state.event_epoch
224 };
225 self.notify_waiters(event_epoch);
226 }
227
228 /// Resets the timeline to elapsed zero when no waiters are active.
229 ///
230 /// # Returns
231 /// `Ok(())` when reset succeeds.
232 ///
233 /// # Errors
234 /// Returns [`MockTimeError::ActiveWaiters`] when timeline waiters are active.
235 pub fn reset(&self) -> Result<(), MockTimeError> {
236 let event_epoch = {
237 let mut state = self.lock_state();
238 if state.sleep_waiters != 0 || state.deadline_waiters != 0 {
239 return Err(MockTimeError::ActiveWaiters);
240 }
241 state.elapsed_nanos = 0;
242 state.time_epoch = state.time_epoch.wrapping_add(1);
243 state.event_epoch = state.event_epoch.wrapping_add(1);
244 state.event_epoch
245 };
246 self.notify_waiters(event_epoch);
247 Ok(())
248 }
249
250 /// Wakes waiters without changing elapsed time.
251 ///
252 /// This is intended for synchronization primitives that combine state-change
253 /// notifications with timeout deadlines.
254 pub fn notify_external_change(&self) {
255 let event_epoch = {
256 let mut state = self.lock_state();
257 state.event_epoch = state.event_epoch.wrapping_add(1);
258 state.event_epoch
259 };
260 self.notify_waiters(event_epoch);
261 }
262
263 /// Blocks until the current mock instant reaches `deadline`.
264 ///
265 /// # Parameters
266 /// - `deadline`: Mock instant at which the wait should complete.
267 ///
268 /// # Returns
269 /// `Ok(())` when the wait completes.
270 ///
271 /// # Errors
272 /// Returns [`MockTimeError::MismatchedTimeline`] if `deadline` was created
273 /// by a different timeline.
274 #[inline]
275 pub fn wait_until(&self, deadline: MockInstant) -> Result<(), MockTimeError> {
276 self.wait_until_with_kind(deadline, MockWaiterKind::Deadline)
277 }
278
279 /// Blocks until `duration` has elapsed on the mock timeline.
280 ///
281 /// # Parameters
282 /// - `duration`: Relative mock duration to wait.
283 #[inline]
284 pub fn wait_for(&self, duration: Duration) {
285 self.wait_until(self.now().saturating_add(duration))
286 .expect("relative waits should create deadlines on the same timeline");
287 }
288
289 /// Blocks until the event epoch changes after `observed_epoch`.
290 ///
291 /// # Parameters
292 /// - `observed_epoch`: Event epoch already observed by the caller.
293 pub fn wait_for_event_after(&self, observed_epoch: u64) {
294 let mut state = self.lock_state();
295 while state.event_epoch == observed_epoch {
296 self.shared.event_changed.wait(&mut state);
297 }
298 }
299
300 /// Blocks until a registered waiter count is observed or real timeout expires.
301 ///
302 /// # Parameters
303 /// - `kind`: Waiter group to inspect.
304 /// - `count`: Minimum number of waiters expected.
305 /// - `real_timeout`: Real wall-clock limit used only to keep tests from
306 /// hanging forever.
307 ///
308 /// # Returns
309 /// `true` when enough waiters are observed before the real timeout.
310 pub fn wait_for_blocked_waiters(&self, kind: MockWaiterKind, count: usize, real_timeout: Duration) -> bool {
311 let Some(deadline) = Instant::now().checked_add(real_timeout) else {
312 return false;
313 };
314 let mut state = self.lock_state();
315 while Self::waiter_count(&state, kind) < count {
316 let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
317 return false;
318 };
319 let wait_result = self.shared.waiters_changed.wait_for(&mut state, remaining);
320 if wait_result.timed_out() && Self::waiter_count(&state, kind) < count {
321 return false;
322 }
323 }
324 true
325 }
326
327 /// Blocks until a deadline with the specified waiter kind is reached.
328 ///
329 /// # Parameters
330 /// - `deadline`: Mock instant at which the wait should complete.
331 /// - `kind`: Waiter group used for test observability.
332 pub(crate) fn wait_until_with_kind(
333 &self,
334 deadline: MockInstant,
335 kind: MockWaiterKind,
336 ) -> Result<(), MockTimeError> {
337 self.ensure_own_instant(deadline)?;
338 let mut state = self.lock_state();
339 if state.elapsed_nanos >= deadline.nanos_since_origin() {
340 return Ok(());
341 }
342 Self::increment_waiter(&mut state, kind);
343 self.shared.waiters_changed.notify_all();
344 while state.elapsed_nanos < deadline.nanos_since_origin() {
345 self.shared.event_changed.wait(&mut state);
346 }
347 Self::decrement_waiter(&mut state, kind);
348 self.shared.waiters_changed.notify_all();
349 Ok(())
350 }
351
352 /// Returns a future that completes once the deadline is reached.
353 ///
354 /// # Parameters
355 /// - `deadline`: Mock instant at which the future should resolve.
356 /// - `kind`: Waiter group used for test observability.
357 ///
358 /// # Returns
359 /// A future resolving after the mock deadline is reached.
360 #[cfg(feature = "tokio")]
361 pub(crate) fn wait_until_async_with_kind<'a>(
362 &'a self,
363 deadline: MockInstant,
364 kind: MockWaiterKind,
365 ) -> Result<AsyncSleepFuture<'a>, MockTimeError> {
366 self.ensure_own_instant(deadline)?;
367 if self.elapsed_nanos() >= deadline.nanos_since_origin() {
368 return Ok(Box::pin(async {}));
369 }
370 let registration = MockTimelineWaiterRegistration::new(self.clone(), kind);
371 let mut event_receiver = self.async_event_sender.subscribe();
372 Ok(Box::pin(async move {
373 let _registration = registration;
374 loop {
375 if self.elapsed_nanos() >= deadline.nanos_since_origin() {
376 return;
377 }
378 event_receiver
379 .changed()
380 .await
381 .expect("mock timeline sender should live while timeline is borrowed");
382 }
383 }))
384 }
385
386 /// Ensures an instant belongs to this timeline.
387 ///
388 /// # Parameters
389 /// - `instant`: Instant to validate.
390 ///
391 /// # Returns
392 /// `Ok(())` when the instant belongs to this timeline.
393 ///
394 /// # Errors
395 /// Returns [`MockTimeError::MismatchedTimeline`] when the instant belongs
396 /// to a different timeline.
397 fn ensure_own_instant(&self, instant: MockInstant) -> Result<(), MockTimeError> {
398 if instant.timeline_id() == self.id {
399 Ok(())
400 } else {
401 Err(MockTimeError::MismatchedTimeline {
402 expected: self.id,
403 actual: instant.timeline_id(),
404 })
405 }
406 }
407
408 /// Locks timeline state.
409 ///
410 /// # Returns
411 /// A guard for timeline state.
412 #[inline]
413 fn lock_state(&self) -> MutexGuard<'_, MockTimelineState> {
414 self.shared.state.lock()
415 }
416
417 /// Wakes blocking and async waiters after an event-epoch change.
418 ///
419 /// # Parameters
420 /// - `event_epoch`: New event epoch to publish to async waiters.
421 fn notify_waiters(&self, event_epoch: u64) {
422 self.shared.event_changed.notify_all();
423 self.shared.waiters_changed.notify_all();
424 self.notify_async_waiters(event_epoch);
425 }
426
427 /// Publishes an event change to async waiters.
428 ///
429 /// # Parameters
430 /// - `event_epoch`: New event epoch.
431 #[cfg(feature = "tokio")]
432 #[inline]
433 fn notify_async_waiters(&self, event_epoch: u64) {
434 let _ = self.async_event_sender.send(event_epoch);
435 }
436
437 /// No-op when async support is disabled.
438 ///
439 /// # Parameters
440 /// - `_event_epoch`: New event epoch.
441 #[cfg(not(feature = "tokio"))]
442 #[inline]
443 fn notify_async_waiters(&self, _event_epoch: u64) {}
444
445 /// Increments a waiter count.
446 ///
447 /// # Parameters
448 /// - `state`: Timeline state to mutate.
449 /// - `kind`: Waiter group to increment.
450 fn increment_waiter(state: &mut MockTimelineState, kind: MockWaiterKind) {
451 match kind {
452 MockWaiterKind::Sleep => {
453 state.sleep_waiters = state.sleep_waiters.saturating_add(1);
454 }
455 MockWaiterKind::Deadline => {
456 state.deadline_waiters = state.deadline_waiters.saturating_add(1);
457 }
458 }
459 }
460
461 /// Decrements a waiter count.
462 ///
463 /// # Parameters
464 /// - `state`: Timeline state to mutate.
465 /// - `kind`: Waiter group to decrement.
466 fn decrement_waiter(state: &mut MockTimelineState, kind: MockWaiterKind) {
467 match kind {
468 MockWaiterKind::Sleep => {
469 state.sleep_waiters = state.sleep_waiters.saturating_sub(1);
470 }
471 MockWaiterKind::Deadline => {
472 state.deadline_waiters = state.deadline_waiters.saturating_sub(1);
473 }
474 }
475 }
476
477 /// Returns the waiter count for a group.
478 ///
479 /// # Parameters
480 /// - `state`: Timeline state to inspect.
481 /// - `kind`: Waiter group to read.
482 ///
483 /// # Returns
484 /// Number of registered waiters in the group.
485 #[inline]
486 fn waiter_count(state: &MockTimelineState, kind: MockWaiterKind) -> usize {
487 match kind {
488 MockWaiterKind::Sleep => state.sleep_waiters,
489 MockWaiterKind::Deadline => state.deadline_waiters,
490 }
491 }
492}
493
494impl Default for MockTimeline {
495 /// Creates a zero-elapsed mock timeline.
496 #[inline]
497 fn default() -> Self {
498 Self::new()
499 }
500}
501
/// Converts a nanosecond count into a [`Duration`], clamping to
/// `Duration::MAX` when the whole-second part does not fit in a `u64`.
///
/// # Parameters
/// - `nanos`: Nanoseconds to convert.
///
/// # Returns
/// The equivalent standard duration, or `Duration::MAX` on overflow.
fn duration_from_nanos_saturating(nanos: u128) -> Duration {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    let whole_secs = nanos / NANOS_PER_SEC;
    // The remainder is always below one billion, so it fits in a `u32`.
    let subsec_nanos = (nanos % NANOS_PER_SEC) as u32;
    u64::try_from(whole_secs).map_or(Duration::MAX, |secs| Duration::new(secs, subsec_nanos))
}
518
519/// Allocates a new globally unique mock timeline id.
520///
521/// # Returns
522/// A non-zero timeline id.
523///
524/// # Panics
525/// Panics if all `u64` timeline ids have been exhausted.
526fn next_mock_timeline_id() -> u64 {
527 loop {
528 let current = NEXT_MOCK_TIMELINE_ID.load(Ordering::Relaxed);
529 assert_ne!(current, u64::MAX, "mock timeline id space exhausted");
530 if NEXT_MOCK_TIMELINE_ID
531 .compare_exchange_weak(current, current + 1, Ordering::Relaxed, Ordering::Relaxed)
532 .is_ok()
533 {
534 return current;
535 }
536 }
537}