use alloc::boxed::Box;
use core::{marker::PhantomData, ptr::null_mut};
use crossbeam_utils::CachePadded;
use crate::{
Growable,
MPMCQueue,
core::{
AsPackedValue,
queue::QueueCore,
slots::{Auto, SlotType},
},
growable::NewSized,
owned::buffer::BoxedBuffer,
sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
utils::Backoff,
};
/// Shared state of a growable MPMC queue built from two heap-allocated inner
/// queues of type `Q`.
///
/// The two entries of `cores` are addressed by `epoch % 2`. Producers use the
/// slot selected by `push_epoch`; consumers use the slot selected by
/// `pop_epoch`, which trails `push_epoch` while an older queue is still being
/// drained.
pub(crate) struct GrowableQueueCore<T, Q, S = Auto> {
    /// Raw pointers to the two boxed inner queues (obtained via `Box::into_raw`).
    cores: [AtomicPtr<Q>; 2],
    /// Epoch whose slot currently receives pushes.
    push_epoch: CachePadded<AtomicUsize>,
    /// Epoch whose slot is currently being popped from.
    pop_epoch: CachePadded<AtomicUsize>,
    /// Per-slot count of producers that are currently inside `push`.
    active_pushes: CachePadded<[AtomicUsize; 2]>,
    /// Per-slot count of readers that currently hold a reference to that slot's queue.
    active_reads: CachePadded<[AtomicUsize; 2]>,
    /// Set while one thread is replacing a slot; makes growth mutually exclusive.
    is_resizing: AtomicBool,
    /// Ties the auto traits (`Send`/`Sync`) of this type to `S`, `T` **and** `Q`.
    ///
    /// `AtomicPtr<Q>` is `Send + Sync` for every `Q`, so without `Q` in this
    /// marker the struct could be shared across threads even when the inner
    /// queue it owns and hands out `&Q` references to is not thread-safe.
    _slot: PhantomData<(S, T, Q)>,
}
impl<T, Q> GrowableQueueCore<T, Q, Auto>
where
Q: NewSized,
{
pub(crate) fn with_slot<S>(size: usize) -> GrowableQueueCore<T, Q, S> {
GrowableQueueCore {
cores: [
AtomicPtr::new(Box::into_raw(Box::new(Q::with_size(size)))),
AtomicPtr::new(Box::into_raw(Box::new(Q::with_size(1)))),
],
active_pushes: [AtomicUsize::new(0), AtomicUsize::new(0)].into(),
active_reads: [AtomicUsize::new(0), AtomicUsize::new(0)].into(),
push_epoch: AtomicUsize::new(0).into(),
pop_epoch: AtomicUsize::new(0).into(),
is_resizing: AtomicBool::new(false),
_slot: PhantomData,
}
}
}
impl<T, Q, S> Drop for GrowableQueueCore<T, Q, S> {
    /// Reclaims both inner queues, first slot `0` and then slot `1`.
    fn drop(&mut self) {
        for slot in &self.cores {
            // Take the pointer out and leave null behind so the slot no longer
            // refers to the allocation that is about to be freed.
            let raw = slot.swap(null_mut(), Ordering::Acquire);
            // SAFETY: the pointers stored in `cores` by the code in this file
            // all come from `Box::into_raw`, and `&mut self` means no other
            // reference to this core exists while it is being dropped.
            drop(unsafe { Box::from_raw(raw) });
        }
    }
}
impl<T, Q, S> Growable for GrowableQueueCore<T, Q, S>
where
    Q: NewSized + MPMCQueue<Item = T>,
{
    /// Tries to install a new inner queue whose size is the current capacity
    /// plus `by`, and to redirect producers to it.
    ///
    /// Returns `true` when the new queue was installed. Returns `false`
    /// without growing when:
    /// * consumers are still draining an older queue (`pop_epoch != push_epoch`),
    /// * readers are registered on the slot that would be replaced,
    /// * another thread is already resizing, or
    /// * the push epoch moved between the first check and taking the resize flag.
    fn grow_by(&self, by: usize) -> bool {
        let observed_pop = self.pop_epoch.load(Ordering::Acquire);
        let observed_push = self.push_epoch.load(Ordering::Acquire);
        // The slot that is *not* used by the current push epoch; it is the one
        // that gets replaced.
        let spare = (observed_push + 1) % 2;

        let migration_pending = observed_pop != observed_push;
        if migration_pending || self.active_reads[spare].load(Ordering::Acquire) != 0 {
            return false;
        }

        // Only one thread may resize at a time.
        let already_resizing = self.is_resizing.swap(true, Ordering::AcqRel);
        if already_resizing {
            return false;
        }

        // Another resize may have completed between the first epoch load and
        // taking the flag; give the flag back and report failure in that case.
        if self.push_epoch.load(Ordering::Acquire) != observed_push {
            self.is_resizing.store(false, Ordering::Release);
            return false;
        }

        // Wait until no reader is registered on the slot about to be replaced.
        let mut spinner = Backoff::new();
        loop {
            if self.active_reads[spare].load(Ordering::Acquire) == 0 {
                break;
            }
            spinner.backoff();
        }
        debug_assert_eq!(self.active_pushes[spare].load(Ordering::SeqCst), 0);

        // NOTE(review): this addition is unchecked; confirm callers cannot
        // pass a `by` large enough to overflow `usize`.
        let grown_capacity = self.capacity() + by;
        let replacement = Box::into_raw(Box::new(Q::with_size(grown_capacity)));
        let retired = self.cores[spare].swap(replacement, Ordering::AcqRel);

        // Publishing the new epoch sends subsequent pushes to the new queue.
        self.push_epoch.fetch_add(1, Ordering::Release);

        // SAFETY: `retired` was stored in `cores` from `Box::into_raw` and has
        // just been swapped out of the slot, so this thread owns it now.
        let q = unsafe { Box::from_raw(retired) };
        debug_assert!(q.pop().is_none());

        self.is_resizing.store(false, Ordering::Release);
        true
    }
}
impl<T, Q, S> GrowableQueueCore<T, Q, S> {
    /// Returns a reference to the inner queue stored in slot `epoch % 2`.
    fn get_queue(&self, epoch: usize) -> &Q {
        let slot = &self.cores[epoch % 2];
        // SAFETY: NOTE(review) - soundness relies on the caller having
        // registered itself (reader or pusher counter) so that the slot is not
        // replaced and freed while the returned reference is in use.
        unsafe { &*slot.load(Ordering::Acquire) }
    }

