iceoryx2_bb_threadsafe/
trigger_queue.rs

1// Copyright (c) 2023 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
13//! A **threadsafe** queue which triggers a consumer when data arrived or triggers the producer
14//! when the queue is no longer full.
15//!
16//! # Example
17//!
18//! ```
19//! # extern crate iceoryx2_loggers;
20//!
21//! use std::thread;
22//! use iceoryx2_bb_threadsafe::trigger_queue::*;
23//!
24//! const CAPACITY: usize = 16;
25//!
26//! let mtx_handle = MutexHandle::new();
27//! let free_handle = UnnamedSemaphoreHandle::new();
28//! let used_handle = UnnamedSemaphoreHandle::new();
29//!
30//! let queue = TriggerQueue::<u64, CAPACITY>::new(&mtx_handle, &free_handle, &used_handle);
31//!
32//! thread::scope(|s| {
33//!     let consumer = s.spawn(|| {
34//!         for i in 0..10 {
35//!             println!("got: {}", queue.blocking_pop());
36//!         }
37//!     });
38//!
39//!     let producer = s.spawn(|| {
40//!         for i in 0..10 {
41//!             queue.blocking_push(i);
42//!             println!("pushed data {}", i);
43//!         }
44//!     });
45//! })
46//! ```
47pub use iceoryx2_bb_posix::mutex::*;
48pub use iceoryx2_bb_posix::semaphore::*;
49
50use core::{fmt::Debug, marker::PhantomData, time::Duration};
51use iceoryx2_bb_container::queue::FixedSizeQueue;
52use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
53use iceoryx2_log::fatal_panic;
54
55const INTER_PROCESS_SUPPORT: bool = true;
56
57#[derive(Debug)]
58pub struct TriggerQueue<'a, T: Debug, const CAPACITY: usize> {
59    queue: Mutex<'a, 'a, FixedSizeQueue<T, CAPACITY>>,
60    free_slots: UnnamedSemaphore<'a>,
61    used_slots: UnnamedSemaphore<'a>,
62    _phantom_data: PhantomData<T>,
63}
64
65unsafe impl<T: Debug + ZeroCopySend, const CAPACITY: usize> ZeroCopySend
66    for TriggerQueue<'_, T, CAPACITY>
67{
68}
69
70impl<'a, T: Debug, const CAPACITY: usize> TriggerQueue<'a, T, CAPACITY> {
71    /// Creates a new [`TriggerQueue`] which uses the [`ClockType::default()`] in
72    /// [`TriggerQueue::timed_push()`] and [`TriggerQueue::timed_pop()`].
73    pub fn new(
74        mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
75        free_handle: &'a UnnamedSemaphoreHandle,
76        used_handle: &'a UnnamedSemaphoreHandle,
77    ) -> Self {
78        Self::new_with_custom_clock(mtx_handle, free_handle, used_handle, ClockType::default())
79    }
80
81    /// Creates a new [`TriggerQueue`] which uses the user provided clock in
82    /// [`TriggerQueue::timed_push()`] and [`TriggerQueue::timed_pop()`].
83    pub fn new_with_custom_clock(
84        mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
85        free_handle: &'a UnnamedSemaphoreHandle,
86        used_handle: &'a UnnamedSemaphoreHandle,
87        clock_type: ClockType,
88    ) -> Self {
89        let msg = "Fatal failure while creating TriggerQueue";
90        Self {
91            queue: fatal_panic!(from "TriggerQueue::new", when MutexBuilder::new()
92                    .is_interprocess_capable(INTER_PROCESS_SUPPORT)
93                    .create(FixedSizeQueue::<T, CAPACITY>::new(), mtx_handle),
94                    "{} since the mutex creation failed.", msg),
95            free_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
96                    .initial_value(CAPACITY as u32)
97                    .clock_type(clock_type).create(free_handle),
98                    "{} since the free slots counting semaphore creation failed.", msg),
99            used_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
100                    .initial_value(0)
101                    .clock_type(clock_type).create(used_handle),
102                    "{} since the free slots counting semaphore creation failed.", msg),
103            _phantom_data: PhantomData,
104        }
105    }
106
107    /// Tries to push a value into the queue. When the queue is full it returns false, otherwise
108    /// true.
109    pub fn try_push(&self, value: T) -> bool {
110        match self.free_slots.try_wait().unwrap() {
111            true => self.push(value),
112            false => false,
113        }
114    }
115
116    /// Tries to push a value into the queue until the timeout is reached. If the sample was
117    /// pushed into the queue it returns true, otherwise false.
118    pub fn timed_push(&self, value: T, timeout: Duration) -> bool {
119        match self.free_slots.timed_wait(timeout).unwrap() {
120            true => self.push(value),
121            false => false,
122        }
123    }
124
125    /// Blocks the process until the value could be pushed into the queue.
126    pub fn blocking_push(&self, value: T) {
127        self.free_slots.blocking_wait().unwrap();
128        self.push(value);
129    }
130
131    /// Tries to pop a value out of the queue. If the queue was empty it returns [`None`]
132    /// otherwise the value packed inside the Option.
133    pub fn try_pop(&self) -> Option<T> {
134        match self.used_slots.try_wait().unwrap() {
135            true => self.pop(),
136            false => None,
137        }
138    }
139
140    /// Tries to pop a value out of the queue until the timeout is reached. If a value could not be
141    /// acquired it returns [`None`].
142    pub fn timed_pop(&self, timeout: Duration) -> Option<T> {
143        match self.used_slots.timed_wait(timeout).unwrap() {
144            true => self.pop(),
145            false => None,
146        }
147    }
148
149    /// Blocks until a value could be acquired from the queue.
150    pub fn blocking_pop(&self) -> T {
151        self.used_slots.blocking_wait().unwrap();
152        self.pop().unwrap()
153    }
154
155    /// Empties the queue.
156    pub fn clear(&self) {
157        while self.try_pop().is_some() {}
158    }
159
160    /// Returns the capacity of the queue.
161    pub fn capacity(&self) -> usize {
162        CAPACITY
163    }
164
165    /// Returns the amount of values stored inside the queue
166    pub fn len(&self) -> usize {
167        fatal_panic!(from self, when self.queue.lock(),
168            "Failed to acquire mutex to acquire size")
169        .len()
170    }
171
172    /// Returns true if the queue is full, otherwise false
173    pub fn is_full(&self) -> bool {
174        fatal_panic!(from self, when self.queue.lock(),
175            "Failed to acquire mutex to acquire full state")
176        .is_full()
177    }
178
179    /// Returns true if the queue is empty, otherwise false
180    pub fn is_empty(&self) -> bool {
181        fatal_panic!(from self, when self.queue.lock(),
182            "Failed to acquire mutex to acquire empty state")
183        .is_empty()
184    }
185
186    fn push(&self, value: T) -> bool {
187        fatal_panic!(from self, when self.queue.lock(),
188            "Failed to acquire mutex to push")
189        .push(value);
190        self.used_slots.post().unwrap();
191        true
192    }
193
194    fn pop(&self) -> Option<T> {
195        let value = fatal_panic!(from self, when self.queue.lock(),
196            "Failed to acquire mutex to pop")
197        .pop();
198        self.free_slots.post().unwrap();
199        value
200    }
201}