use crate::channel::TransportError;
use crate::ibverbs::error::IbvResult;
use crate::ibverbs::protection_domain::ProtectionDomain;
use crate::multi_channel::MultiChannel;
use crate::multi_channel::PeerRemoteMemoryRegion;
use crate::network::barrier::binary_tree::{BinaryTreeBarrier, PreparedBinaryTreeBarrier};
use crate::network::barrier::dissemination::{DisseminationBarrier, PreparedDisseminationBarrier};
use crate::network::barrier::linear::{LinearBarrier, PreparedLinearBarrier};
use std::time::Duration;
use thiserror::Error;
mod binary_tree;
mod dissemination;
mod linear;
mod memory;
/// Errors that can be returned by barrier operations in this module.
#[derive(Debug, Error)]
pub enum BarrierError {
/// The barrier was marked poisoned by an earlier error and refuses further use.
#[error("Barrier is poisoned from a previous error")]
Poisoned,
/// The local rank does not appear in the supplied barrier group.
#[error("Self not in the barrier group")]
SelfNotInGroup,
/// The peer list is not in ascending order; returned by `validate_peer_list`
/// when any adjacent pair is descending.
#[error("Peers not in ascending order in barrier group")]
UnorderedPeers,
/// The peer list is ascending but contains the same rank more than once;
/// returned by `validate_peer_list` when adjacent entries are equal.
#[error("Duplicate peers in barrier group")]
DuplicatePeers,
/// The barrier did not complete in time.
/// NOTE(review): presumably the `timeout` passed to `Barrier::barrier` — confirm
/// in the algorithm implementations.
#[error("Barrier timeout")]
Timeout,
/// A failure reported by the underlying transport. `#[from]` lets `?` convert a
/// `TransportError` into this variant automatically.
/// NOTE(review): `#[from]` also exposes the inner error as `source()`, and `{0}`
/// prints it again in `Display`. Error-chain reporters will therefore show it
/// twice; consider dropping `{0}` from the message.
#[error("Transport error: {0}")]
TransportError(#[from] TransportError),
}
/// Selects which barrier implementation [`BarrierAlgorithm::instance`] prepares.
///
/// Each variant maps to exactly one implementation in a submodule of this module:
/// - [`Centralized`](Self::Centralized): `linear` (`PreparedLinearBarrier` / `LinearBarrier`)
/// - [`BinaryTree`](Self::BinaryTree): `binary_tree` (`PreparedBinaryTreeBarrier` / `BinaryTreeBarrier`)
/// - [`Dissemination`](Self::Dissemination): `dissemination`
///   (`PreparedDisseminationBarrier` / `DisseminationBarrier`)
///
/// `PartialEq`, `Eq` and `Hash` are derived so that callers can compare a selected
/// algorithm or use it as a map/set key.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum BarrierAlgorithm {
    /// Centralized barrier, implemented by the `linear` submodule.
    Centralized,
    /// Binary-tree barrier, implemented by the `binary_tree` submodule.
    BinaryTree,
    /// Dissemination barrier, implemented by the `dissemination` submodule.
    Dissemination,
}
impl BarrierAlgorithm {
    /// Allocates the not-yet-linked state for this algorithm on `pd`.
    ///
    /// `rank` and `world_size` are passed unchanged to the constructor of the
    /// selected implementation. The result must be turned into a usable
    /// [`Barrier`] with [`PreparedBarrier::link_remote`].
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the selected implementation's constructor.
    pub fn instance(
        &self,
        pd: &ProtectionDomain,
        rank: usize,
        world_size: usize,
    ) -> IbvResult<PreparedBarrier> {
        // Pick the constructor first, then invoke it once with the shared arguments.
        let build: fn(&ProtectionDomain, usize, usize) -> IbvResult<PreparedBarrier> =
            match *self {
                Self::Centralized => Barrier::new_centralized,
                Self::BinaryTree => Barrier::new_binary_tree,
                Self::Dissemination => Barrier::new_dissemination,
            };
        build(pd, rank, world_size)
    }
}
/// A linked barrier, one variant per [`BarrierAlgorithm`].
///
/// Produced by [`PreparedBarrier::link_remote`]; each variant wraps the matching
/// implementation and is driven through [`Barrier::barrier`] or
/// [`Barrier::barrier_unchecked`].
#[derive(Debug)]
pub enum Barrier {
/// Centralized barrier backed by [`LinearBarrier`].
Centralized(LinearBarrier),
/// Binary-tree barrier backed by [`BinaryTreeBarrier`].
BinaryTree(BinaryTreeBarrier),
/// Dissemination barrier backed by [`DisseminationBarrier`].
Dissemination(DisseminationBarrier),
}
impl Barrier {
    /// Prepares a [`PreparedBarrier::Centralized`] backed by [`PreparedLinearBarrier`].
    ///
    /// # Errors
    ///
    /// Propagates the error from `PreparedLinearBarrier::new`.
    fn new_centralized(
        pd: &ProtectionDomain,
        rank: usize,
        world_size: usize,
    ) -> IbvResult<PreparedBarrier> {
        let linear = PreparedLinearBarrier::new(pd, rank, world_size)?;
        Ok(PreparedBarrier::Centralized(linear))
    }

