shared_mutex/
lib.rs

1#![cfg_attr(test, deny(warnings))]
2#![deny(missing_docs)]
3
4//! # shared-mutex
5//!
6//! A RwLock that can be used with a Condvar.
7
8#[cfg(test)]
9extern crate scoped_pool;
10
11extern crate poison;
12
13use std::sync::{Condvar, LockResult, TryLockResult, TryLockError};
14use std::cell::UnsafeCell;
15use std::ops::{Deref, DerefMut};
16use std::{mem, ptr, fmt};
17
18use poison::{Poison, PoisonGuard, RawPoisonGuard};
19
20pub use raw::RawSharedMutex;
21
22pub mod monitor;
23mod raw;
24
25/// A lock providing both shared read locks and exclusive write locks.
26///
27/// Similar to `std::sync::RwLock`, except that its guards (`SharedMutexReadGuard` and
28/// `SharedMutexWriteGuard`) can wait on `std::sync::Condvar`s, which is very
29/// useful for implementing efficient concurrent programs.
30///
31/// Another difference from `std::sync::RwLock` is that the guard types are `Send`.
32pub struct SharedMutex<T: ?Sized> {
33    raw: RawSharedMutex,
34    data: UnsafeCell<Poison<T>>
35}
36
37unsafe impl<T: ?Sized + Send> Send for SharedMutex<T> {}
38unsafe impl<T: ?Sized + Sync> Sync for SharedMutex<T> {}
39
40impl<T> SharedMutex<T> {
41    /// Create a new SharedMutex protecting the given value.
42    #[inline]
43    pub fn new(value: T) -> Self {
44        SharedMutex {
45            raw: RawSharedMutex::new(),
46            data: UnsafeCell::new(Poison::new(value))
47        }
48    }
49
50    /// Extract the data from the lock and destroy the lock.
51    ///
52    /// Safe since it requires ownership of the lock.
53    #[inline]
54    pub fn into_inner(self) -> LockResult<T> {
55        unsafe { self.data.into_inner().into_inner() }
56    }
57}
58
59impl<T: ?Sized> SharedMutex<T> {
60    /// Acquire an exclusive Write lock on the data.
61    #[inline]
62    pub fn write(&self) -> LockResult<SharedMutexWriteGuard<T>> {
63        self.raw.write();
64        unsafe { SharedMutexWriteGuard::new(self) }
65    }
66
67    /// Acquire a shared Read lock on the data.
68    #[inline]
69    pub fn read(&self) -> LockResult<SharedMutexReadGuard<T>> {
70        self.raw.read();
71        unsafe { SharedMutexReadGuard::new(self) }
72    }
73
74    /// Attempt to acquire a shared Read lock on the data.
75    ///
76    /// If acquiring the lock would block, returns `TryLockError::WouldBlock`.
77    #[inline]
78    pub fn try_read(&self) -> TryLockResult<SharedMutexReadGuard<T>> {
79        if self.raw.try_read() {
80            Ok(try!(unsafe { SharedMutexReadGuard::new(self) }))
81        } else {
82            Err(TryLockError::WouldBlock)
83        }
84    }
85
86    /// Attempt to acquire an exclusive Write lock on the data.
87    ///
88    /// If acquiring the lock would block, returns `TryLockError::WouldBlock`.
89    #[inline]
90    pub fn try_write(&self) -> TryLockResult<SharedMutexWriteGuard<T>> {
91        if self.raw.try_write() {
92            Ok(try!(unsafe { SharedMutexWriteGuard::new(self) }))
93        } else {
94            Err(TryLockError::WouldBlock)
95        }
96    }
97
98    /// Get a mutable reference to the data without locking.
99    ///
100    /// Safe since it requires exclusive access to the lock itself.
101    #[inline]
102    pub fn get_mut(&mut self) -> LockResult<&mut T> {
103        poison::map_result(unsafe { &mut *self.data.get() }.lock(),
104                           |poison| unsafe { poison.into_mut() })
105    }
106}
107
108/// A shared read guard on a SharedMutex.
109pub struct SharedMutexReadGuard<'mutex, T: ?Sized + 'mutex> {
110    data: &'mutex T,
111    mutex: &'mutex SharedMutex<T>
112}
113
114unsafe impl<'mutex, T: ?Sized + Send> Send for SharedMutexReadGuard<'mutex, T> {}
115unsafe impl<'mutex, T: ?Sized + Sync> Sync for SharedMutexReadGuard<'mutex, T> {}
116
117/// An exclusive write guard on a SharedMutex.
118pub struct SharedMutexWriteGuard<'mutex, T: ?Sized + 'mutex> {
119    data: PoisonGuard<'mutex, T>,
120    mutex: &'mutex SharedMutex<T>
121}
122
123impl<'mutex, T: ?Sized> Deref for SharedMutexReadGuard<'mutex, T> {
124    type Target = T;
125
126    #[inline]
127    fn deref(&self) -> &T { self.data }
128}
129
130impl<'mutex, T: ?Sized> Deref for SharedMutexWriteGuard<'mutex, T> {
131    type Target = T;
132
133    #[inline]
134    fn deref(&self) -> &T { self.data.get() }
135}
136
137impl<'mutex, T: ?Sized> DerefMut for SharedMutexWriteGuard<'mutex, T> {
138    #[inline]
139    fn deref_mut(&mut self) -> &mut T { self.data.get_mut() }
140}
141
142impl<'mutex, T: ?Sized> SharedMutexReadGuard<'mutex, T> {
143    #[inline]
144    unsafe fn new(mutex: &'mutex SharedMutex<T>) -> LockResult<Self> {
145        poison::map_result((&*mutex.data.get()).get(), |data| {
146            SharedMutexReadGuard {
147                data: data,
148                mutex: mutex
149            }
150        })
151    }
152}
153
154impl<'mutex, T: ?Sized> SharedMutexWriteGuard<'mutex, T> {
155    #[inline]
156    unsafe fn new(mutex: &'mutex SharedMutex<T>) -> LockResult<Self> {
157        poison::map_result((&mut *mutex.data.get()).lock(), |poison| {
158            SharedMutexWriteGuard {
159                data: poison,
160                mutex: mutex
161            }
162        })
163    }
164}
165
166impl<'mutex, T: ?Sized> SharedMutexReadGuard<'mutex, T> {
167    /// Turn this guard into a guard which can be mapped to a sub-borrow.
168    ///
169    /// Note that a mapped guard cannot wait on a `Condvar`.
170    pub fn into_mapped(self) -> MappedSharedMutexReadGuard<'mutex, T> {
171        let guard = MappedSharedMutexReadGuard {
172            mutex: &self.mutex.raw,
173            data: self.data
174        };
175
176        // Don't double-unlock.
177        mem::forget(self);
178
179        guard
180    }
181
182    /// Wait on the given condition variable, and resume with a write lock.
183    ///
184    /// See the documentation for `std::sync::Condvar::wait` for more information.
185    pub fn wait_for_write(self, cond: &Condvar) -> LockResult<SharedMutexWriteGuard<'mutex, T>> {
186        self.mutex.raw.wait_from_read_to_write(cond);
187
188        let guard = unsafe { SharedMutexWriteGuard::new(self.mutex) };
189
190        // Don't double-unlock.
191        mem::forget(self);
192
193        guard
194    }
195
196    /// Wait on the given condition variable, and resume with another read lock.
197    ///
198    /// See the documentation for `std::sync::Condvar::wait` for more information.
199    pub fn wait_for_read(self, cond: &Condvar) -> LockResult<Self> {
200        self.mutex.raw.wait_from_read_to_read(cond);
201
202        let guard = unsafe { SharedMutexReadGuard::new(self.mutex) };
203
204        // Don't double-unlock.
205        mem::forget(self);
206
207        guard
208    }
209}
210
211impl<'mutex, T: ?Sized> SharedMutexWriteGuard<'mutex, T> {
212    /// Turn this guard into a guard which can be mapped to a sub-borrow.
213    ///
214    /// Note that a mapped guard cannot wait on a `Condvar`.
215    pub fn into_mapped(self) -> MappedSharedMutexWriteGuard<'mutex, T> {
216        let guard = MappedSharedMutexWriteGuard {
217            mutex: &self.mutex.raw,
218            poison: unsafe { ptr::read(&self.data).into_raw() },
219            data: unsafe { (&mut *self.mutex.data.get()).get_mut() }
220        };
221
222        // Don't double-unlock.
223        mem::forget(self);
224
225        guard
226    }
227
228    /// Wait on the given condition variable, and resume with another write lock.
229    pub fn wait_for_write(self, cond: &Condvar) -> LockResult<Self> {
230        self.mutex.raw.wait_from_write_to_write(cond);
231
232        let guard = unsafe { SharedMutexWriteGuard::new(self.mutex) };
233
234        // Don't double-unlock.
235        mem::forget(self);
236
237        guard
238    }
239
240    /// Wait on the given condition variable, and resume with a read lock.
241    pub fn wait_for_read(self, cond: &Condvar) -> LockResult<SharedMutexReadGuard<'mutex, T>> {
242        self.mutex.raw.wait_from_write_to_read(cond);
243
244        let guard = unsafe { SharedMutexReadGuard::new(self.mutex) };
245
246        // Don't double-unlock.
247        mem::forget(self);
248
249        guard
250    }
251}
252
253impl<'mutex, T: ?Sized> Drop for SharedMutexReadGuard<'mutex, T> {
254    #[inline]
255    fn drop(&mut self) { self.mutex.raw.unlock_read() }
256}
257
258impl<'mutex, T: ?Sized> Drop for SharedMutexWriteGuard<'mutex, T> {
259    #[inline]
260    fn drop(&mut self) { self.mutex.raw.unlock_write() }
261}
262
263/// A read guard to a sub-borrow of an original SharedMutexReadGuard.
264///
265/// Unlike SharedMutexReadGuard, it cannot be used to wait on a
266/// `Condvar`.
267pub struct MappedSharedMutexReadGuard<'mutex, T: ?Sized + 'mutex> {
268    mutex: &'mutex RawSharedMutex,
269    data: &'mutex T
270}
271
272/// A write guard to a sub-borrow of an original `SharedMutexWriteGuard`.
273///
274/// Unlike `SharedMutexWriteGuard`, it cannot be used to wait on a
275/// `Condvar`.
276pub struct MappedSharedMutexWriteGuard<'mutex, T: ?Sized + 'mutex> {
277    mutex: &'mutex RawSharedMutex,
278    poison: RawPoisonGuard<'mutex>,
279    data: &'mutex mut T,
280}
281
282impl<'mutex, T: ?Sized> MappedSharedMutexReadGuard<'mutex, T> {
283    /// Transform this guard into a sub-borrow of the original data.
284    #[inline]
285    pub fn map<U: ?Sized, F>(self, action: F) -> MappedSharedMutexReadGuard<'mutex, U>
286    where F: FnOnce(&T) -> &U {
287        self.option_map(move |t| Some(action(t))).unwrap()
288    }
289
290    /// Conditionally transform this guard into a sub-borrow of the original data.
291    #[inline]
292    pub fn option_map<U: ?Sized, F>(self, action: F) -> Option<MappedSharedMutexReadGuard<'mutex, U>>
293    where F: FnOnce(&T) -> Option<&U> {
294        self.result_map(move |t| action(t).ok_or(())).ok()
295    }
296
297    /// Conditionally transform this guard into a sub-borrow of the original data.
298    ///
299    /// If the transformation operation is aborted, returns the original guard.
300    #[inline]
301    pub fn result_map<U: ?Sized, E, F>(self, action: F)
302        -> Result<MappedSharedMutexReadGuard<'mutex, U>, (Self, E)>
303    where F: FnOnce(&T) -> Result<&U, E> {
304        let data = self.data;
305        let mutex = self.mutex;
306
307        match action(data) {
308            Ok(new_data) => {
309                // Don't double-unlock.
310                mem::forget(self);
311
312                Ok(MappedSharedMutexReadGuard {
313                    data: new_data,
314                    mutex: mutex
315                })
316            },
317            Err(e) => { Err((self, e)) }
318        }
319    }
320
321    /// Recover the original guard for waiting.
322    ///
323    /// Takes the original mutex to recover the original type and data. If the
324    /// passed mutex is not the same object as the original mutex, returns `Err`.
325    #[inline]
326    pub fn recover<U: ?Sized>(self, mutex: &'mutex SharedMutex<U>) -> Result<SharedMutexReadGuard<'mutex, U>, Self> {
327        if self.mutex.is(&mutex.raw) {
328            // The mutex can't have become poisoned since we are continuously holding a guard.
329            let guard = unsafe { SharedMutexReadGuard::new(mutex) }.unwrap();
330
331            // Don't double-unlock.
332            mem::forget(self);
333
334            Ok(guard)
335        } else {
336            Err(self)
337        }
338    }
339}
340
341impl<'mutex, T: ?Sized> MappedSharedMutexWriteGuard<'mutex, T> {
342    /// Transform this guard into a sub-borrow of the original data.
343    #[inline]
344    pub fn map<U: ?Sized, F>(self, action: F) -> MappedSharedMutexWriteGuard<'mutex, U>
345    where F: FnOnce(&mut T) -> &mut U {
346        self.option_map(move |t| Some(action(t))).unwrap()
347    }
348
349    /// Conditionally transform this guard into a sub-borrow of the original data.
350    #[inline]
351    pub fn option_map<U: ?Sized, F>(self, action: F) -> Option<MappedSharedMutexWriteGuard<'mutex, U>>
352    where F: FnOnce(&mut T) -> Option<&mut U> {
353        self.result_map(move |t| action(t).ok_or(())).ok()
354    }
355
356    /// Conditionally transform this guard into a sub-borrow of the original data.
357    ///
358    /// If the transformation operation is aborted, returns the original guard.
359    #[inline]
360    pub fn result_map<U: ?Sized, E, F>(self, action: F)
361        -> Result<MappedSharedMutexWriteGuard<'mutex, U>, (Self, E)>
362    where F: FnOnce(&mut T) -> Result<&mut U, E> {
363        let data = unsafe { ptr::read(&self.data) };
364        let mutex = self.mutex;
365
366        match action(data) {
367            Ok(new_data) => {
368                let poison = unsafe { ptr::read(&self.poison) };
369
370                // Don't double-unlock.
371                mem::forget(self);
372
373                Ok(MappedSharedMutexWriteGuard {
374                    data: new_data,
375                    poison: poison,
376                    mutex: mutex
377                })
378            },
379            Err(e) => { Err((self, e)) }
380        }
381    }
382
383    /// Recover the original guard for waiting.
384    ///
385    /// Takes the original mutex to recover the original type and data. If the
386    /// passed mutex is not the same object as the original mutex, returns `Err`.
387    #[inline]
388    pub fn recover<U: ?Sized>(self, mutex: &'mutex SharedMutex<U>) -> Result<SharedMutexWriteGuard<'mutex, U>, Self> {
389        if self.mutex.is(&mutex.raw) {
390            // The mutex can't have become poisoned since we are continuously holding a guard.
391            let guard = unsafe { SharedMutexWriteGuard::new(mutex) }.unwrap();
392
393            // Don't double-unlock.
394            mem::forget(self);
395
396            Ok(guard)
397        } else {
398            Err(self)
399        }
400    }
401}
402
403impl<'mutex, T: ?Sized> Deref for MappedSharedMutexReadGuard<'mutex, T> {
404    type Target = T;
405
406    #[inline]
407    fn deref(&self) -> &T { self.data }
408}
409
410impl<'mutex, T: ?Sized> Deref for MappedSharedMutexWriteGuard<'mutex, T> {
411    type Target = T;
412
413    #[inline]
414    fn deref(&self) -> &T { self.data }
415}
416
417impl<'mutex, T: ?Sized> DerefMut for MappedSharedMutexWriteGuard<'mutex, T> {
418    #[inline]
419    fn deref_mut(&mut self) -> &mut T { self.data }
420}
421
422impl<'mutex, T: ?Sized> Drop for MappedSharedMutexReadGuard<'mutex, T> {
423    #[inline]
424    fn drop(&mut self) { self.mutex.unlock_read() }
425}
426
427impl<'mutex, T: ?Sized> Drop for MappedSharedMutexWriteGuard<'mutex, T> {
428    #[inline]
429    fn drop(&mut self) { self.mutex.unlock_write() }
430}
431
432impl<T: ?Sized + fmt::Debug> fmt::Debug for SharedMutex<T> {
433    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
434        let mut writer = f.debug_struct("SharedMutex");
435
436        match self.try_read() {
437            Ok(l) => writer.field("data", &&*l),
438            Err(TryLockError::WouldBlock) => writer.field("data", &"{{ locked }}"),
439            Err(TryLockError::Poisoned(_)) => writer.field("data", &"{{ poisoned }}")
440        }.finish()
441    }
442}
443
444impl<'mutex, T: ?Sized + fmt::Debug> fmt::Debug for SharedMutexReadGuard<'mutex, T> {
445    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
446        f.debug_struct("SharedMutexReadGuard")
447            .field("data", &*self)
448            .finish()
449    }
450}
451
452impl<'mutex, T: ?Sized + fmt::Debug> fmt::Debug for SharedMutexWriteGuard<'mutex, T> {
453    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
454        f.debug_struct("SharedMutexWriteGuard")
455            .field("data", &*self)
456            .finish()
457    }
458}
459
460impl<'mutex, T: ?Sized + fmt::Debug> fmt::Debug for MappedSharedMutexReadGuard<'mutex, T> {
461    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
462        f.debug_struct("MappedSharedMutexReadGuard")
463            .field("data", &*self)
464            .finish()
465    }
466}
467
468impl<'mutex, T: ?Sized + fmt::Debug> fmt::Debug for MappedSharedMutexWriteGuard<'mutex, T> {
469    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
470        f.debug_struct("MappedSharedMutexWriteGuard")
471            .field("data", &*self)
472            .finish()
473    }
474}
475
#[cfg(test)]
mod test {
    use std::sync::{Condvar, Barrier};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use scoped_pool::Pool;

