Skip to main content

iceoryx2_bb_container/
queue.rs

1// Copyright (c) 2023 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
13//! Three queue variations that are similar to [`alloc::collections::vec_deque::VecDeque`].
14//!
15//!  * [`FixedSizeQueue`](crate::queue::FixedSizeQueue), compile-time fixed size queue that
16//!     is self-contained.
17//!  * [`RelocatableQueue`](crate::queue::RelocatableQueue), run-time fixed size queue that
18//!     acquires the required memory from a custom user-provided allocator.
19//!  * [`Queue`](crate::queue::Queue), run-time fixed size queue that uses by default
20//!     heap memory.
21//!
22//! # Basic Examples
23//!
24//! ## Use the [`FixedSizeQueue`](crate::queue::FixedSizeQueue)
25//!
26//! ```
27//! # extern crate iceoryx2_bb_loggers;
28//!
29//! use iceoryx2_bb_container::queue::FixedSizeQueue;
30//!
31//! const QUEUE_CAPACITY: usize = 1;
32//! let mut queue = FixedSizeQueue::<u64, QUEUE_CAPACITY>::new();
33//!
34//! queue.push(123);
35//!
36//! // queue is full, we override the oldest element (123) with the new number (456)
37//! queue.push_with_overflow(456);
38//!
39//! println!("pop from queue {}", queue.pop().unwrap());
40//! ```
41//!
42//! ## Use the [`Queue`](crate::queue::Queue)
43//!
44//! ```
45//! # extern crate iceoryx2_bb_loggers;
46//!
47//! use iceoryx2_bb_container::queue::Queue;
48//!
49//! let queue_capacity = 1234;
50//! let mut queue = Queue::<u64>::new(queue_capacity);
51//!
52//! queue.push(123);
53//!
54//! println!("pop from queue {}", queue.pop().unwrap());
55//! ```
56//!
57//! # Advanced Examples
58//!
//! ## Create [`RelocatableQueue`](crate::queue::RelocatableQueue) inside a construct which provides memory
60//!
61//! ```
62//! # extern crate iceoryx2_bb_loggers;
63//!
64//! use iceoryx2_bb_container::queue::RelocatableQueue;
65//! use iceoryx2_bb_elementary::math::align_to;
66//! use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
67//! use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
68//! use core::mem::MaybeUninit;
69//!
70//! const QUEUE_CAPACITY:usize = 12;
71//! struct MyConstruct {
72//!     queue: RelocatableQueue<u128>,
73//!     queue_memory: [MaybeUninit<u128>; QUEUE_CAPACITY],
74//! }
75//!
76//! impl MyConstruct {
77//!     pub fn new() -> Self {
78//!         let mut new_self = Self {
79//!             queue: unsafe { RelocatableQueue::new_uninit(QUEUE_CAPACITY) },
80//!             queue_memory: [const { MaybeUninit::uninit() }; QUEUE_CAPACITY] ,
81//!         };
82//!
83//!         let allocator = BumpAllocator::new(new_self.queue_memory.as_mut_ptr().cast());
84//!         unsafe {
85//!             new_self.queue.init(&allocator).expect("Enough memory provided.")
86//!         };
87//!         new_self
88//!     }
89//! }
90//! ```
91//!
92//! ## Create [`RelocatableQueue`](crate::queue::RelocatableQueue) with allocator
93//!
94//! ```
95//! # extern crate iceoryx2_bb_loggers;
96//!
97//! use iceoryx2_bb_container::queue::RelocatableQueue;
98//! use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
99//! use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
100//! use core::ptr::NonNull;
101//!
102//! const QUEUE_CAPACITY:usize = 12;
103//! const MEM_SIZE: usize = RelocatableQueue::<u128>::const_memory_size(QUEUE_CAPACITY);
104//! let mut memory = [0u8; MEM_SIZE];
105//!
106//! let bump_allocator = BumpAllocator::new(memory.as_mut_ptr());
107//!
108//! let mut queue = unsafe { RelocatableQueue::<u128>::new_uninit(QUEUE_CAPACITY) };
109//! unsafe { queue.init(&bump_allocator).expect("queue init failed") };
110//! ```
111//!
112use core::marker::PhantomData;
113use core::{alloc::Layout, fmt::Debug, mem::MaybeUninit};
114use iceoryx2_bb_concurrency::atomic::AtomicBool;
115use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
116use iceoryx2_bb_elementary::math::unaligned_mem_size;
117use iceoryx2_bb_elementary::relocatable_ptr::{GenericRelocatablePointer, RelocatablePointer};
118use iceoryx2_bb_elementary_traits::allocator::{AllocationError, BaseAllocator};
119use iceoryx2_bb_elementary_traits::generic_pointer::GenericPointer;
120use iceoryx2_bb_elementary_traits::owning_pointer::{GenericOwningPointer, OwningPointer};
121use iceoryx2_bb_elementary_traits::placement_default::PlacementDefault;
122use iceoryx2_bb_elementary_traits::pointer_trait::PointerTrait;
123pub use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
124use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
125use iceoryx2_log::{fail, fatal_panic};
126
/// Queue with a capacity that is fixed at run-time when the queue is created. In contrast to
/// its counterpart, the [`RelocatableQueue`], it is movable but not shared memory compatible.
pub type Queue<T> = MetaQueue<T, GenericOwningPointer>;
/// **Non-movable** relocatable queue with a capacity that is fixed at run-time. Its storage
/// has to be provided via [`RelocatableContainer::init()`] before elements are accessed.
pub type RelocatableQueue<T> = MetaQueue<T, GenericRelocatablePointer>;
132
#[doc(hidden)]
/// Shared implementation behind [`Queue`] and [`RelocatableQueue`]. The `Ptr` parameter
/// selects the pointer type that refers to the element storage.
#[repr(C)]
#[derive(Debug)]
pub struct MetaQueue<T, Ptr: GenericPointer> {
    // pointer to the element storage; slots may be uninitialized
    data_ptr: Ptr::Type<MaybeUninit<T>>,
    // incremented on every push; `start % capacity` is the slot the next push writes to
    start: usize,
    // number of stored elements; the oldest one lives in slot `(start - len) % capacity`
    len: usize,
    // maximum number of elements the queue can hold
    capacity: usize,
    // `true` once the storage behind `data_ptr` has been set up
    is_initialized: AtomicBool,
    _phantom_data: PhantomData<T>,
}
145
// The queue may be sent to another thread whenever its elements may be.
// NOTE(review): presumably required because the pointer type stored in `data_ptr` is not
// automatically `Send` — confirm.
unsafe impl<T: Send, Ptr: GenericPointer> Send for MetaQueue<T, Ptr> {}
147
impl<T> Queue<T> {
    /// Creates a new, empty [`Queue`] that can hold up to `capacity` elements.
    pub fn new(capacity: usize) -> Self {
        Self {
            // NOTE(review): presumably acquires the storage for `capacity` elements — confirm
            data_ptr: OwningPointer::<MaybeUninit<T>>::new_with_alloc(capacity),
            start: 0,
            len: 0,
            capacity,
            // in contrast to the relocatable variant no separate `init()` call is required
            is_initialized: AtomicBool::new(true),
            _phantom_data: PhantomData,
        }
    }

