primitives/utils/sync.rs
1/// Macro that joins futures and ensures all have completed before any values are dropped.
2///
3/// Uses a barrier synchronization pattern to make sure all futures are completed before returning.
4/// This is useful when the futures are expected to have side effects that need to be synchronized.
5#[macro_export]
6macro_rules! sync_join {
7 ($($fut:expr),+ $(,)?) => {{
8 use tokio::sync::Barrier;
9 use std::sync::Arc;
10
11 // Count the number of futures
12 let future_count = [$($fut),+].len();
13
14 // Create a shared barrier for all futures
15 let barrier = std::sync::Arc::new(tokio::sync::Barrier::new(future_count));
16
17 // Create synchronized futures
18 tokio::join!(
19 $({
20 let barrier = barrier.clone();
21 async move {
22 // Execute the original future
23 let result = $fut.await?;
24 // Wait for all futures to reach this point
25 barrier.wait().await;
26 Ok(result)
27 }
28 }),+
29 )
30 }};
31}
32
33/// Macro that try_joins futures and ensures all have completed before any values are dropped.
34///
35/// Uses a barrier synchronization pattern to make sure all futures are completed before returning.
36/// This is useful when the futures are expected to have side effects that need to be synchronized.
37#[macro_export]
38macro_rules! sync_try_join {
39 ($($fut:expr),+ $(,)?) => {{
40 // Count the number of futures
41 let future_count = [$($fut),+].len();
42
43 // Create a shared barrier for all futures
44 let barrier = std::sync::Arc::new(tokio::sync::Barrier::new(future_count));
45
46 // Create synchronized futures
47 tokio::try_join!(
48 $({
49 let barrier = barrier.clone();
50 async move {
51 // Execute the original future
52 let result = $fut.await?;
53 // Wait for all futures to reach this point
54 barrier.wait().await;
55 Ok(result)
56 }
57 }),+
58 )
59 }};
60}