1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use internal::*;
use std::collections::BinaryHeap;
use std::ops::{Deref, DerefMut};
use std::sync::{self, PoisonError, TryLockError};

/// A mutex which allows waiting threads to specify a priority.
#[derive(Debug)]
pub struct Mutex<T> {
    heap: sync::Mutex<BinaryHeap<PV<Prio, WakeToken>>>,
    data: sync::Mutex<T>,
}

impl<T> Mutex<T> {
    /// Creates a new mutex in an unlocked state ready for use.
    pub fn new(data: T) -> Mutex<T> {
        Mutex {
            heap: sync::Mutex::new(BinaryHeap::new()),
            data: sync::Mutex::new(data),
        }
    }

    /// Takes the lock.  If another thread is holding it, this function will block until the lock
    /// is released.
    ///
    /// Waiting threads are woken up in order of priority.  0 is the highest priority, 1 is
    /// second-highest, etc.
    pub fn lock(&self, prio: usize) -> Result<MutexGuard<T>, PoisonError<MutexGuard<T>>> {
        // is it free?
        match self.try_lock() {
            Ok(guard) => return Ok(guard),  // mission accomplished!
            Err(TryLockError::WouldBlock) => {} // carry on...
            Err(TryLockError::Poisoned(e)) => return Err(e),
        }
        // no. let's sleep
        let (sleep_token, wake_token) = create_tokens();
        {
            let mut heap = self.heap.lock().unwrap();
            heap.push(PV { p: Prio::new(prio), v: wake_token });
        }
        sleep_token.sleep();
        // ok, we've been explicitly woken up.  it *must* be free! (soon)
        self.data.lock()
            .map(|g| MutexGuard(g, self))
            .map_err(|pe| PoisonError::new(MutexGuard(pe.into_inner(), self)))
    }

    /// Attempts to take the lock.  If another thread is holding it, this function returns `None`.
    pub fn try_lock(&self) -> sync::TryLockResult<MutexGuard<T>> {
        self.data.try_lock().map(|guard| MutexGuard(guard, self)).map_err(|tle| match tle {
            TryLockError::WouldBlock => TryLockError::WouldBlock,
            TryLockError::Poisoned(pe) => TryLockError::Poisoned(
                PoisonError::new(MutexGuard(pe.into_inner(), self))
            ),
        })
    }
}

/// An RAII guard.  Frees the mutex when dropped.
///
/// It can be dereferenced to access the data protected by the mutex.
pub struct MutexGuard<'a, T: 'a>(sync::MutexGuard<'a, T>, &'a Mutex<T>);

impl<'a, T> Drop for MutexGuard<'a, T> {
    /// Release the lock.
    ///
    /// If any threads are ready to take the mutex (ie. are currently blocked calling `lock`), then
    /// the one with the highest priority will receive it; if not, the mutex will just be freed.
    ///
    /// This function performs no syscalls.
    fn drop(&mut self) {
        let mut heap = self.1.heap.lock().unwrap();
        if let Some(x) = heap.pop() { x.v.wake(); }  // wake the next thread
    }
}

impl<'a, T> Deref for MutexGuard<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        &*self.0
    }
}

impl<'a, T> DerefMut for MutexGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut *self.0
    }
}