use core::num::NonZeroUsize;
pub use self::{receiver::Receiver, sender::Sender};
mod queue;
mod receiver;
mod sender;
pub mod sharded;
pub fn channel<T>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
let queue = queue::QueuePtr::with_size(capacity);
queue.initialize::<queue::Initializer<T>>();
(Sender::new(queue.clone()), Receiver::new(queue))
}
#[cfg(all(test, not(feature = "loom")))]
mod test {
use super::*;
use crate::thread;
#[test]
fn basic() {
const THREADS: u32 = 10;
const ITER: u32 = 1000;
let (tx, mut rx) = channel(NonZeroUsize::new(4).unwrap());
thread::scope(move |scope| {
for thread_id in 0..THREADS {
let mut tx = tx.clone();
scope.spawn(move || {
for i in 0..ITER {
tx.send((thread_id, i));
}
});
}
let mut sum = 0;
for _ in 0..THREADS {
for _ in 0..ITER {
let (_thread_id, i) = rx.recv();
sum += i;
}
}
assert_eq!(sum, (ITER * (ITER - 1)) / 2 * THREADS);
});
}
#[test]
fn multiple_senders_multiple_receivers() {
use std::sync::atomic::{AtomicUsize, Ordering};
const SENDERS: usize = 4;
const RECEIVERS: usize = 4;
const MESSAGES: usize = 1000;
let (tx, rx) = channel(NonZeroUsize::new(128).unwrap());
let total_received = std::sync::Arc::new(AtomicUsize::new(0));
let total_sum = std::sync::Arc::new(AtomicUsize::new(0));
thread::scope(|s| {
for t in 0..SENDERS {
let mut tx = tx.clone();
s.spawn(move || {
for i in 0..MESSAGES {
tx.send(t * MESSAGES + i);
}
});
}
for _ in 0..RECEIVERS {
let mut rx = rx.clone();
let total_received = total_received.clone();
let total_sum = total_sum.clone();
s.spawn(move || {
for _ in 0..(SENDERS * MESSAGES / RECEIVERS) {
let val = rx.recv();
total_received.fetch_add(1, Ordering::SeqCst);
total_sum.fetch_add(val, Ordering::SeqCst);
}
});
}
});
assert_eq!(total_received.load(Ordering::SeqCst), SENDERS * MESSAGES);
let n = SENDERS * MESSAGES;
let expected_sum = n * (n - 1) / 2;
assert_eq!(total_sum.load(Ordering::SeqCst), expected_sum);
}
#[test]
fn multiple_senders_multiple_receivers_try() {
use std::sync::atomic::{AtomicUsize, Ordering};
const SENDERS: usize = 4;
const RECEIVERS: usize = 4;
const MESSAGES: usize = 1000;
let (tx, rx) = channel(NonZeroUsize::new(128).unwrap());
let total_received = std::sync::Arc::new(AtomicUsize::new(0));
let total_sum = std::sync::Arc::new(AtomicUsize::new(0));
thread::scope(|s| {
for t in 0..SENDERS {
let mut tx = tx.clone();
s.spawn(move || {
for i in 0..MESSAGES {
let mut backoff = crate::Backoff::with_spin_count(1);
while tx.try_send(t * MESSAGES + i).is_err() {
backoff.backoff();
}
}
});
}
for _ in 0..RECEIVERS {
let mut rx = rx.clone();
let total_received = total_received.clone();
let total_sum = total_sum.clone();
s.spawn(move || {
let mut count = 0;
let mut backoff = crate::Backoff::with_spin_count(1);
while count < (SENDERS * MESSAGES / RECEIVERS) {
if let Some(val) = rx.try_recv() {
total_received.fetch_add(1, Ordering::SeqCst);
total_sum.fetch_add(val, Ordering::SeqCst);
count += 1;
} else {
backoff.backoff();
}
}
});
}
});
assert_eq!(total_received.load(Ordering::SeqCst), SENDERS * MESSAGES);
let n = SENDERS * MESSAGES;
let expected_sum = n * (n - 1) / 2;
assert_eq!(total_sum.load(Ordering::SeqCst), expected_sum);
}
#[test]
fn test_valid_try_sends() {
let (mut tx, mut rx) = channel::<usize>(NonZeroUsize::new(4).unwrap());
for _ in 0..4 {
assert!(rx.try_recv().is_none());
}
for i in 0..4 {
tx.try_send(i).unwrap();
}
assert!(tx.try_send(5).is_err());
for i in 0..4 {
assert_eq!(rx.try_recv(), Some(i));
}
assert!(rx.try_recv().is_none());
for i in 0..4 {
tx.try_send(i).unwrap();
}
}
#[test]
fn test_drop_full_capacity() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
struct DropCounter(Arc<AtomicUsize>);
impl Drop for DropCounter {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let dropped_count = Arc::new(AtomicUsize::new(0));
{
let (mut tx, _rx) = channel::<DropCounter>(NonZeroUsize::new(3).unwrap());
for _ in 0..4 {
tx.send(DropCounter(dropped_count.clone()));
}
}
let count = dropped_count.load(Ordering::SeqCst);
assert_eq!(
count, 4,
"Expected 4 items to be dropped, but got {}",
count
);
}
}