use futures::{FutureExt, stream};
#[cfg(feature = "tokio")]
use futures::future::ready;
use futures::stream::StreamExt;
use std::sync::Arc;
#[cfg(feature = "tokio")]
use std::rc::Rc;
use tokio::{self, sync::Barrier};
#[cfg(feature = "tokio")]
use tokio::runtime;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
use futures_locks::*;
#[test]
fn mutex_weak_some() {
let mutex = Mutex::<u32>::new(0);
let mutex_weak = Mutex::downgrade(&mutex);
assert!(mutex_weak.upgrade().is_some())
}
#[test]
fn mutex_weak_none() {
let mutex = Mutex::<u32>::new(0);
let mutex_weak = Mutex::downgrade(&mutex);
drop(mutex);
assert!(mutex_weak.upgrade().is_none())
}
#[test]
fn mutex_eq_ptr_true() {
let mutex = Mutex::<u32>::new(0);
let mutex_other = mutex.clone();
assert!(Mutex::ptr_eq(&mutex, &mutex_other));
}
#[test]
fn mutex_eq_ptr_false() {
let mutex = Mutex::<u32>::new(0);
let mutex_other = Mutex::<u32>::new(0);
assert!(!Mutex::ptr_eq(&mutex, &mutex_other));
}
#[tokio::test]
async fn drop_when_ready() {
let mutex = Mutex::<u32>::new(0);
let guard1 = mutex.lock().await;
let fut2 = mutex.lock();
drop(guard1); drop(fut2); let _guard3 = mutex.lock().await;
}
#[test]
fn drop_before_ready() {
let mutex = Mutex::<u32>::new(0);
let mut fut1 = spawn(mutex.lock());
let guard1 = assert_ready!(fut1.poll());
let mut fut2 = spawn(mutex.lock());
assert_pending!(fut2.poll());
let mut fut3 = spawn(mutex.lock());
assert_pending!(fut3.poll());
drop(fut2); drop(guard1); drop(fut1);
assert!(fut3.is_woken());
assert_ready!(fut3.poll());
}
#[test]
fn get_mut() {
let mut mutex = Mutex::<u32>::new(42);
*mutex.get_mut().unwrap() += 1;
assert_eq!(*mutex.get_mut().unwrap(), 43);
}
#[test]
fn get_mut_cloned() {
let mut mutex = Mutex::<u32>::new(42);
let _clone = mutex.clone();
assert!(mutex.get_mut().is_none());
}
#[tokio::test]
async fn lock_uncontested() {
let mutex = Mutex::<u32>::new(0);
let guard = mutex.lock().await;
let result = *guard + 5;
drop(guard);
assert_eq!(result, 5);
}
#[test]
fn lock_contested() {
let mutex = Mutex::<u32>::new(0);
let mut fut0 = spawn(mutex.lock());
let guard0 = assert_ready!(fut0.poll());
let mut fut1 = spawn(mutex.lock());
assert_pending!(fut1.poll());
drop(guard0); assert!(fut1.is_woken());
assert_ready!(fut1.poll());
}
#[tokio::test]
async fn lock_multithreaded() {
let mutex = Mutex::<u32>::new(0);
let mtx_clone0 = mutex.clone();
let mtx_clone1 = mutex.clone();
let mtx_clone2 = mutex.clone();
let mtx_clone3 = mutex.clone();
let barrier = Arc::new(Barrier::new(5));
let b0 = barrier.clone();
let b1 = barrier.clone();
let b2 = barrier.clone();
let b3 = barrier.clone();
tokio::task::spawn(async move {
stream::iter(0..1000).for_each(move |_| {
mtx_clone0.lock().map(|mut guard| { *guard += 2 })
}).await;
b0.wait().await;
});
tokio::task::spawn(async move {
stream::iter(0..1000).for_each(move |_| {
mtx_clone1.lock().map(|mut guard| { *guard += 3 })
}).await;
b1.wait().await;
});
tokio::task::spawn(async move {
stream::iter(0..1000).for_each(move |_| {
mtx_clone2.lock().map(|mut guard| { *guard += 5 })
}).await;
b2.wait().await;
});
tokio::task::spawn(async move {
stream::iter(0..1000).for_each(move |_| {
mtx_clone3.lock().map(|mut guard| { *guard += 7 })
}).await;
b3.wait().await;
});
barrier.wait().await;
assert_eq!(mutex.try_unwrap().expect("try_unwrap"), 17_000);
}
#[tokio::test]
async fn lock_order() {
let mutex = Mutex::<Vec<u32>>::new(vec![]);
let fut2 = mutex.lock().map(|mut guard| guard.push(2));
let fut1 = mutex.lock().map(|mut guard| guard.push(1));
fut1.then(|_| fut2).await;
assert_eq!(mutex.try_unwrap().unwrap(), vec![1, 2]);
}
#[test]
fn try_lock_uncontested() {
let mutex = Mutex::<u32>::new(5);
let guard = mutex.try_lock().unwrap();
assert_eq!(5, *guard);
}
#[test]
fn try_lock_contested() {
let mutex = Mutex::<u32>::new(0);
let _guard = mutex.try_lock().unwrap();
assert!(mutex.try_lock().is_err());
}
#[test]
fn try_unwrap_multiply_referenced() {
let mtx = Mutex::<u32>::new(0);
let _mtx2 = mtx.clone();
assert!(mtx.try_unwrap().is_err());
}
#[cfg(feature = "tokio")]
#[test]
fn with_err() {
let mtx = Mutex::<i32>::new(-5);
let rt = runtime::Builder::new_current_thread().build().unwrap();
let r = rt.block_on(async {
mtx.with(|guard| {
if *guard > 0 {
ready(Ok(*guard))
} else {
ready(Err("Whoops!"))
}
}).await
});
assert_eq!(r, Err("Whoops!"));
}
#[cfg(feature = "tokio")]
#[test]
fn with_ok() {
let mtx = Mutex::<i32>::new(5);
let rt = runtime::Builder::new_current_thread().build().unwrap();
let r = rt.block_on(async {
mtx.with(|guard| {
ready(*guard)
}).await
});
assert_eq!(r, 5);
}
#[cfg(feature = "tokio")]
#[test]
fn with_threadpool() {
let mtx = Mutex::<i32>::new(5);
let rt = runtime::Builder::new_multi_thread().build().unwrap();
let r = rt.block_on(async {
mtx.with(|guard| {
ready(*guard)
}).await
});
assert_eq!(r, 5);
}
#[cfg(feature = "tokio")]
#[test]
fn with_local_ok() {
let mtx = Mutex::<Rc<i32>>::new(Rc::new(5));
let rt = runtime::Builder::new_current_thread().build().unwrap();
let r = rt.block_on(async {
mtx.with_local(|guard| {
ready(**guard)
}).await
});
assert_eq!(r, 5);
}