1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
//! Runtime-configurable tuning parameters for nexar.
//!
//! All values have sensible defaults. Override them via environment variables
//! (prefixed `NEXAR_`, read by `NexarConfig::from_env`) or by constructing a
//! custom `NexarConfig`.
use crate::cluster::recovery::RecoveryPolicy;
use crate::cluster::sparse::TopologyStrategy;
use std::time::Duration;
/// Tuning parameters for collective operations and transport.
///
/// Construct with [`NexarConfig::default`] for the built-in values, or with
/// [`NexarConfig::from_env`] to apply `NEXAR_*` environment overrides on top of
/// those defaults. All fields are public, so struct-update syntax
/// (`NexarConfig { ring_max_world: 16, ..Default::default() }`) works as well.
#[derive(Debug, Clone)]
pub struct NexarConfig {
    /// Topology strategy for the peer mesh.
    ///
    /// Env: `NEXAR_TOPOLOGY`. Default: `TopologyStrategy::default()`.
    pub topology: TopologyStrategy,
    /// Timeout for individual send/recv operations within collectives.
    ///
    /// Env: `NEXAR_COLLECTIVE_TIMEOUT_SECS`. Default: 30 s.
    pub collective_timeout: Duration,
    /// Timeout for barrier operations.
    ///
    /// Env: `NEXAR_BARRIER_TIMEOUT_SECS`. Default: 30 s.
    pub barrier_timeout: Duration,
    /// Timeout for RPC calls.
    ///
    /// Env: `NEXAR_RPC_TIMEOUT_SECS`. Default: 30 s.
    pub rpc_timeout: Duration,
    /// Messages larger than this threshold use pipelined ring allreduce
    /// instead of ring or halving-doubling.
    ///
    /// Env: `NEXAR_LARGE_MSG_BYTES`. Default: 8 MiB.
    pub large_msg_bytes: usize,
    /// Segment size for pipelined allreduce stages.
    ///
    /// Env: `NEXAR_PIPELINE_SEGMENT_BYTES`. Default: 2 MiB.
    pub pipeline_segment_bytes: usize,
    /// Maximum world size for preferring ring allreduce over
    /// halving-doubling in the medium-message range.
    ///
    /// Env: `NEXAR_RING_MAX_WORLD`. Default: 8.
    pub ring_max_world: usize,
    /// Enable TCP bulk sidecar connections for large tensor transfers.
    ///
    /// When enabled, a raw TCP connection is established alongside each QUIC
    /// peer connection. Collectives automatically prefer the TCP path for
    /// large payloads to bypass QUIC's AES-256-GCM overhead.
    ///
    /// **Security warning:** Unless `encrypt_bulk_transport` is also set,
    /// data sent via the TCP sidecar is **unencrypted**. Do not enable in
    /// zero-trust environments (public clouds) without encryption.
    ///
    /// Env: `NEXAR_ENABLE_TCP_BULK_SIDECAR`. Default: `true`.
    pub enable_tcp_bulk_sidecar: bool,
    /// Require TLS encryption on TCP bulk sidecar connections.
    ///
    /// Defaults to `true` (encrypted). When `true`, the TCP sidecar uses
    /// TLS with the cluster CA for encryption. Set to `false` to disable
    /// encryption for maximum throughput on trusted networks (e.g. isolated
    /// InfiniBand fabrics).
    ///
    /// Only meaningful when `enable_tcp_bulk_sidecar` is `true`.
    ///
    /// Env: `NEXAR_ENCRYPT_BULK_TRANSPORT`.
    pub encrypt_bulk_transport: bool,
    /// Maximum memory (in bytes) that compressed allreduce may allocate for
    /// buffering compressed chunks from all peers.
    ///
    /// The allgather-then-reduce algorithm requires `O(N × chunk_size)` memory
    /// where N is the world size. This limit prevents OOM crashes on large
    /// clusters. Set to `0` to disable the check.
    ///
    /// Env: `NEXAR_COMPRESSED_ALLREDUCE_MAX_BYTES`. Default: 4 GiB.
    pub compressed_allreduce_max_bytes: usize,
    /// Interval between heartbeat probes sent to each peer.
    ///
    /// Env: `NEXAR_HEARTBEAT_INTERVAL_SECS`. Default: 1 s.
    pub heartbeat_interval: Duration,
    /// Duration after which a peer with no heartbeat response is considered dead.
    ///
    /// Env: `NEXAR_HEARTBEAT_TIMEOUT_SECS`. Default: 5 s.
    pub heartbeat_timeout: Duration,
    /// Timeout for the recovery agreement protocol (vote collection + rebuild).
    ///
    /// Env: `NEXAR_RECOVERY_TIMEOUT_SECS`. Default: 30 s.
    pub recovery_timeout: Duration,
    /// Enable elastic scaling (dynamic grow/shrink).
    ///
    /// Env: `NEXAR_ELASTIC_ENABLED`. Default: `false`.
    pub elastic_enabled: bool,
    /// Minimum world size for elastic scaling (won't shrink below this).
    ///
    /// Env: `NEXAR_ELASTIC_MIN_WORLD_SIZE`. Default: 1.
    pub elastic_min_world_size: u32,
    /// Maximum world size for elastic scaling (0 = unlimited).
    ///
    /// Env: `NEXAR_ELASTIC_MAX_WORLD_SIZE`. Default: 0.
    pub elastic_max_world_size: u32,
    /// Timeout for the elastic checkpoint protocol.
    ///
    /// Env: `NEXAR_ELASTIC_CHECKPOINT_TIMEOUT_SECS`. Default: 60 s.
    pub elastic_checkpoint_timeout: Duration,
    /// Policy for handling detected node failures.
    ///
    /// Env: `NEXAR_RECOVERY_POLICY`. Default: `RecoveryPolicy::Automatic`.
    pub recovery_policy: RecoveryPolicy,
}
impl Default for NexarConfig {
    /// Returns the built-in defaults that `NexarConfig::from_env` starts from.
    ///
    /// Notable choices: the TCP bulk sidecar is enabled *and* encrypted,
    /// elastic scaling is disabled (with no upper world-size limit), and
    /// detected node failures are handled with `RecoveryPolicy::Automatic`.
    fn default() -> Self {
        // 4 GiB does not fit in a 32-bit `usize`; writing it as a `usize`
        // product trips the deny-by-default `arithmetic_overflow` lint on such
        // targets and breaks the build. Compute in `u64` and saturate instead,
        // which yields exactly 4 GiB on 64-bit targets.
        let compressed_allreduce_max_bytes =
            usize::try_from(4u64 * 1024 * 1024 * 1024).unwrap_or(usize::MAX);

        Self {
            topology: TopologyStrategy::default(),
            collective_timeout: Duration::from_secs(30),
            barrier_timeout: Duration::from_secs(30),
            rpc_timeout: Duration::from_secs(30),
            large_msg_bytes: 8 * 1024 * 1024,        // 8 MiB
            pipeline_segment_bytes: 2 * 1024 * 1024, // 2 MiB
            ring_max_world: 8,
            enable_tcp_bulk_sidecar: true,
            encrypt_bulk_transport: true,
            compressed_allreduce_max_bytes, // 4 GiB (saturated on 32-bit targets)
            heartbeat_interval: Duration::from_secs(1),
            heartbeat_timeout: Duration::from_secs(5),
            recovery_timeout: Duration::from_secs(30),
            elastic_enabled: false,
            elastic_min_world_size: 1,
            elastic_max_world_size: 0, // 0 = unlimited
            elastic_checkpoint_timeout: Duration::from_secs(60),
            recovery_policy: RecoveryPolicy::Automatic,
        }
    }
}
impl NexarConfig {
/// Load config from environment variables, falling back to defaults.
///
/// Recognized variables:
/// - `NEXAR_COLLECTIVE_TIMEOUT_SECS`
/// - `NEXAR_BARRIER_TIMEOUT_SECS`
/// - `NEXAR_RPC_TIMEOUT_SECS`
/// - `NEXAR_LARGE_MSG_BYTES`
/// - `NEXAR_PIPELINE_SEGMENT_BYTES`
/// - `NEXAR_RING_MAX_WORLD`
/// - `NEXAR_ENABLE_TCP_BULK_SIDECAR` (default: true, set to "0" or "false" to disable)
/// - `NEXAR_ENCRYPT_BULK_TRANSPORT` (default: true, set to "0" or "false" to disable)
/// - `NEXAR_COMPRESSED_ALLREDUCE_MAX_BYTES` (default: 4 GiB, set to "0" to disable)
/// - `NEXAR_HEARTBEAT_INTERVAL_SECS` (default: 1)
/// - `NEXAR_HEARTBEAT_TIMEOUT_SECS` (default: 5)
/// - `NEXAR_RECOVERY_TIMEOUT_SECS` (default: 30)
/// - `NEXAR_RECOVERY_POLICY` (default: "automatic", options: "automatic"/"auto", "manual", "abort")
pub fn from_env() -> Self {
let mut cfg = Self::default();
if let Ok(v) = std::env::var("NEXAR_COLLECTIVE_TIMEOUT_SECS")
&& let Ok(s) = v.parse::<u64>()
{
cfg.collective_timeout = Duration::from_secs(s);
}
if let Ok(v) = std::env::var("NEXAR_BARRIER_TIMEOUT_SECS")
&& let Ok(s) = v.parse::<u64>()
{
cfg.barrier_timeout = Duration::from_secs(s);
}
if let Ok(v) = std::env::var("NEXAR_RPC_TIMEOUT_SECS")
&& let Ok(s) = v.parse::<u64>()
{
cfg.rpc_timeout = Duration::from_secs(s);
}
if let Ok(v) = std::env::var("NEXAR_LARGE_MSG_BYTES")
&& let Ok(n) = v.parse::<usize>()
{
cfg.large_msg_bytes = n;
}
if let Ok(v) = std::env::var("NEXAR_PIPELINE_SEGMENT_BYTES")
&& let Ok(n) = v.parse::<usize>()
{
cfg.pipeline_segment_bytes = n;
}
if let Ok(v) = std::env::var("NEXAR_RING_MAX_WORLD")
&& let Ok(n) = v.parse::<usize>()
{
cfg.ring_max_world = n;
}
if let Ok(v) = std::env::var("NEXAR_ENABLE_TCP_BULK_SIDECAR") {
cfg.enable_tcp_bulk_sidecar = v != "0" && v.to_lowercase() != "false";
}
if let Ok(v) = std::env::var("NEXAR_ENCRYPT_BULK_TRANSPORT") {
cfg.encrypt_bulk_transport = v != "0" && v.to_lowercase() != "false";
}
if let Ok(v) = std::env::var("NEXAR_COMPRESSED_ALLREDUCE_MAX_BYTES")
&& let Ok(n) = v.parse::<usize>()
{
cfg.compressed_allreduce_max_bytes = n;
}
if let Ok(v) = std::env::var("NEXAR_HEARTBEAT_INTERVAL_SECS")
&& let Ok(s) = v.parse::<u64>()
{
cfg.heartbeat_interval = Duration::from_secs(s);
}
if let Ok(v) = std::env::var("NEXAR_HEARTBEAT_TIMEOUT_SECS")
&& let Ok(s) = v.parse::<u64>()
{
cfg.heartbeat_timeout = Duration::from_secs(s);
}
if let Ok(v) = std::env::var("NEXAR_RECOVERY_TIMEOUT_SECS")
&& let Ok(s) = v.parse::<u64>()
{
cfg.recovery_timeout = Duration::from_secs(s);
}
if let Ok(v) = std::env::var("NEXAR_TOPOLOGY")
&& let Some(topo) = crate::cluster::sparse::parse_topology(&v)
{
cfg.topology = topo;
}
if let Ok(v) = std::env::var("NEXAR_ELASTIC_ENABLED") {
cfg.elastic_enabled = v != "0" && v.to_lowercase() != "false";
}
if let Ok(v) = std::env::var("NEXAR_ELASTIC_MIN_WORLD_SIZE")
&& let Ok(n) = v.parse::<u32>()
{
cfg.elastic_min_world_size = n;
}
if let Ok(v) = std::env::var("NEXAR_ELASTIC_MAX_WORLD_SIZE")
&& let Ok(n) = v.parse::<u32>()
{
cfg.elastic_max_world_size = n;
}
if let Ok(v) = std::env::var("NEXAR_ELASTIC_CHECKPOINT_TIMEOUT_SECS")
&& let Ok(s) = v.parse::<u64>()
{
cfg.elastic_checkpoint_timeout = Duration::from_secs(s);
}
if let Ok(v) = std::env::var("NEXAR_RECOVERY_POLICY") {
match v.to_lowercase().as_str() {
"automatic" | "auto" => cfg.recovery_policy = RecoveryPolicy::Automatic,
"manual" => cfg.recovery_policy = RecoveryPolicy::Manual,
"abort" => cfg.recovery_policy = RecoveryPolicy::Abort,
_ => {}
}
}
cfg
}
}