use core::fmt;
use std::mem::ManuallyDrop;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use crossbeam_utils::sync::{Parker, Unparker};
use crossbeam_utils::{Backoff, CachePadded};
use nexus_queue::Full;
use nexus_queue::mpsc::{Consumer, Producer, bounded};
use crate::{RecvError, RecvTimeoutError, SendError, TryRecvError, TrySendError};
/// Number of backoff "snooze" rounds that [`channel`] passes to
/// [`channel_with_config`] as `snooze_iters`.
const DEFAULT_SNOOZE_ITERS: usize = 8;
/// State shared, through an `Arc`, by every [`Sender`] clone and the [`Receiver`].
// The lint is silenced presumably because several fields share the `receiver_`
// prefix on purpose — NOTE(review): confirm this is still the reason.
#[allow(clippy::struct_field_names)]
struct Shared {
/// Set to `true` by the receiver right before it parks and reset to `false`
/// afterwards; senders read it to decide whether an unpark is needed.
receiver_parked: CachePadded<AtomicBool>,
/// Set to `true` when the [`Receiver`] is dropped; read by `Sender::is_disconnected`.
receiver_disconnected: AtomicBool,
/// Number of live [`Sender`] handles: starts at 1, incremented by `Clone`,
/// decremented by `Drop`.
sender_count: AtomicUsize,
/// Unparker paired with the receiver's `Parker`.
receiver_unparker: Unparker,
}
/// Creates a bounded multi-producer, single-consumer channel.
///
/// Equivalent to [`channel_with_config`] with `snooze_iters` set to
/// `DEFAULT_SNOOZE_ITERS`.
pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let snooze_iters = DEFAULT_SNOOZE_ITERS;
    channel_with_config::<T>(capacity, snooze_iters)
}
/// Creates a bounded channel with an explicit spin budget.
///
/// * `capacity` is forwarded unchanged to `nexus_queue::mpsc::bounded`.
/// * `snooze_iters` is the number of backoff rounds both halves perform
///   before falling back to yielding (sender) or parking (receiver).
///
/// The returned [`Sender`] is the first of possibly many clones; the
/// [`Receiver`] is unique.
pub fn channel_with_config<T>(capacity: usize, snooze_iters: usize) -> (Sender<T>, Receiver<T>) {
    let (producer, consumer) = bounded(capacity);

    // The parker stays with the receiver; its unparker is shared with senders.
    let parker = Parker::new();
    let receiver_unparker = parker.unparker().clone();

    let state = Shared {
        receiver_parked: CachePadded::new(AtomicBool::new(false)),
        receiver_disconnected: AtomicBool::new(false),
        // Exactly one sender exists at creation time.
        sender_count: AtomicUsize::new(1),
        receiver_unparker,
    };
    let shared = Arc::new(state);

    let sender = Sender {
        producer: ManuallyDrop::new(producer),
        shared: Arc::clone(&shared),
        snooze_iters,
    };
    let receiver = Receiver {
        consumer: ManuallyDrop::new(consumer),
        shared,
        parker,
        snooze_iters,
    };

    (sender, receiver)
}
/// Sending half of the channel. Cloning it yields another producer handle for
/// the same queue.
pub struct Sender<T> {
/// Queue producer handle; wrapped in `ManuallyDrop` so that `Drop for Sender`
/// controls exactly when it is released.
producer: ManuallyDrop<Producer<T>>,
/// State shared with the receiver and the other sender clones.
shared: Arc<Shared>,
/// Number of backoff rounds `send` performs before it starts yielding.
snooze_iters: usize,
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.shared.sender_count.fetch_add(1, Ordering::AcqRel);
Sender {
producer: ManuallyDrop::new((*self.producer).clone()),
shared: Arc::clone(&self.shared),
snooze_iters: self.snooze_iters,
}
}
}
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
if self.is_disconnected() {
return Err(SendError(value));
}
let mut producer = (*self.producer).clone();
let mut val = value;
match producer.push(val) {
Ok(()) => {
self.notify_receiver();
return Ok(());
}
Err(Full(v)) => val = v,
}
let backoff = Backoff::new();
for _ in 0..self.snooze_iters {
backoff.snooze();
if self.is_disconnected() {
return Err(SendError(val));
}
match producer.push(val) {
Ok(()) => {
self.notify_receiver();
return Ok(());
}
Err(Full(v)) => val = v,
}
}
loop {
if self.is_disconnected() {
return Err(SendError(val));
}
match producer.push(val) {
Ok(()) => {
self.notify_receiver();
return Ok(());
}
Err(Full(v)) => val = v,
}
std::thread::yield_now();
}
}
pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
if self.is_disconnected() {
return Err(TrySendError::Disconnected(value));
}
let mut producer = (*self.producer).clone();
match producer.push(value) {
Ok(()) => {
self.notify_receiver();
Ok(())
}
Err(Full(v)) => Err(TrySendError::Full(v)),
}
}
#[inline]
fn notify_receiver(&self) {
if self.shared.receiver_parked.load(Ordering::SeqCst) {
self.shared.receiver_unparker.unpark();
}
}
#[inline]
pub fn is_disconnected(&self) -> bool {
self.shared.receiver_disconnected.load(Ordering::Acquire)
}
#[inline]
pub fn capacity(&self) -> usize {
(*self.producer).capacity()
}
}
impl<T> fmt::Debug for Sender<T> {
    /// Formats the sender as `Sender { capacity, disconnected, .. }`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let capacity = self.capacity();
        let disconnected = self.is_disconnected();

