Skip to main content

dynomite/io/
mbuf.rs

//! Pooled fixed-size byte buffers.
//!
//! The C engine uses chained `struct mbuf` chunks for every connection's
//! read and write paths. Each chunk has a read cursor (`pos`), a write
//! cursor (`last`), a writable end boundary (`end`), and an extra
//! trailing region (`end_extra`) that downstream code uses for crypto
//! padding and overflow guards. Chunks are recycled through a global
//! free list so steady-state traffic does not call into the allocator.
//!
//! This module reproduces the same shape in safe Rust:
//!
//! * [`Mbuf`] owns a `Box<[u8]>` of `chunk_size` bytes plus the `pos`,
//!   `last`, and `end` cursors that track the readable, writable, and
//!   reserved regions. The C `end_extra` boundary is not stored; it is
//!   the length of the allocation.
//! * [`MbufQueue`] is a tail-queue of chunks. The C list head is
//!   `struct mhdr`; the Rust container is a [`VecDeque`].
//! * [`MbufPool`] owns the recycled buffers behind a parking-lot
//!   mutex and is shared across worker tasks via [`Arc`].
//!
//! The default chunk size is [`MBUF_SIZE`] (16 KiB), tunable through
//! [`MbufPool::new`]. The configurable range is
//! `[MBUF_MIN_SIZE, MBUF_MAX_SIZE]`. The trailing [`MBUF_ESIZE`] bytes
//! of every chunk are reserved for the crypto MAC region used by Stage
//! 13 traffic; normal writes stop at [`Mbuf::capacity`].
//!
//! # Examples
//!
//! ```
//! use dynomite::io::mbuf::{Mbuf, MbufPool, MbufQueue};
//!
//! let pool = MbufPool::default();
//! let mut buf = pool.get();
//! assert_eq!(buf.recv(b"hello"), 5);
//! assert_eq!(buf.len(), 5);
//!
//! let mut out = [0u8; 5];
//! assert_eq!(buf.send(&mut out), 5);
//! assert_eq!(&out, b"hello");
//!
//! let mut q = MbufQueue::new();
//! q.push_back(buf);
//! assert_eq!(q.len(), 1);
//! ```
43
44use std::collections::VecDeque;
45use std::sync::Arc;
46
47use parking_lot::Mutex;
48
/// Default mbuf chunk size in bytes (16 KiB). Mirrors the reference
/// `MBUF_SIZE`.
pub const MBUF_SIZE: usize = 16 * 1024;

/// Minimum permitted mbuf chunk size. Mirrors the reference
/// `MBUF_MIN_SIZE`.
pub const MBUF_MIN_SIZE: usize = 512;

/// Maximum permitted mbuf chunk size. Mirrors the reference
/// `MBUF_MAX_SIZE`.
pub const MBUF_MAX_SIZE: usize = 512_000;

/// Bytes reserved at the tail of every chunk for crypto padding and
/// overflow guards. Mirrors the reference `MBUF_ESIZE`.
pub const MBUF_ESIZE: usize = 16;

// Compile-time guards. Chunk construction computes
// `chunk_size - MBUF_ESIZE`, which must never underflow for any size in
// the permitted range, and the default size must itself be permitted.
const _: () = assert!(MBUF_ESIZE < MBUF_MIN_SIZE);
const _: () = assert!(MBUF_MIN_SIZE <= MBUF_SIZE && MBUF_SIZE <= MBUF_MAX_SIZE);

/// Maximum number of free chunks the pool retains before dropping
/// returned buffers on the floor.
///
/// The C engine has no hard cap on the free list because the worker
/// loop bounds the working set implicitly. The Rust port adds an
/// upper bound so a misbehaving caller cannot drive unbounded memory
/// growth. The default tracks the connection-budget order-of-magnitude
/// the reference docs cite (4096 connections * 4 chunks per direction).
pub const MBUF_POOL_MAX_FREE: usize = 4096 * 4;

