qubit_lock/monitor/monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Monitor
11//!
12//! Provides a synchronous monitor built from a mutex and a condition variable.
13//! A monitor protects one shared state value and binds that state to the
14//! condition variable used to wait for changes. This is the same low-level
15//! mechanism as using [`parking_lot::Mutex`] and [`parking_lot::Condvar`] directly,
16//! but packaged so callers do not have to keep a mutex and its matching
17//! condition variable as separate fields.
18//!
19//! The high-level APIs ([`Monitor::read`], [`Monitor::write`],
20//! [`Monitor::wait_while`], and [`Monitor::wait_until`]) are intended for
21//! short critical sections and simple guarded-suspension flows. The lower-level
22//! [`Monitor::lock`] API returns a [`MonitorGuard`], which supports
23//! [`MonitorGuard::wait`] and [`MonitorGuard::wait_timeout`] for more complex
24//! state machines such as thread pools.
25//!
26
27use std::time::{
28 Duration,
29 Instant,
30};
31
32use parking_lot::{
33 Condvar,
34 Mutex,
35};
36
37use super::monitor_guard::MonitorGuard;
38use super::{
39 wait_timeout_result::WaitTimeoutResult,
40 wait_timeout_status::WaitTimeoutStatus,
41};
42
43/// Shared state protected by a mutex and a condition variable.
44///
45/// `Monitor` is useful when callers need more than a short critical section.
46/// It models the classic monitor object pattern: one mutex protects the state,
47/// and one condition variable lets threads wait until that state changes. This
48/// is the same relationship used by `parking_lot::Mutex` and
49/// `parking_lot::Condvar`, but represented as one object so the condition
50/// variable is not accidentally used with unrelated state.
51///
52/// `Monitor` deliberately has two levels of API:
53///
54/// * `read` and `write` acquire the mutex, run a closure, and release it.
55/// * `wait_while`, `wait_until`, and their timeout variants implement common
56/// predicate-based waits.
57/// * `lock` returns a [`MonitorGuard`] for callers that need to write their own
58/// loop around [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`].
59///
60/// The underlying `parking_lot` mutex is not poisoned when a thread panics
61/// while holding the lock. This keeps monitor coordination state observable
62/// after panic unwinding.
63///
64/// # Difference from raw `Mutex` and `Condvar`
65///
66/// With raw parking_lot primitives, callers usually store two fields and
67/// manually keep them paired:
68///
69/// ```rust
70/// # use parking_lot::{Condvar, Mutex};
71/// # struct State;
72/// struct Shared {
73/// state: Mutex<State>,
74/// changed: Condvar,
75/// }
76/// ```
77///
78/// `Monitor<State>` stores the same pair internally. A [`MonitorGuard`] is a
79/// wrapper around the parking_lot `MutexGuard`; it keeps the protected
80/// state locked and knows which monitor it belongs to, so its wait methods use
81/// the matching condition variable.
82///
83/// # Type Parameters
84///
85/// * `T` - The state protected by this monitor.
86///
87/// # Example
88///
89/// ```rust
90/// use std::thread;
91///
92/// use qubit_lock::lock::ArcMonitor;
93///
94/// let monitor = ArcMonitor::new(false);
95/// let waiter_monitor = monitor.clone();
96///
97/// let waiter = thread::spawn(move || {
98/// waiter_monitor.wait_until(
99/// |ready| *ready,
100/// |ready| {
101/// *ready = false;
102/// },
103/// );
104/// });
105///
106/// monitor.write(|ready| {
107/// *ready = true;
108/// });
109/// monitor.notify_all();
110///
111/// waiter.join().expect("waiter should finish");
112/// assert!(!monitor.read(|ready| *ready));
113/// ```
114///
115pub struct Monitor<T> {
116 /// Mutex protecting the monitor state.
117 state: Mutex<T>,
118 /// Condition variable used to wake predicate waiters after state changes.
119 pub(super) changed: Condvar,
120}
121
122impl<T> Monitor<T> {
123 /// Creates a monitor protecting the supplied state value.
124 ///
125 /// # Arguments
126 ///
127 /// * `state` - Initial state protected by the monitor.
128 ///
129 /// # Returns
130 ///
131 /// A monitor initialized with the supplied state.
132 ///
133 /// # Example
134 ///
135 /// ```rust
136 /// use qubit_lock::lock::Monitor;
137 ///
138 /// let monitor = Monitor::new(0_u32);
139 /// assert_eq!(monitor.read(|n| *n), 0);
140 /// ```
141 #[inline]
142 pub fn new(state: T) -> Self {
143 Self {
144 state: Mutex::new(state),
145 changed: Condvar::new(),
146 }
147 }
148
149 /// Acquires the monitor and returns a guard for explicit state-machine code.
150 ///
151 /// The returned [`MonitorGuard`] keeps the monitor mutex locked until the
152 /// guard is dropped. It can also be passed through
153 /// [`MonitorGuard::wait`] or [`MonitorGuard::wait_timeout`] to temporarily
154 /// release the lock while waiting on this monitor's condition variable.
155 ///
156 /// # Returns
157 ///
158 /// A guard that provides read and write access to the protected state.
159 ///
160 /// # Example
161 ///
162 /// ```rust
163 /// use qubit_lock::lock::Monitor;
164 ///
165 /// let monitor = Monitor::new(1);
166 /// {
167 /// let mut value = monitor.lock();
168 /// *value += 1;
169 /// }
170 ///
171 /// assert_eq!(monitor.read(|value| *value), 2);
172 /// ```
173 #[inline]
174 pub fn lock(&self) -> MonitorGuard<'_, T> {
175 MonitorGuard::new(self, self.state.lock())
176 }
177
178 /// Acquires the monitor and reads the protected state.
179 ///
180 /// The closure runs while the mutex is held. Keep the closure short and do
181 /// not call code that may block for a long time.
182 ///
183 /// # Arguments
184 ///
185 /// * `f` - Closure that receives an immutable reference to the state.
186 ///
187 /// # Returns
188 ///
189 /// The value returned by the closure.
190 ///
191 /// # Example
192 ///
193 /// ```rust
194 /// use qubit_lock::lock::Monitor;
195 ///
196 /// let monitor = Monitor::new(10_i32);
197 /// let n = monitor.read(|x| *x);
198 /// assert_eq!(n, 10);
199 /// ```
200 #[inline]
201 pub fn read<R, F>(&self, f: F) -> R
202 where
203 F: FnOnce(&T) -> R,
204 {
205 let guard = self.lock();
206 f(&*guard)
207 }
208
209 /// Acquires the monitor and mutates the protected state.
210 ///
211 /// The closure runs while the mutex is held. This method only changes the
212 /// state; callers should explicitly call [`Self::notify_one`] or
213 /// [`Self::notify_all`] after changing a condition that waiters may be
214 /// observing.
215 ///
216 /// # Arguments
217 ///
218 /// * `f` - Closure that receives a mutable reference to the state.
219 ///
220 /// # Returns
221 ///
222 /// The value returned by the closure.
223 ///
224 /// # Example
225 ///
226 /// ```rust
227 /// use qubit_lock::lock::Monitor;
228 ///
229 /// let monitor = Monitor::new(String::new());
230 /// let len = monitor.write(|s| {
231 /// s.push_str("hi");
232 /// s.len()
233 /// });
234 /// assert_eq!(len, 2);
235 /// ```
236 #[inline]
237 pub fn write<R, F>(&self, f: F) -> R
238 where
239 F: FnOnce(&mut T) -> R,
240 {
241 let mut guard = self.lock();
242 f(&mut *guard)
243 }
244
245 /// Waits for a notification or timeout without checking state.
246 ///
247 /// This convenience method locks the monitor, waits once on the condition
248 /// variable, and returns why the timed wait completed. It is useful only
249 /// when the caller genuinely needs a notification wait without inspecting
250 /// state before or after the wait. Most coordination code should prefer
251 /// [`Self::wait_while`], [`Self::wait_until`], or the explicit
252 /// [`MonitorGuard::wait_timeout`] loop.
253 ///
254 /// [`WaitTimeoutStatus::Woken`] means the condition variable was notified,
255 /// but it does not prove that the protected state changed in a useful way.
256 ///
257 /// # Arguments
258 ///
259 /// * `timeout` - Maximum duration to wait for a notification.
260 ///
261 /// # Returns
262 ///
263 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
264 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
265 ///
266 /// # Example
267 ///
268 /// ```rust
269 /// use std::time::Duration;
270 ///
271 /// use qubit_lock::lock::{Monitor, WaitTimeoutStatus};
272 ///
273 /// let monitor = Monitor::new(false);
274 /// let status = monitor.wait_notify(Duration::from_millis(1));
275 ///
276 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
277 /// ```
278 #[inline]
279 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
280 let guard = self.lock();
281 let (_guard, status) = guard.wait_timeout(timeout);
282 status
283 }
284
285 /// Waits while a predicate remains true, then mutates the protected state.
286 ///
287 /// This is the monitor equivalent of the common `while condition { wait }`
288 /// guarded-suspension pattern. The predicate is evaluated while holding the
289 /// mutex. If it returns `true`, the current thread waits on the condition
290 /// variable and atomically releases the mutex. After a notification, the
291 /// mutex is reacquired and the predicate is evaluated again. When the
292 /// predicate returns `false`, `f` runs while the mutex is still held.
293 ///
294 /// This method may block indefinitely if no thread changes the state so
295 /// that `waiting` becomes false and sends a notification.
296 ///
297 /// # Arguments
298 ///
299 /// * `waiting` - Predicate that returns `true` while the caller should
300 /// keep waiting.
301 /// * `f` - Closure that receives mutable access after waiting is no longer
302 /// required.
303 ///
304 /// # Returns
305 ///
306 /// The value returned by `f`.
307 ///
308 /// # Example
309 ///
310 /// ```rust
311 /// use std::{
312 /// sync::Arc,
313 /// thread,
314 /// };
315 ///
316 /// use qubit_lock::lock::Monitor;
317 ///
318 /// let monitor = Arc::new(Monitor::new(Vec::<i32>::new()));
319 /// let worker_monitor = Arc::clone(&monitor);
320 ///
321 /// let worker = thread::spawn(move || {
322 /// worker_monitor.wait_while(
323 /// |items| items.is_empty(),
324 /// |items| items.pop().expect("item should be ready"),
325 /// )
326 /// });
327 ///
328 /// monitor.write(|items| items.push(7));
329 /// monitor.notify_one();
330 ///
331 /// assert_eq!(worker.join().expect("worker should finish"), 7);
332 /// ```
333 #[inline]
334 pub fn wait_while<R, P, F>(&self, mut waiting: P, f: F) -> R
335 where
336 P: FnMut(&T) -> bool,
337 F: FnOnce(&mut T) -> R,
338 {
339 let mut guard = self.lock();
340 while waiting(&*guard) {
341 guard = guard.wait();
342 }
343 f(&mut *guard)
344 }
345
346 /// Waits until the protected state satisfies a predicate, then mutates it.
347 ///
348 /// This is the positive-predicate counterpart of [`Self::wait_while`]. The
349 /// predicate is evaluated while holding the mutex. If it returns `false`,
350 /// the current thread waits on the condition variable and atomically
351 /// releases the mutex. After a notification, the mutex is reacquired and
352 /// the predicate is evaluated again. When the predicate returns `true`, `f`
353 /// runs while the mutex is still held.
354 ///
355 /// This method may block indefinitely if no thread changes the state to
356 /// satisfy the predicate and sends a notification.
357 ///
358 /// # Arguments
359 ///
360 /// * `ready` - Predicate that returns `true` when the state is ready.
361 /// * `f` - Closure that receives mutable access to the ready state.
362 ///
363 /// # Returns
364 ///
365 /// The value returned by `f` after the predicate has become true.
366 ///
367 /// # Example
368 ///
369 /// ```rust
370 /// use std::{
371 /// sync::Arc,
372 /// thread,
373 /// };
374 ///
375 /// use qubit_lock::lock::Monitor;
376 ///
377 /// let monitor = Arc::new(Monitor::new(false));
378 /// let waiter_monitor = Arc::clone(&monitor);
379 ///
380 /// let waiter = thread::spawn(move || {
381 /// waiter_monitor.wait_until(
382 /// |ready| *ready,
383 /// |ready| {
384 /// *ready = false;
385 /// "done"
386 /// },
387 /// )
388 /// });
389 ///
390 /// monitor.write(|ready| *ready = true);
391 /// monitor.notify_one();
392 ///
393 /// assert_eq!(waiter.join().expect("waiter should finish"), "done");
394 /// ```
395 #[inline]
396 pub fn wait_until<R, P, F>(&self, mut ready: P, f: F) -> R
397 where
398 P: FnMut(&T) -> bool,
399 F: FnOnce(&mut T) -> R,
400 {
401 self.wait_while(|state| !ready(state), f)
402 }
403
404 /// Waits while a predicate remains true, with an overall time limit.
405 ///
406 /// This method is the timeout-aware form of [`Self::wait_while`]. It keeps
407 /// rechecking `waiting` under the monitor lock and waits only for the
408 /// remaining portion of `timeout`. If `waiting` becomes false before the
409 /// timeout expires, `f` runs while the lock is still held. If the timeout
410 /// expires first, the closure is not called.
411 ///
412 /// Timeout status alone is not used as proof that the predicate is still
413 /// true; the predicate is always rechecked under the lock.
414 ///
415 /// # Arguments
416 ///
417 /// * `timeout` - Maximum total duration to wait.
418 /// * `waiting` - Predicate that returns `true` while the caller should
419 /// continue waiting.
420 /// * `f` - Closure that receives mutable access when waiting is no longer
421 /// required.
422 ///
423 /// # Returns
424 ///
425 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
426 /// predicate stops blocking before the timeout. Returns
427 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
428 ///
429 /// # Example
430 ///
431 /// ```rust
432 /// use std::time::Duration;
433 ///
434 /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
435 ///
436 /// let monitor = Monitor::new(Vec::<i32>::new());
437 /// let result = monitor.wait_timeout_while(
438 /// Duration::from_millis(1),
439 /// |items| items.is_empty(),
440 /// |items| items.pop(),
441 /// );
442 ///
443 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
444 /// ```
445 #[inline]
446 pub fn wait_timeout_while<R, P, F>(
447 &self,
448 timeout: Duration,
449 mut waiting: P,
450 f: F,
451 ) -> WaitTimeoutResult<R>
452 where
453 P: FnMut(&T) -> bool,
454 F: FnOnce(&mut T) -> R,
455 {
456 let mut guard = self.lock();
457 let start = Instant::now();
458 loop {
459 if !waiting(&*guard) {
460 return WaitTimeoutResult::Ready(f(&mut *guard));
461 }
462
463 let elapsed = start.elapsed();
464 let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
465 if remaining.is_zero() {
466 return WaitTimeoutResult::TimedOut;
467 }
468
469 let (next_guard, _status) = guard.wait_timeout(remaining);
470 guard = next_guard;
471 }
472 }
473
474 /// Waits until a predicate becomes true, with an overall time limit.
475 ///
476 /// This is the positive-predicate counterpart of
477 /// [`Self::wait_timeout_while`]. If `ready` becomes true before the timeout
478 /// expires, `f` runs while the monitor lock is still held. If the timeout
479 /// expires first, the closure is not called.
480 ///
481 /// Timeout status alone is not used as proof that the predicate is still
482 /// false; the predicate is always rechecked under the lock.
483 ///
484 /// # Arguments
485 ///
486 /// * `timeout` - Maximum total duration to wait.
487 /// * `ready` - Predicate that returns `true` when the caller may continue.
488 /// * `f` - Closure that receives mutable access to the ready state.
489 ///
490 /// # Returns
491 ///
492 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
493 /// predicate becomes true before the timeout. Returns
494 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
495 ///
496 /// # Example
497 ///
498 /// ```rust
499 /// use std::{
500 /// sync::Arc,
501 /// thread,
502 /// time::Duration,
503 /// };
504 ///
505 /// use qubit_lock::lock::{Monitor, WaitTimeoutResult};
506 ///
507 /// let monitor = Arc::new(Monitor::new(false));
508 /// let waiter_monitor = Arc::clone(&monitor);
509 ///
510 /// let waiter = thread::spawn(move || {
511 /// waiter_monitor.wait_timeout_until(
512 /// Duration::from_secs(1),
513 /// |ready| *ready,
514 /// |ready| {
515 /// *ready = false;
516 /// 5
517 /// },
518 /// )
519 /// });
520 ///
521 /// monitor.write(|ready| *ready = true);
522 /// monitor.notify_one();
523 ///
524 /// assert_eq!(
525 /// waiter.join().expect("waiter should finish"),
526 /// WaitTimeoutResult::Ready(5),
527 /// );
528 /// ```
529 #[inline]
530 pub fn wait_timeout_until<R, P, F>(
531 &self,
532 timeout: Duration,
533 mut ready: P,
534 f: F,
535 ) -> WaitTimeoutResult<R>
536 where
537 P: FnMut(&T) -> bool,
538 F: FnOnce(&mut T) -> R,
539 {
540 self.wait_timeout_while(timeout, |state| !ready(state), f)
541 }
542
543 /// Wakes one thread waiting on this monitor's condition variable.
544 ///
545 /// Notifications do not carry state by themselves. A waiting thread only
546 /// proceeds safely after rechecking the protected state. Call this after
547 /// changing state that may make one waiter able to continue.
548 ///
549 /// # Example
550 ///
551 /// ```rust
552 /// use std::thread;
553 ///
554 /// use qubit_lock::lock::ArcMonitor;
555 ///
556 /// let monitor = ArcMonitor::new(0_u32);
557 /// let waiter = {
558 /// let m = monitor.clone();
559 /// thread::spawn(move || {
560 /// m.wait_until(|n| *n > 0, |n| {
561 /// *n -= 1;
562 /// });
563 /// })
564 /// };
565 ///
566 /// monitor.write(|n| *n = 1);
567 /// monitor.notify_one();
568 /// waiter.join().expect("waiter should finish");
569 /// ```
570 #[inline]
571 pub fn notify_one(&self) {
572 self.changed.notify_one();
573 }
574
575 /// Wakes all threads waiting on this monitor's condition variable.
576 ///
577 /// Notifications do not carry state by themselves. Every awakened thread
578 /// must recheck the protected state before continuing. Call this after a
579 /// state change that may allow multiple waiters to make progress.
580 ///
581 /// # Example
582 ///
583 /// ```rust
584 /// use std::thread;
585 ///
586 /// use qubit_lock::lock::ArcMonitor;
587 ///
588 /// let monitor = ArcMonitor::new(false);
589 /// let mut handles = Vec::new();
590 /// for _ in 0..2 {
591 /// let m = monitor.clone();
592 /// handles.push(thread::spawn(move || {
593 /// m.wait_until(|ready| *ready, |_| ());
594 /// }));
595 /// }
596 ///
597 /// monitor.write(|ready| *ready = true);
598 /// monitor.notify_all();
599 /// for h in handles {
600 /// h.join().expect("waiter should finish");
601 /// }
602 /// ```
603 #[inline]
604 pub fn notify_all(&self) {
605 self.changed.notify_all();
606 }
607}
608
609impl<T: Default> Default for Monitor<T> {
610 /// Creates a monitor containing `T::default()`.
611 ///
612 /// # Returns
613 ///
614 /// A monitor protecting the default value for `T`.
615 ///
616 /// # Example
617 ///
618 /// ```rust
619 /// use qubit_lock::lock::Monitor;
620 ///
621 /// let monitor: Monitor<String> = Monitor::default();
622 /// assert!(monitor.read(|s| s.is_empty()));
623 /// ```
624 #[inline]
625 fn default() -> Self {
626 Self::new(T::default())
627 }
628}