1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//! `BackendCollective for CudaBackend` — TP / multi-GPU collectives.
//!
//! Extracted from `cuda/mod.rs` (#8 Phase 2). World size / rank are read
//! from the `FERRUM_TP` / `FERRUM_RANK` env vars. `all_reduce` is sum-only
//! and, on the multi-rank path, currently only logs a warning: it is not
//! yet wired to the process-global `crate::nccl_comm::NcclRank`. The
//! other two collectives are placeholders until `nccl_comm` grows them.
//!
//! Single-rank path is the common case and is a hot no-op (no NCCL
//! involvement).
use super::CudaBackend;
use crate::backend::{Backend, BackendCollective, ReduceOp};
use std::sync::OnceLock;
/// Tensor-parallel topology of this process as used by the CUDA collectives.
///
/// Built once from the process environment by
/// `cuda_collective_runtime_config` and then shared as a `'static` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CudaCollectiveRuntimeConfig {
/// Number of ranks, taken from `FERRUM_TP`; defaults to 1 and is never
/// stored as less than 1.
world_size: usize,
/// Rank of this process, taken from `FERRUM_RANK`; defaults to 0.
rank: usize,
}
/// Returns the process-wide collective configuration, reading the
/// environment on the first call only.
///
/// * `FERRUM_TP` gives the world size. A missing, non-Unicode or unparsable
///   value falls back to `1`, and `0` is raised to `1`.
/// * `FERRUM_RANK` gives the rank. A missing, non-Unicode or unparsable
///   value falls back to `0`.
///
/// Surrounding whitespace in either value is ignored. The result is cached
/// in a `OnceLock`, so changes made to the environment after the first call
/// are not observed. The rank is not validated against the world size.
fn cuda_collective_runtime_config() -> &'static CudaCollectiveRuntimeConfig {
    static CONFIG: OnceLock<CudaCollectiveRuntimeConfig> = OnceLock::new();
    CONFIG.get_or_init(|| {
        // Look the two variables up by name rather than iterating
        // `std::env::vars()`: that iterator panics as soon as *any* variable
        // in the environment has a non-Unicode name or value, which would
        // take the backend down because of an unrelated variable.
        // `std::env::var` reports that case as an error, which is treated
        // here the same as "not set".
        let read_usize = |name: &str| -> Option<usize> {
            std::env::var(name).ok()?.trim().parse::<usize>().ok()
        };
        CudaCollectiveRuntimeConfig {
            // A world size of 0 is meaningless; clamp it to a single rank.
            world_size: read_usize("FERRUM_TP").unwrap_or(1).max(1),
            rank: read_usize("FERRUM_RANK").unwrap_or(0),
        }
    })
}
impl BackendCollective for CudaBackend {
    // ── TP collectives ──────────────────────────────────────────────────
    //
    // World size / rank come from env vars (FERRUM_TP, FERRUM_RANK), read
    // once and cached by `cuda_collective_runtime_config`.
    // The NcclRank group itself is constructed by the executor (which
    // has access to all GPU streams needed for `NcclRank::init_all`).
    // None of the collectives below talks to NCCL yet: with a single rank
    // they return immediately, and with several ranks they log a warning
    // and leave the buffers untouched until the executor-level wiring
    // (Phase E-TP) lands.

    /// Number of tensor-parallel ranks (from `FERRUM_TP`, at least 1).
    fn world_size(_ctx: &Self::Context) -> usize {
        cuda_collective_runtime_config().world_size
    }

    /// Rank of this process (from `FERRUM_RANK`, 0 when unset).
    fn rank(_ctx: &Self::Context) -> usize {
        cuda_collective_runtime_config().rank
    }

    /// All-reduce of `buf` across ranks.
    ///
    /// With a single rank this returns immediately for every `op`. With
    /// several ranks nothing is reduced yet: a warning is logged and `buf`
    /// is left unmodified. Only `ReduceOp::Sum` is planned for now.
    fn all_reduce(ctx: &mut Self::Context, buf: &mut Self::Buffer, len: usize, op: ReduceOp) {
        // Single-rank path: reducing over one rank is the identity for any
        // op, so return before the op check and keep the hot path silent.
        if Self::world_size(ctx) <= 1 {
            return;
        }
        // Only Sum is supported for now (the NCCL wrapper is sum-only).
        if !matches!(op, ReduceOp::Sum) {
            tracing::warn!(
                "CudaBackend::all_reduce: op {op:?} not implemented (only Sum); skipping"
            );
            return;
        }
        // Multi-rank path: requires the executor to have constructed a
        // shared NcclRank and attached it to thread-local state. The
        // current NcclRank API (`crate::nccl_comm::NcclRank::init_all`) is
        // process-global and we don't want to reach into it from a
        // Backend method. Leaving a runtime warning so misuse surfaces.
        tracing::warn!(
            "CudaBackend::all_reduce: FERRUM_TP > 1 but no NcclRank attached to \
CudaState — requires executor-level wiring (Phase E-TP)."
        );
    }

    /// All-gather placeholder: no data is moved.
    ///
    /// With several ranks a warning is logged so the missing collective
    /// does not go unnoticed; `_global` is left unmodified.
    fn all_gather(
        _ctx: &mut Self::Context,
        _local: &Self::Buffer,
        _global: &mut Self::Buffer,
        _local_len: usize,
    ) {
        // Phase E-TP: no NCCL wrapper for all_gather yet.
        // NOTE(review): a strict single-rank all_gather would copy `_local`
        // into `_global`; that is not done here. Callers are presumably
        // expected to skip the collective when the world size is 1 —
        // confirm against the executor.
        if cuda_collective_runtime_config().world_size > 1 {
            tracing::warn!(
                "CudaBackend::all_gather: FERRUM_TP > 1 but all_gather is not implemented \
(no NCCL wrapper yet); global buffer left unmodified"
            );
        }
    }

    /// Broadcast placeholder: no data is moved.
    ///
    /// With a single rank there is nothing to do. With several ranks a
    /// warning is logged and `_buf` is left unmodified.
    fn broadcast(_ctx: &mut Self::Context, _buf: &mut Self::Buffer, _len: usize, _src_rank: usize) {
        // Phase E-TP: no NCCL wrapper for broadcast yet.
        if cuda_collective_runtime_config().world_size > 1 {
            tracing::warn!(
                "CudaBackend::broadcast: FERRUM_TP > 1 but broadcast is not implemented \
(no NCCL wrapper yet); buffer left unmodified"
            );
        }
    }
}