Skip to main content

async_priority_lock/
mutex.rs

1//! Contains the [Mutex] type for async, priority-ordered exclusive access to a resource.
2#[cfg(doc)]
3use crate::{FIFO, LIFO, queue::*};
4use crate::{
5    Priority, RwLock,
6    queue::{PriorityQueue, PriorityQueueHandle},
7    waiter::{self, Waiter, WaiterFlagFut},
8};
9use core::{
10    cell::UnsafeCell,
11    fmt::{Debug, Display},
12    format_args,
13    marker::PhantomData,
14    mem::ManuallyDrop,
15    ops::{Deref, DerefMut},
16};
17
18#[cfg(feature = "const-default")]
19use const_default::ConstDefault;
20
21impl<'a, P: Priority, T: ?Sized, Q: MutexQueue<P>> waiter::WaiterHandle
22    for MutexGuard<'a, P, T, Q>
23{
24    #[inline]
25    fn with_waker<R>(&self, f: impl FnOnce(&Waiter) -> R) -> R {
26        // rust optimizer doesn't seem to be smart enough yet to do branch elimination with
27        // associated const option (e.g. if let Some(x) = Q::Handle::LOAD_PURE)
28        if Self::HAS_PURE_LOAD {
29            unsafe { f(&Q::Handle::LOAD_PURE.unwrap_unchecked()(&self.node).waiter) }
30        } else {
31            let queue = self.mutex.queue.read();
32
33            f(&queue.get_by_handle(&self.node).waiter)
34        }
35    }
36}
37
38#[derive(Debug)]
39/// Opaque waiter type used for [PriorityQueue] entries.
40///
41/// This implements [Priority] and is the entry type used by [Mutex].
42pub struct MutexWaiter<P: Priority> {
43    priority: P,
44    waiter: Waiter,
45}
46
47/// Has the same Priority impl as P with the exception of pinning holding entry to head.
48impl<P: Priority> Priority for MutexWaiter<P> {
49    #[inline]
50    fn compare(&self, other: &Self) -> core::cmp::Ordering {
51        self.priority.compare(&other.priority)
52    }
53
54    #[inline]
55    fn compare_new(&self, old: &Self) -> std::cmp::Ordering {
56        if old.waiter.has_lock() {
57            return core::cmp::Ordering::Less;
58        }
59
60        self.priority.compare_new(&old.priority)
61    }
62}
63
64impl<P: Priority> MutexWaiter<P> {
65    #[inline]
66    const fn new<'a>(holder: P, has_lock: bool) -> Self {
67        Self {
68            priority: holder,
69            waiter: Waiter::new(has_lock),
70        }
71    }
72}
73
// The arena queue is only selected on 64-bit targets.
#[cfg(all(feature = "arena-queue", target_pointer_width = "64"))]
type DefaultMutexQueue_<P> = crate::queue::SingleLinkArenaQueue<MutexWaiter<P>>;

// Fallback whenever the arena queue above is not selected.  This deliberately also covers
// `arena-queue` + `box-queue` on non-64-bit targets, which previously left this alias undefined
// while `DefaultMutexQueue` below was still compiled.
// NOTE(review): `arena-queue` alone on a non-64-bit target still has no default queue — confirm
// whether that combination is meant to be supported.
#[cfg(all(
    feature = "box-queue",
    not(all(feature = "arena-queue", target_pointer_width = "64"))
))]
type DefaultMutexQueue_<P> = crate::queue::DualLinkBoxQueue<MutexWaiter<P>>;

