iceoryx2_bb_container/
queue.rs

1// Copyright (c) 2023 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
13//! Three queue variations that are similar to [`alloc::collections::vec_deque::VecDeque`].
14//!
15//!  * [`FixedSizeQueue`](crate::queue::FixedSizeQueue), compile-time fixed size queue that
16//!     is self-contained.
17//!  * [`RelocatableQueue`](crate::queue::RelocatableQueue), run-time fixed size queue that
18//!     acquires the required memory from a custom user-provided allocator.
//!  * [`Queue`](crate::queue::Queue), run-time fixed size queue that uses heap
//!     memory by default.
21//!
22//! # Basic Examples
23//!
24//! ## Use the [`FixedSizeQueue`](crate::queue::FixedSizeQueue)
25//!
26//! ```
27//! # extern crate iceoryx2_loggers;
28//!
29//! use iceoryx2_bb_container::queue::FixedSizeQueue;
30//!
31//! const QUEUE_CAPACITY: usize = 1;
32//! let mut queue = FixedSizeQueue::<u64, QUEUE_CAPACITY>::new();
33//!
34//! queue.push(123);
35//!
36//! // queue is full, we override the oldest element (123) with the new number (456)
37//! queue.push_with_overflow(456);
38//!
39//! println!("pop from queue {}", queue.pop().unwrap());
40//! ```
41//!
42//! ## Use the [`Queue`](crate::queue::Queue)
43//!
44//! ```
45//! # extern crate iceoryx2_loggers;
46//!
47//! use iceoryx2_bb_container::queue::Queue;
48//!
49//! let queue_capacity = 1234;
50//! let mut queue = Queue::<u64>::new(queue_capacity);
51//!
52//! queue.push(123);
53//!
54//! println!("pop from queue {}", queue.pop().unwrap());
55//! ```
56//!
57//! # Advanced Examples
58//!
59//! ## Create [`RelocatableQueue`](crate::queue::RelocatableQueue) inside constructs which provides memory
60//!
61//! ```
62//! # extern crate iceoryx2_loggers;
63//!
64//! use iceoryx2_bb_container::queue::RelocatableQueue;
65//! use iceoryx2_bb_elementary::math::align_to;
66//! use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
67//! use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
68//! use core::mem::MaybeUninit;
69//!
70//! const QUEUE_CAPACITY:usize = 12;
71//! struct MyConstruct {
72//!     queue: RelocatableQueue<u128>,
73//!     queue_memory: [MaybeUninit<u128>; QUEUE_CAPACITY],
74//! }
75//!
76//! impl MyConstruct {
77//!     pub fn new() -> Self {
78//!         let mut new_self = Self {
79//!             queue: unsafe { RelocatableQueue::new_uninit(QUEUE_CAPACITY) },
80//!             queue_memory: core::array::from_fn(|_| MaybeUninit::uninit()),
81//!         };
82//!
83//!         let allocator = BumpAllocator::new(new_self.queue_memory.as_mut_ptr().cast());
84//!         unsafe {
85//!             new_self.queue.init(&allocator).expect("Enough memory provided.")
86//!         };
87//!         new_self
88//!     }
89//! }
90//! ```
91//!
92//! ## Create [`RelocatableQueue`](crate::queue::RelocatableQueue) with allocator
93//!
94//! ```
95//! # extern crate iceoryx2_loggers;
96//!
97//! use iceoryx2_bb_container::queue::RelocatableQueue;
98//! use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
99//! use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
100//! use core::ptr::NonNull;
101//!
102//! const QUEUE_CAPACITY:usize = 12;
103//! const MEM_SIZE: usize = RelocatableQueue::<u128>::const_memory_size(QUEUE_CAPACITY);
104//! let mut memory = [0u8; MEM_SIZE];
105//!
106//! let bump_allocator = BumpAllocator::new(memory.as_mut_ptr());
107//!
108//! let mut queue = unsafe { RelocatableQueue::<u128>::new_uninit(QUEUE_CAPACITY) };
109//! unsafe { queue.init(&bump_allocator).expect("queue init failed") };
110//! ```
111//!
112use core::marker::PhantomData;
113use core::{alloc::Layout, fmt::Debug, mem::MaybeUninit};
114use iceoryx2_bb_concurrency::atomic::AtomicBool;
115use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
116use iceoryx2_bb_elementary::math::unaligned_mem_size;
117use iceoryx2_bb_elementary::relocatable_ptr::{GenericRelocatablePointer, RelocatablePointer};
118use iceoryx2_bb_elementary_traits::allocator::{AllocationError, BaseAllocator};
119use iceoryx2_bb_elementary_traits::generic_pointer::GenericPointer;
120use iceoryx2_bb_elementary_traits::owning_pointer::{GenericOwningPointer, OwningPointer};
121use iceoryx2_bb_elementary_traits::placement_default::PlacementDefault;
122use iceoryx2_bb_elementary_traits::pointer_trait::PointerTrait;
123pub use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
124use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
125use iceoryx2_log::{fail, fatal_panic};
126
/// Queue with run-time fixed size capacity. In contrast to its counterpart the
/// [`RelocatableQueue`] it is movable but is not shared memory compatible. Its
/// data memory is owned via an [`OwningPointer`] allocated in [`Queue::new()`].
pub type Queue<T> = MetaQueue<T, GenericOwningPointer>;
/// **Non-movable** relocatable queue with runtime fixed size capacity. The data
/// memory must be provided by the user via [`RelocatableContainer::init()`]
/// before the queue can be used.
pub type RelocatableQueue<T> = MetaQueue<T, GenericRelocatablePointer>;
132
#[doc(hidden)]
/// Generic queue backend parameterized over its pointer type. It is the shared
/// implementation behind [`Queue`] (owning pointer) and [`RelocatableQueue`]
/// (relocatable pointer).
#[repr(C)]
#[derive(Debug)]
pub struct MetaQueue<T, Ptr: GenericPointer> {
    // pointer to the (possibly still uninitialized) element storage
    data_ptr: Ptr::Type<MaybeUninit<T>>,
    // monotonically increasing write position; the next push writes to slot
    // `start % capacity` (see `unchecked_push()`)
    start: usize,
    // number of elements currently stored; never exceeds `capacity`
    len: usize,
    // maximum number of elements the queue can hold
    capacity: usize,
    // becomes true once the data memory is usable; checked by a debug
    // assertion in `verify_init()` before every element access
    is_initialized: AtomicBool,
    _phantom_data: PhantomData<T>,
}

