rendezvous/
lib.rs

1//! # Easier Rendezvous Channels
2//!
3//! In rust, [mpsc::channel](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html) can be used as a synchronization
4//! primitive between threads by utilizing the fact that we can block on the receiver's `recv()` function until all senders
5//! are dropped.
6//!
7//! This crate aims at giving the concept an expressive name and at reducing some classes of race conditions, namely those
8//! where the original sender was not dropped before the call to `recv()`.
9//!
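//! The rendezvous itself is always a blocking operation due to the dropping semantics. Asynchronous
//! callers can enable the `tokio` feature, whose `rendezvous_async` method performs the blocking
//! wait on a separate blocking task.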
11//!
12//! ## Crate Features
13//!
14//! * `log` - Enables support for the `log` crate.
15//! * `tokio` - Enables the `rendezvous_async` method to asynchronously wait for the rendezvous
16//!             points to be reached.
17//!
18//! ## Example usage
19//!
20//! ```rust
21//! use std::sync::{Arc, Mutex};
22//! use std::thread;
23//! use std::time::Duration;
24//! use rendezvous::{Rendezvous, RendezvousGuard};
25//!
26//! /// A slow worker function. Sleeps, then mutates a value.
27//! fn slow_worker_fn(_guard: RendezvousGuard, mut value: Arc<Mutex<u32>>) {
28//!     thread::sleep(Duration::from_millis(400));
29//!     let mut value = value.lock().unwrap();
30//!     *value = 42;
31//! }
32//!
33//! fn example() {
34//!     // The guard that ensures synchronization across threads.
//!     // Rendezvous itself acts as a guard: if `rendezvous()` is never called explicitly, dropping
//!     // it (e.g. at the end of the scope) blocks until all rendezvous points are reached.
37//!     let rendezvous = Rendezvous::new();
38//!
39//!     // A value to mutate in a different thread.
40//!     let value = Arc::new(Mutex::new(0u32));
41//!
42//!     // Run the worker in a thread.
43//!     thread::spawn({
44//!         let guard = rendezvous.fork_guard();
45//!         let value = value.clone();
46//!         move || slow_worker_fn(guard, value)
47//!     });
48//!
49//!     // Block until the thread has finished its work.
50//!     rendezvous.rendezvous();
51//!
52//!     // The thread finished in time.
53//!     assert_eq!(*(value.lock().unwrap()), 42);
54//! }
55//! ```
56
57// only enables the `doc_cfg` feature when
58// the `docsrs` configuration attribute is defined
59#![cfg_attr(docsrs, feature(doc_cfg))]
60
61#[cfg(feature = "log")]
62use log::{debug, error, trace};
63
64#[cfg(feature = "tokio")]
65use tokio::task::{self, JoinError};
66
67use std::error::Error;
68use std::fmt::{Display, Formatter};
69use std::sync::mpsc;
70use std::sync::mpsc::RecvTimeoutError;
71use std::time::Duration;
72
/// [`Rendezvous`] is a synchronization primitive that lets one thread wait until every
/// [`RendezvousGuard`] forked from it has been dropped, typically by other threads once they
/// have reached their rendezvous point.
///
/// Internally this wraps a [`std::sync::mpsc`] channel on which no message is ever sent by this
/// crate: the receiver merely observes the moment at which all senders have disconnected.
#[derive(Debug)]
pub struct Rendezvous {
    /// The receiver used for the rendezvous process. Once all senders are dropped, the
    /// receiver allows the [`Rendezvous::rendezvous`] method to pass.
    rx: mpsc::Receiver<()>,
    /// The original sender for the rendezvous process. Will be forked using [`Rendezvous::fork_guard`]
    /// or transiently forked from [`RendezvousGuard::fork`]. It is `None` once a rendezvous has
    /// been started, because the original sender must be released for the receiver to ever
    /// observe the disconnect.
    tx: Option<mpsc::Sender<()>>,
}
84
/// A guard forked off a [`Rendezvous`] struct.
///
/// The guard owns a clone of the channel sender. While at least one guard is alive, the
/// rendezvous it was forked from keeps waiting; dropping the guard marks its rendezvous point
/// as reached.
#[derive(Debug)]
#[must_use = "dropping a guard immediately marks its rendezvous point as reached"]
pub struct RendezvousGuard(mpsc::Sender<()>);
87
88impl Rendezvous {
89    /// Create a new instance of a [`Rendezvous`] channel.
90    ///
91    /// # Returns
92    ///
93    /// The newly created rendezvous channel.
94    ///
95    /// # Examples
96    ///
97    /// ```
98    /// use rendezvous::Rendezvous;
99    ///
100    /// let rendezvous = Rendezvous::new();
101    /// ```
102    pub fn new() -> Self {
103        let (tx, rx) = mpsc::channel();
104        Self { tx: Some(tx), rx }
105    }
106
107    /// Forks a guard off the [`Rendezvous`] channel.
108    ///
109    /// When all guards are dropped, [`Rendezvous::rendezvous`] will proceed; until then, that
110    /// call blocks.
111    ///
112    /// ## Example
113    ///
114    /// See [`Rendezvous::new`] for a usage example.
115    ///
116    /// <div class="warning">
117    /// Note that forking and not dropping a guard in the same thread is a deadlock:
118    /// </div>
119    ///
120    /// ```no_run
121    /// use rendezvous::Rendezvous;
122    ///
123    /// let mut rendezvous = Rendezvous::new();
124    /// let guard = rendezvous.fork_guard();
125    /// rendezvous.rendezvous(); // will deadlock
126    /// drop(guard);
127    /// ```
128    pub fn fork_guard(&self) -> RendezvousGuard {
129        if let Some(tx) = &self.tx {
130            #[cfg(feature = "log")]
131            {
132                trace!("Forking rendezvous guard");
133            }
134            RendezvousGuard(tx.clone())
135        } else {
136            unreachable!("Fork called after Rendezvous is dropped")
137        }
138    }
139
140    /// Executes the rendezvous process.
141    ///
142    /// ## Example
143    ///
144    /// ```
145    /// use std::sync::{Arc, Mutex};
146    /// use std::thread;
147    /// use std::time::Duration;
148    /// use rendezvous::{Rendezvous, RendezvousGuard};
149    ///
150    /// // A slow worker function. Sleeps, then mutates a value.
151    /// fn slow_worker_fn(_guard: RendezvousGuard, mut value: Arc<Mutex<u32>>) {
152    ///     thread::sleep(Duration::from_millis(400));
153    ///     let mut value = value.lock().unwrap();
154    ///     *value = 42;
155    /// }
156    ///
157    /// // The guard that ensures synchronization across threads.
158    /// let rendezvous = Rendezvous::new();
159    ///
160    /// // A value to mutate in a different thread.
161    /// let value = Arc::new(Mutex::new(0u32));
162    ///
163    /// // Run the worker in a thread.
164    /// thread::spawn({
165    ///     let guard = rendezvous.fork_guard();
166    ///     let value = value.clone();
167    ///     move || slow_worker_fn(guard, value)
168    /// });
169    ///
170    /// // Block until the thread has finished its work.
171    /// rendezvous.rendezvous();
172    ///
173    /// // The thread finished in time.
174    /// assert_eq!(*(value.lock().unwrap()), 42);
175    /// ```
176    ///
177    /// <div class="warning">
178    /// Note that forking and not dropping a guard in the same thread is a deadlock:
179    /// </div>
180    ///
181    /// ```no_run
182    /// use rendezvous::Rendezvous;
183    ///
184    /// let mut rendezvous = Rendezvous::new();
185    /// let guard = rendezvous.fork_guard();
186    /// rendezvous.rendezvous(); // will deadlock
187    /// drop(guard);
188    /// ```
189    pub fn rendezvous(mut self) {
190        self.rendezvous_internal();
191    }
192
193    /// Asynchronously executes the rendezvous process.
194    ///
195    /// ## Usage notes
196    ///
197    /// When the rendezvous channel is dropped without a call to [`Rendezvous::rendezvous_async`],
198    /// the currently executed will block until all rendezvous points are reached.
199    ///
200    /// ## Example
201    ///
202    /// ```
203    /// use std::sync::{Arc, Mutex};
204    /// use std::thread;
205    /// use std::time::Duration;
206    /// use rendezvous::{Rendezvous, RendezvousGuard};
207    ///
208    /// // A slow worker function. Sleeps, then mutates a value.
209    /// fn slow_worker_fn(_guard: RendezvousGuard, mut value: Arc<Mutex<u32>>) {
210    ///     thread::sleep(Duration::from_millis(400));
211    ///     let mut value = value.lock().unwrap();
212    ///     *value = 42;
213    /// }
214    ///
215    /// // The guard that ensures synchronization across threads.
216    /// let rendezvous = Rendezvous::new();
217    ///
218    /// // A value to mutate in a different thread.
219    /// let value = Arc::new(Mutex::new(0u32));
220    ///
221    /// // Run the worker in a thread.
222    /// thread::spawn({
223    ///     let guard = rendezvous.fork_guard();
224    ///     let value = value.clone();
225    ///     move || slow_worker_fn(guard, value)
226    /// });
227    ///
228    /// // Block until the thread has finished its work.
229    /// # tokio_test::block_on(async {
230    /// rendezvous.rendezvous_async().await.ok();
231    /// # });
232    ///
233    /// // The thread finished in time.
234    /// assert_eq!(*(value.lock().unwrap()), 42);
235    /// ```
236    #[cfg(feature = "tokio")]
237    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
238    pub async fn rendezvous_async(self) -> Result<(), JoinError> {
239        let handle = task::spawn_blocking(|| self.rendezvous());
240        handle.await
241    }
242
243    /// Executes the rendezvous process with a timeout.
244    ///
245    /// ## Example
246    ///
247    /// ```
248    /// use std::sync::{Arc, Mutex};
249    /// use std::thread;
250    /// use std::time::Duration;
251    /// use rendezvous::{Rendezvous, RendezvousGuard, RendezvousTimeoutError};
252    ///
253    /// // A slow worker function. Sleeps, then mutates a value.
254    /// fn slow_worker_fn(_guard: RendezvousGuard, mut value: Arc<Mutex<u32>>) {
255    ///     thread::sleep(Duration::from_millis(400));
256    ///     let mut value = value.lock().unwrap();
257    ///     *value = 42;
258    /// }
259    ///
260    /// // The guard that ensures synchronization across threads.
261    /// let mut rendezvous = Rendezvous::new();
262    ///
263    /// // A value to mutate in a different thread.
264    /// let value = Arc::new(Mutex::new(0u32));
265    ///
266    /// // Run the worker in a thread.
267    /// thread::spawn({
268    ///     let guard = rendezvous.fork_guard();
269    ///     let value = value.clone();
270    ///     move || slow_worker_fn(guard, value)
271    /// });
272    ///
273    /// // Wait briefly - this will fail.
274    /// let result = rendezvous.rendezvous_timeout(Duration::from_millis(10));
275    /// assert_eq!(result, Err(RendezvousTimeoutError::Timeout));
276    ///
277    /// // Block until the thread has finished its work, or the timeout occurs.
278    /// let result = rendezvous.rendezvous_timeout(Duration::from_secs(1));
279    /// assert_eq!(result, Ok(()));
280    ///
281    /// // The thread finished in time.
282    /// assert_eq!(*(value.lock().unwrap()), 42);
283    /// ```
284    ///
285    /// <div class="warning">
286    /// Note that forking and not dropping a guard is generally a deadlock, and a timeout will occur:
287    /// </div>
288    ///
289    /// ```
290    /// use std::time::Duration;
291    /// use rendezvous::{Rendezvous, RendezvousTimeoutError};
292    ///
293    /// let mut rendezvous = Rendezvous::new();
294    /// let guard = rendezvous.fork_guard();
295    /// assert_eq!(rendezvous.rendezvous_timeout(Duration::from_millis(10)), Err(RendezvousTimeoutError::Timeout));
296    /// drop(guard);
297    /// ```
298    pub fn rendezvous_timeout(&mut self, timeout: Duration) -> Result<(), RendezvousTimeoutError> {
299        if let Some(tx) = self.tx.take() {
300            drop(tx);
301        } else {
302            #[cfg(feature = "log")]
303            {
304                trace!("Rendezvous was called previously, attempting again");
305            }
306        }
307        match self.rx.recv_timeout(timeout) {
308            Ok(_) => Ok(()),
309            Err(err) => match err {
310                RecvTimeoutError::Timeout => {
311                    #[cfg(feature = "log")]
312                    {
313                        debug!("A timeout occurred during a rendezvous");
314                    }
315                    Err(RendezvousTimeoutError::Timeout)
316                }
317                RecvTimeoutError::Disconnected => Ok(()),
318            },
319        }
320    }
321
322    /// Performs a rendezvous operation internally.
323    ///
324    /// This function borrows `self` and drops the `tx` channel if it exists.
325    /// It then blocks on the `rx` channel, waiting for all [`RendezvousGuard`] instances to be
326    /// dropped, and discards any error that may occur.
327    fn rendezvous_internal(&mut self) {
328        if let Some(tx) = self.tx.take() {
329            drop(tx);
330        }
331        self.rx.recv().ok();
332    }
333}
334
335impl Default for Rendezvous {
336    fn default() -> Self {
337        Rendezvous::new()
338    }
339}
340
341impl RendezvousGuard {
342    /// Forks a guard off the owning [`Rendezvous`] channel.
343    ///
344    /// When all guards are dropped, [`Rendezvous::rendezvous`] will proceed; until then, that
345    /// call blocks.
346    pub fn fork(&self) -> RendezvousGuard {
347        #[cfg(feature = "log")]
348        {
349            trace!("Forking nested rendezvous guard");
350        }
351        RendezvousGuard(self.0.clone())
352    }
353
354    /// A no-operation that consumes self, marking a rendezvous point.
355    ///
356    /// ## Example
357    ///
358    /// ```
359    /// use rendezvous::Rendezvous;
360    ///
361    /// let mut rendezvous = Rendezvous::new();
362    /// let guard = rendezvous.fork_guard();
363    /// guard.completed();
364    /// rendezvous.rendezvous();
365    /// ```
366    pub fn completed(self) {}
367}
368
369impl Clone for RendezvousGuard {
370    fn clone(&self) -> Self {
371        self.fork()
372    }
373}
374
375impl Drop for Rendezvous {
376    fn drop(&mut self) {
377        #[cfg(all(debug_assertions, feature = "log"))]
378        if self.tx.is_some() {
379            error!("Implementation error: Rendezvous method not invoked")
380        }
381        self.rendezvous_internal()
382    }
383}
384
/// The error reported by [`Rendezvous::rendezvous_timeout`] when the rendezvous could not be
/// completed in time.
#[derive(Debug, PartialEq, Eq)]
pub enum RendezvousTimeoutError {
    /// The timeout elapsed before all forked guards were dropped, so the guarded work might
    /// not have been completed.
    Timeout,
}
394
395impl Display for RendezvousTimeoutError {
396    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
397        match self {
398            RendezvousTimeoutError::Timeout => write!(f, "Timeout"),
399        }
400    }
401}
402
403impl Error for RendezvousTimeoutError {}
404
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;

    /// An explicit rendezvous without any forked guard returns right away.
    #[test]
    fn rendezvous_can_pass_away() {
        let rendezvous = Rendezvous::new();
        rendezvous.rendezvous();
    }

    /// Dropping a rendezvous without any forked guard does not block.
    #[test]
    fn rendezvous_can_be_dropped_right_away() {
        let rendezvous = Rendezvous::new();
        drop(rendezvous);
    }

    /// A live guard causes a timeout; once it is dropped, a retry succeeds.
    #[test]
    fn test_timeout() {
        let mut rendezvous = Rendezvous::new();
        let guard = rendezvous.fork_guard();

        let result = rendezvous.rendezvous_timeout(Duration::from_millis(100));
        assert_eq!(result, Err(RendezvousTimeoutError::Timeout));

        // With the only guard gone, the channel is disconnected and the retry completes.
        drop(guard);
        let result = rendezvous.rendezvous_timeout(Duration::from_millis(100));
        assert_eq!(result, Ok(()));
    }

    /// The rendezvous must not return before the background thread has finished its work.
    #[test]
    fn test_background_forks() {
        let rendezvous = Rendezvous::new();
        let finished = Arc::new(AtomicBool::new(false));

        let guard = rendezvous.fork_guard();
        thread::spawn({
            let finished = Arc::clone(&finished);
            move || {
                let _guard = guard;
                thread::sleep(Duration::from_millis(400));
                // Set the flag before `_guard` is dropped at the end of the closure.
                finished.store(true, Ordering::SeqCst);
            }
        });

        rendezvous.rendezvous();

        // Without this check the test would also pass if `rendezvous()` returned early.
        assert!(finished.load(Ordering::SeqCst));
    }
}