mod error;
mod inner;
mod mutex;
mod receiver;
pub mod schannel;
mod select;
mod sender;
mod util;
use std::sync::Arc;
pub use error::{
ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError,
SendTimeoutError, TryRecvError, TrySelectError, TrySendError,
};
pub use receiver::{Receiver, RecvFut, RecvStream};
pub use select::{Select, SelectedOperation};
pub use sender::{SendFut, Sender};
pub use schannel::{Select as SchannelSelect, SelectedOperation as SchannelSelectedOperation};
use crate::inner::Inner;
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
assert!(cap > 0);
let (tx, rx) = crossbeam_channel::bounded(cap);
let inner = Arc::new(Inner::default());
(Sender::new(tx, inner.clone()), Receiver::new(rx, inner))
}
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = crossbeam_channel::unbounded();
let inner = Arc::new(Inner::default());
(Sender::new(tx, inner.clone()), Receiver::new(rx, inner))
}
#[cfg(test)]
mod tests {
use crate::bounded;
use futures::{StreamExt, TryStreamExt};
use std::thread;
use std::time::Duration;
use tokio::time::timeout;
#[test]
fn test_send_receive_sync() {
let (sender, receiver) = bounded::<i32>(1);
sender.send(42).unwrap();
let received = receiver.recv().unwrap();
assert_eq!(received, 42);
}
#[test]
fn test_send_receive_sync_threaded() {
let (sender, receiver) = bounded::<String>(1);
let sender_handle = thread::spawn(move || {
for i in 0..5 {
sender.send(format!("message_{}", i)).unwrap();
thread::sleep(Duration::from_millis(10));
}
});
let receiver_handle = thread::spawn(move || {
let mut messages = Vec::new();
for _ in 0..5 {
messages.push(receiver.recv().unwrap());
}
messages
});
sender_handle.join().unwrap();
let messages = receiver_handle.join().unwrap();
assert_eq!(messages.len(), 5);
for (i, msg) in messages.iter().enumerate() {
assert_eq!(msg, &format!("message_{}", i));
}
}
#[tokio::test]
async fn test_send_receive_async() {
let (sender, receiver) = bounded::<i32>(1);
sender.send_async(42).await.unwrap();
let received = receiver.recv_async().await.unwrap();
assert_eq!(received, 42);
}
#[tokio::test]
async fn test_send_receive_async_concurrent() {
let (sender, receiver) = bounded::<String>(1);
let sender_task = tokio::spawn(async move {
for i in 0..5 {
sender
.send_async(format!("async_message_{}", i))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
let receiver_task = tokio::spawn(async move {
let mut messages = Vec::new();
for _ in 0..5 {
messages.push(receiver.recv_async().await.unwrap());
}
messages
});
sender_task.await.unwrap();
let messages = receiver_task.await.unwrap();
assert_eq!(messages.len(), 5);
for (i, msg) in messages.iter().enumerate() {
assert_eq!(msg, &format!("async_message_{}", i));
}
}
#[tokio::test]
async fn test_send_sync_receive_async() {
let (sender, receiver) = bounded::<i32>(1);
let _ = std::thread::spawn(move || {
for i in 0..5 {
sender.send(i).unwrap();
}
});
let receiver_task = tokio::spawn(async move {
let mut values = Vec::new();
for _ in 0..5 {
values.push(receiver.recv_async().await.unwrap());
}
values
});
let values = receiver_task.await.unwrap();
assert_eq!(values, vec![0, 1, 2, 3, 4]);
}
#[tokio::test]
async fn test_send_async_receive_sync() {
let (sender, receiver) = bounded::<String>(1);
let sender_task = tokio::spawn(async move {
for i in 0..5 {
sender.send_async(format!("mixed_{}", i)).await.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
let receiver_task = tokio::task::spawn_blocking(move || {
let mut messages = Vec::new();
for _ in 0..5 {
messages.push(receiver.recv().unwrap());
}
messages
});
sender_task.await.unwrap();
let messages = receiver_task.await.unwrap();
assert_eq!(messages.len(), 5);
for (i, msg) in messages.iter().enumerate() {
assert_eq!(msg, &format!("mixed_{}", i));
}
}
#[test]
fn test_channel_drop_behavior() {
let (sender, receiver) = bounded::<i32>(1);
sender.send(42).unwrap();
drop(sender);
assert_eq!(receiver.recv().unwrap(), 42);
assert!(receiver.recv().is_err());
}
#[tokio::test]
async fn test_async_channel_drop_behavior() {
let (sender, receiver) = bounded::<i32>(1);
sender.send_async(42).await.unwrap();
drop(sender);
assert_eq!(receiver.recv_async().await.unwrap(), 42);
assert!(receiver.recv_async().await.is_err());
}
#[tokio::test]
async fn test_stream_basic_functionality() {
let (sender, receiver) = bounded::<i32>(5);
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
drop(sender);
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
}
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_stream_empty_channel() {
let (sender, receiver) = bounded::<i32>(1);
drop(sender);
let mut stream = receiver.into_stream();
let value = stream.next().await;
assert!(value.is_none());
}
#[tokio::test]
async fn test_stream_with_collect() {
let (sender, receiver) = bounded::<String>(10);
for i in 0..5 {
sender.send(format!("message_{}", i)).unwrap();
}
drop(sender);
let stream = receiver.into_stream();
let values: Vec<String> = stream.collect().await;
assert_eq!(values.len(), 5);
for (i, value) in values.iter().enumerate() {
assert_eq!(value, &format!("message_{}", i));
}
}
#[tokio::test]
async fn test_stream_concurrent_senders() {
let (sender, receiver) = bounded::<i32>(100);
let mut handles = Vec::new();
for i in 0..3 {
let sender_clone = sender.clone();
let handle = tokio::spawn(async move {
for j in 0..10 {
sender_clone.send_async(i * 10 + j).await.unwrap();
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
drop(sender);
let stream = receiver.into_stream();
let values: Vec<i32> = stream.collect().await;
assert_eq!(values.len(), 30);
let mut sorted_values = values;
sorted_values.sort();
let expected: Vec<i32> = (0..30).collect();
assert_eq!(sorted_values, expected);
}
#[tokio::test]
async fn test_stream_with_timeout() {
let (sender, receiver) = bounded::<i32>(1);
sender.send(42).unwrap();
let mut stream = receiver.into_stream();
let result = timeout(Duration::from_millis(100), stream.next()).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), Some(42));
let result = timeout(Duration::from_millis(50), stream.next()).await;
assert!(result.is_err()); }
#[tokio::test]
async fn test_stream_partial_consumption() {
let (sender, receiver) = bounded::<i32>(10);
for i in 0..10 {
sender.send(i).unwrap();
}
drop(sender);
let mut stream = receiver.into_stream();
let mut consumed = Vec::new();
for _ in 0..3 {
if let Some(value) = stream.next().await {
consumed.push(value);
}
}
assert_eq!(consumed, vec![0, 1, 2]);
let remaining: Vec<i32> = stream.collect().await;
assert_eq!(remaining, vec![3, 4, 5, 6, 7, 8, 9]);
}
#[tokio::test]
async fn test_stream_async_senders() {
let (sender, receiver) = bounded::<i32>(5);
for i in 0..5 {
sender.send_async(i).await.unwrap();
}
drop(sender);
let stream = receiver.into_stream();
let values: Vec<i32> = stream.collect().await;
assert_eq!(values, vec![0, 1, 2, 3, 4]);
}
#[tokio::test]
async fn test_stream_mixed_send_modes() {
let (sender, receiver) = bounded::<String>(10);
sender.send("sync_1".to_string()).unwrap();
sender.send("sync_2".to_string()).unwrap();
let sender_clone = sender.clone();
let async_task = tokio::spawn(async move {
sender_clone
.send_async("async_1".to_string())
.await
.unwrap();
sender_clone
.send_async("async_2".to_string())
.await
.unwrap();
});
async_task.await.unwrap();
drop(sender);
let stream = receiver.into_stream();
let values: Vec<String> = stream.collect().await;
assert_eq!(values.len(), 4);
assert!(values.contains(&"sync_1".to_string()));
assert!(values.contains(&"sync_2".to_string()));
assert!(values.contains(&"async_1".to_string()));
assert!(values.contains(&"async_2".to_string()));
}
#[tokio::test]
async fn test_stream_with_filter_map() {
let (sender, receiver) = bounded::<i32>(10);
for i in 0..10 {
sender.send(i).unwrap();
}
drop(sender);
let stream = receiver.into_stream();
let filtered: Vec<i32> = stream
.filter_map(|x| async move {
if x % 2 == 0 {
Some(x * 2)
} else {
None
}
})
.collect()
.await;
assert_eq!(filtered, vec![0, 4, 8, 12, 16]);
}
#[tokio::test]
async fn test_stream_take() {
let (sender, receiver) = bounded::<i32>(10);
for i in 0..10 {
sender.send(i).unwrap();
}
drop(sender);
let stream = receiver.into_stream();
let taken: Vec<i32> = stream.take(3).collect().await;
assert_eq!(taken, vec![0, 1, 2]);
}
#[tokio::test]
async fn test_stream_fold() {
let (sender, receiver) = bounded::<i32>(5);
for i in 1..=5 {
sender.send(i).unwrap();
}
drop(sender);
let stream = receiver.into_stream();
let sum = stream.fold(0, |acc, x| async move { acc + x }).await;
assert_eq!(sum, 15); }
#[tokio::test]
async fn test_stream_multiple_receivers() {
let (sender, receiver) = bounded::<i32>(10);
let receiver2 = receiver.clone();
for i in 0..6 {
sender.send(i).unwrap();
}
drop(sender);
let task1 = tokio::spawn(async move { receiver.into_stream().collect::<Vec<i32>>().await });
let task2 =
tokio::spawn(async move { receiver2.into_stream().collect::<Vec<i32>>().await });
let values1 = task1.await.unwrap();
let values2 = task2.await.unwrap();
assert_eq!(values1.len() + values2.len(), 6);
let mut all_values = values1;
all_values.extend(values2);
all_values.sort();
assert_eq!(all_values, vec![0, 1, 2, 3, 4, 5]);
}
#[tokio::test]
async fn test_stream_error_handling() {
let (sender, receiver) = bounded::<Result<i32, String>>(5);
sender.send(Ok(1)).unwrap();
sender.send(Err("error".to_string())).unwrap();
sender.send(Ok(2)).unwrap();
drop(sender);
let stream = receiver.into_stream();
let results: Vec<Result<i32, String>> = stream.collect().await;
assert_eq!(results.len(), 3);
assert_eq!(results[0], Ok(1));
assert_eq!(results[1], Err("error".to_string()));
assert_eq!(results[2], Ok(2));
let (sender2, receiver2) = bounded::<Result<i32, String>>(5);
sender2.send(Ok(1)).unwrap();
sender2.send(Err("error".to_string())).unwrap();
sender2.send(Ok(2)).unwrap();
drop(sender2);
let stream2 = receiver2.into_stream();
let result: Result<Vec<i32>, String> = stream2.try_collect().await;
assert_eq!(result, Err("error".to_string()));
}
#[tokio::test]
async fn test_stream_waker_behavior_immediate_values() {
let (sender, receiver) = bounded::<i32>(10);
for i in 0..5 {
sender.send(i).unwrap();
}
drop(sender);
let start = std::time::Instant::now();
let stream = receiver.into_stream();
let values: Vec<i32> = stream.collect().await;
let elapsed = start.elapsed();
assert_eq!(values, vec![0, 1, 2, 3, 4]);
assert!(elapsed < Duration::from_millis(10));
}
#[tokio::test]
async fn test_stream_waker_behavior_delayed_values() {
let (sender, receiver) = bounded::<i32>(10);
let collect_task = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 5 {
break;
}
}
values
});
for i in 0..5 {
tokio::time::sleep(Duration::from_millis(50)).await;
sender.send(i).unwrap();
}
drop(sender);
let values = collect_task.await.unwrap();
assert_eq!(values, vec![0, 1, 2, 3, 4]);
}
#[tokio::test]
async fn test_stream_waker_behavior_mixed_timing() {
let (sender, receiver) = bounded::<String>(10);
sender.send("immediate_1".to_string()).unwrap();
sender.send("immediate_2".to_string()).unwrap();
let collect_task = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 4 {
break;
}
}
values
});
tokio::time::sleep(Duration::from_millis(30)).await;
sender.send("delayed_1".to_string()).unwrap();
tokio::time::sleep(Duration::from_millis(30)).await;
sender.send("delayed_2".to_string()).unwrap();
drop(sender);
let values = collect_task.await.unwrap();
assert_eq!(values.len(), 4);
assert!(values.contains(&"immediate_1".to_string()));
assert!(values.contains(&"immediate_2".to_string()));
assert!(values.contains(&"delayed_1".to_string()));
assert!(values.contains(&"delayed_2".to_string()));
}
#[tokio::test]
async fn test_stream_concurrent_producers_single_consumer() {
let (sender, receiver) = bounded::<i32>(100);
let collect_task = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 30 {
break;
}
}
values
});
let mut producer_handles = Vec::new();
for producer_id in 0..3 {
let sender_clone = sender.clone();
let handle = tokio::spawn(async move {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(10)).await;
sender_clone.send_async(producer_id * 10 + i).await.unwrap();
}
});
producer_handles.push(handle);
}
for handle in producer_handles {
handle.await.unwrap();
}
drop(sender);
let values = collect_task.await.unwrap();
assert_eq!(values.len(), 30);
let mut sorted_values = values;
sorted_values.sort();
let expected: Vec<i32> = (0..30).collect();
assert_eq!(sorted_values, expected);
}
#[tokio::test]
async fn test_stream_multiple_consumers_competing() {
let (sender, receiver) = bounded::<i32>(50);
let receiver2 = receiver.clone();
let receiver3 = receiver.clone();
let consumer1 = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 10 {
break;
}
}
values
});
let consumer2 = tokio::spawn(async move {
let mut stream = receiver2.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 10 {
break;
}
}
values
});
let consumer3 = tokio::spawn(async move {
let mut stream = receiver3.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 10 {
break;
}
}
values
});
for i in 0..30 {
tokio::time::sleep(Duration::from_millis(5)).await;
sender.send_async(i).await.unwrap();
}
drop(sender);
let values1 = consumer1.await.unwrap();
let values2 = consumer2.await.unwrap();
let values3 = consumer3.await.unwrap();
assert_eq!(values1.len(), 10);
assert_eq!(values2.len(), 10);
assert_eq!(values3.len(), 10);
let mut all_values = values1;
all_values.extend(values2);
all_values.extend(values3);
all_values.sort();
all_values.dedup();
assert_eq!(all_values.len(), 30);
}
#[tokio::test]
async fn test_stream_backpressure_behavior() {
let (sender, receiver) = bounded::<i32>(2);
let collect_task = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
tokio::time::sleep(Duration::from_millis(100)).await;
if values.len() == 5 {
break;
}
}
values
});
for i in 0..5 {
sender.send_async(i).await.unwrap();
}
drop(sender);
let values = collect_task.await.unwrap();
assert_eq!(values, vec![0, 1, 2, 3, 4]);
}
#[tokio::test]
async fn test_stream_rapid_send_receive_cycles() {
let (sender, receiver) = bounded::<i32>(10);
let collect_task = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 20 {
break;
}
}
values
});
for batch in 0..4 {
for i in 0..5 {
sender.send_async(batch * 5 + i).await.unwrap();
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
drop(sender);
let values = collect_task.await.unwrap();
assert_eq!(values.len(), 20);
let mut sorted_values = values;
sorted_values.sort();
let expected: Vec<i32> = (0..20).collect();
assert_eq!(sorted_values, expected);
}
#[tokio::test]
async fn test_stream_intermittent_sending() {
let (sender, receiver) = bounded::<String>(10);
let collect_task = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 6 {
break;
}
}
values
});
sender.send("burst_1".to_string()).unwrap();
sender.send("burst_2".to_string()).unwrap();
sender.send("burst_3".to_string()).unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
sender.send("pause_1".to_string()).unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
sender.send("pause_2".to_string()).unwrap();
sender.send("pause_3".to_string()).unwrap();
drop(sender);
let values = collect_task.await.unwrap();
assert_eq!(values.len(), 6);
assert!(values.contains(&"burst_1".to_string()));
assert!(values.contains(&"burst_2".to_string()));
assert!(values.contains(&"burst_3".to_string()));
assert!(values.contains(&"pause_1".to_string()));
assert!(values.contains(&"pause_2".to_string()));
assert!(values.contains(&"pause_3".to_string()));
}
#[tokio::test]
async fn test_stream_early_termination() {
let (sender, receiver) = bounded::<i32>(10);
let collect_task = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 3 {
break;
}
}
values
});
for i in 0..10 {
sender.send_async(i).await.unwrap();
}
drop(sender);
let values = collect_task.await.unwrap();
assert_eq!(values.len(), 3);
assert_eq!(values, vec![0, 1, 2]);
}
#[tokio::test]
async fn test_stream_with_select_macro() {
let (sender1, receiver1) = bounded::<i32>(5);
let (sender2, receiver2) = bounded::<String>(5);
let collect_task = tokio::spawn(async move {
let mut stream1 = receiver1.into_stream();
let mut stream2 = receiver2.into_stream();
let mut int_values = Vec::new();
let mut string_values = Vec::new();
loop {
tokio::select! {
Some(value) = stream1.next() => {
int_values.push(value);
if int_values.len() == 3 && string_values.len() == 2 {
break;
}
}
Some(value) = stream2.next() => {
string_values.push(value);
if int_values.len() == 3 && string_values.len() == 2 {
break;
}
}
else => {
break;
}
}
}
(int_values, string_values)
});
sender1.send_async(1).await.unwrap();
sender2.send_async("hello".to_string()).await.unwrap();
sender1.send_async(2).await.unwrap();
sender2.send_async("world".to_string()).await.unwrap();
sender1.send_async(3).await.unwrap();
drop(sender1);
drop(sender2);
let (int_values, string_values) = collect_task.await.unwrap();
assert_eq!(int_values.len(), 3);
assert_eq!(string_values.len(), 2);
assert!(int_values.contains(&1));
assert!(int_values.contains(&2));
assert!(int_values.contains(&3));
assert!(string_values.contains(&"hello".to_string()));
assert!(string_values.contains(&"world".to_string()));
}
#[tokio::test]
async fn test_stream_stress_high_throughput() {
let (sender, receiver) = bounded::<i32>(1000);
let collect_task = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 1000 {
break;
}
}
values
});
let sender_task = tokio::spawn(async move {
for i in 0..1000 {
sender.send_async(i).await.unwrap();
}
drop(sender);
});
sender_task.await.unwrap();
let values = collect_task.await.unwrap();
assert_eq!(values.len(), 1000);
let mut sorted_values = values;
sorted_values.sort();
let expected: Vec<i32> = (0..1000).collect();
assert_eq!(sorted_values, expected);
}
#[tokio::test]
async fn test_stream_timeout_behavior() {
let (sender, receiver) = bounded::<i32>(5);
let collect_task = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
loop {
match tokio::time::timeout(Duration::from_millis(100), stream.next()).await {
Ok(Some(value)) => {
values.push(value);
if values.len() == 3 {
break;
}
}
Ok(None) => break, Err(_) => {
if values.len() == 3 {
break;
}
}
}
}
values
});
tokio::time::sleep(Duration::from_millis(50)).await;
sender.send_async(1).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
sender.send_async(2).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
sender.send_async(3).await.unwrap();
drop(sender);
let values = collect_task.await.unwrap();
assert_eq!(values.len(), 3);
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_stream_drop_behavior_cleanup() {
let (sender, receiver) = bounded::<i32>(10);
for i in 0..5 {
sender.send_async(i).await.unwrap();
}
let stream = receiver.into_stream();
let mut stream = stream;
let first_value = stream.next().await.unwrap();
let second_value = stream.next().await.unwrap();
assert_eq!(first_value, 0);
assert_eq!(second_value, 1);
drop(stream);
drop(sender);
}
#[tokio::test]
async fn test_stream_with_chain_operations() {
let (sender, receiver) = bounded::<i32>(10);
for i in 0..10 {
sender.send_async(i).await.unwrap();
}
drop(sender);
let stream = receiver.into_stream();
let result: Vec<i32> = stream
.filter_map(|x| async move {
if x % 2 == 0 {
Some(x * 2)
} else {
None
}
}) .take(3) .collect()
.await;
assert_eq!(result, vec![0, 4, 8]); }
#[test]
fn test_receiver_drop_wakes_blocked_senders() {
let (sender, receiver) = bounded::<i32>(1);
sender.send(42).unwrap();
let sender_clone = sender.clone();
let send_handle = thread::spawn(move || {
let result = sender_clone.send(43);
assert!(result.is_err());
});
thread::sleep(Duration::from_millis(10));
drop(receiver);
send_handle.join().unwrap();
}
#[test]
fn test_multiple_sender_drop_behavior() {
let (sender, receiver) = bounded::<i32>(10);
let sender1 = sender.clone();
let sender2 = sender.clone();
let sender3 = sender.clone();
sender1.send(1).unwrap();
sender2.send(2).unwrap();
sender3.send(3).unwrap();
drop(sender1);
drop(sender2);
sender3.send(4).unwrap();
sender.send(5).unwrap();
let mut values = Vec::new();
for _ in 0..5 {
values.push(receiver.recv().unwrap());
}
values.sort();
assert_eq!(values, vec![1, 2, 3, 4, 5]);
drop(sender3);
drop(sender);
assert!(receiver.recv().is_err());
}
#[test]
fn test_multiple_receiver_drop_behavior() {
let (sender, receiver) = bounded::<i32>(10);
let receiver1 = receiver.clone();
let receiver2 = receiver.clone();
let receiver3 = receiver.clone();
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
drop(receiver1);
drop(receiver2);
assert_eq!(receiver3.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
drop(receiver3);
drop(receiver);
assert!(sender.send(4).is_err());
}
#[tokio::test]
async fn test_async_sender_drop_wakes_blocked_receivers() {
let (sender, receiver) = bounded::<i32>(1);
sender.send_async(42).await.unwrap();
let receiver_clone = receiver.clone();
let recv_task = tokio::spawn(async move {
receiver_clone.recv_async().await.unwrap()
});
tokio::time::sleep(Duration::from_millis(10)).await;
drop(sender);
let result = recv_task.await.unwrap();
assert_eq!(result, 42);
assert!(receiver.recv_async().await.is_err());
}
#[tokio::test]
async fn test_async_receiver_drop_wakes_blocked_senders() {
let (sender, receiver) = bounded::<i32>(1);
sender.send_async(42).await.unwrap();
let sender_clone = sender.clone();
let send_task = tokio::spawn(async move {
let result = sender_clone.send_async(43).await;
assert!(result.is_err());
});
tokio::time::sleep(Duration::from_millis(10)).await;
drop(receiver);
send_task.await.unwrap();
}
#[tokio::test]
async fn test_send_fut_drop_cleanup() {
let (sender, receiver) = bounded::<i32>(2);
sender.send_async(42).await.unwrap();
let send_fut = sender.send_async(43);
drop(send_fut);
sender.send_async(44).await.unwrap();
assert_eq!(receiver.recv_async().await.unwrap(), 42);
assert_eq!(receiver.recv_async().await.unwrap(), 44);
}
#[tokio::test]
async fn test_recv_fut_drop_cleanup() {
let (sender, receiver) = bounded::<i32>(1);
let recv_fut = receiver.recv_async();
drop(recv_fut);
sender.send_async(42).await.unwrap();
assert_eq!(receiver.recv_async().await.unwrap(), 42);
}
#[tokio::test]
async fn test_stream_drop_cleanup() {
let (sender, receiver) = bounded::<i32>(10);
for i in 0..5 {
sender.send_async(i).await.unwrap();
}
let stream = receiver.into_stream();
drop(stream);
assert!(sender.send_async(99).await.is_err());
}
#[tokio::test]
async fn test_concurrent_drop_behavior() {
let (sender, receiver) = bounded::<i32>(10);
let senders: Vec<_> = (0..5).map(|_| sender.clone()).collect();
let receivers: Vec<_> = (0..5).map(|_| receiver.clone()).collect();
for (i, sender) in senders.iter().enumerate() {
sender.send_async(i as i32).await.unwrap();
}
let mut received_values = Vec::new();
for receiver in &receivers {
received_values.push(receiver.recv_async().await.unwrap());
}
received_values.sort();
assert_eq!(received_values, vec![0, 1, 2, 3, 4]);
drop(senders);
drop(receivers);
drop(sender);
drop(receiver);
}
#[tokio::test]
async fn test_complex_drop_scenarios() {
let (sender, receiver) = bounded::<i32>(10);
let send_fut1 = sender.send_async(1);
let send_fut2 = sender.send_async(2);
let recv_fut1 = receiver.recv_async();
let recv_fut2 = receiver.recv_async();
drop(send_fut1);
drop(recv_fut1);
send_fut2.await.unwrap();
assert_eq!(recv_fut2.await.unwrap(), 2);
for i in 3..8 {
sender.send_async(i).await.unwrap();
}
let stream = receiver.into_stream();
let mut stream = stream;
assert_eq!(stream.next().await.unwrap(), 3);
assert_eq!(stream.next().await.unwrap(), 4);
drop(stream);
assert!(sender.send_async(99).await.is_err());
}
#[tokio::test]
async fn test_stream_concurrent_stream_creation_final() {
let (sender, receiver) = bounded::<i32>(20);
let receiver2 = receiver.clone();
let receiver3 = receiver.clone();
let task1 = tokio::spawn(async move {
let mut stream = receiver.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 5 {
break;
}
}
values
});
let task2 = tokio::spawn(async move {
let mut stream = receiver2.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 5 {
break;
}
}
values
});
let task3 = tokio::spawn(async move {
let mut stream = receiver3.into_stream();
let mut values = Vec::new();
while let Some(value) = stream.next().await {
values.push(value);
if values.len() == 5 {
break;
}
}
values
});
for i in 0..15 {
sender.send_async(i).await.unwrap();
}
drop(sender);
let values1 = task1.await.unwrap();
let values2 = task2.await.unwrap();
let values3 = task3.await.unwrap();
assert_eq!(values1.len(), 5);
assert_eq!(values2.len(), 5);
assert_eq!(values3.len(), 5);
let mut all_values = values1;
all_values.extend(values2);
all_values.extend(values3);
all_values.sort();
all_values.dedup();
assert_eq!(all_values.len(), 15);
}
}