1use ::{USIZE_SIZE};
2use ::shm::{SharedMemMap, Access as SharedMemAccess};
3use platform;
4
5use std::{io, mem, thread};
6use std::borrow::Borrow;
7use std::sync::{LockResult, PoisonError};
8use std::sync::atomic::{Ordering, AtomicBool};
9
10use uuid::Uuid;
11
12pub struct Mutex {
14 pub(crate) inner: platform::Mutex,
15 poison: *const AtomicBool,
16}
17
18unsafe impl Send for Mutex {}
19unsafe impl Sync for Mutex {}
20
21pub struct MutexGuard<'a> {
22 mutex: &'a Mutex,
23 _inner: platform::MutexGuard<'a>,
24}
25
26pub const MUTEX_SHM_SIZE: usize = platform::MUTEX_SHM_SIZE + USIZE_SIZE;
28
29impl Mutex {
32 pub unsafe fn new_with_memory(memory: SharedMemMap, offset: usize) -> io::Result<Self> {
43 Self::with_inner(memory, offset, |memory| platform::Mutex::new_with_memory(memory, offset))
44 }
45
46 pub unsafe fn from_handle(handle: MutexHandle, memory: SharedMemMap) -> io::Result<Self> {
48 if memory.borrow().token() != handle.shm_token {
49 return Err(io::Error::new(io::ErrorKind::InvalidInput, "the mutex is not associated with the given shared memory"));
50 }
51 let local_offset = handle.inner.raw_offset().checked_sub(memory.borrow().offset())
52 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "mapping does not contain memory of shared mutex"))?;
53 if memory.borrow().len() < local_offset + MUTEX_SHM_SIZE {
54 return Err(io::Error::new(io::ErrorKind::InvalidInput, "mapping does not contain memory of shared mutex"));
55 }
56 Self::with_inner(memory, local_offset, move |memory| platform::Mutex::from_handle(handle.inner, memory, local_offset))
57 }
58
59 pub(crate) unsafe fn with_inner<F>(memory: SharedMemMap, offset: usize, f: F) -> io::Result<Self> where
60 F: FnOnce(SharedMemMap) -> io::Result<platform::Mutex>,
61 {
62 if memory.len() < offset + MUTEX_SHM_SIZE {
63 return Err(io::Error::new(io::ErrorKind::InvalidInput, "out of range offset for mutex shared memory"));
64 }
65 if (memory.pointer() as usize + offset) % mem::size_of::<usize>() != 0 {
66 return Err(io::Error::new(io::ErrorKind::InvalidInput, "memory for mutex must be aligned"));
67 }
68 if memory.access() != SharedMemAccess::ReadWrite {
69 return Err(io::Error::new(io::ErrorKind::InvalidInput, "memory for mutex must be read-write"));
70 }
71 let poison = memory.borrow().pointer().offset((offset + platform::MUTEX_SHM_SIZE) as isize) as *const AtomicBool;
72 (*poison).store(false, Ordering::SeqCst);
73 let inner = f(memory)?;
74 Ok(Mutex {
75 inner,
76 poison,
77 })
78 }
79
80 pub fn lock(&self) -> LockResult<MutexGuard> {
83 let guard = self.inner.lock();
84 let guard = MutexGuard {
85 mutex: self,
86 _inner: guard,
87 };
88 if self.poison_flag().load(Ordering::SeqCst) {
91 Err(PoisonError::new(guard))
92 } else {
93 Ok(guard)
94 }
95 }
96
97 pub fn handle(&self) -> io::Result<MutexHandle> {
99 let inner = self.inner.handle()?;
100 Ok(MutexHandle { inner, shm_token: self.inner.memory().token() })
101 }
102
103 fn poison_flag(&self) -> &AtomicBool {
104 unsafe { &*self.poison }
105 }
106}
107
108impl<'a> Drop for MutexGuard<'a> {
109 fn drop(&mut self) {
110 if thread::panicking() {
111 self.mutex.poison_flag().store(true, Ordering::SeqCst);
113 }
114 }
115}
116
117#[derive(Serialize, Deserialize, Debug)]
122pub struct MutexHandle {
123 inner: platform::MutexHandle,
124 shm_token: Uuid,
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use ::shm::{Access as SharedMemAccess, SharedMem};
    use ::check_send;

    use std::{mem, thread};
    use std::time::Duration;
    use std::sync::{Barrier, Arc};

    /// Allocates shared memory that is exactly large enough for one mutex and creates a mutex
    /// at offset 0 of a read-write mapping of it.
    ///
    /// The `SharedMem` is handed back alongside the mutex so that it outlives the mutex in the
    /// calling test, exactly as if both had been created inline.
    fn fresh_mutex() -> (SharedMem, Mutex) {
        let shm = SharedMem::new(MUTEX_SHM_SIZE).unwrap();
        let map = shm.map(.. MUTEX_SHM_SIZE, SharedMemAccess::ReadWrite).unwrap();
        let mutex = unsafe { Mutex::new_with_memory(map, 0).unwrap() };
        (shm, mutex)
    }

    /// Compile-time style check that `Mutex` can be sent across threads.
    #[test]
    fn mutex_is_send() {
        let (_shm, mutex) = fresh_mutex();
        check_send(&mutex);
    }

    /// Locking and unlocking twice in a row without contention must succeed.
    #[test]
    fn uncontested_mutex_lock() {
        let (_shm, mutex) = fresh_mutex();
        for _ in 0..2 {
            let _guard = mutex.lock().unwrap();
        }
    }

    /// A second thread that opens the same mutex through a handle must not get the lock while
    /// the main thread still holds its guard.
    #[test]
    fn single_process_contested_mutex_lock() {
        let barrier = Arc::new(Barrier::new(2));
        let value = Arc::new(AtomicBool::new(false));

        let memory = SharedMem::new(MUTEX_SHM_SIZE).unwrap();
        let main_map = memory.map(.., SharedMemAccess::ReadWrite).unwrap();
        let mutex = unsafe { Mutex::new_with_memory(main_map, 0).unwrap() };

        let guard = mutex.lock().unwrap();

        // Everything the worker needs is prepared up front and moved into the closure.
        let worker_barrier = barrier.clone();
        let handle = mutex.handle().unwrap();
        let worker_value = value.clone();
        let worker = thread::spawn(move || {
            let worker_map = memory.map(.., SharedMemAccess::ReadWrite).unwrap();
            let worker_mutex = unsafe { Mutex::from_handle(handle, worker_map).unwrap() };
            worker_barrier.wait();
            let _guard = worker_mutex.lock().unwrap();
            worker_value.store(true, Ordering::SeqCst);
        });

        barrier.wait();
        // Give the worker time to (wrongly) acquire the lock before checking.
        thread::sleep(Duration::from_millis(100));
        assert!(!value.load(Ordering::SeqCst));

        mem::drop(guard);
        worker.join().unwrap();
        assert!(value.load(Ordering::SeqCst));
    }
}