        let mut out = f.debug_struct("Sender");
        out.field("capacity", &capacity);
        out.field("disconnected", &disconnected);
        out.finish_non_exhaustive()
    }
}
impl<T> Drop for Sender<T> {
/// Releases this handle's producer and, when this was the last live sender,
/// wakes the receiver so that it can observe the disconnection.
fn drop(&mut self) {
// SAFETY: within this file `producer` is dropped only here, `drop` runs at
// most once per value, and the field is not touched again afterwards.
// The producer is released *before* the count is decremented, presumably so
// that a receiver woken below already sees the queue as disconnected.
unsafe { ManuallyDrop::drop(&mut self.producer) };
// `fetch_sub` returns the previous value: 1 means this was the last sender.
if self.shared.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.shared.receiver_unparker.unpark();
}
}
}
/// Receiving half of the channel. There is exactly one per channel.
pub struct Receiver<T> {
/// Queue consumer handle; wrapped in `ManuallyDrop` so that `Drop for Receiver`
/// controls exactly when it is released.
consumer: ManuallyDrop<Consumer<T>>,
/// State shared with every sender.
shared: Arc<Shared>,
/// Parker this receiver blocks on; senders hold the matching unparker.
parker: Parker,
/// Number of backoff rounds `recv`/`recv_timeout` perform before parking.
snooze_iters: usize,
}
impl<T> Receiver<T> {
    /// Polls the queue once.
    ///
    /// * `Some(Ok(v))`  – a message was available.
    /// * `Some(Err(()))` – the queue is empty and every sender is gone.
    /// * `None`          – the queue is empty but senders are still alive.
    ///
    /// When a disconnect is observed the queue is popped one more time: a
    /// sender may have pushed a message and then dropped between the first
    /// `pop` and the `is_disconnected` check, and that message must not be
    /// reported as a disconnection.
    #[inline]
    fn poll_queue(&mut self) -> Option<Result<T, ()>> {
        if let Some(value) = self.consumer.pop() {
            return Some(Ok(value));
        }
        if self.consumer.is_disconnected() {
            return Some(self.consumer.pop().ok_or(()));
        }
        None
    }

    /// Blocks until a message is available or every sender has been dropped.
    ///
    /// The wait is done in three phases: one immediate poll, then up to
    /// `snooze_iters` polls separated by `Backoff::snooze`, then parking on
    /// the receiver's `Parker` until a sender unparks it.
    ///
    /// # Errors
    ///
    /// Returns `RecvError` once the queue is empty and all senders are gone.
    /// Messages still queued at disconnect time are delivered first.
    pub fn recv(&mut self) -> Result<T, RecvError> {
        if let Some(outcome) = self.poll_queue() {
            return outcome.map_err(|()| RecvError);
        }

        let backoff = Backoff::new();
        for _ in 0..self.snooze_iters {
            backoff.snooze();
            if let Some(outcome) = self.poll_queue() {
                return outcome.map_err(|()| RecvError);
            }
        }

        loop {
            // Announce the intent to park *before* the final poll so that a
            // sender pushing after the poll sees the flag and unparks us.
            self.shared.receiver_parked.store(true, Ordering::SeqCst);
            if let Some(outcome) = self.poll_queue() {
                self.shared.receiver_parked.store(false, Ordering::Relaxed);
                return outcome.map_err(|()| RecvError);
            }

            self.parker.park();
            self.shared.receiver_parked.store(false, Ordering::Relaxed);

            // A wake-up may be spurious or stale; poll and loop if needed.
            if let Some(outcome) = self.poll_queue() {
                return outcome.map_err(|()| RecvError);
            }
        }
    }

    /// Like [`recv`](Self::recv), but gives up after `timeout`.
    ///
    /// If `timeout` is so large that the deadline cannot be represented as an
    /// `Instant`, the call waits without a deadline instead of panicking.
    ///
    /// # Errors
    ///
    /// * `RecvTimeoutError::Timeout` if the deadline passes with no message.
    /// * `RecvTimeoutError::Disconnected` if the queue is empty and all
    ///   senders are gone.
    pub fn recv_timeout(&mut self, timeout: Duration) -> Result<T, RecvTimeoutError> {
        let deadline = match Instant::now().checked_add(timeout) {
            Some(deadline) => deadline,
            // Unrepresentably far in the future: equivalent to no timeout.
            None => return self.recv().map_err(|_| RecvTimeoutError::Disconnected),
        };

        if let Some(outcome) = self.poll_queue() {
            return outcome.map_err(|()| RecvTimeoutError::Disconnected);
        }

        let backoff = Backoff::new();
        for _ in 0..self.snooze_iters {
            if Instant::now() >= deadline {
                return Err(RecvTimeoutError::Timeout);
            }
            backoff.snooze();
            if let Some(outcome) = self.poll_queue() {
                return outcome.map_err(|()| RecvTimeoutError::Disconnected);
            }
        }

        loop {
            let now = Instant::now();
            if now >= deadline {
                return Err(RecvTimeoutError::Timeout);
            }

            // Same announce-then-poll protocol as in `recv`.
            self.shared.receiver_parked.store(true, Ordering::SeqCst);
            if let Some(outcome) = self.poll_queue() {
                self.shared.receiver_parked.store(false, Ordering::Relaxed);
                return outcome.map_err(|()| RecvTimeoutError::Disconnected);
            }

            // `now < deadline` was checked above, so the subtraction cannot
            // underflow.
            self.parker.park_timeout(deadline - now);
            self.shared.receiver_parked.store(false, Ordering::Relaxed);

            if let Some(outcome) = self.poll_queue() {
                return outcome.map_err(|()| RecvTimeoutError::Disconnected);
            }
        }
    }