// SAFETY: the queue owns its elements, so moving it to another thread is sound
// whenever `T: Send` — NOTE(review): confirm this also holds for the
// relocatable pointer variant when the memory lives in shared memory.
unsafe impl<T: Send, Ptr: GenericPointer> Send for MetaQueue<T, Ptr> {}
147
148impl<T> Queue<T> {
149    /// Creates a new [`Queue`] with the provided capacity
150    pub fn new(capacity: usize) -> Self {
151        Self {
152            data_ptr: OwningPointer::<MaybeUninit<T>>::new_with_alloc(capacity),
153            start: 0,
154            len: 0,
155            capacity,
156            is_initialized: AtomicBool::new(true),
157            _phantom_data: PhantomData,
158        }
159    }
160
161    /// Removes all elements from the queue
162    pub fn clear(&mut self) {
163        unsafe { self.clear_impl() }
164    }
165
166    /// Returns a reference to the element from the beginning of the queue without removing it.
167    /// If the queue is empty it returns [`None`].
168    pub fn peek(&self) -> Option<&T> {
169        unsafe { self.peek_impl() }
170    }
171
172    /// Returns a mutable reference to the element from the beginning of the queue without removing it.
173    /// If the queue is empty it returns [`None`].
174    pub fn peek_mut(&mut self) -> Option<&mut T> {
175        unsafe { self.peek_mut_impl() }
176    }
177
178    /// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
179    pub fn pop(&mut self) -> Option<T> {
180        unsafe { self.pop_impl() }
181    }
182
183    /// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
184    pub fn push(&mut self, value: T) -> bool {
185        unsafe { self.push_impl(value) }
186    }
187
188    /// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
189    /// otherwise [`None`].
190    pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
191        unsafe { self.push_with_overflow_impl(value) }
192    }
193}
194
impl<T: Copy + Debug, Ptr: GenericPointer + Debug> MetaQueue<T, Ptr> {
    /// Returns a copy of the element stored at index. The index is starting by 0 for the first
    /// element until [`Queue::len()`].
    ///
    /// # Safety
    ///
    ///   * Must satisfy `index` < [`Queue::len()`]
    pub unsafe fn get_unchecked(&self, index: usize) -> T {
        unsafe {
            // `start` counts every push ever and `len` the elements still
            // stored, so `start - len` is the slot of the oldest element
            // (never underflows) and `index` is an offset from there
            (*self
                .data_ptr
                .as_ptr()
                .add((self.start - self.len + index) % self.capacity))
            .assume_init()
        }
    }

