// qubit-lock 0.9.0
//
// Lock utilities library providing synchronous, asynchronous, and monitor-based locking primitives.
// Documentation
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! Arc-wrapped Tokio monitor.

use std::{
    ops::Deref,
    sync::Arc,
    time::Duration,
};

use super::{
    AsyncConditionWaiter,
    AsyncMonitorFuture,
    AsyncNotificationWaiter,
    AsyncTimeoutConditionWaiter,
    AsyncTimeoutNotificationWaiter,
    Notifier,
    TokioMonitor,
    WaitTimeoutResult,
    WaitTimeoutStatus,
};

/// Cloneable handle around a [`TokioMonitor`].
///
/// The monitor is held behind an [`Arc`], so every clone of a handle refers to
/// the same underlying monitor instead of copying the protected state.
pub struct ArcTokioMonitor<T> {
    /// Shared Tokio monitor.
    ///
    /// Reference-counted so that all clones of this handle point at one monitor.
    inner: Arc<TokioMonitor<T>>,
}

impl<T> ArcTokioMonitor<T> {
    /// Creates an Arc-wrapped Tokio monitor.
    ///
    /// # Arguments
    ///
    /// * `state` - Initial protected state.
    ///
    /// # Returns
    ///
    /// A cloneable Tokio monitor handle.
    pub fn new(state: T) -> Self {
        Self {
            inner: Arc::new(TokioMonitor::new(state)),
        }
    }

    /// Reads protected state asynchronously.
    pub async fn async_read<R, F>(&self, f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        self.inner.async_read(f).await
    }

    /// Mutates protected state asynchronously without notifying.
    pub async fn async_write<R, F>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        self.inner.async_write(f).await
    }

    /// Mutates protected state asynchronously and wakes one waiter.
    pub async fn async_write_notify_one<R, F>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        self.inner.async_write_notify_one(f).await
    }

    /// Mutates protected state asynchronously and wakes all waiters.
    pub async fn async_write_notify_all<R, F>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        self.inner.async_write_notify_all(f).await
    }

    /// Wakes one async waiter.
    pub fn notify_one(&self) {
        self.inner.notify_one();
    }

    /// Wakes all async waiters.
    pub fn notify_all(&self) {
        self.inner.notify_all();
    }

    /// Returns a future that resolves after an async notification.
    pub fn wait_async(&self) -> AsyncMonitorFuture<'_, ()>
    where
        T: Send,
    {
        <TokioMonitor<T> as AsyncNotificationWaiter>::wait_async(self.inner.as_ref())
    }

    /// Returns a future that resolves after notification or timeout.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum relative duration to wait.
    ///
    /// # Returns
    ///
    /// A future resolving to the timeout status.
    pub fn wait_for_async(&self, timeout: Duration) -> AsyncMonitorFuture<'_, WaitTimeoutStatus>
    where
        T: Send,
    {
        <TokioMonitor<T> as AsyncTimeoutNotificationWaiter>::wait_for_async(
            self.inner.as_ref(),
            timeout,
        )
    }

    /// Returns a future that waits until the predicate becomes true.
    ///
    /// # Arguments
    ///
    /// * `predicate` - Predicate that returns `true` when the state is ready.
    /// * `action` - Action to run after the predicate becomes true.
    ///
    /// # Returns
    ///
    /// A future resolving to the action result.
    pub fn wait_until_async<'a, R, P, F>(
        &'a self,
        predicate: P,
        action: F,
    ) -> AsyncMonitorFuture<'a, R>
    where
        T: Send,
        R: Send + 'a,
        P: FnMut(&T) -> bool + Send + 'a,
        F: FnOnce(&mut T) -> R + Send + 'a,
    {
        <TokioMonitor<T> as AsyncConditionWaiter>::wait_until_async(
            self.inner.as_ref(),
            predicate,
            action,
        )
    }

    /// Returns a future that waits while the predicate remains true.
    ///
    /// # Arguments
    ///
    /// * `predicate` - Predicate that returns `true` while waiting should continue.
    /// * `action` - Action to run after the predicate becomes false.
    ///
    /// # Returns
    ///
    /// A future resolving to the action result.
    pub fn wait_while_async<'a, R, P, F>(
        &'a self,
        predicate: P,
        action: F,
    ) -> AsyncMonitorFuture<'a, R>
    where
        T: Send,
        R: Send + 'a,
        P: FnMut(&T) -> bool + Send + 'a,
        F: FnOnce(&mut T) -> R + Send + 'a,
    {
        <TokioMonitor<T> as AsyncConditionWaiter>::wait_while_async(
            self.inner.as_ref(),
            predicate,
            action,
        )
    }

    /// Returns a future that waits until the predicate becomes true or times out.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum relative duration to wait.
    /// * `predicate` - Predicate that returns `true` when the state is ready.
    /// * `action` - Action to run after the predicate becomes true.
    ///
    /// # Returns
    ///
    /// A future resolving to the timed wait result.
    pub fn wait_until_for_async<'a, R, P, F>(
        &'a self,
        timeout: Duration,
        predicate: P,
        action: F,
    ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
    where
        T: Send,
        R: Send + 'a,
        P: FnMut(&T) -> bool + Send + 'a,
        F: FnOnce(&mut T) -> R + Send + 'a,
    {
        <TokioMonitor<T> as AsyncTimeoutConditionWaiter>::wait_until_for_async(
            self.inner.as_ref(),
            timeout,
            predicate,
            action,
        )
    }

    /// Returns a future that waits while the predicate remains true or times out.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum relative duration to wait.
    /// * `predicate` - Predicate that returns `true` while waiting should continue.
    /// * `action` - Action to run after the predicate becomes false.
    ///
    /// # Returns
    ///
    /// A future resolving to the timed wait result.
    pub fn wait_while_for_async<'a, R, P, F>(
        &'a self,
        timeout: Duration,
        predicate: P,
        action: F,
    ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
    where
        T: Send,
        R: Send + 'a,
        P: FnMut(&T) -> bool + Send + 'a,
        F: FnOnce(&mut T) -> R + Send + 'a,
    {
        <TokioMonitor<T> as AsyncTimeoutConditionWaiter>::wait_while_for_async(
            self.inner.as_ref(),
            timeout,
            predicate,
            action,
        )
    }
}

