iceoryx2_bb_threadsafe/
trigger_queue.rs

1// Copyright (c) 2023 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
//! A **threadsafe** queue which triggers a consumer when data arrives or triggers the producer
//! when the queue is no longer full.
15//!
16//! # Example
17//!
18//! ```
19//! use std::thread;
20//! use iceoryx2_bb_threadsafe::trigger_queue::*;
21//!
22//! const CAPACITY: usize = 16;
23//!
24//! let mtx_handle = MutexHandle::new();
25//! let free_handle = UnnamedSemaphoreHandle::new();
26//! let used_handle = UnnamedSemaphoreHandle::new();
27//!
28//! let queue = TriggerQueue::<u64, CAPACITY>::new(&mtx_handle, &free_handle, &used_handle);
29//!
30//! thread::scope(|s| {
31//!     let consumer = s.spawn(|| {
32//!         for i in 0..10 {
33//!             println!("got: {}", queue.blocking_pop());
34//!         }
35//!     });
36//!
37//!     let producer = s.spawn(|| {
38//!         for i in 0..10 {
39//!             queue.blocking_push(i);
40//!             println!("pushed data {}", i);
41//!         }
42//!     });
43//! })
44//! ```
45pub use iceoryx2_bb_posix::mutex::*;
46pub use iceoryx2_bb_posix::semaphore::*;
47
48use core::{fmt::Debug, marker::PhantomData, time::Duration};
49use iceoryx2_bb_container::queue::FixedSizeQueue;
50use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
51use iceoryx2_bb_log::fatal_panic;
52
/// Passed to `MutexBuilder::is_interprocess_capable()` when the mutex guarding the queue is
/// created in `TriggerQueue::new_with_custom_clock()`.
const INTER_PROCESS_SUPPORT: bool = true;
54
55#[derive(Debug)]
56pub struct TriggerQueue<'a, T: Debug, const CAPACITY: usize> {
57    queue: Mutex<'a, 'a, FixedSizeQueue<T, CAPACITY>>,
58    free_slots: UnnamedSemaphore<'a>,
59    used_slots: UnnamedSemaphore<'a>,
60    _phantom_data: PhantomData<T>,
61}
62
63unsafe impl<T: Debug + ZeroCopySend, const CAPACITY: usize> ZeroCopySend
64    for TriggerQueue<'_, T, CAPACITY>
65{
66}
67
68impl<'a, T: Debug, const CAPACITY: usize> TriggerQueue<'a, T, CAPACITY> {
69    /// Creates a new [`TriggerQueue`] which uses the [`ClockType::default()`] in
70    /// [`TriggerQueue::timed_push()`] and [`TriggerQueue::timed_pop()`].
71    pub fn new(
72        mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
73        free_handle: &'a UnnamedSemaphoreHandle,
74        used_handle: &'a UnnamedSemaphoreHandle,
75    ) -> Self {
76        Self::new_with_custom_clock(mtx_handle, free_handle, used_handle, ClockType::default())
77    }
78
79    /// Creates a new [`TriggerQueue`] which uses the user provided clock in
80    /// [`TriggerQueue::timed_push()`] and [`TriggerQueue::timed_pop()`].
81    pub fn new_with_custom_clock(
82        mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
83        free_handle: &'a UnnamedSemaphoreHandle,
84        used_handle: &'a UnnamedSemaphoreHandle,
85        clock_type: ClockType,
86    ) -> Self {
87        let msg = "Fatal failure while creating TriggerQueue";
88        Self {
89            queue: fatal_panic!(from "TriggerQueue::new", when MutexBuilder::new()
90                    .is_interprocess_capable(INTER_PROCESS_SUPPORT)
91                    .create(FixedSizeQueue::<T, CAPACITY>::new(), mtx_handle),
92                    "{} since the mutex creation failed.", msg),
93            free_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
94                    .initial_value(CAPACITY as u32)
95                    .clock_type(clock_type).create(free_handle),
96                    "{} since the free slots counting semaphore creation failed.", msg),
97            used_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
98                    .initial_value(0)
99                    .clock_type(clock_type).create(used_handle),
100                    "{} since the free slots counting semaphore creation failed.", msg),
101            _phantom_data: PhantomData,
102        }
103    }
104
105    /// Tries to push a value into the queue. When the queue is full it returns false, otherwise
106    /// true.
107    pub fn try_push(&self, value: T) -> bool {
108        match self.free_slots.try_wait().unwrap() {
109            true => self.push(value),
110            false => false,
111        }
112    }
113
114    /// Tries to push a value into the queue until the timeout is reached. If the sample was
115    /// pushed into the queue it returns true, otherwise false.
116    pub fn timed_push(&self, value: T, timeout: Duration) -> bool {
117        match self.free_slots.timed_wait(timeout).unwrap() {
118            true => self.push(value),
119            false => false,
120        }
121    }
122
123    /// Blocks the process until the value could be pushed into the queue.
124    pub fn blocking_push(&self, value: T) {
125        self.free_slots.blocking_wait().unwrap();
126        self.push(value);
127    }
128
129    /// Tries to pop a value out of the queue. If the queue was empty it returns [`None`]
130    /// otherwise the value packed inside the Option.
131    pub fn try_pop(&self) -> Option<T> {
132        match self.used_slots.try_wait().unwrap() {
133            true => self.pop(),
134            false => None,
135        }
136    }
137
138    /// Tries to pop a value out of the queue until the timeout is reached. If a value could not be
139    /// acquired it returns [`None`].
140    pub fn timed_pop(&self, timeout: Duration) -> Option<T> {
141        match self.used_slots.timed_wait(timeout).unwrap() {
142            true => self.pop(),
143            false => None,
144        }
145    }
146
147    /// Blocks until a value could be acquired from the queue.
148    pub fn blocking_pop(&self) -> T {
149        self.used_slots.blocking_wait().unwrap();
150        self.pop().unwrap()
151    }
152
153    /// Empties the queue.
154    pub fn clear(&self) {
155        while self.try_pop().is_some() {}
156    }
157
158    /// Returns the capacity of the queue.
159    pub fn capacity(&self) -> usize {
160        CAPACITY
161    }
162
163    /// Returns the amount of values stored inside the queue
164    pub fn len(&self) -> usize {
165        fatal_panic!(from self, when self.queue.lock(),
166            "Failed to acquire mutex to acquire size")
167        .len()
168    }
169
170    /// Returns true if the queue is full, otherwise false
171    pub fn is_full(&self) -> bool {
172        fatal_panic!(from self, when self.queue.lock(),
173            "Failed to acquire mutex to acquire full state")
174        .is_full()
175    }
176
177    /// Returns true if the queue is empty, otherwise false
178    pub fn is_empty(&self) -> bool {
179        fatal_panic!(from self, when self.queue.lock(),
180            "Failed to acquire mutex to acquire empty state")
181        .is_empty()
182    }
183
184    fn push(&self, value: T) -> bool {
185        fatal_panic!(from self, when self.queue.lock(),
186            "Failed to acquire mutex to push")
187        .push(value);
188        self.used_slots.post().unwrap();
189        true
190    }
191
192    fn pop(&self) -> Option<T> {
193        let value = fatal_panic!(from self, when self.queue.lock(),
194            "Failed to acquire mutex to pop")
195        .pop();
196        self.free_slots.post().unwrap();
197        value
198    }
199}