nexar 0.1.2

Distributed runtime with QUIC transport, stream-multiplexed messaging, and built-in collectives
Documentation
use crossbeam_queue::ArrayQueue;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

/// Default small buffer capacity: 64 KiB (control-plane messages).
const SMALL_BUF_CAPACITY: usize = 64 * 1024;

/// Default large buffer capacity: 8 MiB.
const LARGE_BUF_CAPACITY: usize = 8 * 1024 * 1024;

/// Default huge buffer capacity: 64 MiB.
const HUGE_BUF_CAPACITY: usize = 64 * 1024 * 1024;

/// Default giant buffer capacity: 256 MiB (largest pooled tier; requests above
/// this fall through to unpooled allocation in `BufferPool::checkout`).
const GIANT_BUF_CAPACITY: usize = 256 * 1024 * 1024;

/// Workload profile that determines pool tier sizes.
///
/// Training and inference have fundamentally different buffer usage patterns.
/// Choosing the right profile avoids wasting memory (inference) or thrashing
/// allocations (training).
///
/// All profiles use **lazy allocation**: queues are created at construction
/// with the specified capacity limits, but buffers are only allocated on first
/// checkout and recycled on return. No memory is consumed until buffers are
/// actually needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolProfile {
    /// Optimized for distributed training (gradient allreduce, broadcast).
    ///
    /// Queue capacities (lazy, allocated on demand):
    /// - Small: up to 256 × 64 KiB (control messages)
    /// - Large: up to 32 × 8 MiB (embedding gradients)
    /// - Huge: up to 16 × 64 MiB (ring allreduce chunks)
    /// - Giant: up to 4 × 256 MiB (halving-doubling, broadcasts)
    Training,

    /// Optimized for distributed inference (pipeline parallelism, KV cache
    /// migration, MoE expert dispatch).
    ///
    /// Queue capacities (lazy, allocated on demand):
    /// - Small: up to 256 × 64 KiB (control, routing decisions)
    /// - Large: up to 16 × 8 MiB (MoE token dispatch, small activations)
    /// - Huge: up to 4 × 64 MiB (KV cache layers, pipeline activations)
    /// - Giant: 0 (no gradient chunks during inference)
    ///
    /// NOTE(review): the giant queue is still created with one slot internally
    /// (see `BufferPool::from_tier_configs`, which clamps counts to ≥ 1), so a
    /// single 256 MiB buffer may be retained if one is ever checked out.
    Inference,
}

/// Configuration for a single buffer tier.
#[derive(Debug, Clone, Copy)]
pub struct TierConfig {
    /// Maximum number of buffers to keep in this tier's queue.
    ///
    /// A count of 0 is clamped to 1 internally, because the underlying queue
    /// requires a non-zero capacity (see `BufferPool::from_tier_configs`).
    pub count: usize,
    /// Capacity of each buffer in bytes. Also serves as the size threshold
    /// used to route checkout requests to this tier.
    pub capacity: usize,
}

/// Builder for custom buffer pool configurations.
///
/// Use this when the built-in [`PoolProfile`] variants don't fit your workload.
///
/// # Example
///
/// ```
/// use nexar::transport::buffer_pool::PoolBuilder;
/// use nexar::transport::buffer_pool::TierConfig;
///
/// let pool = PoolBuilder::new()
///     .small(TierConfig { count: 64, capacity: 32 * 1024 })
///     .large(TierConfig { count: 8, capacity: 4 * 1024 * 1024 })
///     .huge(TierConfig { count: 2, capacity: 32 * 1024 * 1024 })
///     .build();
/// ```
pub struct PoolBuilder {
    // One TierConfig per pool tier; see `PoolBuilder::new` for defaults.
    small: TierConfig,
    large: TierConfig,
    huge: TierConfig,
    giant: TierConfig,
}

impl PoolBuilder {
    /// Create a builder with minimal defaults (small/large/huge tiers have
    /// count=1; the giant tier has count=0).
    pub fn new() -> Self {
        Self {
            small: TierConfig {
                count: 1,
                capacity: SMALL_BUF_CAPACITY,
            },
            large: TierConfig {
                count: 1,
                capacity: LARGE_BUF_CAPACITY,
            },
            huge: TierConfig {
                count: 1,
                capacity: HUGE_BUF_CAPACITY,
            },
            giant: TierConfig {
                count: 0,
                capacity: GIANT_BUF_CAPACITY,
            },
        }
    }