/// The default queue used if unspecified for `Mutex`.
///
/// The actual queue used varies based on flags (and in the future, may even be a new queue)
/// but will always make [Mutex] (and associated fns) [`Send`]` + `[`Sync`] as long as `P` and `T` are.
///
/// Currently, the default is as follows:
/// - if `arena-queue` is enabled (on 64-bit targets): [SingleLinkArenaQueue] (it is expected that
/// removed nodes will usually have the lock, and thus will be the head node, and dequeue uses a
/// seq block of memory so the additional overhead added for queueing (e.g. in allocating) is
/// usually not needed)
/// - if `box-queue` is enabled and the arena queue is not selected: [DualLinkBoxQueue] - unlike
/// [ArenaQueue], searching for a prev node with [BoxQueue] is significantly slower than dual
/// linking, so we don't use [SingleLinkBoxQueue].
/// - if neither flag is enabled: no default queue is used
#[cfg(any(feature = "arena-queue", feature = "box-queue"))]
pub type DefaultMutexQueue<P> = DefaultMutexQueue_<P>;
95
/// A mutex that queues waiters by priority.  Higher priority requesters will receive access first.
///
/// If the `evict` feature is enabled, the current holder of the lock will be notified if a higher
/// priority waiter is queued.
///
/// Note that requesters with the same priority may receive their access in an arbitrary order -
/// if this is undesirable, [FIFO] and [LIFO] may be used.
///
/// Example:
/// ```rust
/// use async_priority_lock::{FIFO, HighestFirst, Mutex, MutexGuard};
///
/// static MY_LOCK: Mutex<FIFO<HighestFirst<usize>>, Vec<usize>> =
///     Mutex::const_new(vec![]);
///
/// async fn push_num(num: usize, priority: usize) {
///     // Pushes our number to the queue.  If multiple writers are waiting on this, the waiters
///     // will acquire access by order of priority.  As the priority is FIFO, requesters with the
///     // same priority will execute oldest first.
///     MY_LOCK.lock_from(priority).await.push(num);
/// }
///
/// async fn wait_and_push(num: usize, priority: usize) {
///     loop {
///         let mut guard = MY_LOCK.lock_from(priority).await;
///         // wait a second or abort and retry if the resource is requested with a higher
///         // priority in the meantime
///         tokio::select! {
///             _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {},
///             _ = MutexGuard::evicted(&mut guard) => {
///                 // drop the guard and try again
///                 continue;
///             }
///         }
///
///         // we kept the lock for the full second: push once and stop retrying
///         guard.push(num);
///         return;
///     }
/// }
/// ```
pub struct Mutex<
    P: Priority,
    T: ?Sized,
    #[cfg(any(feature = "box-queue", feature = "arena-queue"))] Q: MutexQueue<P> = DefaultMutexQueue<P>,
    #[cfg(not(any(feature = "box-queue", feature = "arena-queue")))] Q: MutexQueue<P>,
