use crate::error::{Result, ZiporaError};
use std::alloc::{Layout, alloc, dealloc, realloc};
use std::fmt;
use std::marker::PhantomData;
use std::mem::{MaybeUninit, align_of, size_of};
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Branch-prediction hint: marks `b` as the branch expected to be taken.
/// Stable Rust exposes no portable intrinsic for this, so it is a
/// transparent pass-through kept for documentation of hot paths.
#[inline(always)]
fn likely(b: bool) -> bool {
    b
}
/// Branch-prediction hint: marks `b` as the branch expected NOT to be taken.
/// Like [`likely`], this is currently an identity function.
#[inline(always)]
fn unlikely(b: bool) -> bool {
    b
}
/// Fixed-capacity circular (ring) queue with inline storage of `N` slots.
///
/// The slots in `[head, head + count)` (indices taken mod `N`) hold
/// initialized values; everything else is uninitialized.
///
/// NOTE(review): the index fields are atomics, but every mutating method on
/// this type takes `&mut self`, so as written this is not a lock-free
/// concurrent queue — confirm whether the atomics are intentional.
pub struct FixedCircularQueue<T, const N: usize> {
    // Inline, possibly-uninitialized element storage.
    buffer: [MaybeUninit<T>; N],
    // Index of the oldest element.
    head: AtomicUsize,
    // Index one past the newest element (next write position).
    tail: AtomicUsize,
    // Number of initialized elements currently stored (0..=N).
    count: AtomicUsize,
}
impl<T, const N: usize> FixedCircularQueue<T, N> {
    /// Creates an empty queue; all `N` slots start uninitialized.
    pub fn new() -> Self {
        Self {
            buffer: [const { MaybeUninit::uninit() }; N],
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            count: AtomicUsize::new(0),
        }
    }

    /// Maximum number of elements the queue can hold (always `N`).
    #[inline]
    pub fn capacity(&self) -> usize {
        N
    }

    /// Current number of stored elements.
    #[inline]
    pub fn len(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Returns `true` when the queue holds no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.count.load(Ordering::Acquire) == 0
    }

