qubit_lock/monitor/monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Monitor
11//!
12//! Provides a synchronous monitor built from a mutex and a condition variable.
13//! A monitor protects one shared state value and binds that state to the
14//! condition variable used to wait for changes. This is the same low-level
15//! mechanism as using [`std::sync::Mutex`] and [`std::sync::Condvar`] directly,
16//! but packaged so callers do not have to keep a mutex and its matching
17//! condition variable as separate fields.
18//!
19//! The high-level APIs ([`Monitor::read`], [`Monitor::write`],
20//! [`Monitor::wait_while`], and [`Monitor::wait_until`]) are intended for
21//! short critical sections and simple guarded-suspension flows. The lower-level
22//! [`Monitor::lock`] API returns a [`MonitorGuard`], which supports
23//! [`MonitorGuard::wait`] and [`MonitorGuard::wait_timeout`] for more complex
24//! state machines such as thread pools.
25//!
26
27use std::{
28 sync::{
29 Condvar,
30 Mutex,
31 },
32 time::{
33 Duration,
34 Instant,
35 },
36};
37
38use super::monitor_guard::MonitorGuard;
39use super::{
40 wait_timeout_result::WaitTimeoutResult,
41 wait_timeout_status::WaitTimeoutStatus,
42};
43
44/// Shared state protected by a mutex and a condition variable.
45///
46/// `Monitor` is useful when callers need more than a short critical section.
47/// It models the classic monitor object pattern: one mutex protects the state,
48/// and one condition variable lets threads wait until that state changes. This
49/// is the same relationship used by `std::sync::Mutex` and
50/// `std::sync::Condvar`, but represented as one object so the condition
51/// variable is not accidentally used with unrelated state.
52///
53/// `Monitor` deliberately has two levels of API:
54///
55/// * `read` and `write` acquire the mutex, run a closure, and release it.
56/// * `wait_while`, `wait_until`, and their timeout variants implement common
57/// predicate-based waits.
58/// * `lock` returns a [`MonitorGuard`] for callers that need to write their own
59/// loop around [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`].
60///
61/// A poisoned mutex is recovered by taking the inner state. This makes
62/// `Monitor` suitable for coordination state that should remain observable
63/// after another thread panics while holding the lock.
64///
65/// # Difference from `Mutex` and `Condvar`
66///
67/// With the standard library primitives, callers usually store two fields and
68/// manually keep them paired:
69///
70/// ```rust
71/// # use std::sync::{Condvar, Mutex};
72/// # struct State;
73/// struct Shared {
74/// state: Mutex<State>,
75/// changed: Condvar,
76/// }
77/// ```
78///
79/// `Monitor<State>` stores the same pair internally. A [`MonitorGuard`] is a
80/// wrapper around the standard library's `MutexGuard`; it keeps the protected
81/// state locked and knows which monitor it belongs to, so its wait methods use
82/// the matching condition variable.
83///
84/// # Type Parameters
85///
86/// * `T` - The state protected by this monitor.
87///
88/// # Example
89///
90/// ```rust
91/// use std::thread;
92///
93/// use qubit_lock::lock::ArcMonitor;
94///
95/// let monitor = ArcMonitor::new(false);
96/// let waiter_monitor = monitor.clone();
97///
98/// let waiter = thread::spawn(move || {
99/// waiter_monitor.wait_until(
100/// |ready| *ready,
101/// |ready| {
102/// *ready = false;
103/// },
104/// );
105/// });
106///
107/// monitor.write(|ready| {
108/// *ready = true;
109/// });
110/// monitor.notify_all();
111///
112/// waiter.join().expect("waiter should finish");
113/// assert!(!monitor.read(|ready| *ready));
114/// ```
115///
116pub struct Monitor<T> {
117 /// Mutex protecting the monitor state.
118 state: Mutex<T>,
119 /// Condition variable used to wake predicate waiters after state changes.
120 pub(super) changed: Condvar,
121}
122
123impl<T> Monitor<T> {
124 /// Creates a monitor protecting the supplied state value.
125 ///
126 /// # Arguments
127 ///
128 /// * `state` - Initial state protected by the monitor.
129 ///
130 /// # Returns
131 ///
132 /// A monitor initialized with the supplied state.
133 ///
134 /// # Example
135 ///
136 /// ```rust
137 /// use qubit_lock::lock::Monitor;
138 ///
139 /// let monitor = Monitor::new(0_u32);
140 /// assert_eq!(monitor.read(|n| *n), 0);
141 /// ```
142 #[inline]
143 pub fn new(state: T) -> Self {
144 Self {
145 state: Mutex::new(state),
146 changed: Condvar::new(),
147 }
148 }
149
150 /// Acquires the monitor and returns a guard for explicit state-machine code.
151 ///
152 /// The returned [`MonitorGuard`] keeps the monitor mutex locked until the
153 /// guard is dropped. It can also be passed through
154 /// [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`] to temporarily
155 /// release the lock while waiting on this monitor's condition variable.
156 ///
157 /// If the mutex is poisoned, this method recovers the inner state and still
158 /// returns a guard.
159 ///
160 /// # Returns
161 ///
162 /// A guard that provides read and write access to the protected state.
163 ///
164 /// # Example
165 ///
166 /// ```rust
167 /// use qubit_lock::lock::Monitor;
168 ///
169 /// let monitor = Monitor::new(1);
170 /// {
171 /// let mut value = monitor.lock();
172 /// *value += 1;
173 /// }
174 ///
175 /// assert_eq!(monitor.read(|value| *value), 2);
176 /// ```
177 #[inline]
178 pub fn lock(&self) -> MonitorGuard<'_, T> {
179 MonitorGuard::new(
180 self,
181 self.state
182 .lock()
183 .unwrap_or_else(std::sync::PoisonError::into_inner),
184 )
185 }
186
187 /// Acquires the monitor and reads the protected state.
188 ///
189 /// The closure runs while the mutex is held. Keep the closure short and do
190 /// not call code that may block for a long time.
191 ///
192 /// If the mutex is poisoned, this method recovers the inner state and still
193 /// executes the closure.
194 ///
195 /// # Arguments
196 ///
197 /// * `f` - Closure that receives an immutable reference to the state.
198 ///
199 /// # Returns
200 ///
201 /// The value returned by the closure.
202 ///
203 /// # Example
204 ///
205 /// ```rust
206 /// use qubit_lock::lock::Monitor;
207 ///
208 /// let monitor = Monitor::new(10_i32);
209 /// let n = monitor.read(|x| *x);
210 /// assert_eq!(n, 10);
211 /// ```
212 #[inline]
213 pub fn read<R, F>(&self, f: F) -> R
214 where
215 F: FnOnce(&T) -> R,
216 {
217 let guard = self.lock();
218 f(&*guard)
219 }
220
221 /// Acquires the monitor and mutates the protected state.
222 ///
223 /// The closure runs while the mutex is held. This method only changes the
224 /// state; callers should explicitly call [`Self::notify_one`] or
225 /// [`Self::notify_all`] after changing a condition that waiters may be
226 /// observing.
227 ///
228 /// If the mutex is poisoned, this method recovers the inner state and still
229 /// executes the closure.
230 ///
231 /// # Arguments
232 ///
233 /// * `f` - Closure that receives a mutable reference to the state.
234 ///
235 /// # Returns
236 ///
237 /// The value returned by the closure.
238 ///
239 /// # Example
240 ///
241 /// ```rust
242 /// use qubit_lock::lock::Monitor;
243 ///
244 /// let monitor = Monitor::new(String::new());
245 /// let len = monitor.write(|s| {
246 /// s.push_str("hi");
247 /// s.len()
248 /// });
249 /// assert_eq!(len, 2);
250 /// ```
251 #[inline]
252 pub fn write<R, F>(&self, f: F) -> R
253 where
254 F: FnOnce(&mut T) -> R,
255 {
256 let mut guard = self.lock();
257 f(&mut *guard)
258 }
259
260 /// Waits for a notification or timeout without checking state.
261 ///
262 /// This convenience method locks the monitor, waits once on the condition
263 /// variable, and returns why the timed wait completed. It is useful only
264 /// when the caller genuinely needs a notification wait without inspecting
265 /// state before or after the wait. Most coordination code should prefer
266 /// [`Self::wait_while`], [`Self::wait_until`], or the explicit
267 /// [`MonitorGuard::wait_timeout`] loop.
268 ///
269 /// Condition variables may wake spuriously, so
270 /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
271 /// state.
272 ///
273 /// If the mutex is poisoned, this method recovers the inner state and
274 /// continues waiting.
275 ///
276 /// # Arguments
277 ///
278 /// * `timeout` - Maximum duration to wait for a notification.
279 ///
280 /// # Returns
281 ///
282 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
283 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
284 ///
285 /// # Example
286 ///
287 /// ```rust
288 /// use std::time::Duration;
289 ///
290 /// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
291 ///
292 /// let monitor = Monitor::new(false);
293 /// let status = monitor.wait_notify(Duration::from_millis(1));
294 ///
295 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
296 /// ```
297 #[inline]
298 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
299 let guard = self.lock();
300 let (_guard, status) = guard.wait_timeout(timeout);
301 status
302 }
303
304 /// Waits while a predicate remains true, then mutates the protected state.
305 ///
306 /// This is the monitor equivalent of the common `while condition { wait }`
307 /// guarded-suspension pattern. The predicate is evaluated while holding the
308 /// mutex. If it returns `true`, the current thread waits on the condition
309 /// variable and atomically releases the mutex. After a notification or
310 /// spurious wakeup, the mutex is reacquired and the predicate is evaluated
311 /// again. When the predicate returns `false`, `f` runs while the mutex is
312 /// still held.
313 ///
314 /// This method may block indefinitely if no thread changes the state so
315 /// that `waiting` becomes false and sends a notification.
316 ///
317 /// If the mutex is poisoned before or during the wait, this method recovers
318 /// the inner state and continues waiting or executes the closure.
319 ///
320 /// # Arguments
321 ///
322 /// * `waiting` - Predicate that returns `true` while the caller should
323 /// keep waiting.
324 /// * `f` - Closure that receives mutable access after waiting is no longer
325 /// required.
326 ///
327 /// # Returns
328 ///
329 /// The value returned by `f`.
330 ///
331 /// # Example
332 ///
333 /// ```rust
334 /// use std::{
335 /// sync::Arc,
336 /// thread,
337 /// };
338 ///
339 /// use qubit_lock::lock::Monitor;
340 ///
341 /// let monitor = Arc::new(Monitor::new(Vec::<i32>::new()));
342 /// let worker_monitor = Arc::clone(&monitor);
343 ///
344 /// let worker = thread::spawn(move || {
345 /// worker_monitor.wait_while(
346 /// |items| items.is_empty(),
347 /// |items| items.pop().expect("item should be ready"),
348 /// )
349 /// });
350 ///
351 /// monitor.write(|items| items.push(7));
352 /// monitor.notify_one();
353 ///
354 /// assert_eq!(worker.join().expect("worker should finish"), 7);
355 /// ```
356 #[inline]
357 pub fn wait_while<R, P, F>(&self, mut waiting: P, f: F) -> R
358 where
359 P: FnMut(&T) -> bool,
360 F: FnOnce(&mut T) -> R,
361 {
362 let mut guard = self.lock();
363 while waiting(&*guard) {
364 guard = guard.wait();
365 }
366 f(&mut *guard)
367 }
368
369 /// Waits until the protected state satisfies a predicate, then mutates it.
370 ///
371 /// This is the positive-predicate counterpart of [`Self::wait_while`]. The
372 /// predicate is evaluated while holding the mutex. If it returns `false`,
373 /// the current thread waits on the condition variable and atomically
374 /// releases the mutex. After a notification or spurious wakeup, the mutex
375 /// is reacquired and the predicate is evaluated again. When the predicate
376 /// returns `true`, `f` runs while the mutex is still held.
377 ///
378 /// This method may block indefinitely if no thread changes the state to
379 /// satisfy the predicate and sends a notification.
380 ///
381 /// If the mutex is poisoned before or during the wait, this method recovers
382 /// the inner state and continues waiting or executes the closure.
383 ///
384 /// # Arguments
385 ///
386 /// * `ready` - Predicate that returns `true` when the state is ready.
387 /// * `f` - Closure that receives mutable access to the ready state.
388 ///
389 /// # Returns
390 ///
391 /// The value returned by `f` after the predicate has become true.
392 ///
393 /// # Example
394 ///
395 /// ```rust
396 /// use std::{
397 /// sync::Arc,
398 /// thread,
399 /// };
400 ///
401 /// use qubit_lock::lock::Monitor;
402 ///
403 /// let monitor = Arc::new(Monitor::new(false));
404 /// let waiter_monitor = Arc::clone(&monitor);
405 ///
406 /// let waiter = thread::spawn(move || {
407 /// waiter_monitor.wait_until(
408 /// |ready| *ready,
409 /// |ready| {
410 /// *ready = false;
411 /// "done"
412 /// },
413 /// )
414 /// });
415 ///
416 /// monitor.write(|ready| *ready = true);
417 /// monitor.notify_one();
418 ///
419 /// assert_eq!(waiter.join().expect("waiter should finish"), "done");
420 /// ```
421 #[inline]
422 pub fn wait_until<R, P, F>(&self, mut ready: P, f: F) -> R
423 where
424 P: FnMut(&T) -> bool,
425 F: FnOnce(&mut T) -> R,
426 {
427 self.wait_while(|state| !ready(state), f)
428 }
429
430 /// Waits while a predicate remains true, with an overall time limit.
431 ///
432 /// This method is the timeout-aware form of [`Self::wait_while`]. It keeps
433 /// rechecking `waiting` under the monitor lock and waits only for the
434 /// remaining portion of `timeout`. If `waiting` becomes false before the
435 /// timeout expires, `f` runs while the lock is still held. If the timeout
436 /// expires first, the closure is not called.
437 ///
438 /// Condition variables may wake spuriously, and timeout status alone is not
439 /// used as proof that the predicate is still true; the predicate is always
440 /// rechecked under the lock.
441 ///
442 /// If the mutex is poisoned before or during the wait, this method recovers
443 /// the inner state and continues waiting or executes the closure.
444 ///
445 /// # Arguments
446 ///
447 /// * `timeout` - Maximum total duration to wait.
448 /// * `waiting` - Predicate that returns `true` while the caller should
449 /// continue waiting.
450 /// * `f` - Closure that receives mutable access when waiting is no longer
451 /// required.
452 ///
453 /// # Returns
454 ///
455 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
456 /// predicate stops blocking before the timeout. Returns
457 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
458 ///
459 /// # Example
460 ///
461 /// ```rust
462 /// use std::time::Duration;
463 ///
464 /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
465 ///
466 /// let monitor = Monitor::new(Vec::<i32>::new());
467 /// let result = monitor.wait_timeout_while(
468 /// Duration::from_millis(1),
469 /// |items| items.is_empty(),
470 /// |items| items.pop(),
471 /// );
472 ///
473 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
474 /// ```
475 #[inline]
476 pub fn wait_timeout_while<R, P, F>(
477 &self,
478 timeout: Duration,
479 mut waiting: P,
480 f: F,
481 ) -> WaitTimeoutResult<R>
482 where
483 P: FnMut(&T) -> bool,
484 F: FnOnce(&mut T) -> R,
485 {
486 let mut guard = self.lock();
487 let start = Instant::now();
488 loop {
489 if !waiting(&*guard) {
490 return WaitTimeoutResult::Ready(f(&mut *guard));
491 }
492
493 let elapsed = start.elapsed();
494 let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
495 if remaining.is_zero() {
496 return WaitTimeoutResult::TimedOut;
497 }
498
499 let (next_guard, _status) = guard.wait_timeout(remaining);
500 guard = next_guard;
501 }
502 }
503
504 /// Waits until a predicate becomes true, with an overall time limit.
505 ///
506 /// This is the positive-predicate counterpart of
507 /// [`Self::wait_timeout_while`]. If `ready` becomes true before the timeout
508 /// expires, `f` runs while the monitor lock is still held. If the timeout
509 /// expires first, the closure is not called.
510 ///
511 /// Condition variables may wake spuriously, and timeout status alone is not
512 /// used as proof that the predicate is still false; the predicate is always
513 /// rechecked under the lock.
514 ///
515 /// If the mutex is poisoned before or during the wait, this method recovers
516 /// the inner state and continues waiting or executes the closure.
517 ///
518 /// # Arguments
519 ///
520 /// * `timeout` - Maximum total duration to wait.
521 /// * `ready` - Predicate that returns `true` when the caller may continue.
522 /// * `f` - Closure that receives mutable access to the ready state.
523 ///
524 /// # Returns
525 ///
526 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
527 /// predicate becomes true before the timeout. Returns
528 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
529 ///
530 /// # Example
531 ///
532 /// ```rust
533 /// use std::{
534 /// sync::Arc,
535 /// thread,
536 /// time::Duration,
537 /// };
538 ///
539 /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
540 ///
541 /// let monitor = Arc::new(Monitor::new(false));
542 /// let waiter_monitor = Arc::clone(&monitor);
543 ///
544 /// let waiter = thread::spawn(move || {
545 /// waiter_monitor.wait_timeout_until(
546 /// Duration::from_secs(1),
547 /// |ready| *ready,
548 /// |ready| {
549 /// *ready = false;
550 /// 5
551 /// },
552 /// )
553 /// });
554 ///
555 /// monitor.write(|ready| *ready = true);
556 /// monitor.notify_one();
557 ///
558 /// assert_eq!(
559 /// waiter.join().expect("waiter should finish"),
560 /// WaitTimeoutResult::Ready(5),
561 /// );
562 /// ```
563 #[inline]
564 pub fn wait_timeout_until<R, P, F>(
565 &self,
566 timeout: Duration,
567 mut ready: P,
568 f: F,
569 ) -> WaitTimeoutResult<R>
570 where
571 P: FnMut(&T) -> bool,
572 F: FnOnce(&mut T) -> R,
573 {
574 self.wait_timeout_while(timeout, |state| !ready(state), f)
575 }
576
577 /// Wakes one thread waiting on this monitor's condition variable.
578 ///
579 /// Notifications do not carry state by themselves. A waiting thread only
580 /// proceeds safely after rechecking the protected state. Call this after
581 /// changing state that may make one waiter able to continue.
582 ///
583 /// # Example
584 ///
585 /// ```rust
586 /// use std::thread;
587 ///
588 /// use qubit_lock::lock::ArcMonitor;
589 ///
590 /// let monitor = ArcMonitor::new(0_u32);
591 /// let waiter = {
592 /// let m = monitor.clone();
593 /// thread::spawn(move || {
594 /// m.wait_until(|n| *n > 0, |n| {
595 /// *n -= 1;
596 /// });
597 /// })
598 /// };
599 ///
600 /// monitor.write(|n| *n = 1);
601 /// monitor.notify_one();
602 /// waiter.join().expect("waiter should finish");
603 /// ```
604 #[inline]
605 pub fn notify_one(&self) {
606 self.changed.notify_one();
607 }
608
609 /// Wakes all threads waiting on this monitor's condition variable.
610 ///
611 /// Notifications do not carry state by themselves. Every awakened thread
612 /// must recheck the protected state before continuing. Call this after a
613 /// state change that may allow multiple waiters to make progress.
614 ///
615 /// # Example
616 ///
617 /// ```rust
618 /// use std::thread;
619 ///
620 /// use qubit_lock::lock::ArcMonitor;
621 ///
622 /// let monitor = ArcMonitor::new(false);
623 /// let mut handles = Vec::new();
624 /// for _ in 0..2 {
625 /// let m = monitor.clone();
626 /// handles.push(thread::spawn(move || {
627 /// m.wait_until(|ready| *ready, |_| ());
628 /// }));
629 /// }
630 ///
631 /// monitor.write(|ready| *ready = true);
632 /// monitor.notify_all();
633 /// for h in handles {
634 /// h.join().expect("waiter should finish");
635 /// }
636 /// ```
637 #[inline]
638 pub fn notify_all(&self) {
639 self.changed.notify_all();
640 }
641}
642
643impl<T: Default> Default for Monitor<T> {
644 /// Creates a monitor containing `T::default()`.
645 ///
646 /// # Returns
647 ///
648 /// A monitor protecting the default value for `T`.
649 ///
650 /// # Example
651 ///
652 /// ```rust
653 /// use qubit_lock::lock::Monitor;
654 ///
655 /// let monitor: Monitor<String> = Monitor::default();
656 /// assert!(monitor.read(|s| s.is_empty()));
657 /// ```
658 #[inline]
659 fn default() -> Self {
660 Self::new(T::default())
661 }
662}