//! pcp_mutex/lib.rs — Priority Ceiling Protocol mutex built on Linux PI futexes.
1use lazy_static::lazy_static;
2use linux_futex::{PiFutex, Private};
3use std::{
4 cell::{Cell, UnsafeCell},
5 marker::PhantomData,
6 mem::MaybeUninit,
7 sync::{
8 atomic::{self, AtomicPtr},
9 Arc,
10 },
11};
12
/// Scheduling priority of a thread. Comparisons in this crate treat a larger
/// numeric value as a higher priority.
pub type Priority = u8;
/// Kernel-level thread id, as returned by the `gettid` syscall.
pub type ThreadId = i32;

// The futex word can contain the WAITERS bit in addition to the owner tid,
// so we mask with TID_MASK to obtain just the thread id.
const FUTEX_TID_MASK: i32 = PiFutex::<linux_futex::Private>::TID_MASK;
18
lazy_static! {
    // Default `PcpGroup` used when a mutex is created via `PcpMutex::new()`.
    static ref DEFAULT_GROUP: PcpGroup = Default::default();
    // Sentinel lock that is temporarily swapped into a group's
    // `highest_locker` AtomicPtr while a thread clones the current highest
    // locker's Arc (see `get_highest_locker`). Its ceiling is MAX so it can
    // never be mistaken for an ordinary lockable mutex.
    static ref BLOCKED_LOCKER: PcpMutexLock = PcpMutexLock {
        ceiling: Priority::MAX,
        futex: PiFutex::default(),
    };
}
28
/// A helper object that contains the thread id and keeps track of the
/// thread's current (dynamic) priority. One instance lives in a
/// thread-local, so it is only ever accessed from its own thread.
#[derive(Debug)]
struct ThreadState {
    // Current dynamic priority; raised to a mutex's ceiling while that
    // mutex is held and restored afterwards.
    priority: Cell<Priority>,
    // Kernel thread id of the owning thread.
    thread_id: ThreadId,
    // Raw-pointer marker makes the type !Send/!Sync, preventing the
    // thread-local state from leaking to another thread.
    _non_send: PhantomData<*const u8>,
}
36
37impl ThreadState {
38 /// Creates a new `ThreadState` with a given priority.
39 ///
40 /// # Safety
41 ///
42 /// The given priority and thread id must be valid.
43 unsafe fn new(priority: Priority, thread_id: ThreadId) -> Self {
44 Self {
45 priority: Cell::new(priority),
46 thread_id,
47 _non_send: PhantomData::default(),
48 }
49 }
50
51 /// Constructs `ThreadState` from `gettid` and `sched_getparam` syscalls
52 fn from_sys() -> Self {
53 let mut sched_param = MaybeUninit::<libc::sched_param>::uninit();
54
55 unsafe {
56 let thread_id = libc::syscall(libc::SYS_gettid) as _;
57 libc::sched_getparam(0, sched_param.as_mut_ptr());
58 Self::new(sched_param.assume_init().sched_priority as u8, thread_id)
59 }
60 }
61
62 /// Internal method for setting priority to the locked resource ceiling.
63 fn set_priority(&self, priority: Priority) {
64 self.priority.set(priority);
65 }
66
67 /// Returns the current thread priority
68 fn get_priority(&self) -> Priority {
69 self.priority.get()
70 }
71}
72
73pub mod thread {
74 use crate::{Priority, ThreadState};
75 use std::mem::MaybeUninit;
76
77 thread_local! {
78 pub(crate) static THREAD_STATE: ThreadState = ThreadState::from_sys()
79 }
80
81 /// Updates internal base thread priority from `sched_getparam` syscall. Must be called if thread priority is changed manually.
82 pub fn update_priority() -> std::io::Result<()> {
83 let mut sched_param = MaybeUninit::<libc::sched_param>::uninit();
84
85 let priority = unsafe {
86 if libc::sched_getparam(0, sched_param.as_mut_ptr()) != 0 {
87 return Err(std::io::Error::last_os_error());
88 }
89
90 sched_param.assume_init().sched_priority as u8
91 };
92
93 THREAD_STATE.with(|s| s.set_priority(priority));
94
95 Ok(())
96 }
97
98 /// Sets current thread scheduling policy to SCHED_FIFO with a given priority.
99 pub fn init_fifo_priority(priority: Priority) -> std::io::Result<()> {
100 let param = libc::sched_param {
101 sched_priority: priority as i32,
102 };
103
104 unsafe {
105 if libc::sched_setscheduler(0, libc::SCHED_FIFO, ¶m) != 0 {
106 return Err(std::io::Error::last_os_error());
107 }
108 }
109
110 THREAD_STATE.with(|s| s.set_priority(priority));
111
112 Ok(())
113 }
114
115 /// Returns the current dynamic thread priority
116 pub fn get_priority() -> Priority {
117 THREAD_STATE.with(|s| s.priority.get())
118 }
119}
120
/// Group of mutexes that share the same system ceiling.
#[derive(Debug, Default)]
pub struct PcpGroup {
    // Pointer to the `PcpMutexLock` of the currently-locked mutex with the
    // highest ceiling (the "system ceiling"); null when no mutex in the
    // group is locked.
    highest_locker: AtomicPtr<PcpMutexLock>,
}
127
128impl PcpGroup {
129 /// Creates a new mutex with a given priority ceiling.
130 /// Ceiling must be calculated manually.
131 pub fn create<'a, T>(&'a self, res: T, ceiling: Priority) -> PcpMutex<'a, T> {
132 PcpMutex {
133 res: UnsafeCell::new(res),
134 group: &self,
135 lock: Arc::new(PcpMutexLock {
136 ceiling,
137 futex: PiFutex::default(),
138 }),
139 }
140 }
141}
142
// Internal lock state shared (via `Arc`) between a mutex and any thread
// that is waiting on it.
#[derive(Debug)]
struct PcpMutexLock {
    /// Static priority ceiling of the mutex.
    ceiling: Priority,
    /// Priority-inheritance futex used to lock the mutex; its value holds
    /// the owner's thread id (plus an optional WAITERS bit).
    futex: PiFutex<Private>,
}
150
/// A Priority Ceiling Protocol mutex.
#[derive(Debug)]
pub struct PcpMutex<'a, T> {
    /// Resource protected by the mutex.
    res: UnsafeCell<T>,
    /// Group which this mutex belongs to (shares the system ceiling).
    group: &'a PcpGroup,
    /// Internal lock implementation; `Arc` so waiters can keep it alive
    /// independently of the mutex itself.
    lock: Arc<PcpMutexLock>,
}
161
// `PcpMutex` is not automatically `Send` nor `Sync` because it contains an
// `UnsafeCell`. It is safe to `Send` it between threads as long as the
// resource itself is `Send`.
unsafe impl<'a, T> Send for PcpMutex<'a, T> where T: Send {}
// `Sync` is sound because every access to the resource goes through the
// locking protocol in `PcpMutex::lock()`.
unsafe impl<'a, T> Sync for PcpMutex<'a, T> where T: Send {}
167
168// Performs PCP check if thread can lock a mutex.
169fn can_lock(highest: &Option<Arc<PcpMutexLock>>, thread_info: &ThreadState) -> bool {
170 if let Some(highest) = highest {
171 if thread_info.get_priority() > highest.ceiling {
172 // Thread priority is higher than the system ceiling, lock can be taken
173 true
174 } else {
175 // Get thread id of the highest locker
176 let locker_tid = highest.futex.value.load(atomic::Ordering::Relaxed) & FUTEX_TID_MASK;
177
178 if thread_info.get_priority() == highest.ceiling && thread_info.thread_id == locker_tid
179 {
180 // Highest locker is the same thread, lock can be taken
181 true
182 } else {
183 // Thread does not meet PCP criteria and cannot take the lock
184 false
185 }
186 }
187 } else {
188 // There are no locked mutexes, lock can be taken
189 true
190 }
191}
192
193// This safely clones Arc that is in the highest_locker AtomicPtr by temporarily swapping in BLOCKED_LOCKER.
194// Without this, Arc could be dropped between AtomicPtr::load() and Arc::clone() calls.
195// There is probably a more efficient way of doing this.
196fn get_highest_locker(
197 ptr: &AtomicPtr<PcpMutexLock>,
198 thread_info: &ThreadState,
199) -> (*mut PcpMutexLock, Option<Arc<PcpMutexLock>>) {
200 use atomic::Ordering::*;
201
202 if BLOCKED_LOCKER
203 .futex
204 .value
205 .compare_exchange(0, thread_info.thread_id, Acquire, Relaxed)
206 .is_err()
207 {
208 while !BLOCKED_LOCKER.futex.lock_pi().is_ok() {}
209 }
210
211 let blocked_ptr = unsafe { core::mem::transmute(&*BLOCKED_LOCKER) };
212 let highest_ptr = ptr.swap(blocked_ptr, Acquire);
213
214 let highest = if highest_ptr.is_null() {
215 None
216 } else {
217 let arc = unsafe { Arc::from_raw(highest_ptr) };
218 let arc_new = arc.clone();
219 // prevent original ref count from being decremented
220 core::mem::forget(arc);
221 Some(arc_new)
222 };
223
224 while !ptr
225 .compare_exchange(blocked_ptr, highest_ptr, Release, Relaxed)
226 .is_ok()
227 {}
228
229 if BLOCKED_LOCKER
230 .futex
231 .value
232 .compare_exchange(thread_info.thread_id, 0, Release, Relaxed)
233 .is_err()
234 {
235 BLOCKED_LOCKER.futex.unlock_pi();
236 }
237
238 (highest_ptr, highest)
239}
240
241impl<'a, T> PcpMutex<'a, T> {
    /// Creates a new `PcpMutex` with the given ceiling in the default
    /// global group.
    pub fn new(res: T, ceiling: Priority) -> Self {
        DEFAULT_GROUP.create(res, ceiling)
    }
246
    /// Locks the mutex and executes the critical section in a closure,
    /// returning the closure's result. Uses the calling thread's
    /// thread-local `ThreadState`.
    pub fn lock<R>(&'a self, f: impl FnOnce(&mut T) -> R) -> R {
        thread::THREAD_STATE.with(|thread_state| self.lock_internal(thread_state, f))
    }
251
252 /// Locks the mutex and executes critical section in a closure.
253 fn lock_internal<R>(&'a self, thread_info: &ThreadState, f: impl FnOnce(&mut T) -> R) -> R {
254 use atomic::Ordering::*;
255
256 // Get pointer to our lock.
257 // If any thread is waiting on this lock, they will firstly clone the Arc.
258 // We transmute *const to *mut, because AtomicPtr only takes *mut pointer.
259 // However, we only use immutable references to PcpMutexLock so this is safe.
260 let self_ptr = unsafe { core::mem::transmute(Arc::as_ptr(&self.lock)) };
261
262 loop {
263 // Fetch the current highest locker (system ceiling)
264 let (highest_ptr, highest) =
265 get_highest_locker(&self.group.highest_locker, thread_info);
266
267 if can_lock(&highest, thread_info) {
268 // This would work, but we don't want to create multiple mutable references to the resource
269 if highest
270 .map(|l| Arc::ptr_eq(&l, &self.lock))
271 .unwrap_or(false)
272 {
273 panic!("PcpMutex is not reentrant!");
274 }
275
276 // Try locking the mutex
277 if let Err(val) = self.lock.futex.value.compare_exchange(
278 0,
279 thread_info.thread_id,
280 Acquire,
281 Acquire,
282 ) {
283 // Futex might already contain current thread, because of `lock_pi()` syscall.
284 // Use TID mask, because WAITERS bit might be set and comparison would fail.
285 if val & FUTEX_TID_MASK != thread_info.thread_id {
286 // Some other thread locked futex before us.
287 while !self.lock.futex.lock_pi().is_ok() {}
288 // Retry
289 continue;
290 }
291 }
292
293 // Set the new highest locker
294 if self
295 .group
296 .highest_locker
297 .compare_exchange(highest_ptr, self_ptr, SeqCst, SeqCst)
298 .is_err()
299 {
300 // Race condition.
301 // Unlock the acquired futex and retry
302 if self
303 .lock
304 .futex
305 .value
306 .compare_exchange(thread_info.thread_id, 0, SeqCst, SeqCst)
307 .is_err()
308 {
309 self.lock.futex.unlock_pi();
310 }
311
312 // retry
313 continue;
314 }
315
316 let prev_priority = thread_info.get_priority();
317 // Note that actual priority will only be updated when another thread calls `lock_pi()` on the futex.
318 thread_info.set_priority(self.lock.ceiling);
319
320 // Enter the critical section.
321 // Accessing resource is safe because no other thread is allowed to reach this.
322 let result = f(unsafe { &mut *self.res.get() });
323
324 // Restore priority
325 thread_info.set_priority(prev_priority);
326
327 loop {
328 match self.group.highest_locker.compare_exchange(
329 self_ptr,
330 highest_ptr,
331 SeqCst,
332 SeqCst,
333 ) {
334 Ok(_) => break,
335 // State was changed while running, retry.
336 // For multi-core it would be wise to lock_pi new state,
337 // but execution ordering gets complicated, so just busy wait.
338 Err(_) => continue,
339 }
340 }
341
342 // Unlock the futex
343 if self
344 .lock
345 .futex
346 .value
347 .compare_exchange(thread_info.thread_id, 0, SeqCst, SeqCst)
348 .is_err()
349 {
350 // WAITERS bit was set and we must unlock via syscall to wakeup waiting thread.
351 self.lock.futex.unlock_pi();
352 }
353
354 return result;
355 } else {
356 // Thread hit the system ceiling and could not take the lock. It must wait.
357
358 // can_lock will not return false if Option is None so this never fails
359 let highest = highest.unwrap();
360
361 // Suspend current thread. Kernel will raise the locker priority automatically.
362 while !highest.futex.lock_pi().is_ok() {}
363
364 // Retry locking recursively
365 let res = self.lock_internal(thread_info, f);
366
367 // Release the acquired `highest` lock
368 if let Err(val) =
369 highest
370 .futex
371 .value
372 .compare_exchange(thread_info.thread_id, 0, SeqCst, SeqCst)
373 {
374 // Lock could already have been unlocked by a recursive self.lock() call
375 if val & FUTEX_TID_MASK == thread_info.thread_id {
376 highest.futex.unlock_pi();
377 }
378 }
379
380 return res;
381 }
382 }
383 }
384
    /// Returns the static priority ceiling of this mutex.
    pub fn ceiling(&self) -> Priority {
        self.lock.ceiling
    }
389}