    /// Returns `true` when all `N` slots are occupied.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.count.load(Ordering::Acquire) == N
    }

    /// Appends `value` at the tail.
    ///
    /// # Errors
    /// Returns an error (without consuming a slot) when the queue is full.
    pub fn push_back(&mut self, value: T) -> Result<()> {
        if self.is_full() {
            return Err(ZiporaError::invalid_data("Queue is full"));
        }
        let tail = self.tail.load(Ordering::Relaxed);
        // SAFETY: `count < N` was just checked, so the slot at `tail` is free
        // and writing does not overwrite a live element.
        unsafe {
            self.buffer[tail].as_mut_ptr().write(value);
        }
        let next_tail = (tail + 1) % N;
        self.tail.store(next_tail, Ordering::Release);
        self.count.fetch_add(1, Ordering::AcqRel);
        Ok(())
    }

    /// Removes and returns the oldest element, or `None` when empty.
    pub fn pop_front(&mut self) -> Option<T> {
        if self.is_empty() {
            return None;
        }
        let head = self.head.load(Ordering::Relaxed);
        // SAFETY: `count > 0`, so the slot at `head` holds an initialized
        // value; `read` moves it out and the index then advances past it.
        let value = unsafe { self.buffer[head].as_ptr().read() };
        let next_head = (head + 1) % N;
        self.head.store(next_head, Ordering::Release);
        self.count.fetch_sub(1, Ordering::AcqRel);
        Some(value)
    }

    /// Borrows the oldest element without removing it.
    pub fn front(&self) -> Option<&T> {
        if self.is_empty() {
            return None;
        }
        let head = self.head.load(Ordering::Acquire);
        // SAFETY: the queue is non-empty, so `head` indexes an initialized slot.
        Some(unsafe { self.buffer[head].assume_init_ref() })
    }

    /// Borrows the newest element without removing it.
    pub fn back(&self) -> Option<&T> {
        if self.is_empty() {
            return None;
        }
        let tail = self.tail.load(Ordering::Acquire);
        // `tail` points one past the newest element; wrap backwards by one.
        let back_index = if tail == 0 { N - 1 } else { tail - 1 };
        // SAFETY: non-empty, so the slot before `tail` is initialized.
        Some(unsafe { self.buffer[back_index].assume_init_ref() })
    }

    /// Removes (and drops) every element, leaving the queue empty.
    pub fn clear(&mut self) {
        // Popping one element at a time runs each value's destructor.
        while !self.is_empty() {
            self.pop_front();
        }
    }

    /// Alias for [`Self::push_back`].
    #[inline]
    pub fn push(&mut self, value: T) -> Result<()> {
        self.push_back(value)
    }

    /// Alias for [`Self::pop_front`].
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        self.pop_front()
    }
}
impl<T, const N: usize> Default for FixedCircularQueue<T, N> {
    /// Equivalent to [`FixedCircularQueue::new`]: an empty queue.
    fn default() -> Self {
        Self::new()
    }
}
impl<T, const N: usize> Drop for FixedCircularQueue<T, N> {
    fn drop(&mut self) {
        // Pop every remaining element so each `T` runs its own destructor;
        // the uninitialized slots must not be dropped.
        while self.pop_front().is_some() {}
    }
}
impl<T: fmt::Debug, const N: usize> fmt::Debug for FixedCircularQueue<T, N> {
    /// Formats the live elements in queue order (oldest first).
    ///
    /// Iterates by `count` rather than comparing `head`/`tail`: when the
    /// queue is full those indices are equal, and the previous range-based
    /// walk printed an empty list for a full queue.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let head = self.head.load(Ordering::Acquire);
        let count = self.count.load(Ordering::Acquire);
        let mut list = f.debug_list();
        for k in 0..count {
            let idx = (head + k) % N;
            // SAFETY: the `count` slots starting at `head` (mod N) are the
            // initialized ones.
            list.entry(unsafe { self.buffer[idx].assume_init_ref() });
        }
        list.finish()
    }
}
// SAFETY: the queue owns its `T` values, so it may move between threads
// whenever `T` itself is `Send`.
unsafe impl<T: Send, const N: usize> Send for FixedCircularQueue<T, N> {}
// NOTE(review): this `Sync` impl only requires `T: Send`, but `front()` /
// `back()` hand out `&T` through `&self`, which normally demands `T: Sync`
// for shared cross-thread access — confirm this bound is intentional.
unsafe impl<T: Send, const N: usize> Sync for FixedCircularQueue<T, N> {}
/// Heap-backed circular queue that grows automatically.
///
/// The capacity is always a power of two so wrap-around is a single mask
/// (`index & mask`) rather than a modulo.
#[repr(align(64))] // keep the control block on its own cache line
pub struct AutoGrowCircularQueue<T> {
    // Raw allocation holding `capacity` (possibly uninitialized) slots.
    buffer: *mut T,
    // Total slots; always a power of two.
    capacity: usize,
    // `capacity - 1`, used to wrap indices cheaply.
    mask: usize,
    // Index of the oldest element.
    head: usize,
    // Index one past the newest element (next write position).
    tail: usize,
    // Number of live elements.
    len: usize,
    // Marks logical ownership of `T` for drop-check / variance purposes.
    _phantom: PhantomData<T>,
}
impl<T> AutoGrowCircularQueue<T> {
    /// Smallest capacity ever used; keeps the power-of-two mask math valid.
    const INITIAL_CAPACITY: usize = 4;

    /// Rounds `capacity` up to a power of two (minimum `INITIAL_CAPACITY`
    /// for zero). Saturates at the largest representable power of two if
    /// rounding would overflow `usize`.
    ///
    /// Uses `checked_next_power_of_two` instead of the previous manual
    /// bit-smear, which performed `n >> 32` and therefore could not compile
    /// for 32-bit `usize` targets.
    #[inline(always)]
    fn ensure_power_of_two(capacity: usize) -> usize {
        if capacity == 0 {
            return Self::INITIAL_CAPACITY;
        }
        capacity
            .checked_next_power_of_two()
            .unwrap_or(1usize << (usize::BITS - 1))
    }

