1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::cluster::sparse::recompute_routing_table;
use crate::error::{NexarError, Result};
use crate::rpc::registry::RpcRegistry;
use crate::types::Rank;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::{Mutex, RwLock};
use super::NexarClient;
use super::async_client::RawRecvSource;
use super::hash::fnv1a_comm_id;
impl NexarClient {
    /// Rebuild the communicator excluding dead ranks.
    ///
    /// This is a **local** operation: each surviving rank independently computes
    /// the new rank mapping from the agreed `dead_ranks` list. All survivors must
    /// call this with the **same** `dead_ranks` to get a consistent view.
    ///
    /// Returns a new `NexarClient` with contiguous ranks `[0, survivors)`,
    /// `world_size = survivors`, and the relative order of surviving ranks preserved.
    ///
    /// The new client shares the parent's QUIC connections and routers. It uses
    /// per-comm_id raw channels so its collectives don't interfere with the parent.
    ///
    /// For sparse topologies, the parent's relay infrastructure is inherited so
    /// non-neighbor communication continues to work via relays.
    ///
    /// # Errors
    ///
    /// Returns `NexarError::CollectiveFailed` if this rank is itself listed in
    /// `dead_ranks` (a caller bug; also caught by the `debug_assert!` in debug builds).
    pub async fn rebuild_excluding(&self, dead_ranks: &[Rank]) -> Result<NexarClient> {
        debug_assert!(
            !dead_ranks.contains(&self.rank),
            "a dead rank should not call rebuild_excluding"
        );
        // Sort the dead set once up front. This serves two purposes:
        // (a) O(log m) membership tests via binary_search when filtering
        //     survivors below, instead of a linear scan per rank;
        // (b) a canonical order for hashing, so every survivor derives the
        //     same comm_id regardless of how its `dead_ranks` was constructed.
        let mut sorted_dead: Vec<Rank> = dead_ranks.to_vec();
        sorted_dead.sort_unstable();
        // Surviving ranks in ascending order. The range iterator already
        // yields ranks in ascending order, so no extra sort is needed.
        let survivors: Vec<Rank> = (0..self.world_size)
            .filter(|r| sorted_dead.binary_search(r).is_err())
            .collect();
        let new_world_size = survivors.len() as u32;
        let new_rank =
            survivors
                .iter()
                .position(|&r| r == self.rank)
                .ok_or(NexarError::CollectiveFailed {
                    operation: "rebuild_excluding",
                    rank: self.rank,
                    reason: "rank not found among survivors".into(),
                })? as Rank;
        // Generate a deterministic comm_id from (parent comm_id, rebuild
        // generation, sorted dead set) so all survivors agree.
        // NOTE(review): this assumes every survivor's `split_generation`
        // counter is in lockstep (same number of prior splits/rebuilds) —
        // implied by the "all survivors call this together" contract above.
        let rebuild_gen = self.split_generation.fetch_add(1, Ordering::Relaxed);
        let new_comm_id = {
            let mut parts: Vec<Vec<u8>> = Vec::with_capacity(2 + sorted_dead.len());
            parts.push(self.comm_id.to_le_bytes().to_vec());
            parts.push(rebuild_gen.to_le_bytes().to_vec());
            parts.extend(sorted_dead.iter().map(|dr| dr.to_le_bytes().to_vec()));
            fnv1a_comm_id(&parts)
        };
        // Build the new→original rank map and the subset of directly-connected
        // peers. In a sparse topology not every survivor is a neighbor, so only
        // ranks we hold a connection to get a peer entry and a comm receiver.
        let mut rank_map = HashMap::with_capacity(survivors.len());
        let mut new_peers = HashMap::new();
        let mut comm_receivers = HashMap::new();
        for (new_r, &orig_rank) in survivors.iter().enumerate() {
            let new_r = new_r as Rank;
            rank_map.insert(new_r, orig_rank);
            if orig_rank == self.rank {
                continue;
            }
            if let Some(peer) = self.peers.get(&orig_rank) {
                new_peers.insert(new_r, Arc::clone(peer));
                // Register a dedicated raw channel for the new comm_id so the
                // child's traffic is demultiplexed away from the parent's.
                if let Some(router) = self.routers.get(&orig_rank) {
                    let rx = router.register_comm(new_comm_id).await;
                    comm_receivers.insert(new_r, Mutex::new(rx));
                }
            }
        }
        Ok(NexarClient {
            rank: new_rank,
            world_size: new_world_size,
            comm_id: new_comm_id,
            peers: new_peers,
            // The child owns no routers; it receives through the comm channels
            // registered on the parent's routers above.
            routers: HashMap::new(),
            raw_recv: RawRecvSource::Comm(comm_receivers),
            _router_handles: Vec::new(),
            adapter: Arc::clone(&self.adapter),
            _pool: Arc::clone(&self._pool),
            barrier_epoch: AtomicU64::new(0),
            rpc_registry: Arc::new(RwLock::new(RpcRegistry::new())),
            rpc_req_id: AtomicU64::new(0),
            split_generation: AtomicU64::new(0),
            rank_map,
            collective_tag: AtomicU64::new(1),
            tagged_receivers: Mutex::new(HashMap::new()),
            config: Arc::clone(&self.config),
            failure_tx: Arc::clone(&self.failure_tx),
            failure_rx: self.failure_rx.clone(),
            _monitor_handle: None,
            routing_table: recompute_routing_table(&self.config.topology, new_rank, new_world_size),
            // Inherit parent's relay infrastructure for sparse topologies.
            // The parent's relay listeners are still running and deliver to these channels.
            relay_deliveries: self.relay_deliveries.clone(),
            _relay_handles: Vec::new(),
            _endpoints: Vec::new(),
        })
    }
}