use std::time::Duration;
use ferrotorch_core::FerrotorchResult;
use crate::backend::Backend;
#[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
use crate::error::DistributedError;
/// Reports whether UCC support was compiled into this build.
///
/// This is purely a compile-time property: it is `true` exactly when the
/// `ucc-native` or `ucc-backend` Cargo feature is enabled. It does not probe any
/// peer or network resource.
pub fn is_ucc_available() -> bool {
    const UCC_COMPILED_IN: bool = cfg!(any(feature = "ucc-native", feature = "ucc-backend"));
    UCC_COMPILED_IN
}
/// Distributed backend registered under the name `"ucc"`.
///
/// CPU-side point-to-point traffic and the `*_f32` collectives are delegated to the
/// Gloo-native transport in `cpu_inner`. GPU collectives are routed to an NCCL
/// communicator that is attached separately via `UccBackend::with_nccl`.
///
/// Field order matters: Rust drops struct fields in declaration order, so
/// `cpu_inner` is torn down before `gpu_inner`.
pub struct UccBackend {
// CPU transport; exists only when the `ucc-native` / `ucc-backend` feature is enabled.
#[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
cpu_inner: crate::gloo_native::GlooBackendInner,
// NCCL communicator used by `gpu_allreduce` / `gpu_broadcast`. It is `None` until
// `with_nccl` stores one. It sits behind a `Mutex` so it can be attached through `&self`.
#[cfg(feature = "nccl")]
gpu_inner: std::sync::Mutex<Option<std::sync::Arc<crate::nccl_backend::NcclBackend>>>,
// Private placeholder for builds without the UCC features. Without it the struct
// could have no private fields at all and be constructible outside this module as
// `UccBackend {}`.
#[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
_phantom: std::marker::PhantomData<()>,
}
impl std::fmt::Debug for UccBackend {
    /// Prints `rank` / `world_size` (UCC builds) and `nccl_attached` (`nccl` builds).
    ///
    /// The transports themselves are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut s = f.debug_struct("UccBackend");
        #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
        {
            s.field("rank", &Backend::rank(&self.cpu_inner));
            s.field("world_size", &Backend::world_size(&self.cpu_inner));
        }
        #[cfg(feature = "nccl")]
        {
            // A poisoned lock still guards a valid `Option<Arc<_>>`. Report what is
            // actually stored rather than claiming nothing is attached.
            let nccl_attached = match self.gpu_inner.lock() {
                Ok(slot) => slot.is_some(),
                Err(poisoned) => poisoned.into_inner().is_some(),
            };
            s.field("nccl_attached", &nccl_attached);
        }
        s.finish()
    }
}
impl UccBackend {
    /// Creates the backend for `rank` in a group of `world_size` processes, using
    /// `master_addr` as the rendezvous point.
    ///
    /// The local transport endpoint is bound to `127.0.0.1` with port `0`. A
    /// loopback bind presumably limits the job to a single host (confirm against
    /// how `GlooRendezvousConfig::bind_addr` is used). Call
    /// [`UccBackend::new_with_bind_addr`] to pick a routable interface instead.
    ///
    /// # Errors
    ///
    /// - `DistributedError::BackendUnavailable { backend: "ucc" }` when built without
    ///   the `ucc-native` / `ucc-backend` features.
    /// - Any error from the Gloo-native CPU transport during rendezvous.
    pub fn new(rank: usize, world_size: usize, master_addr: &str) -> FerrotorchResult<Self> {
        use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
        let loopback_any_port = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
        Self::new_with_bind_addr(rank, world_size, master_addr, loopback_any_port)
    }

    /// Like [`UccBackend::new`], but binds the local transport endpoint to
    /// `bind_addr` instead of `127.0.0.1:0`.
    ///
    /// # Errors
    ///
    /// Same as [`UccBackend::new`].
    pub fn new_with_bind_addr(
        rank: usize,
        world_size: usize,
        master_addr: &str,
        bind_addr: std::net::SocketAddr,
    ) -> FerrotorchResult<Self> {
        #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
        {
            let cfg = crate::gloo_native::GlooRendezvousConfig {
                master_addr: master_addr.to_string(),
                rank,
                world_size,
                bind_addr,
            };
            Self::from_rendezvous_config(&cfg)
        }
        #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
        {
            // The arguments are only consumed by the UCC transport.
            let _ = (rank, world_size, master_addr, bind_addr);
            Err(DistributedError::BackendUnavailable { backend: "ucc" }.into())
        }
    }