    use super::*;

    // Never called: it only has to type-check, proving that the listed types
    // are `Send + Sync`.
    fn _check_bounds() {
        fn _is_send_sync<T: Send + Sync>() {}

        _is_send_sync::<RawSharedMutex>();
        _is_send_sync::<SharedMutex<()>>();
        _is_send_sync::<SharedMutexReadGuard<()>>();
        _is_send_sync::<SharedMutexWriteGuard<()>>();
    }

    #[test]
    fn test_simple_multithreaded() {
        let pool = Pool::new(8);
        let mut mutex = SharedMutex::new(0);
        let increments = 100;

        pool.scoped(|scope| {
            for _ in 0..increments {
                scope.execute(|| {
                    // Each job bumps the counter once, so the value seen
                    // afterwards must be strictly larger.
                    let seen_before = *mutex.read().unwrap();
                    *mutex.write().unwrap() += 1;
                    let seen_after = *mutex.read().unwrap();

                    assert!(seen_before < seen_after, "Time travel! {:?} >= {:?}", seen_before, seen_after);
                });
            }
        });

        // Every job ran exactly once.
        assert_eq!(*mutex.get_mut().unwrap(), increments);
        pool.shutdown();
    }

    #[test]
    fn test_simple_single_thread() {
        let mut mutex = SharedMutex::new(0);
        let increments = 100;

        for _ in 0..increments {
            let seen_before = *mutex.read().unwrap();
            *mutex.write().unwrap() += 1;
            let seen_after = *mutex.read().unwrap();

            assert!(seen_before < seen_after, "Time travel! {:?} >= {:?}", seen_before, seen_after);
        }

        assert_eq!(*mutex.get_mut().unwrap(), increments);
    }

