dynomite/io/mbuf.rs
1//! Pooled fixed-size byte buffers.
2//!
3//! The C engine uses chained `struct mbuf` chunks for every connection's
4//! read and write paths. Each chunk has a read cursor (`pos`), a write
5//! cursor (`last`), a writable end boundary (`end`), and an extra
6//! trailing region (`end_extra`) that downstream code uses for crypto
7//! padding and overflow guards. Chunks are recycled through a global
8//! free list so steady-state traffic does not call into the allocator.
9//!
10//! This module reproduces the same shape in safe Rust:
11//!
//! * [`Mbuf`] owns a [`Box<[u8]>`] of `chunk_size` bytes plus the three
//!   cursors (`pos`, `last`, `end`) that track readable, writable, and
//!   reserved regions; the C `end_extra` boundary is the allocation length.
14//! * [`MbufQueue`] is a tail-queue of chunks. The C list head is
15//! `struct mhdr`; the Rust container is a [`VecDeque`].
16//! * [`MbufPool`] owns the recycled buffers behind a parking-lot
17//! mutex and is shared across worker tasks via [`Arc`].
18//!
19//! The default chunk size is [`MBUF_SIZE`] (16 KiB), tunable through
20//! [`MbufPool::new`]. The configurable range is
21//! `[MBUF_MIN_SIZE, MBUF_MAX_SIZE]`. The trailing [`MBUF_ESIZE`] bytes
22//! of every chunk are reserved for the crypto MAC region used by Stage
23//! 13 traffic; normal writes stop at [`Mbuf::capacity`].
24//!
25//! # Examples
26//!
27//! ```
28//! use dynomite::io::mbuf::{Mbuf, MbufPool, MbufQueue};
29//!
30//! let pool = MbufPool::default();
31//! let mut buf = pool.get();
32//! assert_eq!(buf.recv(b"hello"), 5);
33//! assert_eq!(buf.len(), 5);
34//!
35//! let mut out = [0u8; 5];
36//! assert_eq!(buf.send(&mut out), 5);
37//! assert_eq!(&out, b"hello");
38//!
39//! let mut q = MbufQueue::new();
40//! q.push_back(buf);
41//! assert_eq!(q.len(), 1);
42//! ```
43
44use std::collections::VecDeque;
45use std::sync::Arc;
46
47use parking_lot::Mutex;
48
/// Chunk size, in bytes, used when the caller does not choose one
/// (16 KiB). Same value as the reference `MBUF_SIZE`.
pub const MBUF_SIZE: usize = 16 * 1024;

/// Smallest chunk size, in bytes, that a buffer or pool accepts. Same
/// value as the reference `MBUF_MIN_SIZE`.
pub const MBUF_MIN_SIZE: usize = 512;

/// Largest chunk size, in bytes, that a buffer or pool accepts. Same
/// value as the reference `MBUF_MAX_SIZE`.
pub const MBUF_MAX_SIZE: usize = 512_000;

/// Number of bytes held back at the tail of every chunk for crypto
/// padding and overflow guards. Same value as the reference
/// `MBUF_ESIZE`.
pub const MBUF_ESIZE: usize = 16;

/// Upper bound on how many free chunks a pool keeps; chunks returned
/// beyond this count are dropped instead of being retained.
///
/// The C engine has no hard cap on its free list because the worker
/// loop bounds the working set implicitly. The Rust port adds the cap
/// so a misbehaving caller cannot drive unbounded memory growth. The
/// default follows the connection-budget order of magnitude the
/// reference docs cite: 4096 connections * 4 chunks per direction.
pub const MBUF_POOL_MAX_FREE: usize = 4096 * 4;

