Skip to main content

qubit_lock/monitor/
monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! # Monitor
10//!
11//! Provides a synchronous monitor built from a mutex and a condition variable.
12//! A monitor protects one shared state value and binds that state to the
13//! condition variable used to wait for changes. This is the same low-level
14//! mechanism as using [`std::sync::Mutex`] and [`std::sync::Condvar`] directly,
15//! but packaged so callers do not have to keep a mutex and its matching
16//! condition variable as separate fields.
17//!
18//! The high-level APIs ([`Monitor::read`], [`Monitor::write`],
19//! [`Monitor::wait_while`], and [`Monitor::wait_until`]) are intended for
20//! short critical sections and simple guarded-suspension flows. The lower-level
21//! [`Monitor::lock`] API returns a [`MonitorGuard`], which supports
22//! [`MonitorGuard::wait`] and [`MonitorGuard::wait_timeout`] for more complex
23//! state machines such as thread pools.
24//!
25//! # Author
26//!
27//! Haixing Hu
28
29use std::{
30    sync::{
31        Condvar,
32        Mutex,
33    },
34    time::{
35        Duration,
36        Instant,
37    },
38};
39
40use super::monitor_guard::MonitorGuard;
41use super::{
42    wait_timeout_result::WaitTimeoutResult,
43    wait_timeout_status::WaitTimeoutStatus,
44};
45
46/// Shared state protected by a mutex and a condition variable.
47///
48/// `Monitor` is useful when callers need more than a short critical section.
49/// It models the classic monitor object pattern: one mutex protects the state,
50/// and one condition variable lets threads wait until that state changes. This
51/// is the same relationship used by `std::sync::Mutex` and
52/// `std::sync::Condvar`, but represented as one object so the condition
53/// variable is not accidentally used with unrelated state.
54///
55/// `Monitor` deliberately has two levels of API:
56///
57/// * `read` and `write` acquire the mutex, run a closure, and release it.
58/// * `wait_while`, `wait_until`, and their timeout variants implement common
59///   predicate-based waits.
60/// * `lock` returns a [`MonitorGuard`] for callers that need to write their own
61///   loop around [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`].
62///
63/// A poisoned mutex is recovered by taking the inner state. This makes
64/// `Monitor` suitable for coordination state that should remain observable
65/// after another thread panics while holding the lock.
66///
67/// # Difference from `Mutex` and `Condvar`
68///
69/// With the standard library primitives, callers usually store two fields and
70/// manually keep them paired:
71///
72/// ```rust
73/// # use std::sync::{Condvar, Mutex};
74/// # struct State;
75/// struct Shared {
76///     state: Mutex<State>,
77///     changed: Condvar,
78/// }
79/// ```
80///
81/// `Monitor<State>` stores the same pair internally. A [`MonitorGuard`] is a
82/// wrapper around the standard library's `MutexGuard`; it keeps the protected
83/// state locked and knows which monitor it belongs to, so its wait methods use
84/// the matching condition variable.
85///
86/// # Type Parameters
87///
88/// * `T` - The state protected by this monitor.
89///
90/// # Example
91///
92/// ```rust
93/// use std::thread;
94///
95/// use qubit_lock::lock::ArcMonitor;
96///
97/// let monitor = ArcMonitor::new(false);
98/// let waiter_monitor = monitor.clone();
99///
100/// let waiter = thread::spawn(move || {
101///     waiter_monitor.wait_until(
102///         |ready| *ready,
103///         |ready| {
104///             *ready = false;
105///         },
106///     );
107/// });
108///
109/// monitor.write(|ready| {
110///     *ready = true;
111/// });
112/// monitor.notify_all();
113///
114/// waiter.join().expect("waiter should finish");
115/// assert!(!monitor.read(|ready| *ready));
116/// ```
117///
118/// # Author
119///
120/// Haixing Hu
121pub struct Monitor<T> {
122    /// Mutex protecting the monitor state.
123    state: Mutex<T>,
124    /// Condition variable used to wake predicate waiters after state changes.
125    pub(super) changed: Condvar,
126}
127
128impl<T> Monitor<T> {
129    /// Creates a monitor protecting the supplied state value.
130    ///
131    /// # Arguments
132    ///
133    /// * `state` - Initial state protected by the monitor.
134    ///
135    /// # Returns
136    ///
137    /// A monitor initialized with the supplied state.
138    ///
139    /// # Example
140    ///
141    /// ```rust
142    /// use qubit_lock::lock::Monitor;
143    ///
144    /// let monitor = Monitor::new(0_u32);
145    /// assert_eq!(monitor.read(|n| *n), 0);
146    /// ```
147    #[inline]
148    pub fn new(state: T) -> Self {
149        Self {
150            state: Mutex::new(state),
151            changed: Condvar::new(),
152        }
153    }
154
155    /// Acquires the monitor and returns a guard for explicit state-machine code.
156    ///
157    /// The returned [`MonitorGuard`] keeps the monitor mutex locked until the
158    /// guard is dropped. It can also be passed through
159    /// [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`] to temporarily
160    /// release the lock while waiting on this monitor's condition variable.
161    ///
162    /// If the mutex is poisoned, this method recovers the inner state and still
163    /// returns a guard.
164    ///
165    /// # Returns
166    ///
167    /// A guard that provides read and write access to the protected state.
168    ///
169    /// # Example
170    ///
171    /// ```rust
172    /// use qubit_lock::lock::Monitor;
173    ///
174    /// let monitor = Monitor::new(1);
175    /// {
176    ///     let mut value = monitor.lock();
177    ///     *value += 1;
178    /// }
179    ///
180    /// assert_eq!(monitor.read(|value| *value), 2);
181    /// ```
182    #[inline]
183    pub fn lock(&self) -> MonitorGuard<'_, T> {
184        MonitorGuard::new(
185            self,
186            self.state
187                .lock()
188                .unwrap_or_else(std::sync::PoisonError::into_inner),
189        )
190    }
191
192    /// Acquires the monitor and reads the protected state.
193    ///
194    /// The closure runs while the mutex is held. Keep the closure short and do
195    /// not call code that may block for a long time.
196    ///
197    /// If the mutex is poisoned, this method recovers the inner state and still
198    /// executes the closure.
199    ///
200    /// # Arguments
201    ///
202    /// * `f` - Closure that receives an immutable reference to the state.
203    ///
204    /// # Returns
205    ///
206    /// The value returned by the closure.
207    ///
208    /// # Example
209    ///
210    /// ```rust
211    /// use qubit_lock::lock::Monitor;
212    ///
213    /// let monitor = Monitor::new(10_i32);
214    /// let n = monitor.read(|x| *x);
215    /// assert_eq!(n, 10);
216    /// ```
217    #[inline]
218    pub fn read<R, F>(&self, f: F) -> R
219    where
220        F: FnOnce(&T) -> R,
221    {
222        let guard = self.lock();
223        f(&*guard)
224    }
225
226    /// Acquires the monitor and mutates the protected state.
227    ///
228    /// The closure runs while the mutex is held. This method only changes the
229    /// state; callers should explicitly call [`Self::notify_one`] or
230    /// [`Self::notify_all`] after changing a condition that waiters may be
231    /// observing.
232    ///
233    /// If the mutex is poisoned, this method recovers the inner state and still
234    /// executes the closure.
235    ///
236    /// # Arguments
237    ///
238    /// * `f` - Closure that receives a mutable reference to the state.
239    ///
240    /// # Returns
241    ///
242    /// The value returned by the closure.
243    ///
244    /// # Example
245    ///
246    /// ```rust
247    /// use qubit_lock::lock::Monitor;
248    ///
249    /// let monitor = Monitor::new(String::new());
250    /// let len = monitor.write(|s| {
251    ///     s.push_str("hi");
252    ///     s.len()
253    /// });
254    /// assert_eq!(len, 2);
255    /// ```
256    #[inline]
257    pub fn write<R, F>(&self, f: F) -> R
258    where
259        F: FnOnce(&mut T) -> R,
260    {
261        let mut guard = self.lock();
262        f(&mut *guard)
263    }
264
265    /// Waits for a notification or timeout without checking state.
266    ///
267    /// This convenience method locks the monitor, waits once on the condition
268    /// variable, and returns why the timed wait completed. It is useful only
269    /// when the caller genuinely needs a notification wait without inspecting
270    /// state before or after the wait. Most coordination code should prefer
271    /// [`Self::wait_while`], [`Self::wait_until`], or the explicit
272    /// [`MonitorGuard::wait_timeout`] loop.
273    ///
274    /// Condition variables may wake spuriously, so
275    /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
276    /// state.
277    ///
278    /// If the mutex is poisoned, this method recovers the inner state and
279    /// continues waiting.
280    ///
281    /// # Arguments
282    ///
283    /// * `timeout` - Maximum duration to wait for a notification.
284    ///
285    /// # Returns
286    ///
287    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
288    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
289    ///
290    /// # Example
291    ///
292    /// ```rust
293    /// use std::time::Duration;
294    ///
295    /// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
296    ///
297    /// let monitor = Monitor::new(false);
298    /// let status = monitor.wait_notify(Duration::from_millis(1));
299    ///
300    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
301    /// ```
302    #[inline]
303    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
304        let guard = self.lock();
305        let (_guard, status) = guard.wait_timeout(timeout);
306        status
307    }
308
309    /// Waits while a predicate remains true, then mutates the protected state.
310    ///
311    /// This is the monitor equivalent of the common `while condition { wait }`
312    /// guarded-suspension pattern. The predicate is evaluated while holding the
313    /// mutex. If it returns `true`, the current thread waits on the condition
314    /// variable and atomically releases the mutex. After a notification or
315    /// spurious wakeup, the mutex is reacquired and the predicate is evaluated
316    /// again. When the predicate returns `false`, `f` runs while the mutex is
317    /// still held.
318    ///
319    /// This method may block indefinitely if no thread changes the state so
320    /// that `waiting` becomes false and sends a notification.
321    ///
322    /// If the mutex is poisoned before or during the wait, this method recovers
323    /// the inner state and continues waiting or executes the closure.
324    ///
325    /// # Arguments
326    ///
327    /// * `waiting` - Predicate that returns `true` while the caller should
328    ///   keep waiting.
329    /// * `f` - Closure that receives mutable access after waiting is no longer
330    ///   required.
331    ///
332    /// # Returns
333    ///
334    /// The value returned by `f`.
335    ///
336    /// # Example
337    ///
338    /// ```rust
339    /// use std::{
340    ///     sync::Arc,
341    ///     thread,
342    /// };
343    ///
344    /// use qubit_lock::lock::Monitor;
345    ///
346    /// let monitor = Arc::new(Monitor::new(Vec::<i32>::new()));
347    /// let worker_monitor = Arc::clone(&monitor);
348    ///
349    /// let worker = thread::spawn(move || {
350    ///     worker_monitor.wait_while(
351    ///         |items| items.is_empty(),
352    ///         |items| items.pop().expect("item should be ready"),
353    ///     )
354    /// });
355    ///
356    /// monitor.write(|items| items.push(7));
357    /// monitor.notify_one();
358    ///
359    /// assert_eq!(worker.join().expect("worker should finish"), 7);
360    /// ```
361    #[inline]
362    pub fn wait_while<R, P, F>(&self, mut waiting: P, f: F) -> R
363    where
364        P: FnMut(&T) -> bool,
365        F: FnOnce(&mut T) -> R,
366    {
367        let mut guard = self.lock();
368        while waiting(&*guard) {
369            guard = guard.wait();
370        }
371        f(&mut *guard)
372    }
373
374    /// Waits until the protected state satisfies a predicate, then mutates it.
375    ///
376    /// This is the positive-predicate counterpart of [`Self::wait_while`]. The
377    /// predicate is evaluated while holding the mutex. If it returns `false`,
378    /// the current thread waits on the condition variable and atomically
379    /// releases the mutex. After a notification or spurious wakeup, the mutex
380    /// is reacquired and the predicate is evaluated again. When the predicate
381    /// returns `true`, `f` runs while the mutex is still held.
382    ///
383    /// This method may block indefinitely if no thread changes the state to
384    /// satisfy the predicate and sends a notification.
385    ///
386    /// If the mutex is poisoned before or during the wait, this method recovers
387    /// the inner state and continues waiting or executes the closure.
388    ///
389    /// # Arguments
390    ///
391    /// * `ready` - Predicate that returns `true` when the state is ready.
392    /// * `f` - Closure that receives mutable access to the ready state.
393    ///
394    /// # Returns
395    ///
396    /// The value returned by `f` after the predicate has become true.
397    ///
398    /// # Example
399    ///
400    /// ```rust
401    /// use std::{
402    ///     sync::Arc,
403    ///     thread,
404    /// };
405    ///
406    /// use qubit_lock::lock::Monitor;
407    ///
408    /// let monitor = Arc::new(Monitor::new(false));
409    /// let waiter_monitor = Arc::clone(&monitor);
410    ///
411    /// let waiter = thread::spawn(move || {
412    ///     waiter_monitor.wait_until(
413    ///         |ready| *ready,
414    ///         |ready| {
415    ///             *ready = false;
416    ///             "done"
417    ///         },
418    ///     )
419    /// });
420    ///
421    /// monitor.write(|ready| *ready = true);
422    /// monitor.notify_one();
423    ///
424    /// assert_eq!(waiter.join().expect("waiter should finish"), "done");
425    /// ```
426    #[inline]
427    pub fn wait_until<R, P, F>(&self, mut ready: P, f: F) -> R
428    where
429        P: FnMut(&T) -> bool,
430        F: FnOnce(&mut T) -> R,
431    {
432        self.wait_while(|state| !ready(state), f)
433    }
434
435    /// Waits while a predicate remains true, with an overall time limit.
436    ///
437    /// This method is the timeout-aware form of [`Self::wait_while`]. It keeps
438    /// rechecking `waiting` under the monitor lock and waits only for the
439    /// remaining portion of `timeout`. If `waiting` becomes false before the
440    /// timeout expires, `f` runs while the lock is still held. If the timeout
441    /// expires first, the closure is not called.
442    ///
443    /// Condition variables may wake spuriously, and timeout status alone is not
444    /// used as proof that the predicate is still true; the predicate is always
445    /// rechecked under the lock.
446    ///
447    /// If the mutex is poisoned before or during the wait, this method recovers
448    /// the inner state and continues waiting or executes the closure.
449    ///
450    /// # Arguments
451    ///
452    /// * `timeout` - Maximum total duration to wait.
453    /// * `waiting` - Predicate that returns `true` while the caller should
454    ///   continue waiting.
455    /// * `f` - Closure that receives mutable access when waiting is no longer
456    ///   required.
457    ///
458    /// # Returns
459    ///
460    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
461    /// predicate stops blocking before the timeout. Returns
462    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
463    ///
464    /// # Example
465    ///
466    /// ```rust
467    /// use std::time::Duration;
468    ///
469    /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
470    ///
471    /// let monitor = Monitor::new(Vec::<i32>::new());
472    /// let result = monitor.wait_timeout_while(
473    ///     Duration::from_millis(1),
474    ///     |items| items.is_empty(),
475    ///     |items| items.pop(),
476    /// );
477    ///
478    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
479    /// ```
480    #[inline]
481    pub fn wait_timeout_while<R, P, F>(
482        &self,
483        timeout: Duration,
484        mut waiting: P,
485        f: F,
486    ) -> WaitTimeoutResult<R>
487    where
488        P: FnMut(&T) -> bool,
489        F: FnOnce(&mut T) -> R,
490    {
491        let mut guard = self.lock();
492        let start = Instant::now();
493        loop {
494            if !waiting(&*guard) {
495                return WaitTimeoutResult::Ready(f(&mut *guard));
496            }
497
498            let elapsed = start.elapsed();
499            let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
500            if remaining.is_zero() {
501                return WaitTimeoutResult::TimedOut;
502            }
503
504            let (next_guard, _status) = guard.wait_timeout(remaining);
505            guard = next_guard;
506        }
507    }
508
509    /// Waits until a predicate becomes true, with an overall time limit.
510    ///
511    /// This is the positive-predicate counterpart of
512    /// [`Self::wait_timeout_while`]. If `ready` becomes true before the timeout
513    /// expires, `f` runs while the monitor lock is still held. If the timeout
514    /// expires first, the closure is not called.
515    ///
516    /// Condition variables may wake spuriously, and timeout status alone is not
517    /// used as proof that the predicate is still false; the predicate is always
518    /// rechecked under the lock.
519    ///
520    /// If the mutex is poisoned before or during the wait, this method recovers
521    /// the inner state and continues waiting or executes the closure.
522    ///
523    /// # Arguments
524    ///
525    /// * `timeout` - Maximum total duration to wait.
526    /// * `ready` - Predicate that returns `true` when the caller may continue.
527    /// * `f` - Closure that receives mutable access to the ready state.
528    ///
529    /// # Returns
530    ///
531    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
532    /// predicate becomes true before the timeout. Returns
533    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
534    ///
535    /// # Example
536    ///
537    /// ```rust
538    /// use std::{
539    ///     sync::Arc,
540    ///     thread,
541    ///     time::Duration,
542    /// };
543    ///
544    /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
545    ///
546    /// let monitor = Arc::new(Monitor::new(false));
547    /// let waiter_monitor = Arc::clone(&monitor);
548    ///
549    /// let waiter = thread::spawn(move || {
550    ///     waiter_monitor.wait_timeout_until(
551    ///         Duration::from_secs(1),
552    ///         |ready| *ready,
553    ///         |ready| {
554    ///             *ready = false;
555    ///             5
556    ///         },
557    ///     )
558    /// });
559    ///
560    /// monitor.write(|ready| *ready = true);
561    /// monitor.notify_one();
562    ///
563    /// assert_eq!(
564    ///     waiter.join().expect("waiter should finish"),
565    ///     WaitTimeoutResult::Ready(5),
566    /// );
567    /// ```
568    #[inline]
569    pub fn wait_timeout_until<R, P, F>(
570        &self,
571        timeout: Duration,
572        mut ready: P,
573        f: F,
574    ) -> WaitTimeoutResult<R>
575    where
576        P: FnMut(&T) -> bool,
577        F: FnOnce(&mut T) -> R,
578    {
579        self.wait_timeout_while(timeout, |state| !ready(state), f)
580    }
581
582    /// Wakes one thread waiting on this monitor's condition variable.
583    ///
584    /// Notifications do not carry state by themselves. A waiting thread only
585    /// proceeds safely after rechecking the protected state. Call this after
586    /// changing state that may make one waiter able to continue.
587    ///
588    /// # Example
589    ///
590    /// ```rust
591    /// use std::thread;
592    ///
593    /// use qubit_lock::lock::ArcMonitor;
594    ///
595    /// let monitor = ArcMonitor::new(0_u32);
596    /// let waiter = {
597    ///     let m = monitor.clone();
598    ///     thread::spawn(move || {
599    ///         m.wait_until(|n| *n > 0, |n| {
600    ///             *n -= 1;
601    ///         });
602    ///     })
603    /// };
604    ///
605    /// monitor.write(|n| *n = 1);
606    /// monitor.notify_one();
607    /// waiter.join().expect("waiter should finish");
608    /// ```
609    #[inline]
610    pub fn notify_one(&self) {
611        self.changed.notify_one();
612    }
613
614    /// Wakes all threads waiting on this monitor's condition variable.
615    ///
616    /// Notifications do not carry state by themselves. Every awakened thread
617    /// must recheck the protected state before continuing. Call this after a
618    /// state change that may allow multiple waiters to make progress.
619    ///
620    /// # Example
621    ///
622    /// ```rust
623    /// use std::thread;
624    ///
625    /// use qubit_lock::lock::ArcMonitor;
626    ///
627    /// let monitor = ArcMonitor::new(false);
628    /// let mut handles = Vec::new();
629    /// for _ in 0..2 {
630    ///     let m = monitor.clone();
631    ///     handles.push(thread::spawn(move || {
632    ///         m.wait_until(|ready| *ready, |_| ());
633    ///     }));
634    /// }
635    ///
636    /// monitor.write(|ready| *ready = true);
637    /// monitor.notify_all();
638    /// for h in handles {
639    ///     h.join().expect("waiter should finish");
640    /// }
641    /// ```
642    #[inline]
643    pub fn notify_all(&self) {
644        self.changed.notify_all();
645    }
646}
647
648impl<T: Default> Default for Monitor<T> {
649    /// Creates a monitor containing `T::default()`.
650    ///
651    /// # Returns
652    ///
653    /// A monitor protecting the default value for `T`.
654    ///
655    /// # Example
656    ///
657    /// ```rust
658    /// use qubit_lock::lock::Monitor;
659    ///
660    /// let monitor: Monitor<String> = Monitor::default();
661    /// assert!(monitor.read(|s| s.is_empty()));
662    /// ```
663    #[inline]
664    fn default() -> Self {
665        Self::new(T::default())
666    }
667}