use super::core::RingBufCore;
use crate::shim::atomic::Ordering;
use crate::shim::sync::Arc;
use core::fmt;
use core::num::NonZero;
/// Error returned by [`Producer::push`] when a value could not be enqueued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushError<T> {
    /// The buffer had no free slot; the rejected value is handed back to the caller.
    Full(T),
}
/// Error returned by [`Consumer::pop`] when no element could be dequeued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PopError {
    /// The read and write indices were equal, i.e. the buffer held no elements.
    Empty,
}
/// State shared between a [`Producer`] and its [`Consumer`] (both hold it behind an `Arc`).
pub struct SharedData<T, const N: usize> {
    // Storage plus the read/write indices; all queue operations in this file go through it.
    core: RingBufCore<T, N>,
}
/// Writing half of the ring buffer, created by [`new`].
pub struct Producer<T, const N: usize> {
    // Handle to the state shared with the consumer.
    shared: Arc<SharedData<T, N>>,
    // Producer-local copy of the consumer's read index. It is re-read from the shared
    // atomic only when the cached value suggests there is not enough room, which keeps
    // the common push path from touching the consumer's index.
    cached_read: usize,
}
/// Reading half of the ring buffer, created by [`new`].
pub struct Consumer<T, const N: usize> {
    // Handle to the state shared with the producer.
    shared: Arc<SharedData<T, N>>,
    // Consumer-local copy of the producer's write index. It is re-read from the shared
    // atomic only when the cached value suggests there are not enough elements, which
    // keeps the common pop path from touching the producer's index.
    cached_write: usize,
}
/// Iterator returned by [`Consumer::drain`]; each step pops one element from the buffer.
///
/// No `Drop` impl is defined for this type here, so elements that were not yielded stay
/// in the buffer when the iterator is dropped.
pub struct Drain<'a, T, const N: usize> {
    // Exclusive borrow of the consumer for the lifetime of the iterator.
    consumer: &'a mut Consumer<T, N>,
}
impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
type Item = T;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.consumer.pop().ok()
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.consumer.slots();
(len, Some(len))
}
}
impl<T, const N: usize> SharedData<T, N> {
    /// Returns the capacity reported by the underlying core.
    #[inline]
    pub fn capacity(&self) -> usize {
        let Self { core } = self;
        core.capacity()
    }
}
/// Creates a connected [`Producer`]/[`Consumer`] pair over a fresh ring buffer.
///
/// `capacity` is forwarded to `RingBufCore::new`; the tests in this file expect the
/// effective capacity to be the request rounded up to a power of two (e.g. 5 → 8).
/// Both handles start with their cached peer index at 0.
///
/// NOTE(review): the role of the const parameter `N` is defined by `RingBufCore` and is
/// not visible here (test names suggest an inline-storage threshold) — confirm there.
pub fn new<T, const N: usize>(capacity: NonZero<usize>) -> (Producer<T, N>, Consumer<T, N>) {
    let shared = Arc::new(SharedData {
        core: RingBufCore::new(capacity.get()),
    });
    (
        Producer {
            shared: Arc::clone(&shared),
            cached_read: 0,
        },
        Consumer {
            shared,
            cached_write: 0,
        },
    )
}
/// Diagnostic formatting for the producer handle.
///
/// No element of type `T` is ever formatted, so the impl does not require `T: Debug`.
/// All occupancy fields are derived from a single `slots()` snapshot so they are
/// mutually consistent even while the consumer is running.
impl<T, const N: usize> fmt::Debug for Producer<T, N> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let capacity = self.shared.core.capacity();
        // One snapshot of the occupancy; the remaining fields are computed from it.
        let slots = self.slots();
        f.debug_struct("Producer")
            .field("capacity", &capacity)
            .field("slots", &slots)
            .field("free_slots", &(capacity - slots))
            .field("is_empty", &(slots == 0))
            .field("is_full", &(slots >= capacity))
            .finish()
    }
}
impl<T, const N: usize> Producer<T, N> {
    /// Returns the capacity reported by the shared core.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.shared.core.capacity()
    }

    /// Returns the number of occupied slots as seen from the producer side.
    ///
    /// The write index is loaded `Relaxed` and the read index `Acquire`; the result is
    /// their wrapping difference.
    #[inline]
    pub fn slots(&self) -> usize {
        let core = &self.shared.core;
        let write = core.write_idx().load(Ordering::Relaxed);
        let read = core.read_idx().load(Ordering::Acquire);
        write.wrapping_sub(read)
    }

    /// Alias for [`Producer::slots`].
    #[inline]
    pub fn len(&self) -> usize {
        self.slots()
    }

    /// Returns `true` when the read and write indices are equal.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.slots() == 0
    }

    /// Returns `capacity - slots`, the number of slots still available for pushing.
    #[inline]
    pub fn free_slots(&self) -> usize {
        self.capacity() - self.slots()
    }

    /// Returns `true` when the number of occupied slots has reached the capacity.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.slots() >= self.capacity()
    }

    /// Appends `value` to the buffer.
    ///
    /// # Errors
    ///
    /// Returns `PushError::Full(value)` — giving the value back — when the buffer is
    /// still full after re-reading the consumer's read index.
    #[inline]
    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
        let core = &self.shared.core;
        let write = core.write_idx().load(Ordering::Relaxed);

        // Fast path: trust the cached read index. Only when it makes the buffer look
        // full is the shared read index consulted (and the cache refreshed).
        if write.wrapping_sub(self.cached_read) >= core.capacity() {
            let read = core.read_idx().load(Ordering::Acquire);
            self.cached_read = read;
            if write.wrapping_sub(read) >= core.capacity() {
                return Err(PushError::Full(value));
            }
        }

        let slot = write & core.mask();
        // SAFETY: the occupancy check above found fewer than `capacity` elements in
        // flight, so the slot at `write` is not one the consumer still has to read.
        // NOTE(review): the exact contract of `write_at` lives in `RingBufCore` — confirm.
        unsafe {
            core.write_at(slot, value);
        }
        // Release store publishes the element; it pairs with the Acquire load of the
        // write index on the consumer side.
        core.write_idx()
            .store(write.wrapping_add(1), Ordering::Release);
        Ok(())
    }
}
impl<T: Copy, const N: usize> Producer<T, N> {
    /// Copies as many leading elements of `values` into the buffer as currently fit and
    /// returns how many were written (0 when `values` is empty or the buffer is full).
    ///
    /// The cached read index is refreshed whenever it does not show enough room for the
    /// whole slice — not only when it shows no room at all. Otherwise a stale cache
    /// would truncate the push even though the consumer has already freed the space.
    #[inline]
    pub fn push_slice(&mut self, values: &[T]) -> usize {
        if values.is_empty() {
            return 0;
        }
        let core = &self.shared.core;
        let capacity = core.capacity();
        let write = core.write_idx().load(Ordering::Relaxed);

        // Free space according to the (possibly stale) cached read index.
        let mut available = capacity.saturating_sub(write.wrapping_sub(self.cached_read));
        if available < values.len() {
            // Not enough room by the cache: consult the shared read index before
            // deciding how much can really be written.
            let read = core.read_idx().load(Ordering::Acquire);
            self.cached_read = read;
            available = capacity.saturating_sub(write.wrapping_sub(read));
            if available == 0 {
                return 0;
            }
        }

        let to_push = available.min(values.len());
        // SAFETY: `to_push` is bounded by both the free space computed above and
        // `values.len()`, so only free slots are written and `values` is not over-read.
        // NOTE(review): `copy_from_slice` receives the unmasked write index; index
        // wrapping is presumably handled inside `RingBufCore` — confirm.
        unsafe {
            core.copy_from_slice(write, values, to_push);
        }
        // Release store publishes all copied elements to the consumer at once.
        core.write_idx()
            .store(write.wrapping_add(to_push), Ordering::Release);
        to_push
    }
}
/// Diagnostic formatting for the consumer handle.
///
/// No element of type `T` is ever formatted, so the impl does not require `T: Debug`.
/// `slots` and `is_empty` are derived from the same snapshot so they cannot disagree.
impl<T, const N: usize> fmt::Debug for Consumer<T, N> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // One snapshot of the occupancy; `is_empty` is computed from it.
        let slots = self.slots();
        f.debug_struct("Consumer")
            .field("capacity", &self.shared.core.capacity())
            .field("slots", &slots)
            .field("is_empty", &(slots == 0))
            .finish()
    }
}
impl<T, const N: usize> Consumer<T, N> {
    /// Removes and returns the oldest element.
    ///
    /// # Errors
    ///
    /// Returns `PopError::Empty` when the buffer is still empty after re-reading the
    /// producer's write index.
    #[inline]
    pub fn pop(&mut self) -> Result<T, PopError> {
        let core = &self.shared.core;
        let read = core.read_idx().load(Ordering::Relaxed);

        // Fast path: trust the cached write index. Only when it makes the buffer look
        // empty is the shared write index consulted (and the cache refreshed).
        if read == self.cached_write {
            let write = core.write_idx().load(Ordering::Acquire);
            self.cached_write = write;
            if read == write {
                return Err(PopError::Empty);
            }
        }

        let slot = read & core.mask();
        // SAFETY: `read` differs from a write index obtained via an Acquire load, so the
        // producer's Release store has published the element at this slot.
        // NOTE(review): the exact contract of `read_at` lives in `RingBufCore` — confirm.
        let value = unsafe { core.read_at(slot) };
        // Release store hands the slot back to the producer.
        core.read_idx()
            .store(read.wrapping_add(1), Ordering::Release);
        Ok(value)
    }