    /// Removes all elements from the queue
    pub fn clear(&mut self) {
        // SAFETY: `new()` creates the queue in the initialized state
        unsafe { self.clear_impl() }
    }

    /// Returns a reference to the element from the beginning of the queue without removing it.
    /// If the queue is empty it returns [`None`].
    pub fn peek(&self) -> Option<&T> {
        // SAFETY: `new()` creates the queue in the initialized state
        unsafe { self.peek_impl() }
    }

    /// Returns a mutable reference to the element from the beginning of the queue without removing it.
    /// If the queue is empty it returns [`None`].
    pub fn peek_mut(&mut self) -> Option<&mut T> {
        // SAFETY: `new()` creates the queue in the initialized state
        unsafe { self.peek_mut_impl() }
    }

    /// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
    pub fn pop(&mut self) -> Option<T> {
        // SAFETY: `new()` creates the queue in the initialized state
        unsafe { self.pop_impl() }
    }

    /// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
    pub fn push(&mut self, value: T) -> bool {
        // SAFETY: `new()` creates the queue in the initialized state
        unsafe { self.push_impl(value) }
    }

    /// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
    /// otherwise [`None`].
    pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
        // SAFETY: `new()` creates the queue in the initialized state
        unsafe { self.push_with_overflow_impl(value) }
    }
}
194
195impl<T: Copy + Debug, Ptr: GenericPointer + Debug> MetaQueue<T, Ptr> {
196    /// Returns a copy of the element stored at index. The index is starting by 0 for the first
197    /// element until [`Queue::len()`].
198    ///
199    /// # Safety
200    ///
201    ///   * Must satisfy `index` < [`Queue::len()`]
202    pub unsafe fn get_unchecked(&self, index: usize) -> T {
203        unsafe {
204            (*self
205                .data_ptr
206                .as_ptr()
207                .add((self.start - self.len + index) % self.capacity))
208            .assume_init()
209        }
210    }
211
212    /// Returns a copy of the element stored at index. The index is starting by 0 for the first
213    /// element until [`Queue::len()`]queue_memory
214    pub fn get(&self, index: usize) -> T {
215        if self.len() <= index {
216            fatal_panic!(from self, "Unable to copy content since the index {} is out of range.", index);
217        }
218
219        unsafe { self.get_unchecked(index) }
220    }
221}
222
223impl<T> RelocatableContainer for RelocatableQueue<T> {
224    unsafe fn new_uninit(capacity: usize) -> Self {
225        Self {
226            data_ptr: unsafe { RelocatablePointer::new_uninit() },
227            start: 0,
228            len: 0,
229            capacity,
230            is_initialized: AtomicBool::new(false),
231            _phantom_data: PhantomData,
232        }
233    }
234
235    unsafe fn init<Allocator: BaseAllocator>(
236        &mut self,
237        allocator: &Allocator,
238    ) -> Result<(), AllocationError> {
239        if self
240            .is_initialized
241            .load(core::sync::atomic::Ordering::Relaxed)
242        {
243            fatal_panic!(
244                from "Queue::init()",
245                "Memory already initialized. Initializing it twice may lead to undefined behavior."
246            );
247        }
248        unsafe {
249            self.data_ptr.init(fail!(from "Queue::init", when allocator
250                 .allocate(Layout::from_size_align_unchecked(
251                     core::mem::size_of::<T>() * self.capacity,
252                     core::mem::align_of::<T>(),
253                 )), "Failed to initialize queue since the allocation of the data memory failed."
254            ));
255        }
256        self.is_initialized
257            .store(true, core::sync::atomic::Ordering::Relaxed);
258
259        Ok(())
260    }
261
262    fn memory_size(capacity: usize) -> usize {
263        Self::const_memory_size(capacity)
264    }
265}
266
// The relocatable queue is marked as `ZeroCopySend` whenever its element type is.
unsafe impl<T: ZeroCopySend> ZeroCopySend for RelocatableQueue<T> {}
268
impl<T> RelocatableQueue<T> {
    /// Returns the required memory size for a queue with a specified capacity
    pub const fn const_memory_size(capacity: usize) -> usize {
        unaligned_mem_size::<T>(capacity)
    }