    /// Returns a copy of the element stored at index. The index is starting by 0 for the first
    /// element until [`Queue::len()`].
    ///
    /// # Panics
    ///
    ///   * Panics (via `fatal_panic!`) when `index` >= [`Queue::len()`]
    pub fn get(&self, index: usize) -> T {
        if self.len() <= index {
            fatal_panic!(from self, "Unable to copy content since the index {} is out of range.", index);
        }

        unsafe { self.get_unchecked(index) }
    }
}
222
impl<T> RelocatableContainer for RelocatableQueue<T> {
    /// Creates a queue whose data memory is not yet allocated; [`Self::init()`]
    /// must be called before any element is accessed.
    unsafe fn new_uninit(capacity: usize) -> Self {
        Self {
            data_ptr: RelocatablePointer::new_uninit(),
            start: 0,
            len: 0,
            capacity,
            // marks the queue as unusable until `init()` succeeded
            is_initialized: AtomicBool::new(false),
            _phantom_data: PhantomData,
        }
    }

    /// Allocates the element storage from `allocator` and marks the queue as
    /// initialized. Calling it a second time is a fatal error.
    unsafe fn init<Allocator: BaseAllocator>(
        &mut self,
        allocator: &Allocator,
    ) -> Result<(), AllocationError> {
        if self
            .is_initialized
            .load(core::sync::atomic::Ordering::Relaxed)
        {
            fatal_panic!(
                from "Queue::init()",
                "Memory already initialized. Initializing it twice may lead to undefined behavior."
            );
        }

        // NOTE(review): `size_of::<T>() * self.capacity` is not checked for
        // overflow before being handed to `from_size_align_unchecked` — confirm
        // callers bound `capacity` to realistic values.
        self.data_ptr.init(fail!(from "Queue::init", when allocator
             .allocate(Layout::from_size_align_unchecked(
                 core::mem::size_of::<T>() * self.capacity,
                 core::mem::align_of::<T>(),
             )), "Failed to initialize queue since the allocation of the data memory failed."
        ));
        self.is_initialized
            .store(true, core::sync::atomic::Ordering::Relaxed);

        Ok(())
    }

    /// Returns the memory size required for `capacity` elements; delegates to
    /// [`RelocatableQueue::const_memory_size()`].
    fn memory_size(capacity: usize) -> usize {
        Self::const_memory_size(capacity)
    }
}
265
// SAFETY: the queue addresses its data through a `RelocatablePointer`, so it is
// presumably position-independent and zero-copy sendable whenever its elements
// are — TODO confirm against the `ZeroCopySend` contract.
unsafe impl<T: ZeroCopySend> ZeroCopySend for RelocatableQueue<T> {}
267
impl<T> RelocatableQueue<T> {
    /// Returns the required memory size for a queue with a specified capacity
    pub const fn const_memory_size(capacity: usize) -> usize {
        unaligned_mem_size::<T>(capacity)
    }

    /// Removes all elements from the queue
    ///
    /// # Safety
    ///
    ///  * [`Queue::init()`] must have been called once before
    ///
    pub unsafe fn clear(&mut self) {
        self.clear_impl()
    }