    /// Configure the small tier (default: 64 KiB buffers).
    pub fn small(mut self, config: TierConfig) -> Self {
        self.small = config;
        self
    }

    /// Configure the large tier (default: 8 MiB buffers).
    pub fn large(mut self, config: TierConfig) -> Self {
        self.large = config;
        self
    }

    /// Configure the huge tier (default: 64 MiB buffers).
    pub fn huge(mut self, config: TierConfig) -> Self {
        self.huge = config;
        self
    }

    /// Configure the giant tier (default: 256 MiB buffers).
    pub fn giant(mut self, config: TierConfig) -> Self {
        self.giant = config;
        self
    }

    /// Build the pool. All tiers start empty (lazy allocation).
    pub fn build(self) -> Arc<BufferPool> {
        BufferPool::from_tier_configs(self.small, self.large, self.huge, self.giant)
    }
}

impl Default for PoolBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// A tiered lock-free buffer pool with small, large, huge, and giant tiers.
///
/// Buffers are allocated **lazily**: the pool starts empty and buffers are
/// created on first checkout. When returned, they are recycled into the
/// appropriate tier queue for reuse. This means zero memory is consumed
/// until buffers are actually needed.
///
/// Use [`PoolProfile`] for preset configurations or [`PoolBuilder`] for
/// full control over tier counts and capacities.
///
/// Checkout picks the appropriate tier based on requested size. Return goes
/// back to the matching tier. Buffers that have grown beyond 4× their tier's
/// capacity are dropped instead of returned.
///
/// Note: every tier queue is created with at least one slot (counts are
/// clamped to ≥ 1 in `from_tier_configs`), so a tier configured with count 0
/// can still cache a single buffer.
pub struct BufferPool {
    // One lock-free MPMC queue of recycled Vecs per tier.
    small: ArrayQueue<Vec<u8>>,
    large: ArrayQueue<Vec<u8>>,
    huge: ArrayQueue<Vec<u8>>,
    giant: ArrayQueue<Vec<u8>>,
    // Per-tier buffer capacity in bytes; also the checkout size threshold.
    small_cap: usize,
    large_cap: usize,
    huge_cap: usize,
    giant_cap: usize,
}

impl BufferPool {
    /// Create a pool with the default training profile (lazy allocation).
    ///
    /// Equivalent to `BufferPool::with_profile(PoolProfile::Training)`.
    pub fn new() -> Arc<Self> {
        Self::with_profile(PoolProfile::Training)
    }

    /// Create a pool sized for the given workload profile (lazy allocation).
    pub fn with_profile(profile: PoolProfile) -> Arc<Self> {
        let (small_count, large_count, huge_count, giant_count) = match profile {
            PoolProfile::Training => (256, 32, 16, 4),
            PoolProfile::Inference => (256, 16, 4, 0),
        };
        Self::from_tier_configs(
            TierConfig {
                count: small_count,
                capacity: SMALL_BUF_CAPACITY,
            },
            TierConfig {
                count: large_count,
                capacity: LARGE_BUF_CAPACITY,
            },
            TierConfig {
                count: huge_count,
                capacity: HUGE_BUF_CAPACITY,
            },
            TierConfig {
                count: giant_count,
                capacity: GIANT_BUF_CAPACITY,
            },
        )
    }

    /// Create a pool with custom small tier sizes (primarily for testing).
    ///
    /// Large, huge, and giant tiers use minimal sizes to avoid excessive
    /// memory usage in test environments.
    pub fn with_config(small_pool_size: usize, small_buf_cap: usize) -> Arc<Self> {
        Self::from_tier_configs(
            TierConfig {
                count: small_pool_size,
                capacity: small_buf_cap,
            },
            TierConfig {
                count: 4,
                capacity: LARGE_BUF_CAPACITY,
            },
            TierConfig {
                count: 2,
                capacity: HUGE_BUF_CAPACITY,
            },
            TierConfig {
                count: 1,
                capacity: GIANT_BUF_CAPACITY,
            },
        )
    }

