linux_interprocess/
interprocess_mutex.rs1use std::{
2 ffi::c_void,
3 mem::{size_of, zeroed},
4 num::NonZeroUsize,
5 os::{
6 fd::{FromRawFd, OwnedFd},
7 unix::io::AsRawFd,
8 },
9};
10
11use anyhow::{Result, anyhow};
12use nix::{
13 errno::Errno,
14 fcntl::{Flock, FlockArg, OFlag, open},
15 libc::{
16 EOWNERDEAD, PTHREAD_MUTEX_ROBUST, PTHREAD_PROCESS_SHARED, c_int, dup, munmap, off_t,
17 pthread_mutex_consistent, pthread_mutex_init, pthread_mutex_lock, pthread_mutex_t,
18 pthread_mutex_unlock, pthread_mutexattr_destroy, pthread_mutexattr_init,
19 pthread_mutexattr_setpshared, pthread_mutexattr_setrobust, pthread_mutexattr_t,
20 },
21 sys::{
22 mman::{MapFlags, ProtFlags, mmap},
23 stat::Mode,
24 },
25 unistd::ftruncate,
26};
27
/// Outcome of a successful [`InterprocessMutex::lock`] call.
///
/// Both variants mean the calling thread now holds the mutex; they differ only
/// in whether the previous holder released it cleanly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// The mutex was acquired and no dead owner was reported.
    Acquired,
    /// `pthread_mutex_lock` reported `EOWNERDEAD` and the mutex was marked
    /// consistent again. The caller holds the lock, but whatever state the
    /// mutex protects may have been left half-updated by the dead owner.
    OwnerDiedRecovered,
}
35
36pub struct InterprocessMutex {
37 _fd: OwnedFd,
38 ptr: *mut pthread_mutex_t,
39}
40
41impl InterprocessMutex {
42 pub fn new(name: &str) -> Result<Self> {
43 let path = format!("/dev/shm/{}.mtx", name);
44 let fd = open(
45 path.as_str(),
46 OFlag::O_CREAT | OFlag::O_RDWR,
47 Mode::from_bits_truncate(0o600),
48 )?;
49
50 ftruncate(&fd, size_of::<pthread_mutex_t>() as off_t)?;
51
52 let dup_raw_fd = unsafe { Errno::result(dup(fd.as_raw_fd()))? };
53 let dup_fd = unsafe { OwnedFd::from_raw_fd(dup_raw_fd) };
54
55 let init_lock = Flock::lock(dup_fd, FlockArg::LockExclusive)
56 .map_err(|(_, e)| anyhow!("init-lock failed: {}", e))?;
57
58 let len = NonZeroUsize::new(size_of::<pthread_mutex_t>())
59 .expect("pthread_mutex_t has nonzero size");
60 let raw_ptr = unsafe {
61 mmap(
62 None,
63 len,
64 ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
65 MapFlags::MAP_SHARED,
66 &fd,
67 0,
68 )?
69 };
70
71 let mtx_ptr = raw_ptr.as_ptr() as *mut pthread_mutex_t;
72
73 let first = unsafe { *(mtx_ptr as *const c_int) };
74 if first == 0 {
75 let mut attr: pthread_mutexattr_t = unsafe { zeroed() };
76 unsafe {
77 Errno::result(pthread_mutexattr_init(&mut attr))?;
78 Errno::result(pthread_mutexattr_setpshared(
79 &mut attr,
80 PTHREAD_PROCESS_SHARED,
81 ))?;
82 Errno::result(pthread_mutexattr_setrobust(&mut attr, PTHREAD_MUTEX_ROBUST))?;
83 Errno::result(pthread_mutex_init(mtx_ptr, &attr))?;
84 Errno::result(pthread_mutexattr_destroy(&mut attr))?;
85 }
86 }
87
88 init_lock
89 .unlock()
90 .map_err(|(_, e)| anyhow!("init-unlock failed: {}", e))?;
91
92 Ok(Self {
93 _fd: fd,
94 ptr: mtx_ptr,
95 })
96 }
97
98 pub fn lock(&self) -> Result<LockResult> {
99 let err = unsafe { pthread_mutex_lock(self.ptr) };
100 if err == EOWNERDEAD {
101 unsafe {
102 Errno::result(pthread_mutex_consistent(self.ptr))
103 .map_err(|e| anyhow!("pthread_mutex_consistent failed: {e}"))?;
104 }
105 Ok(LockResult::OwnerDiedRecovered)
106 } else {
107 Errno::result(err)
108 .map(|_| LockResult::Acquired)
109 .map_err(|e| anyhow!("pthread_mutex_lock failed: {e}"))
110 }
111 }
112
113 pub fn unlock(&self) -> Result<(), Errno> {
114 unsafe { Errno::result(pthread_mutex_unlock(self.ptr)).map(|_| ()) }
115 }
116}
117
118impl Drop for InterprocessMutex {
119 fn drop(&mut self) {
120 unsafe {
121 Errno::result(munmap(
124 self.ptr as *mut c_void,
125 size_of::<pthread_mutex_t>(),
126 ))
127 .ok();
128 }
129 }
130}