Skip to main content

async_priority_lock/
mutex.rs

1//! Contains the [Mutex] type for async, priority-ordered exclusive access to a resource.
2#[cfg(doc)]
3use crate::{FIFO, LIFO, queue::*};
4use crate::{
5    Priority, RwLock,
6    queue::{PriorityQueue, PriorityQueueHandle},
7    waiter::{self, Waiter, WaiterFlagFut},
8};
9use core::{
10    cell::UnsafeCell,
11    fmt::{Debug, Display},
12    format_args,
13    marker::PhantomData,
14    mem::ManuallyDrop,
15    ops::{Deref, DerefMut},
16};
17
18#[cfg(feature = "const-default")]
19use const_default::ConstDefault;
20
21impl<'a, P: Priority, T, Q: MutexQueue<P>> waiter::WaiterHandle for MutexGuard<'a, P, T, Q> {
22    #[inline]
23    fn with_waker<R>(&self, f: impl FnOnce(&Waiter) -> R) -> R {
24        // rust optimizer doesn't seem to be smart enough yet to do branch elimination with
25        // associated const option (e.g. if let Some(x) = Q::Handle::LOAD_PURE)
26        if Self::HAS_PURE_LOAD {
27            unsafe { f(&Q::Handle::LOAD_PURE.unwrap_unchecked()(&self.node).waiter) }
28        } else {
29            let queue = self.mutex.queue.read();
30
31            f(&queue.get_by_handle(&self.node).waiter)
32        }
33    }
34}
35
36#[derive(Debug)]
37/// Opaque waiter type used for [PriorityQueue] entries.
38///
39/// This implements [Priority] and is the entry type used by [Mutex].
40pub struct MutexWaiter<P: Priority> {
41    priority: P,
42    waiter: Waiter,
43}
44
45/// Has the same Priority impl as P with the exception of pinning holding entry to head.
46impl<P: Priority> Priority for MutexWaiter<P> {
47    #[inline]
48    fn compare(&self, other: &Self) -> core::cmp::Ordering {
49        self.priority.compare(&other.priority)
50    }
51
52    #[inline]
53    fn compare_new(&self, old: &Self) -> std::cmp::Ordering {
54        if old.waiter.has_lock() {
55            return core::cmp::Ordering::Less;
56        }
57
58        self.priority.compare_new(&old.priority)
59    }
60}
61
62impl<P: Priority> MutexWaiter<P> {
63    #[inline]
64    const fn new<'a>(holder: P, has_lock: bool) -> Self {
65        Self {
66            priority: holder,
67            waiter: Waiter::new(has_lock),
68        }
69    }
70}
71
72#[cfg(all(feature = "arena-queue", target_pointer_width = "64"))]
73type DefaultMutexQueue_<P> = crate::queue::SingleLinkArenaQueue<MutexWaiter<P>>;
74
75#[cfg(all(not(feature = "arena-queue"), feature = "box-queue"))]
76type DefaultMutexQueue_<P> = crate::queue::DualLinkBoxQueue<MutexWaiter<P>>;
77
78/// The default queue used if unspecified for `Mutex`.  
79///
80/// The actual queue used varies based on flags (and in the future, may even be a new queue)
81/// but will always make [Mutex] (and associated fns) [`Send`]` + `[`Sync`] as long as `P` and `T` are.
82///
83/// Currently, the defualt is as follows:
84/// - if `arena-queue` is enabled: [SingleLinkArenaQueue] (it is expected that removed nodes will
85/// usually have the lock, and thus will be the head node, and dequeue uses a seq block of memory
86/// so the additional overhead added for queueing (e.g. in allocating) is usually not needed)
87/// - if `box-queue` is enabled but not `arena-queue`: [DualLinkBoxQueue] - unlike [ArenaQueue],
88/// searching for a prev node with [BoxQueue] is significantly slower than dual linking, so
89/// we don't use [SingleLinkBoxQueue] .
90/// - if neither flag is enabled: no default queue is used
91#[cfg(any(feature = "arena-queue", feature = "box-queue"))]
92pub type DefaultMutexQueue<P> = DefaultMutexQueue_<P>;
93
94/// A mutex that queues waiters by priority.  Higher priority requesters will receive access first.
95///
96/// If the `evict` feature is enabled, the current holder of the lock will be notified if a higher
97/// priority waiter is queued.
98///
99/// Note that requesters with the same priority may receive their access in an arbitrary priority -
100/// if this is non-desireable, [FIFO] and [LIFO] may be used.
101///
102/// Example:
103/// ```rust
104/// use async_priority_lock::{FIFO, HighestFirst, Mutex, MutexGuard};
105///
106/// static MY_LOCK: Mutex<FIFO<HighestFirst<usize>>, Vec<usize>> =
107///     Mutex::const_new(vec![]);
108///
109/// async fn push_num(num: usize, priority: usize) {
110///     // Pushes our number to the queue.  If multiple writers are waiting on this, the waiters
111///     // will acquire access by order of priority.  As the priority is FIFO, rquesters with the
112///     // same priority will execute oldest first.
113///     MY_LOCK.lock_from(priority).await.push(num);
114/// }
115///
116/// async fn wait_and_push(num: usize, priority: usize) {
117///     loop {
118///         let mut guard = MY_LOCK.lock_from(priority).await;
119///         // wait a second or abort and retry if the resource is requested with a higher
120///         // priority in the meantime
121///         tokio::select! {
122///             _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {},
123///             _ = MutexGuard::evicted(&mut guard) => {
124///                 // drop the guard and try again
125///                 continue;
126///             }
127///         }
128///
129///         guard.push(num)
130///     }
131/// }
132/// ```
133///
134pub struct Mutex<
135    P: Priority,
136    T,
137    #[cfg(any(feature = "box-queue", feature = "arena-queue"))] Q: MutexQueue<P> = DefaultMutexQueue<P>,
138    #[cfg(not(any(feature = "box-queue", feature = "arena-queue")))] Q: MutexQueue<P>,
139> {
140    queue: RwLock<Q>,
141    data: UnsafeCell<T>,
142    _phantom: PhantomData<P>,
143}
144
145impl<P: Priority, T: Default, Q: MutexQueue<P>> Default for Mutex<P, T, Q> {
146    fn default() -> Self {
147        Self {
148            queue: Default::default(),
149            data: Default::default(),
150            _phantom: Default::default(),
151        }
152    }
153}
154
155/// For the mutex to be Sync, T must be Send (Sync is not needed).
156///
157/// Also see [rust Mutex docs](https://doc.rust-lang.org/std/sync/struct.Mutex.html#impl-Sync-for-Mutex%3CT%3E)
158///
159/// (however, both `Sync` and `Send` are required for the queue; usually these are implemented if P
160/// is `Send` + `Sync`)
161unsafe impl<P: Priority, T: Send, Q: Send + Sync + MutexQueue<P>> Sync for Mutex<P, T, Q> {}
162unsafe impl<P: Priority, T: Send, Q: Send + MutexQueue<P>> Send for Mutex<P, T, Q> {}
163impl<P: Priority, T: Unpin, Q: Unpin + MutexQueue<P>> Unpin for Mutex<P, T, Q> {}
164
165/// Alias trait for [`PriorityQueue`]`<`[`MutexWaiter<P>`]`>`.
166pub trait MutexQueue<P: Priority>: PriorityQueue<MutexWaiter<P>> {}
167impl<P: Priority, Q: PriorityQueue<MutexWaiter<P>>> MutexQueue<P> for Q {}
168
169#[cfg(feature = "serde")]
170impl<'de, P: Priority, T, Q: MutexQueue<P>> serde::Deserialize<'de> for Mutex<P, T, Q>
171where
172    T: serde::Deserialize<'de>,
173{
174    #[inline]
175    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
176    where
177        D: serde::Deserializer<'de>,
178    {
179        // todo!();
180        Ok(Self::new(T::deserialize(deserializer)?))
181    }
182}
183
184/// A guard holding access for a [Mutex].  When dropped, the lock is released.
185///
186/// If the `evict` flag is enabled, higher priority requesters will mark held locks for eviction,
187/// which can be subscribed to via [Self::evicted] (associated function).
188pub struct MutexGuard<'a, P: Priority, T, Q: MutexQueue<P>> {
189    mutex: &'a Mutex<P, T, Q>,
190    node: ManuallyDrop<Q::Handle>,
191}
192
193unsafe impl<'a, P: Priority, T: Sync, Q: MutexQueue<P>> Sync for MutexGuard<'a, P, T, Q> where
194    Mutex<P, T, Q>: Sync
195{
196}
197unsafe impl<'a, P: Priority, T: Send, Q: MutexQueue<P>> Send for MutexGuard<'a, P, T, Q>
198where
199    Mutex<P, T, Q>: Sync,
200    Q::Handle: Send,
201{
202}
203
204impl<'a, P: Priority, T, Q: MutexQueue<P>> Display for MutexGuard<'a, P, T, Q>
205where
206    T: Display,
207{
208    #[inline]
209    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
210        self.deref().fmt(f)
211    }
212}
213
214impl<'a, P: Priority, T, Q: MutexQueue<P>> Debug for MutexGuard<'a, P, T, Q>
215where
216    T: Debug,
217{
218    #[inline]
219    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
220        self.deref().fmt(f)
221    }
222}
223
224impl<'a, P: Priority, T, Q: MutexQueue<P>> Deref for MutexGuard<'a, P, T, Q> {
225    type Target = T;
226
227    #[inline]
228    fn deref(&self) -> &Self::Target {
229        unsafe { &*self.mutex.data.get() }
230    }
231}
232
233impl<'a, P: Priority, T, Q: MutexQueue<P>> DerefMut for MutexGuard<'a, P, T, Q> {
234    #[inline]
235    fn deref_mut(&mut self) -> &mut Self::Target {
236        unsafe { &mut *self.mutex.data.get() }
237    }
238}
239
240impl<'a, P: Priority, T, Q: MutexQueue<P>> MutexGuard<'a, P, T, Q> {
241    /// Returns a future which resolves when/if another, higher priority holder attempts to acquire
242    /// the lock.
243    ///
244    /// Note: this is an associated method to avoid colision with methods of `T`.  Invoke via
245    /// [`MutexGuard::evicted`]`(&mut self)`.
246    ///
247    /// Cancel Safety: This function is cancel safe.
248    #[inline]
249    #[cfg(feature = "evict")]
250    pub fn evicted(this: &mut Self) -> impl Future<Output = ()> {
251        waiter::VoidFut(WaiterFlagFut::<_, { waiter::WAITER_FLAG_WANTS_EVICT }>::new(this))
252    }
253
254    const HAS_PURE_LOAD: bool = Q::Handle::LOAD_PURE.is_some();
255}
256
257impl<'a, P: Priority, T, Q: MutexQueue<P>> Drop for MutexGuard<'a, P, T, Q> {
258    #[inline]
259    fn drop(&mut self) {
260        let mut queue = self.mutex.queue.write();
261
262        let was_head = queue.get_by_handle(&self.node).waiter.has_lock();
263        queue.dequeue(unsafe { ManuallyDrop::take(&mut self.node) });
264
265        // interestingly, rust seems to optimize a bit better when we don't use return values from
266        // dequeue... best guess is that using the return value causes an extra register or two to
267        // be pushed to stack
268        if was_head {
269            if let Some(handle) = queue.head() {
270                handle.waiter.start();
271            }
272        }
273    }
274}
275
276/// Opaque marker type for try_lock result
277#[derive(Debug, Default, Clone, Copy)]
278pub struct TryLockError;
279
280impl Display for TryLockError {
281    #[inline]
282    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
283        write!(f, "lock is already held")
284    }
285}
286
287impl core::error::Error for TryLockError {}
288
289impl<P: Priority, T, Q: MutexQueue<P>> Debug for Mutex<P, T, Q>
290where
291    T: Debug,
292    P: Default,
293{
294    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
295        let mut d = f.debug_tuple("Mutex");
296        match self.try_lock(P::default()) {
297            Ok(data) => d.field(&data.deref()),
298            Err(_) => d.field(&format_args!("<locked>")),
299        };
300
301        d.finish()
302    }
303}
304
305impl<P: Priority, T, Q: MutexQueue<P>> Mutex<P, T, Q> {
306    /// Try to acquire the lock without blocking or requesting eviction of the current holder.
307    ///
308    /// Priority will be stored in guard; higher priority requesters will try to evict the returned
309    /// guard if the `evict` flag is enabled.
310    ///
311    /// i.e. the holder may wait / check for eviction via [MutexGuard::evicted].
312    pub fn try_lock(
313        &self,
314        priority: impl Into<P>,
315    ) -> Result<MutexGuard<'_, P, T, Q>, TryLockError> {
316        let priority = priority.into();
317        let mut queue = self.queue.write();
318
319        if !queue.is_empty() {
320            return Err(TryLockError);
321        }
322
323        let node = MutexWaiter::new(priority, true);
324        let handle = queue.enqueue(node);
325
326        Ok(MutexGuard {
327            mutex: self,
328            node: ManuallyDrop::new(handle),
329        })
330    }
331
332    /// Acquire exclusive access to the locked resource, waiting until after higher priority
333    /// requesters acquire and release the lock.
334    ///
335    /// If the `evict` feature is enabled, this will also notify the current holder to request it
336    /// to release the lock if the current holder is lower priority.
337    ///
338    /// i.e. the holder may wait / check for eviction via [MutexGuard::evicted].
339    ///
340    /// Cancel Safety: This function is cancel safe.
341    pub async fn lock(&self, priority: P) -> MutexGuard<'_, P, T, Q> {
342        let priority = priority.into();
343        let guard = {
344            let mut queue = self.queue.write();
345
346            let maybe_head = queue.head();
347            if maybe_head.is_none() {
348                let handle = queue.enqueue(MutexWaiter::new(priority, true));
349
350                return MutexGuard {
351                    mutex: self,
352                    node: ManuallyDrop::new(handle),
353                };
354            }
355
356            #[cfg(feature = "evict")]
357            {
358                let head = maybe_head.unwrap();
359
360                if priority.compare(&maybe_head.unwrap().priority).is_gt() {
361                    head.waiter.evict();
362                }
363            }
364
365            let handle = queue.enqueue(MutexWaiter::new(priority, false));
366
367            MutexGuard {
368                mutex: self,
369                node: ManuallyDrop::new(handle),
370            }
371        };
372
373        WaiterFlagFut::<_, { waiter::WAITER_FLAG_HAS_LOCK }>::new(&guard).await;
374
375        guard
376    }
377
378    /// Acquire exclusive access to the locked resource, waiting until higher priority requesters
379    /// release the lock.
380    ///
381    /// Shorthand for [`self.lock`](Self::lock)`(priority.into())`.
382    ///
383    /// Cancel Safety: This function is cancel safe.
384    pub fn lock_from(
385        &self,
386        priority: impl Into<P>,
387    ) -> impl Future<Output = MutexGuard<'_, P, T, Q>> {
388        self.lock(priority.into())
389    }
390
391    /// Acquire exclusive access to the locked resource, waiting until higher priority requesters
392    /// release the lock.
393    ///
394    /// Shorthand for [`self.lock`](Self::lock)`(`[`Default::default()`]`)`.
395    ///
396    /// Cancel Safety: This function is cancel safe.
397    pub fn lock_default(&self) -> impl Future<Output = MutexGuard<'_, P, T, Q>>
398    where
399        P: Default,
400    {
401        self.lock(Default::default())
402    }
403
404    #[inline]
405    /// Create a new [Mutex], unlocked and ready for use.
406    pub fn new(val: T) -> Self {
407        Self {
408            queue: Default::default(),
409            data: UnsafeCell::new(val),
410            _phantom: PhantomData,
411        }
412    }
413
414    /// Create a new [Mutex], unlocked and ready for use.
415    ///
416    /// Available when the queue is [ConstDefault] if the `const-default` feature is enabled.
417    ///
418    /// All builtin queues are [ConstDefault].
419    #[cfg(feature = "const-default")]
420    pub const fn const_new(val: T) -> Self
421    where
422        Q: const_default::ConstDefault,
423    {
424        Self {
425            queue: RwLock::new(Q::DEFAULT),
426            data: UnsafeCell::new(val),
427            _phantom: PhantomData,
428        }
429    }
430}
431
432#[cfg(feature = "const-default")]
433impl<P: Priority, T: ConstDefault, Q: ConstDefault + MutexQueue<P>> ConstDefault
434    for Mutex<P, T, Q>
435{
436    const DEFAULT: Self = Self::const_new(T::DEFAULT);
437}