iceoryx2_bb_threadsafe/
trigger_queue.rs1pub use iceoryx2_bb_posix::mutex::*;
46pub use iceoryx2_bb_posix::semaphore::*;
47
48use iceoryx2_bb_container::queue::FixedSizeQueue;
49use iceoryx2_bb_log::fatal_panic;
50use std::{fmt::Debug, marker::PhantomData, time::Duration};
51
52const INTER_PROCESS_SUPPORT: bool = true;
53
54#[derive(Debug)]
55pub struct TriggerQueue<'a, T: Debug, const CAPACITY: usize> {
56 queue: Mutex<'a, FixedSizeQueue<T, CAPACITY>>,
57 free_slots: UnnamedSemaphore<'a>,
58 used_slots: UnnamedSemaphore<'a>,
59 _phantom_data: PhantomData<T>,
60}
61
62impl<'a, T: Debug, const CAPACITY: usize> TriggerQueue<'a, T, CAPACITY> {
63 pub fn new(
66 mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
67 free_handle: &'a UnnamedSemaphoreHandle,
68 used_handle: &'a UnnamedSemaphoreHandle,
69 ) -> Self {
70 Self::new_with_custom_clock(mtx_handle, free_handle, used_handle, ClockType::default())
71 }
72
73 pub fn new_with_custom_clock(
76 mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
77 free_handle: &'a UnnamedSemaphoreHandle,
78 used_handle: &'a UnnamedSemaphoreHandle,
79 clock_type: ClockType,
80 ) -> Self {
81 let msg = "Fatal failure while creating TriggerQueue";
82 Self {
83 queue: fatal_panic!(from "TriggerQueue::new", when MutexBuilder::new()
84 .is_interprocess_capable(INTER_PROCESS_SUPPORT)
85 .create(FixedSizeQueue::<T, CAPACITY>::new(), mtx_handle),
86 "{} since the mutex creation failed.", msg),
87 free_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
88 .initial_value(CAPACITY as u32)
89 .clock_type(clock_type).create(free_handle),
90 "{} since the free slots counting semaphore creation failed.", msg),
91 used_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
92 .initial_value(0)
93 .clock_type(clock_type).create(used_handle),
94 "{} since the free slots counting semaphore creation failed.", msg),
95 _phantom_data: PhantomData,
96 }
97 }
98
99 pub fn try_push(&self, value: T) -> bool {
102 match self.free_slots.try_wait().unwrap() {
103 true => self.push(value),
104 false => false,
105 }
106 }
107
108 pub fn timed_push(&self, value: T, timeout: Duration) -> bool {
111 match self.free_slots.timed_wait(timeout).unwrap() {
112 true => self.push(value),
113 false => false,
114 }
115 }
116
117 pub fn blocking_push(&self, value: T) {
119 self.free_slots.blocking_wait().unwrap();
120 self.push(value);
121 }
122
123 pub fn try_pop(&self) -> Option<T> {
126 match self.used_slots.try_wait().unwrap() {
127 true => self.pop(),
128 false => None,
129 }
130 }
131
132 pub fn timed_pop(&self, timeout: Duration) -> Option<T> {
135 match self.used_slots.timed_wait(timeout).unwrap() {
136 true => self.pop(),
137 false => None,
138 }
139 }
140
141 pub fn blocking_pop(&self) -> T {
143 self.used_slots.blocking_wait().unwrap();
144 self.pop().unwrap()
145 }
146
147 pub fn clear(&self) {
149 while self.try_pop().is_some() {}
150 }
151
152 pub fn capacity(&self) -> usize {
154 CAPACITY
155 }
156
157 pub fn len(&self) -> usize {
159 fatal_panic!(from self, when self.queue.lock(),
160 "Failed to acquire mutex to acquire size")
161 .len()
162 }
163
164 pub fn is_full(&self) -> bool {
166 fatal_panic!(from self, when self.queue.lock(),
167 "Failed to acquire mutex to acquire full state")
168 .is_full()
169 }
170
171 pub fn is_empty(&self) -> bool {
173 fatal_panic!(from self, when self.queue.lock(),
174 "Failed to acquire mutex to acquire empty state")
175 .is_empty()
176 }
177
178 fn push(&self, value: T) -> bool {
179 fatal_panic!(from self, when self.queue.lock(),
180 "Failed to acquire mutex to push")
181 .push(value);
182 self.used_slots.post().unwrap();
183 true
184 }
185
186 fn pop(&self) -> Option<T> {
187 let value = fatal_panic!(from self, when self.queue.lock(),
188 "Failed to acquire mutex to pop")
189 .pop();
190 self.free_slots.post().unwrap();
191 value
192 }
193}