    /// Prepares a [`PreparedBarrier::BinaryTree`] backed by [`PreparedBinaryTreeBarrier`].
    ///
    /// # Errors
    ///
    /// Propagates the error from `PreparedBinaryTreeBarrier::new`.
    fn new_binary_tree(
        pd: &ProtectionDomain,
        rank: usize,
        world_size: usize,
    ) -> IbvResult<PreparedBarrier> {
        let tree = PreparedBinaryTreeBarrier::new(pd, rank, world_size)?;
        Ok(PreparedBarrier::BinaryTree(tree))
    }

    /// Prepares a [`PreparedBarrier::Dissemination`] backed by
    /// [`PreparedDisseminationBarrier`].
    ///
    /// # Errors
    ///
    /// Propagates the error from `PreparedDisseminationBarrier::new`.
    fn new_dissemination(
        pd: &ProtectionDomain,
        rank: usize,
        world_size: usize,
    ) -> IbvResult<PreparedBarrier> {
        let dissem = PreparedDisseminationBarrier::new(pd, rank, world_size)?;
        Ok(PreparedBarrier::Dissemination(dissem))
    }

    /// Performs a barrier with `peers` using whichever algorithm this value wraps.
    ///
    /// `multi_channel`, `peers` and `timeout` are forwarded unchanged to the
    /// wrapped implementation's `barrier` method.
    ///
    /// # Errors
    ///
    /// Returns whatever [`BarrierError`] the wrapped implementation reports.
    pub fn barrier(
        &mut self,
        multi_channel: &mut MultiChannel,
        peers: &[usize],
        timeout: Duration,
    ) -> Result<(), BarrierError> {
        match self {
            Self::Centralized(linear) => linear.barrier(multi_channel, peers, timeout),
            Self::BinaryTree(tree) => tree.barrier(multi_channel, peers, timeout),
            Self::Dissemination(dissem) => dissem.barrier(multi_channel, peers, timeout),
        }
    }

    /// Like [`Barrier::barrier`], but dispatches to the wrapped implementation's
    /// `barrier_unchecked` method.
    ///
    /// NOTE(review): presumably this variant skips peer-list validation
    /// (`validate_peer_list`); confirm in the algorithm submodules before relying
    /// on it.
    ///
    /// # Errors
    ///
    /// Returns whatever [`BarrierError`] the wrapped implementation reports.
    pub fn barrier_unchecked(
        &mut self,
        multi_channel: &mut MultiChannel,
        peers: &[usize],
        timeout: Duration,
    ) -> Result<(), BarrierError> {
        match self {
            Self::Centralized(linear) => linear.barrier_unchecked(multi_channel, peers, timeout),
            Self::BinaryTree(tree) => tree.barrier_unchecked(multi_channel, peers, timeout),
            Self::Dissemination(dissem) => {
                dissem.barrier_unchecked(multi_channel, peers, timeout)
            }
        }
    }
}
/// Barrier state allocated on a `ProtectionDomain` but not yet linked to remote
/// memory regions.
///
/// Obtained from [`BarrierAlgorithm::instance`]. [`PreparedBarrier::remote_mr`]
/// exposes this side's region, and [`PreparedBarrier::link_remote`] consumes the
/// value to produce the matching [`Barrier`] variant.
/// NOTE(review): presumably the `remote_mr` values are exchanged between ranks
/// before linking — confirm at the call sites.
#[derive(Debug)]
pub enum PreparedBarrier {
/// Prepared state for [`BarrierAlgorithm::Centralized`].
Centralized(PreparedLinearBarrier),
/// Prepared state for [`BarrierAlgorithm::BinaryTree`].
BinaryTree(PreparedBinaryTreeBarrier),
/// Prepared state for [`BarrierAlgorithm::Dissemination`].
Dissemination(PreparedDisseminationBarrier),
}
/// Checks that `peers` is strictly ascending (sorted with no repeated rank).
///
/// Empty and single-element lists are valid.
///
/// # Errors
///
/// - [`BarrierError::UnorderedPeers`] if any adjacent pair is descending. The
///   whole list is scanned for this first, so it takes precedence over duplicates.
/// - [`BarrierError::DuplicatePeers`] if the list is non-decreasing but some
///   adjacent entries are equal.
pub(super) fn validate_peer_list(peers: &[usize]) -> Result<(), BarrierError> {
    // For a totally ordered type, "not sorted" is exactly "some adjacent pair descends".
    let descends = peers.windows(2).any(|pair| pair[0] > pair[1]);
    if descends {
        return Err(BarrierError::UnorderedPeers);
    }

    // Once the list is known to be non-decreasing, any duplicate must be adjacent.
    let repeats = peers.windows(2).any(|pair| pair[0] == pair[1]);
    if repeats {
        Err(BarrierError::DuplicatePeers)
    } else {
        Ok(())
    }
}
impl PreparedBarrier {
    /// Returns the [`PeerRemoteMemoryRegion`] reported by the wrapped prepared
    /// implementation's `remote()` method.
    ///
    /// NOTE(review): presumably this is the value peers need to collect and pass
    /// into [`PreparedBarrier::link_remote`] — confirm against the call sites.
    pub fn remote_mr(&self) -> PeerRemoteMemoryRegion {
        match self {
            Self::Centralized(linear) => linear.remote(),
            Self::BinaryTree(tree) => tree.remote(),
            Self::Dissemination(dissem) => dissem.remote(),
        }
    }

