#![ cfg( feature = "implementation" ) ]
#![ cfg(not( target_arch = "wasm32" )) ]
mod common;
use common::{ *, import::* };
/// Spawns one [`prove`] task per flag on a nursery created from `AsyncStd`,
/// then awaits the nursery output before returning.
///
/// # Errors
///
/// Propagates any error returned by `nurse` while spawning.
async fn cleanup_await_mt_inner(proofs: Vec<Arc<AtomicBool>>) -> DynResult<()> {
    let (nursery, output) = Nursery::new(AsyncStd);

    for flag in &proofs {
        let task = prove(Arc::clone(flag));
        nursery.nurse(task)?;
    }

    // Release the spawning handle before waiting on the output.
    // NOTE(review): presumably the output only completes once no nursery
    // handle is left — confirm against the nursery crate docs.
    drop(nursery);
    output.await;

    Ok(())
}
/// Waits 20 milliseconds and then sets the shared flag `p` to `true`.
///
/// The tests use the flag as evidence that this task ran to completion.
async fn prove(p: Arc<AtomicBool>) {
    let pause = Duration::from_millis(20);
    Delay::new(pause).await;

    p.store(true, SeqCst);
}
/// Checks that after `cleanup_await_mt_inner` returns, every one of the five
/// flags handed to it has been set to `true`.
#[async_std::test]
async fn cleanup_await_mt() -> DynResult<()> {
    let task_count = 5;

    // One fresh, unset flag per task.
    let proofs: Vec<_> = (0..task_count)
        .map(|_| Arc::new(AtomicBool::new(false)))
        .collect();

    cleanup_await_mt_inner(proofs.clone()).await?;

    proofs.iter().for_each(|proof| assert!( proof.load(SeqCst) ));

    Ok(())
}
/// Spawns one [`prove`] task per flag on a nursery created from the supplied
/// `TokioCt` executor, then awaits the nursery output before returning.
///
/// # Errors
///
/// Propagates any error returned by `nurse` while spawning.
async fn cleanup_await_st_inner(proofs: Vec<Arc<AtomicBool>>, exec: TokioCt) -> DynResult<()> {
    let (nursery, output) = Nursery::new(exec);

    for flag in &proofs {
        let task = prove(Arc::clone(flag));
        nursery.nurse(task)?;
    }

    // Release the spawning handle before waiting on the output.
    // NOTE(review): presumably the output only completes once no nursery
    // handle is left — confirm against the nursery crate docs.
    drop(nursery);
    output.await;

    Ok(())
}
/// Checks, on an executor built with `TokioCtBuilder`, that after
/// `cleanup_await_st_inner` returns every one of the five flags is `true`.
#[test]
fn cleanup_await_st() -> DynResult<()> {
    let exec = TokioCtBuilder::new().build()?;
    let task_count = 5;

    // One fresh, unset flag per task.
    let proofs: Vec<_> = (0..task_count)
        .map(|_| Arc::new(AtomicBool::new(false)))
        .collect();

    // The inner future gets its own executor handle to spawn on, while the
    // original handle drives it to completion.
    let scenario = cleanup_await_st_inner(proofs.clone(), exec.clone());
    exec.block_on(scenario)?;

    proofs.iter().for_each(|proof| assert!( proof.load(SeqCst) ));

    Ok(())
}
/// Spawns one [`prove`] task per flag on a nursery created from `AsyncStd`,
/// then polls the nursery output as a stream until it yields `None`.
///
/// # Errors
///
/// Propagates any error returned by `nurse` while spawning.
async fn cleanup_stream_mt_inner(proofs: Vec<Arc<AtomicBool>>) -> DynResult<()> {
    let (nursery, mut output) = Nursery::new(AsyncStd);

    for flag in &proofs {
        let task = prove(Arc::clone(flag));
        nursery.nurse(task)?;
    }

    // Release the spawning handle before draining the output.
    // NOTE(review): presumably the stream only ends once no nursery handle is
    // left — confirm against the nursery crate docs.
    drop(nursery);

    // Drain the stream; the yielded items themselves are not inspected.
    loop {
        if output.next().await.is_none() {
            break;
        }
    }

    Ok(())
}
/// Checks that after `cleanup_stream_mt_inner` returns, every one of the five
/// flags handed to it has been set to `true`.
#[async_std::test]
async fn cleanup_stream_mt() -> DynResult<()> {
    let task_count = 5;

    // One fresh, unset flag per task.
    let proofs: Vec<_> = (0..task_count)
        .map(|_| Arc::new(AtomicBool::new(false)))
        .collect();

    cleanup_stream_mt_inner(proofs.clone()).await?;

    proofs.iter().for_each(|proof| assert!( proof.load(SeqCst) ));

    Ok(())
}
/// Spawns one [`prove`] task per flag on a nursery created from the supplied
/// `TokioCt` executor, then polls the nursery output as a stream until it
/// yields `None`.
///
/// # Errors
///
/// Propagates any error returned by `nurse` while spawning.
async fn cleanup_stream_st_inner(proofs: Vec<Arc<AtomicBool>>, exec: TokioCt) -> DynResult<()> {
    let (nursery, mut output) = Nursery::new(exec);

    for flag in &proofs {
        let task = prove(Arc::clone(flag));
        nursery.nurse(task)?;
    }

    // Release the spawning handle before draining the output.
    // NOTE(review): presumably the stream only ends once no nursery handle is
    // left — confirm against the nursery crate docs.
    drop(nursery);

    // Drain the stream; the yielded items themselves are not inspected.
    loop {
        if output.next().await.is_none() {
            break;
        }
    }

    Ok(())
}
/// Checks, on an executor built with `TokioCtBuilder`, that after
/// `cleanup_stream_st_inner` returns every one of the five flags is `true`.
#[test]
fn cleanup_stream_st() -> DynResult<()> {
    let exec = TokioCtBuilder::new().build()?;
    let task_count = 5;

    // One fresh, unset flag per task.
    let proofs: Vec<_> = (0..task_count)
        .map(|_| Arc::new(AtomicBool::new(false)))
        .collect();

    // The inner future gets its own executor handle to spawn on, while the
    // original handle drives it to completion.
    let scenario = cleanup_stream_st_inner(proofs.clone(), exec.clone());
    exec.block_on(scenario)?;

    proofs.iter().for_each(|proof| assert!( proof.load(SeqCst) ));

    Ok(())
}
/// Hands every sender to a [`slow`] task on a nursery created from `AsyncStd`,
/// waits 10 milliseconds, and returns without awaiting the tasks.
///
/// Both the nursery and its output stay alive until the end of this function
/// and are dropped when it returns.
///
/// # Errors
///
/// Propagates any error returned by `nurse` while spawning.
async fn resource_drop_mt_inner(senders: Vec<mpsc::UnboundedSender<()>>) -> DynResult<()> {
    // `_output` is a named binding on purpose: it is held until the function
    // returns instead of being dropped immediately.
    let (nursery, _output) = Nursery::new(AsyncStd);

    for sender in senders {
        nursery.nurse(slow(sender))?;
    }

    // Far shorter than the 60 second wait inside `slow`.
    let pause = Duration::from_millis(10);
    Delay::new(pause).await;

    Ok(())
}
/// Waits 60 seconds and then sends a unit message on `tx`.
///
/// # Errors
///
/// Returns the error from `unbounded_send` if the message cannot be sent.
async fn slow(tx: mpsc::UnboundedSender<()>) -> DynResult<()> {
    let pause = Duration::from_secs(60);
    Delay::new(pause).await;

    tx.unbounded_send(())?;

    Ok(())
}
/// Checks that once `resource_drop_mt_inner` has returned, both receivers
/// yield `None` without ever receiving a message.
///
/// NOTE(review): this presumably demonstrates that the still-sleeping `slow`
/// tasks, and the senders they own, are dropped together with the nursery —
/// confirm against the nursery crate docs.
#[async_std::test]
async fn resource_drop_mt() -> DynResult<()> {
    let (first_tx, mut first_rx) = mpsc::unbounded();
    let (second_tx, mut second_rx) = mpsc::unbounded();

    let senders = vec![first_tx, second_tx];
    resource_drop_mt_inner(senders).await?;

    assert_eq!(first_rx.next().await, None);
    assert_eq!(second_rx.next().await, None);

    Ok(())
}
/// Hands every sender to a [`slow`] task on a nursery created from the
/// supplied `TokioCt` executor, waits 10 milliseconds, and returns without
/// awaiting the tasks.
///
/// Both the nursery and its output stay alive until the end of this function
/// and are dropped when it returns.
///
/// # Errors
///
/// Propagates any error returned by `nurse` while spawning.
async fn resource_drop_st_inner(
    senders: Vec<mpsc::UnboundedSender<()>>,
    exec: TokioCt,
) -> DynResult<()> {
    // `_output` is a named binding on purpose: it is held until the function
    // returns instead of being dropped immediately.
    let (nursery, _output) = Nursery::new(exec);

    for sender in senders {
        nursery.nurse(slow(sender))?;
    }

    // Far shorter than the 60 second wait inside `slow`.
    let pause = Duration::from_millis(10);
    Delay::new(pause).await;

    Ok(())
}
/// Checks, on an executor built with `TokioCtBuilder`, that once
/// `resource_drop_st_inner` has returned, both receivers yield `None` without
/// ever receiving a message.
///
/// NOTE(review): this presumably demonstrates that the still-sleeping `slow`
/// tasks, and the senders they own, are dropped together with the nursery —
/// confirm against the nursery crate docs.
#[test]
fn resource_drop_st() -> DynResult<()> {
    let exec = TokioCtBuilder::new().build()?;

    let (first_tx, mut first_rx) = mpsc::unbounded();
    let (second_tx, mut second_rx) = mpsc::unbounded();

    // A cloned handle drives the future because `exec` itself is moved into
    // the async block; clippy's `redundant_clone` is silenced for that clone.
    #[allow(clippy::redundant_clone)]
    exec.clone().block_on(async move {
        let senders = vec![first_tx, second_tx];
        resource_drop_st_inner(senders, exec).await.unwrap();

        assert_eq!(first_rx.next().await, None);
        assert_eq!(second_rx.next().await, None);
    });

    Ok(())
}