    /// Creates the backend from the settings read by
    /// `GlooRendezvousConfig::from_env`.
    ///
    /// # Errors
    ///
    /// - `DistributedError::BackendUnavailable { backend: "ucc" }` when built without
    ///   the UCC features.
    /// - Otherwise, any error from reading the configuration or from the rendezvous.
    pub fn from_env() -> FerrotorchResult<Self> {
        #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
        {
            let cfg = crate::gloo_native::GlooRendezvousConfig::from_env()?;
            Self::from_rendezvous_config(&cfg)
        }
        #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
        {
            Err(DistributedError::BackendUnavailable { backend: "ucc" }.into())
        }
    }

    /// Builds the CPU transport from `cfg` and wraps it with no NCCL communicator
    /// attached.
    #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
    fn from_rendezvous_config(
        cfg: &crate::gloo_native::GlooRendezvousConfig,
    ) -> FerrotorchResult<Self> {
        let cpu_inner = crate::gloo_native::GlooBackendInner::new(cfg)?;
        Ok(Self {
            cpu_inner,
            #[cfg(feature = "nccl")]
            gpu_inner: std::sync::Mutex::new(None),
        })
    }

    /// Attaches (or replaces) the NCCL communicator used by the GPU collectives.
    ///
    /// # Errors
    ///
    /// - `InvalidRank` if `nccl`'s rank differs from this backend's rank.
    /// - `InvalidWorldSize` if the world sizes differ.
    /// - `LockPoisoned` if the communicator slot's mutex is poisoned.
    #[cfg(feature = "nccl")]
    pub fn with_nccl(
        &self,
        nccl: std::sync::Arc<crate::nccl_backend::NcclBackend>,
    ) -> FerrotorchResult<()> {
        // Query through the `Backend` impl rather than `self.cpu_inner`, so this
        // method also compiles in `nccl` builds without the UCC features, where
        // `cpu_inner` does not exist. With the UCC features it is the same call.
        let rank = Backend::rank(self);
        let world_size = Backend::world_size(self);
        let nccl_rank = Backend::rank(&*nccl);
        let nccl_world_size = Backend::world_size(&*nccl);
        if rank != nccl_rank {
            return Err(crate::error::DistributedError::InvalidRank {
                rank: nccl_rank,
                world_size,
            }
            .into());
        }
        if world_size != nccl_world_size {
            return Err(crate::error::DistributedError::InvalidWorldSize {
                world_size: nccl_world_size,
            }
            .into());
        }
        let mut slot = self.gpu_inner.lock().map_err(|e| {
            crate::error::DistributedError::LockPoisoned {
                message: format!("UccBackend::with_nccl: {e}"),
            }
        })?;
        *slot = Some(nccl);
        Ok(())
    }

    /// Sums `data` element-wise across all ranks, in place.
    ///
    /// Delegates to `GlooBackendInner::ring_allreduce_sum_f32`.
    #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
    pub fn allreduce_sum_f32(&self, data: &mut [f32]) -> FerrotorchResult<()> {
        self.cpu_inner.ring_allreduce_sum_f32(data)
    }

    /// Broadcasts `data` from `root` to every rank, in place.
    ///
    /// Delegates to `GlooBackendInner::tree_broadcast_f32`.
    ///
    /// # Errors
    ///
    /// `InvalidRank` if `root >= world_size`. Otherwise, any transport error.
    #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
    pub fn broadcast_f32(&self, data: &mut [f32], root: usize) -> FerrotorchResult<()> {
        // Every rank evaluates the same check locally, so a bad root fails everywhere
        // before any traffic is exchanged.
        let world_size = Backend::world_size(&self.cpu_inner);
        if root >= world_size {
            return Err(crate::error::DistributedError::InvalidRank {
                rank: root,
                world_size,
            }
            .into());
        }
        self.cpu_inner.tree_broadcast_f32(data, root)
    }