    /// Creates an empty queue with the default initial capacity.
    pub fn new() -> Self {
        Self::with_capacity(Self::INITIAL_CAPACITY)
    }

    /// Creates an empty queue whose capacity is `capacity` rounded up to a
    /// power of two (at least `INITIAL_CAPACITY`).
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = if capacity == 0 {
            Self::INITIAL_CAPACITY
        } else {
            Self::ensure_power_of_two(capacity)
        };
        let buffer = Self::allocate_buffer(capacity);
        Self {
            buffer,
            capacity,
            mask: capacity - 1,
            head: 0,
            tail: 0,
            len: 0,
            _phantom: PhantomData,
        }
    }

    /// Allocates uninitialized storage for `capacity` elements.
    ///
    /// For zero-sized `T` (or zero capacity) no real allocation is made and a
    /// well-aligned dangling pointer is returned instead: passing a zero-sized
    /// layout to `alloc` is undefined behavior.
    ///
    /// # Panics
    /// Panics on allocation failure (matching the original behavior; the
    /// constructors return `Self`, not `Result`).
    fn allocate_buffer(capacity: usize) -> *mut T {
        if capacity == 0 || size_of::<T>() == 0 {
            return ptr::NonNull::<T>::dangling().as_ptr();
        }
        let layout = Self::layout_for_capacity(capacity);
        // SAFETY: the layout has non-zero size here.
        let raw = unsafe { alloc(layout) };
        if raw.is_null() {
            panic!("Failed to allocate memory for AutoGrowCircularQueue");
        }
        raw as *mut T
    }

    /// Layout for `capacity` elements, preferring 64-byte (cache-line)
    /// alignment and falling back to the type's natural alignment.
    ///
    /// The size multiplication is checked: a silent wrap here would
    /// under-allocate the buffer in release builds.
    fn layout_for_capacity(capacity: usize) -> Layout {
        let size = capacity
            .checked_mul(size_of::<T>())
            .expect("AutoGrowCircularQueue: capacity overflows allocation size");
        let align = align_of::<T>().max(64);
        Layout::from_size_align(size, align)
            .or_else(|_| Layout::from_size_align(size, align_of::<T>()))
            .unwrap_or_else(|_| {
                Layout::from_size_align(size, 1)
                    .expect("layout creation: non-zero size, power-of-two alignment")
            })
    }

    /// Layout matching the queue's current allocation.
    fn current_layout(&self) -> Layout {
        Self::layout_for_capacity(self.capacity)
    }

    /// Total slots currently allocated (always a power of two).
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Number of live elements.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the queue holds no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Ensures room for at least `additional` more elements.
    ///
    /// # Errors
    /// Returns an error when `len + additional` overflows `usize`.
    pub fn reserve(&mut self, additional: usize) -> Result<()> {
        let required = self
            .len
            .checked_add(additional)
            .ok_or_else(|| ZiporaError::invalid_data("capacity overflow in reserve"))?;
        if required <= self.capacity {
            return Ok(());
        }
        self.grow_to(Self::ensure_power_of_two(required))
    }

    /// Grows the backing buffer to at least `new_capacity` (rounded up to a
    /// power of two), preserving element order. No-op if already large enough.
    fn grow_to(&mut self, new_capacity: usize) -> Result<()> {
        if new_capacity <= self.capacity {
            return Ok(());
        }
        let new_capacity = Self::ensure_power_of_two(new_capacity);
        if size_of::<T>() == 0 {
            // ZSTs occupy no memory: just widen the index space.
            self.capacity = new_capacity;
            self.mask = new_capacity - 1;
            return Ok(());
        }
        let old_layout = self.current_layout();
        if self.len == 0 {
            // Nothing to preserve: swap in a fresh buffer and reset indices.
            if !self.buffer.is_null() {
                unsafe { dealloc(self.buffer as *mut u8, old_layout) };
            }
            self.buffer = Self::allocate_buffer(new_capacity);
            self.capacity = new_capacity;
            self.mask = new_capacity - 1;
            self.head = 0;
            self.tail = 0;
            return Ok(());
        }
        let new_layout = Self::layout_for_capacity(new_capacity);
        if self.head < self.tail {
            // Contiguous live range: try an in-place realloc first, which can
            // avoid copying entirely. Indices stay valid because
            // head < tail < old capacity <= new capacity.
            unsafe {
                let moved = realloc(self.buffer as *mut u8, old_layout, new_layout.size());
                if !moved.is_null() {
                    self.buffer = moved as *mut T;
                    self.capacity = new_capacity;
                    self.mask = new_capacity - 1;
                    return Ok(());
                }
            }
        }
        // Wrapped range (or realloc failed, leaving the old buffer intact):
        // copy into a fresh buffer, linearizing so head restarts at 0.
        let new_buffer = Self::allocate_buffer(new_capacity);
        self.copy_elements_to_new_buffer(new_buffer)?;
        if !self.buffer.is_null() {
            unsafe { dealloc(self.buffer as *mut u8, old_layout) };
        }
        self.buffer = new_buffer;
        self.capacity = new_capacity;
        self.mask = new_capacity - 1;
        self.head = 0;
        self.tail = self.len;
        Ok(())
    }

    /// Moves the live elements, in queue order, to the start of `new_buffer`.
    /// Ownership of the values transfers to `new_buffer`; the old slots must
    /// then be treated as uninitialized.
    #[inline(always)]
    fn copy_elements_to_new_buffer(&mut self, new_buffer: *mut T) -> Result<()> {
        if self.len == 0 {
            return Ok(());
        }
        // SAFETY: `new_buffer` has room for at least `self.len` elements and
        // is a distinct allocation, so the ranges never overlap.
        unsafe {
            if self.head < self.tail {
                ptr::copy_nonoverlapping(self.buffer.add(self.head), new_buffer, self.len);
            } else {
                let first_part = self.capacity - self.head;
                ptr::copy_nonoverlapping(self.buffer.add(self.head), new_buffer, first_part);
                ptr::copy_nonoverlapping(self.buffer, new_buffer.add(first_part), self.tail);
            }
        }
        Ok(())
    }

    /// Appends `value` at the tail, growing the buffer when needed.
    ///
    /// One slot of slack is deliberately kept (`len < capacity - 1`) so the
    /// common case stays a single predictable branch; `push_bulk` may still
    /// fill the buffer completely, which is safe because `len` tracks
    /// occupancy independently of `head`/`tail`.
    #[inline(always)]
    pub fn push_back(&mut self, value: T) -> Result<()> {
        if likely(self.len < self.capacity - 1) {
            // SAFETY: not full, so the slot at `tail` is free.
            unsafe {
                self.buffer.add(self.tail).write(value);
            }
            self.tail = (self.tail + 1) & self.mask;
            self.len += 1;
            Ok(())
        } else {
            self.push_back_slow_path(value)
        }
    }

    /// Out-of-line growth path for `push_back`; doubles the capacity.
    #[cold]
    #[inline(never)]
    fn push_back_slow_path(&mut self, value: T) -> Result<()> {
        let new_capacity = (self.capacity << 1).max(Self::INITIAL_CAPACITY);
        self.grow_to(new_capacity)?;
        // SAFETY: after growing, the slot at `tail` is free.
        unsafe {
            self.buffer.add(self.tail).write(value);
        }
        self.tail = (self.tail + 1) & self.mask;
        self.len += 1;
        Ok(())
    }

    /// Removes and returns the oldest element, or `None` when empty.
    #[inline(always)]
    pub fn pop_front(&mut self) -> Option<T> {
        if unlikely(self.len == 0) {
            return None;
        }
        // SAFETY: non-empty, so the slot at `head` holds an initialized value.
        unsafe {
            let value = self.buffer.add(self.head).read();
            self.head = (self.head + 1) & self.mask;
            self.len -= 1;
            Some(value)
        }
    }

    /// Borrows the oldest element without removing it.
    #[inline]
    pub fn front(&self) -> Option<&T> {
        if unlikely(self.len == 0) {
            return None;
        }
        // SAFETY: non-empty, so `head` points at an initialized slot.
        Some(unsafe { &*self.buffer.add(self.head) })
    }

    /// Borrows the newest element without removing it.
    #[inline]
    pub fn back(&self) -> Option<&T> {
        if unlikely(self.len == 0) {
            return None;
        }
        // `tail` points one past the newest element; step back with wrap.
        let back_index = (self.tail + self.capacity - 1) & self.mask;
        // SAFETY: non-empty, so the slot before `tail` is initialized.
        Some(unsafe { &*self.buffer.add(back_index) })
    }

    /// Drops all live elements and resets the indices; capacity is retained.
    pub fn clear(&mut self) {
        if self.len > 0 {
            // Walk exactly `len` slots from `head` with mask wrap. The
            // previous head/tail range comparison skipped every element when
            // the queue was completely full (head == tail, len == capacity,
            // reachable via `push_bulk`), leaking them all.
            let mut idx = self.head;
            for _ in 0..self.len {
                // SAFETY: the `len` slots starting at `head` are initialized.
                unsafe { ptr::drop_in_place(self.buffer.add(idx)) };
                idx = (idx + 1) & self.mask;
            }
            self.head = 0;
            self.tail = 0;
            self.len = 0;
        }
    }

    /// Alias for [`Self::push_back`].
    #[inline(always)]
    pub fn push(&mut self, value: T) -> Result<()> {
        self.push_back(value)
    }

    /// Alias for [`Self::pop_front`].
    #[inline(always)]
    pub fn pop(&mut self) -> Option<T> {
        self.pop_front()
    }

    /// Clones every item in `items` onto the back of the queue and returns
    /// how many were appended (always `items.len()` on success).
    pub fn push_bulk(&mut self, items: &[T]) -> Result<usize>
    where
        T: Clone,
    {
        if items.is_empty() {
            return Ok(0);
        }
        self.reserve(items.len())?;
        for item in items {
            // SAFETY: `reserve` guaranteed capacity >= len + items.len(),
            // so each target slot is free.
            unsafe {
                self.buffer.add(self.tail).write(item.clone());
            }
            self.tail = (self.tail + 1) & self.mask;
            self.len += 1;
        }
        Ok(items.len())
    }

    /// Moves up to `output.len()` elements (oldest first) into `output`,
    /// returning how many were transferred. Overwritten `output` slots are
    /// dropped normally by assignment.
    pub fn pop_bulk(&mut self, output: &mut [T]) -> usize {
        let available = self.len;
        let to_pop = output.len().min(available);
        if to_pop == 0 {
            return 0;
        }
        // SAFETY: the `to_pop` slots starting at `head` are initialized;
        // each `read` transfers ownership into `output`.
        unsafe {
            if self.head < self.tail || (self.head + to_pop <= self.capacity) {
                // Contiguous run: no wrap needed for this many elements.
                for i in 0..to_pop {
                    output[i] = self.buffer.add(self.head + i).read();
                }
            } else {
                // Wrapped run: copy the segment up to the end of the buffer,
                // then the remainder from the start.
                let first_part = self.capacity - self.head;
                let second_part = to_pop - first_part;
                for i in 0..first_part {
                    output[i] = self.buffer.add(self.head + i).read();
                }
                for i in 0..second_part {
                    output[first_part + i] = self.buffer.add(i).read();
                }
            }
            self.head = (self.head + to_pop) & self.mask;
            self.len -= to_pop;
        }
        to_pop
    }

    /// Snapshot of the queue's internal geometry, for diagnostics.
    #[inline]
    pub fn performance_stats(&self) -> AutoGrowQueueStats {
        AutoGrowQueueStats {
            capacity: self.capacity,
            length: self.len,
            utilization: if self.capacity > 0 {
                self.len as f64 / self.capacity as f64
            } else {
                0.0
            },
            is_power_of_two: (self.capacity & (self.capacity - 1)) == 0,
            head_index: self.head,
            tail_index: self.tail,
        }
    }
}
/// Diagnostic snapshot returned by `AutoGrowCircularQueue::performance_stats`.
#[derive(Debug, Clone)]
pub struct AutoGrowQueueStats {
    /// Total allocated slots at the time of the snapshot.
    pub capacity: usize,
    /// Number of live elements.
    pub length: usize,
    /// `length / capacity` (0.0 when capacity is 0).
    pub utilization: f64,
    /// Whether the capacity is a power of two (expected to always be true).
    pub is_power_of_two: bool,
    /// Index of the oldest element.
    pub head_index: usize,
    /// Index one past the newest element.
    pub tail_index: usize,
}
impl<T> Default for AutoGrowCircularQueue<T> {
    /// Equivalent to [`AutoGrowCircularQueue::new`]: an empty queue with the
    /// default initial capacity.
    fn default() -> Self {
        Self::new()
    }
}
impl<T> Drop for AutoGrowCircularQueue<T> {
fn drop(&mut self) {
self.clear();
if !self.buffer.is_null() {
unsafe {
dealloc(self.buffer as *mut u8, self.current_layout());
}
}
}
}
impl<T: fmt::Debug> fmt::Debug for AutoGrowCircularQueue<T> {
    /// Formats the live elements in queue order (oldest first).
    ///
    /// Walks exactly `len` slots from `head` with mask wrap-around. The
    /// previous head/tail range comparison printed an empty list whenever the
    /// queue was completely full (head == tail with len == capacity, which
    /// `push_bulk` can produce).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut list = f.debug_list();
        let mut idx = self.head;
        for _ in 0..self.len {
            // SAFETY: the `len` slots starting at `head` are initialized.
            list.entry(unsafe { &*self.buffer.add(idx) });
            idx = (idx + 1) & self.mask;
        }
        list.finish()
    }
}
impl<T: Clone> Clone for AutoGrowCircularQueue<T> {
    /// Deep-copies the queue, preserving element order and starting capacity.
    ///
    /// Walks exactly `len` slots from `head` with mask wrap-around. The
    /// previous head/tail range comparison copied nothing whenever the source
    /// was completely full (head == tail with len == capacity, which
    /// `push_bulk` can produce), yielding an empty clone.
    fn clone(&self) -> Self {
        let mut new_queue = Self::with_capacity(self.capacity);
        let mut idx = self.head;
        for _ in 0..self.len {
            // SAFETY: the `len` slots starting at `head` are initialized.
            let value = unsafe { &*self.buffer.add(idx) };
            // Defensive: stop (returning a truncated clone) if a push fails,
            // mirroring the original's best-effort behavior.
            if new_queue.push_back(value.clone()).is_err() {
                return new_queue;
            }
            idx = (idx + 1) & self.mask;
        }
        new_queue
    }
}
impl<T: PartialEq> PartialEq for AutoGrowCircularQueue<T> {
fn eq(&self, other: &Self) -> bool {
if self.len != other.len {
return false;
}
if self.len == 0 {
return true;
}
let mut self_iter = UltraFastCircularQueueIter::new(self);
let mut other_iter = UltraFastCircularQueueIter::new(other);
for _ in 0..self.len {
match (self_iter.next(), other_iter.next()) {
(Some(a), Some(b)) if a == b => continue,
_ => return false,
}
}
true
}
}
impl<T: Eq> Eq for AutoGrowCircularQueue<T> {}
/// Borrowing iterator over an `AutoGrowCircularQueue`, yielding elements
/// oldest-first using mask-based index wrap-around.
struct UltraFastCircularQueueIter<'a, T> {
    // Queue being traversed.
    queue: &'a AutoGrowCircularQueue<T>,
    // Current buffer index (starts at `queue.head`).
    current: usize,
    // Elements not yet yielded.
    remaining: usize,
}
impl<'a, T> UltraFastCircularQueueIter<'a, T> {
    /// Starts an iterator at the queue's head covering every live element.
    fn new(queue: &'a AutoGrowCircularQueue<T>) -> Self {
        let (current, remaining) = (queue.head, queue.len);
        Self {
            queue,
            current,
            remaining,
        }
    }
}
impl<'a, T> Iterator for UltraFastCircularQueueIter<'a, T> {
    type Item = &'a T;

