Skip to main content

qubit_lock/monitor/
arc_tokio_monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Arc-wrapped Tokio monitor.
11
12use std::{
13    ops::Deref,
14    sync::Arc,
15    time::Duration,
16};
17
18use super::{
19    AsyncConditionWaiter,
20    AsyncMonitorFuture,
21    AsyncNotificationWaiter,
22    AsyncTimeoutConditionWaiter,
23    AsyncTimeoutNotificationWaiter,
24    Notifier,
25    TokioMonitor,
26    WaitTimeoutResult,
27    WaitTimeoutStatus,
28};
29
30/// Cloneable handle around a [`TokioMonitor`].
31pub struct ArcTokioMonitor<T> {
32    /// Shared Tokio monitor.
33    inner: Arc<TokioMonitor<T>>,
34}
35
36impl<T> ArcTokioMonitor<T> {
37    /// Creates an Arc-wrapped Tokio monitor.
38    ///
39    /// # Arguments
40    ///
41    /// * `state` - Initial protected state.
42    ///
43    /// # Returns
44    ///
45    /// A cloneable Tokio monitor handle.
46    pub fn new(state: T) -> Self {
47        Self {
48            inner: Arc::new(TokioMonitor::new(state)),
49        }
50    }
51
52    /// Reads protected state asynchronously.
53    pub async fn async_read<R, F>(&self, f: F) -> R
54    where
55        F: FnOnce(&T) -> R,
56    {
57        self.inner.async_read(f).await
58    }
59
60    /// Mutates protected state asynchronously without notifying.
61    pub async fn async_write<R, F>(&self, f: F) -> R
62    where
63        F: FnOnce(&mut T) -> R,
64    {
65        self.inner.async_write(f).await
66    }
67
68    /// Mutates protected state asynchronously and wakes one waiter.
69    pub async fn async_write_notify_one<R, F>(&self, f: F) -> R
70    where
71        F: FnOnce(&mut T) -> R,
72    {
73        self.inner.async_write_notify_one(f).await
74    }
75
76    /// Mutates protected state asynchronously and wakes all waiters.
77    pub async fn async_write_notify_all<R, F>(&self, f: F) -> R
78    where
79        F: FnOnce(&mut T) -> R,
80    {
81        self.inner.async_write_notify_all(f).await
82    }
83
84    /// Wakes one async waiter.
85    pub fn notify_one(&self) {
86        self.inner.notify_one();
87    }
88
89    /// Wakes all async waiters.
90    pub fn notify_all(&self) {
91        self.inner.notify_all();
92    }
93
94    /// Returns a future that resolves after an async notification.
95    pub fn wait_async(&self) -> AsyncMonitorFuture<'_, ()>
96    where
97        T: Send,
98    {
99        <TokioMonitor<T> as AsyncNotificationWaiter>::wait_async(self.inner.as_ref())
100    }
101
102    /// Returns a future that resolves after notification or timeout.
103    ///
104    /// # Arguments
105    ///
106    /// * `timeout` - Maximum relative duration to wait.
107    ///
108    /// # Returns
109    ///
110    /// A future resolving to the timeout status.
111    pub fn wait_for_async(&self, timeout: Duration) -> AsyncMonitorFuture<'_, WaitTimeoutStatus>
112    where
113        T: Send,
114    {
115        <TokioMonitor<T> as AsyncTimeoutNotificationWaiter>::wait_for_async(
116            self.inner.as_ref(),
117            timeout,
118        )
119    }
120
121    /// Returns a future that waits until the predicate becomes true.
122    ///
123    /// # Arguments
124    ///
125    /// * `predicate` - Predicate that returns `true` when the state is ready.
126    /// * `action` - Action to run after the predicate becomes true.
127    ///
128    /// # Returns
129    ///
130    /// A future resolving to the action result.
131    pub fn wait_until_async<'a, R, P, F>(
132        &'a self,
133        predicate: P,
134        action: F,
135    ) -> AsyncMonitorFuture<'a, R>
136    where
137        T: Send,
138        R: Send + 'a,
139        P: FnMut(&T) -> bool + Send + 'a,
140        F: FnOnce(&mut T) -> R + Send + 'a,
141    {
142        <TokioMonitor<T> as AsyncConditionWaiter>::wait_until_async(
143            self.inner.as_ref(),
144            predicate,
145            action,
146        )
147    }
148
149    /// Returns a future that waits while the predicate remains true.
150    ///
151    /// # Arguments
152    ///
153    /// * `predicate` - Predicate that returns `true` while waiting should continue.
154    /// * `action` - Action to run after the predicate becomes false.
155    ///
156    /// # Returns
157    ///
158    /// A future resolving to the action result.
159    pub fn wait_while_async<'a, R, P, F>(
160        &'a self,
161        predicate: P,
162        action: F,
163    ) -> AsyncMonitorFuture<'a, R>
164    where
165        T: Send,
166        R: Send + 'a,
167        P: FnMut(&T) -> bool + Send + 'a,
168        F: FnOnce(&mut T) -> R + Send + 'a,
169    {
170        <TokioMonitor<T> as AsyncConditionWaiter>::wait_while_async(
171            self.inner.as_ref(),
172            predicate,
173            action,
174        )
175    }
176
177    /// Returns a future that waits until the predicate becomes true or times out.
178    ///
179    /// # Arguments
180    ///
181    /// * `timeout` - Maximum relative duration to wait.
182    /// * `predicate` - Predicate that returns `true` when the state is ready.
183    /// * `action` - Action to run after the predicate becomes true.
184    ///
185    /// # Returns
186    ///
187    /// A future resolving to the timed wait result.
188    pub fn wait_until_for_async<'a, R, P, F>(
189        &'a self,
190        timeout: Duration,
191        predicate: P,
192        action: F,
193    ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
194    where
195        T: Send,
196        R: Send + 'a,
197        P: FnMut(&T) -> bool + Send + 'a,
198        F: FnOnce(&mut T) -> R + Send + 'a,
199    {
200        <TokioMonitor<T> as AsyncTimeoutConditionWaiter>::wait_until_for_async(
201            self.inner.as_ref(),
202            timeout,
203            predicate,
204            action,
205        )
206    }
207
208    /// Returns a future that waits while the predicate remains true or times out.
209    ///
210    /// # Arguments
211    ///
212    /// * `timeout` - Maximum relative duration to wait.
213    /// * `predicate` - Predicate that returns `true` while waiting should continue.
214    /// * `action` - Action to run after the predicate becomes false.
215    ///
216    /// # Returns
217    ///
218    /// A future resolving to the timed wait result.
219    pub fn wait_while_for_async<'a, R, P, F>(
220        &'a self,
221        timeout: Duration,
222        predicate: P,
223        action: F,
224    ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
225    where
226        T: Send,
227        R: Send + 'a,
228        P: FnMut(&T) -> bool + Send + 'a,
229        F: FnOnce(&mut T) -> R + Send + 'a,
230    {
231        <TokioMonitor<T> as AsyncTimeoutConditionWaiter>::wait_while_for_async(
232            self.inner.as_ref(),
233            timeout,
234            predicate,
235            action,
236        )
237    }
238}
239
240impl<T> Notifier for ArcTokioMonitor<T> {
241    /// Wakes one async waiter.
242    fn notify_one(&self) {
243        Self::notify_one(self);
244    }
245
246    /// Wakes all async waiters.
247    fn notify_all(&self) {
248        Self::notify_all(self);
249    }
250}
251
252impl<T: Send> AsyncNotificationWaiter for ArcTokioMonitor<T> {
253    /// Returns a future that resolves after an async notification.
254    fn wait_async<'a>(&'a self) -> AsyncMonitorFuture<'a, ()> {
255        self.inner.wait_async()
256    }
257}
258
259impl<T: Send> AsyncTimeoutNotificationWaiter for ArcTokioMonitor<T> {
260    /// Returns a future that resolves after notification or timeout.
261    fn wait_for_async<'a>(
262        &'a self,
263        timeout: Duration,
264    ) -> AsyncMonitorFuture<'a, WaitTimeoutStatus> {
265        self.inner.wait_for_async(timeout)
266    }
267}
268
269impl<T: Send> AsyncConditionWaiter for ArcTokioMonitor<T> {
270    type State = T;
271
272    /// Returns a future that waits while the predicate remains true.
273    fn wait_while_async<'a, R, P, F>(&'a self, predicate: P, action: F) -> AsyncMonitorFuture<'a, R>
274    where
275        R: Send + 'a,
276        P: FnMut(&Self::State) -> bool + Send + 'a,
277        F: FnOnce(&mut Self::State) -> R + Send + 'a,
278    {
279        self.inner.wait_while_async(predicate, action)
280    }
281}
282
283impl<T: Send> AsyncTimeoutConditionWaiter for ArcTokioMonitor<T> {
284    /// Returns a future that waits while the predicate remains true or times out.
285    fn wait_while_for_async<'a, R, P, F>(
286        &'a self,
287        timeout: Duration,
288        predicate: P,
289        action: F,
290    ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
291    where
292        R: Send + 'a,
293        P: FnMut(&Self::State) -> bool + Send + 'a,
294        F: FnOnce(&mut Self::State) -> R + Send + 'a,
295    {
296        self.inner.wait_while_for_async(timeout, predicate, action)
297    }
298}
299
300impl<T> AsRef<TokioMonitor<T>> for ArcTokioMonitor<T> {
301    /// Returns a reference to the wrapped Tokio monitor.
302    fn as_ref(&self) -> &TokioMonitor<T> {
303        self.inner.as_ref()
304    }
305}
306
307impl<T> Deref for ArcTokioMonitor<T> {
308    type Target = TokioMonitor<T>;
309
310    /// Dereferences to the wrapped Tokio monitor.
311    fn deref(&self) -> &Self::Target {
312        self.inner.as_ref()
313    }
314}
315
316impl<T> Clone for ArcTokioMonitor<T> {
317    /// Clones this shared Tokio monitor handle.
318    fn clone(&self) -> Self {
319        Self {
320            inner: self.inner.clone(),
321        }
322    }
323}
324
325impl<T> From<T> for ArcTokioMonitor<T> {
326    /// Creates an Arc-wrapped Tokio monitor from an initial state value.
327    fn from(value: T) -> Self {
328        Self::new(value)
329    }
330}
331
332impl<T: Default> Default for ArcTokioMonitor<T> {
333    /// Creates an Arc-wrapped Tokio monitor containing `T::default()`.
334    fn default() -> Self {
335        Self::new(T::default())
336    }
337}