> {
    // Queue of holder / waiter entries, itself guarded by an internal `RwLock`.
    queue: RwLock<Q>,
    // Ties the priority type `P` to the mutex; no `P` value is stored directly.
    _phantom: PhantomData<P>,
    // The protected value; only reached through a `MutexGuard`'s `Deref` / `DerefMut`.
    data: UnsafeCell<T>,
}
146
147impl<P: Priority, T: Default, Q: MutexQueue<P>> Default for Mutex<P, T, Q> {
148    fn default() -> Self {
149        Self {
150            queue: Default::default(),
151            data: Default::default(),
152            _phantom: Default::default(),
153        }
154    }
155}
156
157/// For the mutex to be Sync, T must be Send (Sync is not needed).
158///
159/// Also see [rust Mutex docs](https://doc.rust-lang.org/std/sync/struct.Mutex.html#impl-Sync-for-Mutex%3CT%3E)
160///
161/// (however, both `Sync` and `Send` are required for the queue; usually these are implemented if P
162/// is `Send` + `Sync`)
163unsafe impl<P: Priority, T: ?Sized + Send, Q: Send + Sync + MutexQueue<P>> Sync for Mutex<P, T, Q> {}
164unsafe impl<P: Priority, T: ?Sized + Send, Q: Send + MutexQueue<P>> Send for Mutex<P, T, Q> {}
165impl<P: Priority, T: ?Sized + Unpin, Q: Unpin + MutexQueue<P>> Unpin for Mutex<P, T, Q> {}
166
167/// Alias trait for [`PriorityQueue`]`<`[`MutexWaiter<P>`]`>`.
168pub trait MutexQueue<P: Priority>: PriorityQueue<MutexWaiter<P>> {}
169impl<P: Priority, Q: PriorityQueue<MutexWaiter<P>>> MutexQueue<P> for Q {}
170
/// Deserializes the inner value and wraps it in a fresh, unlocked [Mutex].
#[cfg(feature = "serde")]
impl<'de, P, T, Q> serde::Deserialize<'de> for Mutex<P, T, Q>
where
    P: Priority,
    T: serde::Deserialize<'de>,
    Q: MutexQueue<P>,
{
    #[inline]
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        T::deserialize(deserializer).map(Self::new)
    }
}
185
186/// A guard holding access for a [Mutex].  When dropped, the lock is released.
187///
188/// If the `evict` flag is enabled, higher priority requesters will mark held locks for eviction,
189/// which can be subscribed to via [Self::evicted] (associated function).
190pub struct MutexGuard<'a, P: Priority, T: ?Sized, Q: MutexQueue<P>> {
191    mutex: &'a Mutex<P, T, Q>,
192    node: ManuallyDrop<Q::Handle>,
193}
194
// A shared guard hands out `&T` through `Deref`, hence the `T: Sync` bound; the mutex it
// points to must itself be shareable.
// NOTE(review): unlike the `Send` impl below, no bound is placed on `Q::Handle` here even
// though the guard stores one and reads through it from `&self` — confirm that shared access
// to the handle is sound for every queue implementation.
unsafe impl<'a, P: Priority, T: ?Sized + Sync, Q: MutexQueue<P>> Sync for MutexGuard<'a, P, T, Q> where
    Mutex<P, T, Q>: Sync
{
}
// Sending a guard moves its queue handle and the exclusive access to `T` to another thread
// while the mutex stays shared, hence `T: Send`, `Q::Handle: Send` and `Mutex: Sync`.
unsafe impl<'a, P: Priority, T: ?Sized + Send, Q: MutexQueue<P>> Send for MutexGuard<'a, P, T, Q>
where
    Mutex<P, T, Q>: Sync,
    Q::Handle: Send,
{
}
205
206impl<'a, P: Priority, T: ?Sized, Q: MutexQueue<P>> Display for MutexGuard<'a, P, T, Q>
207where
208    T: Display,
209{
210    #[inline]
211    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
212        self.deref().fmt(f)
213    }
214}
215
216impl<'a, P: Priority, T: ?Sized, Q: MutexQueue<P>> Debug for MutexGuard<'a, P, T, Q>
217where
218    T: Debug,
219{
220    #[inline]
221    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
222        self.deref().fmt(f)
223    }
224}
225
226impl<'a, P: Priority, T: ?Sized, Q: MutexQueue<P>> Deref for MutexGuard<'a, P, T, Q> {
227    type Target = T;
228
229    #[inline]
230    fn deref(&self) -> &Self::Target {
231        unsafe { &*self.mutex.data.get() }
232    }
233}
234
235impl<'a, P: Priority, T: ?Sized, Q: MutexQueue<P>> DerefMut for MutexGuard<'a, P, T, Q> {
236    #[inline]
237    fn deref_mut(&mut self) -> &mut Self::Target {
238        unsafe { &mut *self.mutex.data.get() }
239    }
240}
241
242impl<'a, P: Priority, T: ?Sized, Q: MutexQueue<P>> MutexGuard<'a, P, T, Q> {
243    /// Returns a future which resolves when/if another, higher priority holder attempts to acquire
244    /// the lock.
245    ///
246    /// Note: this is an associated method to avoid colision with methods of `T`.  Invoke via
247    /// [`MutexGuard::evicted`]`(&mut self)`.
248    ///
249    /// Cancel Safety: This function is cancel safe.
250    #[inline]
251    #[cfg(feature = "evict")]
252    pub fn evicted(this: &mut Self) -> impl Future<Output = ()> {
253        waiter::VoidFut(WaiterFlagFut::<_, { waiter::WAITER_FLAG_WANTS_EVICT }>::new(this))
254    }
255
256    const HAS_PURE_LOAD: bool = Q::Handle::LOAD_PURE.is_some();
257}
258
259impl<'a, P: Priority, T: ?Sized, Q: MutexQueue<P>> Drop for MutexGuard<'a, P, T, Q> {
260    #[inline]
261    fn drop(&mut self) {
262        let mut queue = self.mutex.queue.write();
263
264        let was_head = queue.get_by_handle(&self.node).waiter.has_lock();
265        queue.dequeue(unsafe { ManuallyDrop::take(&mut self.node) });
266
267        // interestingly, rust seems to optimize a bit better when we don't use return values from
268        // dequeue... best guess is that using the return value causes an extra register or two to
269        // be pushed to stack
270        if was_head {
271            if let Some(handle) = queue.head() {
272                handle.waiter.start();
273            }
274        }
275    }
276}
277
/// Opaque marker type for try_lock result
///
/// Carries no data; it only signals that the lock could not be acquired immediately.
#[derive(Debug, Default, Clone, Copy)]
pub struct TryLockError;

/// Fixed, human-readable message; formatter options such as width are not applied.
impl Display for TryLockError {
    #[inline]
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("lock is already held")
    }
}