    /// Attempts to receive a message without waiting.
    ///
    /// # Errors
    ///
    /// * `TryRecvError::Empty` if no message is queued but senders remain.
    /// * `TryRecvError::Disconnected` if no message is queued and all senders
    ///   are gone.
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        match self.poll_queue() {
            Some(Ok(value)) => Ok(value),
            Some(Err(())) => Err(TryRecvError::Disconnected),
            None => Err(TryRecvError::Empty),
        }
    }

    /// Returns `true` when the underlying consumer reports that it is
    /// disconnected. Queued messages may still be pending at that point.
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        self.consumer.is_disconnected()
    }

    /// Returns the capacity reported by the underlying queue consumer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.consumer.capacity()
    }
}
impl<T> fmt::Debug for Receiver<T> {
    /// Formats the receiver as `Receiver { capacity, disconnected, .. }`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let capacity = self.capacity();
        let disconnected = self.is_disconnected();

        let mut out = f.debug_struct("Receiver");
        out.field("capacity", &capacity);
        out.field("disconnected", &disconnected);
        out.finish_non_exhaustive()
    }
}
impl<T> Drop for Receiver<T> {
/// Marks the channel as receiver-less, then releases the queue consumer.
fn drop(&mut self) {
// Publish the disconnect first so that senders polling
// `Sender::is_disconnected` stop pushing.
self.shared
.receiver_disconnected
.store(true, Ordering::Release);
// SAFETY: within this file `consumer` is dropped only here, `drop` runs at
// most once per value, and the field is not touched again afterwards.
unsafe { ManuallyDrop::drop(&mut self.consumer) };
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize};
    use std::thread;

    // ---------------------------------------------------------------------
    // Basic single-threaded behaviour
    // ---------------------------------------------------------------------

    /// Messages come out in the order they were sent.
    #[test]
    fn basic_send_recv() {
        let (tx, mut rx) = channel::<u64>(4);
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        tx.send(3).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
        assert_eq!(rx.recv().unwrap(), 2);
        assert_eq!(rx.recv().unwrap(), 3);
    }

    /// Non-blocking variants report `Full` and `Empty`.
    #[test]
    fn try_send_try_recv() {
        let (tx, mut rx) = channel::<u64>(2);
        assert!(tx.try_send(1).is_ok());
        assert!(tx.try_send(2).is_ok());
        assert!(matches!(tx.try_send(3), Err(TrySendError::Full(3))));
        assert_eq!(rx.try_recv().unwrap(), 1);
        assert_eq!(rx.try_recv().unwrap(), 2);
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
    }

    /// Fill the queue completely, then drain it completely.
    #[test]
    fn send_fills_then_recv_drains() {
        let (tx, mut rx) = channel::<u64>(4);
        for i in 0..4 {
            tx.try_send(i).unwrap();
        }
        assert!(matches!(tx.try_send(99), Err(TrySendError::Full(99))));
        for i in 0..4 {
            assert_eq!(rx.recv().unwrap(), i);
        }
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
    }

    // ---------------------------------------------------------------------
    // recv_timeout
    // ---------------------------------------------------------------------

    /// A queued message is returned immediately.
    #[test]
    fn recv_timeout_success() {
        let (tx, mut rx) = channel::<u64>(4);
        tx.send(42).unwrap();
        let result = rx.recv_timeout(Duration::from_millis(100));
        assert_eq!(result.unwrap(), 42);
    }

    /// With a live sender and no message the call waits the full timeout.
    #[test]
    fn recv_timeout_expires() {
        let (_tx, mut rx) = channel::<u64>(4);
        let start = Instant::now();
        let result = rx.recv_timeout(Duration::from_millis(50));
        assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
        assert!(start.elapsed() >= Duration::from_millis(50));
    }

    /// A sender dropped beforehand yields `Disconnected`.
    #[test]
    fn recv_timeout_disconnected() {
        let (tx, mut rx) = channel::<u64>(4);
        drop(tx);
        let result = rx.recv_timeout(Duration::from_millis(100));
        assert!(matches!(result, Err(RecvTimeoutError::Disconnected)));
    }

