use alloc::{boxed::Box, sync::Arc};
use core::{fmt::Debug, sync::atomic};
const BUFFER_SIZE: usize = 1024;
mod node;
use node::NodeState;
mod bufferlist;
use bufferlist::BufferList;
#[cfg(feature = "async")]
mod async_queue;
#[cfg(feature = "async")]
pub use async_queue::*;
use crate::queues::{DequeueError, EnqueueError};
/// The producing half of the queue; enqueues items for the [`Receiver`].
pub struct Sender<T> {
    // Set to `true` once either side is dropped; shared with the `Receiver`
    // via the `Arc` (see `queue()` and `close_side`).
    closed: Arc<atomic::AtomicBool>,
    // Monotonically increasing logical slot index; `fetch_add` in `enqueue`
    // claims a unique slot per item.
    tail: atomic::AtomicUsize,
    // Pointer to the buffer segment currently at the tail of the buffer list.
    tail_of_queue: atomic::AtomicPtr<BufferList<T>>,
}
/// The consuming half of the queue; dequeues items enqueued by the [`Sender`].
pub struct Receiver<T> {
    // Set to `true` once either side is dropped; shared with the `Sender`
    // via the `Arc` (see `queue()` and `close_side`).
    closed: Arc<atomic::AtomicBool>,
    // Raw pointer to the buffer segment currently being consumed; advanced
    // (and the old buffer freed) by `move_to_next_buffer`.
    head_of_queue: *mut BufferList<T>,
}
/// Marks one side (sender or receiver) of the queue as closed.
///
/// The first side to close merely sets the shared flag; the *second* side to
/// close (detected by the failed compare-exchange, i.e. `closed` was already
/// `true`) is the last owner of the buffers and therefore deallocates the
/// entire buffer list obtained from `get_ptr`.
fn close_side<T, F>(closed: &atomic::AtomicBool, get_ptr: F)
where
    // `FnOnce` is strictly more general than the previous `Fn` bound and is
    // sufficient: the pointer producer is invoked at most once.
    F: FnOnce() -> *mut BufferList<T>,
{
    if closed
        .compare_exchange(
            false,
            true,
            atomic::Ordering::SeqCst,
            atomic::Ordering::SeqCst,
        )
        .is_err()
    {
        // The other side closed first; nobody else can touch the buffers
        // anymore, so free all of them now.
        BufferList::deallocate_all(get_ptr());
    }
}
impl<T> Sender<T> {
    /// Checks whether the queue has been marked as closed (i.e. the other
    /// side has been dropped).
    pub fn is_closed(&self) -> bool {
        self.closed.load(atomic::Ordering::Acquire)
    }
    /// Attempts to enqueue `data`.
    ///
    /// On failure the item is handed back to the caller together with the
    /// error, so no data is lost; the only failure mode here is the queue
    /// being closed.
    pub fn enqueue(&self, data: T) -> Result<(), (T, EnqueueError)> {
        if self.is_closed() {
            return Err((data, EnqueueError::Closed));
        }
        // Atomically claim a unique logical slot index for this item.
        let location = self.tail.fetch_add(1, atomic::Ordering::AcqRel);
        let mut tmp_buffer_ptr = self.tail_of_queue.load(atomic::Ordering::Acquire);
        let mut tmp_buffer = unsafe { &*tmp_buffer_ptr };
        // `position_in_queue` is 1-based (see `queue()`), so a buffer at
        // position p covers logical slots [(p-1)*BUFFER_SIZE, p*BUFFER_SIZE).
        let mut end = tmp_buffer.position_in_queue * BUFFER_SIZE;
        // Walk forward through the buffer list until the current buffer's
        // range extends past our claimed slot.
        while location >= end {
            tmp_buffer_ptr = tmp_buffer.go_to_next(tmp_buffer_ptr, &self.tail_of_queue);
            tmp_buffer = unsafe { &*tmp_buffer_ptr };
            end = tmp_buffer.position_in_queue * BUFFER_SIZE;
        }
        let mut start = (tmp_buffer.position_in_queue - 1) * BUFFER_SIZE;
        let mut last_buffer = true;
        // A concurrent producer may have advanced the shared tail buffer past
        // our slot; walk back through the `previous` links until we reach the
        // buffer that actually contains `location`.
        while location < start {
            tmp_buffer_ptr = tmp_buffer.previous as *mut BufferList<T>;
            tmp_buffer = unsafe { &*tmp_buffer_ptr };
            last_buffer = false;
            start = (tmp_buffer.position_in_queue - 1) * BUFFER_SIZE;
        }
        // Slot offset inside this buffer; the two loops above guarantee
        // start <= location < start + BUFFER_SIZE, so the index is in-bounds.
        let index = location - start;
        unsafe { tmp_buffer.buffer.get_unchecked(index) }.store(data);
        // Eagerly allocate the successor buffer while filling an early slot
        // of the newest buffer, presumably so later producers rarely hit the
        // allocation on the hot path — TODO confirm against `allocate_next`.
        if last_buffer && index == 2 {
            tmp_buffer.allocate_next(tmp_buffer_ptr, &self.tail_of_queue);
        }
        Ok(())
    }
}
impl<T> Debug for Sender<T> {
    /// The sender exposes no meaningful printable state, so the debug
    /// representation is a fixed placeholder.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("Sender ()")
    }
}
impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // Mark our side as closed; if the receiver is already gone,
        // `close_side` frees the buffer list starting from the tail buffer.
        let tail_of_queue = &self.tail_of_queue;
        close_side(&self.closed, || {
            tail_of_queue.load(atomic::Ordering::Acquire)
        });
    }
}
impl<T> Receiver<T> {
    /// Checks whether the queue has been marked as closed (i.e. the
    /// [`Sender`] has been dropped).
    pub fn is_closed(&self) -> bool {
        self.closed.load(atomic::Ordering::Acquire)
    }
    /// If the current head buffer has been fully consumed, advances
    /// `head_of_queue` to the next buffer and frees the consumed one.
    ///
    /// Returns `false` only when the head buffer is exhausted and no next
    /// buffer exists yet; returns `true` otherwise.
    fn move_to_next_buffer(&mut self) -> bool {
        let current_queue_ptr = self.head_of_queue;
        let current_queue = unsafe { &*current_queue_ptr };
        if current_queue.head >= BUFFER_SIZE {
            let next_ptr = current_queue.next.load(atomic::Ordering::Acquire);
            if next_ptr.is_null() {
                return false;
            }
            self.head_of_queue = next_ptr;
            // Reclaim the buffer we just finished consuming; it was allocated
            // via `Box` (see `queue()` / `BufferList::boxed`).
            drop(unsafe { Box::from_raw(current_queue_ptr) });
            // Clear the back-link so nothing dangles to the freed buffer.
            let next = unsafe { &mut *self.head_of_queue };
            next.previous = core::ptr::null();
        }
        true
    }
    /// Attempts to dequeue the next element without blocking.
    ///
    /// # Errors
    /// * [`DequeueError::Empty`] when no element is currently available.
    /// * [`DequeueError::Closed`] when the queue is closed and drained.
    pub fn try_dequeue(&mut self) -> Result<T, DequeueError> {
        // Load the node at the current head position, moving to the next
        // buffer first if the head index ran off the end of this one.
        let mut current_queue = unsafe { &mut *self.head_of_queue };
        let mut n = match current_queue.buffer.get(current_queue.head) {
            Some(n) => n,
            None => {
                self.move_to_next_buffer();
                current_queue = unsafe { &mut *self.head_of_queue };
                match current_queue.buffer.get(current_queue.head) {
                    Some(n) => n,
                    None => return Err(DequeueError::Empty),
                }
            }
        };
        // Skip over nodes that were already consumed out of order.
        while n.get_state() == NodeState::Handled {
            current_queue.head += 1;
            if !self.move_to_next_buffer() {
                return Err(DequeueError::Empty);
            }
            current_queue = unsafe { &mut *self.head_of_queue };
            n = match current_queue.buffer.get(current_queue.head) {
                Some(n) => n,
                None => {
                    self.move_to_next_buffer();
                    current_queue = unsafe { &mut *self.head_of_queue };
                    match current_queue.buffer.get(current_queue.head) {
                        Some(t) => t,
                        None => return Err(DequeueError::Empty),
                    }
                }
            };
        }
        match n.get_state() {
            // Fast path: the head node holds data — take it and advance.
            NodeState::Set => {
                let data = n
                    .load()
                    .expect("Data should be loadable and node should be Set");
                current_queue.head += 1;
                self.move_to_next_buffer();
                Ok(data)
            }
            // The head node is still empty: scan ahead for a later node that
            // was already set by a producer.
            NodeState::Empty => {
                let tmp_head_of_queue = unsafe { &*self.head_of_queue };
                let tmp_head = tmp_head_of_queue.head;
                let (tmp_head_of_queue, tmp_head) = {
                    let (mut n_queue, result) = BufferList::scan(self.head_of_queue, tmp_head);
                    let n_head = match result {
                        Some(n) => n,
                        None => {
                            if self.is_closed() {
                                // Re-scan once after observing `closed`, so
                                // that an element enqueued concurrently just
                                // before the close is not reported as Closed.
                                let (t_queue, t_result) =
                                    BufferList::scan(self.head_of_queue, tmp_head);
                                match t_result {
                                    Some(n) => {
                                        n_queue = t_queue;
                                        n
                                    }
                                    None => return Err(DequeueError::Closed),
                                }
                            } else {
                                return Err(DequeueError::Empty);
                            }
                        }
                    };
                    (unsafe { &*n_queue }, n_head)
                };
                let tmp_n = match tmp_head_of_queue.buffer.get(tmp_head) {
                    Some(n) => n,
                    None => return Err(DequeueError::Empty),
                };
                let data = tmp_n
                    .load()
                    .expect("Data should be loadable and node should be Set");
                Ok(data)
            }
            // Any other state is treated as "nothing available right now".
            _ => Err(DequeueError::Empty),
        }
    }
    /// Dequeues the next element, busy-spinning while the queue is empty.
    ///
    /// Returns `None` once the queue is closed and fully drained.
    pub fn dequeue(&mut self) -> Option<T> {
        loop {
            match self.try_dequeue() {
                Ok(d) => return Some(d),
                Err(e) => match e {
                    DequeueError::Empty => {}
                    DequeueError::Closed => return None,
                },
            };
        }
    }
    /// Returns a borrowing iterator over the remaining elements; equivalent
    /// to `(&mut receiver).into_iter()`.
    pub fn iter_mut<'queue, 'iter>(&'queue mut self) -> RefIter<'iter, T>
    where
        'queue: 'iter,
    {
        self.into_iter()
    }
}
mod owned_iter;
pub use owned_iter::OwnedIter;
impl<T> IntoIterator for Receiver<T> {
    type Item = T;
    type IntoIter = OwnedIter<T>;
    /// Consumes the receiver, producing an owning iterator over the
    /// remaining elements.
    fn into_iter(self) -> Self::IntoIter {
        OwnedIter::new(self)
    }
}
mod ref_iter;
pub use ref_iter::RefIter;
impl<'queue, T> IntoIterator for &'queue mut Receiver<T> {
    type Item = T;
    type IntoIter = RefIter<'queue, T>;
    /// Produces a borrowing iterator over the remaining elements; the
    /// receiver stays usable afterwards.
    fn into_iter(self) -> Self::IntoIter {
        RefIter::new(self)
    }
}
// SAFETY: the receiver owns the queued `T` values, so it may only be moved to
// another thread when `T: Send`. The previous unconstrained `impl<T> Send`
// was unsound — it would have allowed sending non-`Send` payloads (e.g.
// `Rc<T>`) across threads inside the queue.
unsafe impl<T: Send> Send for Receiver<T> {}
// SAFETY: all dequeue operations take `&mut self`, so a shared `&Receiver`
// only exposes `is_closed` (an atomic load). Bounded by `T: Send` to stay
// conservative, since the receiver owns `T` values; the previous
// unconstrained impl placed no requirement on `T` at all.
unsafe impl<T: Send> Sync for Receiver<T> {}
impl<T> Debug for Receiver<T> {
    /// The receiver exposes no meaningful printable state, so the debug
    /// representation is a fixed placeholder.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("Receiver ()")
    }
}
impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // Mark our side as closed; if the sender is already gone, we are
        // the last owner and `close_side` deallocates everything. The
        // closure walks the `next` links from the head and hands
        // `deallocate_all` the final buffer of the list.
        close_side(&self.closed, || {
            let mut last_ptr = self.head_of_queue;
            loop {
                let next_ptr = unsafe { &*last_ptr }.next.load(atomic::Ordering::SeqCst);
                if next_ptr.is_null() {
                    break last_ptr;
                }
                last_ptr = next_ptr;
            }
        });
    }
}
/// Creates a new queue and returns its consuming and producing halves.
pub fn queue<T>() -> (Receiver<T>, Sender<T>) {
    // Start with a single heap-allocated buffer at (1-based) position 1;
    // both halves initially point at it.
    let first_buffer_ptr = Box::into_raw(BufferList::boxed(core::ptr::null(), 1));
    // Shared closed-flag; whichever side is dropped last frees the buffers.
    let closed_flag = Arc::new(atomic::AtomicBool::new(false));
    let receiver = Receiver {
        closed: Arc::clone(&closed_flag),
        head_of_queue: first_buffer_ptr,
    };
    let sender = Sender {
        closed: closed_flag,
        tail: atomic::AtomicUsize::new(0),
        tail_of_queue: atomic::AtomicPtr::new(first_buffer_ptr),
    };
    (receiver, sender)
}
#[cfg(test)]
mod tests {
    use super::*;
    // Dequeue from a fresh queue reports Empty (not Closed) while the
    // sender is still alive.
    #[test]
    fn dequeue_empty() {
        let (mut rx, tx) = queue::<u8>();
        assert_eq!(Err(DequeueError::Empty), rx.try_dequeue());
        drop(tx);
    }
    #[test]
    fn enqueue_one() {
        let (rx, tx) = queue();
        tx.enqueue(13).unwrap();
        drop(rx);
    }
    #[test]
    fn enqueue_dequeue() {
        let (mut rx, tx) = queue();
        tx.enqueue(13).unwrap();
        assert_eq!(Ok(13), rx.try_dequeue());
    }
    // Crossing a single buffer boundary exercises buffer allocation on the
    // sender side and buffer advancement/freeing on the receiver side.
    #[test]
    fn enqueue_fill_one_buffer() {
        let (mut rx, tx) = queue();
        let elements = BUFFER_SIZE + 2;
        for i in 0..elements {
            tx.enqueue(i).unwrap();
        }
        for i in 0..elements {
            assert_eq!(Ok(i), rx.try_dequeue());
        }
    }
    // Typo fixed: previously named `fill_mulitple_buffers`.
    #[test]
    fn fill_multiple_buffers() {
        let (mut rx, tx) = queue();
        let elements = BUFFER_SIZE * 5;
        for i in 0..elements {
            tx.enqueue(i).unwrap();
        }
        for i in 0..elements {
            assert_eq!(Ok(i), rx.try_dequeue());
        }
        // The queue stays usable after being fully drained.
        tx.enqueue(13).unwrap();
        assert_eq!(Ok(13), rx.try_dequeue());
    }
    // Dropping the receiver closes the queue; enqueue hands the item back.
    #[test]
    fn enqueue_closed() {
        let (rx, tx) = queue();
        drop(rx);
        assert_eq!(Err((13, EnqueueError::Closed)), tx.enqueue(13));
    }
    #[test]
    fn dequeue_closed() {
        let (mut rx, tx) = queue::<usize>();
        drop(tx);
        assert_eq!(Err(DequeueError::Closed), rx.try_dequeue());
    }
    // Items enqueued before the sender was dropped are still delivered.
    #[test]
    fn enqueue_dequeue_closed() {
        let (mut rx, tx) = queue::<usize>();
        tx.enqueue(13).unwrap();
        drop(tx);
        assert_eq!(Ok(13), rx.try_dequeue());
        assert_eq!(Err(DequeueError::Closed), rx.try_dequeue());
    }
    // Dropping both sides with items still queued must not leak or crash.
    #[test]
    fn enqueue_some_close() {
        let (rx, tx) = queue::<usize>();
        for index in 0..10 {
            tx.enqueue(index).unwrap();
        }
        drop(tx);
        drop(rx);
    }
    #[test]
    fn iter_mut() {
        let (mut rx, tx) = queue::<usize>();
        tx.enqueue(13).unwrap();
        drop(tx);
        let mut iter = (&mut rx).into_iter();
        assert_eq!(Some(13), iter.next());
        assert_eq!(None, iter.next());
        assert!(rx.is_closed());
    }
}