Skip to main content

arcbox_datapath/
frame_buf.rs

1//! Owned frame buffer backed by [`PacketPool`] or a heap `Vec<u8>`.
2//!
3//! [`FrameBuf`] is the datapath's unit of frame ownership. It replaces raw
4//! `Vec<u8>` on the hot path with a pool-backed allocation that avoids the
5//! system allocator entirely (O(1) atomic CAS vs ~50-100 ns malloc).
6//!
7//! Code that produces frames from external sources (handshake synthesizer,
8//! DNS replies, ARP construction) can still use `FrameBuf::from(vec)` to
9//! wrap an existing `Vec<u8>` without copying. The datapath treats both
10//! variants uniformly via `Deref<Target = [u8]>`.
11
12use std::ops::Deref;
13use std::sync::Arc;
14
15use super::pool::PacketPool;
16
17/// A frame buffer backed by either the [`PacketPool`] or a heap `Vec`.
18///
19/// - `Pooled`: data lives in a pre-allocated [`PacketPool`] slot. On drop,
20///   the slot is returned to the pool (lock-free). This is the fast path.
21/// - `Heap`: data lives in a regular `Vec<u8>`. Used for frames that
22///   originate outside the pool (handshake replies, manually constructed
23///   frames).
24pub enum FrameBuf {
25    /// Pool-backed frame. Automatically freed on drop.
26    Pooled {
27        pool: Arc<PacketPool>,
28        index: u32,
29        len: u32,
30    },
31    /// Heap-backed frame (fallback for non-pool sources).
32    Heap(Vec<u8>),
33}
34
35impl FrameBuf {
36    /// Allocates a frame from the pool and copies `data` into it.
37    ///
38    /// Falls back to `Heap` if the pool is exhausted.
39    pub fn from_pool(pool: &Arc<PacketPool>, data: &[u8]) -> Self {
40        if let Some(mut pkt) = pool.alloc() {
41            if pkt.copy_from_slice(data).is_ok() {
42                let index = pkt.into_index();
43                return Self::Pooled {
44                    pool: Arc::clone(pool),
45                    index,
46                    len: data.len() as u32,
47                };
48            }
49            // copy_from_slice failed (data > MAX_PACKET_SIZE) — fall through.
50        }
51        // Pool exhausted or frame too large — fall back to heap.
52        Self::Heap(data.to_vec())
53    }
54
55    /// Returns the frame data length.
56    #[inline]
57    pub fn len(&self) -> usize {
58        match self {
59            Self::Pooled { len, .. } => *len as usize,
60            Self::Heap(v) => v.len(),
61        }
62    }
63
64    /// Returns true if the frame is empty.
65    #[inline]
66    pub fn is_empty(&self) -> bool {
67        self.len() == 0
68    }
69
70    /// Returns true if this frame is pool-backed.
71    #[inline]
72    pub fn is_pooled(&self) -> bool {
73        matches!(self, Self::Pooled { .. })
74    }
75}
76
77impl Deref for FrameBuf {
78    type Target = [u8];
79
80    #[inline]
81    fn deref(&self) -> &[u8] {
82        match self {
83            Self::Pooled { pool, index, len } => {
84                // SAFETY: The buffer at `index` is allocated (we hold ownership)
85                // and no other code has a mutable reference.
86                let buf = unsafe { pool.get(*index) };
87                &buf.as_full_slice()[..*len as usize]
88            }
89            Self::Heap(v) => v,
90        }
91    }
92}
93
94impl AsRef<[u8]> for FrameBuf {
95    #[inline]
96    fn as_ref(&self) -> &[u8] {
97        self
98    }
99}
100
101impl Drop for FrameBuf {
102    fn drop(&mut self) {
103        if let Self::Pooled { pool, index, .. } = self {
104            // SAFETY: We own this buffer slot exclusively. Returning it to the
105            // pool makes it available for reuse.
106            unsafe { pool.free_by_index(*index) };
107        }
108        // Heap variant: Vec<u8> drops normally.
109    }
110}
111
112impl From<Vec<u8>> for FrameBuf {
113    #[inline]
114    fn from(v: Vec<u8>) -> Self {
115        Self::Heap(v)
116    }
117}
118
119impl std::fmt::Debug for FrameBuf {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        match self {
122            Self::Pooled { index, len, .. } => f
123                .debug_struct("FrameBuf::Pooled")
124                .field("index", index)
125                .field("len", len)
126                .finish(),
127            Self::Heap(v) => f
128                .debug_struct("FrameBuf::Heap")
129                .field("len", &v.len())
130                .finish(),
131        }
132    }
133}
134
135// Compile-time assertion: FrameBuf must be Send (passed through channels).
136// Arc<PacketPool> is Send (PacketPool is Send+Sync), Vec<u8> is Send,
137// so FrameBuf derives Send automatically — no unsafe impl needed.
138const _: () = {
139    const fn assert_send<T: Send>() {}
140    assert_send::<FrameBuf>();
141};
142
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the 16-slot pool used by most tests.
    fn make_pool() -> Arc<PacketPool> {
        let pool = PacketPool::new(16).unwrap();
        Arc::new(pool)
    }

    /// A pooled frame occupies one slot, exposes the copied bytes, and
    /// releases the slot when dropped.
    #[test]
    fn test_from_pool() {
        let pool = make_pool();
        let payload = [0xAB_u8; 100];

        let frame = FrameBuf::from_pool(&pool, &payload);
        assert!(frame.is_pooled());
        assert_eq!(frame.len(), 100);
        assert_eq!(&frame[..], &payload[..]);
        assert_eq!(pool.free_count(), 15);

        drop(frame);
        assert_eq!(pool.free_count(), 16);
    }

    /// Converting from a `Vec<u8>` yields a heap frame with the same bytes.
    #[test]
    fn test_from_vec() {
        let frame: FrameBuf = vec![1, 2, 3].into();

        assert!(!frame.is_pooled());
        assert_eq!(frame.len(), 3);
        assert_eq!(&frame[..], &[1, 2, 3]);
    }

    /// With the only slot taken, `from_pool` must produce a heap frame.
    #[test]
    fn test_pool_exhaustion_fallback() {
        let pool = Arc::new(PacketPool::new(1).unwrap());

        // Occupy the single slot for the rest of the test.
        let _held = FrameBuf::from_pool(&pool, &[0; 10]);
        assert_eq!(pool.free_count(), 0);

        let spilled = FrameBuf::from_pool(&pool, &[0xFF; 20]);
        assert!(!spilled.is_pooled());
        assert_eq!(spilled.len(), 20);
    }

    /// A pooled frame dereferences to exactly the bytes that were copied in.
    #[test]
    fn test_deref_as_slice() {
        let pool = make_pool();
        let frame = FrameBuf::from_pool(&pool, &[10, 20, 30]);

        let bytes: &[u8] = &frame;
        assert_eq!(bytes, &[10, 20, 30]);
    }

    /// `AsRef<[u8]>` exposes the heap frame's bytes.
    #[test]
    fn test_as_ref() {
        let frame = FrameBuf::from(vec![4, 5, 6]);

        let bytes: &[u8] = frame.as_ref();
        assert_eq!(bytes, &[4, 5, 6]);
    }

    /// An empty vector produces an empty frame.
    #[test]
    fn test_empty() {
        let frame = FrameBuf::from(Vec::new());

        assert_eq!(frame.len(), 0);
        assert!(frame.is_empty());
    }

    /// `FrameBuf` must be `Send`; this only needs to compile.
    #[test]
    fn test_send() {
        fn require_send<T: Send>() {}
        require_send::<FrameBuf>();
    }

    /// Four threads allocate and drop frames against one shared pool; once
    /// they finish, every slot must be free again.
    #[test]
    fn test_concurrent_alloc_drop() {
        let pool = Arc::new(PacketPool::new(64).unwrap());

        let mut workers = Vec::with_capacity(4);
        for _ in 0..4 {
            let pool = Arc::clone(&pool);
            workers.push(std::thread::spawn(move || {
                for i in 0..500 {
                    let fill = (i % 256) as u8;
                    let payload = vec![fill; 100];

                    let frame = FrameBuf::from_pool(&pool, &payload);
                    assert_eq!(frame.len(), 100);
                    assert_eq!(frame[0], fill);
                    // `frame` is dropped at the end of each iteration.
                }
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }
        assert_eq!(pool.free_count(), 64);
    }
}