    /// All-reduces a GPU tensor through the attached NCCL communicator.
    ///
    /// # Errors
    ///
    /// - `UnsupportedOp` if no communicator is attached, or if the build lacks the
    ///   `nccl` feature.
    /// - `LockPoisoned` if the communicator slot's mutex is poisoned.
    /// - Any error from `gpu_collective::gpu_allreduce`.
    #[cfg(feature = "gpu")]
    pub fn gpu_allreduce<T: ferrotorch_gpu::GpuFloat>(
        &self,
        tensor: &ferrotorch_gpu::GpuTensor<T>,
        op: crate::collective::ReduceOp,
    ) -> FerrotorchResult<ferrotorch_gpu::GpuTensor<T>> {
        #[cfg(feature = "nccl")]
        {
            let slot = self.gpu_inner.lock().map_err(|e| {
                crate::error::DistributedError::LockPoisoned {
                    message: format!("UccBackend::gpu_allreduce lock: {e}"),
                }
            })?;
            // Take our own handle and release the lock before the collective runs, so
            // `with_nccl`, `Debug` and other callers are not blocked for its duration.
            let attached = slot.as_ref().map(std::sync::Arc::clone);
            drop(slot);
            let nccl = attached.ok_or_else(|| {
                crate::error::DistributedError::UnsupportedOp {
                    message: "UccBackend::gpu_allreduce: no NCCL communicator attached — \
                              call UccBackend::with_nccl(...) on a `--features=ucc-native-gpu` \
                              build to enable the GPU fast path"
                        .into(),
                }
            })?;
            crate::gpu_collective::gpu_allreduce(tensor, &*nccl, op)
        }
        #[cfg(not(feature = "nccl"))]
        {
            let _ = (tensor, op);
            Err(crate::error::DistributedError::UnsupportedOp {
                message: "UccBackend::gpu_allreduce requires `--features=ucc-native-gpu` \
                          (which enables NCCL); this build was compiled without it"
                    .into(),
            }
            .into())
        }
    }

    /// Broadcasts a GPU tensor from `root` through the attached NCCL communicator.
    ///
    /// # Errors
    ///
    /// - `UnsupportedOp` if no communicator is attached, or if the build lacks the
    ///   `nccl` feature.
    /// - `LockPoisoned` if the communicator slot's mutex is poisoned.
    /// - Any error from `gpu_collective::gpu_broadcast`.
    #[cfg(feature = "gpu")]
    pub fn gpu_broadcast<T: ferrotorch_gpu::GpuFloat>(
        &self,
        tensor: &ferrotorch_gpu::GpuTensor<T>,
        root: usize,
    ) -> FerrotorchResult<ferrotorch_gpu::GpuTensor<T>> {
        #[cfg(feature = "nccl")]
        {
            let slot = self.gpu_inner.lock().map_err(|e| {
                crate::error::DistributedError::LockPoisoned {
                    message: format!("UccBackend::gpu_broadcast lock: {e}"),
                }
            })?;
            // Release the lock before the (potentially long) collective; see
            // `gpu_allreduce`.
            let attached = slot.as_ref().map(std::sync::Arc::clone);
            drop(slot);
            let nccl = attached.ok_or_else(|| {
                crate::error::DistributedError::UnsupportedOp {
                    message: "UccBackend::gpu_broadcast: no NCCL communicator attached — \
                              call UccBackend::with_nccl(...) on a `--features=ucc-native-gpu` \
                              build to enable the GPU fast path"
                        .into(),
                }
            })?;
            crate::gpu_collective::gpu_broadcast(tensor, &*nccl, root)
        }
        #[cfg(not(feature = "nccl"))]
        {
            let _ = (tensor, root);
            Err(crate::error::DistributedError::UnsupportedOp {
                message: "UccBackend::gpu_broadcast requires `--features=ucc-native-gpu` \
                          (which enables NCCL); this build was compiled without it"
                    .into(),
            }
            .into())
        }
    }
}
impl Backend for UccBackend {
    /// Rank of this process; `0` when UCC support is compiled out.
    fn rank(&self) -> usize {
        #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
        {
            0
        }
        #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
        {
            Backend::rank(&self.cpu_inner)
        }
    }

    /// Number of ranks in the group; `0` when UCC support is compiled out.
    fn world_size(&self) -> usize {
        #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
        {
            0
        }
        #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
        {
            Backend::world_size(&self.cpu_inner)
        }
    }

    /// Sends `data` to `dst_rank` over the CPU transport.
    fn send(&self, data: &[u8], dst_rank: usize) -> FerrotorchResult<()> {
        #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
        {
            // The arguments are only consumed by the UCC transport.
            let _ = (data, dst_rank);
            Err(DistributedError::BackendUnavailable { backend: "ucc" }.into())
        }
        #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
        {
            self.cpu_inner.send(data, dst_rank)
        }
    }

    /// Receives into `dst` from `src_rank` over the CPU transport.
    fn recv(&self, dst: &mut [u8], src_rank: usize) -> FerrotorchResult<()> {
        #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
        {
            let _ = (dst, src_rank);
            Err(DistributedError::BackendUnavailable { backend: "ucc" }.into())
        }
        #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
        {
            self.cpu_inner.recv(dst, src_rank)
        }
    }