    /// Returns a reference to the element from the beginning of the queue without removing it.
    /// If the queue is empty it returns [`None`].
    ///
    /// # Safety
    ///
    ///  * [`Queue::init()`] must have been called once before
    ///
    // NOTE(review): despite documenting an init precondition this method is a
    // safe `fn`, unlike the `unsafe fn` siblings `clear`/`pop`/`push` — calling
    // it before `init()` is only caught by a debug assertion. Aligning the
    // signature would be a breaking change; flagged for a future release.
    pub fn peek(&self) -> Option<&T> {
        unsafe { self.peek_impl() }
    }

    /// Returns a mutable reference to the element from the beginning of the queue without removing it.
    /// If the queue is empty it returns [`None`].
    ///
    /// # Safety
    ///
    ///  * [`Queue::init()`] must have been called once before
    ///
    // NOTE(review): same safe-`fn`-with-unsafe-precondition inconsistency as
    // `peek()` above.
    pub fn peek_mut(&mut self) -> Option<&mut T> {
        unsafe { self.peek_mut_impl() }
    }

    /// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
    ///
    /// # Safety
    ///
    ///  * [`Queue::init()`] must have been called once before
    ///
    pub unsafe fn pop(&mut self) -> Option<T> {
        self.pop_impl()
    }

    /// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
    ///
    /// # Safety
    ///
    ///  * [`Queue::init()`] must have been called once before
    ///
    pub unsafe fn push(&mut self, value: T) -> bool {
        self.push_impl(value)
    }

    /// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
    /// otherwise [`None`].
    ///
    /// # Safety
    ///
    ///  * [`Queue::init()`] must have been called once before
    ///
    pub unsafe fn push_with_overflow(&mut self, value: T) -> Option<T> {
        self.push_with_overflow_impl(value)
    }
}
337
impl<T, Ptr: GenericPointer> MetaQueue<T, Ptr> {
    /// Debug-only guard that panics when the queue is used before its data
    /// memory was initialized. Compiled out in release builds.
    #[inline(always)]
    fn verify_init(&self, source: &str) {
        debug_assert!(
                self.is_initialized
                    .load(core::sync::atomic::Ordering::Relaxed),
                "From: MetaQueue<{}>::{}, Undefined behavior - the object was not initialized with 'init' before.",
                core::any::type_name::<T>(), source
            );
    }

    /// Returns true if the queue is empty, otherwise false
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the capacity of the queue
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the number of elements inside the queue
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns true if the queue is full, otherwise false
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Pops (and thereby drops) every remaining element.
    ///
    /// # Safety
    ///
    ///  * the data memory must have been initialized before
    pub(crate) unsafe fn clear_impl(&mut self) {
        while self.pop_impl().is_some() {}
    }

    /// Mutable peek at the oldest element, [`None`] when empty.
    ///
    /// # Safety
    ///
    ///  * the data memory must have been initialized before
    pub(crate) unsafe fn peek_mut_impl(&mut self) -> Option<&mut T> {
        self.verify_init("peek_mut()");

        if self.is_empty() {
            return None;
        }

        // `start` counts all pushes ever, `len` the elements still stored, so
        // `start - len` is the slot of the oldest element (never underflows)
        let index = (self.start - self.len) % self.capacity;

        Some((*self.data_ptr.as_mut_ptr().add(index)).assume_init_mut())
    }

    /// Shared peek at the oldest element, [`None`] when empty.
    ///
    /// # Safety
    ///
    ///  * the data memory must have been initialized before
    pub(crate) unsafe fn peek_impl(&self) -> Option<&T> {
        self.verify_init("peek()");

        if self.is_empty() {
            return None;
        }

        let index = (self.start - self.len) % self.capacity;

        Some((*self.data_ptr.as_ptr().add(index)).assume_init_ref())
    }

