1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
/*! A mutex where waiting threads to specify a priority. The API is very similar to `std::sync::Mutex`. The key difference, of course, is that `lock` takes a priority. If multiple threads are waiting for the mutex when it's freed, the one which gave the highest priorty will recieve it. The other difference is that `std::sync::Mutex` implements `Sync` but not `Clone`, whereas `priomutex::Mutex` implements `Clone` but not `Sync`. In practice this means (1) that you don't need to wrap a priomutex in an `Arc`, and (2) that we can't implement `into_inner` and `get_mut`. ``` # extern crate rand; # extern crate priomutex; # fn main() { use priomutex::Mutex; use rand::{Rng, thread_rng}; use std::mem; use std::thread; use std::time::Duration; const N: usize = 10; let data = Mutex::new(Vec::new()); let guard = data.lock(0); let mut tids = Vec::new(); for _ in 0..N { let data = data.clone(); tids.push(thread::spawn(move || { let mut rng = thread_rng(); let prio = rng.gen::<usize>(); // generate a random priority let mut data = data.lock(prio); // wait on the mutex data.push(prio); // push priority onto the list })); } // Give the threads time to spawn and wait on the mutex thread::sleep(Duration::from_millis(10)); mem::drop(guard); // go go go! for t in tids { t.join(); } // wait until they've all modified the mutex // Check that every thread pushed an element let d1 = data.lock(0); assert_eq!(d1.len(), N); // Check that the threads were woken in priority order let mut d2 = d1.clone(); d2.sort(); assert_eq!(*d1, d2); # } ``` ## Poisoning Currently, priomutexes don't support poisoning; they are *not* poisoned if the thread holding the lock panics. */ use std::collections::BinaryHeap; use std::ops::{Deref, DerefMut}; use std::sync::{mpsc, Arc}; use std::thread::{self, Thread}; mod with_prio; use with_prio::*; pub mod reservable; /// A mutex which allows waiting threads to specify a priority. pub struct Mutex<T> { inner: Arc<reservable::Mutex<Inner<T>>>, tx: mpsc::Sender<WithPrio<Thread>>, } // This is derived anyway by the autotrait rules, but we make it explicit so that it shows up in // the docs. unsafe impl<T: Send> Send for Mutex<T> { } impl<T> Clone for Mutex<T> { fn clone(&self) -> Self { Mutex { inner: self.inner.clone(), tx: self.tx.clone(), } } } impl<T> Mutex<T> { /// Creates a new mutex in an unlocked state ready for use. pub fn new(data: T) -> Mutex<T> { let (tx, rx) = mpsc::channel(); Mutex { inner: Arc::new(reservable::Mutex::new(Inner { data: data, rx: rx, heap: BinaryHeap::new(), })), tx: tx, } } /// Takes the lock. If another thread is holding it, this function will block until the lock /// is released. /// /// Waiting threads are woken up in order of priority. 0 is the highest priority, 1 is /// second-highest, etc. pub fn lock(&self, prio: usize) -> MutexGuard<T> { loop { if let Some(x) = self.try_lock() { return x; } else { let me = WithPrio { prio: prio, inner: thread::current() }; self.tx.send(me).unwrap(); thread::park(); } } } /// Attempts to take the lock. If another thread is holding it, this function returns `None`. pub fn try_lock(&self) -> Option<MutexGuard<T>> { self.inner.try_lock().map(|inner| MutexGuard { __inner: inner }) } } struct Inner<T> { data: T, rx: mpsc::Receiver<WithPrio<Thread>>, heap: BinaryHeap<WithPrio<Thread>>, } impl<T> Inner<T> { fn next_thread(&mut self) -> Option<Thread> { for x in self.rx.try_iter() { self.heap.push(x); } self.heap.pop().map(|x| x.inner) } } /// An RAII guard. Frees the mutex when dropped. /// /// It can be dereferenced to access the data protected by the mutex. pub struct MutexGuard<'a, T: 'a> { __inner: reservable::MutexGuard<'a, Inner<T>>, } impl<'a, T> Drop for MutexGuard<'a, T> { /// Release the lock. /// /// If any threads are ready to take the mutex (ie. are currently blocked calling `lock`), then /// the one with the highest priority will receive it; if not, the mutex will just be freed. /// /// This function performs a syscall when there are threads waiting. On my machine this takes /// ~3 μs. fn drop(&mut self) { // Release the lock first, and *then* wake the next thread. If we rely on the Drop impl // for simple::MutexGuard then these operations happen in the reverse order, which can lead // to a deadlock. let next_thread = self.__inner.next_thread(); self.__inner.release_to(next_thread.as_ref().map(|x| x.id())); if let Some(h) = next_thread { h.unpark(); } } } impl<'a, T> Deref for MutexGuard<'a, T> { type Target = T; fn deref(&self) -> &T { &(*self.__inner).data } } impl<'a, T> DerefMut for MutexGuard<'a, T> { fn deref_mut(&mut self) -> &mut T { &mut (*self.__inner).data } }