iceoryx2_bb_container/queue.rs
1// Copyright (c) 2023 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
13//! Three queue variations that are similar to [`alloc::collections::vec_deque::VecDeque`].
14//!
15//! * [`FixedSizeQueue`](crate::queue::FixedSizeQueue), compile-time fixed size queue that
16//! is self-contained.
17//! * [`RelocatableQueue`](crate::queue::RelocatableQueue), run-time fixed size queue that
18//! acquires the required memory from a custom user-provided allocator.
19//! * [`Queue`](crate::queue::Queue), run-time fixed size queue that uses by default
20//! heap memory.
21//!
22//! # Basic Examples
23//!
24//! ## Use the [`FixedSizeQueue`](crate::queue::FixedSizeQueue)
25//!
26//! ```
27//! # extern crate iceoryx2_bb_loggers;
28//!
29//! use iceoryx2_bb_container::queue::FixedSizeQueue;
30//!
31//! const QUEUE_CAPACITY: usize = 1;
32//! let mut queue = FixedSizeQueue::<u64, QUEUE_CAPACITY>::new();
33//!
34//! queue.push(123);
35//!
36//! // queue is full, we override the oldest element (123) with the new number (456)
37//! queue.push_with_overflow(456);
38//!
39//! println!("pop from queue {}", queue.pop().unwrap());
40//! ```
41//!
42//! ## Use the [`Queue`](crate::queue::Queue)
43//!
44//! ```
45//! # extern crate iceoryx2_bb_loggers;
46//!
47//! use iceoryx2_bb_container::queue::Queue;
48//!
49//! let queue_capacity = 1234;
50//! let mut queue = Queue::<u64>::new(queue_capacity);
51//!
52//! queue.push(123);
53//!
54//! println!("pop from queue {}", queue.pop().unwrap());
55//! ```
56//!
57//! # Advanced Examples
58//!
59//! ## Create [`RelocatableQueue`](crate::queue::RelocatableQueue) inside constructs which provides memory
60//!
61//! ```
62//! # extern crate iceoryx2_bb_loggers;
63//!
64//! use iceoryx2_bb_container::queue::RelocatableQueue;
65//! use iceoryx2_bb_elementary::math::align_to;
66//! use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
67//! use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
68//! use core::mem::MaybeUninit;
69//!
70//! const QUEUE_CAPACITY:usize = 12;
71//! struct MyConstruct {
72//! queue: RelocatableQueue<u128>,
73//! queue_memory: [MaybeUninit<u128>; QUEUE_CAPACITY],
74//! }
75//!
76//! impl MyConstruct {
77//! pub fn new() -> Self {
78//! let mut new_self = Self {
79//! queue: unsafe { RelocatableQueue::new_uninit(QUEUE_CAPACITY) },
80//! queue_memory: [const { MaybeUninit::uninit() }; QUEUE_CAPACITY] ,
81//! };
82//!
83//! let allocator = BumpAllocator::new(new_self.queue_memory.as_mut_ptr().cast());
84//! unsafe {
85//! new_self.queue.init(&allocator).expect("Enough memory provided.")
86//! };
87//! new_self
88//! }
89//! }
90//! ```
91//!
92//! ## Create [`RelocatableQueue`](crate::queue::RelocatableQueue) with allocator
93//!
94//! ```
95//! # extern crate iceoryx2_bb_loggers;
96//!
97//! use iceoryx2_bb_container::queue::RelocatableQueue;
98//! use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
99//! use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
100//! use core::ptr::NonNull;
101//!
102//! const QUEUE_CAPACITY:usize = 12;
103//! const MEM_SIZE: usize = RelocatableQueue::<u128>::const_memory_size(QUEUE_CAPACITY);
104//! let mut memory = [0u8; MEM_SIZE];
105//!
106//! let bump_allocator = BumpAllocator::new(memory.as_mut_ptr());
107//!
108//! let mut queue = unsafe { RelocatableQueue::<u128>::new_uninit(QUEUE_CAPACITY) };
109//! unsafe { queue.init(&bump_allocator).expect("queue init failed") };
110//! ```
111//!
112use core::marker::PhantomData;
113use core::{alloc::Layout, fmt::Debug, mem::MaybeUninit};
114use iceoryx2_bb_concurrency::atomic::AtomicBool;
115use iceoryx2_bb_elementary::bump_allocator::BumpAllocator;
116use iceoryx2_bb_elementary::math::unaligned_mem_size;
117use iceoryx2_bb_elementary::relocatable_ptr::{GenericRelocatablePointer, RelocatablePointer};
118use iceoryx2_bb_elementary_traits::allocator::{AllocationError, BaseAllocator};
119use iceoryx2_bb_elementary_traits::generic_pointer::GenericPointer;
120use iceoryx2_bb_elementary_traits::owning_pointer::{GenericOwningPointer, OwningPointer};
121use iceoryx2_bb_elementary_traits::placement_default::PlacementDefault;
122use iceoryx2_bb_elementary_traits::pointer_trait::PointerTrait;
123pub use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
124use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
125use iceoryx2_log::{fail, fatal_panic};
126
127/// Queue with run-time fixed size capacity. In contrast to its counterpart the
128/// [`RelocatableQueue`] it is movable but is not shared memory compatible.
129pub type Queue<T> = MetaQueue<T, GenericOwningPointer>;
130/// **Non-movable** relocatable queue with runtime fixed size capacity.
131pub type RelocatableQueue<T> = MetaQueue<T, GenericRelocatablePointer>;
132
133#[doc(hidden)]
134/// **Non-movable** relocatable queue with runtime fixed size capacity.
135#[repr(C)]
136#[derive(Debug)]
137pub struct MetaQueue<T, Ptr: GenericPointer> {
138 data_ptr: Ptr::Type<MaybeUninit<T>>,
139 start: usize,
140 len: usize,
141 capacity: usize,
142 is_initialized: AtomicBool,
143 _phantom_data: PhantomData<T>,
144}
145
146unsafe impl<T: Send, Ptr: GenericPointer> Send for MetaQueue<T, Ptr> {}
147
148impl<T> Queue<T> {
149 /// Creates a new [`Queue`] with the provided capacity
150 pub fn new(capacity: usize) -> Self {
151 Self {
152 data_ptr: OwningPointer::<MaybeUninit<T>>::new_with_alloc(capacity),
153 start: 0,
154 len: 0,
155 capacity,
156 is_initialized: AtomicBool::new(true),
157 _phantom_data: PhantomData,
158 }
159 }
160
161 /// Removes all elements from the queue
162 pub fn clear(&mut self) {
163 unsafe { self.clear_impl() }
164 }
165
166 /// Returns a reference to the element from the beginning of the queue without removing it.
167 /// If the queue is empty it returns [`None`].
168 pub fn peek(&self) -> Option<&T> {
169 unsafe { self.peek_impl() }
170 }
171
172 /// Returns a mutable reference to the element from the beginning of the queue without removing it.
173 /// If the queue is empty it returns [`None`].
174 pub fn peek_mut(&mut self) -> Option<&mut T> {
175 unsafe { self.peek_mut_impl() }
176 }
177
178 /// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
179 pub fn pop(&mut self) -> Option<T> {
180 unsafe { self.pop_impl() }
181 }
182
183 /// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
184 pub fn push(&mut self, value: T) -> bool {
185 unsafe { self.push_impl(value) }
186 }
187
188 /// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
189 /// otherwise [`None`].
190 pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
191 unsafe { self.push_with_overflow_impl(value) }
192 }
193}
194
195impl<T: Copy + Debug, Ptr: GenericPointer + Debug> MetaQueue<T, Ptr> {
196 /// Returns a copy of the element stored at index. The index is starting by 0 for the first
197 /// element until [`Queue::len()`].
198 ///
199 /// # Safety
200 ///
201 /// * Must satisfy `index` < [`Queue::len()`]
202 pub unsafe fn get_unchecked(&self, index: usize) -> T {
203 unsafe {
204 (*self
205 .data_ptr
206 .as_ptr()
207 .add((self.start - self.len + index) % self.capacity))
208 .assume_init()
209 }
210 }
211
212 /// Returns a copy of the element stored at index. The index is starting by 0 for the first
213 /// element until [`Queue::len()`]queue_memory
214 pub fn get(&self, index: usize) -> T {
215 if self.len() <= index {
216 fatal_panic!(from self, "Unable to copy content since the index {} is out of range.", index);
217 }
218
219 unsafe { self.get_unchecked(index) }
220 }
221}
222
223impl<T> RelocatableContainer for RelocatableQueue<T> {
224 unsafe fn new_uninit(capacity: usize) -> Self {
225 Self {
226 data_ptr: unsafe { RelocatablePointer::new_uninit() },
227 start: 0,
228 len: 0,
229 capacity,
230 is_initialized: AtomicBool::new(false),
231 _phantom_data: PhantomData,
232 }
233 }
234
235 unsafe fn init<Allocator: BaseAllocator>(
236 &mut self,
237 allocator: &Allocator,
238 ) -> Result<(), AllocationError> {
239 if self
240 .is_initialized
241 .load(core::sync::atomic::Ordering::Relaxed)
242 {
243 fatal_panic!(
244 from "Queue::init()",
245 "Memory already initialized. Initializing it twice may lead to undefined behavior."
246 );
247 }
248 unsafe {
249 self.data_ptr.init(fail!(from "Queue::init", when allocator
250 .allocate(Layout::from_size_align_unchecked(
251 core::mem::size_of::<T>() * self.capacity,
252 core::mem::align_of::<T>(),
253 )), "Failed to initialize queue since the allocation of the data memory failed."
254 ));
255 }
256 self.is_initialized
257 .store(true, core::sync::atomic::Ordering::Relaxed);
258
259 Ok(())
260 }
261
262 fn memory_size(capacity: usize) -> usize {
263 Self::const_memory_size(capacity)
264 }
265}
266
267unsafe impl<T: ZeroCopySend> ZeroCopySend for RelocatableQueue<T> {}
268
269impl<T> RelocatableQueue<T> {
270 /// Returns the required memory size for a queue with a specified capacity
271 pub const fn const_memory_size(capacity: usize) -> usize {
272 unaligned_mem_size::<T>(capacity)
273 }
274
275 /// Removes all elements from the queue
276 ///
277 /// # Safety
278 ///
279 /// * [`Queue::init()`] must have been called once before
280 ///
281 pub unsafe fn clear(&mut self) {
282 unsafe { self.clear_impl() }
283 }
284
285 /// Returns a reference to the element from the beginning of the queue without removing it.
286 /// If the queue is empty it returns [`None`].
287 ///
288 /// # Safety
289 ///
290 /// * [`Queue::init()`] must have been called once before
291 ///
292 pub fn peek(&self) -> Option<&T> {
293 unsafe { self.peek_impl() }
294 }
295
296 /// Returns a mutable reference to the element from the beginning of the queue without removing it.
297 /// If the queue is empty it returns [`None`].
298 ///
299 /// # Safety
300 ///
301 /// * [`Queue::init()`] must have been called once before
302 ///
303 pub fn peek_mut(&mut self) -> Option<&mut T> {
304 unsafe { self.peek_mut_impl() }
305 }
306
307 /// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
308 ///
309 /// # Safety
310 ///
311 /// * [`Queue::init()`] must have been called once before
312 ///
313 pub unsafe fn pop(&mut self) -> Option<T> {
314 unsafe { self.pop_impl() }
315 }
316
317 /// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
318 ///
319 /// # Safety
320 ///
321 /// * [`Queue::init()`] must have been called once before
322 ///
323 pub unsafe fn push(&mut self, value: T) -> bool {
324 unsafe { self.push_impl(value) }
325 }
326
327 /// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
328 /// otherwise [`None`].
329 ///
330 /// # Safety
331 ///
332 /// * [`Queue::init()`] must have been called once before
333 ///
334 pub unsafe fn push_with_overflow(&mut self, value: T) -> Option<T> {
335 unsafe { self.push_with_overflow_impl(value) }
336 }
337}
338
339impl<T, Ptr: GenericPointer> MetaQueue<T, Ptr> {
340 #[inline(always)]
341 fn verify_init(&self, source: &str) {
342 debug_assert!(
343 self.is_initialized
344 .load(core::sync::atomic::Ordering::Relaxed),
345 "From: MetaQueue<{}>::{}, Undefined behavior - the object was not initialized with 'init' before.",
346 core::any::type_name::<T>(),
347 source
348 );
349 }
350
351 /// Returns true if the queue is empty, otherwise false
352 pub fn is_empty(&self) -> bool {
353 self.len == 0
354 }
355
356 /// Returns the capacity of the queue
357 pub fn capacity(&self) -> usize {
358 self.capacity
359 }
360
361 /// Returns the number of elements inside the queue
362 pub fn len(&self) -> usize {
363 self.len
364 }
365
366 /// Returns true if the queue is full, otherwise false
367 pub fn is_full(&self) -> bool {
368 self.len() == self.capacity()
369 }
370
371 pub(crate) unsafe fn clear_impl(&mut self) {
372 unsafe { while self.pop_impl().is_some() {} }
373 }
374
375 pub(crate) unsafe fn peek_mut_impl(&mut self) -> Option<&mut T> {
376 self.verify_init("peek_mut()");
377
378 if self.is_empty() {
379 return None;
380 }
381
382 let index = (self.start - self.len) % self.capacity;
383 unsafe { Some((*self.data_ptr.as_mut_ptr().add(index)).assume_init_mut()) }
384 }
385
386 pub(crate) unsafe fn peek_impl(&self) -> Option<&T> {
387 self.verify_init("peek()");
388
389 if self.is_empty() {
390 return None;
391 }
392
393 let index = (self.start - self.len) % self.capacity;
394 unsafe { Some((*self.data_ptr.as_ptr().add(index)).assume_init_ref()) }
395 }
396
397 pub(crate) unsafe fn pop_impl(&mut self) -> Option<T> {
398 self.verify_init("pop()");
399
400 if self.is_empty() {
401 return None;
402 }
403
404 let index = (self.start - self.len) % self.capacity;
405 self.len -= 1;
406 unsafe {
407 let value = core::mem::replace(
408 &mut *self.data_ptr.as_mut_ptr().add(index),
409 MaybeUninit::uninit(),
410 );
411
412 Some(value.assume_init())
413 }
414 }
415
416 pub(crate) unsafe fn push_impl(&mut self, value: T) -> bool {
417 self.verify_init("push()");
418
419 if self.len == self.capacity {
420 return false;
421 }
422 unsafe {
423 self.unchecked_push(value);
424 }
425 true
426 }
427
428 pub(crate) unsafe fn push_with_overflow_impl(&mut self, value: T) -> Option<T> {
429 self.verify_init("push_with_overflow()");
430
431 let overridden_value = if self.len() == self.capacity() {
432 unsafe { self.pop_impl() }
433 } else {
434 None
435 };
436
437 unsafe {
438 self.unchecked_push(value);
439 }
440 overridden_value
441 }
442
443 unsafe fn unchecked_push(&mut self, value: T) {
444 let index = (self.start) % self.capacity;
445 unsafe {
446 self.data_ptr
447 .as_mut_ptr()
448 .add(index)
449 .write(MaybeUninit::new(value));
450 }
451 self.start += 1;
452 self.len += 1;
453 }
454}
455
456impl<T, Ptr: GenericPointer> Drop for MetaQueue<T, Ptr> {
457 fn drop(&mut self) {
458 if self
459 .is_initialized
460 .load(core::sync::atomic::Ordering::Relaxed)
461 {
462 unsafe { self.clear_impl() }
463 }
464 }
465}
466
467/// Relocatable queue with compile time fixed size capacity. In contrast to its counterpart the
468/// [`Queue`] it is movable.
469#[repr(C)]
470#[derive(Debug)]
471pub struct FixedSizeQueue<T, const CAPACITY: usize> {
472 state: RelocatableQueue<T>,
473 _data: [MaybeUninit<T>; CAPACITY],
474}
475
476unsafe impl<T: ZeroCopySend, const CAPACITY: usize> ZeroCopySend for FixedSizeQueue<T, CAPACITY> {}
477
478impl<T, const CAPACITY: usize> PlacementDefault for FixedSizeQueue<T, CAPACITY> {
479 unsafe fn placement_default(ptr: *mut Self) {
480 unsafe {
481 let state_ptr = core::ptr::addr_of_mut!((*ptr).state);
482 state_ptr.write(RelocatableQueue::new_uninit(CAPACITY));
483
484 let allocator = BumpAllocator::new((*ptr)._data.as_mut_ptr().cast());
485 (*ptr)
486 .state
487 .init(&allocator)
488 .expect("All required memory is preallocated.");
489 }
490 }
491}
492
493impl<T, const CAPACITY: usize> Default for FixedSizeQueue<T, CAPACITY> {
494 fn default() -> Self {
495 let mut new_self = Self {
496 state: unsafe { RelocatableQueue::new_uninit(CAPACITY) },
497 _data: unsafe { MaybeUninit::uninit().assume_init() },
498 };
499
500 let allocator = BumpAllocator::new(new_self._data.as_mut_ptr().cast());
501 unsafe {
502 new_self
503 .state
504 .init(&allocator)
505 .expect("All required memory is preallocated.")
506 };
507
508 new_self
509 }
510}
511
512unsafe impl<T: Send, const CAPACITY: usize> Send for FixedSizeQueue<T, CAPACITY> {}
513
514impl<T: Send + Copy + Debug + PartialEq, const CAPACITY: usize> PartialEq
515 for FixedSizeQueue<T, CAPACITY>
516{
517 fn eq(&self, other: &Self) -> bool {
518 if self.len() != other.len() {
519 return false;
520 }
521
522 for i in 0..self.len() {
523 if self.get(i) != other.get(i) {
524 return false;
525 }
526 }
527
528 true
529 }
530}
531
532impl<T, const CAPACITY: usize> FixedSizeQueue<T, CAPACITY> {
533 /// Creates a new queue.
534 pub fn new() -> Self {
535 Self::default()
536 }
537
538 /// Returns true if the queue is empty, otherwise false
539 pub fn is_empty(&self) -> bool {
540 self.state.is_empty()
541 }
542
543 /// Returns the capacity of the queue
544 pub fn capacity(&self) -> usize {
545 self.state.capacity()
546 }
547
548 /// Returns the number of elements inside the queue
549 pub fn len(&self) -> usize {
550 self.state.len()
551 }
552
553 /// Returns true if the queue is full, otherwise false
554 pub fn is_full(&self) -> bool {
555 self.state.is_full()
556 }
557
558 /// Removes all elements from the queue
559 pub fn clear(&mut self) {
560 unsafe { self.state.clear_impl() }
561 }
562
563 /// Returns a reference to the element from the beginning of the queue without removing it.
564 /// If the queue is empty it returns [`None`].
565 pub fn peek(&self) -> Option<&T> {
566 unsafe { self.state.peek_impl() }
567 }
568
569 /// Returns a mutable reference to the element from the beginning of the queue without removing it.
570 /// If the queue is empty it returns [`None`].
571 pub fn peek_mut(&mut self) -> Option<&mut T> {
572 unsafe { self.state.peek_mut_impl() }
573 }
574
575 /// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
576 pub fn pop(&mut self) -> Option<T> {
577 unsafe { self.state.pop_impl() }
578 }
579
580 /// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
581 pub fn push(&mut self, value: T) -> bool {
582 unsafe { self.state.push_impl(value) }
583 }
584
585 /// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
586 /// otherwise [`None`].
587 pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
588 unsafe { self.state.push_with_overflow_impl(value) }
589 }
590}
591
592impl<T: Copy + Debug, const CAPACITY: usize> FixedSizeQueue<T, CAPACITY> {
593 /// Returns a copy of the element stored at index. The index is starting by 0 for the first
594 /// element until [`FixedSizeQueue::len()`].
595 ///
596 /// # Safety
597 ///
598 /// * The index must be not out of bounds
599 ///
600 pub unsafe fn get_unchecked(&self, index: usize) -> T {
601 unsafe { self.state.get_unchecked(index) }
602 }
603
604 /// Returns a copy of the element stored at index. The index is starting by 0 for the first
605 /// element until [`FixedSizeQueue::len()`].
606 pub fn get(&self, index: usize) -> T {
607 self.state.get(index)
608 }
609}