qubit_lock/monitor/monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Monitor
11//!
12//! Provides a synchronous monitor built from a mutex and a condition variable.
13//! A monitor protects one shared state value and binds that state to the
14//! condition variable used to wait for changes. This is the same low-level
15//! mechanism as using [`parking_lot::Mutex`] and [`parking_lot::Condvar`] directly,
16//! but packaged so callers do not have to keep a mutex and its matching
17//! condition variable as separate fields.
18//!
19//! The high-level APIs ([`Monitor::read`], [`Monitor::write`],
20//! [`Monitor::wait_while`], and [`Monitor::wait_until`]) are intended for
21//! short critical sections and simple guarded-suspension flows. The lower-level
22//! [`Monitor::lock`] API returns a [`MonitorGuard`], which supports
23//! [`MonitorGuard::wait`] and [`MonitorGuard::wait_timeout`] for more complex
24//! state machines such as thread pools.
25//!
26
27use std::time::{
28 Duration,
29 Instant,
30};
31
32use parking_lot::{
33 Condvar,
34 Mutex,
35};
36
37use super::monitor_guard::MonitorGuard;
38use super::{
39 wait_timeout_result::WaitTimeoutResult,
40 wait_timeout_status::WaitTimeoutStatus,
41};
42
43/// Shared state protected by a mutex and a condition variable.
44///
45/// `Monitor` is useful when callers need more than a short critical section.
46/// It models the classic monitor object pattern: one mutex protects the state,
47/// and one condition variable lets threads wait until that state changes. This
48/// is the same relationship used by `parking_lot::Mutex` and
49/// `parking_lot::Condvar`, but represented as one object so the condition
50/// variable is not accidentally used with unrelated state.
51///
52/// `Monitor` deliberately has two levels of API:
53///
54/// * `read` and `write` acquire the mutex, run a closure, and release it.
55/// * `wait_while`, `wait_until`, and their timeout variants implement common
56/// predicate-based waits.
57/// * `lock` returns a [`MonitorGuard`] for callers that need to write their own
58/// loop around [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`].
59///
60/// The underlying `parking_lot` mutex is not poisoned when a thread panics
61/// while holding the lock. This keeps monitor coordination state observable
62/// after panic unwinding.
63///
64/// # Difference from raw `Mutex` and `Condvar`
65///
66/// With raw parking_lot primitives, callers usually store two fields and
67/// manually keep them paired:
68///
69/// ```rust
70/// # use parking_lot::{Condvar, Mutex};
71/// # struct State;
72/// struct Shared {
73/// state: Mutex<State>,
74/// changed: Condvar,
75/// }
76/// ```
77///
78/// `Monitor<State>` stores the same pair internally. A [`MonitorGuard`] is a
79/// wrapper around the parking_lot `MutexGuard`; it keeps the protected
80/// state locked and knows which monitor it belongs to, so its wait methods use
81/// the matching condition variable.
82///
83/// # Type Parameters
84///
85/// * `T` - The state protected by this monitor.
86///
87/// # Example
88///
89/// ```rust
90/// use std::thread;
91///
92/// use qubit_lock::ArcMonitor;
93///
94/// let monitor = ArcMonitor::new(false);
95/// let waiter_monitor = monitor.clone();
96///
97/// let waiter = thread::spawn(move || {
98/// waiter_monitor.wait_until(
99/// |ready| *ready,
100/// |ready| {
101/// *ready = false;
102/// },
103/// );
104/// });
105///
106/// monitor.write(|ready| {
107/// *ready = true;
108/// });
109/// monitor.notify_all();
110///
111/// waiter.join().expect("waiter should finish");
112/// assert!(!monitor.read(|ready| *ready));
113/// ```
114///
115pub struct Monitor<T> {
116 /// Mutex protecting the monitor state.
117 state: Mutex<T>,
118 /// Condition variable used to wake predicate waiters after state changes.
119 pub(super) changed: Condvar,
120}
121
122impl<T> Monitor<T> {
123 /// Creates a monitor protecting the supplied state value.
124 ///
125 /// # Arguments
126 ///
127 /// * `state` - Initial state protected by the monitor.
128 ///
129 /// # Returns
130 ///
131 /// A monitor initialized with the supplied state.
132 ///
133 /// # Example
134 ///
135 /// ```rust
136 /// use qubit_lock::Monitor;
137 ///
138 /// let monitor = Monitor::new(0_u32);
139 /// assert_eq!(monitor.read(|n| *n), 0);
140 /// ```
141 #[inline]
142 pub fn new(state: T) -> Self {
143 Self {
144 state: Mutex::new(state),
145 changed: Condvar::new(),
146 }
147 }
148
149 /// Acquires the monitor and returns a guard for explicit state-machine code.
150 ///
151 /// The returned [`MonitorGuard`] keeps the monitor mutex locked until the
152 /// guard is dropped. It can also be passed through
153 /// [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`] to temporarily
154 /// release the lock while waiting on this monitor's condition variable.
155 ///
156 /// # Returns
157 ///
158 /// A guard that provides read and write access to the protected state.
159 ///
160 /// # Example
161 ///
162 /// ```rust
163 /// use qubit_lock::Monitor;
164 ///
165 /// let monitor = Monitor::new(1);
166 /// {
167 /// let mut value = monitor.lock();
168 /// *value += 1;
169 /// }
170 ///
171 /// assert_eq!(monitor.read(|value| *value), 2);
172 /// ```
173 #[inline]
174 pub fn lock(&self) -> MonitorGuard<'_, T> {
175 MonitorGuard::new(self, self.state.lock())
176 }
177
178 /// Acquires the monitor and reads the protected state.
179 ///
180 /// The closure runs while the mutex is held. Keep the closure short and do
181 /// not call code that may block for a long time.
182 ///
183 /// # Arguments
184 ///
185 /// * `f` - Closure that receives an immutable reference to the state.
186 ///
187 /// # Returns
188 ///
189 /// The value returned by the closure.
190 ///
191 /// # Example
192 ///
193 /// ```rust
194 /// use qubit_lock::Monitor;
195 ///
196 /// let monitor = Monitor::new(10_i32);
197 /// let n = monitor.read(|x| *x);
198 /// assert_eq!(n, 10);
199 /// ```
200 #[inline]
201 pub fn read<R, F>(&self, f: F) -> R
202 where
203 F: FnOnce(&T) -> R,
204 {
205 let guard = self.lock();
206 f(&*guard)
207 }
208
209 /// Acquires the monitor and mutates the protected state.
210 ///
211 /// The closure runs while the mutex is held. This method only changes the
212 /// state; callers should explicitly call [`Self::notify_one`] or
213 /// [`Self::notify_all`] after changing a condition that waiters may be
214 /// observing.
215 ///
216 /// # Arguments
217 ///
218 /// * `f` - Closure that receives a mutable reference to the state.
219 ///
220 /// # Returns
221 ///
222 /// The value returned by the closure.
223 ///
224 /// # Example
225 ///
226 /// ```rust
227 /// use qubit_lock::Monitor;
228 ///
229 /// let monitor = Monitor::new(String::new());
230 /// let len = monitor.write(|s| {
231 /// s.push_str("hi");
232 /// s.len()
233 /// });
234 /// assert_eq!(len, 2);
235 /// ```
236 #[inline]
237 pub fn write<R, F>(&self, f: F) -> R
238 where
239 F: FnOnce(&mut T) -> R,
240 {
241 let mut guard = self.lock();
242 f(&mut *guard)
243 }
244
245 /// Acquires the monitor, mutates the protected state, and wakes one waiter.
246 ///
247 /// The closure runs while the mutex is held. After the closure returns, the
248 /// mutex guard is dropped and one thread waiting on this monitor's condition
249 /// variable is notified. This is a convenience method for the common
250 /// "update state, then notify one waiter" pattern.
251 ///
252 /// If `f` panics, the panic is propagated and no notification is sent.
253 ///
254 /// # Arguments
255 ///
256 /// * `f` - Closure that receives a mutable reference to the state.
257 ///
258 /// # Returns
259 ///
260 /// The value returned by the closure.
261 ///
262 /// # Example
263 ///
264 /// ```rust
265 /// use qubit_lock::Monitor;
266 ///
267 /// let monitor = Monitor::new(Vec::<i32>::new());
268 /// let len = monitor.write_notify_one(|items| {
269 /// items.push(7);
270 /// items.len()
271 /// });
272 ///
273 /// assert_eq!(len, 1);
274 /// ```
275 #[inline]
276 pub fn write_notify_one<R, F>(&self, f: F) -> R
277 where
278 F: FnOnce(&mut T) -> R,
279 {
280 let result = self.write(f);
281 self.notify_one();
282 result
283 }
284
285 /// Acquires the monitor, mutates the protected state, and wakes all waiters.
286 ///
287 /// The closure runs while the mutex is held. After the closure returns, the
288 /// mutex guard is dropped and all threads waiting on this monitor's condition
289 /// variable are notified. This is a convenience method for state changes that
290 /// may allow multiple waiters to make progress.
291 ///
292 /// If `f` panics, the panic is propagated and no notification is sent.
293 ///
294 /// # Arguments
295 ///
296 /// * `f` - Closure that receives a mutable reference to the state.
297 ///
298 /// # Returns
299 ///
300 /// The value returned by the closure.
301 ///
302 /// # Example
303 ///
304 /// ```rust
305 /// use qubit_lock::Monitor;
306 ///
307 /// let monitor = Monitor::new(false);
308 /// let ready = monitor.write_notify_all(|ready| {
309 /// *ready = true;
310 /// *ready
311 /// });
312 ///
313 /// assert!(ready);
314 /// ```
315 #[inline]
316 pub fn write_notify_all<R, F>(&self, f: F) -> R
317 where
318 F: FnOnce(&mut T) -> R,
319 {
320 let result = self.write(f);
321 self.notify_all();
322 result
323 }
324
325 /// Waits for a notification or timeout without checking state.
326 ///
327 /// This convenience method locks the monitor, waits once on the condition
328 /// variable, and returns why the timed wait completed. It is useful only
329 /// when the caller genuinely needs a notification wait without inspecting
330 /// state before or after the wait. Most coordination code should prefer
331 /// [`Self::wait_while`], [`Self::wait_until`], or the explicit
332 /// [`MonitorGuard::wait_timeout`] loop.
333 ///
334 /// [`WaitTimeoutStatus::Woken`] means the condition variable was notified,
335 /// but it does not prove that the protected state changed in a useful way.
336 ///
337 /// # Arguments
338 ///
339 /// * `timeout` - Maximum duration to wait for a notification.
340 ///
341 /// # Returns
342 ///
343 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
344 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
345 ///
346 /// # Example
347 ///
348 /// ```rust
349 /// use std::time::Duration;
350 ///
351 /// use qubit_lock::{Monitor, WaitTimeoutStatus};
352 ///
353 /// let monitor = Monitor::new(false);
354 /// let status = monitor.wait_notify(Duration::from_millis(1));
355 ///
356 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
357 /// ```
358 #[inline]
359 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
360 let guard = self.lock();
361 let (_guard, status) = guard.wait_timeout(timeout);
362 status
363 }
364
365 /// Waits while a predicate remains true, then mutates the protected state.
366 ///
367 /// This is the monitor equivalent of the common `while condition { wait }`
368 /// guarded-suspension pattern. The predicate is evaluated while holding the
369 /// mutex. If it returns `true`, the current thread waits on the condition
370 /// variable and atomically releases the mutex. After a notification, the
371 /// mutex is reacquired and the predicate is evaluated again. When the
372 /// predicate returns `false`, `f` runs while the mutex is still held.
373 ///
374 /// This method may block indefinitely if no thread changes the state so
375 /// that `waiting` becomes false and sends a notification.
376 ///
377 /// # Arguments
378 ///
379 /// * `waiting` - Predicate that returns `true` while the caller should
380 /// keep waiting.
381 /// * `f` - Closure that receives mutable access after waiting is no longer
382 /// required.
383 ///
384 /// # Returns
385 ///
386 /// The value returned by `f`.
387 ///
388 /// # Example
389 ///
390 /// ```rust
391 /// use std::{
392 /// sync::Arc,
393 /// thread,
394 /// };
395 ///
396 /// use qubit_lock::Monitor;
397 ///
398 /// let monitor = Arc::new(Monitor::new(Vec::<i32>::new()));
399 /// let worker_monitor = Arc::clone(&monitor);
400 ///
401 /// let worker = thread::spawn(move || {
402 /// worker_monitor.wait_while(
403 /// |items| items.is_empty(),
404 /// |items| items.pop().expect("item should be ready"),
405 /// )
406 /// });
407 ///
408 /// monitor.write(|items| items.push(7));
409 /// monitor.notify_one();
410 ///
411 /// assert_eq!(worker.join().expect("worker should finish"), 7);
412 /// ```
413 #[inline]
414 pub fn wait_while<R, P, F>(&self, mut waiting: P, f: F) -> R
415 where
416 P: FnMut(&T) -> bool,
417 F: FnOnce(&mut T) -> R,
418 {
419 let mut guard = self.lock();
420 while waiting(&*guard) {
421 guard = guard.wait();
422 }
423 f(&mut *guard)
424 }
425
426 /// Waits until the protected state satisfies a predicate, then mutates it.
427 ///
428 /// This is the positive-predicate counterpart of [`Self::wait_while`]. The
429 /// predicate is evaluated while holding the mutex. If it returns `false`,
430 /// the current thread waits on the condition variable and atomically
431 /// releases the mutex. After a notification, the mutex is reacquired and
432 /// the predicate is evaluated again. When the predicate returns `true`, `f`
433 /// runs while the mutex is still held.
434 ///
435 /// This method may block indefinitely if no thread changes the state to
436 /// satisfy the predicate and sends a notification.
437 ///
438 /// # Arguments
439 ///
440 /// * `ready` - Predicate that returns `true` when the state is ready.
441 /// * `f` - Closure that receives mutable access to the ready state.
442 ///
443 /// # Returns
444 ///
445 /// The value returned by `f` after the predicate has become true.
446 ///
447 /// # Example
448 ///
449 /// ```rust
450 /// use std::{
451 /// sync::Arc,
452 /// thread,
453 /// };
454 ///
455 /// use qubit_lock::Monitor;
456 ///
457 /// let monitor = Arc::new(Monitor::new(false));
458 /// let waiter_monitor = Arc::clone(&monitor);
459 ///
460 /// let waiter = thread::spawn(move || {
461 /// waiter_monitor.wait_until(
462 /// |ready| *ready,
463 /// |ready| {
464 /// *ready = false;
465 /// "done"
466 /// },
467 /// )
468 /// });
469 ///
470 /// monitor.write(|ready| *ready = true);
471 /// monitor.notify_one();
472 ///
473 /// assert_eq!(waiter.join().expect("waiter should finish"), "done");
474 /// ```
475 #[inline]
476 pub fn wait_until<R, P, F>(&self, mut ready: P, f: F) -> R
477 where
478 P: FnMut(&T) -> bool,
479 F: FnOnce(&mut T) -> R,
480 {
481 self.wait_while(|state| !ready(state), f)
482 }
483
484 /// Waits while a predicate remains true, with an overall time limit.
485 ///
486 /// This method is the timeout-aware form of [`Self::wait_while`]. It keeps
487 /// rechecking `waiting` under the monitor lock and waits only for the
488 /// remaining portion of `timeout`. If `waiting` becomes false before the
489 /// timeout expires, `f` runs while the lock is still held. If the timeout
490 /// expires first, the closure is not called.
491 ///
492 /// Timeout status alone is not used as proof that the predicate is still
493 /// true; the predicate is always rechecked under the lock.
494 ///
495 /// # Arguments
496 ///
497 /// * `timeout` - Maximum total duration to wait.
498 /// * `waiting` - Predicate that returns `true` while the caller should
499 /// continue waiting.
500 /// * `f` - Closure that receives mutable access when waiting is no longer
501 /// required.
502 ///
503 /// # Returns
504 ///
505 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
506 /// predicate stops blocking before the timeout. Returns
507 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
508 ///
509 /// # Example
510 ///
511 /// ```rust
512 /// use std::time::Duration;
513 ///
514 /// use qubit_lock::{Monitor, WaitTimeoutResult};
515 ///
516 /// let monitor = Monitor::new(Vec::<i32>::new());
517 /// let result = monitor.wait_timeout_while(
518 /// Duration::from_millis(1),
519 /// |items| items.is_empty(),
520 /// |items| items.pop(),
521 /// );
522 ///
523 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
524 /// ```
525 #[inline]
526 pub fn wait_timeout_while<R, P, F>(
527 &self,
528 timeout: Duration,
529 mut waiting: P,
530 f: F,
531 ) -> WaitTimeoutResult<R>
532 where
533 P: FnMut(&T) -> bool,
534 F: FnOnce(&mut T) -> R,
535 {
536 let mut guard = self.lock();
537 let start = Instant::now();
538 loop {
539 if !waiting(&*guard) {
540 return WaitTimeoutResult::Ready(f(&mut *guard));
541 }
542
543 let elapsed = start.elapsed();
544 let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
545 if remaining.is_zero() {
546 return WaitTimeoutResult::TimedOut;
547 }
548
549 let (next_guard, _status) = guard.wait_timeout(remaining);
550 guard = next_guard;
551 }
552 }
553
554 /// Waits until a predicate becomes true, with an overall time limit.
555 ///
556 /// This is the positive-predicate counterpart of
557 /// [`Self::wait_timeout_while`]. If `ready` becomes true before the timeout
558 /// expires, `f` runs while the monitor lock is still held. If the timeout
559 /// expires first, the closure is not called.
560 ///
561 /// Timeout status alone is not used as proof that the predicate is still
562 /// false; the predicate is always rechecked under the lock.
563 ///
564 /// # Arguments
565 ///
566 /// * `timeout` - Maximum total duration to wait.
567 /// * `ready` - Predicate that returns `true` when the caller may continue.
568 /// * `f` - Closure that receives mutable access to the ready state.
569 ///
570 /// # Returns
571 ///
572 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
573 /// predicate becomes true before the timeout. Returns
574 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
575 ///
576 /// # Example
577 ///
578 /// ```rust
579 /// use std::{
580 /// sync::Arc,
581 /// thread,
582 /// time::Duration,
583 /// };
584 ///
585 /// use qubit_lock::{Monitor, WaitTimeoutResult};
586 ///
587 /// let monitor = Arc::new(Monitor::new(false));
588 /// let waiter_monitor = Arc::clone(&monitor);
589 ///
590 /// let waiter = thread::spawn(move || {
591 /// waiter_monitor.wait_timeout_until(
592 /// Duration::from_secs(1),
593 /// |ready| *ready,
594 /// |ready| {
595 /// *ready = false;
596 /// 5
597 /// },
598 /// )
599 /// });
600 ///
601 /// monitor.write(|ready| *ready = true);
602 /// monitor.notify_one();
603 ///
604 /// assert_eq!(
605 /// waiter.join().expect("waiter should finish"),
606 /// WaitTimeoutResult::Ready(5),
607 /// );
608 /// ```
609 #[inline]
610 pub fn wait_timeout_until<R, P, F>(
611 &self,
612 timeout: Duration,
613 mut ready: P,
614 f: F,
615 ) -> WaitTimeoutResult<R>
616 where
617 P: FnMut(&T) -> bool,
618 F: FnOnce(&mut T) -> R,
619 {
620 self.wait_timeout_while(timeout, |state| !ready(state), f)
621 }
622
623 /// Wakes one thread waiting on this monitor's condition variable.
624 ///
625 /// Notifications do not carry state by themselves. A waiting thread only
626 /// proceeds safely after rechecking the protected state. Call this after
627 /// changing state that may make one waiter able to continue.
628 ///
629 /// # Example
630 ///
631 /// ```rust
632 /// use std::thread;
633 ///
634 /// use qubit_lock::ArcMonitor;
635 ///
636 /// let monitor = ArcMonitor::new(0_u32);
637 /// let waiter = {
638 /// let m = monitor.clone();
639 /// thread::spawn(move || {
640 /// m.wait_until(|n| *n > 0, |n| {
641 /// *n -= 1;
642 /// });
643 /// })
644 /// };
645 ///
646 /// monitor.write(|n| *n = 1);
647 /// monitor.notify_one();
648 /// waiter.join().expect("waiter should finish");
649 /// ```
650 #[inline]
651 pub fn notify_one(&self) {
652 self.changed.notify_one();
653 }
654
655 /// Wakes all threads waiting on this monitor's condition variable.
656 ///
657 /// Notifications do not carry state by themselves. Every awakened thread
658 /// must recheck the protected state before continuing. Call this after a
659 /// state change that may allow multiple waiters to make progress.
660 ///
661 /// # Example
662 ///
663 /// ```rust
664 /// use std::thread;
665 ///
666 /// use qubit_lock::ArcMonitor;
667 ///
668 /// let monitor = ArcMonitor::new(false);
669 /// let mut handles = Vec::new();
670 /// for _ in 0..2 {
671 /// let m = monitor.clone();
672 /// handles.push(thread::spawn(move || {
673 /// m.wait_until(|ready| *ready, |_| ());
674 /// }));
675 /// }
676 ///
677 /// monitor.write(|ready| *ready = true);
678 /// monitor.notify_all();
679 /// for h in handles {
680 /// h.join().expect("waiter should finish");
681 /// }
682 /// ```
683 #[inline]
684 pub fn notify_all(&self) {
685 self.changed.notify_all();
686 }
687}
688
689impl<T> From<T> for Monitor<T> {
690 /// Creates a monitor from an initial state value.
691 ///
692 /// # Arguments
693 ///
694 /// * `value` - Initial state protected by the monitor.
695 ///
696 /// # Returns
697 ///
698 /// A monitor initialized with `value`.
699 #[inline]
700 fn from(value: T) -> Self {
701 Self::new(value)
702 }
703}
704
705impl<T: Default> Default for Monitor<T> {
706 /// Creates a monitor containing `T::default()`.
707 ///
708 /// # Returns
709 ///
710 /// A monitor protecting the default value for `T`.
711 ///
712 /// # Example
713 ///
714 /// ```rust
715 /// use qubit_lock::Monitor;
716 ///
717 /// let monitor: Monitor<String> = Monitor::default();
718 /// assert!(monitor.read(|s| s.is_empty()));
719 /// ```
720 #[inline]
721 fn default() -> Self {
722 Self::new(T::default())
723 }
724}