Skip to main content

qubit_lock/monitor/
monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! # Monitor
10//!
11//! Provides a synchronous monitor built from a mutex and a condition variable.
12//! A monitor protects one shared state value and binds that state to the
13//! condition variable used to wait for changes. This is the same low-level
14//! mechanism as using [`std::sync::Mutex`] and [`std::sync::Condvar`] directly,
15//! but packaged so callers do not have to keep a mutex and its matching
16//! condition variable as separate fields.
17//!
18//! The high-level APIs ([`Monitor::read`], [`Monitor::write`],
19//! [`Monitor::wait_while`], and [`Monitor::wait_until`]) are intended for
20//! short critical sections and simple guarded-suspension flows. The lower-level
21//! [`Monitor::lock`] API returns a [`MonitorGuard`], which supports
22//! [`MonitorGuard::wait`] and [`MonitorGuard::wait_timeout`] for more complex
23//! state machines such as thread pools.
24//!
25//! # Author
26//!
27//! Haixing Hu
28
29use std::{
30    sync::{
31        Condvar,
32        Mutex,
33    },
34    time::{
35        Duration,
36        Instant,
37    },
38};
39
40use super::monitor_guard::MonitorGuard;
41
/// Outcome of a single timed wait on a monitor's condition variable.
///
/// Both [`MonitorGuard::wait_timeout`] and [`Monitor::wait_notify`] return
/// this status. It only reports why the timed wait returned; because
/// condition variables may wake spuriously, callers must still re-check the
/// protected state afterwards.
///
/// The type is a small `Copy` enum. It implements `Hash` alongside `Eq`, so
/// it can be stored in hash-based collections (for example when counting
/// wait outcomes).
///
/// # Example
///
/// ```rust
/// use std::time::Duration;
///
/// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
///
/// let monitor = Monitor::new(false);
/// let guard = monitor.lock();
/// let (_guard, status) = guard.wait_timeout(Duration::from_millis(1));
/// assert_eq!(status, WaitTimeoutStatus::TimedOut);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WaitTimeoutStatus {
    /// The wait returned before the timeout elapsed.
    ///
    /// Usually another thread called [`Monitor::notify_one`] or
    /// [`Monitor::notify_all`], but a spurious wakeup produces the same
    /// status. Always re-check the guarded state before acting on it.
    Woken,
    /// The wait reached the timeout boundary.
    ///
    /// The protected state should still be inspected after this status:
    /// another thread may have changed it while the waiting thread was
    /// reacquiring the mutex.
    TimedOut,
}
76
/// Outcome of waiting for a predicate under an overall timeout.
///
/// [`Monitor::wait_timeout_while`] and [`Monitor::wait_timeout_until`] return
/// this type. It is more explicit than `Option<R>`: a satisfied predicate
/// yields [`Self::Ready`] carrying the closure's value, while an expired
/// timeout yields [`Self::TimedOut`].
///
/// Besides `Debug`, `Clone`, `Copy`, `PartialEq`, and `Eq`, the type also
/// implements `Hash` whenever `R` does, so results can be used as keys in
/// hash-based collections.
///
/// # Type Parameters
///
/// * `R` - The value produced after the protected state satisfies the
///   predicate.
///
/// # Example
///
/// ```rust
/// use std::time::Duration;
///
/// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
///
/// let monitor = Monitor::new(true);
/// let result = monitor.wait_timeout_until(
///     Duration::from_secs(1),
///     |ready| *ready,
///     |ready| {
///         *ready = false;
///         "ready"
///     },
/// );
///
/// assert_eq!(result, WaitTimeoutResult::Ready("ready"));
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WaitTimeoutResult<R> {
    /// The predicate became ready before the timeout; holds the value the
    /// caller's closure produced.
    Ready(R),
    /// The timeout elapsed before the predicate became ready.
    TimedOut,
}
115
/// Shared state protected by a mutex and a condition variable.
///
/// `Monitor` is useful when callers need more than a short critical section.
/// It models the classic monitor object pattern: one mutex protects the state,
/// and one condition variable lets threads wait until that state changes. This
/// is the same relationship used by `std::sync::Mutex` and
/// `std::sync::Condvar`, but represented as one object so the condition
/// variable is not accidentally used with unrelated state.
///
/// `Monitor` deliberately has two levels of API.
///
/// The closure-based level covers the common cases:
///
/// * `read` and `write` acquire the mutex, run a closure, and release it.
/// * `wait_while`, `wait_until`, and their timeout variants implement common
///   predicate-based waits.
///
/// The guard-based level is for hand-written state machines:
///
/// * `lock` returns a [`MonitorGuard`] for callers that need to write their own
///   loop around [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`].
///
/// A poisoned mutex is recovered by taking the inner state. This makes
/// `Monitor` suitable for coordination state that should remain observable
/// after another thread panics while holding the lock.
///
/// Notifications are never sent implicitly: after changing state that waiters
/// may be observing, call `notify_one` or `notify_all` explicitly.
///
/// # Difference from `Mutex` and `Condvar`
///
/// With the standard library primitives, callers usually store two fields and
/// manually keep them paired:
///
/// ```rust
/// # use std::sync::{Condvar, Mutex};
/// # struct State;
/// struct Shared {
///     state: Mutex<State>,
///     changed: Condvar,
/// }
/// ```
///
/// `Monitor<State>` stores the same pair internally. A [`MonitorGuard`] is a
/// wrapper around the standard library's `MutexGuard`; it keeps the protected
/// state locked and knows which monitor it belongs to, so its wait methods use
/// the matching condition variable.
///
/// # Type Parameters
///
/// * `T` - The state protected by this monitor.
///
/// # Example
///
/// ```rust
/// use std::thread;
///
/// use qubit_lock::lock::ArcMonitor;
///
/// let monitor = ArcMonitor::new(false);
/// let waiter_monitor = monitor.clone();
///
/// let waiter = thread::spawn(move || {
///     waiter_monitor.wait_until(
///         |ready| *ready,
///         |ready| {
///             *ready = false;
///         },
///     );
/// });
///
/// monitor.write(|ready| {
///     *ready = true;
/// });
/// monitor.notify_all();
///
/// waiter.join().expect("waiter should finish");
/// assert!(!monitor.read(|ready| *ready));
/// ```
///
/// # Author
///
/// Haixing Hu
pub struct Monitor<T> {
    /// Mutex protecting the monitor state; every access goes through it.
    state: Mutex<T>,
    /// Condition variable used to wake predicate waiters after state changes.
    ///
    /// NOTE(review): visible to the parent module, presumably so that
    /// `MonitorGuard` can wait on it - confirm against `monitor_guard.rs`.
    pub(super) changed: Condvar,
}
197
198impl<T> Monitor<T> {
199    /// Creates a monitor protecting the supplied state value.
200    ///
201    /// # Arguments
202    ///
203    /// * `state` - Initial state protected by the monitor.
204    ///
205    /// # Returns
206    ///
207    /// A monitor initialized with the supplied state.
208    ///
209    /// # Example
210    ///
211    /// ```rust
212    /// use qubit_lock::lock::Monitor;
213    ///
214    /// let monitor = Monitor::new(0_u32);
215    /// assert_eq!(monitor.read(|n| *n), 0);
216    /// ```
217    #[inline]
218    pub fn new(state: T) -> Self {
219        Self {
220            state: Mutex::new(state),
221            changed: Condvar::new(),
222        }
223    }
224
225    /// Acquires the monitor and returns a guard for explicit state-machine code.
226    ///
227    /// The returned [`MonitorGuard`] keeps the monitor mutex locked until the
228    /// guard is dropped. It can also be passed through
229    /// [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`] to temporarily
230    /// release the lock while waiting on this monitor's condition variable.
231    ///
232    /// If the mutex is poisoned, this method recovers the inner state and still
233    /// returns a guard.
234    ///
235    /// # Returns
236    ///
237    /// A guard that provides read and write access to the protected state.
238    ///
239    /// # Example
240    ///
241    /// ```rust
242    /// use qubit_lock::lock::Monitor;
243    ///
244    /// let monitor = Monitor::new(1);
245    /// {
246    ///     let mut value = monitor.lock();
247    ///     *value += 1;
248    /// }
249    ///
250    /// assert_eq!(monitor.read(|value| *value), 2);
251    /// ```
252    #[inline]
253    pub fn lock(&self) -> MonitorGuard<'_, T> {
254        MonitorGuard::new(
255            self,
256            self.state
257                .lock()
258                .unwrap_or_else(std::sync::PoisonError::into_inner),
259        )
260    }
261
262    /// Acquires the monitor and reads the protected state.
263    ///
264    /// The closure runs while the mutex is held. Keep the closure short and do
265    /// not call code that may block for a long time.
266    ///
267    /// If the mutex is poisoned, this method recovers the inner state and still
268    /// executes the closure.
269    ///
270    /// # Arguments
271    ///
272    /// * `f` - Closure that receives an immutable reference to the state.
273    ///
274    /// # Returns
275    ///
276    /// The value returned by the closure.
277    ///
278    /// # Example
279    ///
280    /// ```rust
281    /// use qubit_lock::lock::Monitor;
282    ///
283    /// let monitor = Monitor::new(10_i32);
284    /// let n = monitor.read(|x| *x);
285    /// assert_eq!(n, 10);
286    /// ```
287    #[inline]
288    pub fn read<R, F>(&self, f: F) -> R
289    where
290        F: FnOnce(&T) -> R,
291    {
292        let guard = self.lock();
293        f(&*guard)
294    }
295
296    /// Acquires the monitor and mutates the protected state.
297    ///
298    /// The closure runs while the mutex is held. This method only changes the
299    /// state; callers should explicitly call [`Self::notify_one`] or
300    /// [`Self::notify_all`] after changing a condition that waiters may be
301    /// observing.
302    ///
303    /// If the mutex is poisoned, this method recovers the inner state and still
304    /// executes the closure.
305    ///
306    /// # Arguments
307    ///
308    /// * `f` - Closure that receives a mutable reference to the state.
309    ///
310    /// # Returns
311    ///
312    /// The value returned by the closure.
313    ///
314    /// # Example
315    ///
316    /// ```rust
317    /// use qubit_lock::lock::Monitor;
318    ///
319    /// let monitor = Monitor::new(String::new());
320    /// let len = monitor.write(|s| {
321    ///     s.push_str("hi");
322    ///     s.len()
323    /// });
324    /// assert_eq!(len, 2);
325    /// ```
326    #[inline]
327    pub fn write<R, F>(&self, f: F) -> R
328    where
329        F: FnOnce(&mut T) -> R,
330    {
331        let mut guard = self.lock();
332        f(&mut *guard)
333    }
334
335    /// Waits for a notification or timeout without checking state.
336    ///
337    /// This convenience method locks the monitor, waits once on the condition
338    /// variable, and returns why the timed wait completed. It is useful only
339    /// when the caller genuinely needs a notification wait without inspecting
340    /// state before or after the wait. Most coordination code should prefer
341    /// [`Self::wait_while`], [`Self::wait_until`], or the explicit
342    /// [`MonitorGuard::wait_timeout`] loop.
343    ///
344    /// Condition variables may wake spuriously, so
345    /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
346    /// state.
347    ///
348    /// If the mutex is poisoned, this method recovers the inner state and
349    /// continues waiting.
350    ///
351    /// # Arguments
352    ///
353    /// * `timeout` - Maximum duration to wait for a notification.
354    ///
355    /// # Returns
356    ///
357    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
358    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
359    ///
360    /// # Example
361    ///
362    /// ```rust
363    /// use std::time::Duration;
364    ///
365    /// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
366    ///
367    /// let monitor = Monitor::new(false);
368    /// let status = monitor.wait_notify(Duration::from_millis(1));
369    ///
370    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
371    /// ```
372    #[inline]
373    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
374        let guard = self.lock();
375        let (_guard, status) = guard.wait_timeout(timeout);
376        status
377    }
378
379    /// Waits while a predicate remains true, then mutates the protected state.
380    ///
381    /// This is the monitor equivalent of the common `while condition { wait }`
382    /// guarded-suspension pattern. The predicate is evaluated while holding the
383    /// mutex. If it returns `true`, the current thread waits on the condition
384    /// variable and atomically releases the mutex. After a notification or
385    /// spurious wakeup, the mutex is reacquired and the predicate is evaluated
386    /// again. When the predicate returns `false`, `f` runs while the mutex is
387    /// still held.
388    ///
389    /// This method may block indefinitely if no thread changes the state so
390    /// that `waiting` becomes false and sends a notification.
391    ///
392    /// If the mutex is poisoned before or during the wait, this method recovers
393    /// the inner state and continues waiting or executes the closure.
394    ///
395    /// # Arguments
396    ///
397    /// * `waiting` - Predicate that returns `true` while the caller should
398    ///   keep waiting.
399    /// * `f` - Closure that receives mutable access after waiting is no longer
400    ///   required.
401    ///
402    /// # Returns
403    ///
404    /// The value returned by `f`.
405    ///
406    /// # Example
407    ///
408    /// ```rust
409    /// use std::{
410    ///     sync::Arc,
411    ///     thread,
412    /// };
413    ///
414    /// use qubit_lock::lock::Monitor;
415    ///
416    /// let monitor = Arc::new(Monitor::new(Vec::<i32>::new()));
417    /// let worker_monitor = Arc::clone(&monitor);
418    ///
419    /// let worker = thread::spawn(move || {
420    ///     worker_monitor.wait_while(
421    ///         |items| items.is_empty(),
422    ///         |items| items.pop().expect("item should be ready"),
423    ///     )
424    /// });
425    ///
426    /// monitor.write(|items| items.push(7));
427    /// monitor.notify_one();
428    ///
429    /// assert_eq!(worker.join().expect("worker should finish"), 7);
430    /// ```
431    #[inline]
432    pub fn wait_while<R, P, F>(&self, mut waiting: P, f: F) -> R
433    where
434        P: FnMut(&T) -> bool,
435        F: FnOnce(&mut T) -> R,
436    {
437        let mut guard = self.lock();
438        while waiting(&*guard) {
439            guard = guard.wait();
440        }
441        f(&mut *guard)
442    }
443
444    /// Waits until the protected state satisfies a predicate, then mutates it.
445    ///
446    /// This is the positive-predicate counterpart of [`Self::wait_while`]. The
447    /// predicate is evaluated while holding the mutex. If it returns `false`,
448    /// the current thread waits on the condition variable and atomically
449    /// releases the mutex. After a notification or spurious wakeup, the mutex
450    /// is reacquired and the predicate is evaluated again. When the predicate
451    /// returns `true`, `f` runs while the mutex is still held.
452    ///
453    /// This method may block indefinitely if no thread changes the state to
454    /// satisfy the predicate and sends a notification.
455    ///
456    /// If the mutex is poisoned before or during the wait, this method recovers
457    /// the inner state and continues waiting or executes the closure.
458    ///
459    /// # Arguments
460    ///
461    /// * `ready` - Predicate that returns `true` when the state is ready.
462    /// * `f` - Closure that receives mutable access to the ready state.
463    ///
464    /// # Returns
465    ///
466    /// The value returned by `f` after the predicate has become true.
467    ///
468    /// # Example
469    ///
470    /// ```rust
471    /// use std::{
472    ///     sync::Arc,
473    ///     thread,
474    /// };
475    ///
476    /// use qubit_lock::lock::Monitor;
477    ///
478    /// let monitor = Arc::new(Monitor::new(false));
479    /// let waiter_monitor = Arc::clone(&monitor);
480    ///
481    /// let waiter = thread::spawn(move || {
482    ///     waiter_monitor.wait_until(
483    ///         |ready| *ready,
484    ///         |ready| {
485    ///             *ready = false;
486    ///             "done"
487    ///         },
488    ///     )
489    /// });
490    ///
491    /// monitor.write(|ready| *ready = true);
492    /// monitor.notify_one();
493    ///
494    /// assert_eq!(waiter.join().expect("waiter should finish"), "done");
495    /// ```
496    #[inline]
497    pub fn wait_until<R, P, F>(&self, mut ready: P, f: F) -> R
498    where
499        P: FnMut(&T) -> bool,
500        F: FnOnce(&mut T) -> R,
501    {
502        self.wait_while(|state| !ready(state), f)
503    }
504
505    /// Waits while a predicate remains true, with an overall time limit.
506    ///
507    /// This method is the timeout-aware form of [`Self::wait_while`]. It keeps
508    /// rechecking `waiting` under the monitor lock and waits only for the
509    /// remaining portion of `timeout`. If `waiting` becomes false before the
510    /// timeout expires, `f` runs while the lock is still held. If the timeout
511    /// expires first, the closure is not called.
512    ///
513    /// Condition variables may wake spuriously, and timeout status alone is not
514    /// used as proof that the predicate is still true; the predicate is always
515    /// rechecked under the lock.
516    ///
517    /// If the mutex is poisoned before or during the wait, this method recovers
518    /// the inner state and continues waiting or executes the closure.
519    ///
520    /// # Arguments
521    ///
522    /// * `timeout` - Maximum total duration to wait.
523    /// * `waiting` - Predicate that returns `true` while the caller should
524    ///   continue waiting.
525    /// * `f` - Closure that receives mutable access when waiting is no longer
526    ///   required.
527    ///
528    /// # Returns
529    ///
530    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
531    /// predicate stops blocking before the timeout. Returns
532    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
533    ///
534    /// # Example
535    ///
536    /// ```rust
537    /// use std::time::Duration;
538    ///
539    /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
540    ///
541    /// let monitor = Monitor::new(Vec::<i32>::new());
542    /// let result = monitor.wait_timeout_while(
543    ///     Duration::from_millis(1),
544    ///     |items| items.is_empty(),
545    ///     |items| items.pop(),
546    /// );
547    ///
548    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
549    /// ```
550    #[inline]
551    pub fn wait_timeout_while<R, P, F>(
552        &self,
553        timeout: Duration,
554        mut waiting: P,
555        f: F,
556    ) -> WaitTimeoutResult<R>
557    where
558        P: FnMut(&T) -> bool,
559        F: FnOnce(&mut T) -> R,
560    {
561        let mut guard = self.lock();
562        let start = Instant::now();
563        loop {
564            if !waiting(&*guard) {
565                return WaitTimeoutResult::Ready(f(&mut *guard));
566            }
567
568            let elapsed = start.elapsed();
569            let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
570            if remaining.is_zero() {
571                return WaitTimeoutResult::TimedOut;
572            }
573
574            let (next_guard, _status) = guard.wait_timeout(remaining);
575            guard = next_guard;
576        }
577    }
578
579    /// Waits until a predicate becomes true, with an overall time limit.
580    ///
581    /// This is the positive-predicate counterpart of
582    /// [`Self::wait_timeout_while`]. If `ready` becomes true before the timeout
583    /// expires, `f` runs while the monitor lock is still held. If the timeout
584    /// expires first, the closure is not called.
585    ///
586    /// Condition variables may wake spuriously, and timeout status alone is not
587    /// used as proof that the predicate is still false; the predicate is always
588    /// rechecked under the lock.
589    ///
590    /// If the mutex is poisoned before or during the wait, this method recovers
591    /// the inner state and continues waiting or executes the closure.
592    ///
593    /// # Arguments
594    ///
595    /// * `timeout` - Maximum total duration to wait.
596    /// * `ready` - Predicate that returns `true` when the caller may continue.
597    /// * `f` - Closure that receives mutable access to the ready state.
598    ///
599    /// # Returns
600    ///
601    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
602    /// predicate becomes true before the timeout. Returns
603    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
604    ///
605    /// # Example
606    ///
607    /// ```rust
608    /// use std::{
609    ///     sync::Arc,
610    ///     thread,
611    ///     time::Duration,
612    /// };
613    ///
614    /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
615    ///
616    /// let monitor = Arc::new(Monitor::new(false));
617    /// let waiter_monitor = Arc::clone(&monitor);
618    ///
619    /// let waiter = thread::spawn(move || {
620    ///     waiter_monitor.wait_timeout_until(
621    ///         Duration::from_secs(1),
622    ///         |ready| *ready,
623    ///         |ready| {
624    ///             *ready = false;
625    ///             5
626    ///         },
627    ///     )
628    /// });
629    ///
630    /// monitor.write(|ready| *ready = true);
631    /// monitor.notify_one();
632    ///
633    /// assert_eq!(
634    ///     waiter.join().expect("waiter should finish"),
635    ///     WaitTimeoutResult::Ready(5),
636    /// );
637    /// ```
638    #[inline]
639    pub fn wait_timeout_until<R, P, F>(
640        &self,
641        timeout: Duration,
642        mut ready: P,
643        f: F,
644    ) -> WaitTimeoutResult<R>
645    where
646        P: FnMut(&T) -> bool,
647        F: FnOnce(&mut T) -> R,
648    {
649        self.wait_timeout_while(timeout, |state| !ready(state), f)
650    }
651
652    /// Wakes one thread waiting on this monitor's condition variable.
653    ///
654    /// Notifications do not carry state by themselves. A waiting thread only
655    /// proceeds safely after rechecking the protected state. Call this after
656    /// changing state that may make one waiter able to continue.
657    ///
658    /// # Example
659    ///
660    /// ```rust
661    /// use std::thread;
662    ///
663    /// use qubit_lock::lock::ArcMonitor;
664    ///
665    /// let monitor = ArcMonitor::new(0_u32);
666    /// let waiter = {
667    ///     let m = monitor.clone();
668    ///     thread::spawn(move || {
669    ///         m.wait_until(|n| *n > 0, |n| {
670    ///             *n -= 1;
671    ///         });
672    ///     })
673    /// };
674    ///
675    /// monitor.write(|n| *n = 1);
676    /// monitor.notify_one();
677    /// waiter.join().expect("waiter should finish");
678    /// ```
679    #[inline]
680    pub fn notify_one(&self) {
681        self.changed.notify_one();
682    }
683
684    /// Wakes all threads waiting on this monitor's condition variable.
685    ///
686    /// Notifications do not carry state by themselves. Every awakened thread
687    /// must recheck the protected state before continuing. Call this after a
688    /// state change that may allow multiple waiters to make progress.
689    ///
690    /// # Example
691    ///
692    /// ```rust
693    /// use std::thread;
694    ///
695    /// use qubit_lock::lock::ArcMonitor;
696    ///
697    /// let monitor = ArcMonitor::new(false);
698    /// let mut handles = Vec::new();
699    /// for _ in 0..2 {
700    ///     let m = monitor.clone();
701    ///     handles.push(thread::spawn(move || {
702    ///         m.wait_until(|ready| *ready, |_| ());
703    ///     }));
704    /// }
705    ///
706    /// monitor.write(|ready| *ready = true);
707    /// monitor.notify_all();
708    /// for h in handles {
709    ///     h.join().expect("waiter should finish");
710    /// }
711    /// ```
712    #[inline]
713    pub fn notify_all(&self) {
714        self.changed.notify_all();
715    }
716}
717
718impl<T: Default> Default for Monitor<T> {
719    /// Creates a monitor containing `T::default()`.
720    ///
721    /// # Returns
722    ///
723    /// A monitor protecting the default value for `T`.
724    ///
725    /// # Example
726    ///
727    /// ```rust
728    /// use qubit_lock::lock::Monitor;
729    ///
730    /// let monitor: Monitor<String> = Monitor::default();
731    /// assert!(monitor.read(|s| s.is_empty()));
732    /// ```
733    #[inline]
734    fn default() -> Self {
735        Self::new(T::default())
736    }
737}