iceoryx2_bb_threadsafe/
trigger_queue.rs1pub use iceoryx2_bb_posix::mutex::*;
46pub use iceoryx2_bb_posix::semaphore::*;
47
48use core::{fmt::Debug, marker::PhantomData, time::Duration};
49use iceoryx2_bb_container::queue::FixedSizeQueue;
50use iceoryx2_bb_elementary::zero_copy_send::ZeroCopySend;
51use iceoryx2_bb_log::fatal_panic;
52
53const INTER_PROCESS_SUPPORT: bool = true;
54
55#[derive(Debug)]
56pub struct TriggerQueue<'a, T: Debug, const CAPACITY: usize> {
57 queue: Mutex<'a, FixedSizeQueue<T, CAPACITY>>,
58 free_slots: UnnamedSemaphore<'a>,
59 used_slots: UnnamedSemaphore<'a>,
60 _phantom_data: PhantomData<T>,
61}
62
63unsafe impl<T: Debug + ZeroCopySend, const CAPACITY: usize> ZeroCopySend
64 for TriggerQueue<'_, T, CAPACITY>
65{
66}
67
68impl<'a, T: Debug, const CAPACITY: usize> TriggerQueue<'a, T, CAPACITY> {
69 pub fn new(
72 mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
73 free_handle: &'a UnnamedSemaphoreHandle,
74 used_handle: &'a UnnamedSemaphoreHandle,
75 ) -> Self {
76 Self::new_with_custom_clock(mtx_handle, free_handle, used_handle, ClockType::default())
77 }
78
79 pub fn new_with_custom_clock(
82 mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
83 free_handle: &'a UnnamedSemaphoreHandle,
84 used_handle: &'a UnnamedSemaphoreHandle,
85 clock_type: ClockType,
86 ) -> Self {
87 let msg = "Fatal failure while creating TriggerQueue";
88 Self {
89 queue: fatal_panic!(from "TriggerQueue::new", when MutexBuilder::new()
90 .is_interprocess_capable(INTER_PROCESS_SUPPORT)
91 .create(FixedSizeQueue::<T, CAPACITY>::new(), mtx_handle),
92 "{} since the mutex creation failed.", msg),
93 free_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
94 .initial_value(CAPACITY as u32)
95 .clock_type(clock_type).create(free_handle),
96 "{} since the free slots counting semaphore creation failed.", msg),
97 used_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
98 .initial_value(0)
99 .clock_type(clock_type).create(used_handle),
100 "{} since the free slots counting semaphore creation failed.", msg),
101 _phantom_data: PhantomData,
102 }
103 }
104
105 pub fn try_push(&self, value: T) -> bool {
108 match self.free_slots.try_wait().unwrap() {
109 true => self.push(value),
110 false => false,
111 }
112 }
113
114 pub fn timed_push(&self, value: T, timeout: Duration) -> bool {
117 match self.free_slots.timed_wait(timeout).unwrap() {
118 true => self.push(value),
119 false => false,
120 }
121 }
122
123 pub fn blocking_push(&self, value: T) {
125 self.free_slots.blocking_wait().unwrap();
126 self.push(value);
127 }
128
129 pub fn try_pop(&self) -> Option<T> {
132 match self.used_slots.try_wait().unwrap() {
133 true => self.pop(),
134 false => None,
135 }
136 }
137
138 pub fn timed_pop(&self, timeout: Duration) -> Option<T> {
141 match self.used_slots.timed_wait(timeout).unwrap() {
142 true => self.pop(),
143 false => None,
144 }
145 }
146
147 pub fn blocking_pop(&self) -> T {
149 self.used_slots.blocking_wait().unwrap();
150 self.pop().unwrap()
151 }
152
153 pub fn clear(&self) {
155 while self.try_pop().is_some() {}
156 }
157
158 pub fn capacity(&self) -> usize {
160 CAPACITY
161 }
162
163 pub fn len(&self) -> usize {
165 fatal_panic!(from self, when self.queue.lock(),
166 "Failed to acquire mutex to acquire size")
167 .len()
168 }
169
170 pub fn is_full(&self) -> bool {
172 fatal_panic!(from self, when self.queue.lock(),
173 "Failed to acquire mutex to acquire full state")
174 .is_full()
175 }
176
177 pub fn is_empty(&self) -> bool {
179 fatal_panic!(from self, when self.queue.lock(),
180 "Failed to acquire mutex to acquire empty state")
181 .is_empty()
182 }
183
184 fn push(&self, value: T) -> bool {
185 fatal_panic!(from self, when self.queue.lock(),
186 "Failed to acquire mutex to push")
187 .push(value);
188 self.used_slots.post().unwrap();
189 true
190 }
191
192 fn pop(&self) -> Option<T> {
193 let value = fatal_panic!(from self, when self.queue.lock(),
194 "Failed to acquire mutex to pop")
195 .pop();
196 self.free_slots.post().unwrap();
197 value
198 }
199}