linux_interprocess/
interprocess_mutex.rs1use std::{
2 ffi::c_void,
3 mem::{size_of, zeroed},
4 num::NonZeroUsize,
5 os::{
6 fd::{FromRawFd, OwnedFd},
7 unix::io::AsRawFd,
8 },
9};
10
11use anyhow::{Result, anyhow};
12use nix::{
13 errno::Errno,
14 fcntl::{Flock, FlockArg, OFlag, open},
15 libc::{
16 EOWNERDEAD, PTHREAD_MUTEX_ROBUST, PTHREAD_PROCESS_SHARED, c_int, dup, munmap, off_t,
17 pthread_mutex_consistent, pthread_mutex_init, pthread_mutex_lock, pthread_mutex_t,
18 pthread_mutex_unlock, pthread_mutexattr_destroy, pthread_mutexattr_init,
19 pthread_mutexattr_setpshared, pthread_mutexattr_setrobust, pthread_mutexattr_t,
20 },
21 sys::{
22 mman::{MapFlags, ProtFlags, mmap},
23 stat::Mode,
24 },
25 unistd::ftruncate,
26};
27
/// Outcome of a successful [`InterprocessMutex::lock`] call.
///
/// Both variants mean the calling thread now owns the mutex; they differ only
/// in whether the previous owner released it normally.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// The mutex was acquired normally.
    Acquired,
    /// The previous owner terminated while holding the mutex
    /// (`pthread_mutex_lock` reported `EOWNERDEAD`). The mutex has been marked
    /// consistent again and is now held by the caller, but any state it
    /// protects may have been left half-updated and should be re-validated.
    OwnerDiedRecovered,
}
36
37pub struct InterprocessMutex {
38 _fd: OwnedFd,
39 ptr: *mut pthread_mutex_t,
40}
41
42impl InterprocessMutex {
43 pub fn new(name: &str) -> Result<Self> {
44 let path = format!("/dev/shm/{}.mtx", name);
45 let fd = open(
46 path.as_str(),
47 OFlag::O_CREAT | OFlag::O_RDWR,
48 Mode::from_bits_truncate(0o600),
49 )?;
50
51 ftruncate(&fd, size_of::<pthread_mutex_t>() as off_t)?;
52
53 let dup_raw_fd = unsafe { Errno::result(dup(fd.as_raw_fd()))? };
54 let dup_fd = unsafe { OwnedFd::from_raw_fd(dup_raw_fd) };
55
56 let init_lock = Flock::lock(dup_fd, FlockArg::LockExclusive)
57 .map_err(|(_, e)| anyhow!("init-lock failed: {}", e))?;
58
59 let len = NonZeroUsize::new(size_of::<pthread_mutex_t>())
60 .expect("pthread_mutex_t has nonzero size");
61 let raw_ptr = unsafe {
62 mmap(
63 None,
64 len,
65 ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
66 MapFlags::MAP_SHARED,
67 &fd,
68 0,
69 )?
70 };
71
72 let mtx_ptr = raw_ptr.as_ptr() as *mut pthread_mutex_t;
73
74 let first = unsafe { *(mtx_ptr as *const c_int) };
75 if first == 0 {
76 let mut attr: pthread_mutexattr_t = unsafe { zeroed() };
77 unsafe {
78 Errno::result(pthread_mutexattr_init(&mut attr))?;
79 Errno::result(pthread_mutexattr_setpshared(
80 &mut attr,
81 PTHREAD_PROCESS_SHARED,
82 ))?;
83 Errno::result(pthread_mutexattr_setrobust(&mut attr, PTHREAD_MUTEX_ROBUST))?;
84 Errno::result(pthread_mutex_init(mtx_ptr, &attr))?;
85 Errno::result(pthread_mutexattr_destroy(&mut attr))?;
86 }
87 }
88
89 init_lock
90 .unlock()
91 .map_err(|(_, e)| anyhow!("init-unlock failed: {}", e))?;
92
93 Ok(Self {
94 _fd: fd,
95 ptr: mtx_ptr,
96 })
97 }
98
99 pub fn lock(&self) -> Result<LockResult> {
100 let err = unsafe { pthread_mutex_lock(self.ptr) };
101 if err == EOWNERDEAD {
102 unsafe {
103 Errno::result(pthread_mutex_consistent(self.ptr))
104 .map_err(|e| anyhow!("pthread_mutex_consistent failed: {e}"))?;
105 }
106 Ok(LockResult::OwnerDiedRecovered)
107 } else {
108 Errno::result(err)
109 .map(|_| LockResult::Acquired)
110 .map_err(|e| anyhow!("pthread_mutex_lock failed: {e}"))
111 }
112 }
113
114 pub fn unlock(&self) -> Result<(), Errno> {
115 unsafe { Errno::result(pthread_mutex_unlock(self.ptr)).map(|_| ()) }
116 }
117}
118
119impl Drop for InterprocessMutex {
120 fn drop(&mut self) {
121 unsafe {
122 Errno::result(munmap(
125 self.ptr as *mut c_void,
126 size_of::<pthread_mutex_t>(),
127 ))
128 .ok();
129 }
130 }
131}