Skip to main content

zerodds_corba_rt/
mutex.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! `RTCORBA::Mutex` with priority inheritance (RT-CORBA §5.6).
5//!
6//! Two layers:
7//! - [`PriorityInheritance`] — the **protocol core** (`no_std`): computes the
8//!   effective (inherited) priority of a lock owner from its base priority
9//!   and the priorities of the currently blocked waiters. Prevents
10//!   priority inversion: a low-priority owner inherits the priority of the
11//!   highest-priority waiter until it releases.
12//! - [`RtMutex`] — a real, blocking mutex (`std` feature) built on
13//!   `std::sync::Mutex` + `Condvar`. It grants the lock **in priority order**
14//!   (highest waiter first) and makes the inherited owner priority queryable.
15//!   Fully **event-driven** (Condvar `wait`), no busy-poll, no
16//!   `unsafe` (the inner `std::sync::Mutex` protects the data).
17
18use alloc::vec::Vec;
19
20use crate::priority::Priority;
21
22/// Priority-inheritance protocol core (RT-CORBA §5.6).
23///
24/// Holds the base priority of a lock owner plus the priorities of all waiters
25/// currently blocked on this lock. [`effective`](Self::effective) is the
26/// maximum — the priority at which the owner should run while it holds the lock.
27#[derive(Debug, Clone)]
28pub struct PriorityInheritance {
29    base: Priority,
30    waiters: Vec<Priority>,
31}
32
33impl PriorityInheritance {
34    /// New state for an owner with the given base priority, without waiters.
35    #[must_use]
36    pub fn new(base: Priority) -> Self {
37        Self {
38            base,
39            waiters: Vec::new(),
40        }
41    }
42
43    /// The owner's base priority.
44    #[must_use]
45    pub fn base(&self) -> Priority {
46        self.base
47    }
48
49    /// Effective priority: `max(base, highest waiter priority)`.
50    #[must_use]
51    pub fn effective(&self) -> Priority {
52        self.waiters
53            .iter()
54            .copied()
55            .chain(core::iter::once(self.base))
56            .max()
57            .unwrap_or(self.base)
58    }
59
60    /// A waiter now blocks on the lock. Returns the (possibly raised)
61    /// effective priority.
62    pub fn on_block(&mut self, waiter: Priority) -> Priority {
63        self.waiters.push(waiter);
64        self.effective()
65    }
66
67    /// A waiter was served/cancelled — its priority drops out of the
68    /// inheritance. Returns the new effective priority.
69    pub fn on_unblock(&mut self, waiter: Priority) -> Priority {
70        if let Some(pos) = self.waiters.iter().position(|&w| w == waiter) {
71            self.waiters.remove(pos);
72        }
73        self.effective()
74    }
75
76    /// Number of currently blocked waiters.
77    #[must_use]
78    pub fn waiter_count(&self) -> usize {
79        self.waiters.len()
80    }
81}
82
83#[cfg(feature = "std")]
84pub use std_impl::{RtMutex, RtMutexGuard};
85
86#[cfg(feature = "std")]
87#[allow(clippy::expect_used)]
88mod std_impl {
89    use super::Priority;
90    use alloc::vec::Vec;
91    use std::sync::{Condvar, Mutex, MutexGuard};
92
93    /// PI bookkeeping of an [`RtMutex`] — separate from the payload lock.
94    #[derive(Debug)]
95    struct Coord {
96        locked: bool,
97        holder: Option<Priority>,
98        waiters: Vec<Priority>,
99    }
100
101    impl Coord {
102        /// Highest waiting priority (if any).
103        fn top_waiter(&self) -> Option<Priority> {
104            self.waiters.iter().copied().max()
105        }
106    }
107
    /// A blocking mutex with priority inheritance (RT-CORBA §5.6).
    ///
    /// The lock is granted **in priority order**: when the owner releases, it
    /// goes to the highest-priority waiting thread (not FIFO). The effective
    /// owner priority (`base` raised to the highest waiter) is queryable via
    /// [`effective_holder_priority`](RtMutex::effective_holder_priority)
    /// — a runtime integration can use it to track the OS priority of the owner
    /// thread.
    #[derive(Debug)]
    pub struct RtMutex<T> {
        // Payload lock: owns the protected value and hands out the
        // `MutexGuard` stored inside an `RtMutexGuard`.
        data: Mutex<T>,
        // Coordination state (held flag, holder priority, waiter queue);
        // this is what decides who is granted the mutex next.
        coord: Mutex<Coord>,
        // Waiters sleep on this condvar in `lock` and are woken by `unlock`.
        cv: Condvar,
    }
122
    /// RAII guard of an [`RtMutex`]; releases the lock priority-correctly on drop.
    #[derive(Debug)]
    pub struct RtMutexGuard<'a, T> {
        // The mutex this guard belongs to; its coordination state is freed
        // when the guard is dropped.
        mutex: &'a RtMutex<T>,
        // Guard of the payload lock. Wrapped in `Option` so that `Drop` can
        // `take()` it and release the payload lock before the coordination
        // state is freed.
        data: Option<MutexGuard<'a, T>>,
    }
