qubit_lock/monitor/monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! # Monitor
10//!
11//! Provides a synchronous monitor built from a mutex and a condition variable.
12//! A monitor protects one shared state value and binds that state to the
13//! condition variable used to wait for changes. This is the same low-level
14//! mechanism as using [`std::sync::Mutex`] and [`std::sync::Condvar`] directly,
15//! but packaged so callers do not have to keep a mutex and its matching
16//! condition variable as separate fields.
17//!
18//! The high-level APIs ([`Monitor::read`], [`Monitor::write`],
19//! [`Monitor::wait_while`], and [`Monitor::wait_until`]) are intended for
20//! short critical sections and simple guarded-suspension flows. The lower-level
21//! [`Monitor::lock`] API returns a [`MonitorGuard`], which supports
22//! [`MonitorGuard::wait`] and [`MonitorGuard::wait_timeout`] for more complex
23//! state machines such as thread pools.
24//!
25//! # Author
26//!
27//! Haixing Hu
28
29use std::{
30 sync::{
31 Condvar,
32 Mutex,
33 },
34 time::{
35 Duration,
36 Instant,
37 },
38};
39
40use super::monitor_guard::MonitorGuard;
41
42/// Result of a timed wait operation.
43///
44/// This status is returned by [`MonitorGuard::wait_timeout`] and
45/// [`Monitor::wait_notify`]. It describes why a timed wait
46/// returned, but callers must still re-check the protected state because
47/// condition variables may wake spuriously.
48///
49/// # Example
50///
51/// ```rust
52/// use std::time::Duration;
53///
54/// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
55///
56/// let monitor = Monitor::new(false);
57/// let guard = monitor.lock();
58/// let (_guard, status) = guard.wait_timeout(Duration::from_millis(1));
59/// assert_eq!(status, WaitTimeoutStatus::TimedOut);
60/// ```
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WaitTimeoutStatus {
    /// A wakeup arrived before the timeout ran out.
    ///
    /// The usual cause is a call to [`Monitor::notify_one`] or
    /// [`Monitor::notify_all`] on another thread. A spurious wakeup reports
    /// the same status, so the guarded state must be checked again before it
    /// is acted upon.
    Woken,
    /// The whole timeout elapsed while waiting.
    ///
    /// The protected state is still worth inspecting after this status:
    /// another thread may have updated it while the waiting thread was
    /// reacquiring the mutex.
    TimedOut,
}
76
77/// Result of waiting for a predicate with an overall timeout.
78///
79/// This type is returned by [`Monitor::wait_timeout_while`] and
80/// [`Monitor::wait_timeout_until`]. It is more explicit than `Option<R>`: a
81/// ready predicate produces [`Self::Ready`], while an expired timeout produces
82/// [`Self::TimedOut`].
83///
84/// # Type Parameters
85///
86/// * `R` - The value produced after the protected state satisfies the
87/// predicate.
88///
89/// # Example
90///
91/// ```rust
92/// use std::time::Duration;
93///
94/// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
95///
96/// let monitor = Monitor::new(true);
97/// let result = monitor.wait_timeout_until(
98/// Duration::from_secs(1),
99/// |ready| *ready,
100/// |ready| {
101/// *ready = false;
102/// "ready"
103/// },
104/// );
105///
106/// assert_eq!(result, WaitTimeoutResult::Ready("ready"));
107/// ```
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WaitTimeoutResult<R> {
    /// The predicate was satisfied in time; carries the value produced once
    /// the state was ready.
    Ready(R),
    /// The timeout ran out while the predicate was still not satisfied.
    TimedOut,
}
115
116/// Shared state protected by a mutex and a condition variable.
117///
118/// `Monitor` is useful when callers need more than a short critical section.
119/// It models the classic monitor object pattern: one mutex protects the state,
120/// and one condition variable lets threads wait until that state changes. This
121/// is the same relationship used by `std::sync::Mutex` and
122/// `std::sync::Condvar`, but represented as one object so the condition
123/// variable is not accidentally used with unrelated state.
124///
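/// `Monitor` deliberately has two levels of API. The first two items below
/// are the closure-based high level; the last is the explicit-guard low level: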
126///
127/// * `read` and `write` acquire the mutex, run a closure, and release it.
128/// * `wait_while`, `wait_until`, and their timeout variants implement common
129/// predicate-based waits.
130/// * `lock` returns a [`MonitorGuard`] for callers that need to write their own
131/// loop around [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`].
132///
133/// A poisoned mutex is recovered by taking the inner state. This makes
134/// `Monitor` suitable for coordination state that should remain observable
135/// after another thread panics while holding the lock.
136///
137/// # Difference from `Mutex` and `Condvar`
138///
139/// With the standard library primitives, callers usually store two fields and
140/// manually keep them paired:
141///
142/// ```rust
143/// # use std::sync::{Condvar, Mutex};
144/// # struct State;
145/// struct Shared {
146/// state: Mutex<State>,
147/// changed: Condvar,
148/// }
149/// ```
150///
151/// `Monitor<State>` stores the same pair internally. A [`MonitorGuard`] is a
152/// wrapper around the standard library's `MutexGuard`; it keeps the protected
153/// state locked and knows which monitor it belongs to, so its wait methods use
154/// the matching condition variable.
155///
156/// # Type Parameters
157///
158/// * `T` - The state protected by this monitor.
159///
160/// # Example
161///
162/// ```rust
163/// use std::thread;
164///
165/// use qubit_lock::lock::ArcMonitor;
166///
167/// let monitor = ArcMonitor::new(false);
168/// let waiter_monitor = monitor.clone();
169///
170/// let waiter = thread::spawn(move || {
171/// waiter_monitor.wait_until(
172/// |ready| *ready,
173/// |ready| {
174/// *ready = false;
175/// },
176/// );
177/// });
178///
179/// monitor.write(|ready| {
180/// *ready = true;
181/// });
182/// monitor.notify_all();
183///
184/// waiter.join().expect("waiter should finish");
185/// assert!(!monitor.read(|ready| *ready));
186/// ```
187///
188/// # Author
189///
190/// Haixing Hu
191pub struct Monitor<T> {
192 /// Mutex protecting the monitor state.
193 state: Mutex<T>,
194 /// Condition variable used to wake predicate waiters after state changes.
195 pub(super) changed: Condvar,
196}
197
198impl<T> Monitor<T> {
199 /// Creates a monitor protecting the supplied state value.
200 ///
201 /// # Arguments
202 ///
203 /// * `state` - Initial state protected by the monitor.
204 ///
205 /// # Returns
206 ///
207 /// A monitor initialized with the supplied state.
208 ///
209 /// # Example
210 ///
211 /// ```rust
212 /// use qubit_lock::lock::Monitor;
213 ///
214 /// let monitor = Monitor::new(0_u32);
215 /// assert_eq!(monitor.read(|n| *n), 0);
216 /// ```
217 #[inline]
218 pub fn new(state: T) -> Self {
219 Self {
220 state: Mutex::new(state),
221 changed: Condvar::new(),
222 }
223 }
224
225 /// Acquires the monitor and returns a guard for explicit state-machine code.
226 ///
227 /// The returned [`MonitorGuard`] keeps the monitor mutex locked until the
228 /// guard is dropped. It can also be passed through
229 /// [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`] to temporarily
230 /// release the lock while waiting on this monitor's condition variable.
231 ///
232 /// If the mutex is poisoned, this method recovers the inner state and still
233 /// returns a guard.
234 ///
235 /// # Returns
236 ///
237 /// A guard that provides read and write access to the protected state.
238 ///
239 /// # Example
240 ///
241 /// ```rust
242 /// use qubit_lock::lock::Monitor;
243 ///
244 /// let monitor = Monitor::new(1);
245 /// {
246 /// let mut value = monitor.lock();
247 /// *value += 1;
248 /// }
249 ///
250 /// assert_eq!(monitor.read(|value| *value), 2);
251 /// ```
252 #[inline]
253 pub fn lock(&self) -> MonitorGuard<'_, T> {
254 MonitorGuard::new(
255 self,
256 self.state
257 .lock()
258 .unwrap_or_else(std::sync::PoisonError::into_inner),
259 )
260 }
261
262 /// Acquires the monitor and reads the protected state.
263 ///
264 /// The closure runs while the mutex is held. Keep the closure short and do
265 /// not call code that may block for a long time.
266 ///
267 /// If the mutex is poisoned, this method recovers the inner state and still
268 /// executes the closure.
269 ///
270 /// # Arguments
271 ///
272 /// * `f` - Closure that receives an immutable reference to the state.
273 ///
274 /// # Returns
275 ///
276 /// The value returned by the closure.
277 ///
278 /// # Example
279 ///
280 /// ```rust
281 /// use qubit_lock::lock::Monitor;
282 ///
283 /// let monitor = Monitor::new(10_i32);
284 /// let n = monitor.read(|x| *x);
285 /// assert_eq!(n, 10);
286 /// ```
287 #[inline]
288 pub fn read<R, F>(&self, f: F) -> R
289 where
290 F: FnOnce(&T) -> R,
291 {
292 let guard = self.lock();
293 f(&*guard)
294 }
295
296 /// Acquires the monitor and mutates the protected state.
297 ///
298 /// The closure runs while the mutex is held. This method only changes the
299 /// state; callers should explicitly call [`Self::notify_one`] or
300 /// [`Self::notify_all`] after changing a condition that waiters may be
301 /// observing.
302 ///
303 /// If the mutex is poisoned, this method recovers the inner state and still
304 /// executes the closure.
305 ///
306 /// # Arguments
307 ///
308 /// * `f` - Closure that receives a mutable reference to the state.
309 ///
310 /// # Returns
311 ///
312 /// The value returned by the closure.
313 ///
314 /// # Example
315 ///
316 /// ```rust
317 /// use qubit_lock::lock::Monitor;
318 ///
319 /// let monitor = Monitor::new(String::new());
320 /// let len = monitor.write(|s| {
321 /// s.push_str("hi");
322 /// s.len()
323 /// });
324 /// assert_eq!(len, 2);
325 /// ```
326 #[inline]
327 pub fn write<R, F>(&self, f: F) -> R
328 where
329 F: FnOnce(&mut T) -> R,
330 {
331 let mut guard = self.lock();
332 f(&mut *guard)
333 }
334
335 /// Waits for a notification or timeout without checking state.
336 ///
337 /// This convenience method locks the monitor, waits once on the condition
338 /// variable, and returns why the timed wait completed. It is useful only
339 /// when the caller genuinely needs a notification wait without inspecting
340 /// state before or after the wait. Most coordination code should prefer
341 /// [`Self::wait_while`], [`Self::wait_until`], or the explicit
342 /// [`MonitorGuard::wait_timeout`] loop.
343 ///
344 /// Condition variables may wake spuriously, so
345 /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
346 /// state.
347 ///
348 /// If the mutex is poisoned, this method recovers the inner state and
349 /// continues waiting.
350 ///
351 /// # Arguments
352 ///
353 /// * `timeout` - Maximum duration to wait for a notification.
354 ///
355 /// # Returns
356 ///
357 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
358 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
359 ///
360 /// # Example
361 ///
362 /// ```rust
363 /// use std::time::Duration;
364 ///
365 /// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
366 ///
367 /// let monitor = Monitor::new(false);
368 /// let status = monitor.wait_notify(Duration::from_millis(1));
369 ///
370 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
371 /// ```
372 #[inline]
373 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
374 let guard = self.lock();
375 let (_guard, status) = guard.wait_timeout(timeout);
376 status
377 }
378
379 /// Waits while a predicate remains true, then mutates the protected state.
380 ///
381 /// This is the monitor equivalent of the common `while condition { wait }`
382 /// guarded-suspension pattern. The predicate is evaluated while holding the
383 /// mutex. If it returns `true`, the current thread waits on the condition
384 /// variable and atomically releases the mutex. After a notification or
385 /// spurious wakeup, the mutex is reacquired and the predicate is evaluated
386 /// again. When the predicate returns `false`, `f` runs while the mutex is
387 /// still held.
388 ///
389 /// This method may block indefinitely if no thread changes the state so
390 /// that `waiting` becomes false and sends a notification.
391 ///
392 /// If the mutex is poisoned before or during the wait, this method recovers
393 /// the inner state and continues waiting or executes the closure.
394 ///
395 /// # Arguments
396 ///
397 /// * `waiting` - Predicate that returns `true` while the caller should
398 /// keep waiting.
399 /// * `f` - Closure that receives mutable access after waiting is no longer
400 /// required.
401 ///
402 /// # Returns
403 ///
404 /// The value returned by `f`.
405 ///
406 /// # Example
407 ///
408 /// ```rust
409 /// use std::{
410 /// sync::Arc,
411 /// thread,
412 /// };
413 ///
414 /// use qubit_lock::lock::Monitor;
415 ///
416 /// let monitor = Arc::new(Monitor::new(Vec::<i32>::new()));
417 /// let worker_monitor = Arc::clone(&monitor);
418 ///
419 /// let worker = thread::spawn(move || {
420 /// worker_monitor.wait_while(
421 /// |items| items.is_empty(),
422 /// |items| items.pop().expect("item should be ready"),
423 /// )
424 /// });
425 ///
426 /// monitor.write(|items| items.push(7));
427 /// monitor.notify_one();
428 ///
429 /// assert_eq!(worker.join().expect("worker should finish"), 7);
430 /// ```
431 #[inline]
432 pub fn wait_while<R, P, F>(&self, mut waiting: P, f: F) -> R
433 where
434 P: FnMut(&T) -> bool,
435 F: FnOnce(&mut T) -> R,
436 {
437 let mut guard = self.lock();
438 while waiting(&*guard) {
439 guard = guard.wait();
440 }
441 f(&mut *guard)
442 }
443
444 /// Waits until the protected state satisfies a predicate, then mutates it.
445 ///
446 /// This is the positive-predicate counterpart of [`Self::wait_while`]. The
447 /// predicate is evaluated while holding the mutex. If it returns `false`,
448 /// the current thread waits on the condition variable and atomically
449 /// releases the mutex. After a notification or spurious wakeup, the mutex
450 /// is reacquired and the predicate is evaluated again. When the predicate
451 /// returns `true`, `f` runs while the mutex is still held.
452 ///
453 /// This method may block indefinitely if no thread changes the state to
454 /// satisfy the predicate and sends a notification.
455 ///
456 /// If the mutex is poisoned before or during the wait, this method recovers
457 /// the inner state and continues waiting or executes the closure.
458 ///
459 /// # Arguments
460 ///
461 /// * `ready` - Predicate that returns `true` when the state is ready.
462 /// * `f` - Closure that receives mutable access to the ready state.
463 ///
464 /// # Returns
465 ///
466 /// The value returned by `f` after the predicate has become true.
467 ///
468 /// # Example
469 ///
470 /// ```rust
471 /// use std::{
472 /// sync::Arc,
473 /// thread,
474 /// };
475 ///
476 /// use qubit_lock::lock::Monitor;
477 ///
478 /// let monitor = Arc::new(Monitor::new(false));
479 /// let waiter_monitor = Arc::clone(&monitor);
480 ///
481 /// let waiter = thread::spawn(move || {
482 /// waiter_monitor.wait_until(
483 /// |ready| *ready,
484 /// |ready| {
485 /// *ready = false;
486 /// "done"
487 /// },
488 /// )
489 /// });
490 ///
491 /// monitor.write(|ready| *ready = true);
492 /// monitor.notify_one();
493 ///
494 /// assert_eq!(waiter.join().expect("waiter should finish"), "done");
495 /// ```
496 #[inline]
497 pub fn wait_until<R, P, F>(&self, mut ready: P, f: F) -> R
498 where
499 P: FnMut(&T) -> bool,
500 F: FnOnce(&mut T) -> R,
501 {
502 self.wait_while(|state| !ready(state), f)
503 }
504
505 /// Waits while a predicate remains true, with an overall time limit.
506 ///
507 /// This method is the timeout-aware form of [`Self::wait_while`]. It keeps
508 /// rechecking `waiting` under the monitor lock and waits only for the
509 /// remaining portion of `timeout`. If `waiting` becomes false before the
510 /// timeout expires, `f` runs while the lock is still held. If the timeout
511 /// expires first, the closure is not called.
512 ///
513 /// Condition variables may wake spuriously, and timeout status alone is not
514 /// used as proof that the predicate is still true; the predicate is always
515 /// rechecked under the lock.
516 ///
517 /// If the mutex is poisoned before or during the wait, this method recovers
518 /// the inner state and continues waiting or executes the closure.
519 ///
520 /// # Arguments
521 ///
522 /// * `timeout` - Maximum total duration to wait.
523 /// * `waiting` - Predicate that returns `true` while the caller should
524 /// continue waiting.
525 /// * `f` - Closure that receives mutable access when waiting is no longer
526 /// required.
527 ///
528 /// # Returns
529 ///
530 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
531 /// predicate stops blocking before the timeout. Returns
532 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
533 ///
534 /// # Example
535 ///
536 /// ```rust
537 /// use std::time::Duration;
538 ///
539 /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
540 ///
541 /// let monitor = Monitor::new(Vec::<i32>::new());
542 /// let result = monitor.wait_timeout_while(
543 /// Duration::from_millis(1),
544 /// |items| items.is_empty(),
545 /// |items| items.pop(),
546 /// );
547 ///
548 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
549 /// ```
550 #[inline]
551 pub fn wait_timeout_while<R, P, F>(
552 &self,
553 timeout: Duration,
554 mut waiting: P,
555 f: F,
556 ) -> WaitTimeoutResult<R>
557 where
558 P: FnMut(&T) -> bool,
559 F: FnOnce(&mut T) -> R,
560 {
561 let mut guard = self.lock();
562 let start = Instant::now();
563 loop {
564 if !waiting(&*guard) {
565 return WaitTimeoutResult::Ready(f(&mut *guard));
566 }
567
568 let elapsed = start.elapsed();
569 let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
570 if remaining.is_zero() {
571 return WaitTimeoutResult::TimedOut;
572 }
573
574 let (next_guard, _status) = guard.wait_timeout(remaining);
575 guard = next_guard;
576 }
577 }
578
579 /// Waits until a predicate becomes true, with an overall time limit.
580 ///
581 /// This is the positive-predicate counterpart of
582 /// [`Self::wait_timeout_while`]. If `ready` becomes true before the timeout
583 /// expires, `f` runs while the monitor lock is still held. If the timeout
584 /// expires first, the closure is not called.
585 ///
586 /// Condition variables may wake spuriously, and timeout status alone is not
587 /// used as proof that the predicate is still false; the predicate is always
588 /// rechecked under the lock.
589 ///
590 /// If the mutex is poisoned before or during the wait, this method recovers
591 /// the inner state and continues waiting or executes the closure.
592 ///
593 /// # Arguments
594 ///
595 /// * `timeout` - Maximum total duration to wait.
596 /// * `ready` - Predicate that returns `true` when the caller may continue.
597 /// * `f` - Closure that receives mutable access to the ready state.
598 ///
599 /// # Returns
600 ///
601 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
602 /// predicate becomes true before the timeout. Returns
603 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
604 ///
605 /// # Example
606 ///
607 /// ```rust
608 /// use std::{
609 /// sync::Arc,
610 /// thread,
611 /// time::Duration,
612 /// };
613 ///
614 /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
615 ///
616 /// let monitor = Arc::new(Monitor::new(false));
617 /// let waiter_monitor = Arc::clone(&monitor);
618 ///
619 /// let waiter = thread::spawn(move || {
620 /// waiter_monitor.wait_timeout_until(
621 /// Duration::from_secs(1),
622 /// |ready| *ready,
623 /// |ready| {
624 /// *ready = false;
625 /// 5
626 /// },
627 /// )
628 /// });
629 ///
630 /// monitor.write(|ready| *ready = true);
631 /// monitor.notify_one();
632 ///
633 /// assert_eq!(
634 /// waiter.join().expect("waiter should finish"),
635 /// WaitTimeoutResult::Ready(5),
636 /// );
637 /// ```
638 #[inline]
639 pub fn wait_timeout_until<R, P, F>(
640 &self,
641 timeout: Duration,
642 mut ready: P,
643 f: F,
644 ) -> WaitTimeoutResult<R>
645 where
646 P: FnMut(&T) -> bool,
647 F: FnOnce(&mut T) -> R,
648 {
649 self.wait_timeout_while(timeout, |state| !ready(state), f)
650 }
651
652 /// Wakes one thread waiting on this monitor's condition variable.
653 ///
654 /// Notifications do not carry state by themselves. A waiting thread only
655 /// proceeds safely after rechecking the protected state. Call this after
656 /// changing state that may make one waiter able to continue.
657 ///
658 /// # Example
659 ///
660 /// ```rust
661 /// use std::thread;
662 ///
663 /// use qubit_lock::lock::ArcMonitor;
664 ///
665 /// let monitor = ArcMonitor::new(0_u32);
666 /// let waiter = {
667 /// let m = monitor.clone();
668 /// thread::spawn(move || {
669 /// m.wait_until(|n| *n > 0, |n| {
670 /// *n -= 1;
671 /// });
672 /// })
673 /// };
674 ///
675 /// monitor.write(|n| *n = 1);
676 /// monitor.notify_one();
677 /// waiter.join().expect("waiter should finish");
678 /// ```
679 #[inline]
680 pub fn notify_one(&self) {
681 self.changed.notify_one();
682 }
683
684 /// Wakes all threads waiting on this monitor's condition variable.
685 ///
686 /// Notifications do not carry state by themselves. Every awakened thread
687 /// must recheck the protected state before continuing. Call this after a
688 /// state change that may allow multiple waiters to make progress.
689 ///
690 /// # Example
691 ///
692 /// ```rust
693 /// use std::thread;
694 ///
695 /// use qubit_lock::lock::ArcMonitor;
696 ///
697 /// let monitor = ArcMonitor::new(false);
698 /// let mut handles = Vec::new();
699 /// for _ in 0..2 {
700 /// let m = monitor.clone();
701 /// handles.push(thread::spawn(move || {
702 /// m.wait_until(|ready| *ready, |_| ());
703 /// }));
704 /// }
705 ///
706 /// monitor.write(|ready| *ready = true);
707 /// monitor.notify_all();
708 /// for h in handles {
709 /// h.join().expect("waiter should finish");
710 /// }
711 /// ```
712 #[inline]
713 pub fn notify_all(&self) {
714 self.changed.notify_all();
715 }
716}
717
718impl<T: Default> Default for Monitor<T> {
719 /// Creates a monitor containing `T::default()`.
720 ///
721 /// # Returns
722 ///
723 /// A monitor protecting the default value for `T`.
724 ///
725 /// # Example
726 ///
727 /// ```rust
728 /// use qubit_lock::lock::Monitor;
729 ///
730 /// let monitor: Monitor<String> = Monitor::default();
731 /// assert!(monitor.read(|s| s.is_empty()));
732 /// ```
733 #[inline]
734 fn default() -> Self {
735 Self::new(T::default())
736 }
737}