    /// Returns `true` when the read and write indices are equal.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.slots() == 0
    }

    /// Returns the number of elements currently visible to the consumer.
    ///
    /// The read index is loaded `Relaxed` and the write index `Acquire`; the result is
    /// their wrapping difference.
    #[inline]
    pub fn slots(&self) -> usize {
        let core = &self.shared.core;
        let read = core.read_idx().load(Ordering::Relaxed);
        let write = core.write_idx().load(Ordering::Acquire);
        write.wrapping_sub(read)
    }

    /// Alias for [`Consumer::slots`].
    #[inline]
    pub fn len(&self) -> usize {
        self.slots()
    }

    /// Returns the capacity reported by the shared core.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.shared.core.capacity()
    }

    /// Returns a reference to the oldest element without removing it, or `None` when
    /// the buffer is empty. Does not touch the cached write index.
    #[inline]
    pub fn peek(&self) -> Option<&T> {
        let core = &self.shared.core;
        let read = core.read_idx().load(Ordering::Relaxed);
        let write = core.write_idx().load(Ordering::Acquire);
        if read != write {
            let slot = read & core.mask();
            // SAFETY: `read != write` after an Acquire load of the write index, so the
            // slot holds a published element; it cannot be popped while the returned
            // borrow of `self` is alive because `pop` needs `&mut self`.
            // NOTE(review): the exact contract of `peek_at` lives in `RingBufCore` — confirm.
            Some(unsafe { core.peek_at(slot) })
        } else {
            None
        }
    }

    /// Pops and drops elements until the buffer is observed empty.
    ///
    /// NOTE(review): the loop has no upper bound, so a producer that keeps pushing can
    /// keep this call running — confirm that this is acceptable for callers.
    pub fn clear(&mut self) {
        while let Ok(item) = self.pop() {
            drop(item);
        }
    }

    /// Returns an iterator that pops elements one by one; see [`Drain`].
    #[inline]
    pub fn drain(&mut self) -> Drain<'_, T, N> {
        Drain { consumer: self }
    }

    /// Returns a reference to the shared buffer state.
    #[inline]
    pub fn buffer(&self) -> &SharedData<T, N> {
        &*self.shared
    }
}
impl<T: Copy, const N: usize> Consumer<T, N> {
    /// Copies up to `dest.len()` of the oldest elements into the front of `dest`,
    /// removes them from the buffer, and returns how many were copied (0 when `dest`
    /// is empty or the buffer is empty).
    ///
    /// The cached write index is refreshed whenever it does not show enough elements to
    /// fill `dest` — not only when it shows none at all. Otherwise a stale cache would
    /// truncate the pop even though the producer has already published more elements.
    #[inline]
    pub fn pop_slice(&mut self, dest: &mut [T]) -> usize {
        if dest.is_empty() {
            return 0;
        }
        let core = &self.shared.core;
        let read = core.read_idx().load(Ordering::Relaxed);

        // Elements available according to the (possibly stale) cached write index.
        let mut available = self.cached_write.wrapping_sub(read);
        if available < dest.len() {
            // Not enough elements by the cache: consult the shared write index before
            // deciding how much can really be read.
            let write = core.write_idx().load(Ordering::Acquire);
            self.cached_write = write;
            available = write.wrapping_sub(read);
            if available == 0 {
                return 0;
            }
        }

        let to_pop = available.min(dest.len());
        // SAFETY: `to_pop` is bounded by both the number of published elements computed
        // above and `dest.len()`, so only initialized slots are read and `dest` is not
        // overrun.
        // NOTE(review): `copy_to_slice` receives the unmasked read index; index wrapping
        // is presumably handled inside `RingBufCore` — confirm.
        unsafe {
            core.copy_to_slice(read, dest, to_pop);
        }
        // Release store hands all consumed slots back to the producer at once.
        core.read_idx()
            .store(read.wrapping_add(to_pop), Ordering::Release);
        to_pop
    }
}
/// Dropping the consumer pops — and thereby drops — every element it can still see.
impl<T, const N: usize> Drop for Consumer<T, N> {
    fn drop(&mut self) {
        // Each popped element is dropped immediately; the loop ends on the first
        // `PopError::Empty`.
        while let Ok(item) = self.pop() {
            drop(item);
        }
    }
}
// Unit tests for the producer/consumer pair. Compiled only for regular test builds
// (the module is excluded when the `loom` feature is enabled).
#[cfg(all(test, not(feature = "loom")))]
mod tests {
    use super::*;
    // Pushed values come back in FIFO order and a further pop fails.
    #[test]
    fn test_basic_push_pop() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
        assert!(producer.push(1).is_ok());
        assert!(producer.push(2).is_ok());
        assert!(producer.push(3).is_ok());
        assert_eq!(consumer.pop().unwrap(), 1);
        assert_eq!(consumer.pop().unwrap(), 2);
        assert_eq!(consumer.pop().unwrap(), 3);
        assert!(consumer.pop().is_err());
    }
    // Requested capacities are reported rounded up to a power of two (5 -> 8, 33 -> 64).
    #[test]
    fn test_capacity_rounding() {
        let (_, consumer) = new::<i32, 32>(NonZero::new(5).unwrap());
        assert_eq!(consumer.buffer().capacity(), 8);
        let (_, consumer) = new::<i32, 64>(NonZero::new(32).unwrap());
        assert_eq!(consumer.buffer().capacity(), 32);
        let (_, consumer) = new::<i32, 128>(NonZero::new(33).unwrap());
        assert_eq!(consumer.buffer().capacity(), 64);
    }
    // A push into a full buffer returns the value; after one pop a push succeeds again.
    #[test]
    fn test_buffer_full() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
        assert!(producer.push(1).is_ok());
        assert!(producer.push(2).is_ok());
        assert!(producer.push(3).is_ok());
        assert!(producer.push(4).is_ok());
        assert!(matches!(producer.push(5), Err(PushError::Full(5))));
        assert_eq!(consumer.pop().unwrap(), 1);
        assert!(producer.push(5).is_ok());
    }
    // `pop` fails and `is_empty` is true before any push and again after the last pop.
    #[test]
    fn test_buffer_empty() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
        assert!(consumer.pop().is_err());
        assert!(consumer.is_empty());
        producer.push(42).unwrap();
        assert!(!consumer.is_empty());
        consumer.pop().unwrap();
        assert!(consumer.is_empty());
    }
    // `Consumer::slots` reflects the number of pushed elements.
    #[test]
    fn test_slots() {
        let (mut producer, consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
        assert_eq!(consumer.slots(), 0);
        producer.push(1).unwrap();
        producer.push(2).unwrap();
        producer.push(3).unwrap();
        assert_eq!(consumer.slots(), 3);
    }
    // Repeatedly filling and emptying a capacity-4 buffer keeps FIFO order.
    #[test]
    fn test_wrap_around() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
        for round in 0..10 {
            for i in 0..4 {
                producer.push(round * 10 + i).unwrap();
            }
            for i in 0..4 {
                assert_eq!(consumer.pop().unwrap(), round * 10 + i);
            }
        }
    }
    // Dropping the consumer drops every element still queued (5 of 5).
    #[test]
    fn test_drop_cleanup() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        // Increments the shared counter each time an instance is dropped.
        #[derive(Debug)]
        struct DropCounter {
            counter: Arc<AtomicUsize>,
        }
        impl Drop for DropCounter {
            fn drop(&mut self) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }
        let counter = Arc::new(AtomicUsize::new(0));
        {
            let (mut producer, consumer) = new::<DropCounter, 32>(NonZero::new(8).unwrap());
            for _ in 0..5 {
                producer
                    .push(DropCounter {
                        counter: counter.clone(),
                    })
                    .unwrap();
            }
            drop(consumer);
        }
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }
    // Producer and consumer on separate threads: 1000 values arrive complete and in order.
    #[test]
    fn test_concurrent_access() {
        use std::thread;
        let (mut producer, mut consumer) = new::<u64, 128>(NonZero::new(128).unwrap());
        let producer_handle = thread::spawn(move || {
            for i in 0..1000 {
                loop {
                    if producer.push(i).is_ok() {
                        break;
                    }
                    thread::yield_now();
                }
            }
        });
        let consumer_handle = thread::spawn(move || {
            let mut received = Vec::new();
            for _ in 0..1000 {
                loop {
                    match consumer.pop() {
                        Ok(val) => {
                            received.push(val);
                            break;
                        }
                        Err(_) => thread::yield_now(),
                    }
                }
            }
            received
        });
        producer_handle.join().unwrap();
        let received = consumer_handle.join().unwrap();
        assert_eq!(received.len(), 1000);
        for (i, &val) in received.iter().enumerate() {
            assert_eq!(val, i as u64);
        }
    }
    // Capacity 16 with N = 32. NOTE(review): the name suggests an inline-storage path;
    // the storage strategy lives in `RingBufCore` and is not visible here.
    #[test]
    fn test_small_capacity_stack_allocation() {
        let (mut producer, mut consumer) = new::<u8, 32>(NonZero::new(16).unwrap());
        for i in 0..10 {
            producer.push(i).unwrap();
        }
        for i in 0..10 {
            assert_eq!(consumer.pop().unwrap(), i);
        }
    }
    // Capacity 64 with N = 32. NOTE(review): the name suggests a heap-storage path;
    // the storage strategy lives in `RingBufCore` and is not visible here.
    #[test]
    fn test_large_capacity_heap_allocation() {
        let (mut producer, mut consumer) = new::<u8, 32>(NonZero::new(64).unwrap());
        for i in 0..50 {
            producer.push(i).unwrap();
        }
        for i in 0..50 {
            assert_eq!(consumer.pop().unwrap(), i);
        }
    }
    // A capacity-1 buffer holds exactly one element.
    #[test]
    fn test_capacity_one() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(1).unwrap());
        assert_eq!(consumer.buffer().capacity(), 1);
        assert!(producer.push(42).is_ok());
        assert!(matches!(producer.push(99), Err(PushError::Full(99))));
        assert_eq!(consumer.pop().unwrap(), 42);
        assert!(consumer.pop().is_err());
    }
    // Power-of-two requests (1 ..= 512) are reported unchanged.
    #[test]
    fn test_power_of_two_capacities() {
        for power in 0..10 {
            let capacity = 1 << power; let (_, consumer) = new::<u8, 128>(NonZero::new(capacity).unwrap());
            assert_eq!(consumer.buffer().capacity(), capacity);
        }
    }
    // Non-power-of-two requests are reported rounded up to the next power of two.
    #[test]
    fn test_non_power_of_two_rounding() {
        let test_cases = vec![
            (3, 4),
            (5, 8),
            (7, 8),
            (9, 16),
            (15, 16),
            (17, 32),
            (31, 32),
            (33, 64),
            (100, 128),
            (1000, 1024),
        ];
        for (input, expected) in test_cases {
            let (_, consumer) = new::<u8, 128>(NonZero::new(input).unwrap());
            assert_eq!(
                consumer.buffer().capacity(),
                expected,
                "Capacity {} should round up to {}",
                input,
                expected
            );
        }
    }
    // Push-one/pop-one cycles keep `slots` and `is_empty` consistent.
    #[test]
    fn test_single_element_operations() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
        for i in 0..100 {
            producer.push(i).unwrap();
            assert_eq!(consumer.slots(), 1);
            assert_eq!(consumer.pop().unwrap(), i);
            assert_eq!(consumer.slots(), 0);
            assert!(consumer.is_empty());
        }
    }
    // Push-two/pop-two cycles keep FIFO order.
    #[test]
    fn test_alternating_push_pop() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
        for i in 0..50 {
            producer.push(i * 2).unwrap();
            producer.push(i * 2 + 1).unwrap();
            assert_eq!(consumer.pop().unwrap(), i * 2);
            assert_eq!(consumer.pop().unwrap(), i * 2 + 1);
        }
    }
    // 1000 fill/empty rounds on a capacity-4 buffer keep FIFO order as indices grow.
    #[test]
    fn test_index_wrapping() {
        let (mut producer, mut consumer) = new::<usize, 32>(NonZero::new(4).unwrap());
        for iteration in 0..1000 {
            for i in 0..4 {
                producer.push(iteration * 4 + i).unwrap();
            }
            for i in 0..4 {
                assert_eq!(consumer.pop().unwrap(), iteration * 4 + i);
            }
        }
    }
    // A push/pop round trip works for several combinations of N and capacity.
    #[test]
    fn test_various_stack_thresholds() {
        let (mut p1, mut c1) = new::<u32, 16>(NonZero::new(8).unwrap());
        p1.push(1).unwrap();
        assert_eq!(c1.pop().unwrap(), 1);
        let (mut p2, mut c2) = new::<u32, 64>(NonZero::new(32).unwrap());
        p2.push(2).unwrap();
        assert_eq!(c2.pop().unwrap(), 2);
        let (mut p3, mut c3) = new::<u32, 128>(NonZero::new(64).unwrap());
        p3.push(3).unwrap();
        assert_eq!(c3.pop().unwrap(), 3);
        let (mut p4, mut c4) = new::<u32, 256>(NonZero::new(128).unwrap());
        p4.push(4).unwrap();
        assert_eq!(c4.pop().unwrap(), 4);
    }
    // N = 8 with capacity 32: FIFO order holds when the capacity exceeds N.
    #[test]
    fn test_small_n_with_large_capacity() {
        let (mut producer, mut consumer) = new::<u64, 8>(NonZero::new(32).unwrap());
        for i in 0..20 {
            producer.push(i).unwrap();
        }
        for i in 0..20 {
            assert_eq!(consumer.pop().unwrap(), i);
        }
    }
    // N = 256 with capacity 16: FIFO order holds when N exceeds the capacity.
    #[test]
    fn test_large_n_with_small_capacity() {
        let (mut producer, mut consumer) = new::<u64, 256>(NonZero::new(16).unwrap());
        for i in 0..10 {
            producer.push(i).unwrap();
        }
        for i in 0..10 {
            assert_eq!(consumer.pop().unwrap(), i);
        }
    }
    // The buffer works with the zero-sized type `()`.
    #[test]
    fn test_zero_sized_types() {
        let (mut producer, mut consumer) = new::<(), 32>(NonZero::new(4).unwrap());
        producer.push(()).unwrap();
        producer.push(()).unwrap();
        assert_eq!(consumer.pop().unwrap(), ());
        assert_eq!(consumer.pop().unwrap(), ());
    }
    // Elements of 256 bytes round-trip unchanged.
    #[test]
    fn test_large_types() {
        #[derive(Debug, PartialEq, Clone)]
        struct LargeStruct {
            data: [u64; 32],
        }
        let (mut producer, mut consumer) = new::<LargeStruct, 32>(NonZero::new(4).unwrap());
        let item1 = LargeStruct { data: [1; 32] };
        let item2 = LargeStruct { data: [2; 32] };
        producer.push(item1.clone()).unwrap();
        producer.push(item2.clone()).unwrap();
        assert_eq!(consumer.pop().unwrap(), item1);
        assert_eq!(consumer.pop().unwrap(), item2);
    }
    // Heap-owning `String` elements round-trip in order.
    #[test]
    fn test_string_type() {
        let (mut producer, mut consumer) = new::<String, 32>(NonZero::new(8).unwrap());
        let messages = vec!["Hello", "World", "Rust", "Ring", "Buffer"];
        for msg in &messages {
            producer.push(msg.to_string()).unwrap();
        }
        for msg in &messages {
            assert_eq!(consumer.pop().unwrap(), msg.to_string());
        }
    }
    // `Option<i32>` elements, including `None`, round-trip in order.
    #[test]
    fn test_option_type() {
        let (mut producer, mut consumer) = new::<Option<i32>, 32>(NonZero::new(4).unwrap());
        producer.push(Some(42)).unwrap();
        producer.push(None).unwrap();
        producer.push(Some(100)).unwrap();
        assert_eq!(consumer.pop().unwrap(), Some(42));
        assert_eq!(consumer.pop().unwrap(), None);
        assert_eq!(consumer.pop().unwrap(), Some(100));
    }
    // Two threads over a capacity-4 buffer: the sum of received values matches 0..count.
    #[test]
    fn test_concurrent_small_buffer() {
        use std::thread;
        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(4).unwrap());
        let count = 100;
        let producer_handle = thread::spawn(move || {
            for i in 0..count {
                loop {
                    if producer.push(i).is_ok() {
                        break;
                    }
                    thread::yield_now();
                }
            }
        });
        let consumer_handle = thread::spawn(move || {
            let mut sum = 0;
            for _ in 0..count {
                loop {
                    match consumer.pop() {
                        Ok(val) => {
                            sum += val;
                            break;
                        }
                        Err(_) => thread::yield_now(),
                    }
                }
            }
            sum
        });
        producer_handle.join().unwrap();
        let sum = consumer_handle.join().unwrap();
        assert_eq!(sum, (count * (count - 1)) / 2);
    }
    // Two threads over a capacity-512 buffer: all 10000 values are received.
    #[test]
    fn test_concurrent_large_buffer() {
        use std::thread;
        let (mut producer, mut consumer) = new::<u64, 512>(NonZero::new(512).unwrap());
        let count = 10000;
        let producer_handle = thread::spawn(move || {
            for i in 0..count {
                loop {
                    if producer.push(i).is_ok() {
                        break;
                    }
                    thread::yield_now();
                }
            }
        });
        let consumer_handle = thread::spawn(move || {
            let mut received = 0;
            for _ in 0..count {
                loop {
                    match consumer.pop() {
                        Ok(_) => {
                            received += 1;
                            break;
                        }
                        Err(_) => thread::yield_now(),
                    }
                }
            }
            received
        });
        producer_handle.join().unwrap();
        let received = consumer_handle.join().unwrap();
        assert_eq!(received, count);
    }
    // The producer sleeps briefly every tenth element; order and count are still preserved.
    #[test]
    fn test_concurrent_with_different_speeds() {
        use std::thread;
        use std::time::Duration;
        let (mut producer, mut consumer) = new::<u32, 64>(NonZero::new(32).unwrap());
        let producer_handle = thread::spawn(move || {
            for i in 0..50 {
                loop {
                    if producer.push(i).is_ok() {
                        break;
                    }
                    thread::yield_now();
                }
                if i % 10 == 0 {
                    thread::sleep(Duration::from_micros(1));
                }
            }
        });
        let consumer_handle = thread::spawn(move || {
            let mut received = Vec::new();
            for _ in 0..50 {
                loop {
                    match consumer.pop() {
                        Ok(val) => {
                            received.push(val);
                            break;
                        }
                        Err(_) => thread::yield_now(),
                    }
                }
            }
            received
        });
        producer_handle.join().unwrap();
        let received = consumer_handle.join().unwrap();
        assert_eq!(received.len(), 50);
        for (i, &val) in received.iter().enumerate() {
            assert_eq!(val, i as u32);
        }
    }
    // `PushError::Full` carries back exactly the value that was rejected.
    #[test]
    fn test_push_error_value_returned() {
        let (mut producer, _consumer) = new::<String, 32>(NonZero::new(2).unwrap());
        producer.push("first".to_string()).unwrap();
        producer.push("second".to_string()).unwrap();
        let value = "third".to_string();
        match producer.push(value.clone()) {
            Err(PushError::Full(returned_value)) => {
                assert_eq!(returned_value, value);
            }
            Ok(_) => panic!("Expected PushError::Full"),
        }
    }
    // Popping from a fresh buffer yields `PopError::Empty`.
    #[test]
    fn test_pop_error() {
        let (_producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
        match consumer.pop() {
            Err(PopError::Empty) => {} Ok(_) => panic!("Expected PopError::Empty"),
        }
    }
    // `is_empty` tracks the occupancy across a sequence of pushes and pops.
    #[test]
    fn test_is_empty_after_operations() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
        assert!(consumer.is_empty());
        producer.push(1).unwrap();
        assert!(!consumer.is_empty());
        producer.push(2).unwrap();
        assert!(!consumer.is_empty());
        consumer.pop().unwrap();
        assert!(!consumer.is_empty());
        consumer.pop().unwrap();
        assert!(consumer.is_empty());
    }
    // `slots` goes up by one per push and down by one per pop.
    #[test]
    fn test_slots_accuracy() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
        assert_eq!(consumer.slots(), 0);
        for i in 1..=10 {
            producer.push(i).unwrap();
            assert_eq!(consumer.slots(), i as usize);
        }
        for i in (0..10).rev() {
            consumer.pop().unwrap();
            assert_eq!(consumer.slots(), i);
        }
        assert_eq!(consumer.slots(), 0);
    }
    // `slots` stays correct over many rounds on a capacity-4 buffer.
    #[test]
    fn test_slots_with_wrap_around() {
        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(4).unwrap());
        for _ in 0..100 {
            for i in 0..3 {
                producer.push(i).unwrap();
            }
            assert_eq!(consumer.slots(), 3);
            for _ in 0..3 {
                consumer.pop().unwrap();
            }
            assert_eq!(consumer.slots(), 0);
        }
    }
    // Popped elements are dropped at pop time; the remaining ones when the consumer drops.
    #[test]
    fn test_partial_drop_cleanup() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        // Increments the shared counter each time an instance is dropped.
        #[derive(Debug)]
        struct DropCounter {
            counter: Arc<AtomicUsize>,
        }
        impl Drop for DropCounter {
            fn drop(&mut self) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }
        let counter = Arc::new(AtomicUsize::new(0));
        {
            let (mut producer, mut consumer) = new::<DropCounter, 32>(NonZero::new(16).unwrap());
            for _ in 0..10 {
                producer
                    .push(DropCounter {
                        counter: counter.clone(),
                    })
                    .unwrap();
            }
            for _ in 0..6 {
                consumer.pop().unwrap();
            }
            assert_eq!(counter.load(Ordering::SeqCst), 6);
            drop(consumer);
        }
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }
    // Dropping an already-empty consumer does not drop any element a second time.
    #[test]
    fn test_empty_buffer_drop() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        // Increments the shared counter each time an instance is dropped.
        #[derive(Debug)]
        struct DropCounter {
            counter: Arc<AtomicUsize>,
        }
        impl Drop for DropCounter {
            fn drop(&mut self) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }
        let counter = Arc::new(AtomicUsize::new(0));
        {
            let (mut producer, mut consumer) = new::<DropCounter, 32>(NonZero::new(8).unwrap());
            producer
                .push(DropCounter {
                    counter: counter.clone(),
                })
                .unwrap();
            consumer.pop().unwrap();
            assert!(consumer.is_empty());
            drop(consumer);
        }
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }
    // 100000 values across two threads arrive strictly sequentially.
    #[test]
    fn test_high_throughput() {
        use std::thread;
        let (mut producer, mut consumer) = new::<u64, 256>(NonZero::new(256).unwrap());
        let count = 100000;
        let producer_handle = thread::spawn(move || {
            for i in 0..count {
                loop {
                    if producer.push(i).is_ok() {
                        break;
                    }
                    thread::yield_now();
                }
            }
        });
        let consumer_handle = thread::spawn(move || {
            let mut last = None;
            for _ in 0..count {
                loop {
                    match consumer.pop() {
                        Ok(val) => {
                            if let Some(prev) = last {
                                assert_eq!(val, prev + 1, "Values must be sequential");
                            }
                            last = Some(val);
                            break;
                        }
                        Err(_) => thread::yield_now(),
                    }
                }
            }
            last
        });
        producer_handle.join().unwrap();
        let last = consumer_handle.join().unwrap();
        assert_eq!(last, Some(count - 1));
    }
    // Producer-side `capacity`, `len`, `slots`, `free_slots` and `is_full` agree as the
    // buffer fills up and after one element is popped.
    #[test]
    fn test_producer_capacity_queries() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
        assert_eq!(producer.capacity(), 8);
        assert_eq!(producer.len(), 0);
        assert_eq!(producer.slots(), 0);
        assert_eq!(producer.free_slots(), 8);
        assert!(!producer.is_full());
        producer.push(1).unwrap();
        producer.push(2).unwrap();
        producer.push(3).unwrap();
        assert_eq!(producer.len(), 3);
        assert_eq!(producer.slots(), 3);
        assert_eq!(producer.free_slots(), 5);
        assert!(!producer.is_full());
        producer.push(4).unwrap();
        producer.push(5).unwrap();
        producer.push(6).unwrap();
        producer.push(7).unwrap();
        producer.push(8).unwrap();
        assert_eq!(producer.len(), 8);
        assert_eq!(producer.slots(), 8);
        assert_eq!(producer.free_slots(), 0);
        assert!(producer.is_full());
        consumer.pop().unwrap();
        assert_eq!(producer.len(), 7);
        assert_eq!(producer.free_slots(), 1);
        assert!(!producer.is_full());
    }
    // Consumer-side `len` follows pushes while `capacity` stays fixed.
    #[test]
    fn test_consumer_len_and_capacity() {
        let (mut producer, consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
        assert_eq!(consumer.len(), 0);
        assert_eq!(consumer.capacity(), 16);
        for i in 0..10 {
            producer.push(i).unwrap();
        }
        assert_eq!(consumer.len(), 10);
        assert_eq!(consumer.capacity(), 16);
    }
    // `peek` returns the oldest element repeatedly without removing it.
    #[test]
    fn test_peek() {
        let (mut producer, consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
        assert!(consumer.peek().is_none());
        producer.push(42).unwrap();
        producer.push(100).unwrap();
        producer.push(200).unwrap();
        assert_eq!(consumer.peek(), Some(&42));
        assert_eq!(consumer.peek(), Some(&42)); assert_eq!(consumer.len(), 3); }
    // After each pop, `peek` shows the next element; `None` once the buffer is empty.
    #[test]
    fn test_peek_after_pop() {
        let (mut producer, mut consumer) = new::<String, 32>(NonZero::new(8).unwrap());
        producer.push("first".to_string()).unwrap();
        producer.push("second".to_string()).unwrap();
        producer.push("third".to_string()).unwrap();
        assert_eq!(consumer.peek(), Some(&"first".to_string()));
        consumer.pop().unwrap();
        assert_eq!(consumer.peek(), Some(&"second".to_string()));
        consumer.pop().unwrap();
        assert_eq!(consumer.peek(), Some(&"third".to_string()));
        consumer.pop().unwrap();
        assert!(consumer.peek().is_none());
    }
    // `push_slice` writes a whole slice that fits and the elements pop in order.
    #[test]
    fn test_push_slice_basic() {
        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
        let data = [1, 2, 3, 4, 5];
        let pushed = producer.push_slice(&data);
        assert_eq!(pushed, 5);
        assert_eq!(consumer.len(), 5);
        for i in 0..5 {
            assert_eq!(consumer.pop().unwrap(), data[i]);
        }
    }
    // `push_slice` writes only as many elements as fit (3 of 10) and reports that count.
    #[test]
    fn test_push_slice_partial() {
        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
        let initial = [1, 2, 3, 4, 5];
        producer.push_slice(&initial);
        let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
        let pushed = producer.push_slice(&more);
        assert_eq!(pushed, 3);
        assert_eq!(consumer.len(), 8);
        assert!(producer.is_full());
        for i in 1..=8 {
            assert_eq!(consumer.pop().unwrap(), i);
        }
    }
    // A slice pushed after 5 of 8 elements were popped spans the end of the storage
    // and still pops in order.
    #[test]
    fn test_push_slice_wrap_around() {
        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
        producer.push_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
        for _ in 0..5 {
            consumer.pop().unwrap();
        }
        let data = [10, 11, 12, 13, 14];
        let pushed = producer.push_slice(&data);
        assert_eq!(pushed, 5);
        assert_eq!(consumer.pop().unwrap(), 6);
        assert_eq!(consumer.pop().unwrap(), 7);
        assert_eq!(consumer.pop().unwrap(), 8);
        assert_eq!(consumer.pop().unwrap(), 10);
        assert_eq!(consumer.pop().unwrap(), 11);
        assert_eq!(consumer.pop().unwrap(), 12);
        assert_eq!(consumer.pop().unwrap(), 13);
        assert_eq!(consumer.pop().unwrap(), 14);
    }
    // Pushing an empty slice writes nothing and returns 0.
    #[test]
    fn test_push_slice_empty() {
        let (mut producer, _consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
        let pushed = producer.push_slice(&[]);
        assert_eq!(pushed, 0);
    }
    // `pop_slice` fills the destination from the oldest elements and leaves the rest queued.
    #[test]
    fn test_pop_slice_basic() {
        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
        for i in 0..10 {
            producer.push(i).unwrap();
        }
        let mut dest = [0u32; 5];
        let popped = consumer.pop_slice(&mut dest);
        assert_eq!(popped, 5);
        assert_eq!(dest, [0, 1, 2, 3, 4]);
        assert_eq!(consumer.len(), 5);
    }
    // `pop_slice` copies only the available elements (3) into a larger destination.
    #[test]
    fn test_pop_slice_partial() {
        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
        producer.push(1).unwrap();
        producer.push(2).unwrap();
        producer.push(3).unwrap();
        let mut dest = [0u32; 10];
        let popped = consumer.pop_slice(&mut dest);
        assert_eq!(popped, 3);
        assert_eq!(&dest[0..3], &[1, 2, 3]);
        assert!(consumer.is_empty());
    }
    // Slice pops stay in order when the stored elements span the end of the storage.
    #[test]
    fn test_pop_slice_wrap_around() {
        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
        producer.push_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
        let mut temp = [0u32; 5];
        let popped = consumer.pop_slice(&mut temp);
        assert_eq!(popped, 5);
        assert_eq!(temp, [1, 2, 3, 4, 5]);
        let pushed = producer.push_slice(&[9, 10, 11, 12, 13]);
        assert_eq!(pushed, 5);
        let mut dest1 = [0u32; 3];
        let popped1 = consumer.pop_slice(&mut dest1);
        assert_eq!(popped1, 3);
        assert_eq!(dest1, [6, 7, 8]);
        let mut dest2 = [0u32; 5];
        let popped2 = consumer.pop_slice(&mut dest2);
        assert_eq!(popped2, 5);
        assert_eq!(dest2, [9, 10, 11, 12, 13]);
    }
    // `pop_slice` on an empty buffer returns 0.
    #[test]
    fn test_pop_slice_empty() {
        let (_producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
        let mut dest = [0u32; 5];
        let popped = consumer.pop_slice(&mut dest);
        assert_eq!(popped, 0);
    }
    // `clear` leaves the buffer empty.
    #[test]
    fn test_clear() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
        for i in 0..10 {
            producer.push(i).unwrap();
        }
        assert_eq!(consumer.len(), 10);
        consumer.clear();
        assert_eq!(consumer.len(), 0);
        assert!(consumer.is_empty());
    }
    // `clear` drops every queued element (8 of 8).
    #[test]
    fn test_clear_with_drop() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        // Increments the shared counter each time an instance is dropped.
        #[derive(Debug)]
        struct DropCounter {
            counter: Arc<AtomicUsize>,
        }
        impl Drop for DropCounter {
            fn drop(&mut self) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }
        let counter = Arc::new(AtomicUsize::new(0));
        {
            let (mut producer, mut consumer) = new::<DropCounter, 32>(NonZero::new(16).unwrap());
            for _ in 0..8 {
                producer
                    .push(DropCounter {
                        counter: counter.clone(),
                    })
                    .unwrap();
            }
            assert_eq!(counter.load(Ordering::SeqCst), 0);
            consumer.clear();
            assert_eq!(counter.load(Ordering::SeqCst), 8);
        }
    }
    // `drain` yields all queued elements in order and leaves the buffer empty.
    #[test]
    fn test_drain_iterator() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
        for i in 0..10 {
            producer.push(i).unwrap();
        }
        let collected: Vec<i32> = consumer.drain().collect();
        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert!(consumer.is_empty());
    }
    // Draining an empty buffer yields nothing.
    #[test]
    fn test_drain_empty() {
        let (_producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
        let collected: Vec<i32> = consumer.drain().collect();
        assert!(collected.is_empty());
    }
    // With no concurrent producer, `size_hint` reports the exact remaining count.
    #[test]
    fn test_drain_size_hint() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
        for i in 0..5 {
            producer.push(i).unwrap();
        }
        let mut drain = consumer.drain();
        assert_eq!(drain.size_hint(), (5, Some(5)));
        drain.next();
        assert_eq!(drain.size_hint(), (4, Some(4)));
        drain.next();
        assert_eq!(drain.size_hint(), (3, Some(3)));
    }
    // Dropping a partially consumed drain leaves the remaining elements in the buffer.
    #[test]
    fn test_drain_partial() {
        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
        for i in 0..10 {
            producer.push(i).unwrap();
        }
        let mut drain = consumer.drain();
        assert_eq!(drain.next(), Some(0));
        assert_eq!(drain.next(), Some(1));
        assert_eq!(drain.next(), Some(2));
        drop(drain);
        assert_eq!(consumer.len(), 7);
    }
    // Slice push, peek, slice pop, occupancy queries and clear work together.
    #[test]
    fn test_combined_operations() {
        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
        let data = [1, 2, 3, 4, 5];
        producer.push_slice(&data);
        assert_eq!(producer.len(), 5);
        assert_eq!(consumer.len(), 5);
        assert_eq!(consumer.capacity(), 16);
        assert_eq!(consumer.peek(), Some(&1));
        let mut dest = [0u32; 3];
        consumer.pop_slice(&mut dest);
        assert_eq!(dest, [1, 2, 3]);
        assert_eq!(consumer.len(), 2);
        assert_eq!(producer.free_slots(), 14);
        consumer.clear();
        assert!(consumer.is_empty());
        assert!(!producer.is_full());
    }
}