// pcp_mutex/lib.rs

1use lazy_static::lazy_static;
2use linux_futex::{PiFutex, Private};
3use std::{
4    cell::{Cell, UnsafeCell},
5    marker::PhantomData,
6    mem::MaybeUninit,
7    sync::{
8        atomic::{self, AtomicPtr},
9        Arc,
10    },
11};
12
/// Scheduling priority value used for thread priorities and mutex ceilings.
pub type Priority = u8;
/// Linux thread id as returned by the `gettid` syscall.
pub type ThreadId = i32;

// Futex can contain WAITERS bit, so we use mask to obtain thread id
const FUTEX_TID_MASK: i32 = PiFutex::<linux_futex::Private>::TID_MASK;
18
lazy_static! {
    // Default PcpGroup when using `PcpMutex::new()`
    static ref DEFAULT_GROUP: PcpGroup = Default::default();
    // Sentinel lock used to temporarily block the highest_locker AtomicPtr
    // while a thread clones the Arc stored there. Ceiling is MAX so it can
    // never be mistaken for a lockable mutex.
    static ref BLOCKED_LOCKER: PcpMutexLock = PcpMutexLock {
        ceiling: Priority::MAX,
        futex: PiFutex::default(),
    };
}
28
/// A helper object that contains thread id and keeps track of current priority
#[derive(Debug)]
struct ThreadState {
    // Current dynamic priority; temporarily raised to a mutex ceiling while
    // that mutex is held (see `PcpMutex::lock_internal`).
    priority: Cell<Priority>,
    // Thread id as returned by `gettid`; compared against PiFutex owner bits.
    thread_id: ThreadId,
    // Raw-pointer marker makes the type !Send/!Sync: the state is only
    // meaningful on the thread that created it.
    _non_send: PhantomData<*const u8>,
}
36
37impl ThreadState {
38    /// Creates a new `ThreadState` with a given priority.
39    ///
40    /// # Safety
41    ///
42    /// The given priority and thread id must be valid.
43    unsafe fn new(priority: Priority, thread_id: ThreadId) -> Self {
44        Self {
45            priority: Cell::new(priority),
46            thread_id,
47            _non_send: PhantomData::default(),
48        }
49    }
50
51    /// Constructs `ThreadState` from `gettid` and `sched_getparam` syscalls
52    fn from_sys() -> Self {
53        let mut sched_param = MaybeUninit::<libc::sched_param>::uninit();
54
55        unsafe {
56            let thread_id = libc::syscall(libc::SYS_gettid) as _;
57            libc::sched_getparam(0, sched_param.as_mut_ptr());
58            Self::new(sched_param.assume_init().sched_priority as u8, thread_id)
59        }
60    }
61
62    /// Internal method for setting priority to the locked resource ceiling.
63    fn set_priority(&self, priority: Priority) {
64        self.priority.set(priority);
65    }
66
67    /// Returns the current thread priority
68    fn get_priority(&self) -> Priority {
69        self.priority.get()
70    }
71}
72
73pub mod thread {
74    use crate::{Priority, ThreadState};
75    use std::mem::MaybeUninit;
76
77    thread_local! {
78        pub(crate) static THREAD_STATE: ThreadState = ThreadState::from_sys()
79    }
80
81    /// Updates internal base thread priority from `sched_getparam` syscall. Must be called if thread priority is changed manually.
82    pub fn update_priority() -> std::io::Result<()> {
83        let mut sched_param = MaybeUninit::<libc::sched_param>::uninit();
84
85        let priority = unsafe {
86            if libc::sched_getparam(0, sched_param.as_mut_ptr()) != 0 {
87                return Err(std::io::Error::last_os_error());
88            }
89
90            sched_param.assume_init().sched_priority as u8
91        };
92
93        THREAD_STATE.with(|s| s.set_priority(priority));
94
95        Ok(())
96    }
97
98    /// Sets current thread scheduling policy to SCHED_FIFO with a given priority.
99    pub fn init_fifo_priority(priority: Priority) -> std::io::Result<()> {
100        let param = libc::sched_param {
101            sched_priority: priority as i32,
102        };
103
104        unsafe {
105            if libc::sched_setscheduler(0, libc::SCHED_FIFO, &param) != 0 {
106                return Err(std::io::Error::last_os_error());
107            }
108        }
109
110        THREAD_STATE.with(|s| s.set_priority(priority));
111
112        Ok(())
113    }
114
115    /// Returns the current dynamic thread priority
116    pub fn get_priority() -> Priority {
117        THREAD_STATE.with(|s| s.priority.get())
118    }
119}
120
/// Group of mutexes that share the same system ceiling
#[derive(Debug, Default)]
pub struct PcpGroup {
    // Pointer to the locked mutex with the highest ceiling.
    // Null when no mutex in the group is currently locked
    // (see `get_highest_locker`).
    highest_locker: AtomicPtr<PcpMutexLock>,
}
127
128impl PcpGroup {
129    /// Creates a new mutex with a given priority ceiling.
130    /// Ceiling must be calculated manually.
131    pub fn create<'a, T>(&'a self, res: T, ceiling: Priority) -> PcpMutex<'a, T> {
132        PcpMutex {
133            res: UnsafeCell::new(res),
134            group: &self,
135            lock: Arc::new(PcpMutexLock {
136                ceiling,
137                futex: PiFutex::default(),
138            }),
139        }
140    }
141}
142
#[derive(Debug)]
struct PcpMutexLock {
    /// Static priority ceiling of the mutex
    ceiling: Priority,
    /// PiFutex that is used to lock the mutex. Its value holds the owner's
    /// thread id, possibly with the WAITERS bit set by the kernel.
    futex: PiFutex<Private>,
}
150
/// A Priority Ceiling Protocol mutex
#[derive(Debug)]
pub struct PcpMutex<'a, T> {
    /// Resource protected by the mutex
    res: UnsafeCell<T>,
    /// Group which this mutex belongs to (defines the shared system ceiling)
    group: &'a PcpGroup,
    /// Internal lock implementation, shared via Arc with waiting threads
    lock: Arc<PcpMutexLock>,
}
161
// PcpMutex is not Send nor Sync because we use UnsafeCell.
// It is safe to `Send` it between threads as long as the resource itself is `Send`.
unsafe impl<'a, T> Send for PcpMutex<'a, T> where T: Send {}
// We ensure `Sync` behavior by locking mechanism in `PcpMutex::lock()`:
// SAFETY: all access to the inner resource goes through `lock_internal`,
// which serializes critical sections.
unsafe impl<'a, T> Sync for PcpMutex<'a, T> where T: Send {}
167
168// Performs PCP check if thread can lock a mutex.
169fn can_lock(highest: &Option<Arc<PcpMutexLock>>, thread_info: &ThreadState) -> bool {
170    if let Some(highest) = highest {
171        if thread_info.get_priority() > highest.ceiling {
172            // Thread priority is higher than the system ceiling, lock can be taken
173            true
174        } else {
175            // Get thread id of the highest locker
176            let locker_tid = highest.futex.value.load(atomic::Ordering::Relaxed) & FUTEX_TID_MASK;
177
178            if thread_info.get_priority() == highest.ceiling && thread_info.thread_id == locker_tid
179            {
180                // Highest locker is the same thread, lock can be taken
181                true
182            } else {
183                // Thread does not meet PCP criteria and cannot take the lock
184                false
185            }
186        }
187    } else {
188        // There are no locked mutexes, lock can be taken
189        true
190    }
191}
192
// This safely clones Arc that is in the highest_locker AtomicPtr by temporarily swapping in BLOCKED_LOCKER.
// Without this, Arc could be dropped between AtomicPtr::load() and Arc::clone() calls.
// There is probably a more efficient way of doing this.
//
// Returns both the raw pointer that was stored (so callers can later
// compare_exchange against the very same value) and a cloned `Arc` that
// keeps the lock alive while the caller inspects it.
fn get_highest_locker(
    ptr: &AtomicPtr<PcpMutexLock>,
    thread_info: &ThreadState,
) -> (*mut PcpMutexLock, Option<Arc<PcpMutexLock>>) {
    use atomic::Ordering::*;

    // Acquire the global BLOCKED_LOCKER futex: fast path is a CAS from 0 to
    // our tid; on contention fall back to the priority-inheriting `lock_pi`
    // syscall until it succeeds.
    if BLOCKED_LOCKER
        .futex
        .value
        .compare_exchange(0, thread_info.thread_id, Acquire, Relaxed)
        .is_err()
    {
        while !BLOCKED_LOCKER.futex.lock_pi().is_ok() {}
    }

    // Swap the sentinel in so no other thread can observe (or drop) the real
    // pointer while we clone the Arc below.
    // transmute: &'static PcpMutexLock -> *mut PcpMutexLock; the pointee is
    // only ever accessed immutably.
    let blocked_ptr = unsafe { core::mem::transmute(&*BLOCKED_LOCKER) };
    let highest_ptr = ptr.swap(blocked_ptr, Acquire);

    let highest = if highest_ptr.is_null() {
        // No mutex in the group is currently locked.
        None
    } else {
        // Rebuild the Arc from the raw pointer just long enough to clone it.
        let arc = unsafe { Arc::from_raw(highest_ptr) };
        let arc_new = arc.clone();
        // prevent original ref count from being decremented
        core::mem::forget(arc);
        Some(arc_new)
    };

    // Restore the real pointer; spin until the sentinel is swapped back out.
    while !ptr
        .compare_exchange(blocked_ptr, highest_ptr, Release, Relaxed)
        .is_ok()
    {}

    // Release BLOCKED_LOCKER: the CAS succeeds when no waiters arrived;
    // otherwise the WAITERS bit is set and we must wake a waiter via syscall.
    if BLOCKED_LOCKER
        .futex
        .value
        .compare_exchange(thread_info.thread_id, 0, Release, Relaxed)
        .is_err()
    {
        BLOCKED_LOCKER.futex.unlock_pi();
    }

    (highest_ptr, highest)
}
240
impl<'a, T> PcpMutex<'a, T> {
    /// Creates a new PcpMutex with a default global group
    pub fn new(res: T, ceiling: Priority) -> Self {
        DEFAULT_GROUP.create(res, ceiling)
    }

