Skip to main content

qubit_lock/monitor/
monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Monitor
11//!
12//! Provides a synchronous monitor built from a mutex and a condition variable.
13//! A monitor protects one shared state value and binds that state to the
14//! condition variable used to wait for changes. This is the same low-level
15//! mechanism as using [`std::sync::Mutex`] and [`std::sync::Condvar`] directly,
16//! but packaged so callers do not have to keep a mutex and its matching
17//! condition variable as separate fields.
18//!
19//! The high-level APIs ([`Monitor::read`], [`Monitor::write`],
20//! [`Monitor::wait_while`], and [`Monitor::wait_until`]) are intended for
21//! short critical sections and simple guarded-suspension flows. The lower-level
22//! [`Monitor::lock`] API returns a [`MonitorGuard`], which supports
23//! [`MonitorGuard::wait`] and [`MonitorGuard::wait_timeout`] for more complex
24//! state machines such as thread pools.
25//!
26
27use std::{
28    sync::{
29        Condvar,
30        Mutex,
31    },
32    time::{
33        Duration,
34        Instant,
35    },
36};
37
38use super::monitor_guard::MonitorGuard;
39use super::{
40    wait_timeout_result::WaitTimeoutResult,
41    wait_timeout_status::WaitTimeoutStatus,
42};
43
44/// Shared state protected by a mutex and a condition variable.
45///
46/// `Monitor` is useful when callers need more than a short critical section.
47/// It models the classic monitor object pattern: one mutex protects the state,
48/// and one condition variable lets threads wait until that state changes. This
49/// is the same relationship used by `std::sync::Mutex` and
50/// `std::sync::Condvar`, but represented as one object so the condition
51/// variable is not accidentally used with unrelated state.
52///
53/// `Monitor` deliberately has two levels of API:
54///
55/// * `read` and `write` acquire the mutex, run a closure, and release it.
56/// * `wait_while`, `wait_until`, and their timeout variants implement common
57///   predicate-based waits.
58/// * `lock` returns a [`MonitorGuard`] for callers that need to write their own
59///   loop around [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`].
60///
61/// A poisoned mutex is recovered by taking the inner state. This makes
62/// `Monitor` suitable for coordination state that should remain observable
63/// after another thread panics while holding the lock.
64///
65/// # Difference from `Mutex` and `Condvar`
66///
67/// With the standard library primitives, callers usually store two fields and
68/// manually keep them paired:
69///
70/// ```rust
71/// # use std::sync::{Condvar, Mutex};
72/// # struct State;
73/// struct Shared {
74///     state: Mutex<State>,
75///     changed: Condvar,
76/// }
77/// ```
78///
79/// `Monitor<State>` stores the same pair internally. A [`MonitorGuard`] is a
80/// wrapper around the standard library's `MutexGuard`; it keeps the protected
81/// state locked and knows which monitor it belongs to, so its wait methods use
82/// the matching condition variable.
83///
84/// # Type Parameters
85///
86/// * `T` - The state protected by this monitor.
87///
88/// # Example
89///
90/// ```rust
91/// use std::thread;
92///
93/// use qubit_lock::lock::ArcMonitor;
94///
95/// let monitor = ArcMonitor::new(false);
96/// let waiter_monitor = monitor.clone();
97///
98/// let waiter = thread::spawn(move || {
99///     waiter_monitor.wait_until(
100///         |ready| *ready,
101///         |ready| {
102///             *ready = false;
103///         },
104///     );
105/// });
106///
107/// monitor.write(|ready| {
108///     *ready = true;
109/// });
110/// monitor.notify_all();
111///
112/// waiter.join().expect("waiter should finish");
113/// assert!(!monitor.read(|ready| *ready));
114/// ```
115///
116pub struct Monitor<T> {
117    /// Mutex protecting the monitor state.
118    state: Mutex<T>,
119    /// Condition variable used to wake predicate waiters after state changes.
120    pub(super) changed: Condvar,
121}
122
123impl<T> Monitor<T> {
124    /// Creates a monitor protecting the supplied state value.
125    ///
126    /// # Arguments
127    ///
128    /// * `state` - Initial state protected by the monitor.
129    ///
130    /// # Returns
131    ///
132    /// A monitor initialized with the supplied state.
133    ///
134    /// # Example
135    ///
136    /// ```rust
137    /// use qubit_lock::lock::Monitor;
138    ///
139    /// let monitor = Monitor::new(0_u32);
140    /// assert_eq!(monitor.read(|n| *n), 0);
141    /// ```
142    #[inline]
143    pub fn new(state: T) -> Self {
144        Self {
145            state: Mutex::new(state),
146            changed: Condvar::new(),
147        }
148    }
149
150    /// Acquires the monitor and returns a guard for explicit state-machine code.
151    ///
152    /// The returned [`MonitorGuard`] keeps the monitor mutex locked until the
153    /// guard is dropped. It can also be passed through
154    /// [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`] to temporarily
155    /// release the lock while waiting on this monitor's condition variable.
156    ///
157    /// If the mutex is poisoned, this method recovers the inner state and still
158    /// returns a guard.
159    ///
160    /// # Returns
161    ///
162    /// A guard that provides read and write access to the protected state.
163    ///
164    /// # Example
165    ///
166    /// ```rust
167    /// use qubit_lock::lock::Monitor;
168    ///
169    /// let monitor = Monitor::new(1);
170    /// {
171    ///     let mut value = monitor.lock();
172    ///     *value += 1;
173    /// }
174    ///
175    /// assert_eq!(monitor.read(|value| *value), 2);
176    /// ```
177    #[inline]
178    pub fn lock(&self) -> MonitorGuard<'_, T> {
179        MonitorGuard::new(
180            self,
181            self.state
182                .lock()
183                .unwrap_or_else(std::sync::PoisonError::into_inner),
184        )
185    }
186
187    /// Acquires the monitor and reads the protected state.
188    ///
189    /// The closure runs while the mutex is held. Keep the closure short and do
190    /// not call code that may block for a long time.
191    ///
192    /// If the mutex is poisoned, this method recovers the inner state and still
193    /// executes the closure.
194    ///
195    /// # Arguments
196    ///
197    /// * `f` - Closure that receives an immutable reference to the state.
198    ///
199    /// # Returns
200    ///
201    /// The value returned by the closure.
202    ///
203    /// # Example
204    ///
205    /// ```rust
206    /// use qubit_lock::lock::Monitor;
207    ///
208    /// let monitor = Monitor::new(10_i32);
209    /// let n = monitor.read(|x| *x);
210    /// assert_eq!(n, 10);
211    /// ```
212    #[inline]
213    pub fn read<R, F>(&self, f: F) -> R
214    where
215        F: FnOnce(&T) -> R,
216    {
217        let guard = self.lock();
218        f(&*guard)
219    }
220
221    /// Acquires the monitor and mutates the protected state.
222    ///
223    /// The closure runs while the mutex is held. This method only changes the
224    /// state; callers should explicitly call [`Self::notify_one`] or
225    /// [`Self::notify_all`] after changing a condition that waiters may be
226    /// observing.
227    ///
228    /// If the mutex is poisoned, this method recovers the inner state and still
229    /// executes the closure.
230    ///
231    /// # Arguments
232    ///
233    /// * `f` - Closure that receives a mutable reference to the state.
234    ///
235    /// # Returns
236    ///
237    /// The value returned by the closure.
238    ///
239    /// # Example
240    ///
241    /// ```rust
242    /// use qubit_lock::lock::Monitor;
243    ///
244    /// let monitor = Monitor::new(String::new());
245    /// let len = monitor.write(|s| {
246    ///     s.push_str("hi");
247    ///     s.len()
248    /// });
249    /// assert_eq!(len, 2);
250    /// ```
251    #[inline]
252    pub fn write<R, F>(&self, f: F) -> R
253    where
254        F: FnOnce(&mut T) -> R,
255    {
256        let mut guard = self.lock();
257        f(&mut *guard)
258    }
259
260    /// Waits for a notification or timeout without checking state.
261    ///
262    /// This convenience method locks the monitor, waits once on the condition
263    /// variable, and returns why the timed wait completed. It is useful only
264    /// when the caller genuinely needs a notification wait without inspecting
265    /// state before or after the wait. Most coordination code should prefer
266    /// [`Self::wait_while`], [`Self::wait_until`], or the explicit
267    /// [`MonitorGuard::wait_timeout`] loop.
268    ///
269    /// Condition variables may wake spuriously, so
270    /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
271    /// state.
272    ///
273    /// If the mutex is poisoned, this method recovers the inner state and
274    /// continues waiting.
275    ///
276    /// # Arguments
277    ///
278    /// * `timeout` - Maximum duration to wait for a notification.
279    ///
280    /// # Returns
281    ///
282    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
283    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
284    ///
285    /// # Example
286    ///
287    /// ```rust
288    /// use std::time::Duration;
289    ///
290    /// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
291    ///
292    /// let monitor = Monitor::new(false);
293    /// let status = monitor.wait_notify(Duration::from_millis(1));
294    ///
295    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
296    /// ```
297    #[inline]
298    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
299        let guard = self.lock();
300        let (_guard, status) = guard.wait_timeout(timeout);
301        status
302    }
303
304    /// Waits while a predicate remains true, then mutates the protected state.
305    ///
306    /// This is the monitor equivalent of the common `while condition { wait }`
307    /// guarded-suspension pattern. The predicate is evaluated while holding the
308    /// mutex. If it returns `true`, the current thread waits on the condition
309    /// variable and atomically releases the mutex. After a notification or
310    /// spurious wakeup, the mutex is reacquired and the predicate is evaluated
311    /// again. When the predicate returns `false`, `f` runs while the mutex is
312    /// still held.
313    ///
314    /// This method may block indefinitely if no thread changes the state so
315    /// that `waiting` becomes false and sends a notification.
316    ///
317    /// If the mutex is poisoned before or during the wait, this method recovers
318    /// the inner state and continues waiting or executes the closure.
319    ///
320    /// # Arguments
321    ///
322    /// * `waiting` - Predicate that returns `true` while the caller should
323    ///   keep waiting.
324    /// * `f` - Closure that receives mutable access after waiting is no longer
325    ///   required.
326    ///
327    /// # Returns
328    ///
329    /// The value returned by `f`.
330    ///
331    /// # Example
332    ///
333    /// ```rust
334    /// use std::{
335    ///     sync::Arc,
336    ///     thread,
337    /// };
338    ///
339    /// use qubit_lock::lock::Monitor;
340    ///
341    /// let monitor = Arc::new(Monitor::new(Vec::<i32>::new()));
342    /// let worker_monitor = Arc::clone(&monitor);
343    ///
344    /// let worker = thread::spawn(move || {
345    ///     worker_monitor.wait_while(
346    ///         |items| items.is_empty(),
347    ///         |items| items.pop().expect("item should be ready"),
348    ///     )
349    /// });
350    ///
351    /// monitor.write(|items| items.push(7));
352    /// monitor.notify_one();
353    ///
354    /// assert_eq!(worker.join().expect("worker should finish"), 7);
355    /// ```
356    #[inline]
357    pub fn wait_while<R, P, F>(&self, mut waiting: P, f: F) -> R
358    where
359        P: FnMut(&T) -> bool,
360        F: FnOnce(&mut T) -> R,
361    {
362        let mut guard = self.lock();
363        while waiting(&*guard) {
364            guard = guard.wait();
365        }
366        f(&mut *guard)
367    }
368
369    /// Waits until the protected state satisfies a predicate, then mutates it.
370    ///
371    /// This is the positive-predicate counterpart of [`Self::wait_while`]. The
372    /// predicate is evaluated while holding the mutex. If it returns `false`,
373    /// the current thread waits on the condition variable and atomically
374    /// releases the mutex. After a notification or spurious wakeup, the mutex
375    /// is reacquired and the predicate is evaluated again. When the predicate
376    /// returns `true`, `f` runs while the mutex is still held.
377    ///
378    /// This method may block indefinitely if no thread changes the state to
379    /// satisfy the predicate and sends a notification.
380    ///
381    /// If the mutex is poisoned before or during the wait, this method recovers
382    /// the inner state and continues waiting or executes the closure.
383    ///
384    /// # Arguments
385    ///
386    /// * `ready` - Predicate that returns `true` when the state is ready.
387    /// * `f` - Closure that receives mutable access to the ready state.
388    ///
389    /// # Returns
390    ///
391    /// The value returned by `f` after the predicate has become true.
392    ///
393    /// # Example
394    ///
395    /// ```rust
396    /// use std::{
397    ///     sync::Arc,
398    ///     thread,
399    /// };
400    ///
401    /// use qubit_lock::lock::Monitor;
402    ///
403    /// let monitor = Arc::new(Monitor::new(false));
404    /// let waiter_monitor = Arc::clone(&monitor);
405    ///
406    /// let waiter = thread::spawn(move || {
407    ///     waiter_monitor.wait_until(
408    ///         |ready| *ready,
409    ///         |ready| {
410    ///             *ready = false;
411    ///             "done"
412    ///         },
413    ///     )
414    /// });
415    ///
416    /// monitor.write(|ready| *ready = true);
417    /// monitor.notify_one();
418    ///
419    /// assert_eq!(waiter.join().expect("waiter should finish"), "done");
420    /// ```
421    #[inline]
422    pub fn wait_until<R, P, F>(&self, mut ready: P, f: F) -> R
423    where
424        P: FnMut(&T) -> bool,
425        F: FnOnce(&mut T) -> R,
426    {
427        self.wait_while(|state| !ready(state), f)
428    }
429
430    /// Waits while a predicate remains true, with an overall time limit.
431    ///
432    /// This method is the timeout-aware form of [`Self::wait_while`]. It keeps
433    /// rechecking `waiting` under the monitor lock and waits only for the
434    /// remaining portion of `timeout`. If `waiting` becomes false before the
435    /// timeout expires, `f` runs while the lock is still held. If the timeout
436    /// expires first, the closure is not called.
437    ///
438    /// Condition variables may wake spuriously, and timeout status alone is not
439    /// used as proof that the predicate is still true; the predicate is always
440    /// rechecked under the lock.
441    ///
442    /// If the mutex is poisoned before or during the wait, this method recovers
443    /// the inner state and continues waiting or executes the closure.
444    ///
445    /// # Arguments
446    ///
447    /// * `timeout` - Maximum total duration to wait.
448    /// * `waiting` - Predicate that returns `true` while the caller should
449    ///   continue waiting.
450    /// * `f` - Closure that receives mutable access when waiting is no longer
451    ///   required.
452    ///
453    /// # Returns
454    ///
455    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
456    /// predicate stops blocking before the timeout. Returns
457    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
458    ///
459    /// # Example
460    ///
461    /// ```rust
462    /// use std::time::Duration;
463    ///
464    /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
465    ///
466    /// let monitor = Monitor::new(Vec::<i32>::new());
467    /// let result = monitor.wait_timeout_while(
468    ///     Duration::from_millis(1),
469    ///     |items| items.is_empty(),
470    ///     |items| items.pop(),
471    /// );
472    ///
473    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
474    /// ```
475    #[inline]
476    pub fn wait_timeout_while<R, P, F>(
477        &self,
478        timeout: Duration,
479        mut waiting: P,
480        f: F,
481    ) -> WaitTimeoutResult<R>
482    where
483        P: FnMut(&T) -> bool,
484        F: FnOnce(&mut T) -> R,
485    {
486        let mut guard = self.lock();
487        let start = Instant::now();
488        loop {
489            if !waiting(&*guard) {
490                return WaitTimeoutResult::Ready(f(&mut *guard));
491            }
492
493            let elapsed = start.elapsed();
494            let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
495            if remaining.is_zero() {
496                return WaitTimeoutResult::TimedOut;
497            }
498
499            let (next_guard, _status) = guard.wait_timeout(remaining);
500            guard = next_guard;
501        }
502    }
503
504    /// Waits until a predicate becomes true, with an overall time limit.
505    ///
506    /// This is the positive-predicate counterpart of
507    /// [`Self::wait_timeout_while`]. If `ready` becomes true before the timeout
508    /// expires, `f` runs while the monitor lock is still held. If the timeout
509    /// expires first, the closure is not called.
510    ///
511    /// Condition variables may wake spuriously, and timeout status alone is not
512    /// used as proof that the predicate is still false; the predicate is always
513    /// rechecked under the lock.
514    ///
515    /// If the mutex is poisoned before or during the wait, this method recovers
516    /// the inner state and continues waiting or executes the closure.
517    ///
518    /// # Arguments
519    ///
520    /// * `timeout` - Maximum total duration to wait.
521    /// * `ready` - Predicate that returns `true` when the caller may continue.
522    /// * `f` - Closure that receives mutable access to the ready state.
523    ///
524    /// # Returns
525    ///
526    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
527    /// predicate becomes true before the timeout. Returns
528    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
529    ///
530    /// # Example
531    ///
532    /// ```rust
533    /// use std::{
534    ///     sync::Arc,
535    ///     thread,
536    ///     time::Duration,
537    /// };
538    ///
539    /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
540    ///
541    /// let monitor = Arc::new(Monitor::new(false));
542    /// let waiter_monitor = Arc::clone(&monitor);
543    ///
544    /// let waiter = thread::spawn(move || {
545    ///     waiter_monitor.wait_timeout_until(
546    ///         Duration::from_secs(1),
547    ///         |ready| *ready,
548    ///         |ready| {
549    ///             *ready = false;
550    ///             5
551    ///         },
552    ///     )
553    /// });
554    ///
555    /// monitor.write(|ready| *ready = true);
556    /// monitor.notify_one();
557    ///
558    /// assert_eq!(
559    ///     waiter.join().expect("waiter should finish"),
560    ///     WaitTimeoutResult::Ready(5),
561    /// );
562    /// ```
563    #[inline]
564    pub fn wait_timeout_until<R, P, F>(
565        &self,
566        timeout: Duration,
567        mut ready: P,
568        f: F,
569    ) -> WaitTimeoutResult<R>
570    where
571        P: FnMut(&T) -> bool,
572        F: FnOnce(&mut T) -> R,
573    {
574        self.wait_timeout_while(timeout, |state| !ready(state), f)
575    }
576
577    /// Wakes one thread waiting on this monitor's condition variable.
578    ///
579    /// Notifications do not carry state by themselves. A waiting thread only
580    /// proceeds safely after rechecking the protected state. Call this after
581    /// changing state that may make one waiter able to continue.
582    ///
583    /// # Example
584    ///
585    /// ```rust
586    /// use std::thread;
587    ///
588    /// use qubit_lock::lock::ArcMonitor;
589    ///
590    /// let monitor = ArcMonitor::new(0_u32);
591    /// let waiter = {
592    ///     let m = monitor.clone();
593    ///     thread::spawn(move || {
594    ///         m.wait_until(|n| *n > 0, |n| {
595    ///             *n -= 1;
596    ///         });
597    ///     })
598    /// };
599    ///
600    /// monitor.write(|n| *n = 1);
601    /// monitor.notify_one();
602    /// waiter.join().expect("waiter should finish");
603    /// ```
604    #[inline]
605    pub fn notify_one(&self) {
606        self.changed.notify_one();
607    }
608
609    /// Wakes all threads waiting on this monitor's condition variable.
610    ///
611    /// Notifications do not carry state by themselves. Every awakened thread
612    /// must recheck the protected state before continuing. Call this after a
613    /// state change that may allow multiple waiters to make progress.
614    ///
615    /// # Example
616    ///
617    /// ```rust
618    /// use std::thread;
619    ///
620    /// use qubit_lock::lock::ArcMonitor;
621    ///
622    /// let monitor = ArcMonitor::new(false);
623    /// let mut handles = Vec::new();
624    /// for _ in 0..2 {
625    ///     let m = monitor.clone();
626    ///     handles.push(thread::spawn(move || {
627    ///         m.wait_until(|ready| *ready, |_| ());
628    ///     }));
629    /// }
630    ///
631    /// monitor.write(|ready| *ready = true);
632    /// monitor.notify_all();
633    /// for h in handles {
634    ///     h.join().expect("waiter should finish");
635    /// }
636    /// ```
637    #[inline]
638    pub fn notify_all(&self) {
639        self.changed.notify_all();
640    }
641}
642
643impl<T: Default> Default for Monitor<T> {
644    /// Creates a monitor containing `T::default()`.
645    ///
646    /// # Returns
647    ///
648    /// A monitor protecting the default value for `T`.
649    ///
650    /// # Example
651    ///
652    /// ```rust
653    /// use qubit_lock::lock::Monitor;
654    ///
655    /// let monitor: Monitor<String> = Monitor::default();
656    /// assert!(monitor.read(|s| s.is_empty()));
657    /// ```
658    #[inline]
659    fn default() -> Self {
660        Self::new(T::default())
661    }
662}