    #[inline(always)]
    fn next(&mut self) -> Option<Self::Item> {
        if unlikely(self.remaining == 0) {
            return None;
        }
        // SAFETY: `current` always lies inside the queue's window of
        // initialized slots while `remaining > 0`.
        let item = unsafe { &*self.queue.buffer.add(self.current) };
        self.current = (self.current + 1) & self.queue.mask;
        self.remaining -= 1;
        Some(item)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let n = self.remaining;
        (n, Some(n))
    }
}

// `size_hint` is exact, so the length contract holds.
impl<'a, T> ExactSizeIterator for UltraFastCircularQueueIter<'a, T> {}
// SAFETY: the raw-pointer field suppresses the auto traits, but the queue
// uniquely owns its heap buffer and the `T` values inside it, so it may be
// sent between threads whenever `T: Send`.
unsafe impl<T: Send> Send for AutoGrowCircularQueue<T> {}
// SAFETY: `&self` methods only hand out `&T`, which is safe to share across
// threads when `T: Sync`; all mutation requires `&mut self`.
unsafe impl<T: Sync> Sync for AutoGrowCircularQueue<T> {}
// Unit tests covering both queue variants: construction, FIFO ordering,
// growth, cloning/equality, clearing, and struct layout expectations.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_fixed_queue_new() {
        let queue: FixedCircularQueue<i32, 8> = FixedCircularQueue::new();
        assert_eq!(queue.len(), 0);
        assert_eq!(queue.capacity(), 8);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
    }

    // FIFO ordering survives interleaved pushes and pops across the wrap point.
    #[test]
    fn test_fixed_queue_push_pop() -> Result<()> {
        let mut queue: FixedCircularQueue<i32, 4> = FixedCircularQueue::new();
        queue.push_back(1)?;
        queue.push_back(2)?;
        queue.push_back(3)?;
        assert_eq!(queue.len(), 3);
        assert_eq!(queue.pop_front(), Some(1));
        assert_eq!(queue.pop_front(), Some(2));
        assert_eq!(queue.len(), 1);
        queue.push_back(4)?;
        assert_eq!(queue.pop_front(), Some(3));
        assert_eq!(queue.pop_front(), Some(4));
        assert!(queue.is_empty());
        Ok(())
    }

    // Pushing into a full fixed queue must fail rather than overwrite.
    #[test]
    fn test_fixed_queue_full() -> Result<()> {
        let mut queue: FixedCircularQueue<i32, 3> = FixedCircularQueue::new();
        queue.push_back(1)?;
        queue.push_back(2)?;
        queue.push_back(3)?;
        assert!(queue.is_full());
        let result = queue.push_back(4);
        assert!(result.is_err());
        Ok(())
    }

    #[test]
    fn test_fixed_queue_front_back() -> Result<()> {
        let mut queue: FixedCircularQueue<i32, 8> = FixedCircularQueue::new();
        assert_eq!(queue.front(), None);
        assert_eq!(queue.back(), None);
        queue.push_back(1)?;
        queue.push_back(2)?;
        assert_eq!(queue.front(), Some(&1));
        assert_eq!(queue.back(), Some(&2));
        Ok(())
    }

    #[test]
    fn test_auto_queue_new() {
        let queue: AutoGrowCircularQueue<i32> = AutoGrowCircularQueue::new();
        assert_eq!(queue.len(), 0);
        assert_eq!(queue.capacity(), 4);
        assert!(queue.is_empty());
    }

    // Requested capacity is rounded up to the next power of two.
    #[test]
    fn test_auto_queue_with_capacity() {
        let queue: AutoGrowCircularQueue<i32> = AutoGrowCircularQueue::with_capacity(10);
        assert_eq!(queue.capacity(), 16);
    }

    #[test]
    fn test_auto_queue_push_pop() -> Result<()> {
        let mut queue = AutoGrowCircularQueue::<i32>::new();
        queue.push_back(1)?;
        queue.push_back(2)?;
        queue.push_back(3)?;
        assert_eq!(queue.len(), 3);
        assert_eq!(queue.pop_front(), Some(1));
        assert_eq!(queue.pop_front(), Some(2));
        assert_eq!(queue.len(), 1);
        queue.push_back(4)?;
        assert_eq!(queue.pop_front(), Some(3));
        assert_eq!(queue.pop_front(), Some(4));
        assert!(queue.is_empty());
        Ok(())
    }

    // Growth must preserve FIFO order; diagnostic prints fire only on mismatch.
    #[test]
    fn test_auto_queue_growth() -> Result<()> {
        let mut queue = AutoGrowCircularQueue::<i32>::new();
        let initial_capacity = queue.capacity();
        for i in 0..10 {
            queue.push_back(i)?;
        }
        assert!(queue.capacity() > initial_capacity);
        assert_eq!(queue.len(), 10);
        for i in 0..10 {
            let popped = queue.pop_front();
            if popped != Some(i) {
                println!("Expected {}, got {:?} at iteration {}", i, popped, i);
                println!(
                    "Queue state: head={}, tail={}, len={}, capacity={}",
                    queue.head,
                    queue.tail,
                    queue.len,
                    queue.capacity()
                );
            }
            assert_eq!(popped, Some(i));
        }
        Ok(())
    }

    #[test]
    fn test_auto_queue_reserve() -> Result<()> {
        let mut queue = AutoGrowCircularQueue::<i32>::new();
        queue.reserve(100)?;
        assert!(queue.capacity() >= 100);
        assert_eq!(queue.len(), 0);
        Ok(())
    }

    #[test]
    fn test_auto_queue_front_back() -> Result<()> {
        let mut queue = AutoGrowCircularQueue::<i32>::new();
        assert_eq!(queue.front(), None);
        assert_eq!(queue.back(), None);
        queue.push_back(1)?;
        queue.push_back(2)?;
        assert_eq!(queue.front(), Some(&1));
        assert_eq!(queue.back(), Some(&2));
        Ok(())
    }

    #[test]
    fn test_auto_queue_clone() -> Result<()> {
        let mut queue = AutoGrowCircularQueue::<i32>::new();
        queue.push_back(1)?;
        queue.push_back(2)?;
        queue.push_back(3)?;
        let cloned = queue.clone();
        assert_eq!(queue, cloned);
        Ok(())
    }

    #[test]
    fn test_auto_queue_equality() -> Result<()> {
        let mut queue1 = AutoGrowCircularQueue::new();
        let mut queue2 = AutoGrowCircularQueue::new();
        assert_eq!(queue1, queue2);
        queue1.push_back(42)?;
        assert_ne!(queue1, queue2);
        queue2.push_back(42)?;
        assert_eq!(queue1, queue2);
        Ok(())
    }

    #[test]
    fn test_clear() -> Result<()> {
        let mut fixed_queue: FixedCircularQueue<i32, 8> = FixedCircularQueue::new();
        fixed_queue.push_back(1)?;
        fixed_queue.push_back(2)?;
        fixed_queue.clear();
        assert!(fixed_queue.is_empty());
        let mut auto_queue = AutoGrowCircularQueue::new();
        auto_queue.push_back(1)?;
        auto_queue.push_back(2)?;
        auto_queue.clear();
        assert!(auto_queue.is_empty());
        Ok(())
    }

    // Pins the struct sizes: fixed = inline buffer + 3 words of atomics;
    // auto = 7 words rounded up to its 64-byte alignment.
    #[test]
    fn test_memory_efficiency() {
        let fixed_queue = FixedCircularQueue::<u64, 8>::new();
        let auto_queue = AutoGrowCircularQueue::<u64>::new();
        assert_eq!(std::mem::size_of_val(&fixed_queue), 8 * 8 + 24);
        let auto_size = std::mem::size_of_val(&auto_queue);
        assert_eq!(auto_size, 64);
        let alignment = std::mem::align_of_val(&auto_queue);
        assert_eq!(alignment, 64);
    }
}