Skip to main content

primitives/utils/
sync.rs

/// Joins futures concurrently and releases their results only after all of them have finished.
///
/// Every argument is an expression that evaluates to a future whose output is a `Result`.
/// Each future is wrapped so that, once it has finished, it waits on a shared
/// `tokio::sync::Barrier` sized to the number of arguments. No wrapper returns (and therefore
/// no result is handed back or dropped) until every future has reached that barrier.
///
/// The macro evaluates to whatever `tokio::join!` yields for the wrapped futures: a tuple with
/// one `Result` per argument, in argument order. Errors are passed through `?`, so the usual
/// `From` conversion applies; the caller's context may need to pin the error type.
///
/// A failing future still waits on the barrier before its error is returned. Returning early
/// would leave the barrier short of participants and the remaining futures would wait forever.
#[macro_export]
macro_rules! sync_join {
    ($($fut:expr),+ $(,)?) => {{
        // Count the arguments without evaluating them: stringifying each expression gives a
        // homogeneous array of `&'static str`, so futures of different types are accepted and
        // expressions that move values are only evaluated once (inside the wrapper below).
        let future_count: usize = [$(stringify!($fut)),+].len();

        // One barrier shared by every wrapped future.
        let barrier = ::std::sync::Arc::new(tokio::sync::Barrier::new(future_count));

        tokio::join!(
            $({
                let barrier = barrier.clone();
                async move {
                    // Run the original future, keeping its outcome whether it succeeded or not.
                    let outcome = $fut.await;
                    // Always rendezvous, even on failure, so the barrier reaches its full count.
                    barrier.wait().await;
                    // Only now surface the error (with the same `?` conversion as before).
                    Ok(outcome?)
                }
            }),+
        )
    }};
}
32
/// Try-joins futures concurrently; on success, results are released only after all have finished.
///
/// Every argument is an expression that evaluates to a future whose output is a `Result`.
/// Each future is wrapped so that, after it succeeds, it waits on a shared
/// `tokio::sync::Barrier` sized to the number of arguments. On the all-success path no wrapper
/// returns until every future has reached that barrier.
///
/// The macro evaluates to whatever `tokio::try_join!` yields for the wrapped futures: `Ok` with
/// a tuple of the success values in argument order, or the first `Err`. Errors are passed
/// through `?`, so the usual `From` conversion applies; the caller's context may need to pin
/// the error type.
///
/// A failing future returns its error *before* reaching the barrier. `tokio::try_join!` then
/// completes with that error right away, so the barrier guarantee does not apply on the error
/// path.
#[macro_export]
macro_rules! sync_try_join {
    ($($fut:expr),+ $(,)?) => {{
        // Count the arguments without evaluating them: stringifying each expression gives a
        // homogeneous array of `&'static str`, so futures of different types are accepted and
        // expressions that move values are only evaluated once (inside the wrapper below).
        let future_count: usize = [$(stringify!($fut)),+].len();

        // One barrier shared by every wrapped future.
        let barrier = ::std::sync::Arc::new(tokio::sync::Barrier::new(future_count));

        tokio::try_join!(
            $({
                let barrier = barrier.clone();
                async move {
                    // Run the original future; an error short-circuits the whole try-join.
                    let value = $fut.await?;
                    // Successful futures wait here until all the others have succeeded too.
                    barrier.wait().await;
                    Ok(value)
                }
            }),+
        )
    }};
}