fast_async_mutex/
mutex_ordered.rs

1use crate::inner::OrderedInner;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
/// The ordered mutex applies its own locking order when several lockers access the data concurrently.
/// It works well when data has to be locked step by step, for example when sending UDP packets in a specific order.
///
/// The main difference from the standard `Mutex` is that the ordered mutex checks the order of locking.
/// This gives some guarantees about the order in which lockers run, but it is slightly slower than the original mutex.
#[derive(Debug)]
pub struct OrderedMutex<T: ?Sized> {
    // The only field: everything this type does goes through `OrderedInner` (from `crate::inner`).
    inner: OrderedInner<T>,
}
17
18impl<T> OrderedMutex<T> {
19    /// Create a new `OrderedMutex`
20    #[inline]
21    pub const fn new(data: T) -> OrderedMutex<T> {
22        OrderedMutex {
23            inner: OrderedInner::new(data),
24        }
25    }
26}
27
28impl<T: ?Sized> OrderedMutex<T> {
29    /// Acquires the mutex.
30    ///
31    /// Returns a guard that releases the mutex and wake the next locker when dropped.
32    ///
33    /// # Examples
34    ///
35    /// ```
36    /// use fast_async_mutex::mutex_ordered::OrderedMutex;
37    ///
38    /// #[tokio::main]
39    /// async fn main() {
40    ///     let mutex = OrderedMutex::new(10);
41    ///     let guard = mutex.lock().await;
42    ///     assert_eq!(*guard, 10);
43    /// }
44    /// ```
45    #[inline]
46    pub fn lock(&self) -> OrderedMutexGuardFuture<T> {
47        OrderedMutexGuardFuture {
48            mutex: &self,
49            id: self.inner.generate_id(),
50            is_realized: false,
51        }
52    }
53
54    /// Acquires the mutex.
55    ///
56    /// Returns a guard that releases the mutex and wake the next locker when dropped.
57    /// `OrderedMutexOwnedGuard` have a `'static` lifetime, but requires the `Arc<OrderedMutex<T>>` type
58    ///
59    /// # Examples
60    ///
61    /// ```
62    /// use fast_async_mutex::mutex_ordered::OrderedMutex;
63    /// use std::sync::Arc;
64    /// #[tokio::main]
65    /// async fn main() {
66    ///     let mutex = Arc::new(OrderedMutex::new(10));
67    ///     let guard = mutex.lock_owned().await;
68    ///     assert_eq!(*guard, 10);
69    /// }
70    /// ```
71    #[inline]
72    pub fn lock_owned(self: &Arc<Self>) -> OrderedMutexOwnedGuardFuture<T> {
73        OrderedMutexOwnedGuardFuture {
74            mutex: self.clone(),
75            id: self.inner.generate_id(),
76            is_realized: false,
77        }
78    }
79}
80
81/// The Simple OrderedMutex Guard
82/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard internally borrows the OrderedMutex, so the mutex will not be dropped while a guard exists.
83/// The lock is automatically released and waked the next locker whenever the guard is dropped, at which point lock will succeed yet again.
84#[derive(Debug)]
85pub struct OrderedMutexGuard<'a, T: ?Sized> {
86    mutex: &'a OrderedMutex<T>,
87}
88
89#[derive(Debug)]
90pub struct OrderedMutexGuardFuture<'a, T: ?Sized> {
91    mutex: &'a OrderedMutex<T>,
92    id: usize,
93    is_realized: bool,
94}
95
96/// An owned handle to a held OrderedMutex.
97/// This guard is only available from a OrderedMutex that is wrapped in an `Arc`. It is identical to `OrderedMutexGuard`, except that rather than borrowing the `OrderedMutex`, it clones the `Arc`, incrementing the reference count. This means that unlike `OrderedMutexGuard`, it will have the `'static` lifetime.
98/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard internally keeps a reference-couned pointer to the original `OrderedMutex`, so even if the lock goes away, the guard remains valid.
99/// The lock is automatically released and waked the next locker whenever the guard is dropped, at which point lock will succeed yet again.
100#[derive(Debug)]
101pub struct OrderedMutexOwnedGuard<T: ?Sized> {
102    mutex: Arc<OrderedMutex<T>>,
103}
104
105#[derive(Debug)]
106pub struct OrderedMutexOwnedGuardFuture<T: ?Sized> {
107    mutex: Arc<OrderedMutex<T>>,
108    id: usize,
109    is_realized: bool,
110}
111
112impl<'a, T: ?Sized> Future for OrderedMutexGuardFuture<'a, T> {
113    type Output = OrderedMutexGuard<'a, T>;
114
115    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
116        if self.mutex.inner.try_acquire(self.id) {
117            self.is_realized = true;
118            Poll::Ready(OrderedMutexGuard { mutex: self.mutex })
119        } else {
120            self.mutex.inner.store_waker(cx.waker());
121            Poll::Pending
122        }
123    }
124}
125
126impl<T: ?Sized> Future for OrderedMutexOwnedGuardFuture<T> {
127    type Output = OrderedMutexOwnedGuard<T>;
128
129    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130        if self.mutex.inner.try_acquire(self.id) {
131            self.is_realized = true;
132            Poll::Ready(OrderedMutexOwnedGuard {
133                mutex: self.mutex.clone(),
134            })
135        } else {
136            self.mutex.inner.store_waker(cx.waker());
137            Poll::Pending
138        }
139    }
140}
141
// NOTE(review): the macros below are defined elsewhere in the crate; the comments
// describe what their names suggest and should be confirmed against the definitions.