    /// Registers the caller as a reader of slot `target_epoch % 2`.
    ///
    /// Returns `true` when `target_epoch` still equals the current push or pop
    /// epoch after registering. Otherwise the registration is undone and
    /// `false` is returned, so the caller should reload the epochs and retry.
    fn register_reader(&self, target_epoch: usize) -> bool {
        let readers = &self.active_reads[target_epoch % 2];
        readers.fetch_add(1, Ordering::Release);

        // Re-validate *after* announcing ourselves.
        let push_now = self.push_epoch.load(Ordering::SeqCst);
        let pop_now = self.pop_epoch.load(Ordering::SeqCst);
        let still_live = target_epoch == push_now || target_epoch == pop_now;

        if !still_live {
            // Stale epoch: withdraw the registration.
            readers.fetch_sub(1, Ordering::Release);
        }
        still_live
    }

    /// Removes one reader registration from slot `epoch % 2`.
    fn deregister_reader(&self, epoch: usize) {
        let readers = &self.active_reads[epoch % 2];
        readers.fetch_sub(1, Ordering::Release);
    }
}
// Queue operations for the growable core. Every operation picks an inner
// queue by epoch, announces itself on that slot's counter, re-validates the
// epoch, and only then touches the inner queue; on a stale epoch it retries.
impl<T, Q, S> MPMCQueue for GrowableQueueCore<T, Q, S>
where
Q: MPMCQueue<Item = T>,
{
/// Items are those of the inner queue type `Q`.
type Item = T;
/// Pushes `item` into the inner queue selected by the current push epoch.
///
/// Returns whatever the inner queue's `push` returns (`Err(item)` hands the
/// item back to the caller). Retries internally if the push epoch changes
/// between reading it and registering as an active pusher.
fn push(&self, item: Self::Item) -> Result<(), Self::Item> {
loop {
let push_epoch = self.push_epoch.load(Ordering::Acquire);
// Announce the push on this epoch's slot before re-checking the epoch.
// NOTE(review): the increment is `Release` while the re-check is `SeqCst`;
// confirm (e.g. under loom) that this pairing is strong enough on all targets.
self.active_pushes[push_epoch % 2].fetch_add(1, Ordering::Release);
if self.push_epoch.load(Ordering::SeqCst) == push_epoch {
let r = self.get_queue(push_epoch).push(item);
self.active_pushes[push_epoch % 2].fetch_sub(1, Ordering::Release);
return r;
}
// The epoch moved while we were registering: undo and try again.
self.active_pushes[push_epoch % 2].fetch_sub(1, Ordering::Release);
}
}
/// Pops one item, draining the older queue first while the pop epoch lags
/// behind the push epoch.
///
/// Returns `None` only when the epochs are equal and the current queue's
/// `pop` returns `None`.
fn pop(&self) -> Option<Self::Item> {
loop {
let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
let push_epoch = self.push_epoch.load(Ordering::Acquire);
// Epochs differ: the queue at `pop_epoch` is an older one that still has
// to be drained before consumers may move on.
if pop_epoch != push_epoch {
if !self.register_reader(pop_epoch) {
continue;
}
let item = self.get_queue(pop_epoch).pop();
self.deregister_reader(pop_epoch);
if item.is_some() {
return item;
}
// The old queue looked empty. Only if no pusher is still registered on
// that slot is it checked one more time and then abandoned.
if self.active_pushes[pop_epoch % 2].load(Ordering::Acquire) == 0 {
if !self.register_reader(pop_epoch) {
continue;
}
let final_item = self.get_queue(pop_epoch).pop();
self.deregister_reader(pop_epoch);
if final_item.is_some() {
return final_item;
}
// Advance the pop epoch. The result is ignored on purpose: if the CAS
// fails (another consumer advanced it, or the weak CAS failed
// spuriously) the enclosing loop reloads the epochs and retries.
_ = self.pop_epoch.compare_exchange_weak(
pop_epoch,
pop_epoch + 1,
Ordering::AcqRel,
Ordering::Relaxed,
);
}
continue;
}
// Epochs are equal: pop from the current queue.
if !self.register_reader(push_epoch) {
continue;
}
let item = self.get_queue(push_epoch).pop();
self.deregister_reader(push_epoch);
return item;
}
}
/// Returns the capacity of the inner queue selected by the current push epoch.
fn capacity(&self) -> usize {
loop {
let push_epoch = self.push_epoch.load(Ordering::Acquire);
if !self.register_reader(push_epoch) {
continue;
}
let cap = self.get_queue(push_epoch).capacity();
self.deregister_reader(push_epoch);
return cap;
}
}
/// Returns the length of the push-side queue plus, when the pop epoch
/// differs from the push epoch, the length of the pop-side queue.
fn len(&self) -> usize {
loop {
let push_epoch = self.push_epoch.load(Ordering::Acquire);
if !self.register_reader(push_epoch) {
continue;
}
// The push-side registration is held until the end so that queue stays
// referenced while the pop side is inspected.
let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
let pop_len = if pop_epoch != push_epoch {
if !self.register_reader(pop_epoch) {
// Release the push-side registration before retrying from scratch.
self.deregister_reader(push_epoch);
continue;
}
let pop_len = self.get_queue(pop_epoch).len();
self.deregister_reader(pop_epoch);
pop_len
} else {
0
};
let len = self.get_queue(push_epoch).len() + pop_len;
self.deregister_reader(push_epoch);
return len;
}
}
/// Returns `true` when the push-side queue is empty and, if the pop epoch
/// differs from the push epoch, the pop-side queue is empty as well.
fn is_empty(&self) -> bool {
loop {
let push_epoch = self.push_epoch.load(Ordering::Acquire);
if !self.register_reader(push_epoch) {
continue;
}
let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
let pop_is_empty = if pop_epoch != push_epoch {
if !self.register_reader(pop_epoch) {
// Release the push-side registration before retrying from scratch.
self.deregister_reader(push_epoch);
continue;
}
let pop_is_empty = self.get_queue(pop_epoch).is_empty();
self.deregister_reader(pop_epoch);
pop_is_empty
} else {
// No separate pop-side queue to consider.
true
};
let is_empty = self.get_queue(push_epoch).is_empty() && pop_is_empty;
self.deregister_reader(push_epoch);
return is_empty;
}
}
/// Returns whether the inner queue selected by the current push epoch is full.
fn is_full(&self) -> bool {
loop {
let push_epoch = self.push_epoch.load(Ordering::Acquire);
if !self.register_reader(push_epoch) {
continue;
}
let is_full = self.get_queue(push_epoch).is_full();
self.deregister_reader(push_epoch);
return is_full;
}
}
}
impl<T, Q, S> NewSized for GrowableQueueCore<T, Q, S>
where
Q: NewSized,
{
fn with_size(size: usize) -> GrowableQueueCore<T, Q, S> {
GrowableQueueCore::with_slot(size)
}
}
impl<S> NewSized for QueueCore<BoxedBuffer<S>>
where
    S: Default,
{
    /// Creates a queue core backed by a `BoxedBuffer` constructed with `size`.
    fn with_size(size: usize) -> Self {
        let storage = BoxedBuffer::new(size);
        Self::new_in(storage)
    }
}
/// A growable MPMC queue of `T`.
///
/// It wraps a [`GrowableQueueCore`] whose inner queues are
/// `QueueCore<BoxedBuffer<S::Slot>>`; the slot representation is selected by
/// the `S` marker, which defaults to `Auto`.
pub struct DynamicQueue<T: AsPackedValue, S: SlotType<T> = Auto> {
    /// The growable core every operation is forwarded to.
    inner: GrowableQueueCore<T, QueueCore<BoxedBuffer<S::Slot>>, S>,
}
impl<T> DynamicQueue<T, Auto>
where
T: AsPackedValue,
{
pub fn new(size: usize) -> Self {
Self::with_slot::<Auto>(size)
}
pub fn with_slot<S>(size: usize) -> DynamicQueue<T, S>
where
S: SlotType<T>,
{
DynamicQueue {
inner: GrowableQueueCore::with_slot::<S>(size),
}
}
}
impl<T, S> MPMCQueue for DynamicQueue<T, S>
where
T: AsPackedValue,
S: SlotType<T>,
{
type Item = T;
fn push(&self, item: Self::Item) -> Result<(), Self::Item> {
self.inner.push(item)
}
fn pop(&self) -> Option<Self::Item> {
self.inner.pop()
}
fn len(&self) -> usize {
self.inner.len()
}
fn capacity(&self) -> usize {
self.inner.capacity()
}
fn is_empty(&self) -> bool {
self.inner.is_empty()
}
fn is_full(&self) -> bool {
self.inner.is_full()
}
}
impl<T, S> Growable for DynamicQueue<T, S>
where
T: AsPackedValue,
S: SlotType<T>,
{
fn grow_by(&self, by: usize) -> bool {
self.inner.grow_by(by)
}
}