Skip to main content

qubit_lock/monitor/
monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Monitor
11//!
12//! Provides a synchronous monitor built from a mutex and a condition variable.
13//! A monitor protects one shared state value and binds that state to the
14//! condition variable used to wait for changes. This is the same low-level
15//! mechanism as using [`parking_lot::Mutex`] and [`parking_lot::Condvar`] directly,
16//! but packaged so callers do not have to keep a mutex and its matching
17//! condition variable as separate fields.
18//!
19//! The high-level APIs ([`Monitor::read`], [`Monitor::write`],
20//! [`Monitor::wait_while`], and [`Monitor::wait_until`]) are intended for
21//! short critical sections and simple guarded-suspension flows. The lower-level
22//! [`Monitor::lock`] API returns a [`MonitorGuard`], which supports
23//! [`MonitorGuard::wait`] and [`MonitorGuard::wait_timeout`] for more complex
24//! state machines such as thread pools.
25//!
26
27use std::time::{
28    Duration,
29    Instant,
30};
31
32use parking_lot::{
33    Condvar,
34    Mutex,
35};
36
37use super::monitor_guard::MonitorGuard;
38use super::{
39    wait_timeout_result::WaitTimeoutResult,
40    wait_timeout_status::WaitTimeoutStatus,
41};
42
43/// Shared state protected by a mutex and a condition variable.
44///
45/// `Monitor` is useful when callers need more than a short critical section.
46/// It models the classic monitor object pattern: one mutex protects the state,
47/// and one condition variable lets threads wait until that state changes. This
48/// is the same relationship used by `parking_lot::Mutex` and
49/// `parking_lot::Condvar`, but represented as one object so the condition
50/// variable is not accidentally used with unrelated state.
51///
52/// `Monitor` deliberately has two levels of API:
53///
54/// * `read` and `write` acquire the mutex, run a closure, and release it.
55/// * `wait_while`, `wait_until`, and their timeout variants implement common
56///   predicate-based waits.
57/// * `lock` returns a [`MonitorGuard`] for callers that need to write their own
58///   loop around [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`].
59///
60/// The underlying `parking_lot` mutex is not poisoned when a thread panics
61/// while holding the lock. This keeps monitor coordination state observable
62/// after panic unwinding.
63///
64/// # Difference from raw `Mutex` and `Condvar`
65///
66/// With raw parking_lot primitives, callers usually store two fields and
67/// manually keep them paired:
68///
69/// ```rust
70/// # use parking_lot::{Condvar, Mutex};
71/// # struct State;
72/// struct Shared {
73///     state: Mutex<State>,
74///     changed: Condvar,
75/// }
76/// ```
77///
78/// `Monitor<State>` stores the same pair internally. A [`MonitorGuard`] is a
79/// wrapper around the parking_lot `MutexGuard`; it keeps the protected
80/// state locked and knows which monitor it belongs to, so its wait methods use
81/// the matching condition variable.
82///
83/// # Type Parameters
84///
85/// * `T` - The state protected by this monitor.
86///
87/// # Example
88///
89/// ```rust
90/// use std::thread;
91///
92/// use qubit_lock::lock::ArcMonitor;
93///
94/// let monitor = ArcMonitor::new(false);
95/// let waiter_monitor = monitor.clone();
96///
97/// let waiter = thread::spawn(move || {
98///     waiter_monitor.wait_until(
99///         |ready| *ready,
100///         |ready| {
101///             *ready = false;
102///         },
103///     );
104/// });
105///
106/// monitor.write(|ready| {
107///     *ready = true;
108/// });
109/// monitor.notify_all();
110///
111/// waiter.join().expect("waiter should finish");
112/// assert!(!monitor.read(|ready| *ready));
113/// ```
114///
115pub struct Monitor<T> {
116    /// Mutex protecting the monitor state.
117    state: Mutex<T>,
118    /// Condition variable used to wake predicate waiters after state changes.
119    pub(super) changed: Condvar,
120}
121
122impl<T> Monitor<T> {
123    /// Creates a monitor protecting the supplied state value.
124    ///
125    /// # Arguments
126    ///
127    /// * `state` - Initial state protected by the monitor.
128    ///
129    /// # Returns
130    ///
131    /// A monitor initialized with the supplied state.
132    ///
133    /// # Example
134    ///
135    /// ```rust
136    /// use qubit_lock::lock::Monitor;
137    ///
138    /// let monitor = Monitor::new(0_u32);
139    /// assert_eq!(monitor.read(|n| *n), 0);
140    /// ```
141    #[inline]
142    pub fn new(state: T) -> Self {
143        Self {
144            state: Mutex::new(state),
145            changed: Condvar::new(),
146        }
147    }
148
149    /// Acquires the monitor and returns a guard for explicit state-machine code.
150    ///
151    /// The returned [`MonitorGuard`] keeps the monitor mutex locked until the
152    /// guard is dropped. It can also be passed through
153    /// [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`] to temporarily
154    /// release the lock while waiting on this monitor's condition variable.
155    ///
156    /// # Returns
157    ///
158    /// A guard that provides read and write access to the protected state.
159    ///
160    /// # Example
161    ///
162    /// ```rust
163    /// use qubit_lock::lock::Monitor;
164    ///
165    /// let monitor = Monitor::new(1);
166    /// {
167    ///     let mut value = monitor.lock();
168    ///     *value += 1;
169    /// }
170    ///
171    /// assert_eq!(monitor.read(|value| *value), 2);
172    /// ```
173    #[inline]
174    pub fn lock(&self) -> MonitorGuard<'_, T> {
175        MonitorGuard::new(self, self.state.lock())
176    }
177
178    /// Acquires the monitor and reads the protected state.
179    ///
180    /// The closure runs while the mutex is held. Keep the closure short and do
181    /// not call code that may block for a long time.
182    ///
183    /// # Arguments
184    ///
185    /// * `f` - Closure that receives an immutable reference to the state.
186    ///
187    /// # Returns
188    ///
189    /// The value returned by the closure.
190    ///
191    /// # Example
192    ///
193    /// ```rust
194    /// use qubit_lock::lock::Monitor;
195    ///
196    /// let monitor = Monitor::new(10_i32);
197    /// let n = monitor.read(|x| *x);
198    /// assert_eq!(n, 10);
199    /// ```
200    #[inline]
201    pub fn read<R, F>(&self, f: F) -> R
202    where
203        F: FnOnce(&T) -> R,
204    {
205        let guard = self.lock();
206        f(&*guard)
207    }
208
209    /// Acquires the monitor and mutates the protected state.
210    ///
211    /// The closure runs while the mutex is held. This method only changes the
212    /// state; callers should explicitly call [`Self::notify_one`] or
213    /// [`Self::notify_all`] after changing a condition that waiters may be
214    /// observing.
215    ///
216    /// # Arguments
217    ///
218    /// * `f` - Closure that receives a mutable reference to the state.
219    ///
220    /// # Returns
221    ///
222    /// The value returned by the closure.
223    ///
224    /// # Example
225    ///
226    /// ```rust
227    /// use qubit_lock::lock::Monitor;
228    ///
229    /// let monitor = Monitor::new(String::new());
230    /// let len = monitor.write(|s| {
231    ///     s.push_str("hi");
232    ///     s.len()
233    /// });
234    /// assert_eq!(len, 2);
235    /// ```
236    #[inline]
237    pub fn write<R, F>(&self, f: F) -> R
238    where
239        F: FnOnce(&mut T) -> R,
240    {
241        let mut guard = self.lock();
242        f(&mut *guard)
243    }
244
245    /// Waits for a notification or timeout without checking state.
246    ///
247    /// This convenience method locks the monitor, waits once on the condition
248    /// variable, and returns why the timed wait completed. It is useful only
249    /// when the caller genuinely needs a notification wait without inspecting
250    /// state before or after the wait. Most coordination code should prefer
251    /// [`Self::wait_while`], [`Self::wait_until`], or the explicit
252    /// [`MonitorGuard::wait_timeout`] loop.
253    ///
254    /// [`WaitTimeoutStatus::Woken`] means the condition variable was notified,
255    /// but it does not prove that the protected state changed in a useful way.
256    ///
257    /// # Arguments
258    ///
259    /// * `timeout` - Maximum duration to wait for a notification.
260    ///
261    /// # Returns
262    ///
263    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
264    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
265    ///
266    /// # Example
267    ///
268    /// ```rust
269    /// use std::time::Duration;
270    ///
271    /// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
272    ///
273    /// let monitor = Monitor::new(false);
274    /// let status = monitor.wait_notify(Duration::from_millis(1));
275    ///
276    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
277    /// ```
278    #[inline]
279    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
280        let guard = self.lock();
281        let (_guard, status) = guard.wait_timeout(timeout);
282        status
283    }
284
285    /// Waits while a predicate remains true, then mutates the protected state.
286    ///
287    /// This is the monitor equivalent of the common `while condition { wait }`
288    /// guarded-suspension pattern. The predicate is evaluated while holding the
289    /// mutex. If it returns `true`, the current thread waits on the condition
290    /// variable and atomically releases the mutex. After a notification, the
291    /// mutex is reacquired and the predicate is evaluated again. When the
292    /// predicate returns `false`, `f` runs while the mutex is still held.
293    ///
294    /// This method may block indefinitely if no thread changes the state so
295    /// that `waiting` becomes false and sends a notification.
296    ///
297    /// # Arguments
298    ///
299    /// * `waiting` - Predicate that returns `true` while the caller should
300    ///   keep waiting.
301    /// * `f` - Closure that receives mutable access after waiting is no longer
302    ///   required.
303    ///
304    /// # Returns
305    ///
306    /// The value returned by `f`.
307    ///
308    /// # Example
309    ///
310    /// ```rust
311    /// use std::{
312    ///     sync::Arc,
313    ///     thread,
314    /// };
315    ///
316    /// use qubit_lock::lock::Monitor;
317    ///
318    /// let monitor = Arc::new(Monitor::new(Vec::<i32>::new()));
319    /// let worker_monitor = Arc::clone(&monitor);
320    ///
321    /// let worker = thread::spawn(move || {
322    ///     worker_monitor.wait_while(
323    ///         |items| items.is_empty(),
324    ///         |items| items.pop().expect("item should be ready"),
325    ///     )
326    /// });
327    ///
328    /// monitor.write(|items| items.push(7));
329    /// monitor.notify_one();
330    ///
331    /// assert_eq!(worker.join().expect("worker should finish"), 7);
332    /// ```
333    #[inline]
334    pub fn wait_while<R, P, F>(&self, mut waiting: P, f: F) -> R
335    where
336        P: FnMut(&T) -> bool,
337        F: FnOnce(&mut T) -> R,
338    {
339        let mut guard = self.lock();
340        while waiting(&*guard) {
341            guard = guard.wait();
342        }
343        f(&mut *guard)
344    }
345
346    /// Waits until the protected state satisfies a predicate, then mutates it.
347    ///
348    /// This is the positive-predicate counterpart of [`Self::wait_while`]. The
349    /// predicate is evaluated while holding the mutex. If it returns `false`,
350    /// the current thread waits on the condition variable and atomically
351    /// releases the mutex. After a notification, the mutex is reacquired and
352    /// the predicate is evaluated again. When the predicate returns `true`, `f`
353    /// runs while the mutex is still held.
354    ///
355    /// This method may block indefinitely if no thread changes the state to
356    /// satisfy the predicate and sends a notification.
357    ///
358    /// # Arguments
359    ///
360    /// * `ready` - Predicate that returns `true` when the state is ready.
361    /// * `f` - Closure that receives mutable access to the ready state.
362    ///
363    /// # Returns
364    ///
365    /// The value returned by `f` after the predicate has become true.
366    ///
367    /// # Example
368    ///
369    /// ```rust
370    /// use std::{
371    ///     sync::Arc,
372    ///     thread,
373    /// };
374    ///
375    /// use qubit_lock::lock::Monitor;
376    ///
377    /// let monitor = Arc::new(Monitor::new(false));
378    /// let waiter_monitor = Arc::clone(&monitor);
379    ///
380    /// let waiter = thread::spawn(move || {
381    ///     waiter_monitor.wait_until(
382    ///         |ready| *ready,
383    ///         |ready| {
384    ///             *ready = false;
385    ///             "done"
386    ///         },
387    ///     )
388    /// });
389    ///
390    /// monitor.write(|ready| *ready = true);
391    /// monitor.notify_one();
392    ///
393    /// assert_eq!(waiter.join().expect("waiter should finish"), "done");
394    /// ```
395    #[inline]
396    pub fn wait_until<R, P, F>(&self, mut ready: P, f: F) -> R
397    where
398        P: FnMut(&T) -> bool,
399        F: FnOnce(&mut T) -> R,
400    {
401        self.wait_while(|state| !ready(state), f)
402    }
403
404    /// Waits while a predicate remains true, with an overall time limit.
405    ///
406    /// This method is the timeout-aware form of [`Self::wait_while`]. It keeps
407    /// rechecking `waiting` under the monitor lock and waits only for the
408    /// remaining portion of `timeout`. If `waiting` becomes false before the
409    /// timeout expires, `f` runs while the lock is still held. If the timeout
410    /// expires first, the closure is not called.
411    ///
412    /// Timeout status alone is not used as proof that the predicate is still
413    /// true; the predicate is always rechecked under the lock.
414    ///
415    /// # Arguments
416    ///
417    /// * `timeout` - Maximum total duration to wait.
418    /// * `waiting` - Predicate that returns `true` while the caller should
419    ///   continue waiting.
420    /// * `f` - Closure that receives mutable access when waiting is no longer
421    ///   required.
422    ///
423    /// # Returns
424    ///
425    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
426    /// predicate stops blocking before the timeout. Returns
427    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
428    ///
429    /// # Example
430    ///
431    /// ```rust
432    /// use std::time::Duration;
433    ///
434    /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
435    ///
436    /// let monitor = Monitor::new(Vec::<i32>::new());
437    /// let result = monitor.wait_timeout_while(
438    ///     Duration::from_millis(1),
439    ///     |items| items.is_empty(),
440    ///     |items| items.pop(),
441    /// );
442    ///
443    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
444    /// ```
445    #[inline]
446    pub fn wait_timeout_while<R, P, F>(
447        &self,
448        timeout: Duration,
449        mut waiting: P,
450        f: F,
451    ) -> WaitTimeoutResult<R>
452    where
453        P: FnMut(&T) -> bool,
454        F: FnOnce(&mut T) -> R,
455    {
456        let mut guard = self.lock();
457        let start = Instant::now();
458        loop {
459            if !waiting(&*guard) {
460                return WaitTimeoutResult::Ready(f(&mut *guard));
461            }
462
463            let elapsed = start.elapsed();
464            let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
465            if remaining.is_zero() {
466                return WaitTimeoutResult::TimedOut;
467            }
468
469            let (next_guard, _status) = guard.wait_timeout(remaining);
470            guard = next_guard;
471        }
472    }
473
474    /// Waits until a predicate becomes true, with an overall time limit.
475    ///
476    /// This is the positive-predicate counterpart of
477    /// [`Self::wait_timeout_while`]. If `ready` becomes true before the timeout
478    /// expires, `f` runs while the monitor lock is still held. If the timeout
479    /// expires first, the closure is not called.
480    ///
481    /// Timeout status alone is not used as proof that the predicate is still
482    /// false; the predicate is always rechecked under the lock.
483    ///
484    /// # Arguments
485    ///
486    /// * `timeout` - Maximum total duration to wait.
487    /// * `ready` - Predicate that returns `true` when the caller may continue.
488    /// * `f` - Closure that receives mutable access to the ready state.
489    ///
490    /// # Returns
491    ///
492    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
493    /// predicate becomes true before the timeout. Returns
494    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
495    ///
496    /// # Example
497    ///
498    /// ```rust
499    /// use std::{
500    ///     sync::Arc,
501    ///     thread,
502    ///     time::Duration,
503    /// };
504    ///
505    /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
506    ///
507    /// let monitor = Arc::new(Monitor::new(false));
508    /// let waiter_monitor = Arc::clone(&monitor);
509    ///
510    /// let waiter = thread::spawn(move || {
511    ///     waiter_monitor.wait_timeout_until(
512    ///         Duration::from_secs(1),
513    ///         |ready| *ready,
514    ///         |ready| {
515    ///             *ready = false;
516    ///             5
517    ///         },
518    ///     )
519    /// });
520    ///
521    /// monitor.write(|ready| *ready = true);
522    /// monitor.notify_one();
523    ///
524    /// assert_eq!(
525    ///     waiter.join().expect("waiter should finish"),
526    ///     WaitTimeoutResult::Ready(5),
527    /// );
528    /// ```
529    #[inline]
530    pub fn wait_timeout_until<R, P, F>(
531        &self,
532        timeout: Duration,
533        mut ready: P,
534        f: F,
535    ) -> WaitTimeoutResult<R>
536    where
537        P: FnMut(&T) -> bool,
538        F: FnOnce(&mut T) -> R,
539    {
540        self.wait_timeout_while(timeout, |state| !ready(state), f)
541    }
542
543    /// Wakes one thread waiting on this monitor's condition variable.
544    ///
545    /// Notifications do not carry state by themselves. A waiting thread only
546    /// proceeds safely after rechecking the protected state. Call this after
547    /// changing state that may make one waiter able to continue.
548    ///
549    /// # Example
550    ///
551    /// ```rust
552    /// use std::thread;
553    ///
554    /// use qubit_lock::lock::ArcMonitor;
555    ///
556    /// let monitor = ArcMonitor::new(0_u32);
557    /// let waiter = {
558    ///     let m = monitor.clone();
559    ///     thread::spawn(move || {
560    ///         m.wait_until(|n| *n > 0, |n| {
561    ///             *n -= 1;
562    ///         });
563    ///     })
564    /// };
565    ///
566    /// monitor.write(|n| *n = 1);
567    /// monitor.notify_one();
568    /// waiter.join().expect("waiter should finish");
569    /// ```
570    #[inline]
571    pub fn notify_one(&self) {
572        self.changed.notify_one();
573    }
574
575    /// Wakes all threads waiting on this monitor's condition variable.
576    ///
577    /// Notifications do not carry state by themselves. Every awakened thread
578    /// must recheck the protected state before continuing. Call this after a
579    /// state change that may allow multiple waiters to make progress.
580    ///
581    /// # Example
582    ///
583    /// ```rust
584    /// use std::thread;
585    ///
586    /// use qubit_lock::lock::ArcMonitor;
587    ///
588    /// let monitor = ArcMonitor::new(false);
589    /// let mut handles = Vec::new();
590    /// for _ in 0..2 {
591    ///     let m = monitor.clone();
592    ///     handles.push(thread::spawn(move || {
593    ///         m.wait_until(|ready| *ready, |_| ());
594    ///     }));
595    /// }
596    ///
597    /// monitor.write(|ready| *ready = true);
598    /// monitor.notify_all();
599    /// for h in handles {
600    ///     h.join().expect("waiter should finish");
601    /// }
602    /// ```
603    #[inline]
604    pub fn notify_all(&self) {
605        self.changed.notify_all();
606    }
607}
608
609impl<T: Default> Default for Monitor<T> {
610    /// Creates a monitor containing `T::default()`.
611    ///
612    /// # Returns
613    ///
614    /// A monitor protecting the default value for `T`.
615    ///
616    /// # Example
617    ///
618    /// ```rust
619    /// use qubit_lock::lock::Monitor;
620    ///
621    /// let monitor: Monitor<String> = Monitor::default();
622    /// assert!(monitor.read(|s| s.is_empty()));
623    /// ```
624    #[inline]
625    fn default() -> Self {
626        Self::new(T::default())
627    }
628}