qubit_lock/monitor/arc_monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Arc Monitor
11//!
12//! Provides an Arc-wrapped synchronous monitor for condition-based state
13//! coordination across threads.
14//!
15
16use std::sync::Arc;
17use std::time::Duration;
18
19use super::{
20 Monitor,
21 MonitorGuard,
22 WaitTimeoutResult,
23 WaitTimeoutStatus,
24};
25
26/// Arc-wrapped monitor for shared condition-based state coordination.
27///
28/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
29/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
30/// It preserves the same guard-based waiting and predicate-based waiting
31/// semantics as [`Monitor`].
32///
33/// # Type Parameters
34///
35/// * `T` - The state protected by this monitor.
36///
37/// # Example
38///
39/// ```rust
40/// use std::thread;
41///
42/// use qubit_lock::lock::ArcMonitor;
43///
44/// let monitor = ArcMonitor::new(false);
45/// let waiter_monitor = monitor.clone();
46///
47/// let waiter = thread::spawn(move || {
48/// waiter_monitor.wait_until(
49/// |ready| *ready,
50/// |ready| {
51/// *ready = false;
52/// },
53/// );
54/// });
55///
56/// monitor.write(|ready| {
57/// *ready = true;
58/// });
59/// monitor.notify_all();
60///
61/// waiter.join().expect("waiter should finish");
62/// assert!(!monitor.read(|ready| *ready));
63/// ```
64///
65pub struct ArcMonitor<T> {
66 /// Shared monitor instance.
67 inner: Arc<Monitor<T>>,
68}
69
70impl<T> ArcMonitor<T> {
71 /// Creates an Arc-wrapped monitor protecting the supplied state value.
72 ///
73 /// # Arguments
74 ///
75 /// * `state` - Initial state protected by the monitor.
76 ///
77 /// # Returns
78 ///
79 /// A cloneable monitor handle initialized with the supplied state.
80 #[inline]
81 pub fn new(state: T) -> Self {
82 Self {
83 inner: Arc::new(Monitor::new(state)),
84 }
85 }
86
87 /// Acquires the shared monitor and returns a guard.
88 ///
89 /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
90 /// keeps the monitor mutex locked until it is dropped. It can also wait on
91 /// the monitor's condition variable through [`MonitorGuard::wait`] or
92 /// [`MonitorGuard::wait_timeout`].
93 ///
94 /// # Returns
95 ///
96 /// A guard that provides read and write access to the protected state.
97 ///
98 /// # Example
99 ///
100 /// ```rust
101 /// use qubit_lock::lock::ArcMonitor;
102 ///
103 /// let monitor = ArcMonitor::new(1);
104 /// {
105 /// let mut value = monitor.lock();
106 /// *value += 1;
107 /// }
108 ///
109 /// assert_eq!(monitor.read(|value| *value), 2);
110 /// ```
111 #[inline]
112 pub fn lock(&self) -> MonitorGuard<'_, T> {
113 self.inner.lock()
114 }
115
116 /// Acquires the monitor and reads the protected state.
117 ///
118 /// This delegates to [`Monitor::read`]. The closure runs while the monitor
119 /// mutex is held, so keep it short and avoid long blocking work.
120 ///
121 /// # Arguments
122 ///
123 /// * `f` - Closure that receives an immutable reference to the state.
124 ///
125 /// # Returns
126 ///
127 /// The value returned by `f`.
128 #[inline]
129 pub fn read<R, F>(&self, f: F) -> R
130 where
131 F: FnOnce(&T) -> R,
132 {
133 self.inner.read(f)
134 }
135
136 /// Acquires the monitor and mutates the protected state.
137 ///
138 /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
139 /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
140 /// waiting thread may observe.
141 ///
142 /// # Arguments
143 ///
144 /// * `f` - Closure that receives a mutable reference to the state.
145 ///
146 /// # Returns
147 ///
148 /// The value returned by `f`.
149 #[inline]
150 pub fn write<R, F>(&self, f: F) -> R
151 where
152 F: FnOnce(&mut T) -> R,
153 {
154 self.inner.write(f)
155 }
156
157 /// Waits for a notification or timeout without checking state.
158 ///
159 /// This delegates to [`Monitor::wait_notify`]. Most
160 /// coordination code should prefer [`Self::wait_while`],
161 /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
162 ///
163 /// [`WaitTimeoutStatus::Woken`] means the condition variable was notified,
164 /// but it does not prove that the protected state changed in a useful way.
165 ///
166 /// # Arguments
167 ///
168 /// * `timeout` - Maximum duration to wait for a notification.
169 ///
170 /// # Returns
171 ///
172 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
173 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
174 ///
175 /// # Example
176 ///
177 /// ```rust
178 /// use std::time::Duration;
179 ///
180 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutStatus};
181 ///
182 /// let monitor = ArcMonitor::new(false);
183 /// let status = monitor.wait_notify(Duration::from_millis(1));
184 ///
185 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
186 /// ```
187 #[inline]
188 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
189 self.inner.wait_notify(timeout)
190 }
191
192 /// Waits while a predicate remains true, then mutates the protected state.
193 ///
194 /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
195 /// while holding the monitor mutex, and the closure runs while the mutex is
196 /// still held after the predicate stops blocking.
197 ///
198 /// This method may block indefinitely if no thread changes the state so
199 /// that `waiting` becomes false and sends a notification.
200 ///
201 /// # Arguments
202 ///
203 /// * `waiting` - Predicate that returns `true` while the caller should
204 /// keep waiting.
205 /// * `f` - Closure that receives mutable access after waiting is no longer
206 /// required.
207 ///
208 /// # Returns
209 ///
210 /// The value returned by `f`.
211 ///
212 /// # Example
213 ///
214 /// ```rust
215 /// use std::thread;
216 ///
217 /// use qubit_lock::lock::ArcMonitor;
218 ///
219 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
220 /// let worker_monitor = monitor.clone();
221 ///
222 /// let worker = thread::spawn(move || {
223 /// worker_monitor.wait_while(
224 /// |items| items.is_empty(),
225 /// |items| items.pop().expect("item should be ready"),
226 /// )
227 /// });
228 ///
229 /// monitor.write(|items| items.push(7));
230 /// monitor.notify_one();
231 ///
232 /// assert_eq!(worker.join().expect("worker should finish"), 7);
233 /// ```
234 #[inline]
235 pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
236 where
237 P: FnMut(&T) -> bool,
238 F: FnOnce(&mut T) -> R,
239 {
240 self.inner.wait_while(waiting, f)
241 }
242
243 /// Waits until the protected state satisfies a predicate, then mutates it.
244 ///
245 /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
246 /// no thread changes the state to satisfy the predicate and sends a
247 /// notification.
248 ///
249 /// # Arguments
250 ///
251 /// * `ready` - Predicate that returns `true` when the state is ready.
252 /// * `f` - Closure that receives mutable access to the ready state.
253 ///
254 /// # Returns
255 ///
256 /// The value returned by `f`.
257 #[inline]
258 pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
259 where
260 P: FnMut(&T) -> bool,
261 F: FnOnce(&mut T) -> R,
262 {
263 self.inner.wait_until(ready, f)
264 }
265
266 /// Waits while a predicate remains true, with an overall time limit.
267 ///
268 /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
269 /// false before `timeout` expires, `f` runs while the monitor lock is still
270 /// held. If the timeout expires first, the closure is not called.
271 ///
272 /// # Arguments
273 ///
274 /// * `timeout` - Maximum total duration to wait.
275 /// * `waiting` - Predicate that returns `true` while the caller should
276 /// continue waiting.
277 /// * `f` - Closure that receives mutable access when waiting is no longer
278 /// required.
279 ///
280 /// # Returns
281 ///
282 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
283 /// predicate stops blocking before the timeout. Returns
284 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
285 ///
286 /// # Example
287 ///
288 /// ```rust
289 /// use std::time::Duration;
290 ///
291 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
292 ///
293 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
294 /// let result = monitor.wait_timeout_while(
295 /// Duration::from_millis(1),
296 /// |items| items.is_empty(),
297 /// |items| items.pop(),
298 /// );
299 ///
300 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
301 /// ```
302 #[inline]
303 pub fn wait_timeout_while<R, P, F>(
304 &self,
305 timeout: Duration,
306 waiting: P,
307 f: F,
308 ) -> WaitTimeoutResult<R>
309 where
310 P: FnMut(&T) -> bool,
311 F: FnOnce(&mut T) -> R,
312 {
313 self.inner.wait_timeout_while(timeout, waiting, f)
314 }
315
316 /// Waits until a predicate becomes true, with an overall time limit.
317 ///
318 /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
319 /// true before `timeout` expires, `f` runs while the monitor lock is still
320 /// held. If the timeout expires first, the closure is not called.
321 ///
322 /// # Arguments
323 ///
324 /// * `timeout` - Maximum total duration to wait.
325 /// * `ready` - Predicate that returns `true` when the caller may continue.
326 /// * `f` - Closure that receives mutable access to the ready state.
327 ///
328 /// # Returns
329 ///
330 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
331 /// predicate becomes true before the timeout. Returns
332 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
333 ///
334 /// # Example
335 ///
336 /// ```rust
337 /// use std::{
338 /// thread,
339 /// time::Duration,
340 /// };
341 ///
342 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
343 ///
344 /// let monitor = ArcMonitor::new(false);
345 /// let worker_monitor = monitor.clone();
346 ///
347 /// let worker = thread::spawn(move || {
348 /// worker_monitor.wait_timeout_until(
349 /// Duration::from_secs(1),
350 /// |ready| *ready,
351 /// |ready| {
352 /// *ready = false;
353 /// 5
354 /// },
355 /// )
356 /// });
357 ///
358 /// monitor.write(|ready| *ready = true);
359 /// monitor.notify_one();
360 ///
361 /// assert_eq!(
362 /// worker.join().expect("worker should finish"),
363 /// WaitTimeoutResult::Ready(5),
364 /// );
365 /// ```
366 #[inline]
367 pub fn wait_timeout_until<R, P, F>(
368 &self,
369 timeout: Duration,
370 ready: P,
371 f: F,
372 ) -> WaitTimeoutResult<R>
373 where
374 P: FnMut(&T) -> bool,
375 F: FnOnce(&mut T) -> R,
376 {
377 self.inner.wait_timeout_until(timeout, ready, f)
378 }
379
380 /// Wakes one thread waiting on this monitor's condition variable.
381 ///
382 /// Notifications do not carry state by themselves. A waiting thread only
383 /// proceeds safely after rechecking the protected state. Call this after
384 /// changing state that may make one waiter able to continue.
385 #[inline]
386 pub fn notify_one(&self) {
387 self.inner.notify_one();
388 }
389
390 /// Wakes all threads waiting on this monitor's condition variable.
391 ///
392 /// Notifications do not carry state by themselves. Every awakened thread
393 /// must recheck the protected state before continuing. Call this after a
394 /// state change that may allow multiple waiters to make progress.
395 #[inline]
396 pub fn notify_all(&self) {
397 self.inner.notify_all();
398 }
399}
400
401impl<T: Default> Default for ArcMonitor<T> {
402 /// Creates an Arc-wrapped monitor containing `T::default()`.
403 ///
404 /// # Returns
405 ///
406 /// A cloneable monitor handle protecting the default value for `T`.
407 #[inline]
408 fn default() -> Self {
409 Self::new(T::default())
410 }
411}
412
413impl<T> Clone for ArcMonitor<T> {
414 /// Clones this monitor handle.
415 ///
416 /// The cloned handle shares the same protected state and condition
417 /// variable with the original.
418 ///
419 /// # Returns
420 ///
421 /// A new handle sharing the same monitor state.
422 #[inline]
423 fn clone(&self) -> Self {
424 Self {
425 inner: self.inner.clone(),
426 }
427 }
428}