    #[test]
    fn test_locking_multithreaded() {
        // Best-effort check of the actual mutual exclusion provided by the
        // mutex.
        //
        // Readers load an atomic many times while holding a read lock;
        // writers increment it many times while holding the write lock.
        //
        // If those operations ever interleave (a reader sees the value change
        // under its lock, or a writer sees another writer's increments), the
        // lock is broken.
        //
        // A barrier is used to try to make threads hit the lock together.

        let mut mutex = SharedMutex::new(());
        let value = AtomicUsize::new(0);

        let threads = 50;
        let actors = threads * 20; // Must be a multiple of `threads`.
        let actions_per_actor = 100;
        let start_barrier = Barrier::new(threads);
        let pool = Pool::new(threads);

        pool.scoped(|scope| {
            for _ in 0..actors {
                // Reader
                scope.execute(|| {
                    start_barrier.wait();

                    let _read = mutex.read().unwrap();
                    let snapshot = value.load(Ordering::SeqCst);

                    for _ in 0..actions_per_actor {
                        assert_eq!(snapshot, value.load(Ordering::SeqCst));
                    }
                });

                // Writer
                scope.execute(|| {
                    start_barrier.wait();

                    let _write = mutex.write().unwrap();
                    let mut expected = value.load(Ordering::SeqCst);

                    for _ in 0..actions_per_actor {
                        // `fetch_add` returns the value from before the add.
                        let observed = value.fetch_add(1, Ordering::SeqCst);

                        assert_eq!(expected, observed);

                        // The next round must observe our own increment.
                        expected = observed + 1;
                    }
                });
            }
        });

        mutex.get_mut().unwrap();
        pool.shutdown();
    }