    /// Removes all elements from the queue
    ///
    /// # Safety
    ///
    ///  * [`RelocatableContainer::init()`] must have been called once before
    ///
    pub unsafe fn clear(&mut self) {
        unsafe { self.clear_impl() }
    }

    /// Returns a reference to the element from the beginning of the queue without removing it.
    /// If the queue is empty it returns [`None`].
    ///
    /// # Safety
    ///
    ///  * [`RelocatableContainer::init()`] must have been called once before
    ///
    // NOTE(review): unlike `pop()`/`push()` this method is not marked `unsafe` although it
    // documents the same initialization requirement — confirm whether this is intended.
    pub fn peek(&self) -> Option<&T> {
        unsafe { self.peek_impl() }
    }

    /// Returns a mutable reference to the element from the beginning of the queue without removing it.
    /// If the queue is empty it returns [`None`].
    ///
    /// # Safety
    ///
    ///  * [`RelocatableContainer::init()`] must have been called once before
    ///
    // NOTE(review): unlike `pop()`/`push()` this method is not marked `unsafe` although it
    // documents the same initialization requirement — confirm whether this is intended.
    pub fn peek_mut(&mut self) -> Option<&mut T> {
        unsafe { self.peek_mut_impl() }
    }

    /// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
    ///
    /// # Safety
    ///
    ///  * [`RelocatableContainer::init()`] must have been called once before
    ///
    pub unsafe fn pop(&mut self) -> Option<T> {
        unsafe { self.pop_impl() }
    }

    /// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
    ///
    /// # Safety
    ///
    ///  * [`RelocatableContainer::init()`] must have been called once before
    ///
    pub unsafe fn push(&mut self, value: T) -> bool {
        unsafe { self.push_impl(value) }
    }

