use core::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use crossbeam_utils::sync::{Parker, Unparker};
use crossbeam_utils::{Backoff, CachePadded};
use nexus_queue::{Consumer, Full, Producer, ring_buffer};
const DEFAULT_SNOOZE_ITERS: usize = 8;
struct Shared {
sender_parked: CachePadded<AtomicBool>,
receiver_parked: CachePadded<AtomicBool>,
}
pub struct Sender<T> {
producer: Producer<T>,
shared: Arc<Shared>,
parker: Parker,
receiver_unparker: Unparker,
snooze_iters: usize,
}
pub struct Receiver<T> {
consumer: Consumer<T>,
shared: Arc<Shared>,
parker: Parker,
sender_unparker: Unparker,
snooze_iters: usize,
}
pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
channel_with_config(capacity, DEFAULT_SNOOZE_ITERS)
}
pub fn channel_with_config<T>(capacity: usize, snooze_iters: usize) -> (Sender<T>, Receiver<T>) {
let (producer, consumer) = ring_buffer(capacity);
let shared = Arc::new(Shared {
sender_parked: CachePadded::new(AtomicBool::new(false)),
receiver_parked: CachePadded::new(AtomicBool::new(false)),
});
let sender_parker = Parker::new();
let sender_unparker = sender_parker.unparker().clone();
let receiver_parker = Parker::new();
let receiver_unparker = receiver_parker.unparker().clone();
(
Sender {
producer,
shared: Arc::clone(&shared),
parker: sender_parker,
receiver_unparker,
snooze_iters,
},
Receiver {
consumer,
shared,
parker: receiver_parker,
sender_unparker,
snooze_iters,
},
)
}
impl<T> Sender<T> {
pub fn send(&mut self, value: T) -> Result<(), SendError<T>> {
if self.producer.is_disconnected() {
return Err(SendError(value));
}
let mut val = value;
match self.producer.push(val) {
Ok(()) => {
self.notify_receiver();
return Ok(());
}
Err(Full(v)) => val = v,
}
let backoff = Backoff::new();
for _ in 0..self.snooze_iters {
backoff.snooze();
if self.producer.is_disconnected() {
return Err(SendError(val));
}
match self.producer.push(val) {
Ok(()) => {
self.notify_receiver();
return Ok(());
}
Err(Full(v)) => val = v,
}
}
loop {
self.shared.sender_parked.store(true, Ordering::SeqCst);
if self.producer.is_disconnected() {
self.shared.sender_parked.store(false, Ordering::Relaxed);
return Err(SendError(val));
}
match self.producer.push(val) {
Ok(()) => {
self.shared.sender_parked.store(false, Ordering::Relaxed);
self.notify_receiver();
return Ok(());
}
Err(Full(v)) => val = v,
}
self.parker.park();
self.shared.sender_parked.store(false, Ordering::Relaxed);
if self.producer.is_disconnected() {
return Err(SendError(val));
}
match self.producer.push(val) {
Ok(()) => {
self.notify_receiver();
return Ok(());
}
Err(Full(v)) => val = v,
}
}
}
pub fn try_send(&mut self, value: T) -> Result<(), TrySendError<T>> {
if self.producer.is_disconnected() {
return Err(TrySendError::Disconnected(value));
}
match self.producer.push(value) {
Ok(()) => {
self.notify_receiver();
Ok(())
}
Err(Full(v)) => Err(TrySendError::Full(v)),
}
}
#[inline]
fn notify_receiver(&self) {
if self.shared.receiver_parked.load(Ordering::SeqCst) {
self.receiver_unparker.unpark();
}
}
#[inline]
pub fn is_disconnected(&self) -> bool {
self.producer.is_disconnected()
}
#[inline]
pub fn capacity(&self) -> usize {
self.producer.capacity()
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Sender")
.field("capacity", &self.capacity())
.field("disconnected", &self.is_disconnected())
.finish_non_exhaustive()
}
}
impl<T> Receiver<T> {
pub fn recv(&mut self) -> Result<T, RecvError> {
if let Some(v) = self.consumer.pop() {
self.notify_sender();
return Ok(v);
}
let backoff = Backoff::new();
for _ in 0..self.snooze_iters {
backoff.snooze();
if let Some(v) = self.consumer.pop() {
self.notify_sender();
return Ok(v);
}
if self.consumer.is_disconnected() {
return self.consumer.pop().ok_or(RecvError);
}
}
loop {
self.shared.receiver_parked.store(true, Ordering::SeqCst);
if let Some(v) = self.consumer.pop() {
self.shared.receiver_parked.store(false, Ordering::Relaxed);
self.notify_sender();
return Ok(v);
}
if self.consumer.is_disconnected() {
self.shared.receiver_parked.store(false, Ordering::Relaxed);
return Err(RecvError);
}
self.parker.park();
self.shared.receiver_parked.store(false, Ordering::Relaxed);
if let Some(v) = self.consumer.pop() {
self.notify_sender();
return Ok(v);
}
if self.consumer.is_disconnected() {
return Err(RecvError);
}
}
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
match self.consumer.pop() {
Some(v) => {
self.notify_sender();
Ok(v)
}
None => {
if self.consumer.is_disconnected() {
Err(TryRecvError::Disconnected)
} else {
Err(TryRecvError::Empty)
}
}
}
}
#[inline]
fn notify_sender(&self) {
if self.shared.sender_parked.load(Ordering::SeqCst) {
self.sender_unparker.unpark();
}
}
#[inline]
pub fn is_disconnected(&self) -> bool {
self.consumer.is_disconnected()
}
#[inline]
pub fn capacity(&self) -> usize {
self.consumer.capacity()
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Receiver")
.field("capacity", &self.capacity())
.field("disconnected", &self.is_disconnected())
.finish_non_exhaustive()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SendError<T>(pub T);
impl<T> SendError<T> {
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "channel disconnected")
}
}
impl<T: fmt::Debug> std::error::Error for SendError<T> {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "channel disconnected")
}
}
impl std::error::Error for RecvError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
Full(T),
Disconnected(T),
}
impl<T> TrySendError<T> {
pub fn into_inner(self) -> T {
match self {
TrySendError::Full(v) => v,
TrySendError::Disconnected(v) => v,
}
}
pub fn is_full(&self) -> bool {
matches!(self, TrySendError::Full(_))
}
pub fn is_disconnected(&self) -> bool {
matches!(self, TrySendError::Disconnected(_))
}
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TrySendError::Full(_) => write!(f, "channel full"),
TrySendError::Disconnected(_) => write!(f, "channel disconnected"),
}
}
}
impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
Empty,
Disconnected,
}
impl TryRecvError {
pub fn is_empty(&self) -> bool {
matches!(self, TryRecvError::Empty)
}
pub fn is_disconnected(&self) -> bool {
matches!(self, TryRecvError::Disconnected)
}
}
impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TryRecvError::Empty => write!(f, "channel empty"),
TryRecvError::Disconnected => write!(f, "channel disconnected"),
}
}
}
impl std::error::Error for TryRecvError {}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.receiver_unparker.unpark();
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.sender_unparker.unpark();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::{Duration, Instant};
#[test]
fn basic_send_recv() {
let (mut tx, mut rx) = channel::<u64>(4);
tx.send(1).unwrap();
tx.send(2).unwrap();
tx.send(3).unwrap();
assert_eq!(rx.recv().unwrap(), 1);
assert_eq!(rx.recv().unwrap(), 2);
assert_eq!(rx.recv().unwrap(), 3);
}
#[test]
fn try_send_try_recv() {
let (mut tx, mut rx) = channel::<u64>(2);
assert!(tx.try_send(1).is_ok());
assert!(tx.try_send(2).is_ok());
assert!(matches!(tx.try_send(3), Err(TrySendError::Full(3))));
assert_eq!(rx.try_recv().unwrap(), 1);
assert_eq!(rx.try_recv().unwrap(), 2);
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
}
#[test]
fn send_fills_then_recv_drains() {
let (mut tx, mut rx) = channel::<u64>(4);
for i in 0..4 {
tx.try_send(i).unwrap();
}
assert!(matches!(tx.try_send(99), Err(TrySendError::Full(99))));
for i in 0..4 {
assert_eq!(rx.recv().unwrap(), i);
}
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
}
#[test]
fn recv_returns_error_when_sender_dropped() {
let (tx, mut rx) = channel::<u64>(4);
drop(tx);
assert!(rx.recv().is_err());
assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
}
#[test]
fn recv_drains_before_error_when_sender_dropped() {
let (mut tx, mut rx) = channel::<u64>(4);
tx.send(1).unwrap();
tx.send(2).unwrap();
drop(tx);
assert_eq!(rx.recv().unwrap(), 1);
assert_eq!(rx.recv().unwrap(), 2);
assert!(rx.recv().is_err());
}
#[test]
fn send_returns_error_when_receiver_dropped() {
let (mut tx, rx) = channel::<u64>(4);
drop(rx);
assert!(tx.send(1).is_err());
assert!(matches!(tx.try_send(1), Err(TrySendError::Disconnected(1))));
}
#[test]
fn is_disconnected_sender() {
let (tx, rx) = channel::<u64>(4);
assert!(!tx.is_disconnected());
drop(rx);
assert!(tx.is_disconnected());
}
#[test]
fn is_disconnected_receiver() {
let (tx, rx) = channel::<u64>(4);
assert!(!rx.is_disconnected());
drop(tx);
assert!(rx.is_disconnected());
}
#[test]
fn cross_thread_single_message() {
let (mut tx, mut rx) = channel::<u64>(4);
let handle = thread::spawn(move || rx.recv().unwrap());
tx.send(42).unwrap();
assert_eq!(handle.join().unwrap(), 42);
}
#[test]
fn cross_thread_multiple_messages() {
let (mut tx, mut rx) = channel::<u64>(4);
let handle = thread::spawn(move || {
let mut sum = 0;
for _ in 0..100 {
sum += rx.recv().unwrap();
}
sum
});
for i in 0..100 {
tx.send(i).unwrap();
}
let sum = handle.join().unwrap();
assert_eq!(sum, 99 * 100 / 2);
}
#[test]
fn fifo_ordering_single_thread() {
let (mut tx, mut rx) = channel::<u64>(8);
for i in 0..8 {
tx.try_send(i).unwrap();
}
for i in 0..8 {
assert_eq!(rx.recv().unwrap(), i);
}
}
#[test]
fn fifo_ordering_cross_thread() {
let (mut tx, mut rx) = channel::<u64>(64);
let handle = thread::spawn(move || {
let mut expected = 0u64;
while expected < 10_000 {
let val = rx.recv().unwrap();
assert_eq!(val, expected, "FIFO order violated");
expected += 1;
}
});
for i in 0..10_000 {
tx.send(i).unwrap();
}
handle.join().unwrap();
}
#[test]
fn recv_blocks_until_send() {
let (mut tx, mut rx) = channel::<u64>(4);
let start = Instant::now();
let handle = thread::spawn(move || rx.recv().unwrap());
thread::sleep(Duration::from_millis(50));
tx.send(42).unwrap();
let val = handle.join().unwrap();
assert_eq!(val, 42);
assert!(start.elapsed() >= Duration::from_millis(50));
}
#[test]
fn send_blocks_until_recv() {
let (mut tx, mut rx) = channel::<u64>(2);
tx.try_send(1).unwrap();
tx.try_send(2).unwrap();
let start = Instant::now();
let handle = thread::spawn(move || {
tx.send(3).unwrap(); tx
});
thread::sleep(Duration::from_millis(50));
rx.recv().unwrap();
let _ = handle.join().unwrap();
assert!(start.elapsed() >= Duration::from_millis(50));
}
#[test]
fn recv_wakes_on_sender_drop() {
let (tx, mut rx) = channel::<u64>(4);
let handle = thread::spawn(move || {
let result = rx.recv();
assert!(result.is_err());
});
thread::sleep(Duration::from_millis(50));
drop(tx);
handle.join().unwrap();
}
#[test]
fn send_wakes_on_receiver_drop() {
let (mut tx, rx) = channel::<u64>(1);
tx.try_send(1).unwrap();
let handle = thread::spawn(move || {
let result = tx.send(2); assert!(result.is_err());
});
thread::sleep(Duration::from_millis(50));
drop(rx);
handle.join().unwrap();
}
#[test]
fn capacity_one() {
let (mut tx, mut rx) = channel::<u64>(1);
for i in 0..100 {
tx.send(i).unwrap();
assert_eq!(rx.recv().unwrap(), i);
}
}
#[test]
fn capacity_one_cross_thread() {
let (mut tx, mut rx) = channel::<u64>(1);
let handle = thread::spawn(move || {
for _ in 0..1000 {
rx.recv().unwrap();
}
});
for i in 0..1000 {
tx.send(i).unwrap();
}
handle.join().unwrap();
}
#[test]
fn stress_high_volume() {
const COUNT: u64 = 100_000;
let (mut tx, mut rx) = channel::<u64>(1024);
let producer = thread::spawn(move || {
for i in 0..COUNT {
tx.send(i).unwrap();
}
});
let consumer = thread::spawn(move || {
let mut sum = 0u64;
for _ in 0..COUNT {
sum = sum.wrapping_add(rx.recv().unwrap());
}
sum
});
producer.join().unwrap();
let sum = consumer.join().unwrap();
assert_eq!(sum, COUNT * (COUNT - 1) / 2);
}
#[test]
fn stress_small_buffer() {
const COUNT: u64 = 10_000;
let (mut tx, mut rx) = channel::<u64>(4);
let producer = thread::spawn(move || {
for i in 0..COUNT {
tx.send(i).unwrap();
}
});
let consumer = thread::spawn(move || {
let mut received = 0u64;
while received < COUNT {
rx.recv().unwrap();
received += 1;
}
received
});
producer.join().unwrap();
let received = consumer.join().unwrap();
assert_eq!(received, COUNT);
}
#[test]
fn stress_capacity_one_high_volume() {
const COUNT: u64 = 10_000;
let (mut tx, mut rx) = channel::<u64>(1);
let producer = thread::spawn(move || {
for i in 0..COUNT {
tx.send(i).unwrap();
}
});
let consumer = thread::spawn(move || {
let mut expected = 0u64;
while expected < COUNT {
let val = rx.recv().unwrap();
assert_eq!(val, expected);
expected += 1;
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
#[test]
fn values_dropped_on_channel_drop() {
use std::sync::atomic::{AtomicUsize, Ordering};
static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug)]
struct DropCounter;
impl Drop for DropCounter {
fn drop(&mut self) {
DROP_COUNT.fetch_add(1, Ordering::SeqCst);
}
}
DROP_COUNT.store(0, Ordering::SeqCst);
let (mut tx, rx) = channel::<DropCounter>(4);
tx.try_send(DropCounter).unwrap();
tx.try_send(DropCounter).unwrap();
tx.try_send(DropCounter).unwrap();
assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);
drop(tx);
drop(rx);
assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
}
#[test]
fn failed_send_returns_value() {
let (mut tx, rx) = channel::<String>(1);
tx.try_send("hello".to_string()).unwrap();
let err = tx.try_send("world".to_string());
match err {
Err(TrySendError::Full(s)) => assert_eq!(s, "world"),
_ => panic!("expected Full error"),
}
drop(rx);
let err = tx.try_send("test".to_string());
match err {
Err(TrySendError::Disconnected(s)) => assert_eq!(s, "test"),
_ => panic!("expected Disconnected error"),
}
}
#[test]
fn no_deadlock_alternating() {
let (mut tx, mut rx) = channel::<u64>(1);
let handle = thread::spawn(move || {
for i in 0..1000u64 {
tx.send(i).unwrap();
}
});
for _ in 0..1000 {
rx.recv().unwrap();
}
handle.join().unwrap();
}
#[test]
fn no_deadlock_burst_then_drain() {
let (mut tx, mut rx) = channel::<u64>(8);
for round in 0..100 {
for i in 0..8 {
tx.try_send(round * 8 + i).unwrap();
}
for i in 0..8 {
assert_eq!(rx.recv().unwrap(), round * 8 + i);
}
}
}
#[test]
fn no_deadlock_concurrent_full_empty_transitions() {
let (mut tx, mut rx) = channel::<u64>(2);
let producer = thread::spawn(move || {
for i in 0..10_000u64 {
tx.send(i).unwrap();
}
});
let consumer = thread::spawn(move || {
for _ in 0..10_000 {
rx.recv().unwrap();
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
#[test]
fn does_not_hang_on_disconnect_during_recv() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
let done = Arc::new(AtomicBool::new(false));
let done_clone = done.clone();
let (tx, mut rx) = channel::<u64>(4);
let handle = thread::spawn(move || {
let _ = rx.recv(); done_clone.store(true, Ordering::SeqCst);
});
thread::sleep(Duration::from_millis(50));
assert!(!done.load(Ordering::SeqCst));
drop(tx);
handle.join().unwrap();
assert!(done.load(Ordering::SeqCst)); }
#[test]
fn does_not_hang_on_disconnect_during_send() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
let done = Arc::new(AtomicBool::new(false));
let done_clone = done.clone();
let (mut tx, rx) = channel::<u64>(1);
tx.try_send(1).unwrap();
let handle = thread::spawn(move || {
let _ = tx.send(2); done_clone.store(true, Ordering::SeqCst);
});
thread::sleep(Duration::from_millis(50));
assert!(!done.load(Ordering::SeqCst));
drop(rx);
handle.join().unwrap();
assert!(done.load(Ordering::SeqCst)); }
#[test]
fn rapid_channel_creation() {
for _ in 0..1000 {
let (mut tx, mut rx) = channel::<u64>(4);
tx.try_send(1).unwrap();
assert_eq!(rx.recv().unwrap(), 1);
}
}
#[test]
fn rapid_disconnect() {
for _ in 0..1000 {
let (tx, rx) = channel::<u64>(4);
drop(tx);
drop(rx);
}
}
#[test]
fn zero_sized_type() {
let (mut tx, mut rx) = channel::<()>(4);
tx.send(()).unwrap();
tx.send(()).unwrap();
assert_eq!(rx.recv().unwrap(), ());
assert_eq!(rx.recv().unwrap(), ());
}
#[test]
fn large_message_type() {
#[derive(Clone, PartialEq, Debug)]
struct LargeMessage {
data: [u8; 4096],
}
let (mut tx, mut rx) = channel::<LargeMessage>(4);
let msg = LargeMessage { data: [42u8; 4096] };
tx.send(msg.clone()).unwrap();
let received = rx.recv().unwrap();
assert_eq!(received.data[0], 42);
assert_eq!(received.data[4095], 42);
}
#[test]
fn many_laps_single_thread() {
let (mut tx, mut rx) = channel::<u64>(4);
for i in 0..1000 {
tx.send(i).unwrap();
assert_eq!(rx.recv().unwrap(), i);
}
}
#[test]
fn many_laps_cross_thread() {
const COUNT: u64 = 100_000;
let (mut tx, mut rx) = channel::<u64>(4);
let producer = thread::spawn(move || {
for i in 0..COUNT {
tx.send(i).unwrap();
}
});
let consumer = thread::spawn(move || {
let mut expected = 0u64;
while expected < COUNT {
let val = rx.recv().unwrap();
assert_eq!(val, expected);
expected += 1;
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
#[test]
fn ping_pong_basic() {
let (mut tx1, mut rx1) = channel::<u64>(1);
let (mut tx2, mut rx2) = channel::<u64>(1);
let handle = thread::spawn(move || {
for i in 0..1000 {
let val = rx1.recv().unwrap();
assert_eq!(val, i);
tx2.send(i).unwrap();
}
});
for i in 0..1000 {
tx1.send(i).unwrap();
let val = rx2.recv().unwrap();
assert_eq!(val, i);
}
handle.join().unwrap();
}
#[test]
fn ping_pong_capacity_one_high_iterations() {
let (mut tx1, mut rx1) = channel::<u64>(1);
let (mut tx2, mut rx2) = channel::<u64>(1);
let handle = thread::spawn(move || {
for i in 0..10_000 {
let val = rx1.recv().unwrap();
assert_eq!(val, i);
tx2.send(i * 2).unwrap();
}
});
for i in 0..10_000 {
tx1.send(i).unwrap();
let val = rx2.recv().unwrap();
assert_eq!(val, i * 2);
}
handle.join().unwrap();
}
#[test]
fn ping_pong_both_block() {
let (mut tx1, mut rx1) = channel::<u64>(1);
let (mut tx2, mut rx2) = channel::<u64>(1);
let handle = thread::spawn(move || {
for _ in 0..5000 {
let val = rx1.recv().unwrap();
tx2.send(val + 1).unwrap();
}
});
for i in 0..5000 {
tx1.send(i).unwrap();
let val = rx2.recv().unwrap();
assert_eq!(val, i + 1);
}
handle.join().unwrap();
}
#[test]
fn ping_pong_with_varying_delays() {
let (mut tx1, mut rx1) = channel::<u64>(1);
let (mut tx2, mut rx2) = channel::<u64>(1);
let handle = thread::spawn(move || {
for i in 0..100 {
let val = rx1.recv().unwrap();
if i % 20 == 0 {
thread::sleep(Duration::from_micros(100));
}
tx2.send(val).unwrap();
}
});
for i in 0..100 {
if i % 17 == 0 {
thread::sleep(Duration::from_micros(100));
}
tx1.send(i).unwrap();
let val = rx2.recv().unwrap();
assert_eq!(val, i);
}
handle.join().unwrap();
}
#[test]
fn ping_pong_multiple_pairs() {
let mut handles = vec![];
for _ in 0..4 {
let (mut tx1, mut rx1) = channel::<u64>(1);
let (mut tx2, mut rx2) = channel::<u64>(1);
let h1 = thread::spawn(move || {
for _ in 0..1000 {
let val = rx1.recv().unwrap();
tx2.send(val + 1).unwrap();
}
});
let h2 = thread::spawn(move || {
for i in 0..1000u64 {
tx1.send(i).unwrap();
let val = rx2.recv().unwrap();
assert_eq!(val, i + 1);
}
});
handles.push(h1);
handles.push(h2);
}
for h in handles {
h.join().unwrap();
}
}
#[test]
fn no_deadlock_send_recv_interleaved() {
let (mut tx, mut rx) = channel::<u64>(1);
let handle = thread::spawn(move || {
for _ in 0..10_000 {
rx.recv().unwrap();
}
});
for i in 0..10_000 {
tx.send(i).unwrap();
}
handle.join().unwrap();
}
#[test]
fn no_deadlock_full_buffer_unblock() {
let (mut tx, mut rx) = channel::<u64>(2);
tx.try_send(1).unwrap();
tx.try_send(2).unwrap();
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
rx.recv().unwrap();
rx.recv().unwrap();
rx.recv().unwrap(); });
tx.send(3).unwrap();
handle.join().unwrap();
}
#[test]
fn no_deadlock_empty_buffer_unblock() {
let (mut tx, mut rx) = channel::<u64>(2);
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx.send(42).unwrap();
});
let val = rx.recv().unwrap();
assert_eq!(val, 42);
handle.join().unwrap();
}
#[test]
fn no_deadlock_simultaneous_block() {
let (mut tx, mut rx) = channel::<u64>(1);
let sender = thread::spawn(move || {
for i in 0..1000 {
tx.send(i).unwrap();
}
});
let receiver = thread::spawn(move || {
for _ in 0..1000 {
rx.recv().unwrap();
}
});
sender.join().unwrap();
receiver.join().unwrap();
}
#[test]
fn no_deadlock_disconnect_while_blocked_recv() {
let (tx, mut rx) = channel::<u64>(1);
let handle = thread::spawn(move || {
let result = rx.recv();
assert!(result.is_err()); });
thread::sleep(Duration::from_millis(50));
drop(tx);
handle.join().unwrap();
}
#[test]
fn no_deadlock_disconnect_while_blocked_send() {
let (mut tx, rx) = channel::<u64>(1);
tx.try_send(1).unwrap();
let handle = thread::spawn(move || {
let result = tx.send(2);
assert!(result.is_err()); });
thread::sleep(Duration::from_millis(50));
drop(rx);
handle.join().unwrap();
}
#[test]
fn race_send_before_recv_parks() {
for _ in 0..100 {
let (mut tx, mut rx) = channel::<u64>(1);
let handle = thread::spawn(move || rx.recv().unwrap());
thread::yield_now();
tx.send(42).unwrap();
assert_eq!(handle.join().unwrap(), 42);
}
}
#[test]
fn race_recv_before_send_parks() {
for _ in 0..100 {
let (mut tx, mut rx) = channel::<u64>(1);
tx.try_send(1).unwrap();
let handle = thread::spawn(move || {
tx.send(2).unwrap();
});
thread::yield_now();
rx.recv().unwrap();
handle.join().unwrap();
}
}
#[test]
fn race_disconnect_during_park_transition() {
for _ in 0..100 {
let (tx, mut rx) = channel::<u64>(1);
let handle = thread::spawn(move || {
let _ = rx.recv(); });
drop(tx);
handle.join().unwrap();
}
}
#[test]
fn stress_rapid_park_unpark_sender() {
let (mut tx, mut rx) = channel::<u64>(1);
let handle = thread::spawn(move || {
for _ in 0..10_000 {
rx.recv().unwrap();
}
});
for i in 0..10_000 {
tx.send(i).unwrap();
}
handle.join().unwrap();
}
#[test]
fn stress_rapid_park_unpark_receiver() {
let (mut tx, mut rx) = channel::<u64>(1);
let handle = thread::spawn(move || {
for i in 0..10_000 {
tx.send(i).unwrap();
}
});
for _ in 0..10_000 {
rx.recv().unwrap();
}
handle.join().unwrap();
}
#[test]
fn stress_park_unpark_both_sides() {
let (mut tx, mut rx) = channel::<u64>(1);
let sender = thread::spawn(move || {
for i in 0..50_000 {
tx.send(i).unwrap();
}
});
let receiver = thread::spawn(move || {
let mut count = 0;
for _ in 0..50_000 {
rx.recv().unwrap();
count += 1;
}
count
});
sender.join().unwrap();
assert_eq!(receiver.join().unwrap(), 50_000);
}
#[test]
fn completes_in_reasonable_time() {
use std::sync::mpsc;
let (done_tx, done_rx) = mpsc::channel();
let handle = thread::spawn(move || {
let (mut tx, mut rx) = channel::<u64>(1);
let h = thread::spawn(move || {
for i in 0..1000 {
tx.send(i).unwrap();
}
});
for _ in 0..1000 {
rx.recv().unwrap();
}
h.join().unwrap();
done_tx.send(()).unwrap();
});
let result = done_rx.recv_timeout(Duration::from_secs(5));
assert!(result.is_ok(), "Test timed out - possible deadlock!");
handle.join().unwrap();
}
}