qubit_lock/monitor/monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! # Monitor
10//!
11//! Provides a synchronous monitor built from a mutex and a condition variable.
12//! A monitor protects one shared state value and binds that state to the
13//! condition variable used to wait for changes. This is the same low-level
14//! mechanism as using [`std::sync::Mutex`] and [`std::sync::Condvar`] directly,
15//! but packaged so callers do not have to keep a mutex and its matching
16//! condition variable as separate fields.
17//!
18//! The high-level APIs ([`Monitor::read`], [`Monitor::write`],
19//! [`Monitor::wait_while`], and [`Monitor::wait_until`]) are intended for
20//! short critical sections and simple guarded-suspension flows. The lower-level
21//! [`Monitor::lock`] API returns a [`MonitorGuard`], which supports
22//! [`MonitorGuard::wait`] and [`MonitorGuard::wait_timeout`] for more complex
23//! state machines such as thread pools.
24//!
25//! # Author
26//!
27//! Haixing Hu
28
29use std::{
30 sync::{
31 Condvar,
32 Mutex,
33 },
34 time::{
35 Duration,
36 Instant,
37 },
38};
39
40use super::monitor_guard::MonitorGuard;
41use super::{
42 wait_timeout_result::WaitTimeoutResult,
43 wait_timeout_status::WaitTimeoutStatus,
44};
45
46/// Shared state protected by a mutex and a condition variable.
47///
48/// `Monitor` is useful when callers need more than a short critical section.
49/// It models the classic monitor object pattern: one mutex protects the state,
50/// and one condition variable lets threads wait until that state changes. This
51/// is the same relationship used by `std::sync::Mutex` and
52/// `std::sync::Condvar`, but represented as one object so the condition
53/// variable is not accidentally used with unrelated state.
54///
55/// `Monitor` deliberately has two levels of API:
56///
57/// * `read` and `write` acquire the mutex, run a closure, and release it.
58/// * `wait_while`, `wait_until`, and their timeout variants implement common
59/// predicate-based waits.
60/// * `lock` returns a [`MonitorGuard`] for callers that need to write their own
61/// loop around [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`].
62///
63/// A poisoned mutex is recovered by taking the inner state. This makes
64/// `Monitor` suitable for coordination state that should remain observable
65/// after another thread panics while holding the lock.
66///
67/// # Difference from `Mutex` and `Condvar`
68///
69/// With the standard library primitives, callers usually store two fields and
70/// manually keep them paired:
71///
72/// ```rust
73/// # use std::sync::{Condvar, Mutex};
74/// # struct State;
75/// struct Shared {
76/// state: Mutex<State>,
77/// changed: Condvar,
78/// }
79/// ```
80///
81/// `Monitor<State>` stores the same pair internally. A [`MonitorGuard`] is a
82/// wrapper around the standard library's `MutexGuard`; it keeps the protected
83/// state locked and knows which monitor it belongs to, so its wait methods use
84/// the matching condition variable.
85///
86/// # Type Parameters
87///
88/// * `T` - The state protected by this monitor.
89///
90/// # Example
91///
92/// ```rust
93/// use std::thread;
94///
95/// use qubit_lock::lock::ArcMonitor;
96///
97/// let monitor = ArcMonitor::new(false);
98/// let waiter_monitor = monitor.clone();
99///
100/// let waiter = thread::spawn(move || {
101/// waiter_monitor.wait_until(
102/// |ready| *ready,
103/// |ready| {
104/// *ready = false;
105/// },
106/// );
107/// });
108///
109/// monitor.write(|ready| {
110/// *ready = true;
111/// });
112/// monitor.notify_all();
113///
114/// waiter.join().expect("waiter should finish");
115/// assert!(!monitor.read(|ready| *ready));
116/// ```
117///
118/// # Author
119///
120/// Haixing Hu
121pub struct Monitor<T> {
122 /// Mutex protecting the monitor state.
123 state: Mutex<T>,
124 /// Condition variable used to wake predicate waiters after state changes.
125 pub(super) changed: Condvar,
126}
127
128impl<T> Monitor<T> {
129 /// Creates a monitor protecting the supplied state value.
130 ///
131 /// # Arguments
132 ///
133 /// * `state` - Initial state protected by the monitor.
134 ///
135 /// # Returns
136 ///
137 /// A monitor initialized with the supplied state.
138 ///
139 /// # Example
140 ///
141 /// ```rust
142 /// use qubit_lock::lock::Monitor;
143 ///
144 /// let monitor = Monitor::new(0_u32);
145 /// assert_eq!(monitor.read(|n| *n), 0);
146 /// ```
147 #[inline]
148 pub fn new(state: T) -> Self {
149 Self {
150 state: Mutex::new(state),
151 changed: Condvar::new(),
152 }
153 }
154
155 /// Acquires the monitor and returns a guard for explicit state-machine code.
156 ///
157 /// The returned [`MonitorGuard`] keeps the monitor mutex locked until the
158 /// guard is dropped. It can also be passed through
159 /// [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`] to temporarily
160 /// release the lock while waiting on this monitor's condition variable.
161 ///
162 /// If the mutex is poisoned, this method recovers the inner state and still
163 /// returns a guard.
164 ///
165 /// # Returns
166 ///
167 /// A guard that provides read and write access to the protected state.
168 ///
169 /// # Example
170 ///
171 /// ```rust
172 /// use qubit_lock::lock::Monitor;
173 ///
174 /// let monitor = Monitor::new(1);
175 /// {
176 /// let mut value = monitor.lock();
177 /// *value += 1;
178 /// }
179 ///
180 /// assert_eq!(monitor.read(|value| *value), 2);
181 /// ```
182 #[inline]
183 pub fn lock(&self) -> MonitorGuard<'_, T> {
184 MonitorGuard::new(
185 self,
186 self.state
187 .lock()
188 .unwrap_or_else(std::sync::PoisonError::into_inner),
189 )
190 }
191
192 /// Acquires the monitor and reads the protected state.
193 ///
194 /// The closure runs while the mutex is held. Keep the closure short and do
195 /// not call code that may block for a long time.
196 ///
197 /// If the mutex is poisoned, this method recovers the inner state and still
198 /// executes the closure.
199 ///
200 /// # Arguments
201 ///
202 /// * `f` - Closure that receives an immutable reference to the state.
203 ///
204 /// # Returns
205 ///
206 /// The value returned by the closure.
207 ///
208 /// # Example
209 ///
210 /// ```rust
211 /// use qubit_lock::lock::Monitor;
212 ///
213 /// let monitor = Monitor::new(10_i32);
214 /// let n = monitor.read(|x| *x);
215 /// assert_eq!(n, 10);
216 /// ```
217 #[inline]
218 pub fn read<R, F>(&self, f: F) -> R
219 where
220 F: FnOnce(&T) -> R,
221 {
222 let guard = self.lock();
223 f(&*guard)
224 }
225
226 /// Acquires the monitor and mutates the protected state.
227 ///
228 /// The closure runs while the mutex is held. This method only changes the
229 /// state; callers should explicitly call [`Self::notify_one`] or
230 /// [`Self::notify_all`] after changing a condition that waiters may be
231 /// observing.
232 ///
233 /// If the mutex is poisoned, this method recovers the inner state and still
234 /// executes the closure.
235 ///
236 /// # Arguments
237 ///
238 /// * `f` - Closure that receives a mutable reference to the state.
239 ///
240 /// # Returns
241 ///
242 /// The value returned by the closure.
243 ///
244 /// # Example
245 ///
246 /// ```rust
247 /// use qubit_lock::lock::Monitor;
248 ///
249 /// let monitor = Monitor::new(String::new());
250 /// let len = monitor.write(|s| {
251 /// s.push_str("hi");
252 /// s.len()
253 /// });
254 /// assert_eq!(len, 2);
255 /// ```
256 #[inline]
257 pub fn write<R, F>(&self, f: F) -> R
258 where
259 F: FnOnce(&mut T) -> R,
260 {
261 let mut guard = self.lock();
262 f(&mut *guard)
263 }
264
265 /// Waits for a notification or timeout without checking state.
266 ///
267 /// This convenience method locks the monitor, waits once on the condition
268 /// variable, and returns why the timed wait completed. It is useful only
269 /// when the caller genuinely needs a notification wait without inspecting
270 /// state before or after the wait. Most coordination code should prefer
271 /// [`Self::wait_while`], [`Self::wait_until`], or the explicit
272 /// [`MonitorGuard::wait_timeout`] loop.
273 ///
274 /// Condition variables may wake spuriously, so
275 /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
276 /// state.
277 ///
278 /// If the mutex is poisoned, this method recovers the inner state and
279 /// continues waiting.
280 ///
281 /// # Arguments
282 ///
283 /// * `timeout` - Maximum duration to wait for a notification.
284 ///
285 /// # Returns
286 ///
287 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
288 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
289 ///
290 /// # Example
291 ///
292 /// ```rust
293 /// use std::time::Duration;
294 ///
295 /// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
296 ///
297 /// let monitor = Monitor::new(false);
298 /// let status = monitor.wait_notify(Duration::from_millis(1));
299 ///
300 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
301 /// ```
302 #[inline]
303 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
304 let guard = self.lock();
305 let (_guard, status) = guard.wait_timeout(timeout);
306 status
307 }
308
309 /// Waits while a predicate remains true, then mutates the protected state.
310 ///
311 /// This is the monitor equivalent of the common `while condition { wait }`
312 /// guarded-suspension pattern. The predicate is evaluated while holding the
313 /// mutex. If it returns `true`, the current thread waits on the condition
314 /// variable and atomically releases the mutex. After a notification or
315 /// spurious wakeup, the mutex is reacquired and the predicate is evaluated
316 /// again. When the predicate returns `false`, `f` runs while the mutex is
317 /// still held.
318 ///
319 /// This method may block indefinitely if no thread changes the state so
320 /// that `waiting` becomes false and sends a notification.
321 ///
322 /// If the mutex is poisoned before or during the wait, this method recovers
323 /// the inner state and continues waiting or executes the closure.
324 ///
325 /// # Arguments
326 ///
327 /// * `waiting` - Predicate that returns `true` while the caller should
328 /// keep waiting.
329 /// * `f` - Closure that receives mutable access after waiting is no longer
330 /// required.
331 ///
332 /// # Returns
333 ///
334 /// The value returned by `f`.
335 ///
336 /// # Example
337 ///
338 /// ```rust
339 /// use std::{
340 /// sync::Arc,
341 /// thread,
342 /// };
343 ///
344 /// use qubit_lock::lock::Monitor;
345 ///
346 /// let monitor = Arc::new(Monitor::new(Vec::<i32>::new()));
347 /// let worker_monitor = Arc::clone(&monitor);
348 ///
349 /// let worker = thread::spawn(move || {
350 /// worker_monitor.wait_while(
351 /// |items| items.is_empty(),
352 /// |items| items.pop().expect("item should be ready"),
353 /// )
354 /// });
355 ///
356 /// monitor.write(|items| items.push(7));
357 /// monitor.notify_one();
358 ///
359 /// assert_eq!(worker.join().expect("worker should finish"), 7);
360 /// ```
361 #[inline]
362 pub fn wait_while<R, P, F>(&self, mut waiting: P, f: F) -> R
363 where
364 P: FnMut(&T) -> bool,
365 F: FnOnce(&mut T) -> R,
366 {
367 let mut guard = self.lock();
368 while waiting(&*guard) {
369 guard = guard.wait();
370 }
371 f(&mut *guard)
372 }
373
374 /// Waits until the protected state satisfies a predicate, then mutates it.
375 ///
376 /// This is the positive-predicate counterpart of [`Self::wait_while`]. The
377 /// predicate is evaluated while holding the mutex. If it returns `false`,
378 /// the current thread waits on the condition variable and atomically
379 /// releases the mutex. After a notification or spurious wakeup, the mutex
380 /// is reacquired and the predicate is evaluated again. When the predicate
381 /// returns `true`, `f` runs while the mutex is still held.
382 ///
383 /// This method may block indefinitely if no thread changes the state to
384 /// satisfy the predicate and sends a notification.
385 ///
386 /// If the mutex is poisoned before or during the wait, this method recovers
387 /// the inner state and continues waiting or executes the closure.
388 ///
389 /// # Arguments
390 ///
391 /// * `ready` - Predicate that returns `true` when the state is ready.
392 /// * `f` - Closure that receives mutable access to the ready state.
393 ///
394 /// # Returns
395 ///
396 /// The value returned by `f` after the predicate has become true.
397 ///
398 /// # Example
399 ///
400 /// ```rust
401 /// use std::{
402 /// sync::Arc,
403 /// thread,
404 /// };
405 ///
406 /// use qubit_lock::lock::Monitor;
407 ///
408 /// let monitor = Arc::new(Monitor::new(false));
409 /// let waiter_monitor = Arc::clone(&monitor);
410 ///
411 /// let waiter = thread::spawn(move || {
412 /// waiter_monitor.wait_until(
413 /// |ready| *ready,
414 /// |ready| {
415 /// *ready = false;
416 /// "done"
417 /// },
418 /// )
419 /// });
420 ///
421 /// monitor.write(|ready| *ready = true);
422 /// monitor.notify_one();
423 ///
424 /// assert_eq!(waiter.join().expect("waiter should finish"), "done");
425 /// ```
426 #[inline]
427 pub fn wait_until<R, P, F>(&self, mut ready: P, f: F) -> R
428 where
429 P: FnMut(&T) -> bool,
430 F: FnOnce(&mut T) -> R,
431 {
432 self.wait_while(|state| !ready(state), f)
433 }
434
435 /// Waits while a predicate remains true, with an overall time limit.
436 ///
437 /// This method is the timeout-aware form of [`Self::wait_while`]. It keeps
438 /// rechecking `waiting` under the monitor lock and waits only for the
439 /// remaining portion of `timeout`. If `waiting` becomes false before the
440 /// timeout expires, `f` runs while the lock is still held. If the timeout
441 /// expires first, the closure is not called.
442 ///
443 /// Condition variables may wake spuriously, and timeout status alone is not
444 /// used as proof that the predicate is still true; the predicate is always
445 /// rechecked under the lock.
446 ///
447 /// If the mutex is poisoned before or during the wait, this method recovers
448 /// the inner state and continues waiting or executes the closure.
449 ///
450 /// # Arguments
451 ///
452 /// * `timeout` - Maximum total duration to wait.
453 /// * `waiting` - Predicate that returns `true` while the caller should
454 /// continue waiting.
455 /// * `f` - Closure that receives mutable access when waiting is no longer
456 /// required.
457 ///
458 /// # Returns
459 ///
460 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
461 /// predicate stops blocking before the timeout. Returns
462 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
463 ///
464 /// # Example
465 ///
466 /// ```rust
467 /// use std::time::Duration;
468 ///
469 /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
470 ///
471 /// let monitor = Monitor::new(Vec::<i32>::new());
472 /// let result = monitor.wait_timeout_while(
473 /// Duration::from_millis(1),
474 /// |items| items.is_empty(),
475 /// |items| items.pop(),
476 /// );
477 ///
478 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
479 /// ```
480 #[inline]
481 pub fn wait_timeout_while<R, P, F>(
482 &self,
483 timeout: Duration,
484 mut waiting: P,
485 f: F,
486 ) -> WaitTimeoutResult<R>
487 where
488 P: FnMut(&T) -> bool,
489 F: FnOnce(&mut T) -> R,
490 {
491 let mut guard = self.lock();
492 let start = Instant::now();
493 loop {
494 if !waiting(&*guard) {
495 return WaitTimeoutResult::Ready(f(&mut *guard));
496 }
497
498 let elapsed = start.elapsed();
499 let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
500 if remaining.is_zero() {
501 return WaitTimeoutResult::TimedOut;
502 }
503
504 let (next_guard, _status) = guard.wait_timeout(remaining);
505 guard = next_guard;
506 }
507 }
508
509 /// Waits until a predicate becomes true, with an overall time limit.
510 ///
511 /// This is the positive-predicate counterpart of
512 /// [`Self::wait_timeout_while`]. If `ready` becomes true before the timeout
513 /// expires, `f` runs while the monitor lock is still held. If the timeout
514 /// expires first, the closure is not called.
515 ///
516 /// Condition variables may wake spuriously, and timeout status alone is not
517 /// used as proof that the predicate is still false; the predicate is always
518 /// rechecked under the lock.
519 ///
520 /// If the mutex is poisoned before or during the wait, this method recovers
521 /// the inner state and continues waiting or executes the closure.
522 ///
523 /// # Arguments
524 ///
525 /// * `timeout` - Maximum total duration to wait.
526 /// * `ready` - Predicate that returns `true` when the caller may continue.
527 /// * `f` - Closure that receives mutable access to the ready state.
528 ///
529 /// # Returns
530 ///
531 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
532 /// predicate becomes true before the timeout. Returns
533 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
534 ///
535 /// # Example
536 ///
537 /// ```rust
538 /// use std::{
539 /// sync::Arc,
540 /// thread,
541 /// time::Duration,
542 /// };
543 ///
544 /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
545 ///
546 /// let monitor = Arc::new(Monitor::new(false));
547 /// let waiter_monitor = Arc::clone(&monitor);
548 ///
549 /// let waiter = thread::spawn(move || {
550 /// waiter_monitor.wait_timeout_until(
551 /// Duration::from_secs(1),
552 /// |ready| *ready,
553 /// |ready| {
554 /// *ready = false;
555 /// 5
556 /// },
557 /// )
558 /// });
559 ///
560 /// monitor.write(|ready| *ready = true);
561 /// monitor.notify_one();
562 ///
563 /// assert_eq!(
564 /// waiter.join().expect("waiter should finish"),
565 /// WaitTimeoutResult::Ready(5),
566 /// );
567 /// ```
568 #[inline]
569 pub fn wait_timeout_until<R, P, F>(
570 &self,
571 timeout: Duration,
572 mut ready: P,
573 f: F,
574 ) -> WaitTimeoutResult<R>
575 where
576 P: FnMut(&T) -> bool,
577 F: FnOnce(&mut T) -> R,
578 {
579 self.wait_timeout_while(timeout, |state| !ready(state), f)
580 }
581
582 /// Wakes one thread waiting on this monitor's condition variable.
583 ///
584 /// Notifications do not carry state by themselves. A waiting thread only
585 /// proceeds safely after rechecking the protected state. Call this after
586 /// changing state that may make one waiter able to continue.
587 ///
588 /// # Example
589 ///
590 /// ```rust
591 /// use std::thread;
592 ///
593 /// use qubit_lock::lock::ArcMonitor;
594 ///
595 /// let monitor = ArcMonitor::new(0_u32);
596 /// let waiter = {
597 /// let m = monitor.clone();
598 /// thread::spawn(move || {
599 /// m.wait_until(|n| *n > 0, |n| {
600 /// *n -= 1;
601 /// });
602 /// })
603 /// };
604 ///
605 /// monitor.write(|n| *n = 1);
606 /// monitor.notify_one();
607 /// waiter.join().expect("waiter should finish");
608 /// ```
609 #[inline]
610 pub fn notify_one(&self) {
611 self.changed.notify_one();
612 }
613
614 /// Wakes all threads waiting on this monitor's condition variable.
615 ///
616 /// Notifications do not carry state by themselves. Every awakened thread
617 /// must recheck the protected state before continuing. Call this after a
618 /// state change that may allow multiple waiters to make progress.
619 ///
620 /// # Example
621 ///
622 /// ```rust
623 /// use std::thread;
624 ///
625 /// use qubit_lock::lock::ArcMonitor;
626 ///
627 /// let monitor = ArcMonitor::new(false);
628 /// let mut handles = Vec::new();
629 /// for _ in 0..2 {
630 /// let m = monitor.clone();
631 /// handles.push(thread::spawn(move || {
632 /// m.wait_until(|ready| *ready, |_| ());
633 /// }));
634 /// }
635 ///
636 /// monitor.write(|ready| *ready = true);
637 /// monitor.notify_all();
638 /// for h in handles {
639 /// h.join().expect("waiter should finish");
640 /// }
641 /// ```
642 #[inline]
643 pub fn notify_all(&self) {
644 self.changed.notify_all();
645 }
646}
647
648impl<T: Default> Default for Monitor<T> {
649 /// Creates a monitor containing `T::default()`.
650 ///
651 /// # Returns
652 ///
653 /// A monitor protecting the default value for `T`.
654 ///
655 /// # Example
656 ///
657 /// ```rust
658 /// use qubit_lock::lock::Monitor;
659 ///
660 /// let monitor: Monitor<String> = Monitor::default();
661 /// assert!(monitor.read(|s| s.is_empty()));
662 /// ```
663 #[inline]
664 fn default() -> Self {
665 Self::new(T::default())
666 }
667}