    /// Consumes the prepared state and links it against `remote_mrs`.
    ///
    /// Returns the [`Barrier`] variant that matches this one; `remote_mrs` is
    /// handed unchanged to the wrapped implementation's `link_remote`.
    pub fn link_remote(self, remote_mrs: Box<[PeerRemoteMemoryRegion]>) -> Barrier {
        match self {
            Self::Centralized(linear) => Barrier::Centralized(linear.link_remote(remote_mrs)),
            Self::BinaryTree(tree) => Barrier::BinaryTree(tree.link_remote(remote_mrs)),
            Self::Dissemination(dissem) => {
                Barrier::Dissemination(dissem.link_remote(remote_mrs))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `validate_peer_list`: accepted shapes, rejected shapes, and
    //! the precedence of `UnorderedPeers` over `DuplicatePeers`.
    use super::*;

    #[test]
    fn valid_sorted_unique_peers() {
        assert!(validate_peer_list(&[0, 1, 2, 3]).is_ok());
    }

    #[test]
    fn empty_peer_list_is_valid() {
        assert!(validate_peer_list(&[]).is_ok());
    }

    #[test]
    fn single_peer_is_valid() {
        assert!(validate_peer_list(&[5]).is_ok());
    }

    #[test]
    fn non_contiguous_ranks_are_valid() {
        assert!(validate_peer_list(&[0, 3, 7, 100]).is_ok());
    }

    #[test]
    fn unsorted_peers_rejected() {
        let err = validate_peer_list(&[2, 1, 3]).unwrap_err();
        assert!(matches!(err, BarrierError::UnorderedPeers));
    }

    #[test]
    fn two_descending_peers_rejected_as_unordered() {
        let err = validate_peer_list(&[4, 3]).unwrap_err();
        assert!(matches!(err, BarrierError::UnorderedPeers));
    }

    #[test]
    fn duplicate_peers_rejected() {
        let err = validate_peer_list(&[0, 1, 1, 2]).unwrap_err();
        assert!(matches!(err, BarrierError::DuplicatePeers));
    }

    #[test]
    fn two_equal_peers_rejected_as_duplicate() {
        let err = validate_peer_list(&[4, 4]).unwrap_err();
        assert!(matches!(err, BarrierError::DuplicatePeers));
    }

    #[test]
    fn unsorted_takes_precedence_over_duplicate() {
        let err = validate_peer_list(&[2, 1, 1]).unwrap_err();
        assert!(matches!(err, BarrierError::UnorderedPeers));
    }

    #[test]
    fn unsorted_takes_precedence_over_earlier_duplicate() {
        // The duplicate pair (1, 1) occurs before the inversion (1, 0). The
        // ordering check runs over the whole list before the duplicate check, so
        // the error must still be `UnorderedPeers`.
        let err = validate_peer_list(&[1, 1, 0]).unwrap_err();
        assert!(matches!(err, BarrierError::UnorderedPeers));
    }

    #[test]
    fn all_same_ranks_detected_as_duplicate() {
        // A run of equal ranks is non-decreasing, so it is reported as a duplicate.
        let err = validate_peer_list(&[3, 3, 3]).unwrap_err();
        assert!(matches!(err, BarrierError::DuplicatePeers));
    }
}