Skip to main content

qubit_lock/monitor/
mock_monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Mock monitor with manually controlled timeout time.
11
12use std::sync::{
13    Condvar,
14    Mutex,
15    MutexGuard,
16};
17use std::time::Duration;
18
19#[cfg(feature = "async")]
20use tokio::sync::{
21    Notify,
22    watch,
23};
24
25#[cfg(feature = "async")]
26use super::{
27    AsyncConditionWaiter,
28    AsyncMonitorFuture,
29    AsyncNotificationWaiter,
30    AsyncTimeoutConditionWaiter,
31    AsyncTimeoutNotificationWaiter,
32};
33use super::{
34    ConditionWaiter,
35    NotificationWaiter,
36    Notifier,
37    TimeoutConditionWaiter,
38    TimeoutNotificationWaiter,
39    WaitTimeoutResult,
40    WaitTimeoutStatus,
41};
42
43/// Monitor implementation for deterministic tests.
44///
45/// `MockMonitor` protects a state value like the real monitor implementations,
46/// but timeout methods use manually controlled mock elapsed time. Advancing the
47/// mock time wakes waiters so they can recheck predicates and timeout budgets.
48pub struct MockMonitor<T> {
49    /// Protected mock state and clock state.
50    state: Mutex<MockMonitorState<T>>,
51    /// Condition variable used by blocking waiters.
52    changed: Condvar,
53    /// Tokio notification primitive used by async notification waiters.
54    #[cfg(feature = "async")]
55    async_notification: Notify,
56    /// Broadcasts mock state or mock time changes to async timeout waiters.
57    #[cfg(feature = "async")]
58    async_change_sender: watch::Sender<u64>,
59}
60
61/// State protected by [`MockMonitor`].
62struct MockMonitorState<T> {
63    /// User-visible protected value.
64    value: T,
65    /// Manually controlled elapsed time.
66    elapsed: Duration,
67    /// Epoch incremented only by notification calls.
68    notification_epoch: u64,
69    /// Epoch incremented by notifications and mock time changes.
70    change_epoch: u64,
71}
72
73impl<T> MockMonitor<T> {
74    /// Creates a mock monitor protecting the supplied state value.
75    ///
76    /// # Arguments
77    ///
78    /// * `state` - Initial protected state.
79    ///
80    /// # Returns
81    ///
82    /// A mock monitor whose elapsed time starts at zero.
83    pub fn new(state: T) -> Self {
84        #[cfg(feature = "async")]
85        let (async_change_sender, _) = watch::channel(0);
86        Self {
87            state: Mutex::new(MockMonitorState {
88                value: state,
89                elapsed: Duration::ZERO,
90                notification_epoch: 0,
91                change_epoch: 0,
92            }),
93            changed: Condvar::new(),
94            #[cfg(feature = "async")]
95            async_notification: Notify::new(),
96            #[cfg(feature = "async")]
97            async_change_sender,
98        }
99    }
100
101    /// Returns the current mock elapsed time.
102    ///
103    /// # Returns
104    ///
105    /// The elapsed time used by timeout waits.
106    pub fn elapsed(&self) -> Duration {
107        self.lock_state().elapsed
108    }
109
110    /// Sets the current mock elapsed time.
111    ///
112    /// This wakes timeout waiters so they can recheck their budgets.
113    ///
114    /// # Arguments
115    ///
116    /// * `elapsed` - New mock elapsed time.
117    pub fn set_elapsed(&self, elapsed: Duration) {
118        let change_epoch = {
119            let mut state = self.lock_state();
120            state.elapsed = elapsed;
121            Self::advance_change_epoch(&mut state)
122        };
123        self.changed.notify_all();
124        self.notify_async_change(change_epoch);
125    }
126
127    /// Advances mock elapsed time by a relative duration.
128    ///
129    /// # Arguments
130    ///
131    /// * `duration` - Duration added to the current mock elapsed time.
132    pub fn advance(&self, duration: Duration) {
133        let change_epoch = {
134            let mut state = self.lock_state();
135            state.elapsed = state.elapsed.saturating_add(duration);
136            Self::advance_change_epoch(&mut state)
137        };
138        self.changed.notify_all();
139        self.notify_async_change(change_epoch);
140    }
141
142    /// Resets mock elapsed time to zero.
143    pub fn reset_elapsed(&self) {
144        self.set_elapsed(Duration::ZERO);
145    }
146
147    /// Acquires the monitor and reads the protected state.
148    ///
149    /// # Arguments
150    ///
151    /// * `f` - Closure that receives an immutable reference to the state.
152    ///
153    /// # Returns
154    ///
155    /// The value returned by the closure.
156    pub fn read<R, F>(&self, f: F) -> R
157    where
158        F: FnOnce(&T) -> R,
159    {
160        let state = self.lock_state();
161        f(&state.value)
162    }
163
164    /// Acquires the monitor and mutates the protected state.
165    ///
166    /// This does not notify waiters automatically.
167    ///
168    /// # Arguments
169    ///
170    /// * `f` - Closure that receives a mutable reference to the state.
171    ///
172    /// # Returns
173    ///
174    /// The value returned by the closure.
175    pub fn write<R, F>(&self, f: F) -> R
176    where
177        F: FnOnce(&mut T) -> R,
178    {
179        let mut state = self.lock_state();
180        f(&mut state.value)
181    }
182
183    /// Mutates the protected state and wakes one waiter.
184    ///
185    /// # Arguments
186    ///
187    /// * `f` - Closure that receives a mutable reference to the state.
188    ///
189    /// # Returns
190    ///
191    /// The value returned by the closure.
192    pub fn write_notify_one<R, F>(&self, f: F) -> R
193    where
194        F: FnOnce(&mut T) -> R,
195    {
196        let result = self.write(f);
197        self.notify_one();
198        result
199    }
200
201    /// Mutates the protected state and wakes all waiters.
202    ///
203    /// # Arguments
204    ///
205    /// * `f` - Closure that receives a mutable reference to the state.
206    ///
207    /// # Returns
208    ///
209    /// The value returned by the closure.
210    pub fn write_notify_all<R, F>(&self, f: F) -> R
211    where
212        F: FnOnce(&mut T) -> R,
213    {
214        let result = self.write(f);
215        self.notify_all();
216        result
217    }
218
219    /// Wakes one waiter if one is blocked.
220    pub fn notify_one(&self) {
221        let change_epoch = self.advance_notification_epoch();
222        self.changed.notify_one();
223        #[cfg(feature = "async")]
224        self.async_notification.notify_one();
225        self.notify_async_change(change_epoch);
226    }
227
228    /// Wakes all waiters.
229    pub fn notify_all(&self) {
230        let change_epoch = self.advance_notification_epoch();
231        self.changed.notify_all();
232        #[cfg(feature = "async")]
233        self.async_notification.notify_waiters();
234        self.notify_async_change(change_epoch);
235    }
236
237    /// Locks the internal state and recovers from poisoning.
238    ///
239    /// # Returns
240    ///
241    /// A guard for the internal mock monitor state.
242    fn lock_state(&self) -> MutexGuard<'_, MockMonitorState<T>> {
243        self.state
244            .lock()
245            .unwrap_or_else(std::sync::PoisonError::into_inner)
246    }
247
248    /// Increments the change epoch.
249    ///
250    /// # Arguments
251    ///
252    /// * `state` - Internal state whose change epoch should advance.
253    ///
254    /// # Returns
255    ///
256    /// The new change epoch.
257    fn advance_change_epoch(state: &mut MockMonitorState<T>) -> u64 {
258        state.change_epoch = state.change_epoch.wrapping_add(1);
259        state.change_epoch
260    }
261
262    /// Increments the notification and change epochs.
263    ///
264    /// # Returns
265    ///
266    /// The new change epoch.
267    fn advance_notification_epoch(&self) -> u64 {
268        let mut state = self.lock_state();
269        state.notification_epoch = state.notification_epoch.wrapping_add(1);
270        Self::advance_change_epoch(&mut state)
271    }
272
273    /// Notifies asynchronous timeout waiters about a state or time change.
274    ///
275    /// # Arguments
276    ///
277    /// * `change_epoch` - New change epoch.
278    #[cfg(feature = "async")]
279    fn notify_async_change(&self, change_epoch: u64) {
280        let _ = self.async_change_sender.send(change_epoch);
281    }
282
283    /// No-op when async support is disabled.
284    #[cfg(not(feature = "async"))]
285    fn notify_async_change(&self, _change_epoch: u64) {}
286}
287
288impl<T> Notifier for MockMonitor<T> {
289    /// Wakes one waiter if one is blocked.
290    fn notify_one(&self) {
291        Self::notify_one(self);
292    }
293
294    /// Wakes all waiters.
295    fn notify_all(&self) {
296        Self::notify_all(self);
297    }
298}
299
300impl<T> NotificationWaiter for MockMonitor<T> {
301    /// Blocks until a notification happens after this call starts.
302    fn wait(&self) {
303        let mut state = self.lock_state();
304        let observed_epoch = state.notification_epoch;
305        while state.notification_epoch == observed_epoch {
306            state = self
307                .changed
308                .wait(state)
309                .unwrap_or_else(std::sync::PoisonError::into_inner);
310        }
311    }
312}
313
314impl<T> TimeoutNotificationWaiter for MockMonitor<T> {
315    /// Blocks until a notification happens or mock elapsed time reaches timeout.
316    fn wait_for(&self, timeout: Duration) -> WaitTimeoutStatus {
317        let mut state = self.lock_state();
318        let observed_epoch = state.notification_epoch;
319        let target_elapsed = state.elapsed.saturating_add(timeout);
320        loop {
321            if state.notification_epoch != observed_epoch {
322                return WaitTimeoutStatus::Woken;
323            }
324            if state.elapsed >= target_elapsed {
325                return WaitTimeoutStatus::TimedOut;
326            }
327            state = self
328                .changed
329                .wait(state)
330                .unwrap_or_else(std::sync::PoisonError::into_inner);
331        }
332    }
333}
334
335impl<T> ConditionWaiter for MockMonitor<T> {
336    type State = T;
337
338    /// Blocks while the predicate remains true, then runs the action.
339    fn wait_while<R, P, F>(&self, mut predicate: P, action: F) -> R
340    where
341        P: FnMut(&Self::State) -> bool,
342        F: FnOnce(&mut Self::State) -> R,
343    {
344        let mut state = self.lock_state();
345        while predicate(&state.value) {
346            state = self
347                .changed
348                .wait(state)
349                .unwrap_or_else(std::sync::PoisonError::into_inner);
350        }
351        action(&mut state.value)
352    }
353}
354
355impl<T> TimeoutConditionWaiter for MockMonitor<T> {
356    /// Blocks while the predicate remains true or until mock elapsed time reaches timeout.
357    fn wait_while_for<R, P, F>(
358        &self,
359        timeout: Duration,
360        mut predicate: P,
361        action: F,
362    ) -> WaitTimeoutResult<R>
363    where
364        P: FnMut(&Self::State) -> bool,
365        F: FnOnce(&mut Self::State) -> R,
366    {
367        let mut state = self.lock_state();
368        let target_elapsed = state.elapsed.saturating_add(timeout);
369        loop {
370            if !predicate(&state.value) {
371                return WaitTimeoutResult::Ready(action(&mut state.value));
372            }
373            if state.elapsed >= target_elapsed {
374                return WaitTimeoutResult::TimedOut;
375            }
376            state = self
377                .changed
378                .wait(state)
379                .unwrap_or_else(std::sync::PoisonError::into_inner);
380        }
381    }
382}
383
384#[cfg(feature = "async")]
385impl<T: Send> AsyncNotificationWaiter for MockMonitor<T> {
386    /// Returns a future that resolves after an async notification.
387    fn wait_async<'a>(&'a self) -> AsyncMonitorFuture<'a, ()> {
388        let notified = self.async_notification.notified();
389        Box::pin(notified)
390    }
391}
392
393#[cfg(feature = "async")]
394impl<T: Send> AsyncTimeoutNotificationWaiter for MockMonitor<T> {
395    /// Returns a future that resolves after notification or mock timeout.
396    fn wait_for_async<'a>(
397        &'a self,
398        timeout: Duration,
399    ) -> AsyncMonitorFuture<'a, WaitTimeoutStatus> {
400        let mut change_receiver = self.async_change_sender.subscribe();
401        let (observed_epoch, target_elapsed) = {
402            let state = self.lock_state();
403            (
404                state.notification_epoch,
405                state.elapsed.saturating_add(timeout),
406            )
407        };
408        Box::pin(async move {
409            loop {
410                {
411                    let state = self.lock_state();
412                    if state.notification_epoch != observed_epoch {
413                        return WaitTimeoutStatus::Woken;
414                    }
415                    if state.elapsed >= target_elapsed {
416                        return WaitTimeoutStatus::TimedOut;
417                    }
418                }
419                change_receiver
420                    .changed()
421                    .await
422                    .expect("mock monitor sender should live while the monitor is borrowed");
423            }
424        })
425    }
426}
427
428#[cfg(feature = "async")]
429impl<T: Send> AsyncConditionWaiter for MockMonitor<T> {
430    type State = T;
431
432    /// Returns a future that waits while the predicate remains true.
433    fn wait_while_async<'a, R, P, F>(
434        &'a self,
435        mut predicate: P,
436        action: F,
437    ) -> AsyncMonitorFuture<'a, R>
438    where
439        R: Send + 'a,
440        P: FnMut(&Self::State) -> bool + Send + 'a,
441        F: FnOnce(&mut Self::State) -> R + Send + 'a,
442    {
443        Box::pin(async move {
444            loop {
445                let notified = {
446                    let mut state = self.lock_state();
447                    if !predicate(&state.value) {
448                        return action(&mut state.value);
449                    }
450                    self.async_notification.notified()
451                };
452                notified.await;
453            }
454        })
455    }
456}
457
458#[cfg(feature = "async")]
459impl<T: Send> AsyncTimeoutConditionWaiter for MockMonitor<T> {
460    /// Returns a future that waits while the predicate remains true or times out.
461    fn wait_while_for_async<'a, R, P, F>(
462        &'a self,
463        timeout: Duration,
464        mut predicate: P,
465        action: F,
466    ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
467    where
468        R: Send + 'a,
469        P: FnMut(&Self::State) -> bool + Send + 'a,
470        F: FnOnce(&mut Self::State) -> R + Send + 'a,
471    {
472        let target_elapsed = self.elapsed().saturating_add(timeout);
473        let mut change_receiver = self.async_change_sender.subscribe();
474        Box::pin(async move {
475            loop {
476                {
477                    let mut state = self.lock_state();
478                    if !predicate(&state.value) {
479                        return WaitTimeoutResult::Ready(action(&mut state.value));
480                    }
481                    if state.elapsed >= target_elapsed {
482                        return WaitTimeoutResult::TimedOut;
483                    }
484                }
485                change_receiver
486                    .changed()
487                    .await
488                    .expect("mock monitor sender should live while the monitor is borrowed");
489            }
490        })
491    }
492}
493
494impl<T> From<T> for MockMonitor<T> {
495    /// Creates a mock monitor from an initial state value.
496    fn from(value: T) -> Self {
497        Self::new(value)
498    }
499}
500
501impl<T: Default> Default for MockMonitor<T> {
502    /// Creates a mock monitor containing `T::default()`.
503    fn default() -> Self {
504        Self::new(T::default())
505    }
506}