Skip to main content

nexus_net/buf/
read_buf.rs

/// Flat byte slab for inbound protocol parsing.
///
/// The I/O layer reads bytes into the slab via [`spare()`](ReadBuf::spare) +
/// [`filled()`](ReadBuf::filled). The protocol parser walks it via
/// [`data()`](ReadBuf::data) + [`advance()`](ReadBuf::advance). When all
/// data is consumed, cursors reset to the pre-padding offset — no memmove.
///
/// Fixed capacity. No growth.
///
/// # Layout
///
/// ```text
/// [ pre_padding | usable capacity                        | post_padding ]
///               ^                                        ^
///               head/tail start here                     end of usable
/// ```
///
/// Pre-padding: reserved bytes before the data region. Protocol layers
/// can use this for header reassembly (e.g., uWS writes spilled header
/// bytes backward into pre-padding). Accessible via [`pre_padding_mut()`](ReadBuf::pre_padding_mut).
///
/// Post-padding: reserved bytes after the data region. SIMD operations
/// may overrun by up to alignment width. ReadBuf guarantees this space
/// exists but doesn't manage it.
///
/// # Invariant
///
/// Every method maintains
/// `pre_padding <= head <= tail <= pre_padding + capacity`.
///
/// # Examples
///
/// ```
/// use nexus_net::buf::ReadBuf;
///
/// let mut buf = ReadBuf::with_capacity(4096);
///
/// // I/O: read bytes into spare region
/// let spare = buf.spare();
/// spare[..5].copy_from_slice(b"Hello");
/// buf.filled(5);
///
/// // Parse: consume from data region
/// assert_eq!(buf.data(), b"Hello");
/// buf.advance(5);
/// assert!(buf.is_empty()); // cursors auto-reset
/// ```
pub struct ReadBuf {
    /// Backing allocation of `pre_padding + capacity + post_padding` bytes.
    buf: Box<[u8]>,
    /// Index of the first unconsumed byte.
    head: usize,
    /// Index one past the last committed byte.
    tail: usize,
    /// Number of usable bytes between the two padding regions.
    capacity: usize,
    /// Number of reserved bytes in front of the usable region.
    pre_padding: usize,
}

impl ReadBuf {
    /// Create a ReadBuf with explicit padding.
    ///
    /// Total allocation: `pre_padding + capacity + post_padding`.
    /// The buffer is fixed-size — it never reallocates.
    ///
    /// # Panics
    /// Panics if `pre_padding + capacity + post_padding` overflows `usize`.
    #[must_use]
    pub fn new(capacity: usize, pre_padding: usize, post_padding: usize) -> Self {
        // Checked arithmetic: a wrapped total would allocate a slab that is
        // smaller than the cursors below assume.
        let total = pre_padding
            .checked_add(capacity)
            .and_then(|n| n.checked_add(post_padding))
            .expect("ReadBuf::new: pre_padding + capacity + post_padding overflows usize");
        Self {
            buf: vec![0u8; total].into_boxed_slice(),
            head: pre_padding,
            tail: pre_padding,
            capacity,
            pre_padding,
        }
    }

