use core::marker::PhantomData;
use core::{alloc::Layout, fmt::Debug, mem::MaybeUninit};
use iceoryx2_bb_concurrency::atomic::AtomicBool;
use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
use iceoryx2_bb_elementary::math::unaligned_mem_size;
use iceoryx2_bb_elementary::relocatable_ptr::{GenericRelocatablePointer, RelocatablePointer};
use iceoryx2_bb_elementary_traits::allocator::{AllocationError, BaseAllocator};
use iceoryx2_bb_elementary_traits::generic_pointer::GenericPointer;
use iceoryx2_bb_elementary_traits::owning_pointer::{GenericOwningPointer, OwningPointer};
use iceoryx2_bb_elementary_traits::placement_default::PlacementDefault;
use iceoryx2_bb_elementary_traits::pointer_trait::PointerTrait;
pub use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_log::{fail, fatal_panic};
pub type Queue<T> = MetaQueue<T, GenericOwningPointer>;
pub type RelocatableQueue<T> = MetaQueue<T, GenericRelocatablePointer>;
#[doc(hidden)]
#[repr(C)]
#[derive(Debug)]
pub struct MetaQueue<T, Ptr: GenericPointer> {
data_ptr: Ptr::Type<MaybeUninit<T>>,
start: usize,
len: usize,
capacity: usize,
is_initialized: AtomicBool,
_phantom_data: PhantomData<T>,
}
unsafe impl<T: Send, Ptr: GenericPointer> Send for MetaQueue<T, Ptr> {}
impl<T> Queue<T> {
pub fn new(capacity: usize) -> Self {
Self {
data_ptr: OwningPointer::<MaybeUninit<T>>::new_with_alloc(capacity),
start: 0,
len: 0,
capacity,
is_initialized: AtomicBool::new(true),
_phantom_data: PhantomData,
}
}
pub fn clear(&mut self) {
unsafe { self.clear_impl() }
}
pub fn peek(&self) -> Option<&T> {
unsafe { self.peek_impl() }
}
pub fn peek_mut(&mut self) -> Option<&mut T> {
unsafe { self.peek_mut_impl() }
}
pub fn pop(&mut self) -> Option<T> {
unsafe { self.pop_impl() }
}
pub fn push(&mut self, value: T) -> bool {
unsafe { self.push_impl(value) }
}
pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
unsafe { self.push_with_overflow_impl(value) }
}
}
impl<T: Copy + Debug, Ptr: GenericPointer + Debug> MetaQueue<T, Ptr> {
pub unsafe fn get_unchecked(&self, index: usize) -> T {
unsafe {
(*self
.data_ptr
.as_ptr()
.add((self.start - self.len + index) % self.capacity))
.assume_init()
}
}
pub fn get(&self, index: usize) -> T {
if self.len() <= index {
fatal_panic!(from self, "Unable to copy content since the index {} is out of range.", index);
}
unsafe { self.get_unchecked(index) }
}
}
impl<T> RelocatableContainer for RelocatableQueue<T> {
unsafe fn new_uninit(capacity: usize) -> Self {
Self {
data_ptr: unsafe { RelocatablePointer::new_uninit() },
start: 0,
len: 0,
capacity,
is_initialized: AtomicBool::new(false),
_phantom_data: PhantomData,
}
}
unsafe fn init<Allocator: BaseAllocator>(
&mut self,
allocator: &Allocator,
) -> Result<(), AllocationError> {
if self
.is_initialized
.load(core::sync::atomic::Ordering::Relaxed)
{
fatal_panic!(
from "Queue::init()",
"Memory already initialized. Initializing it twice may lead to undefined behavior."
);
}
unsafe {
self.data_ptr.init(fail!(from "Queue::init", when allocator
.allocate(Layout::from_size_align_unchecked(
core::mem::size_of::<T>() * self.capacity,
core::mem::align_of::<T>(),
)), "Failed to initialize queue since the allocation of the data memory failed."
));
}
self.is_initialized
.store(true, core::sync::atomic::Ordering::Relaxed);
Ok(())
}
fn memory_size(capacity: usize) -> usize {
Self::const_memory_size(capacity)
}
}
unsafe impl<T: ZeroCopySend> ZeroCopySend for RelocatableQueue<T> {}
impl<T> RelocatableQueue<T> {
pub const fn const_memory_size(capacity: usize) -> usize {
unaligned_mem_size::<T>(capacity)
}
pub unsafe fn clear(&mut self) {
unsafe { self.clear_impl() }
}
pub fn peek(&self) -> Option<&T> {
unsafe { self.peek_impl() }
}
pub fn peek_mut(&mut self) -> Option<&mut T> {
unsafe { self.peek_mut_impl() }
}
pub unsafe fn pop(&mut self) -> Option<T> {
unsafe { self.pop_impl() }
}
pub unsafe fn push(&mut self, value: T) -> bool {
unsafe { self.push_impl(value) }
}
pub unsafe fn push_with_overflow(&mut self, value: T) -> Option<T> {
unsafe { self.push_with_overflow_impl(value) }
}
}
impl<T, Ptr: GenericPointer> MetaQueue<T, Ptr> {
#[inline(always)]
fn verify_init(&self, source: &str) {
debug_assert!(
self.is_initialized
.load(core::sync::atomic::Ordering::Relaxed),
"From: MetaQueue<{}>::{}, Undefined behavior - the object was not initialized with 'init' before.",
core::any::type_name::<T>(),
source
);
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn len(&self) -> usize {
self.len
}
pub fn is_full(&self) -> bool {
self.len() == self.capacity()
}
pub(crate) unsafe fn clear_impl(&mut self) {
unsafe { while self.pop_impl().is_some() {} }
}
pub(crate) unsafe fn peek_mut_impl(&mut self) -> Option<&mut T> {
self.verify_init("peek_mut()");
if self.is_empty() {
return None;
}
let index = (self.start - self.len) % self.capacity;
unsafe { Some((*self.data_ptr.as_mut_ptr().add(index)).assume_init_mut()) }
}
pub(crate) unsafe fn peek_impl(&self) -> Option<&T> {
self.verify_init("peek()");
if self.is_empty() {
return None;
}
let index = (self.start - self.len) % self.capacity;
unsafe { Some((*self.data_ptr.as_ptr().add(index)).assume_init_ref()) }
}
pub(crate) unsafe fn pop_impl(&mut self) -> Option<T> {
self.verify_init("pop()");
if self.is_empty() {
return None;
}
let index = (self.start - self.len) % self.capacity;
self.len -= 1;
unsafe {
let value = core::mem::replace(
&mut *self.data_ptr.as_mut_ptr().add(index),
MaybeUninit::uninit(),
);
Some(value.assume_init())
}
}
pub(crate) unsafe fn push_impl(&mut self, value: T) -> bool {
self.verify_init("push()");
if self.len == self.capacity {
return false;
}
unsafe {
self.unchecked_push(value);
}
true
}
pub(crate) unsafe fn push_with_overflow_impl(&mut self, value: T) -> Option<T> {
self.verify_init("push_with_overflow()");
let overridden_value = if self.len() == self.capacity() {
unsafe { self.pop_impl() }
} else {
None
};
unsafe {
self.unchecked_push(value);
}
overridden_value
}
unsafe fn unchecked_push(&mut self, value: T) {
let index = (self.start) % self.capacity;
unsafe {
self.data_ptr
.as_mut_ptr()
.add(index)
.write(MaybeUninit::new(value));
}
self.start += 1;
self.len += 1;
}
}
impl<T, Ptr: GenericPointer> Drop for MetaQueue<T, Ptr> {
fn drop(&mut self) {
if self
.is_initialized
.load(core::sync::atomic::Ordering::Relaxed)
{
unsafe { self.clear_impl() }
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct FixedSizeQueue<T, const CAPACITY: usize> {
state: RelocatableQueue<T>,
_data: [MaybeUninit<T>; CAPACITY],
}
unsafe impl<T: ZeroCopySend, const CAPACITY: usize> ZeroCopySend for FixedSizeQueue<T, CAPACITY> {}
impl<T, const CAPACITY: usize> PlacementDefault for FixedSizeQueue<T, CAPACITY> {
unsafe fn placement_default(ptr: *mut Self) {
unsafe {
let state_ptr = core::ptr::addr_of_mut!((*ptr).state);
state_ptr.write(RelocatableQueue::new_uninit(CAPACITY));
let allocator = BumpAllocator::new((*ptr)._data.as_mut_ptr().cast());
(*ptr)
.state
.init(&allocator)
.expect("All required memory is preallocated.");
}
}
}
impl<T, const CAPACITY: usize> Default for FixedSizeQueue<T, CAPACITY> {
fn default() -> Self {
let mut new_self = Self {
state: unsafe { RelocatableQueue::new_uninit(CAPACITY) },
_data: unsafe { MaybeUninit::uninit().assume_init() },
};
let allocator = BumpAllocator::new(new_self._data.as_mut_ptr().cast());
unsafe {
new_self
.state
.init(&allocator)
.expect("All required memory is preallocated.")
};
new_self
}
}
unsafe impl<T: Send, const CAPACITY: usize> Send for FixedSizeQueue<T, CAPACITY> {}
impl<T: Send + Copy + Debug + PartialEq, const CAPACITY: usize> PartialEq
for FixedSizeQueue<T, CAPACITY>
{
fn eq(&self, other: &Self) -> bool {
if self.len() != other.len() {
return false;
}
for i in 0..self.len() {
if self.get(i) != other.get(i) {
return false;
}
}
true
}
}
impl<T, const CAPACITY: usize> FixedSizeQueue<T, CAPACITY> {
pub fn new() -> Self {
Self::default()
}
pub fn is_empty(&self) -> bool {
self.state.is_empty()
}
pub fn capacity(&self) -> usize {
self.state.capacity()
}
pub fn len(&self) -> usize {
self.state.len()
}
pub fn is_full(&self) -> bool {
self.state.is_full()
}
pub fn clear(&mut self) {
unsafe { self.state.clear_impl() }
}
pub fn peek(&self) -> Option<&T> {
unsafe { self.state.peek_impl() }
}
pub fn peek_mut(&mut self) -> Option<&mut T> {
unsafe { self.state.peek_mut_impl() }
}
pub fn pop(&mut self) -> Option<T> {
unsafe { self.state.pop_impl() }
}
pub fn push(&mut self, value: T) -> bool {
unsafe { self.state.push_impl(value) }
}
pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
unsafe { self.state.push_with_overflow_impl(value) }
}
}
impl<T: Copy + Debug, const CAPACITY: usize> FixedSizeQueue<T, CAPACITY> {
pub unsafe fn get_unchecked(&self, index: usize) -> T {
unsafe { self.state.get_unchecked(index) }
}
pub fn get(&self, index: usize) -> T {
self.state.get(index)
}
}