    /// Removes and returns the oldest element, [`None`] when empty.
    ///
    /// # Safety
    ///
    ///  * the data memory must have been initialized before
    pub(crate) unsafe fn pop_impl(&mut self) -> Option<T> {
        self.verify_init("pop()");

        if self.is_empty() {
            return None;
        }

        // compute the slot of the oldest element *before* shrinking `len`
        let index = (self.start - self.len) % self.capacity;
        self.len -= 1;
        // swap an uninit placeholder into the slot so ownership of the value
        // moves out exactly once
        let value = core::mem::replace(
            &mut *self.data_ptr.as_mut_ptr().add(index),
            MaybeUninit::uninit(),
        );
        Some(value.assume_init())
    }

    /// Appends `value`; returns false (dropping nothing, value is discarded by
    /// the caller contract? — no: returns false *without* storing) when full.
    ///
    /// # Safety
    ///
    ///  * the data memory must have been initialized before
    pub(crate) unsafe fn push_impl(&mut self, value: T) -> bool {
        self.verify_init("push()");

        if self.len == self.capacity {
            return false;
        }

        self.unchecked_push(value);
        true
    }

    /// Appends `value`; when full, first pops and returns the oldest element.
    ///
    /// # Safety
    ///
    ///  * the data memory must have been initialized before
    pub(crate) unsafe fn push_with_overflow_impl(&mut self, value: T) -> Option<T> {
        self.verify_init("push_with_overflow()");

        let overridden_value = if self.len() == self.capacity() {
            self.pop_impl()
        } else {
            None
        };

        self.unchecked_push(value);
        overridden_value
    }

    /// Writes `value` into the next write slot without any capacity check; the
    /// caller must guarantee `len < capacity`.
    ///
    /// NOTE(review): `start` grows without bound — a zero-capacity queue panics
    /// here on `% 0`, and a `usize` overflow of `start` would corrupt the slot
    /// order for capacities that do not divide `usize::MAX + 1`; confirm both
    /// are out of scope for the intended use.
    unsafe fn unchecked_push(&mut self, value: T) {
        let index = (self.start) % self.capacity;
        self.data_ptr
            .as_mut_ptr()
            .add(index)
            .write(MaybeUninit::new(value));
        self.start += 1;
        self.len += 1;
    }
}
447
448impl<T, Ptr: GenericPointer> Drop for MetaQueue<T, Ptr> {
449    fn drop(&mut self) {
450        if self
451            .is_initialized
452            .load(core::sync::atomic::Ordering::Relaxed)
453        {
454            unsafe { self.clear_impl() }
455        }
456    }
457}
458
/// Relocatable queue with compile time fixed size capacity. In contrast to its
/// counterpart the [`RelocatableQueue`] it is self-contained — it carries its
/// own element storage — and is therefore movable.
#[repr(C)]
#[derive(Debug)]
pub struct FixedSizeQueue<T, const CAPACITY: usize> {
    // the queue logic; its data pointer targets `_data` below
    state: RelocatableQueue<T>,
    // element storage handed to `state` during construction
    _data: [MaybeUninit<T>; CAPACITY],
}

