sandbox_ipc/sync.rs

use ::{USIZE_SIZE};
use ::shm::{SharedMemMap, Access as SharedMemAccess};
use platform;

use std::{io, mem, thread};
use std::borrow::Borrow;
use std::sync::{LockResult, PoisonError};
use std::sync::atomic::{Ordering, AtomicBool};

use uuid::Uuid;

/// An analogue of `std::sync::Mutex` which can operate within shared memory.
pub struct Mutex {
    pub(crate) inner: platform::Mutex,
    poison: *const AtomicBool,
}

unsafe impl Send for Mutex {}
unsafe impl Sync for Mutex {}

pub struct MutexGuard<'a> {
    mutex: &'a Mutex,
    _inner: platform::MutexGuard<'a>,
}

/// The amount of shared memory space required to hold a `Mutex`.
pub const MUTEX_SHM_SIZE: usize = platform::MUTEX_SHM_SIZE + USIZE_SIZE;

// On top of the OS-level mutex, we add a usize (protected by the mutex) that signals
// that the lock is poisoned (possibly by a thread in another process).
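//
// A sketch of the resulting layout within the mapping (the size and contents
// of the platform mutex are opaque here and platform-specific):
//
//   offset                  offset + platform::MUTEX_SHM_SIZE
//   |<-- platform::Mutex -->|<-- poison flag (AtomicBool in a usize slot) -->|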
impl Mutex {
    /// Creates a brand new `Mutex` in the given shared memory location.
    ///
    /// This can *only* be used to create a brand new `Mutex`. It cannot be used to create a handle to an
    /// existing `Mutex` already created at the given location in shared memory. To send the `Mutex` to another
    /// process you must send the shared memory region and a `MutexHandle` produced via the `handle()` function.
    ///
    /// # Errors
    ///
    /// This function will return an error if there are not `MUTEX_SHM_SIZE` bytes of memory available at the
    /// given `offset`, if the memory is not aligned to a pointer width, or if the mapping is not read-write.
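    ///
    /// # Examples
    ///
    /// A minimal creation sketch, mirroring the tests below (marked `ignore`
    /// since module paths are elided here):
    ///
    /// ```ignore
    /// let memory = SharedMem::new(MUTEX_SHM_SIZE)?;
    /// let map = memory.map(.., SharedMemAccess::ReadWrite)?;
    /// // Safety: the memory is freshly allocated, large enough, and aligned.
    /// let mutex = unsafe { Mutex::new_with_memory(map, 0)? };
    /// ```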
    pub unsafe fn new_with_memory(memory: SharedMemMap, offset: usize) -> io::Result<Self> {
        Self::with_inner(memory, offset, |memory| platform::Mutex::new_with_memory(memory, offset))
    }

    /// Establishes a new reference to a `Mutex` previously created with `new_with_memory`.
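    ///
    /// A reconstitution sketch (hedged: assumes `handle` and the backing
    /// `SharedMem` were received from the creating process):
    ///
    /// ```ignore
    /// let map = memory.map(.., SharedMemAccess::ReadWrite)?;
    /// let mutex = unsafe { Mutex::from_handle(handle, map)? };
    /// ```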
    pub unsafe fn from_handle(handle: MutexHandle, memory: SharedMemMap) -> io::Result<Self> {
        // The handle is only valid for the shared memory region it was created in.
        if memory.borrow().token() != handle.shm_token {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "the mutex is not associated with the given shared memory"));
        }
        // Translate the handle's offset within the `SharedMem` into an offset
        // local to this particular mapping, checking that the mapping covers it.
        let local_offset = handle.inner.raw_offset().checked_sub(memory.borrow().offset())
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "mapping does not contain memory of shared mutex"))?;
        if memory.borrow().len() < local_offset + MUTEX_SHM_SIZE {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "mapping does not contain memory of shared mutex"));
        }
        Self::with_inner(memory, local_offset, move |memory| platform::Mutex::from_handle(handle.inner, memory, local_offset))
    }

    pub(crate) unsafe fn with_inner<F>(memory: SharedMemMap, offset: usize, f: F) -> io::Result<Self> where
        F: FnOnce(SharedMemMap) -> io::Result<platform::Mutex>,
    {
        if memory.len() < offset + MUTEX_SHM_SIZE {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "out of range offset for mutex shared memory"));
        }
        if (memory.pointer() as usize + offset) % mem::size_of::<usize>() != 0 {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "memory for mutex must be aligned"));
        }
        if memory.access() != SharedMemAccess::ReadWrite {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "memory for mutex must be read-write"));
        }
        // The poison flag lives immediately after the platform mutex (see the
        // layout sketch above); clear it before the mutex is used.
        let poison = memory.borrow().pointer().offset((offset + platform::MUTEX_SHM_SIZE) as isize) as *const AtomicBool;
        (*poison).store(false, Ordering::SeqCst);
        let inner = f(memory)?;
        Ok(Mutex {
            inner,
            poison,
        })
    }

    /// Acquires an exclusive lock on the `Mutex`, synchronized with any other
    /// instances created from the same `MutexHandle`.
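    ///
    /// Poisoning is reported in the style of `std::sync::Mutex`; a recovery
    /// sketch using `std`'s `PoisonError::into_inner`:
    ///
    /// ```ignore
    /// let _guard = match mutex.lock() {
    ///     Ok(guard) => guard,
    ///     // Some instance (possibly in another process) panicked while holding
    ///     // the lock; recover the guard and proceed with care.
    ///     Err(poisoned) => poisoned.into_inner(),
    /// };
    /// ```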
    pub fn lock(&self) -> LockResult<MutexGuard> {
        let guard = self.inner.lock();
        let guard = MutexGuard {
            mutex: self,
            _inner: guard,
        };
        // TODO: we can probably relax SeqCst, but shouldn't do so before testing on
        // a weak memory architecture
        if self.poison_flag().load(Ordering::SeqCst) {
            Err(PoisonError::new(guard))
        } else {
            Ok(guard)
        }
    }

    /// Creates a new handle to the `Mutex` that can be transmitted to other processes.
    pub fn handle(&self) -> io::Result<MutexHandle> {
        let inner = self.inner.handle()?;
        Ok(MutexHandle { inner, shm_token: self.inner.memory().token() })
    }

    fn poison_flag(&self) -> &AtomicBool {
        unsafe { &*self.poison }
    }
}

impl<'a> Drop for MutexGuard<'a> {
    fn drop(&mut self) {
        if thread::panicking() {
            // Indicate that we are panicking, both to this instance and to any other
            // instances in different processes. The flag is set while the lock is still
            // held (`_inner` is dropped after this runs), so the next locker observes it.
            self.mutex.poison_flag().store(true, Ordering::SeqCst);
        }
    }
}

/// A handle to a `Mutex` that exists in shared memory.
///
/// This can be sent over any medium capable of transmitting OS resources (e.g. `MessageChannel`). To reconstitute a working `Mutex`,
/// a reference to the `SharedMem` holding it must be transmitted as well.
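///
/// A round-trip sketch (hedged: `channel` stands for a hypothetical transport
/// that can carry OS resources; the exact send/receive API is assumed):
///
/// ```ignore
/// // Process A: ship the handle along with the shared memory region.
/// channel.send(&(shared_mem, mutex.handle()?))?;
///
/// // Process B: reconstitute a working `Mutex` from the received pieces.
/// let (shared_mem, handle): (SharedMem, MutexHandle) = channel.recv()?;
/// let map = shared_mem.map(.., SharedMemAccess::ReadWrite)?;
/// let mutex = unsafe { Mutex::from_handle(handle, map)? };
/// ```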
#[derive(Serialize, Deserialize, Debug)]
pub struct MutexHandle {
    inner: platform::MutexHandle,
    shm_token: Uuid,
}

#[cfg(test)]
mod tests {
    use super::*;
    use ::shm::{Access as SharedMemAccess, SharedMem};
    use ::check_send;

    use std::{mem, thread};
    use std::time::Duration;
    use std::sync::{Barrier, Arc};

    #[test]
    fn mutex_is_send() {
        let memory = SharedMem::new(MUTEX_SHM_SIZE).unwrap();
        let memory = memory.map(.. MUTEX_SHM_SIZE, SharedMemAccess::ReadWrite).unwrap();
        let mutex = unsafe { Mutex::new_with_memory(memory, 0).unwrap() };
        check_send(&mutex);
    }

    #[test]
    fn uncontested_mutex_lock() {
        let memory = SharedMem::new(MUTEX_SHM_SIZE).unwrap();
        let memory = memory.map(.. MUTEX_SHM_SIZE, SharedMemAccess::ReadWrite).unwrap();
        let mutex = unsafe { Mutex::new_with_memory(memory, 0).unwrap() };
        {
            let _guard = mutex.lock().unwrap();
        }
        {
            let _guard = mutex.lock().unwrap();
        }
    }

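    // An added poisoning sketch (not part of the original suite): a guard dropped
    // during a panic should poison the mutex for subsequent lockers, exercising
    // the `Drop` logic above.
    #[test]
    fn panicking_thread_poisons_mutex() {
        let memory = SharedMem::new(MUTEX_SHM_SIZE).unwrap();
        let memory = memory.map(.. MUTEX_SHM_SIZE, SharedMemAccess::ReadWrite).unwrap();
        let mutex = Arc::new(unsafe { Mutex::new_with_memory(memory, 0).unwrap() });

        let thread_mutex = mutex.clone();
        let result = thread::spawn(move || {
            let _guard = thread_mutex.lock().unwrap();
            panic!("poison the mutex");
        }).join();
        assert!(result.is_err());

        // The panicking thread set the poison flag before releasing the lock.
        assert!(mutex.lock().is_err());
    }
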
    #[test]
    fn single_process_contested_mutex_lock() {
        let barrier = Arc::new(Barrier::new(2));
        let value = Arc::new(AtomicBool::new(false));

        let memory = SharedMem::new(MUTEX_SHM_SIZE).unwrap();
        let memory_map = memory.map(.., SharedMemAccess::ReadWrite).unwrap();
        let mutex = unsafe { Mutex::new_with_memory(memory_map, 0).unwrap() };

        let guard = mutex.lock().unwrap();
        let thread = thread::spawn({
            let barrier = barrier.clone();
            let handle = mutex.handle().unwrap();
            let value = value.clone();
            move || {
                let memory_map = memory.map(.., SharedMemAccess::ReadWrite).unwrap();
                let mutex = unsafe { Mutex::from_handle(handle, memory_map).unwrap() };
                barrier.wait();
                let _guard = mutex.lock().unwrap();
                value.store(true, Ordering::SeqCst);
            }
        });
        barrier.wait();
        // Give the spawned thread a chance to block on the lock; it must not
        // acquire it while `guard` is still held.
        thread::sleep(Duration::from_millis(100));
        assert!(!value.load(Ordering::SeqCst));
        mem::drop(guard);
        thread.join().unwrap();
        assert!(value.load(Ordering::SeqCst));
    }
}