linux_io/
sync.rs

1use core::{
2    cell::UnsafeCell,
3    marker::PhantomData,
4    ops::{Deref, DerefMut},
5    ptr::null,
6    sync::atomic::{
7        AtomicU32,
8        Ordering::{Acquire, Relaxed, Release},
9    },
10};
11
12/// A mutex implemented in terms of the Linux "futex" system call.
13pub struct Mutex<T: ?Sized> {
14    futex: Futex<true>,
15    data: UnsafeCell<T>,
16}
17
18impl<T> Mutex<T> {
19    pub const fn new(v: T) -> Self {
20        Self {
21            futex: Futex::new(),
22            data: UnsafeCell::new(v),
23        }
24    }
25
26    pub fn lock<'a>(&'a self) -> MutexGuard<'a, T> {
27        self.futex.lock();
28        MutexGuard::new(self)
29    }
30
31    pub fn try_lock<'a>(&'a self) -> core::result::Result<MutexGuard<'a, T>, ()> {
32        if self.futex.try_lock() {
33            Ok(MutexGuard::new(self))
34        } else {
35            Err(())
36        }
37    }
38}
39
40unsafe impl<T: Send> Send for Mutex<T> {}
41unsafe impl<T: Send> Sync for Mutex<T> {}
42
43pub struct MutexGuard<'mutex, T: ?Sized + 'mutex> {
44    lock: &'mutex Mutex<T>,
45    _not_send: PhantomData<*mut ()>,
46}
47
48impl<'mutex, T: ?Sized + 'mutex> MutexGuard<'mutex, T> {
49    #[inline]
50    fn new(lock: &'mutex Mutex<T>) -> Self {
51        Self {
52            lock,
53            _not_send: PhantomData,
54        }
55    }
56}
57
58impl<T: ?Sized> Drop for MutexGuard<'_, T> {
59    fn drop(&mut self) {
60        unsafe { self.lock.futex.unlock() }
61    }
62}
63
64impl<T: ?Sized> Deref for MutexGuard<'_, T> {
65    type Target = T;
66
67    fn deref(&self) -> &T {
68        unsafe { &*self.lock.data.get() }
69    }
70}
71
72impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
73    fn deref_mut(&mut self) -> &mut T {
74        unsafe { &mut *self.lock.data.get() }
75    }
76}
77
/// A single 32-bit futex word used as a lock.
///
/// When `SINGLE_PROCESS` is `true` the futex operations are issued with the
/// `FUTEX_PRIVATE` flag; with the default of `false` no extra flag is added.
#[repr(transparent)]
struct Futex<const SINGLE_PROCESS: bool = false> {
    // The lock state; this is the address handed to the futex system call.
    futex_word: AtomicU32,
}
82
83impl<const SINGLE_PROCESS: bool> Futex<SINGLE_PROCESS> {
84    // The following is essentially just the futex implementation for std, but lightly
85    // adapted to use linux-unsafe to call the kernel's futex ops.
86
87    const UNLOCKED: u32 = 0;
88    const LOCKED: u32 = 1; // locked, no other threads waiting
89    const CONTENDED: u32 = 2; // locked, and other threads waiting (contended)
90
91    const FUTEX_WAIT: linux_unsafe::int = linux_unsafe::FUTEX_WAIT | Self::FUTEX_OP_FLAGS;
92    const FUTEX_WAKE: linux_unsafe::int = linux_unsafe::FUTEX_WAKE | Self::FUTEX_OP_FLAGS;
93    const FUTEX_OP_FLAGS: linux_unsafe::int = if SINGLE_PROCESS {
94        linux_unsafe::FUTEX_PRIVATE
95    } else {
96        0
97    };
98
99    #[inline]
100    pub const fn new() -> Self {
101        Self {
102            futex_word: AtomicU32::new(Self::UNLOCKED),
103        }
104    }
105
106    #[inline]
107    pub fn try_lock(&self) -> bool {
108        self.futex_word
109            .compare_exchange(Self::UNLOCKED, Self::LOCKED, Acquire, Relaxed)
110            .is_ok()
111    }
112
113    #[inline]
114    pub fn lock(&self) {
115        if self
116            .futex_word
117            .compare_exchange(Self::UNLOCKED, Self::LOCKED, Acquire, Relaxed)
118            .is_err()
119        {
120            self.lock_contended();
121        }
122    }
123
124    #[cold]
125    fn lock_contended(&self) {
126        // Spin first to speed things up if the lock is released quickly.
127        let mut state = self.spin();
128
129        // If it's unlocked now, attempt to take the lock
130        // without marking it as contended.
131        if state == Self::UNLOCKED {
132            match self
133                .futex_word
134                .compare_exchange(Self::UNLOCKED, Self::LOCKED, Acquire, Relaxed)
135            {
136                Ok(_) => return, // Locked!
137                Err(s) => state = s,
138            }
139        }
140
141        loop {
142            // Put the lock in contended state.
143            // We avoid an unnecessary write if it as already set to CONTENDED,
144            // to be friendlier for the caches.
145            if state != Self::CONTENDED
146                && self.futex_word.swap(Self::CONTENDED, Acquire) == Self::UNLOCKED
147            {
148                // We changed it from UNLOCKED to CONTENDED, so we just successfully locked it.
149                return;
150            }
151
152            // Wait for the futex to change state, assuming it is still CONTENDED.
153            let _ = self.futex_wait(Self::CONTENDED);
154
155            // Spin again after waking up.
156            state = self.spin();
157        }
158    }
159
160    fn spin(&self) -> u32 {
161        let mut spin = 100;
162        loop {
163            // We only use `load` (and not `swap` or `compare_exchange`)
164            // while spinning, to be easier on the caches.
165            let state = self.futex_word.load(Relaxed);
166
167            // We stop spinning when the mutex is UNLOCKED,
168            // but also when it's CONTENDED.
169            if state != Self::LOCKED || spin == 0 {
170                return state;
171            }
172
173            core::hint::spin_loop();
174            spin -= 1;
175        }
176    }
177
178    #[inline]
179    pub unsafe fn unlock(&self) {
180        if self.futex_word.swap(Self::UNLOCKED, Release) == Self::CONTENDED {
181            // We only wake up one thread. When that thread locks the mutex, it
182            // will mark the mutex as CONTENDED (see lock_contended above),
183            // which makes sure that any other waiting threads will also be
184            // woken up eventually.
185            self.wake();
186        }
187    }
188
189    #[cold]
190    fn wake(&self) {
191        let _ = self.futex_wake();
192    }
193
194    #[inline]
195    fn futex_wait(&self, want: u32) -> linux_unsafe::result::Result<linux_unsafe::int> {
196        unsafe {
197            linux_unsafe::futex(
198                self.futex_word.as_ptr(),
199                Self::FUTEX_WAIT,
200                want,
201                0,
202                null(),
203                0,
204            )
205        }
206    }
207
208    #[inline]
209    fn futex_wake(&self) -> linux_unsafe::result::Result<linux_unsafe::int> {
210        unsafe { linux_unsafe::futex(self.futex_word.as_ptr(), Self::FUTEX_WAKE, 1, 0, null(), 0) }
211    }
212}