// SAFETY: `FixedSizeQueue` is `repr(C)` and self-contained (state + inline
// storage), so it is zero-copy sendable whenever its elements are — mirrors the
// impl for `RelocatableQueue`.
unsafe impl<T: ZeroCopySend, const CAPACITY: usize> ZeroCopySend for FixedSizeQueue<T, CAPACITY> {}
469
impl<T, const CAPACITY: usize> PlacementDefault for FixedSizeQueue<T, CAPACITY> {
    /// Constructs `Self` directly in the memory `ptr` points to, avoiding a
    /// temporary on the stack.
    unsafe fn placement_default(ptr: *mut Self) {
        // initialize the `state` field in place first ...
        let state_ptr = core::ptr::addr_of_mut!((*ptr).state);
        state_ptr.write(RelocatableQueue::new_uninit(CAPACITY));

        // ... then hand the `_data` array to the queue as element storage; the
        // array needs no writes since it is `MaybeUninit`
        let allocator = BumpAllocator::new((*ptr)._data.as_mut_ptr().cast());
        (*ptr)
            .state
            .init(&allocator)
            .expect("All required memory is preallocated.");
    }
}
482
483impl<T, const CAPACITY: usize> Default for FixedSizeQueue<T, CAPACITY> {
484    fn default() -> Self {
485        let mut new_self = Self {
486            state: unsafe { RelocatableQueue::new_uninit(CAPACITY) },
487            _data: unsafe { MaybeUninit::uninit().assume_init() },
488        };
489
490        let allocator = BumpAllocator::new(new_self._data.as_mut_ptr().cast());
491        unsafe {
492            new_self
493                .state
494                .init(&allocator)
495                .expect("All required memory is preallocated.")
496        };
497
498        new_self
499    }
500}
501
// SAFETY: the queue owns all of its elements inline, so moving it to another
// thread is sound whenever `T: Send` — mirrors the `Send` impl of `MetaQueue`.
unsafe impl<T: Send, const CAPACITY: usize> Send for FixedSizeQueue<T, CAPACITY> {}
503
504impl<T: Send + Copy + Debug + PartialEq, const CAPACITY: usize> PartialEq
505    for FixedSizeQueue<T, CAPACITY>
506{
507    fn eq(&self, other: &Self) -> bool {
508        if self.len() != other.len() {
509            return false;
510        }
511
512        for i in 0..self.len() {
513            if self.get(i) != other.get(i) {
514                return false;
515            }
516        }
517
518        true
519    }
520}
521
522impl<T, const CAPACITY: usize> FixedSizeQueue<T, CAPACITY> {
523    /// Creates a new queue.
524    pub fn new() -> Self {
525        Self::default()
526    }
527
528    /// Returns true if the queue is empty, otherwise false
529    pub fn is_empty(&self) -> bool {
530        self.state.is_empty()
531    }
532
533    /// Returns the capacity of the queue
534    pub fn capacity(&self) -> usize {
535        self.state.capacity()
536    }
537
538    /// Returns the number of elements inside the queue
539    pub fn len(&self) -> usize {
540        self.state.len()
541    }
542
543    /// Returns true if the queue is full, otherwise false
544    pub fn is_full(&self) -> bool {
545        self.state.is_full()
546    }
547
548    /// Removes all elements from the queue
549    pub fn clear(&mut self) {
550        unsafe { self.state.clear_impl() }
551    }
552
553    /// Returns a reference to the element from the beginning of the queue without removing it.
554    /// If the queue is empty it returns [`None`].
555    pub fn peek(&self) -> Option<&T> {
556        unsafe { self.state.peek_impl() }
557    }
558
559    /// Returns a mutable reference to the element from the beginning of the queue without removing it.
560    /// If the queue is empty it returns [`None`].
561    pub fn peek_mut(&mut self) -> Option<&mut T> {
562        unsafe { self.state.peek_mut_impl() }
563    }
564
565    /// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
566    pub fn pop(&mut self) -> Option<T> {
567        unsafe { self.state.pop_impl() }
568    }
569
570    /// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
571    pub fn push(&mut self, value: T) -> bool {
572        unsafe { self.state.push_impl(value) }
573    }
574
575    /// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
576    /// otherwise [`None`].
577    pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
578        unsafe { self.state.push_with_overflow_impl(value) }
579    }
580}
581
impl<T: Copy + Debug, const CAPACITY: usize> FixedSizeQueue<T, CAPACITY> {
    /// Returns a copy of the element stored at index. The index is starting by 0 for the first
    /// element until [`FixedSizeQueue::len()`].
    ///
    /// # Safety
    ///
    ///  * The index must be not out of bounds
    ///
    pub unsafe fn get_unchecked(&self, index: usize) -> T {
        self.state.get_unchecked(index)
    }

    /// Returns a copy of the element stored at index. The index is starting by 0 for the first
    /// element until [`FixedSizeQueue::len()`].
    ///
    /// # Panics
    ///
    ///  * Panics (via `fatal_panic!`) when `index` >= [`FixedSizeQueue::len()`]
    pub fn get(&self, index: usize) -> T {
        self.state.get(index)
    }
}