use alloc::{sync::Arc, vec::Vec};
use core::{fmt::Debug, sync::atomic};
use crate::queues::{DequeueError, EnqueueError};
#[cfg(feature = "async")]
mod async_queue;
#[cfg(feature = "async")]
pub use async_queue::*;
mod node;
use node::Node;
pub struct BoundedSender<T> {
closed: Arc<atomic::AtomicBool>,
head: usize,
buffer: Arc<Vec<Node<T>>>,
}
pub struct BoundedReceiver<T> {
closed: Arc<atomic::AtomicBool>,
tail: usize,
buffer: Arc<Vec<Node<T>>>,
}
#[inline(always)]
const fn next_element(current: usize, length: usize) -> usize {
let target = current + 1;
if target >= length {
0
} else {
target
}
}
impl<T> BoundedSender<T> {
pub fn is_closed(&self) -> bool {
self.closed.load(atomic::Ordering::Acquire)
}
pub fn try_enqueue(&mut self, data: T) -> Result<(), (T, EnqueueError)> {
if self.is_closed() {
return Err((data, EnqueueError::Closed));
}
let buffer_entry = unsafe { self.buffer.get_unchecked(self.head) };
if buffer_entry.is_set() {
return Err((data, EnqueueError::Full));
}
buffer_entry.store(data);
self.head = next_element(self.head, self.buffer.len());
Ok(())
}
pub fn enqueue(&mut self, mut data: T) -> Result<(), (T, EnqueueError)> {
loop {
match self.try_enqueue(data) {
Ok(_) => return Ok(()),
Err((d, e)) => match e {
EnqueueError::Full => {
data = d;
}
EnqueueError::Closed => return Err((d, EnqueueError::Closed)),
},
};
}
}
pub fn is_full(&self) -> bool {
self.buffer[self.head].is_set()
}
}
impl<T> Debug for BoundedSender<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "BoundedSender ()")
}
}
impl<T> Drop for BoundedSender<T> {
fn drop(&mut self) {
self.closed.store(true, atomic::Ordering::Release);
}
}
unsafe impl<T> Send for BoundedSender<T> {}
unsafe impl<T> Sync for BoundedSender<T> {}
impl<T> BoundedReceiver<T> {
pub fn is_closed(&self) -> bool {
self.closed.load(atomic::Ordering::Acquire)
}
pub fn try_dequeue(&mut self) -> Result<T, DequeueError> {
let buffer_entry = unsafe { self.buffer.get_unchecked(self.tail) };
if !buffer_entry.is_set() {
if self.is_closed() {
if !buffer_entry.is_set() {
return Err(DequeueError::Closed);
}
}
return Err(DequeueError::Empty);
}
let data = buffer_entry.load();
self.tail = next_element(self.tail, self.buffer.len());
Ok(data)
}
pub fn dequeue(&mut self) -> Option<T> {
loop {
match self.try_dequeue() {
Ok(d) => return Some(d),
Err(e) => match e {
DequeueError::Empty => {}
DequeueError::Closed => return None,
},
};
}
}
pub fn is_empty(&self) -> bool {
!self.buffer[self.tail].is_set()
}
}
impl<T> Debug for BoundedReceiver<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "BoundedReceiver ()")
}
}
impl<T> Drop for BoundedReceiver<T> {
fn drop(&mut self) {
self.closed.store(true, atomic::Ordering::Release);
}
}
unsafe impl<T> Send for BoundedReceiver<T> {}
unsafe impl<T> Sync for BoundedReceiver<T> {}
pub fn queue<T>(capacity: usize) -> (BoundedReceiver<T>, BoundedSender<T>) {
let mut raw_buffer = Vec::with_capacity(capacity);
for _ in 0..capacity {
raw_buffer.push(Node::new());
}
let closed = Arc::new(atomic::AtomicBool::new(false));
let buffer = Arc::new(raw_buffer);
(
BoundedReceiver {
closed: closed.clone(),
buffer: buffer.clone(),
tail: 0,
},
BoundedSender {
closed,
buffer,
head: 0,
},
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn enqueue_dequeue() {
let (mut rx, mut tx) = queue(10);
assert_eq!(Ok(()), tx.try_enqueue(13));
assert_eq!(Ok(13), rx.try_dequeue());
}
#[test]
fn enqueue_will_block() {
let (rx, mut tx) = queue(1);
assert_eq!(Ok(()), tx.try_enqueue(13));
assert_eq!(Err((14, EnqueueError::Full)), tx.try_enqueue(14));
drop(rx);
}
#[test]
fn dequeue_will_block() {
let (mut rx, tx) = queue::<usize>(1);
assert_eq!(Err(DequeueError::Empty), rx.try_dequeue());
drop(tx);
}
#[test]
fn enqueue_dequeue_full_buffer() {
let (mut rx, mut tx) = queue(3);
for i in 0..4 {
assert_eq!(Ok(()), tx.try_enqueue(i));
assert_eq!(Ok(i), rx.try_dequeue());
}
}
#[test]
fn enqueue_is_closed() {
let (rx, mut tx) = queue(3);
drop(rx);
assert_eq!(Err((13, EnqueueError::Closed)), tx.try_enqueue(13));
}
#[test]
fn dequeue_is_closed() {
let (mut rx, tx) = queue::<usize>(3);
drop(tx);
assert_eq!(Err(DequeueError::Closed), rx.try_dequeue());
}
#[test]
fn enqueue_dequeue_is_closed() {
let (mut rx, mut tx) = queue::<usize>(3);
tx.try_enqueue(13).unwrap();
drop(tx);
assert_eq!(Ok(13), rx.try_dequeue());
assert_eq!(Err(DequeueError::Closed), rx.try_dequeue());
}
#[test]
fn blocking_enqueue_closed() {
let (rx, mut tx) = queue::<usize>(3);
drop(rx);
assert_eq!(Err((13, EnqueueError::Closed)), tx.enqueue(13));
}
#[test]
fn blocking_dequeue_closed() {
let (mut rx, tx) = queue::<usize>(3);
drop(tx);
assert_eq!(None, rx.dequeue());
}
#[test]
fn is_empty() {
let (mut rx, mut tx) = queue::<usize>(3);
assert!(rx.is_empty());
tx.try_enqueue(13).unwrap();
assert!(!rx.is_empty());
rx.try_dequeue().unwrap();
assert!(rx.is_empty());
}
#[test]
fn is_full() {
let (mut rx, mut tx) = queue::<usize>(1);
assert!(!tx.is_full());
tx.try_enqueue(13).unwrap();
assert!(tx.is_full());
rx.try_dequeue().unwrap();
assert!(!tx.is_full());
}
}