/// Connection identifier carried by an mbuf to mark the conn that owns
/// it. The reactor stamps this when it hands a chunk to a connection
/// state machine; the chunk pool itself never inspects the value.
pub type OwnerId = u64;
78
79/// A single chunk of a connection's I/O buffer chain.
80///
81/// An `Mbuf` exposes three regions:
82///
83/// * `[0, pos)` - already-consumed bytes (drained by the parser).
84/// * `[pos, last)` - readable bytes (parser input or pre-flight write).
85/// * `[last, end)` - writable bytes (target of [`recv`](Self::recv)).
86///
87/// The slice `[end, end_extra)` of length [`MBUF_ESIZE`] is reserved
88/// for crypto MAC and overflow guards. It is not visible through the
89/// normal write API.
90pub struct Mbuf {
91    buf: Box<[u8]>,
92    pos: usize,
93    last: usize,
94    end: usize,
95    flags: u32,
96    owner: Option<OwnerId>,
97}
98
/// Bit flag set when the buffer has been flipped from write to read
/// orientation. Mirrors `MBUF_FLAGS_READ_FLIP`.
pub const MBUF_FLAG_READ_FLIP: u32 = 1 << 0;

/// Bit flag set when the contents have just been decrypted. Mirrors
/// `MBUF_FLAGS_JUST_DECRYPTED`.
pub const MBUF_FLAG_JUST_DECRYPTED: u32 = 1 << 1;
106
107impl Mbuf {
108    /// Allocate a fresh chunk of the given total size. Used by the
109    /// pool to fill its free list and by callers that need a one-off
110    /// non-pooled buffer (the Stage 13 entropy path is a planned
111    /// consumer).
112    ///
113    /// `chunk_size` must be in `[MBUF_MIN_SIZE, MBUF_MAX_SIZE]`.
114    ///
115    /// # Examples
116    ///
117    /// ```
118    /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE};
119    /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
120    /// assert_eq!(buf.chunk_size(), MBUF_SIZE);
121    /// assert!(buf.is_empty());
122    /// ```
123    pub fn with_chunk_size(chunk_size: usize) -> Self {
124        assert!(
125            (MBUF_MIN_SIZE..=MBUF_MAX_SIZE).contains(&chunk_size),
126            "mbuf chunk_size {chunk_size} outside [{MBUF_MIN_SIZE}, {MBUF_MAX_SIZE}]"
127        );
128        let buf = vec![0u8; chunk_size].into_boxed_slice();
129        let end = chunk_size - MBUF_ESIZE;
130        Self {
131            buf,
132            pos: 0,
133            last: 0,
134            end,
135            flags: 0,
136            owner: None,
137        }
138    }
139
140    /// Total byte length of the chunk allocation.
141    ///
142    /// # Examples
143    ///
144    /// ```
145    /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE};
146    /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
147    /// assert_eq!(buf.chunk_size(), MBUF_SIZE);
148    /// ```
149    pub fn chunk_size(&self) -> usize {
150        self.buf.len()
151    }
152
153    /// Number of bytes that can be written before [`is_full`](Self::is_full)
154    /// trips, equal to `chunk_size - MBUF_ESIZE`. Mirrors `mbuf_data_size`.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE, MBUF_ESIZE};
160    /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
161    /// assert_eq!(buf.data_size(), MBUF_SIZE - MBUF_ESIZE);
162    /// ```
163    pub fn data_size(&self) -> usize {
164        self.end
165    }
166
167    /// Total byte addressable region including the trailing extra
168    /// area, equal to `chunk_size`. Mirrors `mbuf_full_data_size` from
169    /// the Stage 2 plan and the underlying `end_extra` boundary in the
170    /// C struct.
171    ///
172    /// # Examples
173    ///
174    /// ```
175    /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE};
176    /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
177    /// assert_eq!(buf.usable_capacity(), MBUF_SIZE);
178    /// ```
179    pub fn usable_capacity(&self) -> usize {
180        self.buf.len()
181    }
182
183    /// Bytes currently available to read from the buffer
184    /// (`last - pos`). Mirrors `mbuf_length`.
185    ///
186    /// # Examples
187    ///
188    /// ```
189    /// use dynomite::io::mbuf::Mbuf;
190    /// let mut buf = Mbuf::with_chunk_size(1024);
191    /// buf.recv(b"abc");
192    /// assert_eq!(buf.len(), 3);
193    /// ```
194    pub fn len(&self) -> usize {
195        self.last - self.pos
196    }
197
198    /// Maximum number of bytes the buffer can ever hold (the writable
199    /// region size, equal to [`data_size`](Self::data_size)). Mirrors
200    /// `mbuf_size` in the reference's higher-level usage.
201    pub fn capacity(&self) -> usize {
202        self.end
203    }
204
205    /// Bytes still writable into the buffer (`end - last`). Mirrors
206    /// `mbuf_remaining_space`.
207    ///
208    /// # Examples
209    ///
210    /// ```
211    /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE, MBUF_ESIZE};
212    /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
213    /// assert_eq!(buf.remaining(), MBUF_SIZE - MBUF_ESIZE);
214    /// ```
215    pub fn remaining(&self) -> usize {
216        self.end - self.last
217    }
218
219    /// True when no readable bytes remain. Mirrors `mbuf_empty`.
220    ///
221    /// # Examples
222    ///
223    /// ```
224    /// use dynomite::io::mbuf::Mbuf;
225    /// let buf = Mbuf::with_chunk_size(1024);
226    /// assert!(buf.is_empty());
227    /// ```
228    pub fn is_empty(&self) -> bool {
229        self.pos == self.last
230    }
231
232    /// True when the writable region is full. Mirrors `mbuf_full`.
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// use dynomite::io::mbuf::Mbuf;
238    /// let mut buf = Mbuf::with_chunk_size(1024);
239    /// while buf.remaining() > 0 {
240    ///     buf.recv(&[0]);
241    /// }
242    /// assert!(buf.is_full());
243    /// ```
244    pub fn is_full(&self) -> bool {
245        self.last == self.end
246    }
247
248    /// Discard all buffered bytes and rewind both cursors to the
249    /// origin. Mirrors `mbuf_rewind`.
250    pub fn rewind(&mut self) {
251        self.pos = 0;
252        self.last = 0;
253    }
254
255    /// Read the buffer's flag bits.
256    pub fn flags(&self) -> u32 {
257        self.flags
258    }
259
260    /// Set or clear individual flag bits.
261    pub fn set_flag(&mut self, flag: u32, on: bool) {
262        if on {
263            self.flags |= flag;
264        } else {
265            self.flags &= !flag;
266        }
267    }
268
269    /// Return the optional owner connection id stamped on the buffer.
270    pub fn owner(&self) -> Option<OwnerId> {
271        self.owner
272    }
273
274    /// Stamp or clear the owner connection id.
275    pub fn set_owner(&mut self, owner: Option<OwnerId>) {
276        self.owner = owner;
277    }
278
279    /// Borrow the readable region (`[pos, last)`).
280    ///
281    /// # Examples
282    ///
283    /// ```
284    /// use dynomite::io::mbuf::Mbuf;
285    /// let mut buf = Mbuf::with_chunk_size(1024);
286    /// buf.recv(b"abc");
287    /// assert_eq!(buf.readable(), b"abc");
288    /// ```
289    pub fn readable(&self) -> &[u8] {
290        &self.buf[self.pos..self.last]
291    }
292
293    /// Borrow the writable region (`[last, end)`).
294    pub fn writable(&mut self) -> &mut [u8] {
295        &mut self.buf[self.last..self.end]
296    }
297
298    /// Copy bytes from `src` into the writable region. Stops when the
299    /// buffer fills. Returns the number of bytes copied. Mirrors
300    /// `mbuf_recv` / `mbuf_copy` for the inbound direction.
301    ///
302    /// # Examples
303    ///
304    /// ```
305    /// use dynomite::io::mbuf::Mbuf;
306    /// let mut buf = Mbuf::with_chunk_size(1024);
307    /// assert_eq!(buf.recv(b"hello"), 5);
308    /// assert_eq!(buf.readable(), b"hello");
309    /// ```
310    pub fn recv(&mut self, src: &[u8]) -> usize {
311        let n = src.len().min(self.remaining());
312        self.buf[self.last..self.last + n].copy_from_slice(&src[..n]);
313        self.last += n;
314        n
315    }
316
317    /// Drain bytes from the readable region into `dst`. Returns the
318    /// number of bytes copied. Mirrors `mbuf_send` for the outbound
319    /// direction.
320    ///
321    /// # Examples
322    ///
323    /// ```
324    /// use dynomite::io::mbuf::Mbuf;
325    /// let mut buf = Mbuf::with_chunk_size(1024);
326    /// buf.recv(b"hello");
327    /// let mut out = [0u8; 8];
328    /// let n = buf.send(&mut out);
329    /// assert_eq!(n, 5);
330    /// assert_eq!(&out[..n], b"hello");
331    /// ```
332    pub fn send(&mut self, dst: &mut [u8]) -> usize {
333        let n = dst.len().min(self.len());
334        dst[..n].copy_from_slice(&self.buf[self.pos..self.pos + n]);
335        self.pos += n;
336        n
337    }
338
339    /// Copy `n` bytes from `src` into the writable region without
340    /// bounds-checking against partial copy. Panics if `n` exceeds
341    /// [`remaining`](Self::remaining). Mirrors `mbuf_copy`.
342    ///
343    /// # Examples
344    ///
345    /// ```
346    /// use dynomite::io::mbuf::Mbuf;
347    /// let mut buf = Mbuf::with_chunk_size(1024);
348    /// buf.copy_from_slice(b"abc");
349    /// assert_eq!(buf.readable(), b"abc");
350    /// ```
351    pub fn copy_from_slice(&mut self, src: &[u8]) {
352        assert!(
353            src.len() <= self.remaining(),
354            "mbuf copy of {} bytes exceeds remaining {}",
355            src.len(),
356            self.remaining()
357        );
358        let end = self.last + src.len();
359        self.buf[self.last..end].copy_from_slice(src);
360        self.last = end;
361    }
362
363    /// Split the buffer at offset `at` (relative to `pos`). The bytes
364    /// in `[pos+at, last)` are moved into a new mbuf taken from `pool`
365    /// and returned. The original keeps `[pos, pos+at)`. Mirrors
366    /// `mbuf_split` (without the precopy callback - callers that
367    /// previously injected a header into the new buffer can do so on
368    /// the returned `Mbuf` before chaining it).
369    ///
370    /// Returns `None` if `at` is greater than [`len`](Self::len) or if
371    /// the moved tail would not fit in a fresh pool buffer.
372    ///
373    /// # Examples
374    ///
375    /// ```
376    /// use dynomite::io::mbuf::MbufPool;
377    /// let pool = MbufPool::default();
378    /// let mut head = pool.get();
379    /// head.recv(b"hello world");
380    /// let tail = head.split_off(5, &pool).unwrap();
381    /// assert_eq!(head.readable(), b"hello");
382    /// assert_eq!(tail.readable(), b" world");
383    /// ```
384    pub fn split_off(&mut self, at: usize, pool: &MbufPool) -> Option<Mbuf> {
385        if at > self.len() {
386            return None;
387        }
388        let mut tail = pool.get();
389        let cut = self.pos + at;
390        let moved = self.last - cut;
391        if moved > tail.remaining() {
392            pool.put(tail);
393            return None;
394        }
395        tail.copy_from_slice(&self.buf[cut..self.last]);
396        self.last = cut;
397        Some(tail)
398    }
399
400    /// Append the readable region of `other` into this buffer.
401    /// Returns the number of bytes appended (which may be less than
402    /// `other.len()` if this buffer fills first).
403    ///
404    /// # Examples
405    ///
406    /// ```
407    /// use dynomite::io::mbuf::Mbuf;
408    /// let mut a = Mbuf::with_chunk_size(1024);
409    /// let mut b = Mbuf::with_chunk_size(1024);
410    /// a.recv(b"foo");
411    /// b.recv(b"bar");
412    /// a.append(&b);
413    /// assert_eq!(a.readable(), b"foobar");
414    /// ```
415    pub fn append(&mut self, other: &Mbuf) -> usize {
416        self.recv(other.readable())
417    }
418
419    /// Mark `n` bytes of the readable region as consumed by advancing
420    /// `pos`. Useful when downstream code wrote directly into
421    /// [`writable`](Self::writable). Panics if `n` exceeds
422    /// [`len`](Self::len).
423    pub fn advance_pos(&mut self, n: usize) {
424        assert!(
425            n <= self.len(),
426            "advance_pos {n} exceeds len {}",
427            self.len()
428        );
429        self.pos += n;
430    }
431
432    /// Mark `n` bytes of the writable region as filled by advancing
433    /// `last`. Used after writing into [`writable`](Self::writable)
434    /// directly (for example through an `AsyncRead` call). Panics if
435    /// `n` exceeds [`remaining`](Self::remaining).
436    pub fn advance_last(&mut self, n: usize) {
437        assert!(
438            n <= self.remaining(),
439            "advance_last {n} exceeds remaining {}",
440            self.remaining()
441        );
442        self.last += n;
443    }
444}
445
446impl std::fmt::Debug for Mbuf {
447    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448        f.debug_struct("Mbuf")
449            .field("chunk_size", &self.buf.len())
450            .field("pos", &self.pos)
451            .field("last", &self.last)
452            .field("end", &self.end)
453            .field("flags", &self.flags)
454            .field("owner", &self.owner)
455            .finish()
456    }
457}
458
459/// Tail-queue of mbufs. Mirrors the C `struct mhdr` head and the
460/// `STAILQ_*` macros that operated on it.
461///
462/// The container is a [`VecDeque`]; insertion at either end is O(1).
463///
464/// # Examples
465///
466/// ```
467/// use dynomite::io::mbuf::{MbufPool, MbufQueue};
468/// let pool = MbufPool::default();
469/// let mut q = MbufQueue::new();
470/// q.push_back(pool.get());
471/// q.push_back(pool.get());
472/// assert_eq!(q.len(), 2);
473/// let _ = q.pop_front();
474/// assert_eq!(q.len(), 1);
475/// ```
476#[derive(Debug, Default)]
477pub struct MbufQueue {
478    inner: VecDeque<Mbuf>,
479}
480
481impl MbufQueue {
482    /// Create an empty queue.
483    pub fn new() -> Self {
484        Self {
485            inner: VecDeque::new(),
486        }
487    }
488
489    /// Number of buffers currently chained.
490    pub fn len(&self) -> usize {
491        self.inner.len()
492    }
493
494    /// True when the queue holds no buffers.
495    pub fn is_empty(&self) -> bool {
496        self.inner.is_empty()
497    }
498
499    /// Append `mbuf` at the tail. Mirrors `mbuf_insert`.
500    pub fn push_back(&mut self, mbuf: Mbuf) {
501        self.inner.push_back(mbuf);
502    }
503
504    /// Insert `mbuf` at the head. Mirrors `mbuf_insert_head`.
505    pub fn push_front(&mut self, mbuf: Mbuf) {
506        self.inner.push_front(mbuf);
507    }
508
509    /// Remove and return the head buffer. Mirrors `mbuf_remove`
510    /// applied to the head; the C API allowed removing from any
511    /// position, but every reference call site removed the head.
512    pub fn pop_front(&mut self) -> Option<Mbuf> {
513        self.inner.pop_front()
514    }
515
516    /// Remove and return the tail buffer.
517    pub fn pop_back(&mut self) -> Option<Mbuf> {
518        self.inner.pop_back()
519    }
520
521    /// Borrow the tail buffer mutably without removing it. Used by
522    /// the `mbuf_split` consumers that want to operate on the latest
523    /// chunk.
524    pub fn back_mut(&mut self) -> Option<&mut Mbuf> {
525        self.inner.back_mut()
526    }
527
528    /// Borrow the head buffer mutably without removing it.
529    pub fn front_mut(&mut self) -> Option<&mut Mbuf> {
530        self.inner.front_mut()
531    }
532
533    /// Iterate the queued buffers in head-to-tail order.
534    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, Mbuf> {
535        self.inner.iter()
536    }
537}
538
539impl<'a> IntoIterator for &'a MbufQueue {
540    type Item = &'a Mbuf;
541    type IntoIter = std::collections::vec_deque::Iter<'a, Mbuf>;
542
543    fn into_iter(self) -> Self::IntoIter {
544        self.inner.iter()
545    }
546}
547
548impl MbufQueue {
549    /// Total readable bytes across the chain.
550    pub fn total_len(&self) -> usize {
551        self.inner.iter().map(Mbuf::len).sum()
552    }
553
554    /// Drain the queue into the pool, recycling every chunk.
555    ///
556    /// # Examples
557    ///
558    /// ```
559    /// use dynomite::io::mbuf::{MbufPool, MbufQueue};
560    /// let pool = MbufPool::default();
561    /// let mut q = MbufQueue::new();
562    /// q.push_back(pool.get());
563    /// q.push_back(pool.get());
564    /// q.recycle(&pool);
565    /// assert!(q.is_empty());
566    /// ```
567    pub fn recycle(&mut self, pool: &MbufPool) {
568        while let Some(buf) = self.inner.pop_front() {
569            pool.put(buf);
570        }
571    }
572}
573
574/// Free-list backed mbuf pool.
575///
576/// The pool keeps a parking-lot mutex around a stash of recyclable
577/// chunk allocations. New chunks are taken from the stash if available
578/// and freshly allocated otherwise. [`MbufPool::put`] returns chunks
579/// to the stash up to [`MBUF_POOL_MAX_FREE`]; chunks beyond that cap
580/// are dropped.
581///
582/// The pool tracks total live and free counts for diagnostics, mirroring
583/// the C `mbuf_alloc_get_count` / `mbuf_free_queue_size` accessors.
584#[derive(Clone)]
585pub struct MbufPool {
586    inner: Arc<MbufPoolInner>,
587}
588
589struct MbufPoolInner {
590    chunk_size: usize,
591    max_free: usize,
592    state: Mutex<MbufPoolState>,
593}
594
/// Mutable pool bookkeeping; only accessed while holding the pool
/// mutex.
struct MbufPoolState {
    /// Recycled chunk allocations ready to be handed out again.
    free: Vec<Box<[u8]>>,
    /// Lifetime count of fresh (non-recycled) chunk allocations.
    total_allocated: u64,
}
599
600impl Default for MbufPool {
601    fn default() -> Self {
602        Self::new(MBUF_SIZE, MBUF_POOL_MAX_FREE)
603    }
604}
605
606impl MbufPool {
607    /// Construct a new pool with `chunk_size` byte chunks and a free
608    /// list capped at `max_free`. Mirrors `mbuf_init` plus the Rust-
609    /// only free-list bound.
610    ///
611    /// # Examples
612    ///
613    /// ```
614    /// use dynomite::io::mbuf::{MbufPool, MBUF_SIZE, MBUF_POOL_MAX_FREE};
615    /// let pool = MbufPool::new(MBUF_SIZE, MBUF_POOL_MAX_FREE);
616    /// let buf = pool.get();
617    /// assert_eq!(buf.chunk_size(), MBUF_SIZE);
618    /// pool.put(buf);
619    /// ```
620    pub fn new(chunk_size: usize, max_free: usize) -> Self {
621        assert!(
622            (MBUF_MIN_SIZE..=MBUF_MAX_SIZE).contains(&chunk_size),
623            "mbuf chunk_size {chunk_size} outside [{MBUF_MIN_SIZE}, {MBUF_MAX_SIZE}]"
624        );
625        Self {
626            inner: Arc::new(MbufPoolInner {
627                chunk_size,
628                max_free,
629                state: Mutex::new(MbufPoolState {
630                    free: Vec::new(),
631                    total_allocated: 0,
632                }),
633            }),
634        }
635    }
636
637    /// Configured chunk size, in bytes.
638    pub fn chunk_size(&self) -> usize {
639        self.inner.chunk_size
640    }
641
642    /// Maximum number of chunks the free list retains.
643    pub fn max_free(&self) -> usize {
644        self.inner.max_free
645    }
646
647    /// Take a fresh or recycled chunk from the pool. Mirrors
648    /// `mbuf_get`.
649    ///
650    /// # Examples
651    ///
652    /// ```
653    /// use dynomite::io::mbuf::MbufPool;
654    /// let pool = MbufPool::default();
655    /// let buf = pool.get();
656    /// assert!(buf.is_empty());
657    /// ```
658    pub fn get(&self) -> Mbuf {
659        let buf = self.alloc_buffer();
660        let chunk_size = buf.len();
661        let end = chunk_size - MBUF_ESIZE;
662        Mbuf {
663            buf,
664            pos: 0,
665            last: 0,
666            end,
667            flags: 0,
668            owner: None,
669        }
670    }
671
672    /// Return a chunk to the free list. Resets cursors and flags
673    /// before storing. Mirrors `mbuf_put`. Chunks past
674    /// [`max_free`](Self::max_free) are dropped.
675    ///
676    /// # Examples
677    ///
678    /// ```
679    /// use dynomite::io::mbuf::MbufPool;
680    /// let pool = MbufPool::default();
681    /// let buf = pool.get();
682    /// pool.put(buf);
683    /// assert_eq!(pool.free_count(), 1);
684    /// ```
685    pub fn put(&self, mut mbuf: Mbuf) {
686        if mbuf.buf.len() != self.inner.chunk_size {
687            // Off-size buffers (e.g. from [`Mbuf::with_chunk_size`])
688            // are simply dropped rather than poisoning the free list.
689            return;
690        }
691        mbuf.rewind();
692        mbuf.flags = 0;
693        mbuf.owner = None;
694        let mut state = self.inner.state.lock();
695        if state.free.len() < self.inner.max_free {
696            state.free.push(mbuf.buf);
697        }
698    }
699
700    /// Number of chunks currently sitting in the free list. Mirrors
701    /// `mbuf_free_queue_size`.
702    pub fn free_count(&self) -> usize {
703        self.inner.state.lock().free.len()
704    }
705
706    /// Lifetime count of fresh allocations performed by the pool.
707    /// Mirrors `mbuf_alloc_get_count`. Useful for tests asserting
708    /// that recycling avoids the allocator path.
709    pub fn total_allocated(&self) -> u64 {
710        self.inner.state.lock().total_allocated
711    }
712
713    fn alloc_buffer(&self) -> Box<[u8]> {
714        let mut state = self.inner.state.lock();
715        if let Some(buf) = state.free.pop() {
716            return buf;
717        }
718        state.total_allocated += 1;
719        drop(state);
720        vec![0u8; self.inner.chunk_size].into_boxed_slice()
721    }
722}
723
724impl std::fmt::Debug for MbufPool {
725    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726        let state = self.inner.state.lock();
727        f.debug_struct("MbufPool")
728            .field("chunk_size", &self.inner.chunk_size)
729            .field("max_free", &self.inner.max_free)
730            .field("free_count", &state.free.len())
731            .field("total_allocated", &state.total_allocated)
732            .finish()
733    }
734}
735
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn mbuf_recv_send_round_trip() {
        let mut buf = Mbuf::with_chunk_size(1024);
        assert_eq!(buf.recv(b"abc"), 3);
        let mut out = [0u8; 8];
        assert_eq!(buf.send(&mut out), 3);
        assert_eq!(&out[..3], b"abc");
        assert!(buf.is_empty());
    }

    #[test]
    fn mbuf_recv_truncates_to_remaining() {
        let mut buf = Mbuf::with_chunk_size(MBUF_MIN_SIZE);
        let payload = vec![7u8; MBUF_MIN_SIZE * 2];
        let n = buf.recv(&payload);
        assert_eq!(n, MBUF_MIN_SIZE - MBUF_ESIZE);
        assert!(buf.is_full());
    }

    #[test]
    fn mbuf_advance_cursors_track_direct_access() {
        let mut buf = Mbuf::with_chunk_size(1024);
        buf.writable()[..3].copy_from_slice(b"abc");
        buf.advance_last(3);
        assert_eq!(buf.readable(), b"abc");
        buf.advance_pos(2);
        assert_eq!(buf.readable(), b"c");
    }

    #[test]
    fn mbuf_split_then_append_reconstructs() {
        let pool = MbufPool::default();
        let mut head = pool.get();
        head.recv(b"hello world");
        let tail = head.split_off(5, &pool).unwrap();
        assert_eq!(head.readable(), b"hello");
        assert_eq!(tail.readable(), b" world");
        head.append(&tail);
        assert_eq!(head.readable(), b"hello world");
    }

    #[test]
    fn mbuf_split_off_out_of_range_returns_none() {
        let pool = MbufPool::default();
        let mut head = pool.get();
        head.recv(b"abc");
        assert!(head.split_off(99, &pool).is_none());
    }

    #[test]
    fn mbuf_split_off_oversized_tail_returns_none() {
        // The head chunk is larger than anything the pool can supply,
        // so a tail of MBUF_MIN_SIZE bytes cannot fit in a pool chunk.
        let pool = MbufPool::new(MBUF_MIN_SIZE, 4);
        let mut head = Mbuf::with_chunk_size(MBUF_SIZE);
        let payload = vec![9u8; MBUF_MIN_SIZE];
        assert_eq!(head.recv(&payload), MBUF_MIN_SIZE);
        assert!(head.split_off(0, &pool).is_none());
        // A failed split leaves the head untouched.
        assert_eq!(head.len(), MBUF_MIN_SIZE);
    }

    #[test]
    fn mbuf_pool_recycles_buffers_without_reallocation() {
        let pool = MbufPool::default();
        let n = 4;
        let mut taken = Vec::new();
        for _ in 0..n {
            taken.push(pool.get());
        }
        assert_eq!(pool.total_allocated(), n as u64);
        for buf in taken.drain(..) {
            pool.put(buf);
        }
        assert_eq!(pool.free_count(), n);
        for _ in 0..n {
            taken.push(pool.get());
        }
        // Reused; allocation count is unchanged.
        assert_eq!(pool.total_allocated(), n as u64);
    }

    #[test]
    fn mbuf_pool_drops_chunks_beyond_cap() {
        let cap = 4;
        let pool = MbufPool::new(MBUF_SIZE, cap);
        let n = cap * 2;
        let mut taken = Vec::with_capacity(n);
        for _ in 0..n {
            taken.push(pool.get());
        }
        for buf in taken.drain(..) {
            pool.put(buf);
        }
        // The pool refuses to grow past `max_free`; the surplus
        // buffers are dropped on return rather than poisoning the
        // free list.
        assert_eq!(pool.free_count(), cap);
    }

    #[test]
    fn mbuf_queue_push_pop_fifo() {
        let pool = MbufPool::default();
        let mut q = MbufQueue::new();
        for i in 0..3u8 {
            let mut buf = pool.get();
            buf.recv(&[i]);
            q.push_back(buf);
        }
        for i in 0..3u8 {
            let buf = q.pop_front().unwrap();
            assert_eq!(buf.readable(), &[i]);
        }
        assert!(q.is_empty());
    }

    #[test]
    fn mbuf_queue_total_len_and_recycle() {
        let pool = MbufPool::default();
        let mut q = MbufQueue::new();
        for payload in [&b"ab"[..], &b"cde"[..]] {
            let mut buf = pool.get();
            buf.recv(payload);
            q.push_back(buf);
        }
        assert_eq!(q.total_len(), 5);
        q.recycle(&pool);
        assert!(q.is_empty());
        assert_eq!(pool.free_count(), 2);
    }

    #[test]
    fn mbuf_flags_round_trip() {
        let mut buf = Mbuf::with_chunk_size(1024);
        buf.set_flag(MBUF_FLAG_READ_FLIP, true);
        assert_eq!(buf.flags() & MBUF_FLAG_READ_FLIP, MBUF_FLAG_READ_FLIP);
        buf.set_flag(MBUF_FLAG_READ_FLIP, false);
        assert_eq!(buf.flags(), 0);
    }

    #[test]
    fn mbuf_owner_round_trip() {
        let mut buf = Mbuf::with_chunk_size(1024);
        assert!(buf.owner().is_none());
        buf.set_owner(Some(42));
        assert_eq!(buf.owner(), Some(42));
    }

    #[test]
    #[should_panic(expected = "outside")]
    fn mbuf_too_small_panics() {
        let _ = Mbuf::with_chunk_size(MBUF_MIN_SIZE - 1);
    }

    #[test]
    #[should_panic(expected = "outside")]
    fn mbuf_too_large_panics() {
        let _ = Mbuf::with_chunk_size(MBUF_MAX_SIZE + 1);
    }

    #[test]
    fn mbuf_put_drops_offsize_chunks() {
        let pool = MbufPool::default();
        // Use Mbuf::with_chunk_size to construct a buffer of a
        // different size and confirm the pool refuses to admit it.
        let stray = Mbuf::with_chunk_size(MBUF_MIN_SIZE);
        pool.put(stray);
        assert_eq!(pool.free_count(), 0);
    }
}