#[cfg(feature="tokio")]
mod tokio_tests
{
use crate::{BoundedSendError, ReceiveError};
use inc_dec::IncDecSelf;
use tokio::{sync::{Notify, Semaphore}, task::JoinSet};
use std::sync::Arc;
use super::super::{channel, Sender, Receiver};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn basic_sender_oriented()
{
let (sender, receiver) = channel(2);
assert_eq!(sender.capacity(), 2);
assert_eq!(receiver.capacity(), 2);
let task = tokio::spawn(async move {
let result = receiver.recv().await;
assert_eq!(result, Ok(1));
let result = receiver.recv().await;
assert_eq!(result, Ok(2));
});
assert!(!sender.is_closed());
let result = sender.send(1).await;
assert!(result.is_ok());
let result = sender.send(2).await;
assert!(result.is_ok());
assert!(task.await.is_ok());
let result = sender.send(3).await;
assert_eq!(result, Err(3));
assert!(sender.is_closed());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn basic_receiver_oriented()
{
let (sender, receiver) = channel(2);
assert_eq!(receiver.capacity(), 2);
assert_eq!(sender.capacity(), 2);
assert_eq!(receiver.head_room(), 2);
assert!(!receiver.is_full());
assert!(receiver.is_empty());
let task = tokio::spawn(async move {
assert!(!sender.is_closed());
assert_eq!(sender.head_room(), 2);
let result = sender.send(1).await;
assert!(result.is_ok());
let result = sender.send(2).await;
assert!(result.is_ok());
});
assert!(!receiver.is_closed());
let result = receiver.recv().await;
assert_eq!(result, Ok(1));
let result = receiver.recv().await;
assert_eq!(result, Ok(2));
assert!(task.await.is_ok());
let result = receiver.recv().await;
assert_eq!(result, Err(()));
assert!(receiver.is_closed());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn is_full_sender_oriented()
{
let (sender, receiver) = channel(2);
assert_eq!(sender.capacity(), 2);
assert_eq!(receiver.capacity(), 2);
assert_eq!(sender.head_room(), 2);
assert!(!sender.is_full());
assert!(sender.is_empty());
let sender_notify = Notify::new();
let arc_sender_notify = Arc::new(sender_notify);
let arc_sender_notify_task = arc_sender_notify.clone();
let receiver_notify = Notify::new();
let arc_receiver_notify = Arc::new(receiver_notify);
let arc_receiver_notify_task = arc_receiver_notify.clone();
println!("Hello Start");
let task = tokio::spawn(async move
{
assert!(!receiver.is_closed());
println!("Hello Task 1");
let result = receiver.recv().await;
assert_eq!(result, Ok(1));
println!("Hello Task 2");
let result = receiver.recv().await;
assert_eq!(result, Ok(2));
println!("Hello Task notify_one");
arc_sender_notify_task.notify_one();
println!("Hello Task 3");
let result = receiver.recv().await;
assert_eq!(result, Ok(3));
println!("Hello Task 4");
let result = receiver.recv().await;
assert_eq!(result, Ok(4));
println!("Hello Task receiver notified await");
arc_receiver_notify_task.notified().await;
});
assert!(!sender.is_closed());
assert_eq!(sender.head_room(), 2);
println!("Hello 1");
let result = sender.send(1).await;
assert!(result.is_ok());
println!("Hello 2");
let result = sender.send(2).await;
assert!(result.is_ok());
println!("Hello sender notified await");
arc_sender_notify.notified().await;
println!("Hello 3");
let result = sender.try_send(3);
assert!(result.is_ok());
println!("Hello 4");
let result = sender.try_send(4);
assert!(result.is_ok());
arc_receiver_notify.notify_one();
println!("Hello task await is_ok");
assert!(task.await.is_ok());
println!("Hello 6");
let result = sender.try_send(6);
assert_eq!(result, Err(BoundedSendError::Closed(6)));
println!("Hello 7");
let result = sender.send(7).await;
assert_eq!(result, Err(7));
assert!(sender.is_closed());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn is_full_receiver_oriented()
{
let (sender, receiver) = channel(2);
assert_eq!(receiver.capacity(), 2);
assert_eq!(sender.capacity(), 2);
assert_eq!(receiver.head_room(), 2);
assert!(!receiver.is_full());
assert!(receiver.is_empty());
let receiver_notify = Notify::new();
let arc_receiver_notify = Arc::new(receiver_notify);
let arc_receiver_notify_task = arc_receiver_notify.clone();
let sender_notify = Notify::new();
let arc_sender_notify = Arc::new(sender_notify);
let arc_sender_notify_task = arc_sender_notify.clone();
println!("Hello Start");
let task = tokio::spawn(async move {
assert!(!sender.is_closed());
assert_eq!(sender.head_room(), 2);
println!("Hello task 1");
let result = sender.send(1).await;
assert!(result.is_ok());
println!("Hello task 2");
let result = sender.send(2).await;
assert!(result.is_ok());
println!("Hello task 3");
let result = sender.send(3).await;
assert!(result.is_ok());
println!("Hello task 4");
let result = sender.send(4).await;
assert!(result.is_ok());
assert!(sender.is_full());
arc_receiver_notify_task.notify_one();
println!("Hello task notified await");
arc_sender_notify_task.notified().await;
});
assert!(!receiver.is_closed());
println!("Hello 1");
let result = receiver.recv().await;
assert_eq!(result, Ok(1));
println!("Hello 2");
let result = receiver.recv().await;
assert_eq!(result, Ok(2));
println!("Hello notified await");
arc_receiver_notify.notified().await;
assert!(receiver.is_full());
let result = receiver.try_recv();
println!("Hello 3");
assert_eq!(result, Ok(3));
let result = receiver.try_recv();
println!("Hello 4");
assert_eq!(result, Ok(4));
let result = receiver.try_recv();
assert_eq!(result, Err(ReceiveError::Empty));
arc_sender_notify.notify_one();
println!("Hello await is_ok");
assert!(task.await.is_ok());
let result = receiver.try_recv();
assert_eq!(result, Err(ReceiveError::Closed));
println!("Hello recv await");
let result = receiver.recv().await;
assert_eq!(result, Err(()));
assert!(receiver.is_closed());
}
}