129
130    impl<T> RtMutex<T> {
131        /// New, unlocked `RtMutex` around `value`.
132        #[must_use]
133        pub fn new(value: T) -> Self {
134            Self {
135                data: Mutex::new(value),
136                coord: Mutex::new(Coord {
137                    locked: false,
138                    holder: None,
139                    waiters: Vec::new(),
140                }),
141                cv: Condvar::new(),
142            }
143        }
144
145        /// Locks the mutex for a caller with priority `my_priority`.
146        /// Blocks in priority order (Condvar, no spin) until it is its
147        /// turn.
148        ///
149        /// # Panics
150        /// On a poisoned internal lock.
151        #[allow(clippy::missing_panics_doc)]
152        pub fn lock(&self, my_priority: Priority) -> RtMutexGuard<'_, T> {
153            {
154                let mut coord = self.coord.lock().expect("rt-mutex coord poisoned");
155                // ALWAYS enqueue, even when the lock is momentarily free: a fresh
156                // arrival must never barge ahead of an already-waiting,
157                // higher-priority thread. Taking a `!locked` fast path here would
158                // let a low-priority caller grab the lock in the handoff window
159                // after `unlock()` clears `locked` but before the woken
160                // highest-priority waiter has re-acquired `coord` — a
161                // priority-inversion bug. The wait-loop is a no-op (no condvar
162                // wait) for the uncontended case: we are the sole/highest waiter
163                // and the lock is free, so the predicate holds immediately.
164                coord.waiters.push(my_priority);
165                loop {
166                    let granted = !coord.locked && coord.top_waiter() == Some(my_priority);
167                    if granted {
168                        break;
169                    }
170                    coord = self.cv.wait(coord).expect("rt-mutex cv poisoned");
171                }
172                // Take the lock: remove ourselves from the waiters.
173                if let Some(pos) = coord.waiters.iter().position(|&w| w == my_priority) {
174                    coord.waiters.remove(pos);
175                }
176                coord.locked = true;
177                coord.holder = Some(my_priority);
178            }
179            let data = self.data.lock().expect("rt-mutex data poisoned");
180            RtMutexGuard {
181                mutex: self,
182                data: Some(data),
183            }
184        }
185
186        /// Attempts to lock without blocking. `None` if already locked.
187        ///
188        /// # Panics
189        /// On a poisoned internal lock.
190        #[allow(clippy::missing_panics_doc)]
191        pub fn try_lock(&self, my_priority: Priority) -> Option<RtMutexGuard<'_, T>> {
192            {
193                let mut coord = self.coord.lock().expect("rt-mutex coord poisoned");
194                if coord.locked {
195                    return None;
196                }
197                coord.locked = true;
198                coord.holder = Some(my_priority);
199            }
200            let data = self.data.lock().expect("rt-mutex data poisoned");
201            Some(RtMutexGuard {
202                mutex: self,
203                data: Some(data),
204            })
205        }
206
207        /// Effective priority of the current owner: its base, raised to
208        /// the highest waiting priority (priority inheritance). `None` if
209        /// nobody currently holds the lock.
210        ///
211        /// # Panics
212        /// On a poisoned internal lock.
213        #[allow(clippy::missing_panics_doc)]
214        #[must_use]
215        pub fn effective_holder_priority(&self) -> Option<Priority> {
216            let coord = self.coord.lock().expect("rt-mutex coord poisoned");
217            coord.holder.map(|h| match coord.top_waiter() {
218                Some(w) if w > h => w,
219                _ => h,
220            })
221        }
222
223        fn unlock(&self) {
224            let mut coord = self.coord.lock().expect("rt-mutex coord poisoned");
225            coord.locked = false;
226            coord.holder = None;
227            // Wake everyone; the highest-priority waiter wins the next lock.
228            drop(coord);
229            self.cv.notify_all();
230        }
231    }
232
233    impl<T> RtMutexGuard<'_, T> {
234        /// Read access to the protected data.
235        #[must_use]
236        pub fn get(&self) -> &T {
237            self.data.as_ref().expect("guard active")
238        }
239
240        /// Write access to the protected data.
241        pub fn get_mut(&mut self) -> &mut T {
242            self.data.as_mut().expect("guard active")
243        }
244    }
245
246    impl<T> core::ops::Deref for RtMutexGuard<'_, T> {
247        type Target = T;
248        fn deref(&self) -> &T {
249            self.get()
250        }
251    }
252
253    impl<T> core::ops::DerefMut for RtMutexGuard<'_, T> {
254        fn deref_mut(&mut self) -> &mut T {
255            self.get_mut()
256        }
257    }
258
259    impl<T> Drop for RtMutexGuard<'_, T> {
260        fn drop(&mut self) {
261            // Release the data lock first, then free the coordination.
262            self.data.take();
263            self.mutex.unlock();
264        }
265    }
266}
267
268#[cfg(test)]
269#[allow(clippy::unwrap_used, clippy::panic)]
270mod tests {
271    use super::*;
272
    /// Test helper: builds a [`Priority`] from the raw value `v`, panicking
    /// (via `unwrap`) if `Priority::new` does not accept it.
    fn p(v: i16) -> Priority {
        Priority::new(v).unwrap()
    }
276
277    #[test]
278    fn effective_is_base_without_waiters() {
279        let pi = PriorityInheritance::new(p(10));
280        assert_eq!(pi.effective(), p(10));
281        assert_eq!(pi.waiter_count(), 0);
282    }
283
284    #[test]
285    fn owner_inherits_highest_waiter() {
286        let mut pi = PriorityInheritance::new(p(10));
287        assert_eq!(pi.on_block(p(50)), p(50)); // raised
288        assert_eq!(pi.on_block(p(30)), p(50)); // stays at the maximum
289        assert_eq!(pi.on_block(p(90)), p(90)); // higher → inherits more
290        assert_eq!(pi.waiter_count(), 3);
291    }
292
293    #[test]
294    fn priority_reverts_on_unblock() {
295        let mut pi = PriorityInheritance::new(p(10));
296        pi.on_block(p(50));
297        pi.on_block(p(90));
298        assert_eq!(pi.effective(), p(90));
299        assert_eq!(pi.on_unblock(p(90)), p(50)); // highest gone → drops to 50
300        assert_eq!(pi.on_unblock(p(50)), p(10)); // all gone → base
301    }
302
303    #[cfg(feature = "std")]
304    #[test]
305    fn rt_mutex_basic_lock_unlock() {
306        let m = RtMutex::new(0u32);
307        {
308            let mut g = m.lock(p(10));
309            *g += 5;
310            assert_eq!(m.effective_holder_priority(), Some(p(10)));
311        }
312        assert_eq!(m.effective_holder_priority(), None);
313        assert_eq!(*m.lock(p(1)), 5);
314    }
315
316    #[cfg(feature = "std")]
317    #[test]
318    fn rt_mutex_try_lock_contended() {
319        let m = RtMutex::new(());
320        let g = m.lock(p(10));
321        assert!(m.try_lock(p(20)).is_none());
322        drop(g);
323        assert!(m.try_lock(p(20)).is_some());
324    }
325
326    #[cfg(feature = "std")]
327    #[test]
328    fn rt_mutex_grants_highest_priority_waiter_first() {
329        use std::sync::Arc;
330        use std::sync::atomic::{AtomicU32, Ordering};
331
332        let m = Arc::new(RtMutex::new(()));
333        let order = Arc::new(std::sync::Mutex::new(alloc::vec::Vec::<i16>::new()));
334        let next = Arc::new(AtomicU32::new(0));
335
336        // Low-priority owner holds the lock and blocks the waiters.
337        let g = m.lock(p(1));
338
339        // Start two waiters; wait until both are provably blocked.
340        let mut handles = alloc::vec::Vec::new();
341        for prio in [p(40), p(80)] {
342            let m = Arc::clone(&m);
343            let order = Arc::clone(&order);
344            let next = Arc::clone(&next);
345            handles.push(std::thread::spawn(move || {
346                next.fetch_add(1, Ordering::SeqCst);
347                let _lg = m.lock(prio);
348                order.lock().unwrap().push(prio.value());
349            }));
350        }
351        // Both threads have started; wait until both are in coord.waiters.
352        while m.effective_holder_priority() != Some(p(80)) {
353            std::thread::yield_now();
354        }
355        // Owner releases → highest waiter (80) must be served first.
356        drop(g);
357        for h in handles {
358            h.join().unwrap();
359        }
360        let seq = order.lock().unwrap().clone();
361        assert_eq!(seq, alloc::vec![80, 40]);
362    }
363}