use crate::sync::atomic;
use std::{cell::UnsafeCell, mem::MaybeUninit, sync::Arc};
use crate::queues::{DequeueError, EnqueueError};
mod scq;
/// A fixed-capacity queue that keeps its elements in a pre-allocated buffer of
/// slots and tracks the slots through two queues of slot indices.
///
/// An enqueue takes an index from `fq`, writes the element into that slot and
/// then publishes the index through `aq`; a dequeue takes an index from `aq`,
/// moves the element out of the slot and hands the index back to `fq`.
///
// NOTE(review): no `Drop` impl is visible in this file. Because the slots are
// `MaybeUninit`, elements still stored when the queue is dropped would not have
// their destructors run unless that is handled elsewhere - confirm.
pub struct BoundedQueue<T> {
/// Slot storage. A slot only holds an initialized `T` between the write in
/// `try_enqueue` and the matching read in `dequeue` (or the rollback read in
/// `try_enqueue`).
data: Arc<Vec<UnsafeCell<MaybeUninit<T>>>>,
/// Indices of slots that currently hold an element ("allocated" indices).
/// `try_enqueue` calls `finalize` on this queue when no free slot is left.
pub aq: Arc<scq::Queue>,
/// Indices of slots that are currently free; seeded with every index by
/// `new_queue`.
fq: Arc<scq::Queue>,
/// Pointer to another `BoundedQueue`, initialized to null by `new_queue`.
/// It is not read or written anywhere in this file; presumably an outer
/// structure uses it to link several queues together - TODO confirm.
pub next: atomic::AtomicPtr<Self>,
}
/// Builds an empty [`BoundedQueue`] with `capacity` element slots.
///
/// Every slot starts out uninitialized and every slot index in `0..capacity`
/// is placed into the free-index queue (`fq`). Nothing is enqueued into the
/// allocated-index queue (`aq`), and `next` starts as a null pointer.
///
/// # Panics
///
/// Panics with the message "Works as expected" if one of the slot indices
/// cannot be enqueued into the freshly created free-index queue.
pub fn new_queue<T>(capacity: usize) -> BoundedQueue<T> {
    // One uninitialized slot per element the queue can hold.
    let slots: Vec<UnsafeCell<MaybeUninit<T>>> = (0..capacity)
        .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
        .collect();

    let allocated = scq::Queue::new(capacity);
    let free = scq::Queue::new(capacity);

    // Initially every slot is free, so every index goes into the free queue.
    (0..capacity).for_each(|slot_index| {
        free.enqueue(slot_index).expect("Works as expected");
    });

    BoundedQueue {
        data: Arc::new(slots),
        aq: Arc::new(allocated),
        fq: Arc::new(free),
        next: atomic::AtomicPtr::new(std::ptr::null_mut()),
    }
}
// SAFETY: A shared `&BoundedQueue<T>` allows one thread to move a `T` in via
// `try_enqueue` and a different thread to move it out via `dequeue`, so sharing
// the queue between threads transfers ownership of `T` values across threads.
// That is only sound when `T: Send`; without the bound, safe code could smuggle
// a non-`Send` value (such as an `Rc`) to another thread. `T: Sync` is not
// required because the API in this file never hands out a `&T`.
// NOTE(review): this additionally relies on `scq::Queue` handing every slot
// index to exactly one caller at a time, so that slot accesses never overlap -
// confirm against the `scq` module.
unsafe impl<T> Sync for BoundedQueue<T> where T: Send {}
// SAFETY: Moving the queue to another thread moves every stored `T` with it,
// which requires `T: Send`.
unsafe impl<T> Send for BoundedQueue<T> where T: Send {}
impl<T> BoundedQueue<T> {
    /// Attempts to append `data` to the queue.
    ///
    /// A free slot index is taken from `fq`, the value is written into that
    /// slot and the index is then published through `aq`.
    ///
    /// # Errors
    ///
    /// Returns `Err((EnqueueError::Full, data))`, handing the value back to the
    /// caller, when
    /// * no free slot index is available; in that case `aq.finalize()` is
    ///   called before returning, or
    /// * publishing the index through `aq` fails; the value is moved back out
    ///   of the slot first. The slot index is not returned to `fq` on this
    ///   path.
    ///
    /// The unit test in this file shows that once the queue has reported
    /// `Full`, later enqueues keep failing even after an element was dequeued.
    ///
    /// # Panics
    ///
    /// Panics if the index obtained from `fq` lies outside the slot buffer.
    pub fn try_enqueue(&self, data: T) -> Result<(), (EnqueueError, T)> {
        let slot_index = if let Some(free_index) = self.fq.dequeue() {
            free_index
        } else {
            // Out of free slots: finalize the allocated queue and give the
            // value back to the caller.
            self.aq.finalize();
            return Err((EnqueueError::Full, data));
        };

        let slot: *mut MaybeUninit<T> = self
            .data
            .get(slot_index)
            .expect("The received Index should always be in the Bounds of the Data Buffer")
            .get();

        // SAFETY: `slot` points into the live `data` buffer. This assumes the
        // index just taken from `fq` is held by this caller only, so nobody
        // else touches the slot - TODO confirm against `scq::Queue`.
        unsafe { slot.write(MaybeUninit::new(data)) };

        self.aq.enqueue(slot_index).map(|_| ()).map_err(|_| {
            // SAFETY: the slot was initialized by the write above and its index
            // was never published, so the value is still there and still ours.
            let rejected = unsafe { slot.replace(MaybeUninit::uninit()).assume_init() };
            (EnqueueError::Full, rejected)
        })
    }

