use crate::error::{Result, ZiporaError};
use std::alloc::{self, Layout};
use std::fmt;
use std::mem;
use std::ptr;
use std::marker::PhantomData;
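/// Cache-line size assumed for buffer alignment (64 bytes on x86_64 and
/// most current aarch64 cores).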
const CACHE_LINE_SIZE: usize = 64;
// Branch-prediction hints. `std::intrinsics::{likely, unlikely}` are
// nightly-only, so on stable Rust we approximate them: calling an empty
// `#[cold]` function on the unexpected path steers LLVM's block layout
// toward keeping the hot path as the fall-through.
#[cold]
#[inline(never)]
fn cold_path() {}

#[inline(always)]
fn likely(b: bool) -> bool {
    if !b {
        cold_path();
    }
    b
}

#[allow(dead_code)] // kept for symmetry with `likely`
#[inline(always)]
fn unlikely(b: bool) -> bool {
    if b {
        cold_path();
    }
    b
}
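/// A cache-line-aligned circular queue over a power-of-two buffer, so that
/// index wrap-around is a single `& mask` instead of a modulo.
///
/// Invariants: `capacity` is a power of two, `mask == capacity - 1`, and the
/// `len` live elements sit at `head, head + 1, ...` modulo `capacity`, with
/// `tail` marking the next write slot.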
#[repr(align(64))]
pub struct UltraFastCircularQueue<T> {
buffer: *mut T,
capacity: usize,
mask: usize,
head: usize,
tail: usize,
len: usize,
_phantom: PhantomData<T>,
}
impl<T> UltraFastCircularQueue<T> {
const INITIAL_CAPACITY: usize = 256;
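    /// Creates an empty queue with the default initial capacity (256).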
pub fn new() -> Self {
Self::with_capacity(Self::INITIAL_CAPACITY)
}
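    /// Creates an empty queue holding at least `capacity` elements; the
    /// request is rounded up to the next power of two (minimum 4).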
pub fn with_capacity(capacity: usize) -> Self {
let capacity = capacity.max(4).next_power_of_two();
let buffer = Self::allocate_aligned(capacity);
Self {
buffer,
capacity,
mask: capacity - 1,
head: 0,
tail: 0,
len: 0,
_phantom: PhantomData,
}
}
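    /// Allocates an uninitialized, preferably cache-line-aligned buffer for
    /// `capacity` elements. Returns a dangling pointer for zero-size layouts.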
fn allocate_aligned(capacity: usize) -> *mut T {
    let size = capacity
        .checked_mul(mem::size_of::<T>())
        .expect("UltraFastCircularQueue: capacity overflow");
    if size == 0 {
        // Zero-sized types need no backing storage.
        return ptr::NonNull::dangling().as_ptr();
    }
    // Prefer cache-line alignment; fall back to the type's natural alignment
    // (the same order `Drop` uses to rebuild the layout). Panicking here is
    // safer than handing back a dangling pointer that would later be written.
    let layout = Layout::from_size_align(size, CACHE_LINE_SIZE)
        .or_else(|_| Layout::from_size_align(size, mem::align_of::<T>()))
        .expect("UltraFastCircularQueue: invalid layout");
    let ptr = unsafe { alloc::alloc(layout) };
    if ptr.is_null() {
        alloc::handle_alloc_error(layout);
    }
    ptr as *mut T
}
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
#[inline]
pub fn len(&self) -> usize {
self.len
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len == 0
}
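    /// Appends `value` at the tail in O(1), doubling the buffer when full.
    /// The growth path is kept out of line so the hot path stays compact.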
#[inline]
pub fn push_back(&mut self, value: T) -> Result<()> {
if likely(self.len < self.capacity) {
unsafe {
self.buffer.add(self.tail).write(value);
}
self.tail = (self.tail + 1) & self.mask;
self.len += 1;
self.prefetch_next_write();
Ok(())
} else {
self.push_back_slow_path(value)
}
}
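    /// Out-of-line growth path; after `grow_buffer` succeeds, `tail` points
    /// at a free slot again.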
#[cold]
#[inline(never)]
fn push_back_slow_path(&mut self, value: T) -> Result<()> {
self.grow_buffer()?;
unsafe {
self.buffer.add(self.tail).write(value);
}
self.tail = (self.tail + 1) & self.mask;
self.len += 1;
Ok(())
}
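    /// Removes and returns the element at the head, or `None` when empty.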
#[inline]
pub fn pop_front(&mut self) -> Option<T> {
if likely(self.len > 0) {
let value = unsafe { self.buffer.add(self.head).read() };
self.head = (self.head + 1) & self.mask;
self.len -= 1;
self.prefetch_next_read();
Some(value)
} else {
None
}
}
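    /// Returns a reference to the oldest element without removing it.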
#[inline]
pub fn front(&self) -> Option<&T> {
if likely(self.len > 0) {
Some(unsafe { &*self.buffer.add(self.head) })
} else {
None
}
}
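    /// Returns a reference to the most recently pushed element.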
#[inline]
pub fn back(&self) -> Option<&T> {
if likely(self.len > 0) {
let back_index = if self.tail == 0 {
self.capacity - 1
} else {
self.tail - 1
};
Some(unsafe { &*self.buffer.add(back_index) })
} else {
None
}
}
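    /// Removes all elements. Types without destructors are cleared by
    /// resetting the indices; others are popped so their `Drop` runs.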
pub fn clear(&mut self) {
if !mem::needs_drop::<T>() {
self.head = 0;
self.tail = 0;
self.len = 0;
} else {
while self.len > 0 {
self.pop_front();
}
}
}
#[inline]
pub fn push(&mut self, value: T) -> Result<()> {
self.push_back(value)
}
#[inline]
pub fn pop(&mut self) -> Option<T> {
self.pop_front()
}
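    /// Hints the CPU to pull the next write slot into cache; a no-op on
    /// non-x86_64 targets.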
#[inline]
fn prefetch_next_write(&self) {
#[cfg(target_arch = "x86_64")]
unsafe {
if self.len + 1 < self.capacity {
let next_pos = (self.tail + 1) & self.mask;
std::arch::x86_64::_mm_prefetch(
self.buffer.add(next_pos) as *const i8,
std::arch::x86_64::_MM_HINT_T0
);
}
}
}
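    /// Hints the CPU to pull the next element to be popped into cache; a
    /// no-op on non-x86_64 targets.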
#[inline]
fn prefetch_next_read(&self) {
#[cfg(target_arch = "x86_64")]
unsafe {
if self.len > 1 {
let next_pos = (self.head + 1) & self.mask;
std::arch::x86_64::_mm_prefetch(
self.buffer.add(next_pos) as *const i8,
std::arch::x86_64::_MM_HINT_T0
);
}
}
}
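    /// Doubles the buffer with `realloc`. Growth only triggers on a full
    /// queue, where `head == tail`, so the live region always wraps and the
    /// prefix `[0..tail)` must be moved past the old end afterwards.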
fn grow_buffer(&mut self) -> Result<()> {
    let old_capacity = self.capacity;
    let new_capacity = old_capacity
        .checked_mul(2)
        .ok_or_else(|| ZiporaError::memory_error("circular buffer capacity overflow"))?;
    // Zero-sized types never touch the (dangling) buffer; just widen the
    // index space. Growth happens on a full queue, so `head + len` fits.
    if mem::size_of::<T>() == 0 {
        self.tail = self.head + self.len;
        self.capacity = new_capacity;
        self.mask = new_capacity - 1;
        return Ok(());
    }
    let old_size = old_capacity * mem::size_of::<T>();
    let new_size = new_capacity
        .checked_mul(mem::size_of::<T>())
        .ok_or_else(|| ZiporaError::memory_error("circular buffer capacity overflow"))?;
    // The buffer was allocated cache-line aligned, so the same layout must
    // be handed back to `realloc`.
    let old_layout = Layout::from_size_align(old_size, CACHE_LINE_SIZE)
        .map_err(|_| ZiporaError::memory_error("invalid circular buffer layout"))?;
    let new_buffer = unsafe {
        let new_ptr = alloc::realloc(self.buffer as *mut u8, old_layout, new_size);
        if new_ptr.is_null() {
            return Err(ZiporaError::memory_error("Failed to grow circular buffer"));
        }
        new_ptr as *mut T
    };
    // A full queue has `head == tail`, so the live region wraps whenever
    // `tail <= head`; the wrapped prefix `[0..tail)` must follow the old end.
    if self.len > 0 && self.tail <= self.head {
        self.relocate_wrapped_data_simd(old_capacity, new_capacity, new_buffer);
    }
    self.buffer = new_buffer;
    self.capacity = new_capacity;
    self.mask = new_capacity - 1;
    Ok(())
}
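    /// After growing a wrapped queue, moves the elements stored at the front
    /// of the buffer to just past the old end and rebases `tail`.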
fn relocate_wrapped_data_simd(&mut self, old_capacity: usize, _new_capacity: usize, buffer: *mut T) {
    // `[0..wrapped)` holds the logically-newest elements; move them to just
    // past the old end so the queue is contiguous in index space again.
    let wrapped = self.tail;
    if wrapped > 0 {
        #[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
        {
            if wrapped * mem::size_of::<T>() >= 64 {
                self.simd_copy_avx2(buffer, old_capacity, wrapped);
                self.tail = old_capacity + wrapped;
                return;
            }
        }
        unsafe {
            // Source [0..wrapped) and destination [old_capacity..) cannot
            // overlap because wrapped <= old_capacity.
            ptr::copy_nonoverlapping(buffer, buffer.add(old_capacity), wrapped);
        }
    }
    // Even when nothing wraps (tail == 0 on a full queue), `tail` must be
    // rebased past the old end so the next write lands after the last element.
    self.tail = old_capacity + wrapped;
}
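    /// Bulk-copies the wrapped prefix with 256-bit AVX2 loads/stores and
    /// finishes any sub-32-byte remainder with `copy_nonoverlapping`. Only
    /// compiled in when the crate is built with AVX2 enabled.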
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
fn simd_copy_avx2(&self, buffer: *mut T, old_capacity: usize, tail_portion_size: usize) {
use std::arch::x86_64::*;
unsafe {
let src = buffer as *const u8;
let dst = (buffer as *mut u8).add(old_capacity * mem::size_of::<T>());
let total_bytes = tail_portion_size * mem::size_of::<T>();
let chunks = total_bytes / 32;
for i in 0..chunks {
let data = _mm256_loadu_si256(src.add(i * 32) as *const __m256i);
_mm256_storeu_si256(dst.add(i * 32) as *mut __m256i, data);
}
let remainder_start = chunks * 32;
let remainder_size = total_bytes - remainder_start;
if remainder_size > 0 {
ptr::copy_nonoverlapping(
src.add(remainder_start),
dst.add(remainder_start),
remainder_size
);
}
}
}
}
impl<T> Default for UltraFastCircularQueue<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Drop for UltraFastCircularQueue<T> {
fn drop(&mut self) {
self.clear();
if !self.buffer.is_null() {
let size = self.capacity * mem::size_of::<T>();
if size > 0 {
let layout = Layout::from_size_align(size, CACHE_LINE_SIZE)
.or_else(|_| Layout::from_size_align(size, mem::align_of::<T>()));
if let Ok(layout) = layout {
unsafe {
alloc::dealloc(self.buffer as *mut u8, layout);
}
}
}
}
}
}
impl<T: fmt::Debug> fmt::Debug for UltraFastCircularQueue<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut list = f.debug_list();
let mut pos = self.head;
for _ in 0..self.len {
unsafe {
list.entry(&*self.buffer.add(pos));
}
pos = (pos + 1) & self.mask;
}
list.finish()
}
}
impl<T: Clone> Clone for UltraFastCircularQueue<T> {
fn clone(&self) -> Self {
let mut new_queue = Self::with_capacity(self.capacity);
let mut pos = self.head;
for _ in 0..self.len {
let value = unsafe { (*self.buffer.add(pos)).clone() };
if new_queue.push_back(value).is_err() {
    break;
}
pos = (pos + 1) & self.mask;
}
new_queue
}
}
impl<T: PartialEq> PartialEq for UltraFastCircularQueue<T> {
fn eq(&self, other: &Self) -> bool {
if self.len() != other.len() {
return false;
}
let mut self_pos = self.head;
let mut other_pos = other.head;
for _ in 0..self.len {
let self_val = unsafe { &*self.buffer.add(self_pos) };
let other_val = unsafe { &*other.buffer.add(other_pos) };
if self_val != other_val {
return false;
}
self_pos = (self_pos + 1) & self.mask;
other_pos = (other_pos + 1) & other.mask;
}
true
}
}
impl<T: Eq> Eq for UltraFastCircularQueue<T> {}
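// SAFETY: the queue uniquely owns its buffer and elements; the raw pointer
// is only an allocation handle, so cross-thread safety reduces to `T`'s own.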
unsafe impl<T: Send> Send for UltraFastCircularQueue<T> {}
unsafe impl<T: Sync> Sync for UltraFastCircularQueue<T> {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ultra_fast_queue_new() {
let queue: UltraFastCircularQueue<i32> = UltraFastCircularQueue::new();
assert_eq!(queue.len(), 0);
assert_eq!(queue.capacity(), 256);
assert!(queue.is_empty());
}
#[test]
fn test_ultra_fast_queue_with_capacity() {
let queue: UltraFastCircularQueue<i32> = UltraFastCircularQueue::with_capacity(100);
assert_eq!(queue.capacity(), 128);
}
#[test]
fn test_ultra_fast_queue_push_pop() -> Result<()> {
let mut queue = UltraFastCircularQueue::<i32>::new();
queue.push_back(1)?;
queue.push_back(2)?;
queue.push_back(3)?;
assert_eq!(queue.len(), 3);
assert_eq!(queue.pop_front(), Some(1));
assert_eq!(queue.pop_front(), Some(2));
assert_eq!(queue.len(), 1);
queue.push_back(4)?;
assert_eq!(queue.pop_front(), Some(3));
assert_eq!(queue.pop_front(), Some(4));
assert!(queue.is_empty());
Ok(())
}
#[test]
fn test_ultra_fast_queue_growth() -> Result<()> {
let mut queue = UltraFastCircularQueue::<i32>::with_capacity(4);
let initial_capacity = queue.capacity();
for i in 0..10 {
queue.push_back(i)?;
}
assert!(queue.capacity() > initial_capacity);
assert_eq!(queue.len(), 10);
for i in 0..10 {
assert_eq!(queue.pop_front(), Some(i));
}
Ok(())
}
#[test]
fn test_ultra_fast_queue_front_back() -> Result<()> {
let mut queue = UltraFastCircularQueue::<i32>::new();
assert_eq!(queue.front(), None);
assert_eq!(queue.back(), None);
queue.push_back(1)?;
queue.push_back(2)?;
assert_eq!(queue.front(), Some(&1));
assert_eq!(queue.back(), Some(&2));
Ok(())
}
#[test]
fn test_ultra_fast_queue_clone() -> Result<()> {
let mut queue = UltraFastCircularQueue::<i32>::new();
queue.push_back(1)?;
queue.push_back(2)?;
queue.push_back(3)?;
let cloned = queue.clone();
assert_eq!(queue, cloned);
Ok(())
}
#[test]
fn test_ultra_fast_queue_clear() -> Result<()> {
let mut queue = UltraFastCircularQueue::<i32>::new();
queue.push_back(1)?;
queue.push_back(2)?;
queue.clear();
assert!(queue.is_empty());
Ok(())
}
#[test]
fn test_wrapped_buffer_growth() -> Result<()> {
    let mut queue = UltraFastCircularQueue::<i32>::with_capacity(4);
    queue.push_back(1)?;
    queue.push_back(2)?;
    queue.push_back(3)?;
    queue.pop_front();
    queue.push_back(4)?;
    queue.push_back(5)?;
    // The queue is now full (head == tail == 1) and wrapped; this push
    // forces a growth that must relocate the wrapped prefix.
    queue.push_back(6)?;
    assert_eq!(queue.len(), 5);
    assert_eq!(queue.pop_front(), Some(2));
    assert_eq!(queue.pop_front(), Some(3));
    assert_eq!(queue.pop_front(), Some(4));
    assert_eq!(queue.pop_front(), Some(5));
    assert_eq!(queue.pop_front(), Some(6));
    assert!(queue.is_empty());
    Ok(())
}
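// Additional coverage for paths the tests above don't reach: owned heap
// values across growth, and zero-sized element types.
#[test]
fn test_heap_values_across_growth() -> Result<()> {
    let mut queue = UltraFastCircularQueue::<String>::with_capacity(4);
    for i in 0..10 {
        queue.push_back(format!("item-{}", i))?;
    }
    assert_eq!(queue.len(), 10);
    assert_eq!(queue.front().map(String::as_str), Some("item-0"));
    assert_eq!(queue.back().map(String::as_str), Some("item-9"));
    queue.clear();
    assert!(queue.is_empty());
    Ok(())
}
#[test]
fn test_zero_sized_elements() -> Result<()> {
    let mut queue = UltraFastCircularQueue::<()>::with_capacity(4);
    for _ in 0..10 {
        queue.push_back(())?;
    }
    assert_eq!(queue.len(), 10);
    for _ in 0..10 {
        assert_eq!(queue.pop_front(), Some(()));
    }
    assert!(queue.is_empty());
    Ok(())
}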
}