    fn from_tier_configs(
        small: TierConfig,
        large: TierConfig,
        huge: TierConfig,
        giant: TierConfig,
    ) -> Arc<Self> {
        Arc::new(Self {
            small: ArrayQueue::new(small.count.max(1)),
            large: ArrayQueue::new(large.count.max(1)),
            huge: ArrayQueue::new(huge.count.max(1)),
            giant: ArrayQueue::new(giant.count.max(1)),
            small_cap: small.capacity,
            large_cap: large.capacity,
            huge_cap: huge.capacity,
            giant_cap: giant.capacity,
        })
    }

    /// Check out a buffer, resized to `len` bytes (zeroed).
    ///
    /// Selects the appropriate tier based on the configured tier capacities.
    /// If the tier queue is empty, a fresh buffer is allocated (lazy).
    /// If `len` exceeds all tier capacities, an unpooled buffer is allocated.
    pub fn checkout(self: &Arc<Self>, len: usize) -> PooledBuf {
        let (queue, tier, capacity) = self.tier_for_size(len);
        let mut buf = match queue {
            Some(q) => q.pop().unwrap_or_else(|| Vec::with_capacity(capacity)),
            None => Vec::with_capacity(len),
        };
        buf.resize(len, 0);
        PooledBuf {
            buf: Some(buf),
            pool: Arc::clone(self),
            tier,
        }
    }

    /// Select the pool tier for a given buffer size.
    fn tier_for_size(&self, len: usize) -> (Option<&ArrayQueue<Vec<u8>>>, PoolTier, usize) {
        if len <= self.small_cap {
            (Some(&self.small), PoolTier::Small, self.small_cap)
        } else if len <= self.large_cap {
            (Some(&self.large), PoolTier::Large, self.large_cap)
        } else if len <= self.huge_cap {
            (Some(&self.huge), PoolTier::Huge, self.huge_cap)
        } else if len <= self.giant_cap {
            (Some(&self.giant), PoolTier::Giant, self.giant_cap)
        } else {
            (None, PoolTier::Unpooled, len)
        }
    }

    /// Return a buffer to the appropriate tier.
    fn return_buf(&self, mut buf: Vec<u8>, tier: PoolTier) {
        let (queue, max_cap) = match tier {
            PoolTier::Small => (Some(&self.small), self.small_cap * 4),
            PoolTier::Large => (Some(&self.large), self.large_cap * 4),
            PoolTier::Huge => (Some(&self.huge), self.huge_cap * 4),
            PoolTier::Giant => (Some(&self.giant), self.giant_cap * 4),
            PoolTier::Unpooled => (None, 0),
        };
        if let Some(q) = queue
            && buf.capacity() <= max_cap
        {
            buf.clear();
            let _ = q.push(buf);
        }
    }
}

/// Which pool tier a buffer belongs to.
///
/// Recorded at checkout time so the buffer can be returned to the same tier
/// on drop without re-deriving it from the buffer's size.
#[derive(Debug, Clone, Copy)]
enum PoolTier {
    Small,
    Large,
    Huge,
    Giant,
    /// Larger than every tier capacity; freed on drop instead of recycled.
    Unpooled,
}

/// A buffer checked out from a `BufferPool`. Derefs to `[u8]`.
/// On drop, the underlying `Vec` is cleared and returned to the appropriate tier.
pub struct PooledBuf {
    // Always `Some` between construction and `Drop` (which takes it).
    buf: Option<Vec<u8>>,
    // Owning pool; keeps the pool alive while buffers are outstanding.
    pool: Arc<BufferPool>,
    // Tier the buffer is returned to on drop.
    tier: PoolTier,
}

impl PooledBuf {
    /// Wrap an externally-received `Vec<u8>` as a `PooledBuf`.
    ///
    /// The buffer will be returned to the pool's appropriate tier on drop.
    /// Useful for wrapping data received via non-QUIC transports (e.g., RDMA).
    pub fn from_vec(v: Vec<u8>, pool: Arc<BufferPool>) -> Self {
        let len = v.len();
        let (_, tier, _) = pool.tier_for_size(len);
        Self {
            buf: Some(v),
            pool,
            tier,
        }
    }
}

