use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Waker;
use std::vec::Vec;
use super::*;
use crate::latch::Latch;
#[test]
fn no_permits() {
Semaphore::new(0);
}
#[test]
fn try_acquire() {
let sem = Semaphore::new(1);
{
let p1 = sem.try_acquire(1);
assert!(p1.is_some());
let p2 = sem.try_acquire(1);
assert!(p2.is_none());
}
let p3 = sem.try_acquire(1);
assert!(p3.is_some());
}
#[tokio::test]
async fn acquire() {
let sem = Arc::new(Semaphore::new(1));
let p1 = sem.try_acquire(1).unwrap();
let sem_clone = sem.clone();
let j = tokio::spawn(async move {
let _p2 = sem_clone.acquire(1).await;
});
drop(p1);
j.await.unwrap();
}
#[tokio::test]
async fn add_permits() {
let sem = Arc::new(Semaphore::new(0));
let sem_clone = sem.clone();
let j = tokio::spawn(async move {
let _p2 = sem_clone.acquire(1).await;
});
sem.release(1);
j.await.unwrap();
}
#[test]
fn forget() {
let sem = Arc::new(Semaphore::new(1));
{
let p = sem.try_acquire(1).unwrap();
assert_eq!(sem.available_permits(), 0);
p.forget();
assert_eq!(sem.available_permits(), 0);
}
assert_eq!(sem.available_permits(), 0);
assert!(sem.try_acquire(1).is_none());
}
#[tokio::test]
async fn stress_test() {
let sem = Arc::new(Semaphore::new(5));
let mut join_handles = Vec::new();
for i in 0..100 {
let sem_clone = sem.clone();
join_handles.push(tokio::spawn(async move {
let _p = sem_clone.acquire(1).await;
tokio::time::sleep(std::time::Duration::from_millis(100 - i)).await;
}));
}
for j in join_handles {
j.await.unwrap();
}
let _p1 = sem.try_acquire(1).unwrap();
let _p2 = sem.try_acquire(1).unwrap();
let _p3 = sem.try_acquire(1).unwrap();
let _p4 = sem.try_acquire(1).unwrap();
let _p5 = sem.try_acquire(1).unwrap();
assert!(sem.try_acquire(1).is_none());
}
#[test]
fn add_max_amount_permits() {
let s = Semaphore::new(0);
s.release(usize::MAX);
assert_eq!(s.available_permits(), usize::MAX);
}
#[test]
#[should_panic]
fn add_more_than_max_amount_permits1() {
let s = Semaphore::new(1);
s.release(usize::MAX);
}
#[test]
#[should_panic]
fn add_more_than_max_amount_permits2() {
let s = Semaphore::new(usize::MAX - 1);
s.release(1);
s.release(1);
}
#[test]
fn no_panic_at_max_permits() {
let _ = Semaphore::new(usize::MAX);
let s = Semaphore::new(usize::MAX - 1);
s.release(1);
}
#[test]
fn try_acquire_concurrently() {
let s = Semaphore::new(1);
let p1 = s.try_acquire(1).unwrap();
assert_eq!(s.available_permits(), 0);
let p2 = s.try_acquire(1);
assert!(p2.is_none());
assert_eq!(s.available_permits(), 0);
drop(p1);
assert_eq!(s.available_permits(), 1);
}
#[test]
fn acquire_then_drop() {
let waker = Waker::noop();
let mut context = Context::from_waker(waker);
let s = Semaphore::new(1);
let p1 = s.try_acquire(1).unwrap();
{
let p2 = s.acquire(1);
let poll = pin!(p2).poll(&mut context);
assert!(poll.is_pending());
}
drop(p1);
assert_eq!(s.available_permits(), 1);
}
#[test]
fn wake_then_drop() {
let waker = Waker::noop();
let mut context = Context::from_waker(waker);
let s = Semaphore::new(2);
let p1 = s.try_acquire(2).unwrap();
{
let p2 = s.acquire(1);
let p2 = pin!(p2);
assert!(p2.poll(&mut context).is_pending());
{
let p3 = s.acquire(1);
let p3 = pin!(p3);
assert!(p3.poll(&mut context).is_pending());
drop(p1);
}
}
assert_eq!(s.available_permits(), 2);
}
#[tokio::test]
async fn acquire_then_forget_exact() {
let s = Arc::new(Semaphore::new(5));
s.forget_exact(3);
assert_eq!(s.available_permits(), 2);
let acquired = Arc::new(Latch::new(1));
let acquired_clone = acquired.clone();
let s_clone = s.clone();
tokio::spawn(async move {
let _p = s_clone.acquire(3).await;
acquired_clone.count_down();
});
assert!(acquired.try_wait().is_err());
s.forget_exact(2);
s.release(2);
assert!(acquired.try_wait().is_err());
s.release(1);
acquired.wait().await;
assert_eq!(s.available_permits(), 3);
}