qubit_lock/monitor/arc_monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Arc Monitor
11//!
12//! Provides an Arc-wrapped synchronous monitor for condition-based state
13//! coordination across threads.
14//!
15
16use std::{
17 ops::Deref,
18 sync::Arc,
19 time::Duration,
20};
21
22use super::{
23 Monitor,
24 MonitorGuard,
25 WaitTimeoutResult,
26 WaitTimeoutStatus,
27};
28
29/// Arc-wrapped monitor for shared condition-based state coordination.
30///
31/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
32/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
33/// It preserves the same guard-based waiting and predicate-based waiting
34/// semantics as [`Monitor`]. It implements [`Deref`] and [`AsRef`] so callers
35/// can pass it to APIs that expect a [`Monitor`] reference.
36///
37/// # Type Parameters
38///
39/// * `T` - The state protected by this monitor.
40///
41/// # Example
42///
43/// ```rust
44/// use std::thread;
45///
46/// use qubit_lock::lock::ArcMonitor;
47///
48/// let monitor = ArcMonitor::new(false);
49/// let waiter_monitor = monitor.clone();
50///
51/// let waiter = thread::spawn(move || {
52/// waiter_monitor.wait_until(
53/// |ready| *ready,
54/// |ready| {
55/// *ready = false;
56/// },
57/// );
58/// });
59///
60/// monitor.write(|ready| {
61/// *ready = true;
62/// });
63/// monitor.notify_all();
64///
65/// waiter.join().expect("waiter should finish");
66/// assert!(!monitor.read(|ready| *ready));
67/// ```
68///
69pub struct ArcMonitor<T> {
70 /// Shared monitor instance.
71 inner: Arc<Monitor<T>>,
72}
73
74impl<T> ArcMonitor<T> {
75 /// Creates an Arc-wrapped monitor protecting the supplied state value.
76 ///
77 /// # Arguments
78 ///
79 /// * `state` - Initial state protected by the monitor.
80 ///
81 /// # Returns
82 ///
83 /// A cloneable monitor handle initialized with the supplied state.
84 #[inline]
85 pub fn new(state: T) -> Self {
86 Self {
87 inner: Arc::new(Monitor::new(state)),
88 }
89 }
90
91 /// Acquires the shared monitor and returns a guard.
92 ///
93 /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
94 /// keeps the monitor mutex locked until it is dropped. It can also wait on
95 /// the monitor's condition variable through [`MonitorGuard::wait`] or
96 /// [`MonitorGuard::wait_timeout`].
97 ///
98 /// # Returns
99 ///
100 /// A guard that provides read and write access to the protected state.
101 ///
102 /// # Example
103 ///
104 /// ```rust
105 /// use qubit_lock::lock::ArcMonitor;
106 ///
107 /// let monitor = ArcMonitor::new(1);
108 /// {
109 /// let mut value = monitor.lock();
110 /// *value += 1;
111 /// }
112 ///
113 /// assert_eq!(monitor.read(|value| *value), 2);
114 /// ```
115 #[inline]
116 pub fn lock(&self) -> MonitorGuard<'_, T> {
117 self.inner.lock()
118 }
119
120 /// Acquires the monitor and reads the protected state.
121 ///
122 /// This delegates to [`Monitor::read`]. The closure runs while the monitor
123 /// mutex is held, so keep it short and avoid long blocking work.
124 ///
125 /// # Arguments
126 ///
127 /// * `f` - Closure that receives an immutable reference to the state.
128 ///
129 /// # Returns
130 ///
131 /// The value returned by `f`.
132 #[inline]
133 pub fn read<R, F>(&self, f: F) -> R
134 where
135 F: FnOnce(&T) -> R,
136 {
137 self.inner.read(f)
138 }
139
140 /// Acquires the monitor and mutates the protected state.
141 ///
142 /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
143 /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
144 /// waiting thread may observe.
145 ///
146 /// # Arguments
147 ///
148 /// * `f` - Closure that receives a mutable reference to the state.
149 ///
150 /// # Returns
151 ///
152 /// The value returned by `f`.
153 #[inline]
154 pub fn write<R, F>(&self, f: F) -> R
155 where
156 F: FnOnce(&mut T) -> R,
157 {
158 self.inner.write(f)
159 }
160
161 /// Waits for a notification or timeout without checking state.
162 ///
163 /// This delegates to [`Monitor::wait_notify`]. Most
164 /// coordination code should prefer [`Self::wait_while`],
165 /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
166 ///
167 /// [`WaitTimeoutStatus::Woken`] means the condition variable was notified,
168 /// but it does not prove that the protected state changed in a useful way.
169 ///
170 /// # Arguments
171 ///
172 /// * `timeout` - Maximum duration to wait for a notification.
173 ///
174 /// # Returns
175 ///
176 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
177 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
178 ///
179 /// # Example
180 ///
181 /// ```rust
182 /// use std::time::Duration;
183 ///
184 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutStatus};
185 ///
186 /// let monitor = ArcMonitor::new(false);
187 /// let status = monitor.wait_notify(Duration::from_millis(1));
188 ///
189 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
190 /// ```
191 #[inline]
192 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
193 self.inner.wait_notify(timeout)
194 }
195
196 /// Waits while a predicate remains true, then mutates the protected state.
197 ///
198 /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
199 /// while holding the monitor mutex, and the closure runs while the mutex is
200 /// still held after the predicate stops blocking.
201 ///
202 /// This method may block indefinitely if no thread changes the state so
203 /// that `waiting` becomes false and sends a notification.
204 ///
205 /// # Arguments
206 ///
207 /// * `waiting` - Predicate that returns `true` while the caller should
208 /// keep waiting.
209 /// * `f` - Closure that receives mutable access after waiting is no longer
210 /// required.
211 ///
212 /// # Returns
213 ///
214 /// The value returned by `f`.
215 ///
216 /// # Example
217 ///
218 /// ```rust
219 /// use std::thread;
220 ///
221 /// use qubit_lock::lock::ArcMonitor;
222 ///
223 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
224 /// let worker_monitor = monitor.clone();
225 ///
226 /// let worker = thread::spawn(move || {
227 /// worker_monitor.wait_while(
228 /// |items| items.is_empty(),
229 /// |items| items.pop().expect("item should be ready"),
230 /// )
231 /// });
232 ///
233 /// monitor.write(|items| items.push(7));
234 /// monitor.notify_one();
235 ///
236 /// assert_eq!(worker.join().expect("worker should finish"), 7);
237 /// ```
238 #[inline]
239 pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
240 where
241 P: FnMut(&T) -> bool,
242 F: FnOnce(&mut T) -> R,
243 {
244 self.inner.wait_while(waiting, f)
245 }
246
247 /// Waits until the protected state satisfies a predicate, then mutates it.
248 ///
249 /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
250 /// no thread changes the state to satisfy the predicate and sends a
251 /// notification.
252 ///
253 /// # Arguments
254 ///
255 /// * `ready` - Predicate that returns `true` when the state is ready.
256 /// * `f` - Closure that receives mutable access to the ready state.
257 ///
258 /// # Returns
259 ///
260 /// The value returned by `f`.
261 #[inline]
262 pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
263 where
264 P: FnMut(&T) -> bool,
265 F: FnOnce(&mut T) -> R,
266 {
267 self.inner.wait_until(ready, f)
268 }
269
270 /// Waits while a predicate remains true, with an overall time limit.
271 ///
272 /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
273 /// false before `timeout` expires, `f` runs while the monitor lock is still
274 /// held. If the timeout expires first, the closure is not called.
275 ///
276 /// # Arguments
277 ///
278 /// * `timeout` - Maximum total duration to wait.
279 /// * `waiting` - Predicate that returns `true` while the caller should
280 /// continue waiting.
281 /// * `f` - Closure that receives mutable access when waiting is no longer
282 /// required.
283 ///
284 /// # Returns
285 ///
286 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
287 /// predicate stops blocking before the timeout. Returns
288 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
289 ///
290 /// # Example
291 ///
292 /// ```rust
293 /// use std::time::Duration;
294 ///
295 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
296 ///
297 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
298 /// let result = monitor.wait_timeout_while(
299 /// Duration::from_millis(1),
300 /// |items| items.is_empty(),
301 /// |items| items.pop(),
302 /// );
303 ///
304 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
305 /// ```
306 #[inline]
307 pub fn wait_timeout_while<R, P, F>(
308 &self,
309 timeout: Duration,
310 waiting: P,
311 f: F,
312 ) -> WaitTimeoutResult<R>
313 where
314 P: FnMut(&T) -> bool,
315 F: FnOnce(&mut T) -> R,
316 {
317 self.inner.wait_timeout_while(timeout, waiting, f)
318 }
319
320 /// Waits until a predicate becomes true, with an overall time limit.
321 ///
322 /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
323 /// true before `timeout` expires, `f` runs while the monitor lock is still
324 /// held. If the timeout expires first, the closure is not called.
325 ///
326 /// # Arguments
327 ///
328 /// * `timeout` - Maximum total duration to wait.
329 /// * `ready` - Predicate that returns `true` when the caller may continue.
330 /// * `f` - Closure that receives mutable access to the ready state.
331 ///
332 /// # Returns
333 ///
334 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
335 /// predicate becomes true before the timeout. Returns
336 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
337 ///
338 /// # Example
339 ///
340 /// ```rust
341 /// use std::{
342 /// thread,
343 /// time::Duration,
344 /// };
345 ///
346 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
347 ///
348 /// let monitor = ArcMonitor::new(false);
349 /// let worker_monitor = monitor.clone();
350 ///
351 /// let worker = thread::spawn(move || {
352 /// worker_monitor.wait_timeout_until(
353 /// Duration::from_secs(1),
354 /// |ready| *ready,
355 /// |ready| {
356 /// *ready = false;
357 /// 5
358 /// },
359 /// )
360 /// });
361 ///
362 /// monitor.write(|ready| *ready = true);
363 /// monitor.notify_one();
364 ///
365 /// assert_eq!(
366 /// worker.join().expect("worker should finish"),
367 /// WaitTimeoutResult::Ready(5),
368 /// );
369 /// ```
370 #[inline]
371 pub fn wait_timeout_until<R, P, F>(
372 &self,
373 timeout: Duration,
374 ready: P,
375 f: F,
376 ) -> WaitTimeoutResult<R>
377 where
378 P: FnMut(&T) -> bool,
379 F: FnOnce(&mut T) -> R,
380 {
381 self.inner.wait_timeout_until(timeout, ready, f)
382 }
383
384 /// Wakes one thread waiting on this monitor's condition variable.
385 ///
386 /// Notifications do not carry state by themselves. A waiting thread only
387 /// proceeds safely after rechecking the protected state. Call this after
388 /// changing state that may make one waiter able to continue.
389 #[inline]
390 pub fn notify_one(&self) {
391 self.inner.notify_one();
392 }
393
394 /// Wakes all threads waiting on this monitor's condition variable.
395 ///
396 /// Notifications do not carry state by themselves. Every awakened thread
397 /// must recheck the protected state before continuing. Call this after a
398 /// state change that may allow multiple waiters to make progress.
399 #[inline]
400 pub fn notify_all(&self) {
401 self.inner.notify_all();
402 }
403}
404
405impl<T> AsRef<Monitor<T>> for ArcMonitor<T> {
406 /// Returns a reference to the underlying monitor.
407 ///
408 /// This is useful when callers need an explicit [`Monitor`] reference while
409 /// keeping the cloneable [`ArcMonitor`] handle.
410 #[inline]
411 fn as_ref(&self) -> &Monitor<T> {
412 self.inner.as_ref()
413 }
414}
415
416impl<T> Deref for ArcMonitor<T> {
417 type Target = Monitor<T>;
418
419 /// Dereferences this wrapper to the underlying monitor.
420 ///
421 /// Method-call dereferencing lets callers use native [`Monitor`] APIs
422 /// directly, while this wrapper still provides cloneable ownership.
423 #[inline]
424 fn deref(&self) -> &Self::Target {
425 self.inner.as_ref()
426 }
427}
428
429impl<T: Default> Default for ArcMonitor<T> {
430 /// Creates an Arc-wrapped monitor containing `T::default()`.
431 ///
432 /// # Returns
433 ///
434 /// A cloneable monitor handle protecting the default value for `T`.
435 #[inline]
436 fn default() -> Self {
437 Self::new(T::default())
438 }
439}
440
441impl<T> Clone for ArcMonitor<T> {
442 /// Clones this monitor handle.
443 ///
444 /// The cloned handle shares the same protected state and condition
445 /// variable with the original.
446 ///
447 /// # Returns
448 ///
449 /// A new handle sharing the same monitor state.
450 #[inline]
451 fn clone(&self) -> Self {
452 Self {
453 inner: self.inner.clone(),
454 }
455 }
456}