Skip to main content

monocoque_core/
buffer.rs

1use bytes::{Buf, Bytes, BytesMut};
2use std::collections::VecDeque;
3
/// A segmented buffer optimized for zero-copy operations.
///
/// This buffer holds multiple `Bytes` segments and provides efficient
/// extraction without unnecessary copies. When data is requested and
/// fits within a single segment, it can be returned with zero copies
/// (just a refcount increment on the underlying `Bytes`).
///
/// # Use Cases
///
/// - Protocol decoders reading frames from streaming data
/// - Minimizing memcpy overhead for small messages (< 8KB)
/// - Preserving arena-allocated buffer segments through the pipeline
///
/// # Tradeoffs
///
/// - **Fast path**: Single-segment extraction is O(1) with no copy
/// - **Slow path**: Multi-segment extraction requires copying into a contiguous buffer
/// - For large messages spanning many reads, the copy cost is unavoidable
#[derive(Debug, Default)]
pub struct SegmentedBuffer {
    /// Queued segments, oldest at the front. The methods on this type never
    /// leave an empty segment in the queue: empty pushes are skipped and
    /// fully-consumed segments are popped.
    segs: VecDeque<Bytes>,
    /// Total number of bytes across all segments in `segs`.
    len: usize,
}
27
28impl SegmentedBuffer {
29    #[must_use]
30    pub const fn new() -> Self {
31        Self {
32            segs: VecDeque::new(),
33            len: 0,
34        }
35    }
36
37    #[inline]
38    #[must_use]
39    pub const fn len(&self) -> usize {
40        self.len
41    }
42
43    #[inline]
44    #[must_use]
45    pub const fn is_empty(&self) -> bool {
46        self.len == 0
47    }
48
49    #[inline]
50    pub fn push(&mut self, bytes: Bytes) {
51        if bytes.is_empty() {
52            return;
53        }
54        self.len += bytes.len();
55        self.segs.push_back(bytes);
56    }
57
58    /// Copy the first `n` bytes into `dst`.
59    ///
60    /// Returns `false` if fewer than `n` bytes are available.
61    pub fn copy_prefix(&self, n: usize, dst: &mut [u8]) -> bool {
62        if n > self.len || dst.len() < n {
63            return false;
64        }
65
66        let mut remaining = n;
67        let mut out_off = 0;
68        for seg in &self.segs {
69            if remaining == 0 {
70                break;
71            }
72            let take = remaining.min(seg.len());
73            dst[out_off..out_off + take].copy_from_slice(&seg[..take]);
74            out_off += take;
75            remaining -= take;
76        }
77        true
78    }
79
80    /// Advance the queue by `n` bytes, dropping fully-consumed segments.
81    ///
82    /// # Panics
83    ///
84    /// Panics if `n > self.len`.
85    pub fn advance(&mut self, mut n: usize) {
86        assert!(n <= self.len);
87        self.len -= n;
88
89        while n > 0 {
90            let Some(mut front) = self.segs.pop_front() else {
91                break;
92            };
93            if n >= front.len() {
94                n -= front.len();
95                continue;
96            }
97            // partially consumed
98            front.advance(n);
99            self.segs.push_front(front);
100            break;
101        }
102    }
103
104    /// Take exactly `n` bytes from the front of the queue.
105    ///
106    /// If the first segment contains all `n` bytes, this is zero-copy.
107    /// If the bytes span multiple segments, this copies into a contiguous buffer.
108    pub fn take_bytes(&mut self, n: usize) -> Option<Bytes> {
109        if n == 0 {
110            return Some(Bytes::new());
111        }
112        if n > self.len {
113            return None;
114        }
115
116        let Some(front) = self.segs.front_mut() else {
117            return None;
118        };
119
120        if front.len() >= n {
121            self.len -= n;
122            let out = front.split_to(n);
123            if front.is_empty() {
124                self.segs.pop_front();
125            }
126            return Some(out);
127        }
128
129        // Spans segments: copy.
130        let mut out = BytesMut::with_capacity(n);
131        let mut remaining = n;
132        while remaining > 0 {
133            let seg = self
134                .segs
135                .pop_front()
136                .expect("len check ensures segments exist");
137            let take = remaining.min(seg.len());
138            out.extend_from_slice(&seg[..take]);
139            remaining -= take;
140            self.len -= take;
141            if take < seg.len() {
142                let mut rest = seg;
143                rest.advance(take);
144                self.segs.push_front(rest);
145            }
146        }
147
148        Some(out.freeze())
149    }
150}