mod d_spsc;
use alloc::sync::Arc;
use core::{fmt::Debug, sync::atomic};
use super::bounded;
use crate::queues::{DequeueError, EnqueueError};
#[cfg(feature = "async")]
mod async_queue;
#[cfg(feature = "async")]
pub use async_queue::*;
/// The sending half of the unbounded queue created by [`queue`].
///
/// Items are written into a bounded buffer. When that buffer rejects an item,
/// `enqueue` creates a fresh bounded buffer and passes its receiving half to the
/// consumer through `inuse_sender`.
///
/// NOTE: the field order determines the drop order (`buf_w` is dropped before
/// `inuse_sender`); do not reorder fields without checking the close handling.
pub struct UnboundedSender<T> {
/// Flag shared with the matching `UnboundedReceiver`; stored as `true` when
/// either half is dropped.
closed: Arc<atomic::AtomicBool>,
/// Size argument handed to `bounded::queue` for every newly created buffer.
buffer_size: usize,
/// Sending half of the bounded buffer that is currently being written to.
buf_w: bounded::BoundedSender<T>,
/// Queue over which the receiving half of each new buffer is handed to the
/// consumer.
inuse_sender: d_spsc::UnboundedSender<bounded::BoundedReceiver<T>>,
}
impl<T> UnboundedSender<T> {
    /// Reports whether the queue has been closed.
    ///
    /// Reads the flag shared with the receiving half, which is set to `true`
    /// once either half has been dropped.
    pub fn is_closed(&self) -> bool {
        self.closed.load(atomic::Ordering::Acquire)
    }

    /// Creates a fresh bounded buffer, hands its receiving half to the consumer
    /// and returns the sending half.
    ///
    /// # Panics
    /// Panics if the receiving half cannot be enqueued on the hand-over queue.
    fn next_w(&mut self) -> bounded::BoundedSender<T> {
        let (rx, tx) = bounded::queue(self.buffer_size);
        self.inuse_sender.enqueue(rx).unwrap();
        tx
    }

    /// Enqueues `data`, switching to a new buffer when the current one rejects it.
    ///
    /// # Errors
    /// Returns the item together with [`EnqueueError::Closed`] if the queue has
    /// been closed, either before the call or while the item was being written.
    ///
    /// # Panics
    /// Panics if a new buffer cannot be handed to the consumer (see `next_w`) or
    /// if a freshly created buffer rejects the item.
    pub fn enqueue(&mut self, data: T) -> Result<(), (T, EnqueueError)> {
        if self.is_closed() {
            return Err((data, EnqueueError::Closed));
        }

        let data = match self.buf_w.try_enqueue(data) {
            Ok(_) => return Ok(()),
            // The consumer can disappear between the check above and this write.
            // Report that instead of allocating a buffer nobody will ever read
            // and pretending the item was delivered.
            Err((rejected, EnqueueError::Closed)) => {
                return Err((rejected, EnqueueError::Closed));
            }
            // Any other failure means the current buffer cannot take the item,
            // so continue with a new one.
            Err((rejected, _)) => rejected,
        };

        // `next_w` publishes the new receiving half *before* the old sending
        // half is dropped by this assignment.
        self.buf_w = self.next_w();
        if self.buf_w.try_enqueue(data).is_err() {
            panic!("The new Buffer is always empty");
        }
        Ok(())
    }
}
impl<T> Debug for UnboundedSender<T> {
    /// Writes a fixed placeholder; no queue contents or state are printed.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("UnboundedSender ()")
    }
}
impl<T> Drop for UnboundedSender<T> {
    /// Marks the shared flag as closed when the sending half goes away.
    fn drop(&mut self) {
        let closed_flag: &atomic::AtomicBool = &self.closed;
        closed_flag.store(true, atomic::Ordering::Release);
    }
}
// SAFETY: moving the sender to another thread moves the ability to produce `T`
// values that are consumed elsewhere, so `T` itself must be `Send`. Without the
// bound, a non-`Send` payload (e.g. `Rc`) could cross threads through the queue.
// NOTE(review): this also relies on the `bounded` and `d_spsc` halves being safe
// to use from a different thread than their counterpart — confirm in those modules.
unsafe impl<T: Send> Send for UnboundedSender<T> {}
// SAFETY: a shared reference only exposes `is_closed`, an atomic load; every
// mutating operation takes `&mut self`. The `T: Send` bound mirrors the `Send`
// impl so the two stay in step.
unsafe impl<T: Send> Sync for UnboundedSender<T> {}
/// The receiving half of the unbounded queue created by [`queue`].
///
/// Items are read from the current bounded buffer. Once that buffer reports
/// `Closed`, `try_dequeue` fetches the next buffer from `inuse_recv`.
///
/// NOTE: the field order determines the drop order; do not reorder fields
/// without checking the close handling.
pub struct UnboundedReceiver<T> {
/// Flag shared with the matching `UnboundedSender`; stored as `true` when
/// either half is dropped.
closed: Arc<atomic::AtomicBool>,
/// Receiving half of the bounded buffer that is currently being drained.
buf_r: bounded::BoundedReceiver<T>,
/// Queue from which the receiving halves of follow-up buffers are taken.
inuse_recv: d_spsc::UnboundedReceiver<bounded::BoundedReceiver<T>>,
}
impl<T> UnboundedReceiver<T> {
    /// Reports whether the queue is closed from the consumer's point of view.
    ///
    /// This is the case when the current buffer reports itself closed and the
    /// hand-over queue has no further buffer waiting.
    pub fn is_closed(&self) -> bool {
        self.buf_r.is_closed() && !self.inuse_recv.has_next()
    }