    #[test]
    fn test_simple_waiting() {
        let pool = Pool::new(20);
        let mutex = SharedMutex::new(());
        let cond = Condvar::new();

        pool.scoped(|scope| {
            let write_lock = mutex.write().unwrap();

            scope.execute(|| {
                drop(mutex.write().unwrap());
                cond.notify_one();
            });

            // Write -> Read
            let read_lock = write_lock.wait_for_read(&cond).unwrap();

            scope.execute(|| {
                drop(mutex.write().unwrap());
                cond.notify_one();
            });

            // Read -> Read
            let read_lock = read_lock.wait_for_read(&cond).unwrap();

            scope.execute(|| {
                drop(mutex.write().unwrap());
                cond.notify_one();
            });

            // Read -> Write
            let write_lock = read_lock.wait_for_write(&cond).unwrap();

            scope.execute(|| {
                drop(mutex.write().unwrap());
                cond.notify_one();
            });

            // Write -> Write
            write_lock.wait_for_write(&cond).unwrap();
        });

        pool.shutdown();
    }

    #[test]
    fn test_mapping() {
        let mutex = SharedMutex::new(vec![1, 2, 3]);

        // Write through a guard mapped to the first element...
        *mutex.write().unwrap().into_mapped()
            .map(|items| &mut items[0]) = 100;

        // ...and read it back through a mapped read guard.
        assert_eq!(*mutex.read().unwrap().into_mapped().map(|items| &items[0]), 100);
    }

    #[test]
    fn test_map_recover() {
        let mutex = SharedMutex::new(vec![1, 2]);

        let mut first = mutex.write().unwrap().into_mapped()
            .map(|items| &mut items[0]);
        *first = 123;

        // Recovering with the original mutex yields a guard over the whole vector.
        let whole_guard = first.recover(&mutex).unwrap();
        assert_eq!(&*whole_guard, &[123, 2]);
    }

    #[test]
    fn test_try_locking() {
        let mutex = SharedMutex::new(10);

        // Each guard is a temporary, so it is released before the next attempt.
        mutex.try_read().unwrap();
        mutex.try_write().unwrap();
    }
}
667