use crate::atomic_circular_buffer::AtomicCircularBuffer;
use crate::buffer::RPBuffer;
use crate::queue::RPQueue;
use std::sync::Arc;
use std::time::Duration;
pub struct AtomicQueue<'a, T> {
buffer: AtomicCircularBuffer<'a, T>,
}
impl<'a, T> AtomicQueue<'a, T> {
pub fn new(size: usize, f: impl FnMut() -> T) -> Self {
let buf = AtomicQueue::<T>::allocate(size, f);
return AtomicQueue { buffer: buf };
}
pub fn new_arc(size: usize, f: impl FnMut() -> T) -> Arc<Self> {
let q = AtomicQueue::new(size, f);
return Arc::new(q);
}
fn allocate(size: usize, mut f: impl FnMut() -> T) -> AtomicCircularBuffer<'a, T> {
let mut buf: AtomicCircularBuffer<T> = AtomicCircularBuffer::new(size);
for _ in 0..size {
buf.add(f());
}
return buf;
}
}
impl<'a, T> RPQueue<'a, T> for AtomicQueue<'a, T> {
#[inline]
fn capacity(&self) -> usize {
return self.buffer.capacity();
}
#[inline]
fn available(&self) -> usize {
return self.buffer.available();
}
#[inline]
fn take(&self) -> Option<&'a mut T> {
return self.buffer.take();
}
#[inline]
fn take_wait(&self, _timeout: Duration) -> Option<&'a mut T> {
return self.take();
}
#[inline]
fn offer(&self, item: &'a T) -> usize {
return self.buffer.offer(item);
}
}
unsafe impl<'a, T> Sync for AtomicQueue<'a, T> {}
unsafe impl<'a, T> Send for AtomicQueue<'a, T> {}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
#[test]
fn test_aq_take_offer() {
let mut count = 0;
let multiplier = 3;
let mut queue_items = HashMap::new();
let create_fn = || {
count = count + 1;
queue_items.insert(count, 0);
return count;
};
let queue = AtomicQueue::new_arc(11, create_fn);
for _ in 0..count * multiplier {
let item = queue.take().unwrap();
let current = queue_items[item];
queue_items.insert(*item, current + 1);
queue.offer(item);
}
for _ in 0..count {
let item = queue.take().unwrap();
let current = queue_items.remove(item);
assert_eq!(multiplier, current.unwrap());
}
assert_eq!(0, queue_items.len());
}
}