    /// Locks the mutex and executes critical section in a closure.
    /// Returns whatever the closure returns.
    pub fn lock<R>(&'a self, f: impl FnOnce(&mut T) -> R) -> R {
        thread::THREAD_STATE.with(|thread_state| self.lock_internal(thread_state, f))
    }

    /// Locks the mutex and executes critical section in a closure.
    ///
    /// NOTE(review): the contended path recurses into `lock_internal`, so a
    /// long chain of blocked lockers consumes stack proportionally.
    fn lock_internal<R>(&'a self, thread_info: &ThreadState, f: impl FnOnce(&mut T) -> R) -> R {
        use atomic::Ordering::*;

        // Get pointer to our lock.
        // If any thread is waiting on this lock, they will firstly clone the Arc.
        // We transmute *const to *mut, because AtomicPtr only takes *mut pointer.
        // However, we only use immutable references to PcpMutexLock so this is safe.
        let self_ptr = unsafe { core::mem::transmute(Arc::as_ptr(&self.lock)) };

        loop {
            // Fetch the current highest locker (system ceiling)
            let (highest_ptr, highest) =
                get_highest_locker(&self.group.highest_locker, thread_info);

            if can_lock(&highest, thread_info) {
                // This would work, but we don't want to create multiple mutable references to the resource
                if highest
                    .map(|l| Arc::ptr_eq(&l, &self.lock))
                    .unwrap_or(false)
                {
                    panic!("PcpMutex is not reentrant!");
                }

                // Try locking the mutex
                if let Err(val) = self.lock.futex.value.compare_exchange(
                    0,
                    thread_info.thread_id,
                    Acquire,
                    Acquire,
                ) {
                    // Futex might already contain current thread, because of `lock_pi()` syscall.
                    // Use TID mask, because WAITERS bit might be set and comparison would fail.
                    if val & FUTEX_TID_MASK != thread_info.thread_id {
                        // Some other thread locked futex before us.
                        while !self.lock.futex.lock_pi().is_ok() {}
                        // Retry
                        continue;
                    }
                }

                // Set the new highest locker (publish our ceiling to the group)
                if self
                    .group
                    .highest_locker
                    .compare_exchange(highest_ptr, self_ptr, SeqCst, SeqCst)
                    .is_err()
                {
                    // Race condition.
                    // Unlock the acquired futex and retry
                    if self
                        .lock
                        .futex
                        .value
                        .compare_exchange(thread_info.thread_id, 0, SeqCst, SeqCst)
                        .is_err()
                    {
                        // WAITERS bit got set meanwhile; wake a waiter via syscall.
                        self.lock.futex.unlock_pi();
                    }

                    // retry
                    continue;
                }

                let prev_priority = thread_info.get_priority();
                // Note that actual priority will only be updated when another thread calls `lock_pi()` on the futex.
                thread_info.set_priority(self.lock.ceiling);

                // Enter the critical section.
                // Accessing resource is safe because no other thread is allowed to reach this.
                let result = f(unsafe { &mut *self.res.get() });

                // Restore priority
                thread_info.set_priority(prev_priority);

                // Restore the previous highest locker (system ceiling).
                loop {
                    match self.group.highest_locker.compare_exchange(
                        self_ptr,
                        highest_ptr,
                        SeqCst,
                        SeqCst,
                    ) {
                        Ok(_) => break,
                        // State was changed while running, retry.
                        // For multi-core it would be wise to lock_pi new state,
                        // but execution ordering gets complicated, so just busy wait.
                        Err(_) => continue,
                    }
                }

                // Unlock the futex
                if self
                    .lock
                    .futex
                    .value
                    .compare_exchange(thread_info.thread_id, 0, SeqCst, SeqCst)
                    .is_err()
                {
                    // WAITERS bit was set and we must unlock via syscall to wakeup waiting thread.
                    self.lock.futex.unlock_pi();
                }

                return result;
            } else {
                // Thread hit the system ceiling and could not take the lock. It must wait.

                // can_lock will not return false if Option is None so this never fails
                let highest = highest.unwrap();

                // Suspend current thread. Kernel will raise the locker priority automatically.
                while !highest.futex.lock_pi().is_ok() {}

                // Retry locking recursively
                let res = self.lock_internal(thread_info, f);

                // Release the acquired `highest` lock
                if let Err(val) =
                    highest
                        .futex
                        .value
                        .compare_exchange(thread_info.thread_id, 0, SeqCst, SeqCst)
                {
                    // Lock could already have been unlocked by a recursive self.lock() call
                    if val & FUTEX_TID_MASK == thread_info.thread_id {
                        highest.futex.unlock_pi();
                    }
                }

                return res;
            }
        }
    }

    /// Returns priority ceiling of this mutex
    pub fn ceiling(&self) -> Priority {
        self.lock.ceiling
    }
}
389}