iceoryx2_bb_threadsafe/
trigger_queue.rs

1// Copyright (c) 2023 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
//! A **threadsafe** queue which wakes a waiting consumer as soon as data arrives and wakes a
//! waiting producer as soon as the queue is no longer full.
15//!
16//! # Example
17//!
18//! ```
19//! use std::thread;
20//! use iceoryx2_bb_threadsafe::trigger_queue::*;
21//!
22//! const CAPACITY: usize = 16;
23//!
24//! let mtx_handle = MutexHandle::new();
25//! let free_handle = UnnamedSemaphoreHandle::new();
26//! let used_handle = UnnamedSemaphoreHandle::new();
27//!
28//! let queue = TriggerQueue::<u64, CAPACITY>::new(&mtx_handle, &free_handle, &used_handle);
29//!
30//! thread::scope(|s| {
31//!     let consumer = s.spawn(|| {
32//!         for i in 0..10 {
33//!             println!("got: {}", queue.blocking_pop());
34//!         }
35//!     });
36//!
37//!     let producer = s.spawn(|| {
38//!         for i in 0..10 {
39//!             queue.blocking_push(i);
40//!             println!("pushed data {}", i);
41//!         }
42//!     });
43//! })
44//! ```
45pub use iceoryx2_bb_posix::mutex::*;
46pub use iceoryx2_bb_posix::semaphore::*;
47
48use iceoryx2_bb_container::queue::FixedSizeQueue;
49use iceoryx2_bb_log::fatal_panic;
50use std::{fmt::Debug, marker::PhantomData, time::Duration};
51
52const INTER_PROCESS_SUPPORT: bool = true;
53
54#[derive(Debug)]
55pub struct TriggerQueue<'a, T: Debug, const CAPACITY: usize> {
56    queue: Mutex<'a, FixedSizeQueue<T, CAPACITY>>,
57    free_slots: UnnamedSemaphore<'a>,
58    used_slots: UnnamedSemaphore<'a>,
59    _phantom_data: PhantomData<T>,
60}
61
62impl<'a, T: Debug, const CAPACITY: usize> TriggerQueue<'a, T, CAPACITY> {
63    /// Creates a new [`TriggerQueue`] which uses the [`ClockType::default()`] in
64    /// [`TriggerQueue::timed_push()`] and [`TriggerQueue::timed_pop()`].
65    pub fn new(
66        mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
67        free_handle: &'a UnnamedSemaphoreHandle,
68        used_handle: &'a UnnamedSemaphoreHandle,
69    ) -> Self {
70        Self::new_with_custom_clock(mtx_handle, free_handle, used_handle, ClockType::default())
71    }
72
73    /// Creates a new [`TriggerQueue`] which uses the user provided clock in
74    /// [`TriggerQueue::timed_push()`] and [`TriggerQueue::timed_pop()`].
75    pub fn new_with_custom_clock(
76        mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
77        free_handle: &'a UnnamedSemaphoreHandle,
78        used_handle: &'a UnnamedSemaphoreHandle,
79        clock_type: ClockType,
80    ) -> Self {
81        let msg = "Fatal failure while creating TriggerQueue";
82        Self {
83            queue: fatal_panic!(from "TriggerQueue::new", when MutexBuilder::new()
84                    .is_interprocess_capable(INTER_PROCESS_SUPPORT)
85                    .create(FixedSizeQueue::<T, CAPACITY>::new(), mtx_handle),
86                    "{} since the mutex creation failed.", msg),
87            free_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
88                    .initial_value(CAPACITY as u32)
89                    .clock_type(clock_type).create(free_handle),
90                    "{} since the free slots counting semaphore creation failed.", msg),
91            used_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
92                    .initial_value(0)
93                    .clock_type(clock_type).create(used_handle),
94                    "{} since the free slots counting semaphore creation failed.", msg),
95            _phantom_data: PhantomData,
96        }
97    }
98
99    /// Tries to push a value into the queue. When the queue is full it returns false, otherwise
100    /// true.
101    pub fn try_push(&self, value: T) -> bool {
102        match self.free_slots.try_wait().unwrap() {
103            true => self.push(value),
104            false => false,
105        }
106    }
107
108    /// Tries to push a value into the queue until the timeout is reached. If the sample was
109    /// pushed into the queue it returns true, otherwise false.
110    pub fn timed_push(&self, value: T, timeout: Duration) -> bool {
111        match self.free_slots.timed_wait(timeout).unwrap() {
112            true => self.push(value),
113            false => false,
114        }
115    }
116
117    /// Blocks the process until the value could be pushed into the queue.
118    pub fn blocking_push(&self, value: T) {
119        self.free_slots.blocking_wait().unwrap();
120        self.push(value);
121    }
122
123    /// Tries to pop a value out of the queue. If the queue was empty it returns [`None`]
124    /// otherwise the value packed inside the Option.
125    pub fn try_pop(&self) -> Option<T> {
126        match self.used_slots.try_wait().unwrap() {
127            true => self.pop(),
128            false => None,
129        }
130    }
131
132    /// Tries to pop a value out of the queue until the timeout is reached. If a value could not be
133    /// acquired it returns [`None`].
134    pub fn timed_pop(&self, timeout: Duration) -> Option<T> {
135        match self.used_slots.timed_wait(timeout).unwrap() {
136            true => self.pop(),
137            false => None,
138        }
139    }
140
141    /// Blocks until a value could be acquired from the queue.
142    pub fn blocking_pop(&self) -> T {
143        self.used_slots.blocking_wait().unwrap();
144        self.pop().unwrap()
145    }
146
147    /// Empties the queue.
148    pub fn clear(&self) {
149        while self.try_pop().is_some() {}
150    }
151
152    /// Returns the capacity of the queue.
153    pub fn capacity(&self) -> usize {
154        CAPACITY
155    }
156
157    /// Returns the amount of values stored inside the queue
158    pub fn len(&self) -> usize {
159        fatal_panic!(from self, when self.queue.lock(),
160            "Failed to acquire mutex to acquire size")
161        .len()
162    }
163
164    /// Returns true if the queue is full, otherwise false
165    pub fn is_full(&self) -> bool {
166        fatal_panic!(from self, when self.queue.lock(),
167            "Failed to acquire mutex to acquire full state")
168        .is_full()
169    }
170
171    /// Returns true if the queue is empty, otherwise false
172    pub fn is_empty(&self) -> bool {
173        fatal_panic!(from self, when self.queue.lock(),
174            "Failed to acquire mutex to acquire empty state")
175        .is_empty()
176    }
177
178    fn push(&self, value: T) -> bool {
179        fatal_panic!(from self, when self.queue.lock(),
180            "Failed to acquire mutex to push")
181        .push(value);
182        self.used_slots.post().unwrap();
183        true
184    }
185
186    fn pop(&self) -> Option<T> {
187        let value = fatal_panic!(from self, when self.queue.lock(),
188            "Failed to acquire mutex to pop")
189        .pop();
190        self.free_slots.post().unwrap();
191        value
192    }
193}