linux_interprocess/
interprocess_mutex.rs1use std::{
2 error::Error,
3 ffi::c_void,
4 mem::{size_of, zeroed},
5 num::NonZeroUsize,
6 os::{
7 fd::{FromRawFd, OwnedFd},
8 unix::io::AsRawFd,
9 },
10};
11
12use nix::{
13 errno::Errno,
14 fcntl::{Flock, FlockArg, OFlag, open},
15 libc::{
16 EOWNERDEAD, PTHREAD_MUTEX_ROBUST, PTHREAD_PROCESS_SHARED, c_int, dup, munmap, off_t,
17 pthread_mutex_consistent, pthread_mutex_init, pthread_mutex_lock, pthread_mutex_t,
18 pthread_mutex_unlock, pthread_mutexattr_destroy, pthread_mutexattr_init,
19 pthread_mutexattr_setpshared, pthread_mutexattr_setrobust, pthread_mutexattr_t,
20 },
21 sys::{
22 mman::{MapFlags, ProtFlags, mmap},
23 stat::Mode,
24 },
25 unistd::ftruncate,
26};
27
/// Outcome of a successful [`InterprocessMutex::lock`] call.
///
/// Both variants mean the calling process now holds the mutex; they differ
/// only in whether the previous owner released it cleanly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// The mutex was acquired and the lock call reported no error.
    Acquired,
    /// The lock call reported `EOWNERDEAD` (the previous owner terminated
    /// while holding the mutex) and the mutex was then marked consistent
    /// again. Data protected by the mutex may have been left half-updated.
    OwnerDiedRecovered,
}
35
36pub struct InterprocessMutex {
37 _fd: OwnedFd,
38 ptr: *mut pthread_mutex_t,
39}
40
41impl InterprocessMutex {
42 pub fn new(name: &str) -> Result<Self, Box<dyn Error>> {
43 let path = format!("/dev/shm/{}.mtx", name);
44 let fd = open(
45 path.as_str(),
46 OFlag::O_CREAT | OFlag::O_RDWR,
47 Mode::from_bits_truncate(0o600),
48 )?;
49
50 ftruncate(&fd, size_of::<pthread_mutex_t>() as off_t)?;
51
52 let dup_raw_fd = unsafe { Errno::result(dup(fd.as_raw_fd()))? };
53 let dup_fd = unsafe { OwnedFd::from_raw_fd(dup_raw_fd) };
54
55 let init_lock = Flock::lock(dup_fd, FlockArg::LockExclusive)
56 .map_err(|(_, e)| format!("init-lock failed: {}", e))?;
57
58 let len = NonZeroUsize::new(size_of::<pthread_mutex_t>())
59 .expect("pthread_mutex_t has nonzero size");
60 let raw_ptr = unsafe {
61 mmap(
62 None,
63 len,
64 ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
65 MapFlags::MAP_SHARED,
66 &fd,
67 0,
68 )?
69 };
70
71 let mtx_ptr = raw_ptr.as_ptr() as *mut pthread_mutex_t;
72
73 let first = unsafe { *(mtx_ptr as *const c_int) };
74 if first == 0 {
75 let mut attr: pthread_mutexattr_t = unsafe { zeroed() };
76 unsafe {
77 Errno::result(pthread_mutexattr_init(&mut attr))?;
78 Errno::result(pthread_mutexattr_setpshared(
79 &mut attr,
80 PTHREAD_PROCESS_SHARED,
81 ))?;
82 Errno::result(pthread_mutexattr_setrobust(&mut attr, PTHREAD_MUTEX_ROBUST))?;
83 Errno::result(pthread_mutex_init(mtx_ptr, &attr))?;
84 Errno::result(pthread_mutexattr_destroy(&mut attr))?;
85 }
86 }
87
88 init_lock
89 .unlock()
90 .map_err(|(_, e)| format!("init-unlock failed: {}", e))?;
91
92 Ok(Self {
93 _fd: fd,
94 ptr: mtx_ptr,
95 })
96 }
97
98 pub fn lock(&self) -> Result<LockResult, Errno> {
99 let err = unsafe { pthread_mutex_lock(self.ptr) };
100 if err == EOWNERDEAD {
101 unsafe {
102 Errno::result(pthread_mutex_consistent(self.ptr))?;
103 }
104 Ok(LockResult::OwnerDiedRecovered)
105 } else {
106 Errno::result(err).map(|_| LockResult::Acquired)
107 }
108 }
109
110 pub fn unlock(&self) -> Result<(), Errno> {
111 unsafe { Errno::result(pthread_mutex_unlock(self.ptr)).map(|_| ()) }
112 }
113}
114
115impl Drop for InterprocessMutex {
116 fn drop(&mut self) {
117 unsafe {
118 Errno::result(munmap(
121 self.ptr as *mut c_void,
122 size_of::<pthread_mutex_t>(),
123 ))
124 .ok();
125 }
126 }
127}