Skip to main content

ibverbs_rs/network/barrier/
mod.rs

1use crate::channel::TransportError;
2use crate::ibverbs::error::IbvResult;
3use crate::ibverbs::protection_domain::ProtectionDomain;
4use crate::multi_channel::MultiChannel;
5use crate::multi_channel::PeerRemoteMemoryRegion;
6use crate::network::barrier::binary_tree::{BinaryTreeBarrier, PreparedBinaryTreeBarrier};
7use crate::network::barrier::dissemination::{DisseminationBarrier, PreparedDisseminationBarrier};
8use crate::network::barrier::linear::{LinearBarrier, PreparedLinearBarrier};
9use std::time::Duration;
10use thiserror::Error;
11
12mod binary_tree;
13mod dissemination;
14mod linear;
15mod memory;
16
17/// An error that can occur during a barrier synchronization.
18///
19/// Barriers are **poisoned** after any error: once a `Barrier` returns an `Err`,
20/// every subsequent call will immediately return [`Poisoned`](BarrierError::Poisoned)
21/// without attempting any RDMA operations. This prevents use of a barrier whose
22/// internal epoch state may be inconsistent with remote peers.
23#[derive(Debug, Error)]
24pub enum BarrierError {
25    /// The barrier was poisoned by a previous error and can no longer be used.
26    #[error("Barrier is poisoned from a previous error")]
27    Poisoned,
28    /// This node's own rank is not present in the supplied peer list.
29    #[error("Self not in the barrier group")]
30    SelfNotInGroup,
31    /// The peer list is not sorted in strictly ascending order.
32    #[error("Peers not in ascending order in barrier group")]
33    UnorderedPeers,
34    /// The peer list contains the same rank more than once.
35    #[error("Duplicate peers in barrier group")]
36    DuplicatePeers,
37    /// Not all peers reached the barrier within the allotted time.
38    #[error("Barrier timeout")]
39    Timeout,
40    /// An RDMA transport error occurred while exchanging barrier notifications.
41    #[error("Transport error: {0}")]
42    TransportError(#[from] TransportError),
43}
44
/// Selects which barrier algorithm a [`Node`](crate::network::Node) uses.
///
/// All algorithms are implemented over one-sided RDMA writes and spin-poll on
/// a local memory region, so no CPU involvement is required on the remote side
/// during the synchronization itself.
///
/// The algorithm is chosen once at node construction and cannot be changed
/// afterwards. The default used by [`Node::builder`](crate::network::Node::builder)
/// is [`BinaryTree`](BarrierAlgorithm::BinaryTree).
///
/// This is a plain fieldless selector. It is `Copy` and also implements
/// `PartialEq`/`Eq`/`Hash`, so a configured algorithm can be compared (e.g. in
/// tests or configuration checks) or used as a map/set key.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum BarrierAlgorithm {
    /// Leader-based barrier. The lowest-ranked participant collects a notification
    /// from every other peer, then broadcasts back. O(n) messages; simple and
    /// correct but does not scale with large groups.
    Centralized,
    /// Tree-structured barrier. Peers are arranged in a binary tree by their
    /// position in the sorted peer list. A reduce phase propagates notifications
    /// from leaves up to the root, followed by a broadcast phase back down.
    /// O(log n) rounds; balanced and generally a good default.
    BinaryTree,
    /// Dissemination barrier. In each round every peer notifies the peer at
    /// distance `d` to its right (circularly) and waits for the peer at
    /// distance `d` to its left. The distance doubles each round: 1, 2, 4, …
    /// For `n` participants it completes in ⌈log₂ n⌉ rounds, with no designated
    /// leader and no single point of contention.
    Dissemination,
}
72
73impl BarrierAlgorithm {
74    /// Creates a [`PreparedBarrier`] using this algorithm.
75    pub fn instance(
76        &self,
77        pd: &ProtectionDomain,
78        rank: usize,
79        world_size: usize,
80    ) -> IbvResult<PreparedBarrier> {
81        match self {
82            BarrierAlgorithm::Centralized => Barrier::new_centralized(pd, rank, world_size),
83            BarrierAlgorithm::BinaryTree => Barrier::new_binary_tree(pd, rank, world_size),
84            BarrierAlgorithm::Dissemination => Barrier::new_dissemination(pd, rank, world_size),
85        }
86    }
87}
88
89/// A connected barrier ready to synchronize with peers.
90///
91/// Normally accessed through [`Node::barrier`](crate::network::Node::barrier) rather
92/// than directly.
93///
94/// # Peer list contract
95///
96/// Both [`barrier`](Barrier::barrier) and [`barrier_unchecked`](Barrier::barrier_unchecked)
97/// require the peer list to obey the following rules (only [`barrier`](Barrier::barrier)
98/// validates them):
99///
100/// * **Sorted** — ranks must appear in strictly ascending order.
101/// * **No duplicates** — each rank may appear at most once.
102/// * **Self included** — this node's own rank must be present.
103///
104/// # Poisoning
105///
106/// If any call returns an error, the barrier is permanently poisoned and all
107/// subsequent calls return [`BarrierError::Poisoned`] immediately. Recreate the
108/// [`Node`](crate::network::Node) to recover.
109#[derive(Debug)]
110pub enum Barrier {
111    /// Leader-based barrier. See [`BarrierAlgorithm::Centralized`].
112    Centralized(LinearBarrier),
113    /// Binary-tree barrier. See [`BarrierAlgorithm::BinaryTree`].
114    BinaryTree(BinaryTreeBarrier),
115    /// Dissemination barrier. See [`BarrierAlgorithm::Dissemination`].
116    Dissemination(DisseminationBarrier),
117}
118
119impl Barrier {
120    fn new_centralized(
121        pd: &ProtectionDomain,
122        rank: usize,
123        world_size: usize,
124    ) -> IbvResult<PreparedBarrier> {
125        Ok(PreparedBarrier::Centralized(PreparedLinearBarrier::new(
126            pd, rank, world_size,
127        )?))
128    }
129
130    fn new_binary_tree(
131        pd: &ProtectionDomain,
132        rank: usize,
133        world_size: usize,
134    ) -> IbvResult<PreparedBarrier> {
135        Ok(PreparedBarrier::BinaryTree(PreparedBinaryTreeBarrier::new(
136            pd, rank, world_size,
137        )?))
138    }
139
140    fn new_dissemination(
141        pd: &ProtectionDomain,
142        rank: usize,
143        world_size: usize,
144    ) -> IbvResult<PreparedBarrier> {
145        Ok(PreparedBarrier::Dissemination(
146            PreparedDisseminationBarrier::new(pd, rank, world_size)?,
147        ))
148    }
149
150    /// Synchronizes with the given peers, blocking until all have reached the barrier or timeout.
151    pub fn barrier(
152        &mut self,
153        multi_channel: &mut MultiChannel,
154        peers: &[usize],
155        timeout: Duration,
156    ) -> Result<(), BarrierError> {
157        match self {
158            Barrier::Centralized(b) => b.barrier(multi_channel, peers, timeout),
159            Barrier::BinaryTree(b) => b.barrier(multi_channel, peers, timeout),
160            Barrier::Dissemination(b) => b.barrier(multi_channel, peers, timeout),
161        }
162    }
163
164    /// Like [`barrier`](Self::barrier), but skips validation of the peer list.
165    pub fn barrier_unchecked(
166        &mut self,
167        multi_channel: &mut MultiChannel,
168        peers: &[usize],
169        timeout: Duration,
170    ) -> Result<(), BarrierError> {
171        match self {
172            Barrier::Centralized(b) => b.barrier_unchecked(multi_channel, peers, timeout),
173            Barrier::BinaryTree(b) => b.barrier_unchecked(multi_channel, peers, timeout),
174            Barrier::Dissemination(b) => b.barrier_unchecked(multi_channel, peers, timeout),
175        }
176    }
177}
178
179/// A barrier that has been allocated but not yet linked to remote peers.
180///
181/// Call [`link_remote`](Self::link_remote) with the remote memory region handles
182/// after the endpoint exchange to produce a [`Barrier`].
183#[derive(Debug)]
184pub enum PreparedBarrier {
185    /// Leader-based barrier. See [`BarrierAlgorithm::Centralized`].
186    Centralized(PreparedLinearBarrier),
187    /// Binary-tree barrier. See [`BarrierAlgorithm::BinaryTree`].
188    BinaryTree(PreparedBinaryTreeBarrier),
189    /// Dissemination barrier. See [`BarrierAlgorithm::Dissemination`].
190    Dissemination(PreparedDisseminationBarrier),
191}
192
193/// Validates that a peer list is sorted and contains no duplicates.
194///
195/// Used by barrier implementations before dispatching to the algorithm-specific logic.
196pub(super) fn validate_peer_list(peers: &[usize]) -> Result<(), BarrierError> {
197    if !peers.is_sorted() {
198        return Err(BarrierError::UnorderedPeers);
199    }
200    if peers.windows(2).any(|w| w[0] == w[1]) {
201        return Err(BarrierError::DuplicatePeers);
202    }
203    Ok(())
204}
205
206impl PreparedBarrier {
207    /// Returns this node's barrier memory region handle for exchange with peers.
208    pub fn remote_mr(&self) -> PeerRemoteMemoryRegion {
209        match self {
210            PreparedBarrier::Centralized(p) => p.remote(),
211            PreparedBarrier::BinaryTree(p) => p.remote(),
212            PreparedBarrier::Dissemination(p) => p.remote(),
213        }
214    }
215
216    /// Links remote peer memory regions and returns a ready-to-use [`Barrier`].
217    pub fn link_remote(self, remote_mrs: Box<[PeerRemoteMemoryRegion]>) -> Barrier {
218        match self {
219            PreparedBarrier::Centralized(p) => Barrier::Centralized(p.link_remote(remote_mrs)),
220            PreparedBarrier::BinaryTree(p) => Barrier::BinaryTree(p.link_remote(remote_mrs)),
221            PreparedBarrier::Dissemination(p) => Barrier::Dissemination(p.link_remote(remote_mrs)),
222        }
223    }
224}
225
226#[cfg(test)]
227mod tests {
228    use super::*;
229
230    #[test]
231    fn valid_sorted_unique_peers() {
232        assert!(validate_peer_list(&[0, 1, 2, 3]).is_ok());
233    }
234
235    #[test]
236    fn empty_peer_list_is_valid() {
237        assert!(validate_peer_list(&[]).is_ok());
238    }
239
240    #[test]
241    fn single_peer_is_valid() {
242        assert!(validate_peer_list(&[5]).is_ok());
243    }
244
245    #[test]
246    fn non_contiguous_ranks_are_valid() {
247        assert!(validate_peer_list(&[0, 3, 7, 100]).is_ok());
248    }
249
250    #[test]
251    fn unsorted_peers_rejected() {
252        let err = validate_peer_list(&[2, 1, 3]).unwrap_err();
253        assert!(matches!(err, BarrierError::UnorderedPeers));
254    }
255
256    #[test]
257    fn duplicate_peers_rejected() {
258        let err = validate_peer_list(&[0, 1, 1, 2]).unwrap_err();
259        assert!(matches!(err, BarrierError::DuplicatePeers));
260    }
261
262    #[test]
263    fn unsorted_takes_precedence_over_duplicate() {
264        // [2, 1, 1] — unsorted check triggers first
265        let err = validate_peer_list(&[2, 1, 1]).unwrap_err();
266        assert!(matches!(err, BarrierError::UnorderedPeers));
267    }
268
269    #[test]
270    fn all_same_ranks_detected_as_unsorted_or_duplicate() {
271        // [3, 3, 3] — is_sorted() returns true for equal elements, so duplicates detected
272        let err = validate_peer_list(&[3, 3, 3]).unwrap_err();
273        assert!(matches!(err, BarrierError::DuplicatePeers));
274    }
275}