use ::{CACHE_LINE};
use ::shm::{SharedMemMap};
use ::platform::SendableWinHandle;
use std::{io, mem, ptr, thread, usize};
use std::borrow::Borrow;
use std::sync::atomic::{Ordering, AtomicUsize};
use winapi::shared::minwindef::{TRUE};
use winapi::um::winbase::{INFINITE, WAIT_OBJECT_0};
use winapi::um::synchapi::{WaitForSingleObject, CreateSemaphoreW, ReleaseSemaphore};
use winhandle::*;
pub(crate) struct Mutex {
mem: SharedMemMap,
raw_offset: usize,
atomic: *const AtomicUsize,
semaphore: WinHandle,
}
pub(crate) struct MutexGuard<'a> {
mutex: &'a Mutex,
}
pub(crate) const MUTEX_SHM_SIZE: usize = CACHE_LINE;
impl Mutex {
pub unsafe fn new_with_memory(memory: SharedMemMap, offset: usize) -> io::Result<Self> {
assert!(memory.borrow().len() >= offset + MUTEX_SHM_SIZE, "insufficient space for mutex");
assert!((memory.borrow().pointer() as usize + offset) % mem::size_of::<usize>() == 0, "shared memory for IPC mutex must be aligned");
let atomic = memory.borrow().pointer().offset(offset as isize) as *const AtomicUsize;
(*atomic).store(0, Ordering::SeqCst);
let semaphore = winapi_handle_call!(CreateSemaphoreW(
ptr::null_mut(),
0,
1,
ptr::null(),
))?;
let raw_offset = memory.borrow().offset() + offset;
Ok(Mutex {
mem: memory,
raw_offset,
atomic,
semaphore,
})
}
pub unsafe fn from_handle(handle: MutexHandle, memory: SharedMemMap, offset: usize) -> io::Result<Self> {
assert!(memory.borrow().len() >= offset + MUTEX_SHM_SIZE, "insufficient space for mutex");
assert!((memory.borrow().pointer() as usize + offset) % mem::size_of::<usize>() == 0, "shared memory for IPC mutex must be aligned");
let atomic = memory.borrow().pointer().offset(offset as isize) as *const AtomicUsize;
Ok(Mutex {
mem: memory,
raw_offset: handle.raw_offset,
atomic,
semaphore: handle.semaphore.0,
})
}
pub fn lock(&self) -> MutexGuard {
let shared_atomic = self.shared_atomic();
'outer: loop {
match shared_atomic.fetch_add(1, Ordering::SeqCst) {
0 => break,
_ => {
match unsafe { WaitForSingleObject(
self.semaphore.get(),
INFINITE,
) } {
WAIT_OBJECT_0 => {},
_ => {
panic!("WaitForSingleObject failed: {}", io::Error::last_os_error());
},
}
break;
},
}
}
return MutexGuard { mutex: self };
}
pub fn handle(&self) -> io::Result<MutexHandle> {
Ok(MutexHandle {
semaphore: SendableWinHandle(self.semaphore.clone()?),
raw_offset: self.raw_offset,
})
}
#[inline]
fn shared_atomic(&self) -> &AtomicUsize {
unsafe { &*self.atomic }
}
pub(crate) fn memory(&self) -> &::shm::SharedMemMap {
&self.mem
}
}
impl<'a> Drop for MutexGuard<'a> {
fn drop(&mut self) {
let shared_atomic = self.mutex.shared_atomic();
match shared_atomic.fetch_sub(1, Ordering::SeqCst) {
0 => if !thread::panicking() {
panic!("mutex shared memory in invalid state");
} else {
error!("mutex shared memory in invalid state");
},
1 => {}, _ => {
if unsafe { ReleaseSemaphore(self.mutex.semaphore.get(), 1, ptr::null_mut()) } != TRUE {
if !thread::panicking() {
panic!("ReleaseSemaphore failed: {}", io::Error::last_os_error());
} else {
error!("ReleaseSemaphore failed: {}", io::Error::last_os_error());
}
}
},
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct MutexHandle {
semaphore: SendableWinHandle,
raw_offset: usize,
}
impl MutexHandle {
pub fn raw_offset(&self) -> usize { self.raw_offset }
}