    /// Like `recv`, but forwards `timeout` to the CPU transport's `recv_timeout`.
    fn recv_timeout(
        &self,
        dst: &mut [u8],
        src_rank: usize,
        timeout: Duration,
    ) -> FerrotorchResult<()> {
        #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
        {
            let _ = (dst, src_rank, timeout);
            Err(DistributedError::BackendUnavailable { backend: "ucc" }.into())
        }
        #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
        {
            self.cpu_inner.recv_timeout(dst, src_rank, timeout)
        }
    }

    /// Delegates to the CPU transport's `barrier`.
    fn barrier(&self) -> FerrotorchResult<()> {
        #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
        {
            Err(DistributedError::BackendUnavailable { backend: "ucc" }.into())
        }
        #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
        {
            self.cpu_inner.barrier()
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
    use ferrotorch_core::FerrotorchError;

    /// Picks a currently free loopback port to use as the rendezvous master address.
    ///
    /// The probe listener is closed on return, so the port is only *likely* to still
    /// be free when the rendezvous binds it. That race is accepted for tests.
    #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
    fn free_master_addr() -> String {
        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("probe bind");
        probe.local_addr().expect("local_addr").to_string()
    }

    /// Constructs `world_size` backends, one thread per rank, and returns them in
    /// rank order.
    ///
    /// Construction presumably blocks until all ranks join, hence the concurrency.
    #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
    fn rendezvous(world_size: usize) -> Vec<UccBackend> {
        let master_addr = free_master_addr();
        // Collect first so every rank is spawned before any join.
        let joiners: Vec<_> = (0..world_size)
            .map(|rank| {
                let ma = master_addr.clone();
                std::thread::spawn(move || {
                    UccBackend::new(rank, world_size, &ma).expect("UccBackend::new")
                })
            })
            .collect();
        joiners
            .into_iter()
            .map(|h| h.join().expect("join"))
            .collect()
    }

    #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
    #[test]
    fn ucc_unavailable_without_feature() {
        let err = UccBackend::new(0, 2, "127.0.0.1:0").expect_err("default build must err");
        match err {
            FerrotorchError::InvalidArgument { ref message } => {
                assert!(
                    message.contains("`ucc`"),
                    "expected message to discriminate the ucc backend by name, got: {message}"
                );
                assert!(
                    !message.contains("`gloo`") && !message.contains("`mpi`"),
                    "message must not name a different backend, got: {message}"
                );
            }
            other => panic!(
                "expected FerrotorchError::InvalidArgument from BackendUnavailable, got {other:?}"
            ),
        }
    }

    #[cfg(not(any(feature = "ucc-native", feature = "ucc-backend")))]
    #[test]
    fn ucc_from_env_unavailable_without_feature() {
        let err = UccBackend::from_env().expect_err("default build must err");
        match err {
            FerrotorchError::InvalidArgument { message } => {
                assert!(message.contains("`ucc`"));
            }
            other => panic!("expected InvalidArgument, got {other:?}"),
        }
    }

    #[test]
    fn is_ucc_available_matches_enabled_features() {
        assert_eq!(
            is_ucc_available(),
            cfg!(any(feature = "ucc-native", feature = "ucc-backend"))
        );
    }

    #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
    #[test]
    fn ucc_native_cpu_allreduce_via_gloo_two_ranks() {
        let backends = rendezvous(2);
        let inputs = [
            vec![1.0f32, 2.0, 3.0, 4.0],
            vec![10.0f32, 20.0, 30.0, 40.0],
        ];
        let expected = vec![11.0f32, 22.0, 33.0, 44.0];
        std::thread::scope(|s| {
            let handles: Vec<_> = backends
                .iter()
                .zip(inputs)
                .map(|(backend, mut data)| {
                    s.spawn(move || {
                        backend.allreduce_sum_f32(&mut data).expect("allreduce");
                        data
                    })
                })
                .collect();
            for (rank, h) in handles.into_iter().enumerate() {
                assert_eq!(h.join().unwrap(), expected, "rank {rank} allreduce result");
            }
        });
    }

    #[cfg(any(feature = "ucc-native", feature = "ucc-backend"))]
    #[test]
    fn ucc_native_cpu_broadcast_and_barrier_three_ranks() {
        let backends = rendezvous(3);
        let payload = vec![7.5f32, 8.25, 9.125];
        let root = 1usize;
        std::thread::scope(|s| {
            let handles: Vec<_> = backends
                .iter()
                .enumerate()
                .map(|(rank, backend)| {
                    let mut data = if rank == root {
                        payload.clone()
                    } else {
                        vec![0.0f32; payload.len()]
                    };
                    s.spawn(move || {
                        backend.broadcast_f32(&mut data, root).expect("broadcast");
                        Backend::barrier(backend).expect("barrier");
                        data
                    })
                })
                .collect();
            for h in handles {
                assert_eq!(h.join().unwrap(), payload);
            }
        });
    }

    // Requires the UCC features too: the test constructs backends with
    // `UccBackend::new(..).expect(..)`, which errors in builds without them.
    #[cfg(all(
        feature = "gpu",
        not(feature = "nccl"),
        any(feature = "ucc-native", feature = "ucc-backend")
    ))]
    #[test]
    fn ucc_native_gpu_routing_returns_error_without_nccl_feature() {
        use ferrotorch_core::FerrotorchError;
        use ferrotorch_gpu::{GpuDevice, tensor_to_gpu};
        let backends = rendezvous(2);
        let b0 = &backends[0];
        let cpu = ferrotorch_core::from_slice(&[1.0f32, 2.0, 3.0], &[3]).expect("from_slice");
        let device = GpuDevice::new(0).expect("GpuDevice");
        let gt = tensor_to_gpu(&cpu, &device).expect("tensor_to_gpu");
        let err = b0
            .gpu_allreduce(&gt, crate::collective::ReduceOp::Sum)
            .expect_err("ucc-native (no nccl) must reject gpu_allreduce");
        match err {
            FerrotorchError::InvalidArgument { message } => {
                assert!(
                    message.contains("ucc-native-gpu"),
                    "expected message to name the `ucc-native-gpu` upgrade, got: {message}"
                );
            }
            other => panic!(
                "expected FerrotorchError::InvalidArgument from UnsupportedOp, got {other:?}"
            ),
        }
        let err = b0
            .gpu_broadcast(&gt, 0)
            .expect_err("ucc-native (no nccl) must reject gpu_broadcast");
        match err {
            FerrotorchError::InvalidArgument { message } => {
                assert!(
                    message.contains("ucc-native-gpu"),
                    "expected message to name the `ucc-native-gpu` upgrade, got: {message}"
                );
            }
            other => panic!(
                "expected FerrotorchError::InvalidArgument from UnsupportedOp, got {other:?}"
            ),
        }
    }

    #[cfg(all(
        feature = "nccl",
        feature = "gpu",
        any(feature = "ucc-native", feature = "ucc-backend")
    ))]
    #[test]
    #[ignore = "requires NCCL (libnccl2) and a CUDA device — exercises the UccBackend → NcclBackend dispatch routing landed in #1134"]
    fn ucc_native_gpu_allreduce_via_nccl_single_rank() {
        use crate::nccl_backend::NcclBackend;
        use crate::nccl_sys::get_unique_id;
        use ferrotorch_core::FerrotorchError;
        use ferrotorch_gpu::{GpuDevice, tensor_to_gpu};
        use std::sync::Arc;

        let backends = rendezvous(1);
        let ucc = &backends[0];

        let unique_id = get_unique_id().expect("NCCL unique ID generation");
        let nccl =
            Arc::new(NcclBackend::new(0, 1, unique_id).expect("NcclBackend init single-rank"));
        let cpu = ferrotorch_core::from_slice(&[1.5f32, -2.5, 3.5, 0.0], &[4]).expect("from_slice");
        let device = GpuDevice::new(0).expect("GpuDevice");
        let gt = tensor_to_gpu(&cpu, &device).expect("tensor_to_gpu");

        // Before a communicator is attached, the GPU path must refuse to run.
        let err = ucc
            .gpu_allreduce(&gt, crate::collective::ReduceOp::Sum)
            .expect_err("gpu_allreduce must fail before with_nccl");
        match err {
            FerrotorchError::InvalidArgument { message } => {
                assert!(
                    message.contains("no NCCL communicator attached"),
                    "expected the missing-communicator message, got: {message}"
                );
            }
            other => panic!(
                "expected FerrotorchError::InvalidArgument from UnsupportedOp, got {other:?}"
            ),
        }

        // After attaching, both collectives must dispatch through NCCL successfully.
        ucc.with_nccl(nccl).expect("attach NCCL communicator");
        let _reduced = ucc
            .gpu_allreduce(&gt, crate::collective::ReduceOp::Sum)
            .expect("gpu_allreduce via NCCL");
        let _broadcast = ucc.gpu_broadcast(&gt, 0).expect("gpu_broadcast via NCCL");
        // TODO(review): copy `_reduced` back to the host and assert it equals `cpu`
        // (a single-rank sum is the identity) once a device-to-host helper is used here.
    }
}