linux_interprocess/
interprocess_mutex.rs

1use std::{
2    ffi::c_void,
3    mem::{size_of, zeroed},
4    num::NonZeroUsize,
5    os::{
6        fd::{FromRawFd, OwnedFd},
7        unix::io::AsRawFd,
8    },
9};
10
11use anyhow::{Result, anyhow};
12use nix::{
13    errno::Errno,
14    fcntl::{Flock, FlockArg, OFlag, open},
15    libc::{
16        EOWNERDEAD, PTHREAD_MUTEX_ROBUST, PTHREAD_PROCESS_SHARED, c_int, dup, munmap, off_t,
17        pthread_mutex_consistent, pthread_mutex_init, pthread_mutex_lock, pthread_mutex_t,
18        pthread_mutex_unlock, pthread_mutexattr_destroy, pthread_mutexattr_init,
19        pthread_mutexattr_setpshared, pthread_mutexattr_setrobust, pthread_mutexattr_t,
20    },
21    sys::{
22        mman::{MapFlags, ProtFlags, mmap},
23        stat::Mode,
24    },
25    unistd::ftruncate,
26};
27
/// The result of successfully locking an interprocess mutex.
///
/// Both variants mean the caller now holds the lock; they differ only in
/// whether the previous holder terminated without unlocking.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LockResult {
    /// Mutex acquired normally without prior owner death.
    Acquired,
    /// Mutex acquired after recovering from a previous owner's death.
    ///
    /// Data protected by the mutex may have been left half-updated by the
    /// dead owner and should be validated by the caller.
    OwnerDiedRecovered,
}
36
37pub struct InterprocessMutex {
38    _fd: OwnedFd,
39    ptr: *mut pthread_mutex_t,
40}
41
42impl InterprocessMutex {
43    pub fn new(name: &str) -> Result<Self> {
44        let path = format!("/dev/shm/{}.mtx", name);
45        let fd = open(
46            path.as_str(),
47            OFlag::O_CREAT | OFlag::O_RDWR,
48            Mode::from_bits_truncate(0o600),
49        )?;
50
51        ftruncate(&fd, size_of::<pthread_mutex_t>() as off_t)?;
52
53        let dup_raw_fd = unsafe { Errno::result(dup(fd.as_raw_fd()))? };
54        let dup_fd = unsafe { OwnedFd::from_raw_fd(dup_raw_fd) };
55
56        let init_lock = Flock::lock(dup_fd, FlockArg::LockExclusive)
57            .map_err(|(_, e)| anyhow!("init-lock failed: {}", e))?;
58
59        let len = NonZeroUsize::new(size_of::<pthread_mutex_t>())
60            .expect("pthread_mutex_t has nonzero size");
61        let raw_ptr = unsafe {
62            mmap(
63                None,
64                len,
65                ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
66                MapFlags::MAP_SHARED,
67                &fd,
68                0,
69            )?
70        };
71
72        let mtx_ptr = raw_ptr.as_ptr() as *mut pthread_mutex_t;
73
74        let first = unsafe { *(mtx_ptr as *const c_int) };
75        if first == 0 {
76            let mut attr: pthread_mutexattr_t = unsafe { zeroed() };
77            unsafe {
78                Errno::result(pthread_mutexattr_init(&mut attr))?;
79                Errno::result(pthread_mutexattr_setpshared(
80                    &mut attr,
81                    PTHREAD_PROCESS_SHARED,
82                ))?;
83                Errno::result(pthread_mutexattr_setrobust(&mut attr, PTHREAD_MUTEX_ROBUST))?;
84                Errno::result(pthread_mutex_init(mtx_ptr, &attr))?;
85                Errno::result(pthread_mutexattr_destroy(&mut attr))?;
86            }
87        }
88
89        init_lock
90            .unlock()
91            .map_err(|(_, e)| anyhow!("init-unlock failed: {}", e))?;
92
93        Ok(Self {
94            _fd: fd,
95            ptr: mtx_ptr,
96        })
97    }
98
99    pub fn lock(&self) -> Result<LockResult> {
100        let err = unsafe { pthread_mutex_lock(self.ptr) };
101        if err == EOWNERDEAD {
102            unsafe {
103                Errno::result(pthread_mutex_consistent(self.ptr))
104                    .map_err(|e| anyhow!("pthread_mutex_consistent failed: {e}"))?;
105            }
106            Ok(LockResult::OwnerDiedRecovered)
107        } else {
108            Errno::result(err)
109                .map(|_| LockResult::Acquired)
110                .map_err(|e| anyhow!("pthread_mutex_lock failed: {e}"))
111        }
112    }
113
114    pub fn unlock(&self) -> Result<(), Errno> {
115        unsafe { Errno::result(pthread_mutex_unlock(self.ptr)).map(|_| ()) }
116    }
117}
118
119impl Drop for InterprocessMutex {
120    fn drop(&mut self) {
121        unsafe {
122            // Don't destroy the on-disk mutex so it remains valid for other processes
123            // Errno::result(nix::libc::pthread_mutex_destroy(self.ptr)).ok();
124            Errno::result(munmap(
125                self.ptr as *mut c_void,
126                size_of::<pthread_mutex_t>(),
127            ))
128            .ok();
129        }
130    }
131}