impl<T> Notifier for ArcTokioMonitor<T> {
    /// Wakes one async waiter.
    fn notify_one(&self) {
        Self::notify_one(self);
    }

    /// Wakes all async waiters.
    fn notify_all(&self) {
        Self::notify_all(self);
    }
}

impl<T: Send> AsyncNotificationWaiter for ArcTokioMonitor<T> {
    /// Returns a future that resolves after an async notification.
    fn wait_async<'a>(&'a self) -> AsyncMonitorFuture<'a, ()> {
        self.inner.wait_async()
    }
}

impl<T: Send> AsyncTimeoutNotificationWaiter for ArcTokioMonitor<T> {
    /// Returns a future that resolves after notification or timeout.
    fn wait_for_async<'a>(
        &'a self,
        timeout: Duration,
    ) -> AsyncMonitorFuture<'a, WaitTimeoutStatus> {
        self.inner.wait_for_async(timeout)
    }
}

impl<T: Send> AsyncConditionWaiter for ArcTokioMonitor<T> {
    type State = T;

    /// Returns a future that waits while the predicate remains true.
    fn wait_while_async<'a, R, P, F>(&'a self, predicate: P, action: F) -> AsyncMonitorFuture<'a, R>
    where
        R: Send + 'a,
        P: FnMut(&Self::State) -> bool + Send + 'a,
        F: FnOnce(&mut Self::State) -> R + Send + 'a,
    {
        self.inner.wait_while_async(predicate, action)
    }
}

impl<T: Send> AsyncTimeoutConditionWaiter for ArcTokioMonitor<T> {
    /// Returns a future that waits while the predicate remains true or times out.
    fn wait_while_for_async<'a, R, P, F>(
        &'a self,
        timeout: Duration,
        predicate: P,
        action: F,
    ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
    where
        R: Send + 'a,
        P: FnMut(&Self::State) -> bool + Send + 'a,
        F: FnOnce(&mut Self::State) -> R + Send + 'a,
    {
        self.inner.wait_while_for_async(timeout, predicate, action)
    }
}

impl<T> AsRef<TokioMonitor<T>> for ArcTokioMonitor<T> {
    /// Borrows the Tokio monitor held inside this handle.
    fn as_ref(&self) -> &TokioMonitor<T> {
        // Explicit reborrow through the `Arc`.
        &*self.inner
    }
}

impl<T> Deref for ArcTokioMonitor<T> {
    type Target = TokioMonitor<T>;

    /// Dereferences to the wrapped Tokio monitor.
    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}

impl<T> Clone for ArcTokioMonitor<T> {
    /// Clones this shared Tokio monitor handle.
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

impl<T> From<T> for ArcTokioMonitor<T> {
    /// Creates an Arc-wrapped Tokio monitor from an initial state value.
    fn from(value: T) -> Self {
        Self::new(value)
    }
}

impl<T: Default> Default for ArcTokioMonitor<T> {
    /// Builds an Arc-wrapped Tokio monitor whose initial state is `T::default()`.
    fn default() -> Self {
        let initial: T = Default::default();
        Self::new(initial)
    }
}