    /// A message arriving during the wait is delivered.
    #[test]
    fn recv_timeout_data_arrives() {
        let (tx, mut rx) = channel::<u64>(4);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(25));
            tx.send(42).unwrap();
        });
        let result = rx.recv_timeout(Duration::from_millis(100));
        assert_eq!(result.unwrap(), 42);
        handle.join().unwrap();
    }

    /// The sender disappearing during the wait yields `Disconnected`.
    #[test]
    fn recv_timeout_disconnect_while_waiting() {
        let (tx, mut rx) = channel::<u64>(4);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(25));
            drop(tx);
        });
        let result = rx.recv_timeout(Duration::from_millis(100));
        assert!(matches!(result, Err(RecvTimeoutError::Disconnected)));
        handle.join().unwrap();
    }

    /// Only the drop of the *last* sender disconnects the channel.
    #[test]
    fn recv_timeout_all_senders_disconnect() {
        let (tx1, mut rx) = channel::<u64>(4);
        let tx2 = tx1.clone();
        let tx3 = tx1.clone();
        let h1 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            drop(tx1);
        });
        let h2 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            drop(tx2);
        });
        let h3 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(30));
            drop(tx3);
        });
        let result = rx.recv_timeout(Duration::from_millis(100));
        assert!(matches!(result, Err(RecvTimeoutError::Disconnected)));
        h1.join().unwrap();
        h2.join().unwrap();
        h3.join().unwrap();
    }

    // ---------------------------------------------------------------------
    // Disconnection
    // ---------------------------------------------------------------------

    #[test]
    fn recv_returns_error_when_sender_dropped() {
        let (tx, mut rx) = channel::<u64>(4);
        drop(tx);
        assert!(rx.recv().is_err());
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
    }

    /// Messages queued before the disconnect are still delivered.
    #[test]
    fn recv_drains_before_error_when_sender_dropped() {
        let (tx, mut rx) = channel::<u64>(4);
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        drop(tx);
        assert_eq!(rx.recv().unwrap(), 1);
        assert_eq!(rx.recv().unwrap(), 2);
        assert!(rx.recv().is_err());
    }

    #[test]
    fn send_returns_error_when_receiver_dropped() {
        let (tx, rx) = channel::<u64>(4);
        drop(rx);
        assert!(tx.send(1).is_err());
        assert!(matches!(
            tx.try_send(1),
            Err(TrySendError::Disconnected(1))
        ));
    }

    #[test]
    fn is_disconnected_sender() {
        let (tx, rx) = channel::<u64>(4);
        assert!(!tx.is_disconnected());
        drop(rx);
        assert!(tx.is_disconnected());
    }

    #[test]
    fn is_disconnected_receiver() {
        let (tx, rx) = channel::<u64>(4);
        assert!(!rx.is_disconnected());
        drop(tx);
        assert!(rx.is_disconnected());
    }

    #[test]
    fn disconnection_all_senders_dropped() {
        let (tx, mut rx) = channel::<u64>(4);
        let tx2 = tx.clone();
        assert!(!rx.is_disconnected());
        drop(tx);
        // One clone is still alive.
        assert!(!rx.is_disconnected());
        drop(tx2);
        assert!(rx.is_disconnected());
        assert!(rx.recv().is_err());
    }

    #[test]
    fn disconnection_receiver_dropped() {
        let (tx, rx) = channel::<u64>(4);
        drop(rx);
        assert!(tx.send(1).is_err());
        assert!(matches!(
            tx.try_send(1),
            Err(TrySendError::Disconnected(1))
        ));
    }

    #[test]
    fn receiver_drop_detected_by_cloned_sender() {
        let (tx1, rx) = channel::<u64>(4);
        let tx2 = tx1.clone();
        let tx3 = tx1.clone();
        drop(rx);
        assert!(tx1.is_disconnected());
        assert!(tx2.is_disconnected());
        assert!(tx3.is_disconnected());
        assert!(tx1.send(1).is_err());
        assert!(tx2.send(2).is_err());
        assert!(tx3.send(3).is_err());
    }

    #[test]
    fn partial_sender_drop_keeps_channel_alive() {
        let (tx1, mut rx) = channel::<u64>(4);
        let tx2 = tx1.clone();
        let tx3 = tx1.clone();
        tx1.send(1).unwrap();
        drop(tx1);
        tx2.send(2).unwrap();
        drop(tx2);
        tx3.send(3).unwrap();
        drop(tx3);
        assert_eq!(rx.recv().unwrap(), 1);
        assert_eq!(rx.recv().unwrap(), 2);
        assert_eq!(rx.recv().unwrap(), 3);
        assert!(rx.recv().is_err());
    }

    #[test]
    fn many_senders_drop_sequentially() {
        let (tx, mut rx) = channel::<u64>(64);
        let senders: Vec<_> = (0..10).map(|_| tx.clone()).collect();
        drop(tx);
        for (i, sender) in senders.into_iter().enumerate() {
            assert!(!rx.is_disconnected());
            sender.send(i as u64).unwrap();
            drop(sender);
        }
        assert!(rx.is_disconnected());
        for i in 0..10 {
            assert_eq!(rx.recv().unwrap(), i);
        }
        assert!(rx.recv().is_err());
    }

    // ---------------------------------------------------------------------
    // Multiple producers
    // ---------------------------------------------------------------------

    #[test]
    fn multiple_producers() {
        let (tx, mut rx) = channel::<u64>(64);
        let handles: Vec<_> = (0..4)
            .map(|i| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for j in 0..100 {
                        tx.send(i * 100 + j).unwrap();
                    }
                })
            })
            .collect();
        drop(tx);

        let mut received = Vec::new();
        while let Ok(v) = rx.recv() {
            received.push(v);
        }
        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(received.len(), 400);
        received.sort();
        // Generated in ascending order already, so no sort is needed here.
        let expected: Vec<u64> = (0..4)
            .flat_map(|i| (0..100).map(move |j| i * 100 + j))
            .collect();
        assert_eq!(received, expected);
    }

    #[test]
    fn sender_clone_is_independent() {
        let (tx1, mut rx) = channel::<u64>(4);
        let tx2 = tx1.clone();
        tx1.send(1).unwrap();
        tx2.send(2).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
        assert_eq!(rx.recv().unwrap(), 2);
    }

    #[test]
    fn concurrent_sends_from_clones() {
        let (tx, mut rx) = channel::<u64>(1024);
        let handles: Vec<_> = (0..8)
            .map(|id| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..1000 {
                        tx.send(id * 1000 + i).unwrap();
                    }
                })
            })
            .collect();
        drop(tx);

        let mut count = 0;
        while rx.recv().is_ok() {
            count += 1;
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(count, 8000);
    }

    // ---------------------------------------------------------------------
    // Cross-thread behaviour and blocking
    // ---------------------------------------------------------------------

    #[test]
    fn cross_thread_single_message() {
        let (tx, mut rx) = channel::<u64>(4);
        let handle = thread::spawn(move || rx.recv().unwrap());
        tx.send(42).unwrap();
        assert_eq!(handle.join().unwrap(), 42);
    }

    #[test]
    fn cross_thread_multiple_messages() {
        let (tx, mut rx) = channel::<u64>(4);
        let handle = thread::spawn(move || {
            let mut sum = 0;
            for _ in 0..100 {
                sum += rx.recv().unwrap();
            }
            sum
        });
        for i in 0..100 {
            tx.send(i).unwrap();
        }
        let sum = handle.join().unwrap();
        assert_eq!(sum, 99 * 100 / 2);
    }

    #[test]
    fn recv_blocks_until_send() {
        let (tx, mut rx) = channel::<u64>(4);
        let start = Instant::now();
        let handle = thread::spawn(move || rx.recv().unwrap());
        thread::sleep(Duration::from_millis(50));
        tx.send(42).unwrap();
        let val = handle.join().unwrap();
        assert_eq!(val, 42);
        assert!(start.elapsed() >= Duration::from_millis(50));
    }

    #[test]
    fn send_spins_until_space_available() {
        let (tx, mut rx) = channel::<u64>(2);
        tx.try_send(1).unwrap();
        tx.try_send(2).unwrap();
        let start = Instant::now();
        let handle = thread::spawn(move || {
            // Queue is full: this waits until the receiver frees a slot.
            tx.send(3).unwrap();
        });
        thread::sleep(Duration::from_millis(50));
        rx.recv().unwrap();
        handle.join().unwrap();
        assert!(start.elapsed() >= Duration::from_millis(50));
    }

    #[test]
    fn recv_wakes_on_all_senders_drop() {
        let (tx1, mut rx) = channel::<u64>(4);
        let tx2 = tx1.clone();
        let handle = thread::spawn(move || {
            let result = rx.recv();
            assert!(result.is_err());
        });
        thread::sleep(Duration::from_millis(50));
        drop(tx1);
        drop(tx2);
        handle.join().unwrap();
    }

    #[test]
    fn send_detects_receiver_drop_while_spinning() {
        let (tx, rx) = channel::<u64>(1);
        tx.try_send(1).unwrap();
        let handle = thread::spawn(move || {
            let result = tx.send(2);
            assert!(result.is_err());
        });
        thread::sleep(Duration::from_millis(50));
        drop(rx);
        handle.join().unwrap();
    }

    // ---------------------------------------------------------------------
    // Capacity one
    // ---------------------------------------------------------------------

    #[test]
    fn capacity_one() {
        let (tx, mut rx) = channel::<u64>(1);
        for i in 0..100 {
            tx.send(i).unwrap();
            assert_eq!(rx.recv().unwrap(), i);
        }
    }

    #[test]
    fn capacity_one_cross_thread() {
        let (tx, mut rx) = channel::<u64>(1);
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                rx.recv().unwrap();
            }
        });
        for i in 0..1000 {
            tx.send(i).unwrap();
        }
        handle.join().unwrap();
    }

    #[test]
    fn capacity_one_multiple_producers() {
        let (tx, mut rx) = channel::<u64>(1);
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..100 {
                        tx.send(i).unwrap();
                    }
                })
            })
            .collect();
        drop(tx);

        let mut count = 0;
        while rx.recv().is_ok() {
            count += 1;
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(count, 400);
    }

    // ---------------------------------------------------------------------
    // Ownership and drop behaviour of queued values
    // ---------------------------------------------------------------------

    #[test]
    fn values_dropped_on_channel_drop() {
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        #[derive(Debug)]
        struct DropCounter;

        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);
        let (tx, rx) = channel::<DropCounter>(4);
        tx.try_send(DropCounter).unwrap();
        tx.try_send(DropCounter).unwrap();
        tx.try_send(DropCounter).unwrap();
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);
        drop(tx);
        drop(rx);
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn failed_send_returns_value() {
        let (tx, rx) = channel::<String>(1);
        tx.try_send("hello".to_string()).unwrap();

        let err = tx.try_send("world".to_string());
        match err {
            Err(TrySendError::Full(s)) => assert_eq!(s, "world"),
            _ => panic!("expected Full error"),
        }

        drop(rx);
        let err = tx.try_send("test".to_string());
        match err {
            Err(TrySendError::Disconnected(s)) => assert_eq!(s, "test"),
            _ => panic!("expected Disconnected error"),
        }
    }

    #[test]
    fn drop_sender_while_value_in_flight() {
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        #[derive(Debug)]
        struct DropCounter(u64);

        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);
        let (tx, mut rx) = channel::<DropCounter>(4);
        tx.send(DropCounter(1)).unwrap();
        tx.send(DropCounter(2)).unwrap();
        drop(tx);
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);

        let v = rx.recv().unwrap();
        assert_eq!(v.0, 1);
        drop(v);
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 1);

        let v = rx.recv().unwrap();
        assert_eq!(v.0, 2);
        drop(v);
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn drop_receiver_with_pending_values() {
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        #[derive(Debug)]
        struct DropCounter;

        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);
        let (tx, rx) = channel::<DropCounter>(4);
        tx.send(DropCounter).unwrap();
        tx.send(DropCounter).unwrap();
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);
        drop(rx);
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);
        assert!(tx.is_disconnected());
        drop(tx);
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 2);
    }

    // ---------------------------------------------------------------------
    // Payload shapes
    // ---------------------------------------------------------------------

    #[test]
    fn zero_sized_type() {
        let (tx, mut rx) = channel::<()>(4);
        tx.send(()).unwrap();
        tx.send(()).unwrap();
        assert_eq!(rx.recv().unwrap(), ());
        assert_eq!(rx.recv().unwrap(), ());
    }

    #[test]
    fn large_message_type() {
        #[derive(Clone, PartialEq, Debug)]
        struct LargeMessage {
            data: [u8; 4096],
        }

        let (tx, mut rx) = channel::<LargeMessage>(4);
        let msg = LargeMessage { data: [42u8; 4096] };
        tx.send(msg.clone()).unwrap();
        let received = rx.recv().unwrap();
        assert_eq!(received.data[0], 42);
        assert_eq!(received.data[4095], 42);
    }

    // ---------------------------------------------------------------------
    // Wrap-around and stress
    // ---------------------------------------------------------------------

    #[test]
    fn many_laps_single_thread() {
        let (tx, mut rx) = channel::<u64>(4);
        for i in 0..1000 {
            tx.send(i).unwrap();
            assert_eq!(rx.recv().unwrap(), i);
        }
    }

    #[test]
    fn many_laps_cross_thread() {
        const COUNT: u64 = 100_000;
        let (tx, mut rx) = channel::<u64>(4);
        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                tx.send(i).unwrap();
            }
        });
        let consumer = thread::spawn(move || {
            let mut expected = 0u64;
            while expected < COUNT {
                let val = rx.recv().unwrap();
                assert_eq!(val, expected);
                expected += 1;
            }
        });
        producer.join().unwrap();
        consumer.join().unwrap();
    }

    #[test]
    fn stress_high_volume() {
        const COUNT: u64 = 100_000;
        let (tx, mut rx) = channel::<u64>(1024);
        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                tx.send(i).unwrap();
            }
        });
        let consumer = thread::spawn(move || {
            let mut sum = 0u64;
            for _ in 0..COUNT {
                sum = sum.wrapping_add(rx.recv().unwrap());
            }
            sum
        });
        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        assert_eq!(sum, COUNT * (COUNT - 1) / 2);
    }

    #[test]
    fn stress_small_buffer() {
        const COUNT: u64 = 10_000;
        let (tx, mut rx) = channel::<u64>(4);
        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                tx.send(i).unwrap();
            }
        });
        let consumer = thread::spawn(move || {
            let mut received = 0u64;
            while received < COUNT {
                rx.recv().unwrap();
                received += 1;
            }
            received
        });
        producer.join().unwrap();
        let received = consumer.join().unwrap();
        assert_eq!(received, COUNT);
    }

    #[test]
    fn stress_capacity_one_high_volume() {
        const COUNT: u64 = 10_000;
        let (tx, mut rx) = channel::<u64>(1);
        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                tx.send(i).unwrap();
            }
        });
        let consumer = thread::spawn(move || {
            let mut expected = 0u64;
            while expected < COUNT {
                let val = rx.recv().unwrap();
                assert_eq!(val, expected);
                expected += 1;
            }
        });
        producer.join().unwrap();
        consumer.join().unwrap();
    }

    #[test]
    fn stress_multiple_producers() {
        const PRODUCERS: usize = 4;
        const PER_PRODUCER: u64 = 25_000;
        let (tx, mut rx) = channel::<u64>(1024);
        let handles: Vec<_> = (0..PRODUCERS)
            .map(|_| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..PER_PRODUCER {
                        tx.send(i).unwrap();
                    }
                })
            })
            .collect();
        drop(tx);

        let mut received = 0u64;
        while rx.recv().is_ok() {
            received += 1;
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(received, PRODUCERS as u64 * PER_PRODUCER);
    }

    #[test]
    fn stress_many_producers_small_buffer() {
        const PRODUCERS: usize = 8;
        const PER_PRODUCER: u64 = 1_000;
        let (tx, mut rx) = channel::<u64>(4);
        let handles: Vec<_> = (0..PRODUCERS)
            .map(|id| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..PER_PRODUCER {
                        tx.send(id as u64 * PER_PRODUCER + i).unwrap();
                    }
                })
            })
            .collect();
        drop(tx);

        let mut received = 0u64;
        while rx.recv().is_ok() {
            received += 1;
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(received, PRODUCERS as u64 * PER_PRODUCER);
    }

    /// Even-numbered producers quit after 500 messages, odd ones send 1000:
    /// 4 * 500 + 4 * 1000 = 6000 messages in total.
    #[test]
    fn stress_producers_dropping_mid_stream() {
        const PRODUCERS: usize = 8;
        let (tx, mut rx) = channel::<u64>(64);
        let handles: Vec<_> = (0..PRODUCERS)
            .map(|id| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..1000 {
                        if i == 500 && id % 2 == 0 {
                            return;
                        }
                        tx.send(id as u64 * 1000 + i).unwrap();
                    }
                })
            })
            .collect();
        drop(tx);

        let mut count = 0;
        while rx.recv().is_ok() {
            count += 1;
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(count, 6000);
    }

    #[test]
    fn high_contention_producers() {
        const PRODUCERS: usize = 16;
        const PER_PRODUCER: u64 = 5_000;
        let (tx, mut rx) = channel::<u64>(256);
        let handles: Vec<_> = (0..PRODUCERS)
            .map(|_| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..PER_PRODUCER {
                        tx.send(i).unwrap();
                    }
                })
            })
            .collect();
        drop(tx);

        let mut count = 0u64;
        while rx.recv().is_ok() {
            count += 1;
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(count, PRODUCERS as u64 * PER_PRODUCER);
    }

    #[test]
    fn bursting_producers() {
        let (tx, mut rx) = channel::<u64>(64);
        let handles: Vec<_> = (0..4)
            .map(|id| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for burst in 0..100 {
                        for i in 0..10 {
                            tx.send(id * 1000 + burst * 10 + i).unwrap();
                        }
                        if burst % 10 == 0 {
                            thread::yield_now();
                        }
                    }
                })
            })
            .collect();
        drop(tx);

        let mut count = 0;
        while rx.recv().is_ok() {
            count += 1;
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(count, 4 * 100 * 10);
    }

    // ---------------------------------------------------------------------
    // Deadlock and race regressions
    // ---------------------------------------------------------------------

    #[test]
    fn no_deadlock_disconnect_while_blocked_recv() {
        let (tx, mut rx) = channel::<u64>(1);
        let handle = thread::spawn(move || {
            let result = rx.recv();
            assert!(result.is_err());
        });
        thread::sleep(Duration::from_millis(50));
        drop(tx);
        handle.join().unwrap();
    }

    #[test]
    fn no_deadlock_disconnect_while_spinning_send() {
        let (tx, rx) = channel::<u64>(1);
        tx.try_send(1).unwrap();
        let handle = thread::spawn(move || {
            let result = tx.send(2);
            assert!(result.is_err());
        });
        thread::sleep(Duration::from_millis(50));
        drop(rx);
        handle.join().unwrap();
    }

    #[test]
    fn no_deadlock_multiple_senders_disconnect() {
        let (tx1, rx) = channel::<u64>(1);
        let tx2 = tx1.clone();
        tx1.try_send(1).unwrap();
        let h1 = thread::spawn(move || {
            let _ = tx1.send(2);
        });
        let h2 = thread::spawn(move || {
            let _ = tx2.send(3);
        });
        thread::sleep(Duration::from_millis(50));
        drop(rx);
        h1.join().unwrap();
        h2.join().unwrap();
    }

    #[test]
    fn race_send_before_recv_parks() {
        for _ in 0..100 {
            let (tx, mut rx) = channel::<u64>(1);
            let handle = thread::spawn(move || rx.recv().unwrap());
            thread::yield_now();
            tx.send(42).unwrap();
            assert_eq!(handle.join().unwrap(), 42);
        }
    }

    #[test]
    fn race_disconnect_during_park_transition() {
        for _ in 0..100 {
            let (tx, mut rx) = channel::<u64>(1);
            let handle = thread::spawn(move || {
                let _ = rx.recv();
            });
            drop(tx);
            handle.join().unwrap();
        }
    }

    #[test]
    fn race_multiple_producers_one_slot() {
        for _ in 0..50 {
            let (tx, mut rx) = channel::<u64>(1);
            let handles: Vec<_> = (0..4)
                .map(|id| {
                    let tx = tx.clone();
                    thread::spawn(move || {
                        tx.send(id).unwrap();
                    })
                })
                .collect();
            drop(tx);

            let mut received = Vec::new();
            while let Ok(v) = rx.recv() {
                received.push(v);
            }
            for h in handles {
                h.join().unwrap();
            }
            assert_eq!(received.len(), 4);
        }
    }

    #[test]
    fn stress_rapid_unpark_receiver() {
        let (tx, mut rx) = channel::<u64>(1);
        let handle = thread::spawn(move || {
            for i in 0..10_000 {
                tx.send(i).unwrap();
            }
        });
        for _ in 0..10_000 {
            rx.recv().unwrap();
        }
        handle.join().unwrap();
    }

    #[test]
    fn stress_receiver_parks_frequently() {
        let (tx, mut rx) = channel::<u64>(1);
        let handle = thread::spawn(move || {
            let mut count = 0;
            for _ in 0..50_000 {
                rx.recv().unwrap();
                count += 1;
            }
            count
        });
        for i in 0..50_000 {
            tx.send(i).unwrap();
        }
        assert_eq!(handle.join().unwrap(), 50_000);
    }

    /// Watchdog: the whole exchange must finish within five seconds.
    #[test]
    fn completes_in_reasonable_time() {
        use std::sync::mpsc;

        let (done_tx, done_rx) = mpsc::channel();
        let handle = thread::spawn(move || {
            let (tx, mut rx) = channel::<u64>(1);
            let h = thread::spawn(move || {
                for i in 0..1000 {
                    tx.send(i).unwrap();
                }
            });
            for _ in 0..1000 {
                rx.recv().unwrap();
            }
            h.join().unwrap();
            done_tx.send(()).unwrap();
        });
        let result = done_rx.recv_timeout(Duration::from_secs(5));
        assert!(result.is_ok(), "Test timed out - possible deadlock!");
        handle.join().unwrap();
    }

    #[test]
    fn does_not_hang_on_disconnect_during_recv() {
        let done = Arc::new(AtomicBool::new(false));
        let done_clone = done.clone();
        let (tx, mut rx) = channel::<u64>(4);
        let handle = thread::spawn(move || {
            let _ = rx.recv();
            done_clone.store(true, Ordering::SeqCst);
        });
        thread::sleep(Duration::from_millis(50));
        // Still blocked: nothing was sent and the sender is alive.
        assert!(!done.load(Ordering::SeqCst));
        drop(tx);
        handle.join().unwrap();
        assert!(done.load(Ordering::SeqCst));
    }

    #[test]
    fn does_not_hang_on_disconnect_during_send() {
        let done = Arc::new(AtomicBool::new(false));
        let done_clone = done.clone();
        let (tx, rx) = channel::<u64>(1);
        tx.try_send(1).unwrap();
        let handle = thread::spawn(move || {
            let _ = tx.send(2);
            done_clone.store(true, Ordering::SeqCst);
        });
        thread::sleep(Duration::from_millis(50));
        // Still waiting: the queue is full and the receiver is alive.
        assert!(!done.load(Ordering::SeqCst));
        drop(rx);
        handle.join().unwrap();
        assert!(done.load(Ordering::SeqCst));
    }

    // ---------------------------------------------------------------------
    // Lifecycle churn
    // ---------------------------------------------------------------------

    #[test]
    fn rapid_channel_creation() {
        for _ in 0..1000 {
            let (tx, mut rx) = channel::<u64>(4);
            tx.try_send(1).unwrap();
            assert_eq!(rx.recv().unwrap(), 1);
        }
    }

    #[test]
    fn rapid_disconnect() {
        for _ in 0..1000 {
            let (tx, rx) = channel::<u64>(4);
            drop(tx);
            drop(rx);
        }
    }

    #[test]
    fn rapid_clone_and_drop() {
        for _ in 0..100 {
            let (tx, mut rx) = channel::<u64>(16);
            let clones: Vec<_> = (0..10).map(|_| tx.clone()).collect();
            for (i, c) in clones.into_iter().enumerate() {
                c.send(i as u64).unwrap();
                drop(c);
            }
            drop(tx);

            let mut count = 0;
            while rx.recv().is_ok() {
                count += 1;
            }
            assert_eq!(count, 10);
        }
    }

    // ---------------------------------------------------------------------
    // Debug output and accessors
    // ---------------------------------------------------------------------

    #[test]
    fn debug_impl_sender() {
        let (tx, _rx) = channel::<u64>(4);
        let debug_str = format!("{:?}", tx);
        assert!(debug_str.contains("Sender"));
        assert!(debug_str.contains("capacity"));
    }

    #[test]
    fn debug_impl_receiver() {
        let (_tx, rx) = channel::<u64>(4);
        let debug_str = format!("{:?}", rx);
        assert!(debug_str.contains("Receiver"));
        assert!(debug_str.contains("capacity"));
    }

    #[test]
    fn capacity_method() {
        let (tx, rx) = channel::<u64>(100);
        assert!(tx.capacity() >= 100);
        assert!(rx.capacity() >= 100);
        assert_eq!(tx.capacity(), rx.capacity());
    }

    // ---------------------------------------------------------------------
    // Interleaving
    // ---------------------------------------------------------------------

    #[test]
    fn send_recv_after_partial_fill_drain() {
        let (tx, mut rx) = channel::<u64>(4);
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
        tx.send(3).unwrap();
        tx.send(4).unwrap();
        assert_eq!(rx.recv().unwrap(), 2);
        assert_eq!(rx.recv().unwrap(), 3);
        assert_eq!(rx.recv().unwrap(), 4);
    }

    #[test]
    fn interleaved_send_recv() {
        let (tx, mut rx) = channel::<u64>(2);
        for i in 0..100 {
            tx.send(i).unwrap();
            assert_eq!(rx.recv().unwrap(), i);
        }
    }
}