/// Identifier of the connection that owns an mbuf. The reactor stamps
/// it when it hands a chunk to a connection state machine; the chunk
/// pool itself never inspects the value.
pub type OwnerId = u64;
78
79/// A single chunk of a connection's I/O buffer chain.
80///
81/// An `Mbuf` exposes three regions:
82///
83/// * `[0, pos)` - already-consumed bytes (drained by the parser).
84/// * `[pos, last)` - readable bytes (parser input or pre-flight write).
85/// * `[last, end)` - writable bytes (target of [`recv`](Self::recv)).
86///
87/// The slice `[end, end_extra)` of length [`MBUF_ESIZE`] is reserved
88/// for crypto MAC and overflow guards. It is not visible through the
89/// normal write API.
90pub struct Mbuf {
91 buf: Box<[u8]>,
92 pos: usize,
93 last: usize,
94 end: usize,
95 flags: u32,
96 owner: Option<OwnerId>,
97}
98
99/// Bit flag set when the buffer has been flipped from write to read
100/// orientation. Mirrors `MBUF_FLAGS_READ_FLIP`.
101pub const MBUF_FLAG_READ_FLIP: u32 = 0x0000_0001;
102
103/// Bit flag set when the contents have just been decrypted. Mirrors
104/// `MBUF_FLAGS_JUST_DECRYPTED`.
105pub const MBUF_FLAG_JUST_DECRYPTED: u32 = 0x0000_0002;
106
107impl Mbuf {
108 /// Allocate a fresh chunk of the given total size. Used by the
109 /// pool to fill its free list and by callers that need a one-off
110 /// non-pooled buffer (the Stage 13 entropy path is a planned
111 /// consumer).
112 ///
113 /// `chunk_size` must be in `[MBUF_MIN_SIZE, MBUF_MAX_SIZE]`.
114 ///
115 /// # Examples
116 ///
117 /// ```
118 /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE};
119 /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
120 /// assert_eq!(buf.chunk_size(), MBUF_SIZE);
121 /// assert!(buf.is_empty());
122 /// ```
123 pub fn with_chunk_size(chunk_size: usize) -> Self {
124 assert!(
125 (MBUF_MIN_SIZE..=MBUF_MAX_SIZE).contains(&chunk_size),
126 "mbuf chunk_size {chunk_size} outside [{MBUF_MIN_SIZE}, {MBUF_MAX_SIZE}]"
127 );
128 let buf = vec![0u8; chunk_size].into_boxed_slice();
129 let end = chunk_size - MBUF_ESIZE;
130 Self {
131 buf,
132 pos: 0,
133 last: 0,
134 end,
135 flags: 0,
136 owner: None,
137 }
138 }
139
140 /// Total byte length of the chunk allocation.
141 ///
142 /// # Examples
143 ///
144 /// ```
145 /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE};
146 /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
147 /// assert_eq!(buf.chunk_size(), MBUF_SIZE);
148 /// ```
149 pub fn chunk_size(&self) -> usize {
150 self.buf.len()
151 }
152
153 /// Number of bytes that can be written before [`is_full`](Self::is_full)
154 /// trips, equal to `chunk_size - MBUF_ESIZE`. Mirrors `mbuf_data_size`.
155 ///
156 /// # Examples
157 ///
158 /// ```
159 /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE, MBUF_ESIZE};
160 /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
161 /// assert_eq!(buf.data_size(), MBUF_SIZE - MBUF_ESIZE);
162 /// ```
163 pub fn data_size(&self) -> usize {
164 self.end
165 }
166
167 /// Total byte addressable region including the trailing extra
168 /// area, equal to `chunk_size`. Mirrors `mbuf_full_data_size` from
169 /// the Stage 2 plan and the underlying `end_extra` boundary in the
170 /// C struct.
171 ///
172 /// # Examples
173 ///
174 /// ```
175 /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE};
176 /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
177 /// assert_eq!(buf.usable_capacity(), MBUF_SIZE);
178 /// ```
179 pub fn usable_capacity(&self) -> usize {
180 self.buf.len()
181 }
182
183 /// Bytes currently available to read from the buffer
184 /// (`last - pos`). Mirrors `mbuf_length`.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// use dynomite::io::mbuf::Mbuf;
190 /// let mut buf = Mbuf::with_chunk_size(1024);
191 /// buf.recv(b"abc");
192 /// assert_eq!(buf.len(), 3);
193 /// ```
194 pub fn len(&self) -> usize {
195 self.last - self.pos
196 }
197
198 /// Maximum number of bytes the buffer can ever hold (the writable
199 /// region size, equal to [`data_size`](Self::data_size)). Mirrors
200 /// `mbuf_size` in the reference's higher-level usage.
201 pub fn capacity(&self) -> usize {
202 self.end
203 }
204
205 /// Bytes still writable into the buffer (`end - last`). Mirrors
206 /// `mbuf_remaining_space`.
207 ///
208 /// # Examples
209 ///
210 /// ```
211 /// use dynomite::io::mbuf::{Mbuf, MBUF_SIZE, MBUF_ESIZE};
212 /// let buf = Mbuf::with_chunk_size(MBUF_SIZE);
213 /// assert_eq!(buf.remaining(), MBUF_SIZE - MBUF_ESIZE);
214 /// ```
215 pub fn remaining(&self) -> usize {
216 self.end - self.last
217 }
218
219 /// True when no readable bytes remain. Mirrors `mbuf_empty`.
220 ///
221 /// # Examples
222 ///
223 /// ```
224 /// use dynomite::io::mbuf::Mbuf;
225 /// let buf = Mbuf::with_chunk_size(1024);
226 /// assert!(buf.is_empty());
227 /// ```
228 pub fn is_empty(&self) -> bool {
229 self.pos == self.last
230 }
231
232 /// True when the writable region is full. Mirrors `mbuf_full`.
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// use dynomite::io::mbuf::Mbuf;
238 /// let mut buf = Mbuf::with_chunk_size(1024);
239 /// while buf.remaining() > 0 {
240 /// buf.recv(&[0]);
241 /// }
242 /// assert!(buf.is_full());
243 /// ```
244 pub fn is_full(&self) -> bool {
245 self.last == self.end
246 }
247
248 /// Discard all buffered bytes and rewind both cursors to the
249 /// origin. Mirrors `mbuf_rewind`.
250 pub fn rewind(&mut self) {
251 self.pos = 0;
252 self.last = 0;
253 }
254
255 /// Read the buffer's flag bits.
256 pub fn flags(&self) -> u32 {
257 self.flags
258 }
259
260 /// Set or clear individual flag bits.
261 pub fn set_flag(&mut self, flag: u32, on: bool) {
262 if on {
263 self.flags |= flag;
264 } else {
265 self.flags &= !flag;
266 }
267 }
268
269 /// Return the optional owner connection id stamped on the buffer.
270 pub fn owner(&self) -> Option<OwnerId> {
271 self.owner
272 }
273
274 /// Stamp or clear the owner connection id.
275 pub fn set_owner(&mut self, owner: Option<OwnerId>) {
276 self.owner = owner;
277 }
278
279 /// Borrow the readable region (`[pos, last)`).
280 ///
281 /// # Examples
282 ///
283 /// ```
284 /// use dynomite::io::mbuf::Mbuf;
285 /// let mut buf = Mbuf::with_chunk_size(1024);
286 /// buf.recv(b"abc");
287 /// assert_eq!(buf.readable(), b"abc");
288 /// ```
289 pub fn readable(&self) -> &[u8] {
290 &self.buf[self.pos..self.last]
291 }
292
293 /// Borrow the writable region (`[last, end)`).
294 pub fn writable(&mut self) -> &mut [u8] {
295 &mut self.buf[self.last..self.end]
296 }
297
298 /// Copy bytes from `src` into the writable region. Stops when the
299 /// buffer fills. Returns the number of bytes copied. Mirrors
300 /// `mbuf_recv` / `mbuf_copy` for the inbound direction.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// use dynomite::io::mbuf::Mbuf;
306 /// let mut buf = Mbuf::with_chunk_size(1024);
307 /// assert_eq!(buf.recv(b"hello"), 5);
308 /// assert_eq!(buf.readable(), b"hello");
309 /// ```
310 pub fn recv(&mut self, src: &[u8]) -> usize {
311 let n = src.len().min(self.remaining());
312 self.buf[self.last..self.last + n].copy_from_slice(&src[..n]);
313 self.last += n;
314 n
315 }
316
317 /// Drain bytes from the readable region into `dst`. Returns the
318 /// number of bytes copied. Mirrors `mbuf_send` for the outbound
319 /// direction.
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// use dynomite::io::mbuf::Mbuf;
325 /// let mut buf = Mbuf::with_chunk_size(1024);
326 /// buf.recv(b"hello");
327 /// let mut out = [0u8; 8];
328 /// let n = buf.send(&mut out);
329 /// assert_eq!(n, 5);
330 /// assert_eq!(&out[..n], b"hello");
331 /// ```
332 pub fn send(&mut self, dst: &mut [u8]) -> usize {
333 let n = dst.len().min(self.len());
334 dst[..n].copy_from_slice(&self.buf[self.pos..self.pos + n]);
335 self.pos += n;
336 n
337 }
338
339 /// Copy `n` bytes from `src` into the writable region without
340 /// bounds-checking against partial copy. Panics if `n` exceeds
341 /// [`remaining`](Self::remaining). Mirrors `mbuf_copy`.
342 ///
343 /// # Examples
344 ///
345 /// ```
346 /// use dynomite::io::mbuf::Mbuf;
347 /// let mut buf = Mbuf::with_chunk_size(1024);
348 /// buf.copy_from_slice(b"abc");
349 /// assert_eq!(buf.readable(), b"abc");
350 /// ```
351 pub fn copy_from_slice(&mut self, src: &[u8]) {
352 assert!(
353 src.len() <= self.remaining(),
354 "mbuf copy of {} bytes exceeds remaining {}",
355 src.len(),
356 self.remaining()
357 );
358 let end = self.last + src.len();
359 self.buf[self.last..end].copy_from_slice(src);
360 self.last = end;
361 }
362
363 /// Split the buffer at offset `at` (relative to `pos`). The bytes
364 /// in `[pos+at, last)` are moved into a new mbuf taken from `pool`
365 /// and returned. The original keeps `[pos, pos+at)`. Mirrors
366 /// `mbuf_split` (without the precopy callback - callers that
367 /// previously injected a header into the new buffer can do so on
368 /// the returned `Mbuf` before chaining it).
369 ///
370 /// Returns `None` if `at` is greater than [`len`](Self::len) or if
371 /// the moved tail would not fit in a fresh pool buffer.
372 ///
373 /// # Examples
374 ///
375 /// ```
376 /// use dynomite::io::mbuf::MbufPool;
377 /// let pool = MbufPool::default();
378 /// let mut head = pool.get();
379 /// head.recv(b"hello world");
380 /// let tail = head.split_off(5, &pool).unwrap();
381 /// assert_eq!(head.readable(), b"hello");
382 /// assert_eq!(tail.readable(), b" world");
383 /// ```
384 pub fn split_off(&mut self, at: usize, pool: &MbufPool) -> Option<Mbuf> {
385 if at > self.len() {
386 return None;
387 }
388 let mut tail = pool.get();
389 let cut = self.pos + at;
390 let moved = self.last - cut;
391 if moved > tail.remaining() {
392 pool.put(tail);
393 return None;
394 }
395 tail.copy_from_slice(&self.buf[cut..self.last]);
396 self.last = cut;
397 Some(tail)
398 }
399
400 /// Append the readable region of `other` into this buffer.
401 /// Returns the number of bytes appended (which may be less than
402 /// `other.len()` if this buffer fills first).
403 ///
404 /// # Examples
405 ///
406 /// ```
407 /// use dynomite::io::mbuf::Mbuf;
408 /// let mut a = Mbuf::with_chunk_size(1024);
409 /// let mut b = Mbuf::with_chunk_size(1024);
410 /// a.recv(b"foo");
411 /// b.recv(b"bar");
412 /// a.append(&b);
413 /// assert_eq!(a.readable(), b"foobar");
414 /// ```
415 pub fn append(&mut self, other: &Mbuf) -> usize {
416 self.recv(other.readable())
417 }
418
419 /// Mark `n` bytes of the readable region as consumed by advancing
420 /// `pos`. Useful when downstream code wrote directly into
421 /// [`writable`](Self::writable). Panics if `n` exceeds
422 /// [`len`](Self::len).
423 pub fn advance_pos(&mut self, n: usize) {
424 assert!(
425 n <= self.len(),
426 "advance_pos {n} exceeds len {}",
427 self.len()
428 );
429 self.pos += n;
430 }
431
432 /// Mark `n` bytes of the writable region as filled by advancing
433 /// `last`. Used after writing into [`writable`](Self::writable)
434 /// directly (for example through an `AsyncRead` call). Panics if
435 /// `n` exceeds [`remaining`](Self::remaining).
436 pub fn advance_last(&mut self, n: usize) {
437 assert!(
438 n <= self.remaining(),
439 "advance_last {n} exceeds remaining {}",
440 self.remaining()
441 );
442 self.last += n;
443 }
444}
445
446impl std::fmt::Debug for Mbuf {
447 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448 f.debug_struct("Mbuf")
449 .field("chunk_size", &self.buf.len())
450 .field("pos", &self.pos)
451 .field("last", &self.last)
452 .field("end", &self.end)
453 .field("flags", &self.flags)
454 .field("owner", &self.owner)
455 .finish()
456 }
457}
458
459/// Tail-queue of mbufs. Mirrors the C `struct mhdr` head and the
460/// `STAILQ_*` macros that operated on it.
461///
462/// The container is a [`VecDeque`]; insertion at either end is O(1).
463///
464/// # Examples
465///
466/// ```
467/// use dynomite::io::mbuf::{MbufPool, MbufQueue};
468/// let pool = MbufPool::default();
469/// let mut q = MbufQueue::new();
470/// q.push_back(pool.get());
471/// q.push_back(pool.get());
472/// assert_eq!(q.len(), 2);
473/// let _ = q.pop_front();
474/// assert_eq!(q.len(), 1);
475/// ```
476#[derive(Debug, Default)]
477pub struct MbufQueue {
478 inner: VecDeque<Mbuf>,
479}
480
481impl MbufQueue {
482 /// Create an empty queue.
483 pub fn new() -> Self {
484 Self {
485 inner: VecDeque::new(),
486 }
487 }
488
489 /// Number of buffers currently chained.
490 pub fn len(&self) -> usize {
491 self.inner.len()
492 }
493
494 /// True when the queue holds no buffers.
495 pub fn is_empty(&self) -> bool {
496 self.inner.is_empty()
497 }
498
499 /// Append `mbuf` at the tail. Mirrors `mbuf_insert`.
500 pub fn push_back(&mut self, mbuf: Mbuf) {
501 self.inner.push_back(mbuf);
502 }
503
504 /// Insert `mbuf` at the head. Mirrors `mbuf_insert_head`.
505 pub fn push_front(&mut self, mbuf: Mbuf) {
506 self.inner.push_front(mbuf);
507 }
508
509 /// Remove and return the head buffer. Mirrors `mbuf_remove`
510 /// applied to the head; the C API allowed removing from any
511 /// position, but every reference call site removed the head.
512 pub fn pop_front(&mut self) -> Option<Mbuf> {
513 self.inner.pop_front()
514 }
515
516 /// Remove and return the tail buffer.
517 pub fn pop_back(&mut self) -> Option<Mbuf> {
518 self.inner.pop_back()
519 }
520
521 /// Borrow the tail buffer mutably without removing it. Used by
522 /// the `mbuf_split` consumers that want to operate on the latest
523 /// chunk.
524 pub fn back_mut(&mut self) -> Option<&mut Mbuf> {
525 self.inner.back_mut()
526 }
527
528 /// Borrow the head buffer mutably without removing it.
529 pub fn front_mut(&mut self) -> Option<&mut Mbuf> {
530 self.inner.front_mut()
531 }
532
533 /// Iterate the queued buffers in head-to-tail order.
534 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, Mbuf> {
535 self.inner.iter()
536 }
537}
538
539impl<'a> IntoIterator for &'a MbufQueue {
540 type Item = &'a Mbuf;
541 type IntoIter = std::collections::vec_deque::Iter<'a, Mbuf>;
542
543 fn into_iter(self) -> Self::IntoIter {
544 self.inner.iter()
545 }
546}
547
548impl MbufQueue {
549 /// Total readable bytes across the chain.
550 pub fn total_len(&self) -> usize {
551 self.inner.iter().map(Mbuf::len).sum()
552 }
553
554 /// Drain the queue into the pool, recycling every chunk.
555 ///
556 /// # Examples
557 ///
558 /// ```
559 /// use dynomite::io::mbuf::{MbufPool, MbufQueue};
560 /// let pool = MbufPool::default();
561 /// let mut q = MbufQueue::new();
562 /// q.push_back(pool.get());
563 /// q.push_back(pool.get());
564 /// q.recycle(&pool);
565 /// assert!(q.is_empty());
566 /// ```
567 pub fn recycle(&mut self, pool: &MbufPool) {
568 while let Some(buf) = self.inner.pop_front() {
569 pool.put(buf);
570 }
571 }
572}
573
574/// Free-list backed mbuf pool.
575///
576/// The pool keeps a parking-lot mutex around a stash of recyclable
577/// chunk allocations. New chunks are taken from the stash if available
578/// and freshly allocated otherwise. [`MbufPool::put`] returns chunks
579/// to the stash up to [`MBUF_POOL_MAX_FREE`]; chunks beyond that cap
580/// are dropped.
581///
582/// The pool tracks total live and free counts for diagnostics, mirroring
583/// the C `mbuf_alloc_get_count` / `mbuf_free_queue_size` accessors.
584#[derive(Clone)]
585pub struct MbufPool {
586 inner: Arc<MbufPoolInner>,
587}
588
589struct MbufPoolInner {
590 chunk_size: usize,
591 max_free: usize,
592 state: Mutex<MbufPoolState>,
593}
594
595struct MbufPoolState {
596 free: Vec<Box<[u8]>>,
597 total_allocated: u64,
598}
599
600impl Default for MbufPool {
601 fn default() -> Self {
602 Self::new(MBUF_SIZE, MBUF_POOL_MAX_FREE)
603 }
604}
605
606impl MbufPool {
607 /// Construct a new pool with `chunk_size` byte chunks and a free
608 /// list capped at `max_free`. Mirrors `mbuf_init` plus the Rust-
609 /// only free-list bound.
610 ///
611 /// # Examples
612 ///
613 /// ```
614 /// use dynomite::io::mbuf::{MbufPool, MBUF_SIZE, MBUF_POOL_MAX_FREE};
615 /// let pool = MbufPool::new(MBUF_SIZE, MBUF_POOL_MAX_FREE);
616 /// let buf = pool.get();
617 /// assert_eq!(buf.chunk_size(), MBUF_SIZE);
618 /// pool.put(buf);
619 /// ```
620 pub fn new(chunk_size: usize, max_free: usize) -> Self {
621 assert!(
622 (MBUF_MIN_SIZE..=MBUF_MAX_SIZE).contains(&chunk_size),
623 "mbuf chunk_size {chunk_size} outside [{MBUF_MIN_SIZE}, {MBUF_MAX_SIZE}]"
624 );
625 Self {
626 inner: Arc::new(MbufPoolInner {
627 chunk_size,
628 max_free,
629 state: Mutex::new(MbufPoolState {
630 free: Vec::new(),
631 total_allocated: 0,
632 }),
633 }),
634 }
635 }
636
637 /// Configured chunk size, in bytes.
638 pub fn chunk_size(&self) -> usize {
639 self.inner.chunk_size
640 }
641
642 /// Maximum number of chunks the free list retains.
643 pub fn max_free(&self) -> usize {
644 self.inner.max_free
645 }
646
647 /// Take a fresh or recycled chunk from the pool. Mirrors
648 /// `mbuf_get`.
649 ///
650 /// # Examples
651 ///
652 /// ```
653 /// use dynomite::io::mbuf::MbufPool;
654 /// let pool = MbufPool::default();
655 /// let buf = pool.get();
656 /// assert!(buf.is_empty());
657 /// ```
658 pub fn get(&self) -> Mbuf {
659 let buf = self.alloc_buffer();
660 let chunk_size = buf.len();
661 let end = chunk_size - MBUF_ESIZE;
662 Mbuf {
663 buf,
664 pos: 0,
665 last: 0,
666 end,
667 flags: 0,
668 owner: None,
669 }
670 }
671
672 /// Return a chunk to the free list. Resets cursors and flags
673 /// before storing. Mirrors `mbuf_put`. Chunks past
674 /// [`max_free`](Self::max_free) are dropped.
675 ///
676 /// # Examples
677 ///
678 /// ```
679 /// use dynomite::io::mbuf::MbufPool;
680 /// let pool = MbufPool::default();
681 /// let buf = pool.get();
682 /// pool.put(buf);
683 /// assert_eq!(pool.free_count(), 1);
684 /// ```
685 pub fn put(&self, mut mbuf: Mbuf) {
686 if mbuf.buf.len() != self.inner.chunk_size {
687 // Off-size buffers (e.g. from [`Mbuf::with_chunk_size`])
688 // are simply dropped rather than poisoning the free list.
689 return;
690 }
691 mbuf.rewind();
692 mbuf.flags = 0;
693 mbuf.owner = None;
694 let mut state = self.inner.state.lock();
695 if state.free.len() < self.inner.max_free {
696 state.free.push(mbuf.buf);
697 }
698 }
699
700 /// Number of chunks currently sitting in the free list. Mirrors
701 /// `mbuf_free_queue_size`.
702 pub fn free_count(&self) -> usize {
703 self.inner.state.lock().free.len()
704 }
705
706 /// Lifetime count of fresh allocations performed by the pool.
707 /// Mirrors `mbuf_alloc_get_count`. Useful for tests asserting
708 /// that recycling avoids the allocator path.
709 pub fn total_allocated(&self) -> u64 {
710 self.inner.state.lock().total_allocated
711 }
712
713 fn alloc_buffer(&self) -> Box<[u8]> {
714 let mut state = self.inner.state.lock();
715 if let Some(buf) = state.free.pop() {
716 return buf;
717 }
718 state.total_allocated += 1;
719 drop(state);
720 vec![0u8; self.inner.chunk_size].into_boxed_slice()
721 }
722}
723
724impl std::fmt::Debug for MbufPool {
725 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726 let state = self.inner.state.lock();
727 f.debug_struct("MbufPool")
728 .field("chunk_size", &self.inner.chunk_size)
729 .field("max_free", &self.inner.max_free)
730 .field("free_count", &state.free.len())
731 .field("total_allocated", &state.total_allocated)
732 .finish()
733 }
734}
735
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes written with `recv` come back out of `send` unchanged.
    #[test]
    fn mbuf_recv_send_round_trip() {
        let mut buf = Mbuf::with_chunk_size(1024);
        let written = buf.recv(b"abc");
        assert_eq!(written, 3);

        let mut out = [0u8; 8];
        let read = buf.send(&mut out);
        assert_eq!(read, 3);
        assert_eq!(&out[..3], b"abc");
        assert!(buf.is_empty());
    }

    /// `recv` stops at the writable boundary instead of overflowing.
    #[test]
    fn mbuf_recv_truncates_to_remaining() {
        let payload = vec![7u8; MBUF_MIN_SIZE * 2];
        let mut buf = Mbuf::with_chunk_size(MBUF_MIN_SIZE);
        assert_eq!(buf.recv(&payload), MBUF_MIN_SIZE - MBUF_ESIZE);
        assert!(buf.is_full());
    }

    /// Splitting and then appending the tail restores the original bytes.
    #[test]
    fn mbuf_split_then_append_reconstructs() {
        let pool = MbufPool::default();
        let mut head = pool.get();
        head.recv(b"hello world");

        let tail = head.split_off(5, &pool).unwrap();
        assert_eq!(head.readable(), b"hello");
        assert_eq!(tail.readable(), b" world");

        head.append(&tail);
        assert_eq!(head.readable(), b"hello world");
    }

    /// A split offset past the readable length is rejected.
    #[test]
    fn mbuf_split_off_out_of_range_returns_none() {
        let pool = MbufPool::default();
        let mut head = pool.get();
        head.recv(b"abc");
        let outcome = head.split_off(99, &pool);
        assert!(outcome.is_none());
    }

    /// Returned chunks are reused, so the allocation counter stays put.
    #[test]
    fn mbuf_pool_recycles_buffers_without_reallocation() {
        const N: usize = 4;
        let pool = MbufPool::default();

        let first_round: Vec<Mbuf> = (0..N).map(|_| pool.get()).collect();
        assert_eq!(pool.total_allocated(), N as u64);

        first_round.into_iter().for_each(|buf| pool.put(buf));
        assert_eq!(pool.free_count(), N);

        let _second_round: Vec<Mbuf> = (0..N).map(|_| pool.get()).collect();
        // Reused; allocation count is unchanged.
        assert_eq!(pool.total_allocated(), N as u64);
    }

    /// The free list never grows past the configured cap.
    #[test]
    fn mbuf_pool_drops_chunks_beyond_cap() {
        let cap = 4;
        let pool = MbufPool::new(MBUF_SIZE, cap);

        let taken: Vec<Mbuf> = (0..cap * 2).map(|_| pool.get()).collect();
        for buf in taken {
            pool.put(buf);
        }

        // The pool refuses to grow past `max_free`; the surplus
        // buffers are dropped on return rather than poisoning the
        // free list.
        assert_eq!(pool.free_count(), cap);
    }

    /// Buffers leave the queue in the order they were pushed.
    #[test]
    fn mbuf_queue_push_pop_fifo() {
        let pool = MbufPool::default();
        let mut q = MbufQueue::new();

        for marker in 0..3u8 {
            let mut buf = pool.get();
            buf.recv(&[marker]);
            q.push_back(buf);
        }

        for marker in 0..3u8 {
            let buf = q.pop_front().unwrap();
            assert_eq!(buf.readable(), &[marker]);
        }
        assert!(q.is_empty());
    }

    /// Setting and then clearing a flag leaves no bits behind.
    #[test]
    fn mbuf_flags_round_trip() {
        let mut buf = Mbuf::with_chunk_size(1024);

        buf.set_flag(MBUF_FLAG_READ_FLIP, true);
        assert_eq!(buf.flags() & MBUF_FLAG_READ_FLIP, MBUF_FLAG_READ_FLIP);

        buf.set_flag(MBUF_FLAG_READ_FLIP, false);
        assert_eq!(buf.flags(), 0);
    }

    /// A fresh buffer has no owner until one is stamped.
    #[test]
    fn mbuf_owner_round_trip() {
        let mut buf = Mbuf::with_chunk_size(1024);
        assert!(buf.owner().is_none());

        buf.set_owner(Some(42));
        assert_eq!(buf.owner(), Some(42));
    }

    /// One byte under the minimum chunk size is rejected.
    #[test]
    #[should_panic(expected = "outside")]
    fn mbuf_too_small_panics() {
        let undersized = MBUF_MIN_SIZE - 1;
        let _ = Mbuf::with_chunk_size(undersized);
    }

    /// One byte over the maximum chunk size is rejected.
    #[test]
    #[should_panic(expected = "outside")]
    fn mbuf_too_large_panics() {
        let oversized = MBUF_MAX_SIZE + 1;
        let _ = Mbuf::with_chunk_size(oversized);
    }

    /// A chunk whose size differs from the pool's is not admitted.
    #[test]
    fn mbuf_put_drops_offsize_chunks() {
        let pool = MbufPool::default();
        // Use Mbuf::with_chunk_size to construct a buffer of a
        // different size and confirm the pool refuses to admit it.
        pool.put(Mbuf::with_chunk_size(MBUF_MIN_SIZE));
        assert_eq!(pool.free_count(), 0);
    }
}
871}