iceoryx2_bb_threadsafe/
trigger_queue.rs1pub use iceoryx2_bb_posix::mutex::*;
48pub use iceoryx2_bb_posix::semaphore::*;
49
50use core::{fmt::Debug, marker::PhantomData, time::Duration};
51use iceoryx2_bb_container::queue::FixedSizeQueue;
52use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
53use iceoryx2_log::fatal_panic;
54
/// Value handed to `MutexBuilder::is_interprocess_capable` when the queue's
/// mutex is created.
const INTER_PROCESS_SUPPORT: bool = true;
56
57#[derive(Debug)]
58pub struct TriggerQueue<'a, T: Debug, const CAPACITY: usize> {
59 queue: Mutex<'a, 'a, FixedSizeQueue<T, CAPACITY>>,
60 free_slots: UnnamedSemaphore<'a>,
61 used_slots: UnnamedSemaphore<'a>,
62 _phantom_data: PhantomData<T>,
63}
64
65unsafe impl<T: Debug + ZeroCopySend, const CAPACITY: usize> ZeroCopySend
66 for TriggerQueue<'_, T, CAPACITY>
67{
68}
69
70impl<'a, T: Debug, const CAPACITY: usize> TriggerQueue<'a, T, CAPACITY> {
71 pub fn new(
74 mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
75 free_handle: &'a UnnamedSemaphoreHandle,
76 used_handle: &'a UnnamedSemaphoreHandle,
77 ) -> Self {
78 Self::new_with_custom_clock(mtx_handle, free_handle, used_handle, ClockType::default())
79 }
80
81 pub fn new_with_custom_clock(
84 mtx_handle: &'a MutexHandle<FixedSizeQueue<T, CAPACITY>>,
85 free_handle: &'a UnnamedSemaphoreHandle,
86 used_handle: &'a UnnamedSemaphoreHandle,
87 clock_type: ClockType,
88 ) -> Self {
89 let msg = "Fatal failure while creating TriggerQueue";
90 Self {
91 queue: fatal_panic!(from "TriggerQueue::new", when MutexBuilder::new()
92 .is_interprocess_capable(INTER_PROCESS_SUPPORT)
93 .create(FixedSizeQueue::<T, CAPACITY>::new(), mtx_handle),
94 "{} since the mutex creation failed.", msg),
95 free_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
96 .initial_value(CAPACITY as u32)
97 .clock_type(clock_type).create(free_handle),
98 "{} since the free slots counting semaphore creation failed.", msg),
99 used_slots: fatal_panic!(from "TriggerQueue::new", when UnnamedSemaphoreBuilder::new()
100 .initial_value(0)
101 .clock_type(clock_type).create(used_handle),
102 "{} since the free slots counting semaphore creation failed.", msg),
103 _phantom_data: PhantomData,
104 }
105 }
106
107 pub fn try_push(&self, value: T) -> bool {
110 match self.free_slots.try_wait().unwrap() {
111 true => self.push(value),
112 false => false,
113 }
114 }
115
116 pub fn timed_push(&self, value: T, timeout: Duration) -> bool {
119 match self.free_slots.timed_wait(timeout).unwrap() {
120 true => self.push(value),
121 false => false,
122 }
123 }
124
125 pub fn blocking_push(&self, value: T) {
127 self.free_slots.blocking_wait().unwrap();
128 self.push(value);
129 }
130
131 pub fn try_pop(&self) -> Option<T> {
134 match self.used_slots.try_wait().unwrap() {
135 true => self.pop(),
136 false => None,
137 }
138 }
139
140 pub fn timed_pop(&self, timeout: Duration) -> Option<T> {
143 match self.used_slots.timed_wait(timeout).unwrap() {
144 true => self.pop(),
145 false => None,
146 }
147 }
148
149 pub fn blocking_pop(&self) -> T {
151 self.used_slots.blocking_wait().unwrap();
152 self.pop().unwrap()
153 }
154
155 pub fn clear(&self) {
157 while self.try_pop().is_some() {}
158 }
159
160 pub fn capacity(&self) -> usize {
162 CAPACITY
163 }
164
165 pub fn len(&self) -> usize {
167 fatal_panic!(from self, when self.queue.lock(),
168 "Failed to acquire mutex to acquire size")
169 .len()
170 }
171
172 pub fn is_full(&self) -> bool {
174 fatal_panic!(from self, when self.queue.lock(),
175 "Failed to acquire mutex to acquire full state")
176 .is_full()
177 }
178
179 pub fn is_empty(&self) -> bool {
181 fatal_panic!(from self, when self.queue.lock(),
182 "Failed to acquire mutex to acquire empty state")
183 .is_empty()
184 }
185
186 fn push(&self, value: T) -> bool {
187 fatal_panic!(from self, when self.queue.lock(),
188 "Failed to acquire mutex to push")
189 .push(value);
190 self.used_slots.post().unwrap();
191 true
192 }
193
194 fn pop(&self) -> Option<T> {
195 let value = fatal_panic!(from self, when self.queue.lock(),
196 "Failed to acquire mutex to pop")
197 .pop();
198 self.free_slots.post().unwrap();
199 value
200 }
201}