// Presumably provides the `Send`/`Sync` impls for the mutex and both guard types.
crate::impl_send_sync_mutex!(OrderedMutex, OrderedMutexGuard, OrderedMutexOwnedGuard);

// Presumably provides `Deref`/`DerefMut` to the protected `T` for both guard types.
crate::impl_deref_mut!(OrderedMutexGuard, 'a);
crate::impl_deref_mut!(OrderedMutexOwnedGuard);

// Presumably provides `Drop` for the guards and the lock futures; `unlock` is passed
// to every invocation.
crate::impl_drop_guard!(OrderedMutexGuard, 'a, unlock);
crate::impl_drop_guard!(OrderedMutexOwnedGuard, unlock);
crate::impl_drop_guard_future!(OrderedMutexGuardFuture, 'a, unlock);
crate::impl_drop_guard_future!(OrderedMutexOwnedGuardFuture, unlock);
151
152#[cfg(test)]
153mod tests {
154    use crate::mutex_ordered::{OrderedMutex, OrderedMutexGuard, OrderedMutexOwnedGuard};
155    use futures::executor::block_on;
156    use futures::{FutureExt, StreamExt, TryStreamExt};
157    use std::ops::AddAssign;
158    use std::sync::atomic::AtomicUsize;
159    use std::sync::Arc;
160    use tokio::time::{sleep, Duration};
161
162    #[tokio::test(flavor = "multi_thread", worker_threads = 12)]
163    async fn test_mutex() {
164        let c = OrderedMutex::new(0);
165
166        futures::stream::iter(0..10000)
167            .for_each_concurrent(None, |_| async {
168                let mut co: OrderedMutexGuard<i32> = c.lock().await;
169                *co += 1;
170            })
171            .await;
172
173        let co = c.lock().await;
174        assert_eq!(*co, 10000)
175    }
176
177    #[tokio::test(flavor = "multi_thread", worker_threads = 12)]
178    async fn test_mutex_delay() {
179        let expected_result = 100;
180        let c = OrderedMutex::new(0);
181
182        futures::stream::iter(0..expected_result)
183            .then(|i| c.lock().map(move |co| (i, co)))
184            .for_each_concurrent(None, |(i, mut co)| async move {
185                sleep(Duration::from_millis(expected_result - i)).await;
186                *co += 1;
187            })
188            .await;
189
190        let co = c.lock().await;
191        assert_eq!(*co, expected_result)
192    }
193
194    #[tokio::test(flavor = "multi_thread", worker_threads = 12)]
195    async fn test_owned_mutex() {
196        let c = Arc::new(OrderedMutex::new(0));
197
198        futures::stream::iter(0..10000)
199            .for_each_concurrent(None, |_| async {
200                let mut co: OrderedMutexOwnedGuard<i32> = c.lock_owned().await;
201                *co += 1;
202            })
203            .await;
204
205        let co = c.lock_owned().await;
206        assert_eq!(*co, 10000)
207    }
208
209    #[tokio::test]
210    async fn test_container() {
211        let c = OrderedMutex::new(String::from("lol"));
212
213        let mut co: OrderedMutexGuard<String> = c.lock().await;
214        co.add_assign("lol");
215
216        assert_eq!(*co, "lollol");
217    }
218
219    #[tokio::test]
220    async fn test_overflow() {
221        let mut c = OrderedMutex::new(String::from("lol"));
222
223        c.inner.state = AtomicUsize::new(usize::max_value());
224        c.inner.current = AtomicUsize::new(usize::max_value());
225
226        let mut co: OrderedMutexGuard<String> = c.lock().await;
227        co.add_assign("lol");
228
229        assert_eq!(*co, "lollol");
230    }
231
232    #[tokio::test]
233    async fn test_timeout() {
234        let c = OrderedMutex::new(String::from("lol"));
235
236        let co: OrderedMutexGuard<String> = c.lock().await;
237
238        futures::stream::iter(0..10000i32)
239            .then(|_| tokio::time::timeout(Duration::from_nanos(1), c.lock()))
240            .try_for_each_concurrent(None, |_c| futures::future::ok(()))
241            .await
242            .expect_err("timout must be");
243
244        drop(co);
245
246        let mut co: OrderedMutexGuard<String> = c.lock().await;
247        co.add_assign("lol");
248
249        assert_eq!(*co, "lollol");
250    }
251
252    #[test]
253    fn multithreading_test() {
254        let num = 100;
255        let mutex = Arc::new(OrderedMutex::new(0));
256        let ths: Vec<_> = (0..num)
257            .map(|_| {
258                let mutex = mutex.clone();
259                std::thread::spawn(move || {
260                    block_on(async {
261                        let mut lock = mutex.lock().await;
262                        *lock += 1;
263                    })
264                })
265            })
266            .collect();
267
268        for thread in ths {
269            thread.join().unwrap();
270        }
271
272        block_on(async {
273            let lock = mutex.lock().await;
274            assert_eq!(num, *lock)
275        })
276    }
277}