impl Deref for PooledBuf {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // Invariant: `buf` is `Some` from construction until `Drop` takes it.
        self.buf.as_deref().expect("PooledBuf used after drop")
    }
}

impl DerefMut for PooledBuf {
    fn deref_mut(&mut self) -> &mut [u8] {
        // Invariant: `buf` is `Some` from construction until `Drop` takes it.
        self.buf.as_deref_mut().expect("PooledBuf used after drop")
    }
}

impl Drop for PooledBuf {
    fn drop(&mut self) {
        // `take` moves the Vec out exactly once, leaving `None` behind.
        match self.buf.take() {
            Some(inner) => self.pool.return_buf(inner, self.tier),
            None => {}
        }
    }
}

impl AsRef<[u8]> for PooledBuf {
    fn as_ref(&self) -> &[u8] {
        // Delegates to the Deref impl explicitly.
        &**self
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Fresh checkout is zero-filled and writable through DerefMut.
    #[test]
    fn test_checkout_zeroed_and_deref_mut() {
        let pool = BufferPool::with_config(4, 1024);
        let mut buf = pool.checkout(100);
        assert_eq!(buf.len(), 100);
        assert!(buf.iter().all(|&b| b == 0));
        buf[0] = 0xAA;
        assert_eq!(buf[0], 0xAA);
    }

    // An empty pool falls back to fresh allocation; returning to a full
    // queue drops the buffer without panicking.
    #[test]
    fn test_exhaustion_fallback_and_return() {
        let pool = BufferPool::with_config(1, 64);
        let b1 = pool.checkout(10);
        let b2 = pool.checkout(10); // pool empty, allocates fresh
        assert_eq!(b2.len(), 10);
        drop(b1); // returns to pool
        drop(b2); // pool full, silently dropped — no panic
    }

    // One checkout per tier, including an unpooled (oversized) request.
    #[test]
    fn test_tier_selection() {
        let pool = BufferPool::new();
        let m = 1024 * 1024;
        // Each tier boundary: small (100), large (1M), huge (32M), giant (128M), unpooled (512M).
        for &size in &[100, m, 32 * m, 128 * m, 512 * m] {
            let buf = pool.checkout(size);
            assert_eq!(buf.len(), size);
        }
    }

    // Helper: check out every size, verify lengths, then drop all at once
    // so the buffers are returned to the pool together.
    fn assert_checkout(pool: &Arc<BufferPool>, sizes: &[usize]) {
        let bufs: Vec<_> = sizes
            .iter()
            .map(|&s| {
                let b = pool.checkout(s);
                assert_eq!(b.len(), s);
                b
            })
            .collect();
        drop(bufs);
    }

    // Representative allocation mixes for both built-in profiles.
    #[test]
    fn test_workload_profiles() {
        let m = 1024 * 1024;
        // Training: ring allreduce chunks + halving-doubling + broadcast.
        assert_checkout(
            &BufferPool::new(),
            &[14 * m, 14 * m, 14 * m, 208 * m, 48 * m],
        );
        // Inference: KV layers + MoE dispatches + pipeline activation.
        assert_checkout(
            &BufferPool::with_profile(PoolProfile::Inference),
            &[16 * m, 16 * m, 64 * m, m, m, m, m, 128 * m],
        );
    }

    // Buffers are allocated on first checkout and recycled for reuse.
    #[test]
    fn test_lazy_allocation() {
        let pool = BufferPool::with_profile(PoolProfile::Training);
        let buf = pool.checkout(1024);
        assert_eq!(buf.len(), 1024);
        drop(buf);
        // Reuses returned buffer.
        let buf2 = pool.checkout(512);
        assert_eq!(buf2.len(), 512);
    }

    // Builder-configured tiers and the Default builder both check out correctly.
    #[test]
    fn test_pool_builder() {
        let pool = PoolBuilder::new()
            .small(TierConfig {
                count: 4,
                capacity: 1024,
            })
            .large(TierConfig {
                count: 2,
                capacity: 1024 * 1024,
            })
            .build();
        let buf = pool.checkout(500);
        assert_eq!(buf.len(), 500);

        let pool2 = PoolBuilder::default().build();
        let buf2 = pool2.checkout(100);
        assert_eq!(buf2.len(), 100);
    }
}