process_sync/
condvar.rs

1use libc::{
2    pid_t, pthread_cond_broadcast, pthread_cond_destroy, pthread_cond_init, pthread_cond_signal,
3    pthread_cond_t, pthread_cond_wait, pthread_condattr_destroy, pthread_condattr_init,
4    pthread_condattr_setpshared, pthread_condattr_t, PTHREAD_COND_INITIALIZER,
5    PTHREAD_PROCESS_SHARED,
6};
7
8use crate::{
9    shared_memory::SharedMemoryObject,
10    util::{check_libc_err, getpid},
11    SharedMutex,
12};
13
14/// Simple conditional variable that can be shared between processes and used with [`SharedMutex`]
15///
16/// Dropping conditional variable in creating process while it being used by another process will cause undefined behaviour.
17/// It is recommended to drop this conditional variable in creating process only after no other process has access to it.
18///
19/// For more information see [`pthread_cond_init`](https://man7.org/linux/man-pages/man3/pthread_cond_init.3p.html), [`pthread_cond_wait`](https://man7.org/linux/man-pages/man3/pthread_cond_wait.3p.html), [`SharedMutex`] and [`SharedMemoryObject`].
20///
21/// # Example
22/// ```rust
23/// # use std::error::Error;
24/// # use std::thread::sleep;
25/// # use std::time::Duration;
26/// #
27/// # use libc::fork;
28/// #
29/// # use process_sync::private::check_libc_err;
30/// # use process_sync::SharedMutex;
31/// # use process_sync::SharedCondvar;
32/// #
33/// # fn main() -> Result<(), Box<dyn Error>> {
34/// #
35/// let mut mutex = SharedMutex::new()?;
36/// let mut condvar = SharedCondvar::new()?;
37///
38/// let pid = unsafe { fork() };
39/// assert!(pid >= 0);
40///
41/// if pid == 0 {
42///     println!("child lock()");
43///     mutex.lock()?;
44///     println!("child wait()");
45///     condvar.wait(&mut mutex)?;
46///     println!("child notified");
47///     mutex.unlock()?;
48///     println!("child unlocked");
49/// } else {
50///     sleep(Duration::from_millis(40));
51///     println!("parent notify()");
52///     condvar.notify_one()?;
53/// }
54/// #
55/// #     Ok(())
56/// # }
57/// ```
58///
59/// Output:
60/// ```txt
61/// child lock()
62/// child wait()
63/// parent notify()
64/// child notified
65/// child unlocked
66/// ```
67pub struct SharedCondvar {
68    condvar: SharedMemoryObject<pthread_cond_t>,
69    owner_pid: pid_t,
70}
71
72impl SharedCondvar {
73    /// Creates new [`SharedCondvar`]
74    ///
75    /// # Errors
76    /// If allocation or initialization fails returns error from [`last_os_error`].
77    ///
78    /// [`last_os_error`]: https://doc.rust-lang.org/stable/std/io/struct.Error.html#method.last_os_error
79    pub fn new() -> std::io::Result<Self> {
80        let mut condvar = SharedMemoryObject::new(PTHREAD_COND_INITIALIZER)?;
81        initialize_condvar(condvar.get_mut())?;
82
83        let owner_pid = getpid();
84        Ok(Self { condvar, owner_pid })
85    }
86
87    /// Waits on given mutex
88    ///
89    /// This function will block until notified by another process
90    ///
91    /// # Errors
92    /// If any pthread call fails, returns error from [`last_os_error`]. For possible errors see [`pthread_cond_wait`](https://man7.org/linux/man-pages/man3/pthread_cond_wait.3p.html).
93    ///
94    /// [`last_os_error`]: https://doc.rust-lang.org/stable/std/io/struct.Error.html#method.last_os_error
95    pub fn wait(&mut self, mutex: &mut SharedMutex) -> std::io::Result<()> {
96        check_libc_err(unsafe { pthread_cond_wait(self.condvar.get_mut(), mutex.get_mut()) })?;
97        Ok(())
98    }
99
100    /// Notifies one of processes that are waiting on this condvar
101    ///
102    /// # Errors
103    /// If any pthread call fails, returns error from [`last_os_error`]. For possible errors see [`pthread_cond_signal`](https://man7.org/linux/man-pages/man3/pthread_cond_broadcast.3p.html).
104    ///
105    /// [`last_os_error`]: https://doc.rust-lang.org/stable/std/io/struct.Error.html#method.last_os_error
106    pub fn notify_one(&mut self) -> std::io::Result<()> {
107        check_libc_err(unsafe { pthread_cond_signal(self.condvar.get_mut()) })?;
108        Ok(())
109    }
110
111    /// Notifies all processes that are waiting on this condvar
112    ///
113    /// # Errors
114    /// If any pthread call fails, returns error from [`last_os_error`]. For possible errors see [`pthread_cond_broadcast`](https://man7.org/linux/man-pages/man3/pthread_cond_broadcast.3p.html).
115    ///
116    /// [`last_os_error`]: https://doc.rust-lang.org/stable/std/io/struct.Error.html#method.last_os_error
117    pub fn notify_all(&mut self) -> std::io::Result<()> {
118        check_libc_err(unsafe { pthread_cond_broadcast(self.condvar.get_mut()) })?;
119        Ok(())
120    }
121}
122
123impl Drop for SharedCondvar {
124    fn drop(&mut self) {
125        if getpid() == self.owner_pid {
126            check_libc_err(unsafe { pthread_cond_destroy(self.condvar.get_mut()) })
127                .expect("cannot destroy mutex");
128        }
129    }
130}
131
132fn initialize_condvar(condvar: &mut pthread_cond_t) -> std::io::Result<()> {
133    let mut attr: pthread_condattr_t = unsafe { std::mem::zeroed() };
134    check_libc_err(unsafe { pthread_condattr_init(&mut attr) })?;
135
136    check_libc_err(unsafe { pthread_condattr_setpshared(&mut attr, PTHREAD_PROCESS_SHARED) })
137        .expect("cannot set PTHREAD_PROCESS_SHARED");
138
139    let ret = check_libc_err(unsafe { pthread_cond_init(condvar, &mut attr) });
140
141    destroy_condattr(attr).expect("cannot destroy condattr");
142
143    ret.map(|_| ())
144}
145
146fn destroy_condattr(mut attr: pthread_condattr_t) -> std::io::Result<()> {
147    check_libc_err(unsafe { pthread_condattr_destroy(&mut attr) })?;
148    Ok(())
149}