    /// Convenience: capacity only, zero padding.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self::new(capacity, 0, 0)
    }

    // =========================================================================
    // Read side (parser)
    // =========================================================================

    /// Unconsumed bytes. Always a single contiguous slice.
    #[inline]
    pub fn data(&self) -> &[u8] {
        &self.buf[self.head..self.tail]
    }

    /// Mutable access to unconsumed bytes.
    /// For in-place operations like XOR unmasking.
    #[inline]
    pub fn data_mut(&mut self) -> &mut [u8] {
        &mut self.buf[self.head..self.tail]
    }

    /// Consume `n` bytes from the front.
    ///
    /// If the buffer becomes empty after advance, resets head and tail
    /// to `pre_padding` offset (free — no memmove, just cursor reset).
    ///
    /// # Panics
    /// Panics if `n > self.len()`.
    #[inline]
    pub fn advance(&mut self, n: usize) {
        if n > self.len() {
            Self::panic_advance(n, self.len());
        }
        // Cannot overflow: n <= tail - head, so head + n <= tail.
        self.head += n;
        if self.head == self.tail {
            self.head = self.pre_padding;
            self.tail = self.pre_padding;
        }
    }

    // =========================================================================
    // Write side (I/O layer)
    // =========================================================================

    /// Writable tail region for direct socket reads.
    ///
    /// ```ignore
    /// let n = socket.read(buf.spare())?;
    /// buf.filled(n);
    /// ```
    ///
    /// Returns `buf[tail .. pre_padding + capacity]`.
    /// May be empty if tail has reached the capacity boundary.
    #[inline]
    pub fn spare(&mut self) -> &mut [u8] {
        let end = self.pre_padding + self.capacity;
        &mut self.buf[self.tail..end]
    }

    /// Commit `n` bytes written into [`spare()`](Self::spare).
    ///
    /// # Panics
    /// Panics if `n` would push tail past capacity boundary, including
    /// values of `n` so large that `tail + n` would overflow `usize`.
    #[inline]
    pub fn filled(&mut self, n: usize) {
        // Compare against the free space instead of computing `tail + n`:
        // that sum can wrap for a huge `n` and slip past the bounds check,
        // moving `tail` backwards.
        if n > self.remaining() {
            Self::panic_filled(n, self.tail, self.pre_padding + self.capacity);
        }
        self.tail += n;
    }

    // =========================================================================
    // Padding access
    // =========================================================================

    /// Mutable access to the pre-padding region (bytes before head).
    ///
    /// Returns `buf[0..head]`. Protocol layers can use this for header
    /// reassembly — e.g., writing spilled header bytes backward so the
    /// parser sees a contiguous header without memmove.
    ///
    /// The returned slice includes both the original pre-padding AND any
    /// consumed-but-not-reset space before head.
    #[inline]
    pub fn pre_padding_mut(&mut self) -> &mut [u8] {
        &mut self.buf[..self.head]
    }

    // =========================================================================
    // Capacity queries
    // =========================================================================

    /// Bytes of unconsumed data.
    #[inline]
    pub fn len(&self) -> usize {
        self.tail - self.head
    }

    /// Whether the buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.head == self.tail
    }

    /// Usable capacity (excluding padding).
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Writable space at the tail (same as `spare().len()`).
    #[inline]
    pub fn remaining(&self) -> usize {
        self.pre_padding + self.capacity - self.tail
    }

    /// Discard all data. Cursors reset to pre_padding offset.
    pub fn clear(&mut self) {
        self.head = self.pre_padding;
        self.tail = self.pre_padding;
    }

    /// Bytes consumed from the front (how far head has advanced from start).
    ///
    /// Useful for determining when to proactively [`compact()`](Self::compact).
    #[inline]
    pub fn consumed(&self) -> usize {
        self.head - self.pre_padding
    }

    /// Reclaim consumed space by moving unconsumed data to the front.
    ///
    /// Call when [`spare()`](Self::spare) is empty but there is consumed
    /// space before `head` that can be reclaimed. The unconsumed data
    /// (typically a partial frame) is moved to the pre_padding offset.
    ///
    /// Cost: O(unconsumed bytes). For a partial frame this is typically
    /// a few hundred bytes — under 100ns.
    ///
    /// No-op if the buffer is already at the front or empty.
    pub fn compact(&mut self) {
        if self.head <= self.pre_padding || self.head == self.tail {
            return; // already at front, or empty (auto-reset handles empty)
        }
        let len = self.tail - self.head;
        // Source and destination may overlap; `copy_within` handles that.
        self.buf.copy_within(self.head..self.tail, self.pre_padding);
        self.head = self.pre_padding;
        self.tail = self.pre_padding + len;
    }

    /// Out-of-line panic for [`advance()`](Self::advance), kept cold so the
    /// formatting code stays out of the inlined fast path.
    #[cold]
    #[inline(never)]
    fn panic_advance(n: usize, len: usize) -> ! {
        panic!("advance({n}) exceeds buffered data ({len})")
    }

    /// Out-of-line panic for [`filled()`](Self::filled), kept cold so the
    /// formatting code stays out of the inlined fast path.
    #[cold]
    #[inline(never)]
    fn panic_filled(n: usize, tail: usize, end: usize) -> ! {
        panic!("filled({n}) would exceed capacity (tail={tail}, end={end})")
    }
}
234
// Unit tests for `ReadBuf`. They live in a child module so they can inspect
// private fields (`buf`, `pre_padding`) in addition to the public API.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_allocation_size() {
        let buf = ReadBuf::new(100, 16, 4);
        assert_eq!(buf.buf.len(), 120);
        assert_eq!(buf.capacity(), 100);
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.remaining(), 100);
    }

    #[test]
    fn with_capacity_zero_padding() {
        let buf = ReadBuf::with_capacity(4096);
        assert_eq!(buf.capacity(), 4096);
        assert_eq!(buf.buf.len(), 4096); // no padding
        assert_eq!(buf.remaining(), 4096);
        assert_eq!(buf.pre_padding, 0);
    }

    #[test]
    fn spare_filled_data() {
        let mut buf = ReadBuf::with_capacity(64);
        buf.spare()[..5].copy_from_slice(b"Hello");
        buf.filled(5);
        assert_eq!(buf.data(), b"Hello");
        assert_eq!(buf.len(), 5);
        assert_eq!(buf.remaining(), 59);
    }

    #[test]
    fn advance_consumes() {
        let mut buf = ReadBuf::with_capacity(64);
        buf.spare()[..10].copy_from_slice(b"HelloWorld");
        buf.filled(10);
        buf.advance(5);
        assert_eq!(buf.data(), b"World");
        assert_eq!(buf.len(), 5);
    }

    #[test]
    fn advance_auto_reset() {
        let mut buf = ReadBuf::with_capacity(64);
        buf.spare()[..5].copy_from_slice(b"Hello");
        buf.filled(5);
        buf.advance(5);
        assert!(buf.is_empty());
        assert_eq!(buf.remaining(), 64);
    }

    #[test]
    fn data_mut_in_place() {
        let mut buf = ReadBuf::with_capacity(64);
        buf.spare()[..4].copy_from_slice(&[0x00, 0x01, 0x02, 0x03]);
        buf.filled(4);
        for b in buf.data_mut().iter_mut() {
            *b ^= 0xFF;
        }
        assert_eq!(buf.data(), &[0xFF, 0xFE, 0xFD, 0xFC]);
    }

    #[test]
    fn pre_padding_mut_accessible() {
        let mut buf = ReadBuf::new(64, 16, 4);
        buf.spare()[..5].copy_from_slice(b"World");
        buf.filled(5);

        // Pre-padding is 16 bytes before head
        let padding = buf.pre_padding_mut();
        assert_eq!(padding.len(), 16);

        // Write header bytes backward
        padding[12..16].copy_from_slice(b"Hdr:");

        // After consuming data, pre-padding grows
        buf.advance(3);
        assert_eq!(buf.pre_padding_mut().len(), 19); // 16 + 3 consumed
    }

    #[test]
    fn pre_padding_mut_after_advance() {
        let mut buf = ReadBuf::new(64, 8, 0);
        buf.spare()[..10].copy_from_slice(b"0123456789");
        buf.filled(10);
        buf.advance(5);

        // Pre-padding should be 8 + 5 consumed = 13
        let padding = buf.pre_padding_mut();
        assert_eq!(padding.len(), 13);
    }

    #[test]
    fn remaining_tracks() {
        let mut buf = ReadBuf::with_capacity(32);
        assert_eq!(buf.remaining(), 32);
        buf.spare()[..10].copy_from_slice(&[0; 10]);
        buf.filled(10);
        assert_eq!(buf.remaining(), 22);
        buf.advance(10);
        assert_eq!(buf.remaining(), 32); // auto-reset
    }

    #[test]
    fn consumed_tracks() {
        let mut buf = ReadBuf::new(32, 8, 0);
        assert_eq!(buf.consumed(), 0);
        buf.spare()[..10].copy_from_slice(b"0123456789");
        buf.filled(10);
        assert_eq!(buf.consumed(), 0); // filling does not move head
        buf.advance(4);
        assert_eq!(buf.consumed(), 4);
        buf.advance(6);
        assert_eq!(buf.consumed(), 0); // auto-reset
    }

    #[test]
    fn clear_resets() {
        let mut buf = ReadBuf::with_capacity(64);
        buf.spare()[..10].copy_from_slice(&[0; 10]);
        buf.filled(10);
        buf.clear();
        assert!(buf.is_empty());
        assert_eq!(buf.remaining(), 64);
    }

    #[test]
    fn spare_when_full() {
        let mut buf = ReadBuf::with_capacity(8);
        buf.spare()[..8].copy_from_slice(&[0; 8]);
        buf.filled(8);
        assert!(buf.spare().is_empty());
        assert_eq!(buf.remaining(), 0);
    }

    #[test]
    fn zero_length_operations() {
        let mut buf = ReadBuf::with_capacity(32);
        buf.filled(0);
        assert!(buf.is_empty());
        buf.advance(0);
        assert!(buf.is_empty());
    }

    #[test]
    fn large_capacity_smoke() {
        let mut buf = ReadBuf::with_capacity(256 * 1024);
        let data: Vec<u8> = (0..1024).map(|i| (i & 0xFF) as u8).collect();
        buf.spare()[..1024].copy_from_slice(&data);
        buf.filled(1024);
        assert_eq!(buf.data(), data.as_slice());
        buf.advance(512);
        assert_eq!(buf.data(), &data[512..]);
        buf.advance(512);
        assert!(buf.is_empty());
        assert_eq!(buf.remaining(), 256 * 1024);
    }

    #[test]
    fn multiple_write_advance_cycles() {
        let mut buf = ReadBuf::with_capacity(32);
        for i in 0u8..10 {
            buf.spare()[..4].copy_from_slice(&[i; 4]);
            buf.filled(4);
            assert_eq!(buf.data(), &[i; 4]);
            buf.advance(4);
            assert!(buf.is_empty());
        }
    }

    #[test]
    #[should_panic(expected = "advance")]
    fn advance_exceeds_data() {
        let mut buf = ReadBuf::with_capacity(32);
        buf.spare()[..5].copy_from_slice(b"Hello");
        buf.filled(5);
        buf.advance(10);
    }

    #[test]
    #[should_panic(expected = "filled")]
    fn filled_exceeds_capacity() {
        let mut buf = ReadBuf::with_capacity(8);
        buf.filled(16);
    }

    #[test]
    fn partial_advance_then_fill() {
        let mut buf = ReadBuf::with_capacity(32);
        buf.spare()[..10].copy_from_slice(b"0123456789");
        buf.filled(10);
        buf.advance(5);
        assert_eq!(buf.data(), b"56789");

        buf.spare()[..3].copy_from_slice(b"ABC");
        buf.filled(3);
        assert_eq!(buf.data(), b"56789ABC");
    }

    #[test]
    fn head_drift_and_compact() {
        let mut buf = ReadBuf::with_capacity(32);

        // Every byte carries its own stream position, so a compaction that
        // moved the wrong range would be visible in the final comparison.
        let first: Vec<u8> = (0..20).collect();
        let second: Vec<u8> = (20..32).collect();

        // Fill 20 bytes
        buf.spare()[..20].copy_from_slice(&first);
        buf.filled(20);

        // Consume 15 — leaves 5 bytes unconsumed, head at 15
        buf.advance(15);
        assert_eq!(buf.len(), 5);
        assert_eq!(buf.remaining(), 12);

        // Fill 12 more — tail at 32, head at 15
        buf.spare()[..12].copy_from_slice(&second);
        buf.filled(12);
        assert_eq!(buf.remaining(), 0); // full!
        assert_eq!(buf.len(), 17);

        // Consume 10 — head at 25, tail at 32
        buf.advance(10);
        assert_eq!(buf.len(), 7);
        assert_eq!(buf.remaining(), 0); // tail at capacity, can't write

        // spare() is empty
        assert!(buf.spare().is_empty());

        // Compact reclaims consumed space — moves 7 bytes to front
        buf.compact();
        assert_eq!(buf.len(), 7); // data preserved
        assert_eq!(buf.remaining(), 25); // 32 - 7 = 25 bytes reclaimed
        assert_eq!(buf.consumed(), 0);

        // Exactly the 7 unconsumed bytes (stream positions 25..32) survive
        assert_eq!(buf.data(), &[25u8, 26, 27, 28, 29, 30, 31]);
    }

    #[test]
    fn compact_with_pre_padding() {
        let mut buf = ReadBuf::new(16, 4, 0);
        buf.pre_padding_mut().copy_from_slice(b"HDR:");
        buf.spare()[..8].copy_from_slice(b"abcdefgh");
        buf.filled(8);
        buf.advance(5);
        assert_eq!(buf.consumed(), 5);

        buf.compact();

        // Data lands at the pre_padding offset, not at index 0
        assert_eq!(buf.consumed(), 0);
        assert_eq!(buf.data(), b"fgh");
        assert_eq!(buf.remaining(), 13);
        // The reserved pre-padding bytes are untouched by compaction
        assert_eq!(&buf.pre_padding_mut()[..], &b"HDR:"[..]);
    }

    #[test]
    fn compact_noop_when_at_front() {
        let mut buf = ReadBuf::with_capacity(32);
        buf.spare()[..10].copy_from_slice(&[0x42; 10]);
        buf.filled(10);
        // head is at front — compact is a no-op
        buf.compact();
        assert_eq!(buf.len(), 10);
        assert_eq!(buf.remaining(), 22);
    }

    #[test]
    fn compact_noop_when_empty() {
        let mut buf = ReadBuf::with_capacity(32);
        buf.compact(); // empty — no-op
        assert_eq!(buf.remaining(), 32);
    }

    #[test]
    fn with_padding_smoke() {
        let mut buf = ReadBuf::new(64, 16, 32);
        assert_eq!(buf.buf.len(), 112); // 16 + 64 + 32
        assert_eq!(buf.capacity(), 64);
        assert_eq!(buf.remaining(), 64);

        buf.spare()[..10].copy_from_slice(b"0123456789");
        buf.filled(10);
        assert_eq!(buf.data(), b"0123456789");
    }
}