iceoryx2_bb_threadsafe/trigger_queue.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
// Copyright (c) 2023 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT
//! A **threadsafe** queue which triggers a consumer when data arrived or triggers the producer
//! when the queue is no longer full.
//!
//! # Example
//!
//! ```
//! use std::thread;
//! use iceoryx2_bb_threadsafe::trigger_queue::*;
//!
//! const CAPACITY: usize = 16;
//!
//! let mtx_handle = MutexHandle::new();
//! let free_handle = UnnamedSemaphoreHandle::new();
//! let used_handle = UnnamedSemaphoreHandle::new();
//!
//! let queue = TriggerQueue::<u64, CAPACITY>::new(&mtx_handle, &free_handle, &used_handle);
//!
//! thread::scope(|s| {
//! let consumer = s.spawn(|| {
//! for i in 0..10 {
//! println!("got: {}", queue.blocking_pop());
//! }
//! });
//!
//! let producer = s.spawn(|| {
//! for i in 0..10 {
//! queue.blocking_push(i);
//! println!("pushed data {}", i);
//! }
//! });
//! })
//! ```
pub use iceoryx2_bb_posix::mutex::*;
pub use iceoryx2_bb_posix::semaphore::*;
use iceoryx2_bb_container::queue::FixedSizeQueue;
use iceoryx2_bb_log::fatal_panic;
use std::{fmt::Debug, marker::PhantomData, time::Duration};
const INTER_PROCESS_SUPPORT: bool = true;
#[derive(Debug)]
pub struct TriggerQueue<'a, T: Debug, const CAPACITY: usize> {
queue: Mutex<'a, FixedSizeQueue<T, CAPACITY>>,
free_slots: UnnamedSemaphore<'a>,
used_slots: UnnamedSemaphore<'a>,
_phantom_data: PhantomData<T>,
}
impl<'a, T: Debug, const CAPACITY: usize> TriggerQueue<'a, T, CAPACITY> {
/// Creates a new [`TriggerQueue`] which uses the [`ClockType::default()`] in
/// [`TriggerQueue::timed_push()`] and [`TriggerQueue::timed_pop()`].
pub fn new(
mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
free_handle: &'a UnnamedSemaphoreHandle,
used_handle: &'a UnnamedSemaphoreHandle,
) -> Self {
Self::new_with_custom_clock(mtx_handle, free_handle, used_handle, ClockType::default())
}
/// Creates a new [`TriggerQueue`] which uses the user provided clock in
/// [`TriggerQueue::timed_push()`] and [`TriggerQueue::timed_pop()`].
pub fn new_with_custom_clock(
mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
free_handle: &'a UnnamedSemaphoreHandle,
used_handle: &'a UnnamedSemaphoreHandle,
clock_type: ClockType,
) -> Self {
let msg = "Fatal failure while creating TriggerQueue";
Self {
queue: fatal_panic!(from "TriggerQueue::new", when MutexBuilder::new()
.is_interprocess_capable(INTER_PROCESS_SUPPORT)
.create(FixedSizeQueue::<T, CAPACITY>::new(), mtx_handle),
"{} since the mutex creation failed.", msg),
free_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
.initial_value(CAPACITY as u32)
.clock_type(clock_type).create(free_handle),
"{} since the free slots counting semaphore creation failed.", msg),
used_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
.initial_value(0)
.clock_type(clock_type).create(used_handle),
"{} since the free slots counting semaphore creation failed.", msg),
_phantom_data: PhantomData,
}
}
/// Tries to push a value into the queue. When the queue is full it returns false, otherwise
/// true.
pub fn try_push(&self, value: T) -> bool {
match self.free_slots.try_wait().unwrap() {
true => self.push(value),
false => false,
}
}
/// Tries to push a value into the queue until the timeout is reached. If the sample was
/// pushed into the queue it returns true, otherwise false.
pub fn timed_push(&self, value: T, timeout: Duration) -> bool {
match self.free_slots.timed_wait(timeout).unwrap() {
true => self.push(value),
false => false,
}
}
/// Blocks the process until the value could be pushed into the queue.
pub fn blocking_push(&self, value: T) {
self.free_slots.blocking_wait().unwrap();
self.push(value);
}
/// Tries to pop a value out of the queue. If the queue was empty it returns [`None`]
/// otherwise the value packed inside the Option.
pub fn try_pop(&self) -> Option<T> {
match self.used_slots.try_wait().unwrap() {
true => self.pop(),
false => None,
}
}
/// Tries to pop a value out of the queue until the timeout is reached. If a value could not be
/// acquired it returns [`None`].
pub fn timed_pop(&self, timeout: Duration) -> Option<T> {
match self.used_slots.timed_wait(timeout).unwrap() {
true => self.pop(),
false => None,
}
}
/// Blocks until a value could be acquired from the queue.
pub fn blocking_pop(&self) -> T {
self.used_slots.blocking_wait().unwrap();
self.pop().unwrap()
}
/// Empties the queue.
pub fn clear(&self) {
while self.try_pop().is_some() {}
}
/// Returns the capacity of the queue.
pub fn capacity(&self) -> usize {
CAPACITY
}
/// Returns the amount of values stored inside the queue
pub fn len(&self) -> usize {
fatal_panic!(from self, when self.queue.lock(),
"Failed to acquire mutex to acquire size")
.len()
}
/// Returns true if the queue is full, otherwise false
pub fn is_full(&self) -> bool {
fatal_panic!(from self, when self.queue.lock(),
"Failed to acquire mutex to acquire full state")
.is_full()
}
/// Returns true if the queue is empty, otherwise false
pub fn is_empty(&self) -> bool {
fatal_panic!(from self, when self.queue.lock(),
"Failed to acquire mutex to acquire empty state")
.is_empty()
}
fn push(&self, value: T) -> bool {
fatal_panic!(from self, when self.queue.lock(),
"Failed to acquire mutex to push")
.push(value);
self.used_slots.post().unwrap();
true
}
fn pop(&self) -> Option<T> {
let value = fatal_panic!(from self, when self.queue.lock(),
"Failed to acquire mutex to pop")
.pop();
self.free_slots.post().unwrap();
value
}
}