    /// Attempts to take the next item without waiting.
    ///
    /// # Errors
    /// * [`DequeueError::Empty`] if the current buffer has nothing to offer.
    /// * [`DequeueError::Closed`] if the current buffer is closed and no
    ///   follow-up buffer is available.
    pub fn try_dequeue(&mut self) -> Result<T, DequeueError> {
        match self.buf_r.try_dequeue() {
            Ok(item) => Ok(item),
            Err(DequeueError::Empty) => Err(DequeueError::Empty),
            // The current buffer is finished; continue with the next buffer the
            // producer handed over, if there is one.
            Err(DequeueError::Closed) => match self.inuse_recv.try_dequeue() {
                Ok(next_buffer) => {
                    self.buf_r = next_buffer;
                    self.buf_r.try_dequeue()
                }
                Err(_) => Err(DequeueError::Closed),
            },
        }
    }

    /// Busy-waits until an item is available and returns it.
    ///
    /// Returns `None` once `try_dequeue` reports the queue as closed.
    pub fn dequeue(&mut self) -> Option<T> {
        loop {
            match self.try_dequeue() {
                Ok(item) => return Some(item),
                Err(DequeueError::Closed) => return None,
                // Nothing to read yet: tell the CPU this is a spin-wait before
                // polling again instead of hammering the queue in a tight loop.
                Err(DequeueError::Empty) => core::hint::spin_loop(),
            }
        }
    }
}
impl<T> Debug for UnboundedReceiver<T> {
    /// Writes a fixed placeholder; no queue contents or state are printed.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("UnboundedReceiver ()")
    }
}
impl<T> Drop for UnboundedReceiver<T> {
    /// Marks the shared flag as closed when the receiving half goes away, so
    /// the sender's `is_closed` check observes it.
    fn drop(&mut self) {
        let closed_flag: &atomic::AtomicBool = &self.closed;
        closed_flag.store(true, atomic::Ordering::Release);
    }
}
// SAFETY: moving the receiver to another thread means `T` values produced
// elsewhere are taken out on that thread, so `T` itself must be `Send`. Without
// the bound, a non-`Send` payload (e.g. `Rc`) could cross threads through the queue.
// NOTE(review): this also relies on the `bounded` and `d_spsc` halves being safe
// to use from a different thread than their counterpart — confirm in those modules.
unsafe impl<T: Send> Send for UnboundedReceiver<T> {}
// SAFETY: a shared reference only exposes `is_closed`; every operation that
// takes items out requires `&mut self`. The `T: Send` bound mirrors the `Send`
// impl so the two stay in step.
unsafe impl<T: Send> Sync for UnboundedReceiver<T> {}
/// Creates a new unbounded queue and returns its `(receiver, sender)` pair.
///
/// Both halves share one closed-flag, start out on the same bounded buffer of
/// size 64, and are connected by a second queue over which follow-up buffers
/// are handed from the sender to the receiver.
pub fn queue<T>() -> (UnboundedReceiver<T>, UnboundedSender<T>) {
    // Size used for the initial buffer and for every buffer the sender creates later.
    const BUFFER_SIZE: usize = 64;

    let closed_flag = Arc::new(atomic::AtomicBool::new(false));
    let (inuse_recv, inuse_sender) = d_spsc::unbounded_basic_queue();
    let (buf_r, buf_w) = bounded::queue(BUFFER_SIZE);

    let receiver = UnboundedReceiver {
        closed: Arc::clone(&closed_flag),
        buf_r,
        inuse_recv,
    };
    let sender = UnboundedSender {
        closed: closed_flag,
        buffer_size: BUFFER_SIZE,
        buf_w,
        inuse_sender,
    };

    (receiver, sender)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A single item makes it from the sender to the receiver.
    #[test]
    fn enqueue_dequeue() {
        let (mut rx, mut tx) = queue();
        tx.enqueue(13).unwrap();
        assert_eq!(Ok(13), rx.try_dequeue());
    }

    /// Enqueues more items than fit into several buffers, so the sender has to
    /// hand over new buffers, and checks that the receiver follows the chain
    /// and sees every item in order.
    #[test]
    fn multi_buffer() {
        let (mut rx, mut tx) = queue();
        // Derived from the sender's own buffer size so the test keeps spanning
        // multiple buffers even if that size changes.
        let total = tx.buffer_size * 3 + 1;

        for item in 0..total {
            tx.enqueue(item).unwrap();
        }
        for item in 0..total {
            assert_eq!(Ok(item), rx.try_dequeue());
        }
        // The sender is still alive, so a drained queue is empty, not closed.
        assert_eq!(Err(DequeueError::Empty), rx.try_dequeue());
    }

    /// Enqueueing after the receiver is gone returns the item and `Closed`.
    #[test]
    fn enqueue_closed() {
        let (rx, mut tx) = queue();
        drop(rx);
        assert_eq!(Err((13, EnqueueError::Closed)), tx.enqueue(13));
    }

    /// Dequeueing from an empty queue whose sender is gone reports `Closed`.
    #[test]
    fn dequeue_closed() {
        let (mut rx, tx) = queue::<usize>();
        drop(tx);
        assert_eq!(Err(DequeueError::Closed), rx.try_dequeue());
    }
}