rendezvous/lib.rs
1//! # Easier Rendezvous Channels
2//!
3//! In rust, [mpsc::channel](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html) can be used as a synchronization
4//! primitive between threads by utilizing the fact that we can block on the receiver's `recv()` function until all senders
5//! are dropped.
6//!
7//! This crate aims at giving the concept an expressive name and at reducing some classes of race conditions, namely those
8//! where the original sender was not dropped before the call to `recv()`.
9//!
10//! This version of the crate only supports synchronous code due to the dropping semantics.
11//!
12//! ## Crate Features
13//!
14//! * `log` - Enables support for the `log` crate.
15//! * `tokio` - Enables the `rendezvous_async` method to asynchronously wait for the rendezvous
16//! points to be reached.
17//!
18//! ## Example usage
19//!
20//! ```rust
21//! use std::sync::{Arc, Mutex};
22//! use std::thread;
23//! use std::time::Duration;
24//! use rendezvous::{Rendezvous, RendezvousGuard};
25//!
26//! /// A slow worker function. Sleeps, then mutates a value.
27//! fn slow_worker_fn(_guard: RendezvousGuard, mut value: Arc<Mutex<u32>>) {
28//! thread::sleep(Duration::from_millis(400));
29//! let mut value = value.lock().unwrap();
30//! *value = 42;
31//! }
32//!
33//! fn example() {
34//! // The guard that ensures synchronization across threads.
35//! // Rendezvous itself acts as a guard: If not explicitly dropped, it will block the current
36//! // scope until all rendezvous points are reached.
37//! let rendezvous = Rendezvous::new();
38//!
39//! // A value to mutate in a different thread.
40//! let value = Arc::new(Mutex::new(0u32));
41//!
42//! // Run the worker in a thread.
43//! thread::spawn({
44//! let guard = rendezvous.fork_guard();
45//! let value = value.clone();
46//! move || slow_worker_fn(guard, value)
47//! });
48//!
49//! // Block until the thread has finished its work.
50//! rendezvous.rendezvous();
51//!
52//! // The thread finished in time.
53//! assert_eq!(*(value.lock().unwrap()), 42);
54//! }
55//! ```
56
57// only enables the `doc_cfg` feature when
58// the `docsrs` configuration attribute is defined
59#![cfg_attr(docsrs, feature(doc_cfg))]
60
61#[cfg(feature = "log")]
62use log::{debug, error, trace};
63
64#[cfg(feature = "tokio")]
65use tokio::task::{self, JoinError};
66
67use std::error::Error;
68use std::fmt::{Display, Formatter};
69use std::sync::mpsc;
70use std::sync::mpsc::RecvTimeoutError;
71use std::time::Duration;
72
73/// [`Rendezvous`] is a synchronization primitive that allows two threads to rendezvous
74/// at a certain point in the code before proceeding.
75pub struct Rendezvous {
76 /// The receiver used for the rendezvous process. If all senders are dropped, the
77 /// receiver allows the [`Rendezvous::rendezvous`] method to pass.
78 rx: mpsc::Receiver<()>,
79 /// The original sender for the rendezvous process. Will be forked using [`Rendezvous::fork_guard`]
80 /// or transiently forked from [`RendezvousGuard::fork`]. If all senders are dropped,
81 /// [`Rendezvous::rendezvous`] can proceed.
82 tx: Option<mpsc::Sender<()>>,
83}
84
85/// A guard forked off a [`Rendezvous`] struct.
86pub struct RendezvousGuard(mpsc::Sender<()>);
87
88impl Rendezvous {
89 /// Create a new instance of a [`Rendezvous`] channel.
90 ///
91 /// # Returns
92 ///
93 /// The newly created rendezvous channel.
94 ///
95 /// # Examples
96 ///
97 /// ```
98 /// use rendezvous::Rendezvous;
99 ///
100 /// let rendezvous = Rendezvous::new();
101 /// ```
102 pub fn new() -> Self {
103 let (tx, rx) = mpsc::channel();
104 Self { tx: Some(tx), rx }
105 }
106
107 /// Forks a guard off the [`Rendezvous`] channel.
108 ///
109 /// When all guards are dropped, [`Rendezvous::rendezvous`] will proceed; until then, that
110 /// call blocks.
111 ///
112 /// ## Example
113 ///
114 /// See [`Rendezvous::new`] for a usage example.
115 ///
116 /// <div class="warning">
117 /// Note that forking and not dropping a guard in the same thread is a deadlock:
118 /// </div>
119 ///
120 /// ```no_run
121 /// use rendezvous::Rendezvous;
122 ///
123 /// let mut rendezvous = Rendezvous::new();
124 /// let guard = rendezvous.fork_guard();
125 /// rendezvous.rendezvous(); // will deadlock
126 /// drop(guard);
127 /// ```
128 pub fn fork_guard(&self) -> RendezvousGuard {
129 if let Some(tx) = &self.tx {
130 #[cfg(feature = "log")]
131 {
132 trace!("Forking rendezvous guard");
133 }
134 RendezvousGuard(tx.clone())
135 } else {
136 unreachable!("Fork called after Rendezvous is dropped")
137 }
138 }
139
140 /// Executes the rendezvous process.
141 ///
142 /// ## Example
143 ///
144 /// ```
145 /// use std::sync::{Arc, Mutex};
146 /// use std::thread;
147 /// use std::time::Duration;
148 /// use rendezvous::{Rendezvous, RendezvousGuard};
149 ///
150 /// // A slow worker function. Sleeps, then mutates a value.
151 /// fn slow_worker_fn(_guard: RendezvousGuard, mut value: Arc<Mutex<u32>>) {
152 /// thread::sleep(Duration::from_millis(400));
153 /// let mut value = value.lock().unwrap();
154 /// *value = 42;
155 /// }
156 ///
157 /// // The guard that ensures synchronization across threads.
158 /// let rendezvous = Rendezvous::new();
159 ///
160 /// // A value to mutate in a different thread.
161 /// let value = Arc::new(Mutex::new(0u32));
162 ///
163 /// // Run the worker in a thread.
164 /// thread::spawn({
165 /// let guard = rendezvous.fork_guard();
166 /// let value = value.clone();
167 /// move || slow_worker_fn(guard, value)
168 /// });
169 ///
170 /// // Block until the thread has finished its work.
171 /// rendezvous.rendezvous();
172 ///
173 /// // The thread finished in time.
174 /// assert_eq!(*(value.lock().unwrap()), 42);
175 /// ```
176 ///
177 /// <div class="warning">
178 /// Note that forking and not dropping a guard in the same thread is a deadlock:
179 /// </div>
180 ///
181 /// ```no_run
182 /// use rendezvous::Rendezvous;
183 ///
184 /// let mut rendezvous = Rendezvous::new();
185 /// let guard = rendezvous.fork_guard();
186 /// rendezvous.rendezvous(); // will deadlock
187 /// drop(guard);
188 /// ```
189 pub fn rendezvous(mut self) {
190 self.rendezvous_internal();
191 }
192
193 /// Asynchronously executes the rendezvous process.
194 ///
195 /// ## Usage notes
196 ///
197 /// When the rendezvous channel is dropped without a call to [`Rendezvous::rendezvous_async`],
198 /// the currently executed will block until all rendezvous points are reached.
199 ///
200 /// ## Example
201 ///
202 /// ```
203 /// use std::sync::{Arc, Mutex};
204 /// use std::thread;
205 /// use std::time::Duration;
206 /// use rendezvous::{Rendezvous, RendezvousGuard};
207 ///
208 /// // A slow worker function. Sleeps, then mutates a value.
209 /// fn slow_worker_fn(_guard: RendezvousGuard, mut value: Arc<Mutex<u32>>) {
210 /// thread::sleep(Duration::from_millis(400));
211 /// let mut value = value.lock().unwrap();
212 /// *value = 42;
213 /// }
214 ///
215 /// // The guard that ensures synchronization across threads.
216 /// let rendezvous = Rendezvous::new();
217 ///
218 /// // A value to mutate in a different thread.
219 /// let value = Arc::new(Mutex::new(0u32));
220 ///
221 /// // Run the worker in a thread.
222 /// thread::spawn({
223 /// let guard = rendezvous.fork_guard();
224 /// let value = value.clone();
225 /// move || slow_worker_fn(guard, value)
226 /// });
227 ///
228 /// // Block until the thread has finished its work.
229 /// # tokio_test::block_on(async {
230 /// rendezvous.rendezvous_async().await.ok();
231 /// # });
232 ///
233 /// // The thread finished in time.
234 /// assert_eq!(*(value.lock().unwrap()), 42);
235 /// ```
236 #[cfg(feature = "tokio")]
237 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
238 pub async fn rendezvous_async(self) -> Result<(), JoinError> {
239 let handle = task::spawn_blocking(|| self.rendezvous());
240 handle.await
241 }
242
243 /// Executes the rendezvous process with a timeout.
244 ///
245 /// ## Example
246 ///
247 /// ```
248 /// use std::sync::{Arc, Mutex};
249 /// use std::thread;
250 /// use std::time::Duration;
251 /// use rendezvous::{Rendezvous, RendezvousGuard, RendezvousTimeoutError};
252 ///
253 /// // A slow worker function. Sleeps, then mutates a value.
254 /// fn slow_worker_fn(_guard: RendezvousGuard, mut value: Arc<Mutex<u32>>) {
255 /// thread::sleep(Duration::from_millis(400));
256 /// let mut value = value.lock().unwrap();
257 /// *value = 42;
258 /// }
259 ///
260 /// // The guard that ensures synchronization across threads.
261 /// let mut rendezvous = Rendezvous::new();
262 ///
263 /// // A value to mutate in a different thread.
264 /// let value = Arc::new(Mutex::new(0u32));
265 ///
266 /// // Run the worker in a thread.
267 /// thread::spawn({
268 /// let guard = rendezvous.fork_guard();
269 /// let value = value.clone();
270 /// move || slow_worker_fn(guard, value)
271 /// });
272 ///
273 /// // Wait briefly - this will fail.
274 /// let result = rendezvous.rendezvous_timeout(Duration::from_millis(10));
275 /// assert_eq!(result, Err(RendezvousTimeoutError::Timeout));
276 ///
277 /// // Block until the thread has finished its work, or the timeout occurs.
278 /// let result = rendezvous.rendezvous_timeout(Duration::from_secs(1));
279 /// assert_eq!(result, Ok(()));
280 ///
281 /// // The thread finished in time.
282 /// assert_eq!(*(value.lock().unwrap()), 42);
283 /// ```
284 ///
285 /// <div class="warning">
286 /// Note that forking and not dropping a guard is generally a deadlock, and a timeout will occur:
287 /// </div>
288 ///
289 /// ```
290 /// use std::time::Duration;
291 /// use rendezvous::{Rendezvous, RendezvousTimeoutError};
292 ///
293 /// let mut rendezvous = Rendezvous::new();
294 /// let guard = rendezvous.fork_guard();
295 /// assert_eq!(rendezvous.rendezvous_timeout(Duration::from_millis(10)), Err(RendezvousTimeoutError::Timeout));
296 /// drop(guard);
297 /// ```
298 pub fn rendezvous_timeout(&mut self, timeout: Duration) -> Result<(), RendezvousTimeoutError> {
299 if let Some(tx) = self.tx.take() {
300 drop(tx);
301 } else {
302 #[cfg(feature = "log")]
303 {
304 trace!("Rendezvous was called previously, attempting again");
305 }
306 }
307 match self.rx.recv_timeout(timeout) {
308 Ok(_) => Ok(()),
309 Err(err) => match err {
310 RecvTimeoutError::Timeout => {
311 #[cfg(feature = "log")]
312 {
313 debug!("A timeout occurred during a rendezvous");
314 }
315 Err(RendezvousTimeoutError::Timeout)
316 }
317 RecvTimeoutError::Disconnected => Ok(()),
318 },
319 }
320 }
321
322 /// Performs a rendezvous operation internally.
323 ///
324 /// This function borrows `self` and drops the `tx` channel if it exists.
325 /// It then blocks on the `rx` channel, waiting for all [`RendezvousGuard`] instances to be
326 /// dropped, and discards any error that may occur.
327 fn rendezvous_internal(&mut self) {
328 if let Some(tx) = self.tx.take() {
329 drop(tx);
330 }
331 self.rx.recv().ok();
332 }
333}
334
335impl Default for Rendezvous {
336 fn default() -> Self {
337 Rendezvous::new()
338 }
339}
340
341impl RendezvousGuard {
342 /// Forks a guard off the owning [`Rendezvous`] channel.
343 ///
344 /// When all guards are dropped, [`Rendezvous::rendezvous`] will proceed; until then, that
345 /// call blocks.
346 pub fn fork(&self) -> RendezvousGuard {
347 #[cfg(feature = "log")]
348 {
349 trace!("Forking nested rendezvous guard");
350 }
351 RendezvousGuard(self.0.clone())
352 }
353
354 /// A no-operation that consumes self, marking a rendezvous point.
355 ///
356 /// ## Example
357 ///
358 /// ```
359 /// use rendezvous::Rendezvous;
360 ///
361 /// let mut rendezvous = Rendezvous::new();
362 /// let guard = rendezvous.fork_guard();
363 /// guard.completed();
364 /// rendezvous.rendezvous();
365 /// ```
366 pub fn completed(self) {}
367}
368
369impl Clone for RendezvousGuard {
370 fn clone(&self) -> Self {
371 self.fork()
372 }
373}
374
375impl Drop for Rendezvous {
376 fn drop(&mut self) {
377 #[cfg(all(debug_assertions, feature = "log"))]
378 if self.tx.is_some() {
379 error!("Implementation error: Rendezvous method not invoked")
380 }
381 self.rendezvous_internal()
382 }
383}
384
385/// Timeout error that may occur during a rendezvous process.
386///
387/// This error is used to indicate that a timeout has occurred while waiting for a rendezvous.
388#[derive(Debug, Eq, PartialEq)]
389pub enum RendezvousTimeoutError {
390 /// A timeout occurred that may occur during a rendezvous process. Forks have not disconnected
391 /// yet, so the work might not have been completed.
392 Timeout,
393}
394
395impl Display for RendezvousTimeoutError {
396 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
397 match self {
398 RendezvousTimeoutError::Timeout => write!(f, "Timeout"),
399 }
400 }
401}
402
403impl Error for RendezvousTimeoutError {}
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408 use std::thread;
409
410 #[test]
411 fn rendezvous_can_pass_away() {
412 let rendezvous = Rendezvous::new();
413 rendezvous.rendezvous();
414 }
415
416 #[test]
417 fn rendezvous_can_be_dropped_right_away() {
418 let rendezvous = Rendezvous::new();
419 drop(rendezvous);
420 }
421
422 #[test]
423 fn test_timeout() {
424 let mut rendezvous = Rendezvous::new();
425 let guard = rendezvous.fork_guard();
426
427 let result = rendezvous.rendezvous_timeout(Duration::from_millis(100));
428 assert_eq!(result, Err(RendezvousTimeoutError::Timeout));
429 drop(guard);
430 }
431
432 #[test]
433 fn test_background_forks() {
434 let rendezvous = Rendezvous::new();
435
436 let guard = rendezvous.fork_guard();
437 thread::spawn(move || {
438 let _guard = guard;
439 thread::sleep(Duration::from_millis(400))
440 });
441
442 rendezvous.rendezvous();
443 }
444}