linux_interprocess/
interprocess_mutex.rs

1use std::{
2    ffi::c_void,
3    mem::{size_of, zeroed},
4    num::NonZeroUsize,
5    os::{
6        fd::{FromRawFd, OwnedFd},
7        unix::io::AsRawFd,
8    },
9};
10
11use anyhow::{Result, anyhow};
12use nix::{
13    errno::Errno,
14    fcntl::{Flock, FlockArg, OFlag, open},
15    libc::{
16        EOWNERDEAD, PTHREAD_MUTEX_ROBUST, PTHREAD_PROCESS_SHARED, c_int, dup, munmap, off_t,
17        pthread_mutex_consistent, pthread_mutex_init, pthread_mutex_lock, pthread_mutex_t,
18        pthread_mutex_unlock, pthread_mutexattr_destroy, pthread_mutexattr_init,
19        pthread_mutexattr_setpshared, pthread_mutexattr_setrobust, pthread_mutexattr_t,
20    },
21    sys::{
22        mman::{MapFlags, ProtFlags, mmap},
23        stat::Mode,
24    },
25    unistd::ftruncate,
26};
27
/// The result of successfully locking an interprocess mutex.
///
/// Both variants mean the caller now holds the lock; they differ only in
/// whether the previous holder released it cleanly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// Mutex acquired normally without prior owner death.
    Acquired,
    /// Mutex acquired after recovering from a previous owner's death.
    ///
    /// Any state the mutex protects may have been left half-updated by the
    /// dead owner and should be validated before use.
    OwnerDiedRecovered,
}
35
/// Handle to a `pthread_mutex_t` that lives in a file-backed shared mapping.
///
/// `new` opens `/dev/shm/<name>.mtx`, maps it with `MAP_SHARED`, and stores the
/// mapped address here; dropping the handle unmaps this process's view but
/// leaves the file (and the mutex inside it) in place.
pub struct InterprocessMutex {
    /// Owned descriptor of the backing file. It is never read after
    /// construction (hence the underscore) and is closed when the handle drops.
    _fd: OwnedFd,
    /// Address of the mutex inside the shared mapping created by `new`.
    ptr: *mut pthread_mutex_t,
}
40
41impl InterprocessMutex {
42    pub fn new(name: &str) -> Result<Self> {
43        let path = format!("/dev/shm/{}.mtx", name);
44        let fd = open(
45            path.as_str(),
46            OFlag::O_CREAT | OFlag::O_RDWR,
47            Mode::from_bits_truncate(0o600),
48        )?;
49
50        ftruncate(&fd, size_of::<pthread_mutex_t>() as off_t)?;
51
52        let dup_raw_fd = unsafe { Errno::result(dup(fd.as_raw_fd()))? };
53        let dup_fd = unsafe { OwnedFd::from_raw_fd(dup_raw_fd) };
54
55        let init_lock = Flock::lock(dup_fd, FlockArg::LockExclusive)
56            .map_err(|(_, e)| anyhow!("init-lock failed: {}", e))?;
57
58        let len = NonZeroUsize::new(size_of::<pthread_mutex_t>())
59            .expect("pthread_mutex_t has nonzero size");
60        let raw_ptr = unsafe {
61            mmap(
62                None,
63                len,
64                ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
65                MapFlags::MAP_SHARED,
66                &fd,
67                0,
68            )?
69        };
70
71        let mtx_ptr = raw_ptr.as_ptr() as *mut pthread_mutex_t;
72
73        let first = unsafe { *(mtx_ptr as *const c_int) };
74        if first == 0 {
75            let mut attr: pthread_mutexattr_t = unsafe { zeroed() };
76            unsafe {
77                Errno::result(pthread_mutexattr_init(&mut attr))?;
78                Errno::result(pthread_mutexattr_setpshared(
79                    &mut attr,
80                    PTHREAD_PROCESS_SHARED,
81                ))?;
82                Errno::result(pthread_mutexattr_setrobust(&mut attr, PTHREAD_MUTEX_ROBUST))?;
83                Errno::result(pthread_mutex_init(mtx_ptr, &attr))?;
84                Errno::result(pthread_mutexattr_destroy(&mut attr))?;
85            }
86        }
87
88        init_lock
89            .unlock()
90            .map_err(|(_, e)| anyhow!("init-unlock failed: {}", e))?;
91
92        Ok(Self {
93            _fd: fd,
94            ptr: mtx_ptr,
95        })
96    }
97
98    pub fn lock(&self) -> Result<LockResult> {
99        let err = unsafe { pthread_mutex_lock(self.ptr) };
100        if err == EOWNERDEAD {
101            unsafe {
102                Errno::result(pthread_mutex_consistent(self.ptr))
103                    .map_err(|e| anyhow!("pthread_mutex_consistent failed: {e}"))?;
104            }
105            Ok(LockResult::OwnerDiedRecovered)
106        } else {
107            Errno::result(err)
108                .map(|_| LockResult::Acquired)
109                .map_err(|e| anyhow!("pthread_mutex_lock failed: {e}"))
110        }
111    }
112
113    pub fn unlock(&self) -> Result<(), Errno> {
114        unsafe { Errno::result(pthread_mutex_unlock(self.ptr)).map(|_| ()) }
115    }
116}
117
118impl Drop for InterprocessMutex {
119    fn drop(&mut self) {
120        unsafe {
121            // Don't destroy the on-disk mutex so it remains valid for other processes
122            // Errno::result(nix::libc::pthread_mutex_destroy(self.ptr)).ok();
123            Errno::result(munmap(
124                self.ptr as *mut c_void,
125                size_of::<pthread_mutex_t>(),
126            ))
127            .ok();
128        }
129    }
130}