    /// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
    /// otherwise [`None`].
    ///
    /// # Safety
    ///
    ///  * [`RelocatableContainer::init()`] must have been called once before
    ///
    pub unsafe fn push_with_overflow(&mut self, value: T) -> Option<T> {
        unsafe { self.push_with_overflow_impl(value) }
    }
}
338
339impl<T, Ptr: GenericPointer> MetaQueue<T, Ptr> {
340    #[inline(always)]
341    fn verify_init(&self, source: &str) {
342        debug_assert!(
343            self.is_initialized
344                .load(core::sync::atomic::Ordering::Relaxed),
345            "From: MetaQueue<{}>::{}, Undefined behavior - the object was not initialized with 'init' before.",
346            core::any::type_name::<T>(),
347            source
348        );
349    }
350
351    /// Returns true if the queue is empty, otherwise false
352    pub fn is_empty(&self) -> bool {
353        self.len == 0
354    }
355
356    /// Returns the capacity of the queue
357    pub fn capacity(&self) -> usize {
358        self.capacity
359    }
360
361    /// Returns the number of elements inside the queue
362    pub fn len(&self) -> usize {
363        self.len
364    }
365
366    /// Returns true if the queue is full, otherwise false
367    pub fn is_full(&self) -> bool {
368        self.len() == self.capacity()
369    }
370
371    pub(crate) unsafe fn clear_impl(&mut self) {
372        unsafe { while self.pop_impl().is_some() {} }
373    }
374
375    pub(crate) unsafe fn peek_mut_impl(&mut self) -> Option<&mut T> {
376        self.verify_init("peek_mut()");
377
378        if self.is_empty() {
379            return None;
380        }
381
382        let index = (self.start - self.len) % self.capacity;
383        unsafe { Some((*self.data_ptr.as_mut_ptr().add(index)).assume_init_mut()) }
384    }
385
386    pub(crate) unsafe fn peek_impl(&self) -> Option<&T> {
387        self.verify_init("peek()");
388
389        if self.is_empty() {
390            return None;
391        }
392
393        let index = (self.start - self.len) % self.capacity;
394        unsafe { Some((*self.data_ptr.as_ptr().add(index)).assume_init_ref()) }
395    }
396
397    pub(crate) unsafe fn pop_impl(&mut self) -> Option<T> {
398        self.verify_init("pop()");
399
400        if self.is_empty() {
401            return None;
402        }
403
404        let index = (self.start - self.len) % self.capacity;
405        self.len -= 1;
406        unsafe {
407            let value = core::mem::replace(
408                &mut *self.data_ptr.as_mut_ptr().add(index),
409                MaybeUninit::uninit(),
410            );
411
412            Some(value.assume_init())
413        }
414    }
415
416    pub(crate) unsafe fn push_impl(&mut self, value: T) -> bool {
417        self.verify_init("push()");
418
419        if self.len == self.capacity {
420            return false;
421        }
422        unsafe {
423            self.unchecked_push(value);
424        }
425        true
426    }
427
428    pub(crate) unsafe fn push_with_overflow_impl(&mut self, value: T) -> Option<T> {
429        self.verify_init("push_with_overflow()");
430
431        let overridden_value = if self.len() == self.capacity() {
432            unsafe { self.pop_impl() }
433        } else {
434            None
435        };
436
437        unsafe {
438            self.unchecked_push(value);
439        }
440        overridden_value
441    }
442
443    unsafe fn unchecked_push(&mut self, value: T) {
444        let index = (self.start) % self.capacity;
445        unsafe {
446            self.data_ptr
447                .as_mut_ptr()
448                .add(index)
449                .write(MaybeUninit::new(value));
450        }
451        self.start += 1;
452        self.len += 1;
453    }
454}
455
456impl<T, Ptr: GenericPointer> Drop for MetaQueue<T, Ptr> {
457    fn drop(&mut self) {
458        if self
459            .is_initialized
460            .load(core::sync::atomic::Ordering::Relaxed)
461        {
462            unsafe { self.clear_impl() }
463        }
464    }
465}
466
/// Relocatable queue with compile time fixed size capacity. In contrast to its counterpart the
/// [`RelocatableQueue`] it is movable, since the element storage is embedded in the struct
/// itself.
#[repr(C)]
#[derive(Debug)]
pub struct FixedSizeQueue<T, const CAPACITY: usize> {
    // queue bookkeeping; its storage is handed out from `_data` during construction
    state: RelocatableQueue<T>,
    // backing storage for up to `CAPACITY` elements, only accessed through `state`
    _data: [MaybeUninit<T>; CAPACITY],
}
475
// The fixed size queue is marked as `ZeroCopySend` whenever its element type is.
unsafe impl<T: ZeroCopySend, const CAPACITY: usize> ZeroCopySend for FixedSizeQueue<T, CAPACITY> {}
477
478impl<T, const CAPACITY: usize> PlacementDefault for FixedSizeQueue<T, CAPACITY> {
479    unsafe fn placement_default(ptr: *mut Self) {
480        unsafe {
481            let state_ptr = core::ptr::addr_of_mut!((*ptr).state);
482            state_ptr.write(RelocatableQueue::new_uninit(CAPACITY));
483
484            let allocator = BumpAllocator::new((*ptr)._data.as_mut_ptr().cast());
485            (*ptr)
486                .state
487                .init(&allocator)
488                .expect("All required memory is preallocated.");
489        }
490    }
491}
492
493impl<T, const CAPACITY: usize> Default for FixedSizeQueue<T, CAPACITY> {
494    fn default() -> Self {
495        let mut new_self = Self {
496            state: unsafe { RelocatableQueue::new_uninit(CAPACITY) },
497            _data: unsafe { MaybeUninit::uninit().assume_init() },
498        };
499
500        let allocator = BumpAllocator::new(new_self._data.as_mut_ptr().cast());
501        unsafe {
502            new_self
503                .state
504                .init(&allocator)
505                .expect("All required memory is preallocated.")
506        };
507
508        new_self
509    }
510}
511
// The queue may be sent to another thread whenever its elements may be.
unsafe impl<T: Send, const CAPACITY: usize> Send for FixedSizeQueue<T, CAPACITY> {}
513
514impl<T: Send + Copy + Debug + PartialEq, const CAPACITY: usize> PartialEq
515    for FixedSizeQueue<T, CAPACITY>
516{
517    fn eq(&self, other: &Self) -> bool {
518        if self.len() != other.len() {
519            return false;
520        }
521
522        for i in 0..self.len() {
523            if self.get(i) != other.get(i) {
524                return false;
525            }
526        }
527
528        true
529    }
530}
531
impl<T, const CAPACITY: usize> FixedSizeQueue<T, CAPACITY> {
    /// Creates a new, empty queue. Equivalent to [`FixedSizeQueue::default()`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns true if the queue is empty, otherwise false
    pub fn is_empty(&self) -> bool {
        self.state.is_empty()
    }