    /// Removes and returns the next element of the queue.
    ///
    /// An index is taken from `aq`, the element is moved out of the matching
    /// slot and the index is handed back to `fq` so the slot can be reused.
    ///
    /// # Errors
    ///
    /// Returns `Err(DequeueError::Empty)` when `aq` yields no index.
    ///
    /// # Panics
    ///
    /// Panics if the index obtained from `aq` lies outside the slot buffer, or
    /// if the index cannot be returned to `fq`.
    pub fn dequeue(&self) -> Result<T, DequeueError> {
        let slot_index = self.aq.dequeue().ok_or(DequeueError::Empty)?;

        let slot: *mut MaybeUninit<T> = self
            .data
            .get(slot_index)
            .expect("The received Index should always be in the Bounds of the Data-Buffer")
            .get();

        // SAFETY: an index only enters `aq` after `try_enqueue` wrote a value
        // into its slot, so the slot is initialized. This assumes `aq` hands the
        // index to this caller only - TODO confirm against `scq::Queue`.
        let item = unsafe { slot.replace(MaybeUninit::uninit()).assume_init() };

        // The slot is empty again; returning its index is expected to always
        // succeed because `fq` was created with room for every index.
        self.fq.enqueue(slot_index).expect("");

        Ok(item)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Filling the queue and then overflowing it must report `Full`, and the
    /// queue must keep rejecting elements afterwards even when a slot has been
    /// freed by a dequeue.
    #[test]
    fn enqueue_finalize() {
        const CAPACITY: usize = 10;
        let queue = new_queue(CAPACITY);

        // Fill the queue up to its capacity.
        (0..10).for_each(|value| {
            queue
                .try_enqueue(value)
                .expect("Queue has enough capacity to place all the Elements into it");
        });

        // One element too many: rejected, and the value is handed back.
        let overflow = queue.try_enqueue(0);
        assert_eq!(Err((EnqueueError::Full, 0)), overflow);

        // Free up one slot again.
        queue.dequeue().expect("The Queue contains elements");

        // The queue still refuses new elements.
        let after_dequeue = queue.try_enqueue(0);
        assert_eq!(Err((EnqueueError::Full, 0)), after_dequeue);
    }
}