use core::sync::atomic::{AtomicUsize, AtomicU32};
use core::sync::atomic::Ordering::{SeqCst, Relaxed};
use core::array::from_fn;
use alloc::boxed::Box;
use alloc::vec::Vec;
use crate::slot::Slot;
#[doc(hidden)]
pub use storage::TmpArray;
pub use storage::{Storage, InternalStorageApi};
pub use block_size::{BlockSize, SmallBlockSize, DefaultBlockSize, LargeBlockSize, HugeBlockSize};
use block_ptr::{BlockPointer, CollectedBlock};
mod block;
mod block_ptr;
mod block_size;
mod storage;
/// Number of visitor counters kept in `Fifo::visitors`; revision `r` uses counter
/// `r % REV_CAP`, so revisions that are `REV_CAP` apart share a counter.
const REV_CAP: usize = 8;
/// Container for blocks that have been collected from the block chain and are
/// waiting to be handed back through `BlockPointer::recycle`.
type RecycleBin<const L: usize, const F: usize, T> = Vec<CollectedBlock<L, F, T>>;
/// Atomically replaces `old` with `new` in `atomic_int`.
///
/// Returns `true` when the stored value was `old` and has been replaced, `false`
/// when another value was found (in which case nothing is written).
fn try_swap_int(atomic_int: &AtomicUsize, old: usize, new: usize) -> bool {
    matches!(atomic_int.compare_exchange(old, new, SeqCst, Relaxed), Ok(_))
}
/// Queue made of a chain of blocks, used through `&self` by producers and consumers.
///
/// Each block covers `L` consecutive item indices. `Default::default` asserts that
/// `F * 8 == L`.
// NOTE(review): `F` presumably counts bytes of per-block flag storage (8 slots per
// byte) — confirm in the `block` module.
pub struct Fifo<const L: usize, const F: usize, T> {
/// Head of the block chain; later blocks are reached through each block's `next`.
first_block: BlockPointer<L, F, T>,
/// Next item index to hand out to a producer (`push` reserves with `fetch_add`).
prod_cursor: AtomicUsize,
/// Next item index to hand out to a consumer (`pull`/`iter` advance it with a CAS).
cons_cursor: AtomicUsize,
/// Current revision number; `try_maintain` bumps it after collecting blocks.
revision: AtomicU32,
/// Collected blocks waiting to be recycled; taken and put back by `try_maintain`.
recycle_bin: Slot<RecycleBin<L, F, T>>,
/// In-flight visit counters, indexed by `revision % REV_CAP`.
visitors: [AtomicU32; REV_CAP],
}
impl<const L: usize, const F: usize, T> Default for Fifo<L, F, T> {
    /// Builds a queue with a fresh `BlockPointer`, both cursors and the revision at
    /// zero, an empty recycle bin and all visitor counters at zero.
    ///
    /// # Panics
    ///
    /// Panics when the const parameters do not satisfy `F * 8 == L`.
    fn default() -> Self {
        assert_eq!(F * 8, L);
        Self {
            first_block: BlockPointer::new(),
            prod_cursor: AtomicUsize::new(0),
            cons_cursor: AtomicUsize::new(0),
            revision: AtomicU32::new(0),
            recycle_bin: Slot::new(Box::new(RecycleBin::new())),
            visitors: from_fn(|_slot| AtomicU32::new(0)),
        }
    }
}
impl<const L: usize, const F: usize, T> Fifo<L, F, T> {
/// Registers the caller as a visitor of the current revision and returns that
/// revision. Every call must be paired with `stop_visit` on the returned value.
///
/// The counter for `rev % REV_CAP` is incremented first and the revision is then
/// read again: if it moved `REV_CAP` or more steps in between, the counter may now
/// stand for a different revision, so the increment is undone and the whole
/// registration is retried.
fn init_visit(&self) -> usize {
    loop {
        let rev = self.revision.load(SeqCst) as usize;
        let rev_refcount = &self.visitors[rev % REV_CAP];
        rev_refcount.fetch_add(1, SeqCst);
        let new_rev = self.revision.load(SeqCst) as usize;
        // `revision` is a `u32` that `try_maintain` stores with a truncating cast, so
        // after a wrap-around `new_rev` can be smaller than `rev`. A plain `-` would
        // then panic in overflow-checked builds while the counter above is still
        // incremented; `wrapping_sub` keeps the comparison panic-free. Where `usize`
        // is wider than `u32` the wrapped distance is huge and the visit is retried.
        if new_rev.wrapping_sub(rev) < REV_CAP {
            break rev;
        }
        rev_refcount.fetch_sub(1, SeqCst);
    }
}
/// Ends a visit started by `init_visit`, decrementing the counter that belongs to
/// `rev` (the value `init_visit` returned).
fn stop_visit(&self, rev: usize) {
    let slot = rev % REV_CAP;
    self.visitors[slot].fetch_sub(1, SeqCst);
}
/// Best-effort housekeeping pass over the block chain.
///
/// Steps, in order:
/// 1. take the recycle bin, returning immediately if it is not available;
/// 2. find the oldest revision in the last `REV_CAP - 1` revisions that still has
///    visitors (the current revision if none has);
/// 3. recycle every binned block whose revision is older than that;
/// 4. unless the next revision would share a counter with the oldest visited one,
///    collect blocks from the chain into the bin and, if any were collected,
///    advance the revision;
/// 5. put the bin back.
///
/// # Panics
///
/// Panics if the bin cannot be put back.
fn try_maintain(&self) {
    // NOTE(review): an unavailable bin presumably means another thread is already
    // maintaining — confirm against `Slot::try_take`.
    let mut bin = match self.recycle_bin.try_take(false) {
        Some(bin) => bin,
        None => return,
    };

    let current_rev = self.revision.load(SeqCst) as usize;
    let window_start = current_rev.saturating_sub(REV_CAP - 1);
    // First revision of the window, oldest first, whose counter is non-zero.
    let oldest_visited_rev = (window_start..current_rev)
        .find(|&rev| self.visitors[rev % REV_CAP].load(SeqCst) != 0)
        .unwrap_or(current_rev);

    // Hand back every binned block that no registered visitor can still be using.
    let mut idx = 0;
    while idx < bin.len() {
        let expired = bin[idx].revision < oldest_visited_rev;
        if !expired {
            idx += 1;
            continue;
        }
        // `remove` shifts the tail down, so `idx` already points at the next entry.
        let block = bin.remove(idx);
        self.first_block.recycle(block);
    }

    let next_rev = current_rev + 1;
    // The next revision must not reuse the counter of a revision still being visited.
    if next_rev % REV_CAP != oldest_visited_rev % REV_CAP {
        let binned_before = bin.len();
        while let Some(block) = self.first_block.try_collect(current_rev) {
            bin.push(block);
        }
        // Only start a new revision when this pass actually collected something.
        if bin.len() != binned_before {
            self.revision.store(next_rev as u32, SeqCst);
        }
    }

    assert!(self.recycle_bin.try_insert(bin).is_ok());
}
/// Returns the index just past the last item of the contiguous produced prefix.
///
/// Counting starts at the first block's `offset` and walks the chain, adding one
/// per slot for which `is_produced` is true; it stops at the first slot that is
/// not produced or at the end of the chain. An empty chain yields `0`.
fn produced(&self) -> usize {
    let mut link = &self.first_block;
    let mut total = 0;
    let mut at_head = true;
    while let Some(block) = link.load() {
        if at_head {
            total = block.offset.load(SeqCst);
            at_head = false;
        }
        // Length of the produced prefix inside this block.
        let filled = (0..L).take_while(|&slot| block.is_produced(slot)).count();
        total += filled;
        if filled < L {
            // Hit a slot that is not produced yet: the contiguous prefix ends here.
            break;
        }
        link = &block.next;
    }
    total
}
}
// SAFETY: NOTE(review) — no justification is recorded in this file. Presumably these
// impls rely on all shared state being reached through atomics, `Slot` and
// `BlockPointer`; confirm against those modules.
// NOTE(review): neither impl bounds `T`, yet `push` and `pull` move `T` values in and
// out through `&self`, so a shared `Fifo` can carry a `T` from one thread to another.
// That looks like it needs `T: Send` on both impls (and on the `FifoApi` impl, whose
// supertraits are `Send + Sync`) — verify before using non-`Send` payloads.
unsafe impl<const L: usize, const F: usize, T> Send for Fifo<L, F, T> {}
unsafe impl<const L: usize, const F: usize, T> Sync for Fifo<L, F, T> {}
/// Interface to a queue of `T` that does not mention the block-size parameters, so
/// it can be used as `dyn FifoApi<T>` (as `PullIter` does).
pub trait FifoApi<T>: Send + Sync {
/// Appends the items yielded by `iter` to the queue.
///
/// The `Fifo` implementation reserves `iter.len()` positions up front and then
/// takes exactly that many items from `iter`.
fn push(&self, iter: &mut dyn ExactSizeIterator<Item = T>);
/// Moves available items into `storage` and returns how many were moved.
///
/// The `Fifo` implementation limits the count using `storage.bounds()` and returns
/// `0` when fewer items than the lower bound are available.
fn pull(&self, storage: &mut dyn InternalStorageApi<T>) -> usize;
/// Claims the items available at the time of the call and returns an iterator
/// that yields them.
fn iter(&self) -> PullIter<'_, T>;
/// Takes the item at absolute queue index `i`; called by `PullIter::next`.
#[doc(hidden)]
fn consume_item(&self, i: usize) -> T;
/// Hook called by `PullIter` when it is dropped.
#[doc(hidden)]
fn iter_drop(&self);
}
impl<const L: usize, const F: usize, T> FifoApi<T> for Fifo<L, F, T> {
/// Appends `iter.len()` items taken from `iter`.
///
/// The whole index range is reserved with a single `fetch_add` on the producer
/// cursor; the block chain is then walked from the head, appending blocks where
/// the chain ends, and each item is stored in the slot matching its index.
/// A maintenance pass runs before returning.
///
/// # Panics
///
/// Panics if `iter` yields fewer items than its `len()` announced.
fn push(&self, iter: &mut dyn ExactSizeIterator<Item = T>) {
    let visit = self.init_visit();
    let mut left = iter.len();
    let mut index = self.prod_cursor.fetch_add(left, SeqCst);
    // Start index of the block under the cursor; read from the head block once,
    // then derived by adding `L` per hop.
    let mut block_start: Option<usize> = None;
    let mut link = &self.first_block;
    while left > 0 {
        let block = match link.load() {
            Some(block) => block,
            None => {
                // End of chain reached before everything was stored: extend it.
                link.append_new();
                continue;
            }
        };
        let start = block_start.unwrap_or_else(|| block.offset.load(SeqCst));
        let end = start + L;
        while left > 0 && (start..end).contains(&index) {
            let item = iter.next().unwrap();
            block.produce(index - start, item);
            index += 1;
            left -= 1;
        }
        block_start = Some(end);
        link = &block.next;
    }
    self.stop_visit(visit);
    self.try_maintain();
}
/// Moves up to `storage`'s upper bound of items into `storage` and returns the
/// number moved.
///
/// Missing bounds default to a minimum of `1` and a maximum of `usize::MAX`. The
/// amount is negotiated by advancing the consumer cursor with a compare-and-swap,
/// retried until it succeeds; if fewer than the minimum are available nothing is
/// claimed and `0` is returned. Claimed items are inserted at storage indices
/// `0..count` in queue order. A maintenance pass runs before returning.
fn pull(&self, storage: &mut dyn InternalStorageApi<T>) -> usize {
    let (lower, upper) = storage.bounds();
    let max = upper.unwrap_or(usize::MAX);
    let min = lower.unwrap_or(1);
    let visit = self.init_visit();

    // Claim `[index, index + negotiated)` on the consumer cursor.
    let (mut index, negotiated) = loop {
        let produced = self.produced();
        let from = self.cons_cursor.load(SeqCst);
        let Some(available) = produced.checked_sub(from) else {
            // The cursor is ahead of the produced count just read: take a new snapshot.
            continue;
        };
        let wanted = available.min(max);
        if wanted < min {
            break (from, 0);
        }
        if try_swap_int(&self.cons_cursor, from, from + wanted) {
            break (from, wanted);
        }
    };

    storage.reserve(negotiated);
    let mut left = negotiated;
    // Start index of the block under the cursor; read from the head block once,
    // then derived by adding `L` per hop.
    let mut block_start: Option<usize> = None;
    let mut link = &self.first_block;
    while left > 0 {
        let block = match link.load() {
            Some(block) => block,
            None => {
                link.append_new();
                continue;
            }
        };
        let start = block_start.unwrap_or_else(|| block.offset.load(SeqCst));
        let end = start + L;
        while left > 0 && (start..end).contains(&index) {
            let item = block.consume(index - start);
            storage.insert(negotiated - left, item);
            index += 1;
            left -= 1;
        }
        block_start = Some(end);
        link = &block.next;
    }
    self.stop_visit(visit);
    self.try_maintain();
    negotiated
}
/// Claims every item available right now and returns an iterator over them.
///
/// The claim advances the consumer cursor with a compare-and-swap, retried until
/// it succeeds; nothing is read from the blocks here. The returned `PullIter`
/// fetches the items one at a time through `consume_item`.
fn iter(&self) -> PullIter<'_, T> {
    let visit = self.init_visit();
    let (start, claimed) = loop {
        let produced = self.produced();
        let from = self.cons_cursor.load(SeqCst);
        // `None` means the cursor is ahead of the produced count just read: retry.
        if let Some(available) = produced.checked_sub(from) {
            if try_swap_int(&self.cons_cursor, from, from + available) {
                break (from, available);
            }
        }
    };
    self.stop_visit(visit);
    PullIter {
        fifo: self,
        i: start,
        remaining: claimed,
    }
}
/// Takes the item stored at absolute queue index `i`.
///
/// Walks the block chain from the head until it reaches the block whose index
/// range contains `i`, then consumes the matching slot.
///
/// # Panics
///
/// Panics if the chain ends before a block containing `i` is found.
fn consume_item(&self, i: usize) -> T {
    let visit = self.init_visit();
    let mut link = &self.first_block;
    // Start index of the block under the cursor; read from the head block once,
    // then derived by adding `L` per hop.
    let mut block_start: Option<usize> = None;
    let item = loop {
        let block = link.load().unwrap();
        let start = block_start.unwrap_or_else(|| block.offset.load(SeqCst));
        let end = start + L;
        if (start..end).contains(&i) {
            break block.consume(i - start);
        }
        block_start = Some(end);
        link = &block.next;
    };
    self.stop_visit(visit);
    item
}
/// Runs a maintenance pass; `PullIter` calls this from its `Drop` impl after it has
/// consumed its remaining items.
fn iter_drop(&self) {
self.try_maintain();
}
}
/// Iterator over a run of items that `FifoApi::iter` has already claimed.
///
/// Dropping it consumes any items that were not yielded and then calls
/// `FifoApi::iter_drop`.
pub struct PullIter<'a, T> {
/// Queue the items are fetched from.
fifo: &'a dyn FifoApi<T>,
/// Absolute queue index of the next item to yield.
i: usize,
/// Number of claimed items that have not been yielded yet.
remaining: usize,
}
impl<'a, T> Iterator for PullIter<'a, T> {
    type Item = T;

    /// Yields the next claimed item, or `None` once all claimed items were yielded.
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            return None;
        }
        let item = self.fifo.consume_item(self.i);
        // Bookkeeping is updated only after the item has been fetched.
        self.remaining -= 1;
        self.i += 1;
        Some(item)
    }

    /// Exact: the number of claimed items is known up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = self.remaining;
        (left, Some(left))
    }
}
// `size_hint` reports `remaining` as both lower and upper bound, so the default
// `len()` is exact.
impl<'a, T> ExactSizeIterator for PullIter<'a, T> {}
impl<'a, T> Drop for PullIter<'a, T> {
    /// Consumes (and drops) every item that was claimed but not yielded, then
    /// notifies the queue through `iter_drop`.
    fn drop(&mut self) {
        for leftover in self.by_ref() {
            drop(leftover);
        }
        self.fifo.iter_drop();
    }
}