    /// Returns the capacity of the queue
    pub fn capacity(&self) -> usize {
        self.state.capacity()
    }

    /// Returns the number of elements inside the queue
    pub fn len(&self) -> usize {
        self.state.len()
    }

    /// Returns true if the queue is full, otherwise false
    pub fn is_full(&self) -> bool {
        self.state.is_full()
    }

    /// Removes all elements from the queue
    pub fn clear(&mut self) {
        // SAFETY: `state` is initialized by `default()` and `placement_default()`
        unsafe { self.state.clear_impl() }
    }

    /// Returns a reference to the element from the beginning of the queue without removing it.
    /// If the queue is empty it returns [`None`].
    pub fn peek(&self) -> Option<&T> {
        // SAFETY: `state` is initialized by `default()` and `placement_default()`
        unsafe { self.state.peek_impl() }
    }

    /// Returns a mutable reference to the element from the beginning of the queue without removing it.
    /// If the queue is empty it returns [`None`].
    pub fn peek_mut(&mut self) -> Option<&mut T> {
        // SAFETY: `state` is initialized by `default()` and `placement_default()`
        unsafe { self.state.peek_mut_impl() }
    }

    /// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
    pub fn pop(&mut self) -> Option<T> {
        // SAFETY: `state` is initialized by `default()` and `placement_default()`
        unsafe { self.state.pop_impl() }
    }

    /// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
    pub fn push(&mut self, value: T) -> bool {
        // SAFETY: `state` is initialized by `default()` and `placement_default()`
        unsafe { self.state.push_impl(value) }
    }

    /// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
    /// otherwise [`None`].
    pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
        // SAFETY: `state` is initialized by `default()` and `placement_default()`
        unsafe { self.state.push_with_overflow_impl(value) }
    }
}
591
impl<T: Copy + Debug, const CAPACITY: usize> FixedSizeQueue<T, CAPACITY> {
    /// Returns a copy of the element stored at `index`. Index `0` refers to the oldest
    /// element; valid indices are smaller than [`FixedSizeQueue::len()`].
    ///
    /// # Safety
    ///
    ///  * Must satisfy `index` < [`FixedSizeQueue::len()`]
    ///
    pub unsafe fn get_unchecked(&self, index: usize) -> T {
        unsafe { self.state.get_unchecked(index) }
    }

    /// Returns a copy of the element stored at `index`. Index `0` refers to the oldest
    /// element; valid indices are smaller than [`FixedSizeQueue::len()`]. An index that is
    /// out of range is reported by the underlying queue via `fatal_panic!`.
    pub fn get(&self, index: usize) -> T {
        self.state.get(index)
    }
}