priomutex 0.3.1

A mutex where waiting threads specify a priority
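
A minimal usage sketch, assuming the crate is pulled in as `priomutex` and that `Mutex` is
exported at the crate root (the implementation itself follows below):

extern crate priomutex;

use priomutex::Mutex;
use std::sync::Arc;
use std::thread;

fn main() {
    let mutex = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (0..3).map(|prio| {
        let mutex = mutex.clone();
        thread::spawn(move || {
            // 0 is the highest priority; waiting threads are woken in priority order.
            let mut data = mutex.lock(prio).unwrap();
            *data += 1;
            // the guard is dropped here, releasing the mutex
        })
    }).collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(*mutex.lock(0).unwrap(), 3);
}
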
use std::collections::BinaryHeap;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::{self, PoisonError, TryLockError};
use std::thread;
use token::*;
use types::*;

/// A mutex which allows waiting threads to specify a priority.
#[derive(Debug)]
pub struct Mutex<T> {
    bookkeeping: sync::Mutex<Bookkeeping>,
    data: sync::Mutex<T>,
}

// Essentially all operations on `Mutex` are done while holding the bookkeeping lock.  This means
// that it's impossible for, e.g., one thread to be trying to lock the mutex while another one is
// dropping it.
#[derive(Debug)]
struct Bookkeeping {
    heap: BinaryHeap<PV<usize, WakeToken>>,
    free: bool,  // no one is holding it AND no one is waiting to take it
}

impl<T> Mutex<T> {
    /// Creates a new mutex in an unlocked state ready for use.
    pub fn new(data: T) -> Mutex<T> {
        Mutex {
            bookkeeping: sync::Mutex::new(Bookkeeping { heap: BinaryHeap::new(), free: true }),
            data: sync::Mutex::new(data),
        }
    }

    /// Takes the lock.  If another thread is holding it, this function will block until the lock
    /// is released.
    ///
    /// Waiting threads are woken up in order of priority.  0 is the highest priority, 1 is
    /// second-highest, etc.
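    ///
    /// A sketch of how priorities affect the wake-up order (this assumes the type is reachable as
    /// `priomutex::Mutex`; it relies on a sleep to let the waiters queue up, so it illustrates the
    /// ordering rather than proving it):
    ///
    /// ```no_run
    /// use priomutex::Mutex;
    /// use std::sync::Arc;
    /// use std::sync::mpsc::channel;
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// let mutex = Arc::new(Mutex::new(()));
    /// let (tx, rx) = channel();
    ///
    /// // Hold the lock while the waiters queue up behind it.
    /// let guard = mutex.lock(0).unwrap();
    /// for prio in (0..3).rev() {
    ///     let (mutex, tx) = (mutex.clone(), tx.clone());
    ///     thread::spawn(move || {
    ///         let _guard = mutex.lock(prio).unwrap();
    ///         tx.send(prio).unwrap();
    ///     });
    /// }
    /// thread::sleep(Duration::from_millis(100));  // let all three threads block
    /// drop(guard);                                // release: waiters are woken in turn
    ///
    /// // Lower numbers are higher priority, so they should arrive first.
    /// assert_eq!(rx.iter().take(3).collect::<Vec<_>>(), vec![0, 1, 2]);
    /// ```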
    pub fn lock(&self, prio: usize) -> sync::LockResult<MutexGuard<T>> {
        let mut bk = self.bookkeeping.lock().unwrap();
        if bk.free {
            // We took it!  The data must be free (soon).
            bk.free = false;
            return self.spin_lock_data();
        }
        // no. let's sleep
        let (sleep_token, wake_token) = create_tokens();
        bk.heap.push(PV { p: prio, v: wake_token });
        mem::drop(bk);
        sleep_token.sleep();
        // ok, we've been explicitly woken up.  It *must* be free!  (soon)
        self.spin_lock_data()
    }

    /// Attempts to take the lock.  Fails if another thread is already holding it, or if another
    /// thread is already waiting to take it.
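    ///
    /// A minimal sketch of both outcomes (this assumes the type is reachable as
    /// `priomutex::Mutex`):
    ///
    /// ```
    /// use priomutex::Mutex;
    ///
    /// let m = Mutex::new(0);
    /// let guard = m.lock(0).unwrap();   // the mutex is now held...
    /// assert!(m.try_lock().is_err());   // ...so try_lock refuses immediately
    /// drop(guard);
    /// assert!(m.try_lock().is_ok());    // once released, try_lock succeeds
    /// ```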
    pub fn try_lock(&self) -> sync::TryLockResult<MutexGuard<T>> {
        let mut bk = self.bookkeeping.lock().unwrap();
        if bk.free {
            // We took it!  The data must be free (soon).
            bk.free = false;
            self.spin_lock_data().map_err(TryLockError::Poisoned)
        } else {
            // It's already taken
            Err(TryLockError::WouldBlock)
        }
    }

    /// Spin waits until the data lock becomes free.  Careful: make sure you're the next thread in
    /// line before calling this!
    fn spin_lock_data(&self) -> sync::LockResult<MutexGuard<T>> {
        loop {
            match self.data.try_lock() {
                Ok(guard) =>
                    return Ok(MutexGuard(guard, self)),
                Err(TryLockError::WouldBlock) =>
                    thread::yield_now(),
                Err(TryLockError::Poisoned(pe)) =>
                    return Err(PoisonError::new(MutexGuard(pe.into_inner(), self))),
            }
        }
    }
}

/// An RAII guard.  Frees the mutex when dropped.
///
/// It can be dereferenced to access the data protected by the mutex.
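///
/// A small sketch (this assumes the type is reachable as `priomutex::Mutex`):
///
/// ```
/// use priomutex::Mutex;
///
/// let m = Mutex::new(5);
/// {
///     let mut guard = m.lock(0).unwrap();
///     *guard += 1;          // DerefMut gives mutable access to the data
/// }                         // dropping the guard releases the mutex
/// assert_eq!(*m.lock(0).unwrap(), 6);
/// ```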
pub struct MutexGuard<'a, T: 'a>(sync::MutexGuard<'a, T>, &'a Mutex<T>);

impl<'a, T> Drop for MutexGuard<'a, T> {
    /// Release the lock.
    ///
    /// If any threads are ready to take the mutex (i.e. are currently blocked calling `lock`), then
    /// the one with the highest priority will receive it; if not, the mutex will just be freed.
    fn drop(&mut self) {
        let mut bk = self.1.bookkeeping.lock().unwrap();
        if let Some(x) = bk.heap.pop() {
            // wake the next thread
            x.v.wake();
        } else {
            // release the lock
            bk.free = true;
        }
    }
}

impl<'a, T> Deref for MutexGuard<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        &*self.0
    }
}

impl<'a, T> DerefMut for MutexGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut *self.0
    }
}