zerodds_corba_rt/
mutex.rs1use alloc::vec::Vec;
19
20use crate::priority::Priority;
21
22#[derive(Debug, Clone)]
28pub struct PriorityInheritance {
29 base: Priority,
30 waiters: Vec<Priority>,
31}
32
33impl PriorityInheritance {
34 #[must_use]
36 pub fn new(base: Priority) -> Self {
37 Self {
38 base,
39 waiters: Vec::new(),
40 }
41 }
42
43 #[must_use]
45 pub fn base(&self) -> Priority {
46 self.base
47 }
48
49 #[must_use]
51 pub fn effective(&self) -> Priority {
52 self.waiters
53 .iter()
54 .copied()
55 .chain(core::iter::once(self.base))
56 .max()
57 .unwrap_or(self.base)
58 }
59
60 pub fn on_block(&mut self, waiter: Priority) -> Priority {
63 self.waiters.push(waiter);
64 self.effective()
65 }
66
67 pub fn on_unblock(&mut self, waiter: Priority) -> Priority {
70 if let Some(pos) = self.waiters.iter().position(|&w| w == waiter) {
71 self.waiters.remove(pos);
72 }
73 self.effective()
74 }
75
76 #[must_use]
78 pub fn waiter_count(&self) -> usize {
79 self.waiters.len()
80 }
81}
82
83#[cfg(feature = "std")]
84pub use std_impl::{RtMutex, RtMutexGuard};
85
86#[cfg(feature = "std")]
87#[allow(clippy::expect_used)]
88mod std_impl {
89 use super::Priority;
90 use alloc::vec::Vec;
91 use std::sync::{Condvar, Mutex, MutexGuard};
92
93 #[derive(Debug)]
95 struct Coord {
96 locked: bool,
97 holder: Option<Priority>,
98 waiters: Vec<Priority>,
99 }
100
101 impl Coord {
102 fn top_waiter(&self) -> Option<Priority> {
104 self.waiters.iter().copied().max()
105 }
106 }
107
108 #[derive(Debug)]
117 pub struct RtMutex<T> {
118 data: Mutex<T>,
119 coord: Mutex<Coord>,
120 cv: Condvar,
121 }
122
123 #[derive(Debug)]
125 pub struct RtMutexGuard<'a, T> {
126 mutex: &'a RtMutex<T>,
127 data: Option<MutexGuard<'a, T>>,
128 }
129
130 impl<T> RtMutex<T> {
131 #[must_use]
133 pub fn new(value: T) -> Self {
134 Self {
135 data: Mutex::new(value),
136 coord: Mutex::new(Coord {
137 locked: false,
138 holder: None,
139 waiters: Vec::new(),
140 }),
141 cv: Condvar::new(),
142 }
143 }
144
145 #[allow(clippy::missing_panics_doc)]
152 pub fn lock(&self, my_priority: Priority) -> RtMutexGuard<'_, T> {
153 {
154 let mut coord = self.coord.lock().expect("rt-mutex coord poisoned");
155 coord.waiters.push(my_priority);
165 loop {
166 let granted = !coord.locked && coord.top_waiter() == Some(my_priority);
167 if granted {
168 break;
169 }
170 coord = self.cv.wait(coord).expect("rt-mutex cv poisoned");
171 }
172 if let Some(pos) = coord.waiters.iter().position(|&w| w == my_priority) {
174 coord.waiters.remove(pos);
175 }
176 coord.locked = true;
177 coord.holder = Some(my_priority);
178 }
179 let data = self.data.lock().expect("rt-mutex data poisoned");
180 RtMutexGuard {
181 mutex: self,
182 data: Some(data),
183 }
184 }
185
186 #[allow(clippy::missing_panics_doc)]
191 pub fn try_lock(&self, my_priority: Priority) -> Option<RtMutexGuard<'_, T>> {
192 {
193 let mut coord = self.coord.lock().expect("rt-mutex coord poisoned");
194 if coord.locked {
195 return None;
196 }
197 coord.locked = true;
198 coord.holder = Some(my_priority);
199 }
200 let data = self.data.lock().expect("rt-mutex data poisoned");
201 Some(RtMutexGuard {
202 mutex: self,
203 data: Some(data),
204 })
205 }
206
207 #[allow(clippy::missing_panics_doc)]
214 #[must_use]
215 pub fn effective_holder_priority(&self) -> Option<Priority> {
216 let coord = self.coord.lock().expect("rt-mutex coord poisoned");
217 coord.holder.map(|h| match coord.top_waiter() {
218 Some(w) if w > h => w,
219 _ => h,
220 })
221 }
222
223 fn unlock(&self) {
224 let mut coord = self.coord.lock().expect("rt-mutex coord poisoned");
225 coord.locked = false;
226 coord.holder = None;
227 drop(coord);
229 self.cv.notify_all();
230 }
231 }
232
233 impl<T> RtMutexGuard<'_, T> {
234 #[must_use]
236 pub fn get(&self) -> &T {
237 self.data.as_ref().expect("guard active")
238 }
239
240 pub fn get_mut(&mut self) -> &mut T {
242 self.data.as_mut().expect("guard active")
243 }
244 }
245
246 impl<T> core::ops::Deref for RtMutexGuard<'_, T> {
247 type Target = T;
248 fn deref(&self) -> &T {
249 self.get()
250 }
251 }
252
253 impl<T> core::ops::DerefMut for RtMutexGuard<'_, T> {
254 fn deref_mut(&mut self) -> &mut T {
255 self.get_mut()
256 }
257 }
258
259 impl<T> Drop for RtMutexGuard<'_, T> {
260 fn drop(&mut self) {
261 self.data.take();
263 self.mutex.unlock();
264 }
265 }
266}
267
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Shorthand for building a valid `Priority` in tests.
    fn p(v: i16) -> Priority {
        Priority::new(v).unwrap()
    }

    #[test]
    fn effective_is_base_without_waiters() {
        let pi = PriorityInheritance::new(p(10));
        assert_eq!(pi.effective(), p(10));
        assert_eq!(pi.waiter_count(), 0);
    }

    #[test]
    fn owner_inherits_highest_waiter() {
        let mut pi = PriorityInheritance::new(p(10));
        // A higher waiter raises the effective priority.
        assert_eq!(pi.on_block(p(50)), p(50));
        // A lower waiter leaves it unchanged.
        assert_eq!(pi.on_block(p(30)), p(50));
        // An even higher waiter raises it again.
        assert_eq!(pi.on_block(p(90)), p(90));
        assert_eq!(pi.waiter_count(), 3);
    }

    #[test]
    fn priority_reverts_on_unblock() {
        let mut pi = PriorityInheritance::new(p(10));
        pi.on_block(p(50));
        pi.on_block(p(90));
        assert_eq!(pi.effective(), p(90));
        // Dropping the top waiter falls back to the next one ...
        assert_eq!(pi.on_unblock(p(90)), p(50));
        // ... and dropping the last waiter falls back to the base priority.
        assert_eq!(pi.on_unblock(p(50)), p(10));
    }

    #[cfg(feature = "std")]
    #[test]
    fn rt_mutex_basic_lock_unlock() {
        let m = RtMutex::new(0u32);
        {
            let mut g = m.lock(p(10));
            *g += 5;
            assert_eq!(m.effective_holder_priority(), Some(p(10)));
        }
        assert_eq!(m.effective_holder_priority(), None);
        assert_eq!(*m.lock(p(1)), 5);
    }

    #[cfg(feature = "std")]
    #[test]
    fn rt_mutex_try_lock_contended() {
        let m = RtMutex::new(());
        let g = m.lock(p(10));
        assert!(m.try_lock(p(20)).is_none());
        drop(g);
        assert!(m.try_lock(p(20)).is_some());
    }

    #[cfg(feature = "std")]
    #[test]
    fn rt_mutex_grants_highest_priority_waiter_first() {
        use std::sync::Arc;

        let m = Arc::new(RtMutex::new(()));
        let order = Arc::new(std::sync::Mutex::new(alloc::vec::Vec::<i16>::new()));

        // Hold the mutex at a low priority so both workers have to queue.
        let g = m.lock(p(1));

        let mut handles = alloc::vec::Vec::new();
        // Priorities are listed in ascending order on purpose: each newly
        // queued waiter then becomes the top waiter, which is observable
        // through `effective_holder_priority`.
        for prio in [p(40), p(80)] {
            let worker_mutex = Arc::clone(&m);
            let worker_order = Arc::clone(&order);
            handles.push(std::thread::spawn(move || {
                let _lg = worker_mutex.lock(prio);
                worker_order.lock().unwrap().push(prio.value());
            }));
            // Do not continue until this worker is really queued; otherwise
            // the mutex could be released with only one waiter present and
            // the ordering would never be exercised.
            while m.effective_holder_priority() != Some(prio) {
                std::thread::yield_now();
            }
        }

        // Both workers are queued now; releasing must admit 80 before 40.
        drop(g);
        for h in handles {
            h.join().unwrap();
        }
        let seq = order.lock().unwrap().clone();
        assert_eq!(seq, alloc::vec![80, 40]);
    }
}