#![cfg_attr(test, deny(warnings))]
#![deny(missing_docs)]
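//! A reader-writer lock ("shared mutex") whose guards can wait on condition
//! variables and can be mapped to sub-borrows of the protected data.
//!
//! A minimal usage sketch (a doc-test; it assumes this crate is built under
//! the crate name `shared_mutex`):
//!
//! ```
//! use shared_mutex::SharedMutex;
//!
//! let mutex = SharedMutex::new(10);
//!
//! // Any number of readers may hold the lock at the same time.
//! {
//!     let a = mutex.read().unwrap();
//!     let b = mutex.read().unwrap();
//!     assert_eq!(*a + *b, 20);
//! }
//!
//! // A writer gets exclusive access.
//! *mutex.write().unwrap() += 1;
//! assert_eq!(*mutex.read().unwrap(), 11);
//! ```
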
#[cfg(test)]
extern crate scoped_pool;

extern crate poison;

use std::sync::{Condvar, LockResult, TryLockResult, TryLockError};
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::{mem, ptr, fmt};

use poison::{Poison, PoisonGuard, RawPoisonGuard};

pub use raw::RawSharedMutex;

pub mod monitor;
mod raw;

/// A reader-writer lock protecting a value of type `T`.
///
/// Many readers or a single writer can hold the lock at a time. The guards
/// handed out by this lock can wait on `Condvar`s and can be mapped to
/// sub-borrows of the protected data.
pub struct SharedMutex<T: ?Sized> {
    raw: RawSharedMutex,
    data: UnsafeCell<Poison<T>>
}

unsafe impl<T: ?Sized + Send> Send for SharedMutex<T> {}
unsafe impl<T: ?Sized + Sync> Sync for SharedMutex<T> {}

impl<T> SharedMutex<T> {
    #[inline]
    /// Create a new `SharedMutex` protecting the given value.
    pub fn new(value: T) -> Self {
        SharedMutex {
            raw: RawSharedMutex::new(),
            data: UnsafeCell::new(Poison::new(value))
        }
    }

    #[inline]
    /// Consume the mutex and extract the inner value.
    ///
    /// Returns an error if the mutex was poisoned.
    pub fn into_inner(self) -> LockResult<T> {
        unsafe { self.data.into_inner().into_inner() }
    }
}

impl<T: ?Sized> SharedMutex<T> {
    #[inline]
    /// Acquire an exclusive write lock, blocking until it is available.
    pub fn write(&self) -> LockResult<SharedMutexWriteGuard<T>> {
        self.raw.write();
        unsafe { SharedMutexWriteGuard::new(self) }
    }

    #[inline]
    /// Acquire a shared read lock, blocking until it is available.
    pub fn read(&self) -> LockResult<SharedMutexReadGuard<T>> {
        self.raw.read();
        unsafe { SharedMutexReadGuard::new(self) }
    }

    #[inline]
    /// Attempt to acquire a shared read lock without blocking.
    ///
    /// Returns `Err(TryLockError::WouldBlock)` if the lock is not
    /// immediately available.
    pub fn try_read(&self) -> TryLockResult<SharedMutexReadGuard<T>> {
        if self.raw.try_read() {
            Ok(try!(unsafe { SharedMutexReadGuard::new(self) }))
        } else {
            Err(TryLockError::WouldBlock)
        }
    }

    #[inline]
    /// Attempt to acquire an exclusive write lock without blocking.
    ///
    /// Returns `Err(TryLockError::WouldBlock)` if the lock is not
    /// immediately available.
    pub fn try_write(&self) -> TryLockResult<SharedMutexWriteGuard<T>> {
        if self.raw.try_write() {
            Ok(try!(unsafe { SharedMutexWriteGuard::new(self) }))
        } else {
            Err(TryLockError::WouldBlock)
        }
    }

    #[inline]
    /// Get a mutable reference to the data without locking.
    ///
    /// This is safe because `&mut self` guarantees exclusive access.
    pub fn get_mut(&mut self) -> LockResult<&mut T> {
        poison::map_result(unsafe { &mut *self.data.get() }.lock(),
                           |poison| unsafe { poison.into_mut() })
    }
}

/// A shared read guard on the data of a `SharedMutex`.
pub struct SharedMutexReadGuard<'mutex, T: ?Sized + 'mutex> {
    data: &'mutex T,
    mutex: &'mutex SharedMutex<T>
}

unsafe impl<'mutex, T: ?Sized + Send> Send for SharedMutexReadGuard<'mutex, T> {}
unsafe impl<'mutex, T: ?Sized + Sync> Sync for SharedMutexReadGuard<'mutex, T> {}

/// An exclusive write guard on the data of a `SharedMutex`.
pub struct SharedMutexWriteGuard<'mutex, T: ?Sized + 'mutex> {
    data: PoisonGuard<'mutex, T>,
    mutex: &'mutex SharedMutex<T>
}

impl<'mutex, T: ?Sized> Deref for SharedMutexReadGuard<'mutex, T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &T { self.data }
}

impl<'mutex, T: ?Sized> Deref for SharedMutexWriteGuard<'mutex, T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &T { self.data.get() }
}

impl<'mutex, T: ?Sized> DerefMut for SharedMutexWriteGuard<'mutex, T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut T { self.data.get_mut() }
}

impl<'mutex, T: ?Sized> SharedMutexReadGuard<'mutex, T> {
    // Safety: the caller must already hold a read lock on `mutex.raw`.
    #[inline]
    unsafe fn new(mutex: &'mutex SharedMutex<T>) -> LockResult<Self> {
        poison::map_result((&*mutex.data.get()).get(), |data| {
            SharedMutexReadGuard {
                data: data,
                mutex: mutex
            }
        })
    }
}

impl<'mutex, T: ?Sized> SharedMutexWriteGuard<'mutex, T> {
    // Safety: the caller must already hold the write lock on `mutex.raw`.
    #[inline]
    unsafe fn new(mutex: &'mutex SharedMutex<T>) -> LockResult<Self> {
        poison::map_result((&mut *mutex.data.get()).lock(), |poison| {
            SharedMutexWriteGuard {
                data: poison,
                mutex: mutex
            }
        })
    }
}

impl<'mutex, T: ?Sized> SharedMutexReadGuard<'mutex, T> {
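    /// Convert this guard into a `MappedSharedMutexReadGuard`, which can be
    /// narrowed to a borrow of part of the data (but can no longer wait on a
    /// `Condvar`).
    ///
    /// A minimal doc-test sketch (it assumes this crate is built under the
    /// crate name `shared_mutex`):
    ///
    /// ```
    /// use shared_mutex::SharedMutex;
    ///
    /// let mutex = SharedMutex::new(vec![1u32, 2, 3]);
    ///
    /// // Narrow a guard over the whole Vec down to its first element.
    /// let first = mutex.read().unwrap().into_mapped().map(|v| &v[0]);
    /// assert_eq!(*first, 1);
    /// ```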
    pub fn into_mapped(self) -> MappedSharedMutexReadGuard<'mutex, T> {
        let guard = MappedSharedMutexReadGuard {
            mutex: &self.mutex.raw,
            data: self.data
        };

        // Don't run self's destructor: the lock now belongs to `guard`.
        mem::forget(self);

        guard
    }

    /// Release the read lock, wait for a notification on `cond`, and then
    /// reacquire the lock for writing.
    pub fn wait_for_write(self, cond: &Condvar) -> LockResult<SharedMutexWriteGuard<'mutex, T>> {
        self.mutex.raw.wait_from_read_to_write(cond);

        let guard = unsafe { SharedMutexWriteGuard::new(self.mutex) };

        // Don't run self's destructor: the lock now belongs to `guard`.
        mem::forget(self);

        guard
    }

    /// Release the read lock, wait for a notification on `cond`, and then
    /// reacquire the lock for reading.
    pub fn wait_for_read(self, cond: &Condvar) -> LockResult<Self> {
        self.mutex.raw.wait_from_read_to_read(cond);

        let guard = unsafe { SharedMutexReadGuard::new(self.mutex) };

        // Don't run self's destructor: the lock now belongs to `guard`.
        mem::forget(self);

        guard
    }
}

impl<'mutex, T: ?Sized> SharedMutexWriteGuard<'mutex, T> {
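    /// Convert this guard into a `MappedSharedMutexWriteGuard`, which can be
    /// narrowed to a borrow of part of the data (but can no longer wait on a
    /// `Condvar`).
    ///
    /// A minimal doc-test sketch (it assumes this crate is built under the
    /// crate name `shared_mutex`):
    ///
    /// ```
    /// use shared_mutex::SharedMutex;
    ///
    /// let mutex = SharedMutex::new(vec![1u32, 2, 3]);
    ///
    /// // Narrow a write guard to the first element and write through it.
    /// *mutex.write().unwrap().into_mapped().map(|v| &mut v[0]) = 100;
    ///
    /// assert_eq!(&*mutex.read().unwrap(), &[100, 2, 3]);
    /// ```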
    pub fn into_mapped(self) -> MappedSharedMutexWriteGuard<'mutex, T> {
        let guard = MappedSharedMutexWriteGuard {
            mutex: &self.mutex.raw,
            poison: unsafe { ptr::read(&self.data).into_raw() },
            data: unsafe { (&mut *self.mutex.data.get()).get_mut() }
        };

        // Don't run self's destructor: the lock and the poison guard now
        // belong to `guard`.
        mem::forget(self);

        guard
    }

    /// Release the write lock, wait for a notification on `cond`, and then
    /// reacquire the lock for writing.
    pub fn wait_for_write(self, cond: &Condvar) -> LockResult<Self> {
        self.mutex.raw.wait_from_write_to_write(cond);

        let guard = unsafe { SharedMutexWriteGuard::new(self.mutex) };

        // Don't run self's destructor: the lock now belongs to `guard`.
        mem::forget(self);

        guard
    }

    /// Release the write lock, wait for a notification on `cond`, and then
    /// reacquire the lock for reading.
    pub fn wait_for_read(self, cond: &Condvar) -> LockResult<SharedMutexReadGuard<'mutex, T>> {
        self.mutex.raw.wait_from_write_to_read(cond);

        let guard = unsafe { SharedMutexReadGuard::new(self.mutex) };

        // Don't run self's destructor: the lock now belongs to `guard`.
        mem::forget(self);

        guard
    }
}

impl<'mutex, T: ?Sized> Drop for SharedMutexReadGuard<'mutex, T> {
    #[inline]
    fn drop(&mut self) { self.mutex.raw.unlock_read() }
}

impl<'mutex, T: ?Sized> Drop for SharedMutexWriteGuard<'mutex, T> {
    #[inline]
    fn drop(&mut self) { self.mutex.raw.unlock_write() }
}

/// A read guard that has been mapped to a (possibly narrower) borrow of the
/// data in a `SharedMutex`.
///
/// Unlike `SharedMutexReadGuard`, a mapped guard cannot wait on a `Condvar`.
pub struct MappedSharedMutexReadGuard<'mutex, T: ?Sized + 'mutex> {
    mutex: &'mutex RawSharedMutex,
    data: &'mutex T
}

/// A write guard that has been mapped to a (possibly narrower) borrow of the
/// data in a `SharedMutex`.
///
/// Unlike `SharedMutexWriteGuard`, a mapped guard cannot wait on a `Condvar`.
pub struct MappedSharedMutexWriteGuard<'mutex, T: ?Sized + 'mutex> {
    mutex: &'mutex RawSharedMutex,
    poison: RawPoisonGuard<'mutex>,
    data: &'mutex mut T,
}

impl<'mutex, T: ?Sized> MappedSharedMutexReadGuard<'mutex, T> {
    #[inline]
    /// Transform this guard into a guard over a sub-borrow of the data.
    pub fn map<U: ?Sized, F>(self, action: F) -> MappedSharedMutexReadGuard<'mutex, U>
    where F: FnOnce(&T) -> &U {
        self.option_map(move |t| Some(action(t))).unwrap()
    }

    #[inline]
    /// Like `map`, but the closure may fail; on `None` the guard is dropped
    /// and the lock released.
    pub fn option_map<U: ?Sized, F>(self, action: F) -> Option<MappedSharedMutexReadGuard<'mutex, U>>
    where F: FnOnce(&T) -> Option<&U> {
        self.result_map(move |t| action(t).ok_or(())).ok()
    }

    #[inline]
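    /// Like `map`, but the closure may fail with an error; on failure the
    /// original guard is returned alongside the error, so the lock is not
    /// released prematurely.
    ///
    /// A minimal doc-test sketch (it assumes this crate is built under the
    /// crate name `shared_mutex`):
    ///
    /// ```
    /// use shared_mutex::SharedMutex;
    ///
    /// let mutex = SharedMutex::new(vec![1u32, 2, 3]);
    /// let guard = mutex.read().unwrap().into_mapped();
    ///
    /// // The index is out of bounds, so we get the guard back with the error.
    /// match guard.result_map(|v| v.get(10).ok_or("missing")) {
    ///     Ok(_) => unreachable!(),
    ///     Err((guard, e)) => {
    ///         assert_eq!(e, "missing");
    ///         assert_eq!(guard.len(), 3);
    ///     }
    /// }
    /// ```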
    pub fn result_map<U: ?Sized, E, F>(self, action: F)
                      -> Result<MappedSharedMutexReadGuard<'mutex, U>, (Self, E)>
    where F: FnOnce(&T) -> Result<&U, E> {
        let data = self.data;
        let mutex = self.mutex;

        match action(data) {
            Ok(new_data) => {
                // Don't run self's destructor: the lock now belongs to the
                // new guard.
                mem::forget(self);

                Ok(MappedSharedMutexReadGuard {
                    data: new_data,
                    mutex: mutex
                })
            },
            Err(e) => Err((self, e))
        }
    }

    #[inline]
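    /// Recover a guard over the whole data of the original mutex.
    ///
    /// Returns `Err(self)` if the given mutex is not the one this guard was
    /// created from.
    ///
    /// A minimal doc-test sketch (it assumes this crate is built under the
    /// crate name `shared_mutex`):
    ///
    /// ```
    /// use shared_mutex::SharedMutex;
    ///
    /// let mutex = SharedMutex::new((1u32, 2u32));
    ///
    /// // Narrow a read guard to the first field, then widen it back out.
    /// let first = mutex.read().unwrap().into_mapped().map(|pair| &pair.0);
    /// assert_eq!(*first, 1);
    ///
    /// let whole = first.recover(&mutex).unwrap();
    /// assert_eq!(*whole, (1, 2));
    /// ```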
    pub fn recover<U: ?Sized>(self, mutex: &'mutex SharedMutex<U>) -> Result<SharedMutexReadGuard<'mutex, U>, Self> {
        if self.mutex.is(&mutex.raw) {
            let guard = unsafe { SharedMutexReadGuard::new(mutex) }.unwrap();

            // Don't run self's destructor: the lock now belongs to `guard`.
            mem::forget(self);

            Ok(guard)
        } else {
            Err(self)
        }
    }
}

impl<'mutex, T: ?Sized> MappedSharedMutexWriteGuard<'mutex, T> {
    #[inline]
    /// Transform this guard into a guard over a sub-borrow of the data.
    pub fn map<U: ?Sized, F>(self, action: F) -> MappedSharedMutexWriteGuard<'mutex, U>
    where F: FnOnce(&mut T) -> &mut U {
        self.option_map(move |t| Some(action(t))).unwrap()
    }

    #[inline]
    /// Like `map`, but the closure may fail; on `None` the guard is dropped
    /// and the lock released.
    pub fn option_map<U: ?Sized, F>(self, action: F) -> Option<MappedSharedMutexWriteGuard<'mutex, U>>
    where F: FnOnce(&mut T) -> Option<&mut U> {
        self.result_map(move |t| action(t).ok_or(())).ok()
    }

    #[inline]
    /// Like `map`, but the closure may fail with an error; on failure the
    /// original guard is returned alongside the error, so the lock is not
    /// released prematurely.
    pub fn result_map<U: ?Sized, E, F>(self, action: F)
                      -> Result<MappedSharedMutexWriteGuard<'mutex, U>, (Self, E)>
    where F: FnOnce(&mut T) -> Result<&mut U, E> {
        let data = unsafe { ptr::read(&self.data) };
        let mutex = self.mutex;

        match action(data) {
            Ok(new_data) => {
                let poison = unsafe { ptr::read(&self.poison) };

                // Don't run self's destructor: the lock and the poison guard
                // now belong to the new guard.
                mem::forget(self);

                Ok(MappedSharedMutexWriteGuard {
                    data: new_data,
                    poison: poison,
                    mutex: mutex
                })
            },
            Err(e) => Err((self, e))
        }
    }

    #[inline]
    /// Recover a guard over the whole data of the original mutex.
    ///
    /// Returns `Err(self)` if the given mutex is not the one this guard was
    /// created from.
    pub fn recover<U: ?Sized>(self, mutex: &'mutex SharedMutex<U>) -> Result<SharedMutexWriteGuard<'mutex, U>, Self> {
        if self.mutex.is(&mutex.raw) {
            let guard = unsafe { SharedMutexWriteGuard::new(mutex) }.unwrap();

            // Don't run self's destructor: the lock now belongs to `guard`.
            mem::forget(self);

            Ok(guard)
        } else {
            Err(self)
        }
    }
}

impl<'mutex, T: ?Sized> Deref for MappedSharedMutexReadGuard<'mutex, T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &T { self.data }
}

impl<'mutex, T: ?Sized> Deref for MappedSharedMutexWriteGuard<'mutex, T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &T { self.data }
}

impl<'mutex, T: ?Sized> DerefMut for MappedSharedMutexWriteGuard<'mutex, T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut T { self.data }
}

impl<'mutex, T: ?Sized> Drop for MappedSharedMutexReadGuard<'mutex, T> {
    #[inline]
    fn drop(&mut self) { self.mutex.unlock_read() }
}

impl<'mutex, T: ?Sized> Drop for MappedSharedMutexWriteGuard<'mutex, T> {
    #[inline]
    fn drop(&mut self) { self.mutex.unlock_write() }
}

impl<T: ?Sized + fmt::Debug> fmt::Debug for SharedMutex<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut writer = f.debug_struct("SharedMutex");

        match self.try_read() {
            Ok(l) => writer.field("data", &&*l),
            Err(TryLockError::WouldBlock) => writer.field("data", &"{{ locked }}"),
            Err(TryLockError::Poisoned(_)) => writer.field("data", &"{{ poisoned }}")
        }.finish()
    }
}

impl<'mutex, T: ?Sized + fmt::Debug> fmt::Debug for SharedMutexReadGuard<'mutex, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SharedMutexReadGuard")
            // Print the protected data, not the guard itself, which would
            // recurse into this impl.
            .field("data", &&**self)
            .finish()
    }
}

impl<'mutex, T: ?Sized + fmt::Debug> fmt::Debug for SharedMutexWriteGuard<'mutex, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SharedMutexWriteGuard")
            .field("data", &&**self)
            .finish()
    }
}

impl<'mutex, T: ?Sized + fmt::Debug> fmt::Debug for MappedSharedMutexReadGuard<'mutex, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("MappedSharedMutexReadGuard")
            .field("data", &&**self)
            .finish()
    }
}

impl<'mutex, T: ?Sized + fmt::Debug> fmt::Debug for MappedSharedMutexWriteGuard<'mutex, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("MappedSharedMutexWriteGuard")
            .field("data", &&**self)
            .finish()
    }
}

#[cfg(test)]
mod test {
    use std::sync::{Condvar, Barrier};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use scoped_pool::Pool;

    use super::*;

    fn _check_bounds() {
        fn _is_send_sync<T: Send + Sync>() {}

        _is_send_sync::<RawSharedMutex>();
        _is_send_sync::<SharedMutex<()>>();
        _is_send_sync::<SharedMutexReadGuard<()>>();
        _is_send_sync::<SharedMutexWriteGuard<()>>();
    }

    #[test]
    fn test_simple_multithreaded() {
        let pool = Pool::new(8);
        let mut mutex = SharedMutex::new(0);
        let n = 100;

        pool.scoped(|scope| {
            for _ in 0..n {
                scope.execute(|| {
                    let before = *mutex.read().unwrap();
                    *mutex.write().unwrap() += 1;
                    let after = *mutex.read().unwrap();

                    assert!(before < after, "Time travel! {:?} >= {:?}", before, after);
                })
            }
        });

        assert_eq!(*mutex.get_mut().unwrap(), 100);
        pool.shutdown();
    }

    #[test]
    fn test_simple_single_thread() {
        let mut mutex = SharedMutex::new(0);
        let n = 100;

        for _ in 0..n {
            let before = *mutex.read().unwrap();
            *mutex.write().unwrap() += 1;
            let after = *mutex.read().unwrap();

            assert!(before < after, "Time travel! {:?} >= {:?}", before, after);
        }

        assert_eq!(*mutex.get_mut().unwrap(), 100);
    }

    #[test]
    fn test_locking_multithreaded() {
        // Readers check that the shared counter does not change while they
        // hold the read lock; writers bump it and check that nobody else
        // does so while they hold the write lock.
        let mut mutex = SharedMutex::new(());
        let value = AtomicUsize::new(0);

        let threads = 50;
        let actors = threads * 20;
        let actions_per_actor = 100;

        let start_barrier = Barrier::new(threads);
        let pool = Pool::new(threads);

        pool.scoped(|scope| {
            for _ in 0..actors {
                // Reader actor.
                scope.execute(|| {
                    start_barrier.wait();

                    let _read = mutex.read().unwrap();
                    let original = value.load(Ordering::SeqCst);

                    for _ in 0..actions_per_actor {
                        assert_eq!(original, value.load(Ordering::SeqCst));
                    }
                });

                // Writer actor.
                scope.execute(|| {
                    start_barrier.wait();

                    let _write = mutex.write().unwrap();
                    let mut previous = value.load(Ordering::SeqCst);

                    for _ in 0..actions_per_actor {
                        let next = value.fetch_add(1, Ordering::SeqCst);

                        assert_eq!(previous, next);

                        previous = next + 1;
                    }
                });
            }
        });

        mutex.get_mut().unwrap();
        pool.shutdown();
    }

    #[test]
    fn test_simple_waiting() {
        let pool = Pool::new(20);
        let mutex = SharedMutex::new(());
        let cond = Condvar::new();

        pool.scoped(|scope| {
            let lock = mutex.write().unwrap();

            scope.execute(|| {
                let _ = mutex.write().unwrap();
                cond.notify_one();
            });

            let lock = lock.wait_for_read(&cond).unwrap();

            scope.execute(|| {
                drop(mutex.write().unwrap());
                cond.notify_one();
            });

            let lock = lock.wait_for_read(&cond).unwrap();

            scope.execute(|| {
                drop(mutex.write().unwrap());
                cond.notify_one();
            });

            let lock = lock.wait_for_write(&cond).unwrap();

            scope.execute(|| {
                drop(mutex.write().unwrap());
                cond.notify_one();
            });

            lock.wait_for_write(&cond).unwrap();
        });

        pool.shutdown();
    }

    #[test]
    fn test_mapping() {
        let mutex = SharedMutex::new(vec![1, 2, 3]);

        *mutex.write().unwrap().into_mapped()
            .map(|v| &mut v[0]) = 100;

        assert_eq!(*mutex.read().unwrap().into_mapped().map(|v| &v[0]), 100);
    }

    #[test]
    fn test_map_recover() {
        let mutex = SharedMutex::new(vec![1, 2]);

        let mut write_map = mutex.write().unwrap().into_mapped()
            .map(|v| &mut v[0]);
        *write_map = 123;

        let whole_guard = write_map.recover(&mutex).unwrap();
        assert_eq!(&*whole_guard, &[123, 2]);
    }

    #[test]
    fn test_try_locking() {
        let mutex = SharedMutex::new(10);
        mutex.try_read().unwrap();
        mutex.try_write().unwrap();
    }
}