fast_async_mutex/
mutex_ordered.rs1use crate::inner::OrderedInner;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8#[derive(Debug)]
14pub struct OrderedMutex<T: ?Sized> {
15 inner: OrderedInner<T>,
16}
17
18impl<T> OrderedMutex<T> {
19 #[inline]
21 pub const fn new(data: T) -> OrderedMutex<T> {
22 OrderedMutex {
23 inner: OrderedInner::new(data),
24 }
25 }
26}
27
28impl<T: ?Sized> OrderedMutex<T> {
29 #[inline]
46 pub fn lock(&self) -> OrderedMutexGuardFuture<T> {
47 OrderedMutexGuardFuture {
48 mutex: &self,
49 id: self.inner.generate_id(),
50 is_realized: false,
51 }
52 }
53
54 #[inline]
72 pub fn lock_owned(self: &Arc<Self>) -> OrderedMutexOwnedGuardFuture<T> {
73 OrderedMutexOwnedGuardFuture {
74 mutex: self.clone(),
75 id: self.inner.generate_id(),
76 is_realized: false,
77 }
78 }
79}
80
81#[derive(Debug)]
85pub struct OrderedMutexGuard<'a, T: ?Sized> {
86 mutex: &'a OrderedMutex<T>,
87}
88
89#[derive(Debug)]
90pub struct OrderedMutexGuardFuture<'a, T: ?Sized> {
91 mutex: &'a OrderedMutex<T>,
92 id: usize,
93 is_realized: bool,
94}
95
96#[derive(Debug)]
101pub struct OrderedMutexOwnedGuard<T: ?Sized> {
102 mutex: Arc<OrderedMutex<T>>,
103}
104
105#[derive(Debug)]
106pub struct OrderedMutexOwnedGuardFuture<T: ?Sized> {
107 mutex: Arc<OrderedMutex<T>>,
108 id: usize,
109 is_realized: bool,
110}
111
112impl<'a, T: ?Sized> Future for OrderedMutexGuardFuture<'a, T> {
113 type Output = OrderedMutexGuard<'a, T>;
114
115 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
116 if self.mutex.inner.try_acquire(self.id) {
117 self.is_realized = true;
118 Poll::Ready(OrderedMutexGuard { mutex: self.mutex })
119 } else {
120 self.mutex.inner.store_waker(cx.waker());
121 Poll::Pending
122 }
123 }
124}
125
126impl<T: ?Sized> Future for OrderedMutexOwnedGuardFuture<T> {
127 type Output = OrderedMutexOwnedGuard<T>;
128
129 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130 if self.mutex.inner.try_acquire(self.id) {
131 self.is_realized = true;
132 Poll::Ready(OrderedMutexOwnedGuard {
133 mutex: self.mutex.clone(),
134 })
135 } else {
136 self.mutex.inner.store_waker(cx.waker());
137 Poll::Pending
138 }
139 }
140}
141
142crate::impl_send_sync_mutex!(OrderedMutex, OrderedMutexGuard, OrderedMutexOwnedGuard);
143
144crate::impl_deref_mut!(OrderedMutexGuard, 'a);
145crate::impl_deref_mut!(OrderedMutexOwnedGuard);
146
147crate::impl_drop_guard!(OrderedMutexGuard, 'a, unlock);
148crate::impl_drop_guard!(OrderedMutexOwnedGuard, unlock);
149crate::impl_drop_guard_future!(OrderedMutexGuardFuture, 'a, unlock);
150crate::impl_drop_guard_future!(OrderedMutexOwnedGuardFuture, unlock);
151
152#[cfg(test)]
153mod tests {
154 use crate::mutex_ordered::{OrderedMutex, OrderedMutexGuard, OrderedMutexOwnedGuard};
155 use futures::executor::block_on;
156 use futures::{FutureExt, StreamExt, TryStreamExt};
157 use std::ops::AddAssign;
158 use std::sync::atomic::AtomicUsize;
159 use std::sync::Arc;
160 use tokio::time::{sleep, Duration};
161
162 #[tokio::test(flavor = "multi_thread", worker_threads = 12)]
163 async fn test_mutex() {
164 let c = OrderedMutex::new(0);
165
166 futures::stream::iter(0..10000)
167 .for_each_concurrent(None, |_| async {
168 let mut co: OrderedMutexGuard<i32> = c.lock().await;
169 *co += 1;
170 })
171 .await;
172
173 let co = c.lock().await;
174 assert_eq!(*co, 10000)
175 }
176
177 #[tokio::test(flavor = "multi_thread", worker_threads = 12)]
178 async fn test_mutex_delay() {
179 let expected_result = 100;
180 let c = OrderedMutex::new(0);
181
182 futures::stream::iter(0..expected_result)
183 .then(|i| c.lock().map(move |co| (i, co)))
184 .for_each_concurrent(None, |(i, mut co)| async move {
185 sleep(Duration::from_millis(expected_result - i)).await;
186 *co += 1;
187 })
188 .await;
189
190 let co = c.lock().await;
191 assert_eq!(*co, expected_result)
192 }
193
194 #[tokio::test(flavor = "multi_thread", worker_threads = 12)]
195 async fn test_owned_mutex() {
196 let c = Arc::new(OrderedMutex::new(0));
197
198 futures::stream::iter(0..10000)
199 .for_each_concurrent(None, |_| async {
200 let mut co: OrderedMutexOwnedGuard<i32> = c.lock_owned().await;
201 *co += 1;
202 })
203 .await;
204
205 let co = c.lock_owned().await;
206 assert_eq!(*co, 10000)
207 }
208
209 #[tokio::test]
210 async fn test_container() {
211 let c = OrderedMutex::new(String::from("lol"));
212
213 let mut co: OrderedMutexGuard<String> = c.lock().await;
214 co.add_assign("lol");
215
216 assert_eq!(*co, "lollol");
217 }
218
219 #[tokio::test]
220 async fn test_overflow() {
221 let mut c = OrderedMutex::new(String::from("lol"));
222
223 c.inner.state = AtomicUsize::new(usize::max_value());
224 c.inner.current = AtomicUsize::new(usize::max_value());
225
226 let mut co: OrderedMutexGuard<String> = c.lock().await;
227 co.add_assign("lol");
228
229 assert_eq!(*co, "lollol");
230 }
231
232 #[tokio::test]
233 async fn test_timeout() {
234 let c = OrderedMutex::new(String::from("lol"));
235
236 let co: OrderedMutexGuard<String> = c.lock().await;
237
238 futures::stream::iter(0..10000i32)
239 .then(|_| tokio::time::timeout(Duration::from_nanos(1), c.lock()))
240 .try_for_each_concurrent(None, |_c| futures::future::ok(()))
241 .await
242 .expect_err("timout must be");
243
244 drop(co);
245
246 let mut co: OrderedMutexGuard<String> = c.lock().await;
247 co.add_assign("lol");
248
249 assert_eq!(*co, "lollol");
250 }
251
252 #[test]
253 fn multithreading_test() {
254 let num = 100;
255 let mutex = Arc::new(OrderedMutex::new(0));
256 let ths: Vec<_> = (0..num)
257 .map(|_| {
258 let mutex = mutex.clone();
259 std::thread::spawn(move || {
260 block_on(async {
261 let mut lock = mutex.lock().await;
262 *lock += 1;
263 })
264 })
265 })
266 .collect();
267
268 for thread in ths {
269 thread.join().unwrap();
270 }
271
272 block_on(async {
273 let lock = mutex.lock().await;
274 assert_eq!(num, *lock)
275 })
276 }
277}