impl core::error::Error for TryLockError {}
290
291impl<P: Priority, T: ?Sized, Q: MutexQueue<P>> Debug for Mutex<P, T, Q>
292where
293    T: Debug,
294    P: Default,
295{
296    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
297        let mut d = f.debug_tuple("Mutex");
298        match self.try_lock(P::default()) {
299            Ok(data) => d.field(&data.deref()),
300            Err(_) => d.field(&format_args!("<locked>")),
301        };
302
303        d.finish()
304    }
305}
306
307impl<P: Priority, T: ?Sized, Q: MutexQueue<P>> Mutex<P, T, Q> {
308    /// Try to acquire the lock without blocking or requesting eviction of the current holder.
309    ///
310    /// Priority will be stored in guard; higher priority requesters will try to evict the returned
311    /// guard if the `evict` flag is enabled.
312    ///
313    /// i.e. the holder may wait / check for eviction via [MutexGuard::evicted].
314    pub fn try_lock(
315        &self,
316        priority: impl Into<P>,
317    ) -> Result<MutexGuard<'_, P, T, Q>, TryLockError> {
318        let priority = priority.into();
319        let mut queue = self.queue.write();
320
321        if !queue.is_empty() {
322            return Err(TryLockError);
323        }
324
325        let node = MutexWaiter::new(priority, true);
326        let handle = queue.enqueue(node);
327
328        Ok(MutexGuard {
329            mutex: self,
330            node: ManuallyDrop::new(handle),
331        })
332    }
333
334    /// Acquire exclusive access to the locked resource, waiting until after higher priority
335    /// requesters acquire and release the lock.
336    ///
337    /// If the `evict` feature is enabled, this will also notify the current holder to request it
338    /// to release the lock if the current holder is lower priority.
339    ///
340    /// i.e. the holder may wait / check for eviction via [MutexGuard::evicted].
341    ///
342    /// Cancel Safety: This function is cancel safe.
343    pub async fn lock(&self, priority: P) -> MutexGuard<'_, P, T, Q> {
344        let priority = priority.into();
345        let guard = {
346            let mut queue = self.queue.write();
347
348            let maybe_head = queue.head();
349            if maybe_head.is_none() {
350                let handle = queue.enqueue(MutexWaiter::new(priority, true));
351
352                return MutexGuard {
353                    mutex: self,
354                    node: ManuallyDrop::new(handle),
355                };
356            }
357
358            #[cfg(feature = "evict")]
359            {
360                let head = maybe_head.unwrap();
361
362                if priority.compare(&maybe_head.unwrap().priority).is_gt() {
363                    head.waiter.evict();
364                }
365            }
366
367            let handle = queue.enqueue(MutexWaiter::new(priority, false));
368
369            MutexGuard {
370                mutex: self,
371                node: ManuallyDrop::new(handle),
372            }
373        };
374
375        WaiterFlagFut::<_, { waiter::WAITER_FLAG_HAS_LOCK }>::new(&guard).await;
376
377        guard
378    }
379
380    /// Acquire exclusive access to the locked resource, waiting until higher priority requesters
381    /// release the lock.
382    ///
383    /// Shorthand for [`self.lock`](Self::lock)`(priority.into())`.
384    ///
385    /// Cancel Safety: This function is cancel safe.
386    pub fn lock_from(
387        &self,
388        priority: impl Into<P>,
389    ) -> impl Future<Output = MutexGuard<'_, P, T, Q>> {
390        self.lock(priority.into())
391    }
392
393    /// Acquire exclusive access to the locked resource, waiting until higher priority requesters
394    /// release the lock.
395    ///
396    /// Shorthand for [`self.lock`](Self::lock)`(`[`Default::default()`]`)`.
397    ///
398    /// Cancel Safety: This function is cancel safe.
399    pub fn lock_default(&self) -> impl Future<Output = MutexGuard<'_, P, T, Q>>
400    where
401        P: Default,
402    {
403        self.lock(Default::default())
404    }
405
406    #[inline]
407    /// Create a new [Mutex], unlocked and ready for use.
408    pub fn new(val: T) -> Self
409    where
410        T: Sized,
411    {
412        Self {
413            queue: Default::default(),
414            data: UnsafeCell::new(val),
415            _phantom: PhantomData,
416        }
417    }
418
419    /// Create a new [Mutex], unlocked and ready for use.
420    ///
421    /// Available when the queue is [ConstDefault] if the `const-default` feature is enabled.
422    ///
423    /// All builtin queues are [ConstDefault].
424    #[cfg(feature = "const-default")]
425    pub const fn const_new(val: T) -> Self
426    where
427        Q: const_default::ConstDefault,
428        T: Sized,
429    {
430        Self {
431            queue: RwLock::new(Q::DEFAULT),
432            data: UnsafeCell::new(val),
433            _phantom: PhantomData,
434        }
435    }
436}
437
/// Compile-time default: an unlocked [Mutex] around `T::DEFAULT`.
#[cfg(feature = "const-default")]
impl<P, T, Q> ConstDefault for Mutex<P, T, Q>
where
    P: Priority,
    T: ConstDefault,
    Q: ConstDefault + MutexQueue<P>,
{
    const DEFAULT: Self = Self::const_new(<T as ConstDefault>::DEFAULT);
}