mod barriers;
mod consumers;
mod producers;
mod ring;
use alloc::sync::Arc;
pub use barriers::{MultiBarrier, Output, OwnedOutput, SharedOutput, SingleBarrier};
pub use consumers::{Consumer, ConsumerAccess, ConsumerMode};
pub use producers::{ConcurrentProducer, SingleProducer};
pub use ring::RingBuffer;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct Sequence(isize);
impl Default for Sequence {
#[inline]
fn default() -> Self {
Self(-1) }
}
#[test]
fn test_sequence_default() {
assert_eq!(Sequence::default().0, -1);
}
impl From<isize> for Sequence {
#[inline]
fn from(value: isize) -> Self {
Self(value)
}
}
impl From<usize> for Sequence {
#[inline]
#[allow(clippy::cast_possible_wrap)]
fn from(value: usize) -> Self {
Self(value as isize)
}
}
impl Sequence {
#[must_use]
#[inline]
pub fn is_valid_item(self) -> bool {
self.0 >= 0
}
#[must_use]
#[inline]
pub fn as_index(self) -> usize {
debug_assert!(self.0 >= 0);
self.0.unsigned_abs()
}
}
pub trait QueueUser {
type Item;
type UserOutput: Output + 'static;
type ProducerOutput: Output + 'static;
fn queue(&self) -> &Arc<RingBuffer<Self::Item, Self::ProducerOutput>>;
fn output(&self) -> &Arc<Self::UserOutput>;
}
#[cfg(test)]
mod tests_send_sync {
use alloc::sync::Arc;
use super::{ConcurrentProducer, Consumer, ConsumerMode, RingBuffer, SingleProducer};
pub fn assert_send<T: Send>(_thing: &T) {}
pub fn assert_sync<T: Sync>(_thing: &T) {}
#[test]
fn ring_is_send_sync() {
let ring = RingBuffer::<usize, _>::new_single_producer(4, 16);
assert_send(&ring);
assert_sync(&ring);
}
#[test]
fn producers_are_send_sync() {
let ring = Arc::new(RingBuffer::<usize, _>::new_single_producer(4, 16));
let producer_single = SingleProducer::new(ring.clone());
assert_send(&producer_single);
assert_sync(&producer_single);
let ring = Arc::new(RingBuffer::<usize, _>::new_multi_producer(4, 16));
let producer_concurrent = ConcurrentProducer::new(ring.clone());
assert_send(&producer_concurrent);
assert_sync(&producer_concurrent);
}
#[test]
fn consumer_is_send_sync() {
let ring = Arc::new(RingBuffer::<usize, _>::new_single_producer(4, 16));
let producer_single = SingleProducer::new(ring.clone());
let consumer = Consumer::new_awaiting_on(&producer_single, ConsumerMode::default());
assert_send(&consumer);
assert_sync(&consumer);
}
}
#[cfg(test)]
mod tests_concurrency_stress {
use alloc::sync::Arc;
use super::{ConcurrentProducer, Consumer, ConsumerMode, RingBuffer, SingleProducer};
use crate::errors::TryRecvError;
const SCALE_QUEUE_SIZE: usize = 256;
const SCALE_MSG_COUNT: usize = 1_000_000;
const SCALE_PRODUCERS: usize = 5;
const SCALE_CONSUMERS: usize = 5;
#[test]
fn spsc() {
let ring = Arc::new(RingBuffer::<usize, _>::new_single_producer(SCALE_QUEUE_SIZE, 16));
let mut sender = SingleProducer::new(ring.clone());
let mut receiver = Consumer::new(ring.clone(), ConsumerMode::Blocking).unwrap();
let consumer = std::thread::spawn(move || {
let mut outputs = Vec::with_capacity(SCALE_MSG_COUNT);
loop {
match receiver.try_recv() {
Ok(access) => {
for &item in access {
outputs.push(item);
}
}
Err(TryRecvError::Lagging(_)) => {
panic!("consumer should not lag");
}
Err(TryRecvError::Disconnected) => {
break;
}
Err(TryRecvError::NoCapacity) => {
panic!("no capacity when no buffer was used")
}
Err(TryRecvError::Empty) => {}
}
}
outputs.sort_unstable();
outputs.dedup();
assert_eq!(SCALE_MSG_COUNT, outputs.len());
for (i, v) in outputs.into_iter().enumerate() {
assert_eq!(i, v);
}
});
let producer = std::thread::spawn(move || {
let mut _tries = 0;
for i in 0..SCALE_MSG_COUNT {
_tries += 1;
while sender.try_push(i).is_err() {
_tries += 1;
}
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
#[test]
fn spmc() {
let ring = Arc::new(RingBuffer::<usize, _>::new_single_producer(SCALE_QUEUE_SIZE, 16));
let mut sender = SingleProducer::new(ring.clone());
let mut receivers = (0..SCALE_CONSUMERS)
.map(|_| Consumer::new(ring.clone(), ConsumerMode::Blocking).unwrap())
.collect::<Vec<_>>();
let consumers = (0..SCALE_CONSUMERS)
.map(|_| {
let mut receiver = receivers.pop().unwrap();
std::thread::spawn(move || {
let mut outputs = Vec::with_capacity(SCALE_MSG_COUNT);
loop {
match receiver.try_recv() {
Ok(access) => {
for &item in access {
outputs.push(item);
}
}
Err(TryRecvError::Lagging(_)) => {
panic!("consumer should not lag");
}
Err(TryRecvError::Disconnected) => {
break;
}
Err(TryRecvError::NoCapacity) => {
panic!("no capacity when no buffer was used")
}
Err(TryRecvError::Empty) => {}
}
}
outputs.sort_unstable();
outputs.dedup();
assert_eq!(SCALE_MSG_COUNT, outputs.len());
for (i, v) in outputs.into_iter().enumerate() {
assert_eq!(i, v);
}
})
})
.collect::<Vec<_>>();
let producer = std::thread::spawn(move || {
let mut _tries = 0;
for i in 0..SCALE_MSG_COUNT {
_tries += 1;
while sender.try_push(i).is_err() {
_tries += 1;
}
}
});
producer.join().unwrap();
for consumer in consumers {
consumer.join().unwrap();
}
}
#[test]
fn mpmc() {
let ring = Arc::new(RingBuffer::<usize, _>::new_multi_producer(SCALE_QUEUE_SIZE, 16));
let mut senders = (0..SCALE_PRODUCERS)
.map(|_| ConcurrentProducer::new(ring.clone()))
.collect::<Vec<_>>();
let mut receivers = (0..SCALE_CONSUMERS)
.map(|_| Consumer::new(ring.clone(), ConsumerMode::Blocking).unwrap())
.collect::<Vec<_>>();
let consumers = (0..SCALE_CONSUMERS)
.map(|_| {
let mut receiver = receivers.pop().unwrap();
std::thread::spawn(move || {
let mut outputs = Vec::with_capacity(SCALE_MSG_COUNT);
loop {
match receiver.try_recv() {
Ok(access) => {
for &item in access {
outputs.push(item);
}
}
Err(TryRecvError::Lagging(_)) => {
panic!("consumer should not lag");
}
Err(TryRecvError::Disconnected) => {
break;
}
Err(TryRecvError::NoCapacity) => {
panic!("no capacity when no buffer was used")
}
Err(TryRecvError::Empty) => {}
}
}
outputs.sort_unstable();
outputs.dedup();
assert_eq!(SCALE_MSG_COUNT, outputs.len());
for (i, v) in outputs.into_iter().enumerate() {
assert_eq!(i, v);
}
})
})
.collect::<Vec<_>>();
let producers = (0..SCALE_PRODUCERS)
.map(|p| {
let mut sender = senders.pop().unwrap();
std::thread::spawn(move || {
for i in 0..(SCALE_MSG_COUNT / SCALE_PRODUCERS) {
while sender.try_push((p * SCALE_MSG_COUNT / SCALE_PRODUCERS) + i).is_err() {}
println!("pushed {}", (p * SCALE_MSG_COUNT / SCALE_PRODUCERS) + i);
}
})
})
.collect::<Vec<_>>();
for producer in producers {
producer.join().unwrap();
}
for consumer in consumers {
consumer.join().unwrap();
}
}
}