use std::collections::VecDeque;
use std::sync::Arc;
use parking_lot::Mutex;
/// Default chunk size, in bytes, used by `MbufPool::default`.
pub const MBUF_SIZE: usize = 16384;
/// Smallest chunk size accepted by `Mbuf::with_chunk_size` and `MbufPool::new`.
pub const MBUF_MIN_SIZE: usize = 512;
/// Largest chunk size accepted by `Mbuf::with_chunk_size` and `MbufPool::new`.
pub const MBUF_MAX_SIZE: usize = 512_000;
/// Number of bytes at the tail of every chunk that lie past an `Mbuf`'s `end`
/// offset, i.e. outside the region its read/write methods operate on.
pub const MBUF_ESIZE: usize = 16;
/// Default cap on the number of idle buffers retained, used by `MbufPool::default`.
pub const MBUF_POOL_MAX_FREE: usize = 16384;
/// Identifier stored on an `Mbuf` through `set_owner`; this file never interprets it.
pub type OwnerId = u64;
/// A fixed-size byte buffer with separate read (`pos`) and write (`last`) cursors.
///
/// The constructors and mutators in this file keep `pos <= last <= end <= buf.len()`.
pub struct Mbuf {
/// Backing storage; its length is the chunk size.
buf: Box<[u8]>,
/// Offset of the next unread byte.
pos: usize,
/// Offset one past the last written byte.
last: usize,
/// Upper bound for `last`; set to `buf.len() - MBUF_ESIZE` when the buffer is built.
end: usize,
/// Bit set manipulated through `set_flag` (see the `MBUF_FLAG_*` constants).
flags: u32,
/// Optional owner tag; stored and returned verbatim, never interpreted here.
owner: Option<OwnerId>,
}
/// Flag bit `0x1` for `Mbuf::set_flag` / `Mbuf::flags`.
/// NOTE(review): the meaning of "read flip" is defined by callers; nothing in this file interprets it.
pub const MBUF_FLAG_READ_FLIP: u32 = 0x0000_0001;
/// Flag bit `0x2` for `Mbuf::set_flag` / `Mbuf::flags`.
/// NOTE(review): presumably marks a buffer whose contents were just decrypted — confirm against callers.
pub const MBUF_FLAG_JUST_DECRYPTED: u32 = 0x0000_0002;
impl Mbuf {
    /// Allocates a stand-alone, zero-filled buffer of `chunk_size` bytes.
    ///
    /// The final `MBUF_ESIZE` bytes of the chunk lie past `end`, so the
    /// read/write methods on this type never touch them.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is outside `MBUF_MIN_SIZE..=MBUF_MAX_SIZE`.
    pub fn with_chunk_size(chunk_size: usize) -> Self {
        assert!(
            (MBUF_MIN_SIZE..=MBUF_MAX_SIZE).contains(&chunk_size),
            "mbuf chunk_size {chunk_size} outside [{MBUF_MIN_SIZE}, {MBUF_MAX_SIZE}]"
        );
        let storage = vec![0u8; chunk_size].into_boxed_slice();
        Self {
            // Computed before `storage` is moved into `buf`.
            end: storage.len() - MBUF_ESIZE,
            buf: storage,
            pos: 0,
            last: 0,
            flags: 0,
            owner: None,
        }
    }

    /// Total length of the backing allocation, in bytes.
    pub fn chunk_size(&self) -> usize {
        self.buf.len()
    }

    /// Size of the data region, i.e. the `end` offset.
    pub fn data_size(&self) -> usize {
        self.end
    }

    /// Length of the whole backing allocation, tail included.
    ///
    /// NOTE(review): this is the same value as [`Mbuf::chunk_size`], not
    /// `end`; confirm that callers expect the tail to be counted.
    pub fn usable_capacity(&self) -> usize {
        self.buf.len()
    }

    /// Number of written-but-unread bytes.
    pub fn len(&self) -> usize {
        self.last - self.pos
    }

    /// Maximum number of bytes the data region can hold (the `end` offset).
    pub fn capacity(&self) -> usize {
        self.end
    }

    /// Number of bytes that can still be written before the buffer is full.
    pub fn remaining(&self) -> usize {
        self.end - self.last
    }

    /// Returns `true` when there is nothing left to read.
    pub fn is_empty(&self) -> bool {
        self.last == self.pos
    }

    /// Returns `true` when the write cursor has reached `end`.
    pub fn is_full(&self) -> bool {
        self.remaining() == 0
    }

    /// Resets both cursors to the start; flags and owner are left untouched.
    pub fn rewind(&mut self) {
        self.last = 0;
        self.pos = 0;
    }

    /// Returns the raw flag bits.
    pub fn flags(&self) -> u32 {
        self.flags
    }

    /// Sets (`on == true`) or clears (`on == false`) every bit present in `flag`.
    pub fn set_flag(&mut self, flag: u32, on: bool) {
        self.flags = if on {
            self.flags | flag
        } else {
            self.flags & !flag
        };
    }

    /// Returns the owner tag, if any.
    pub fn owner(&self) -> Option<OwnerId> {
        self.owner
    }

    /// Replaces the owner tag.
    pub fn set_owner(&mut self, owner: Option<OwnerId>) {
        self.owner = owner;
    }

    /// The unread bytes, `buf[pos..last]`.
    pub fn readable(&self) -> &[u8] {
        let (from, to) = (self.pos, self.last);
        &self.buf[from..to]
    }

    /// The free space, `buf[last..end]`.
    ///
    /// Writing here does not move the write cursor; follow up with
    /// [`Mbuf::advance_last`].
    pub fn writable(&mut self) -> &mut [u8] {
        let (from, to) = (self.last, self.end);
        &mut self.buf[from..to]
    }

    /// Copies as much of `src` as fits into the free space and returns the
    /// number of bytes accepted (possibly fewer than `src.len()`).
    pub fn recv(&mut self, src: &[u8]) -> usize {
        let count = src.len().min(self.remaining());
        self.writable()[..count].copy_from_slice(&src[..count]);
        self.last += count;
        count
    }

    /// Copies unread bytes into `dst`, consumes them, and returns how many
    /// bytes were copied (the smaller of `dst.len()` and [`Mbuf::len`]).
    pub fn send(&mut self, dst: &mut [u8]) -> usize {
        let pending = self.readable();
        let count = dst.len().min(pending.len());
        dst[..count].copy_from_slice(&pending[..count]);
        self.pos += count;
        count
    }

    /// Appends all of `src` to the buffer.
    ///
    /// # Panics
    ///
    /// Panics if `src` is longer than [`Mbuf::remaining`].
    pub fn copy_from_slice(&mut self, src: &[u8]) {
        assert!(
            src.len() <= self.remaining(),
            "mbuf copy of {} bytes exceeds remaining {}",
            src.len(),
            self.remaining()
        );
        let count = src.len();
        self.writable()[..count].copy_from_slice(src);
        self.last += count;
    }

    /// Moves the unread bytes from offset `at` onwards into a buffer taken
    /// from `pool`, truncating `self` to its first `at` unread bytes.
    ///
    /// Returns `None` without touching the pool when `at` exceeds
    /// [`Mbuf::len`]. Returns `None` after handing the fresh buffer back to
    /// the pool when the moved bytes would not fit into it. `self` is
    /// unchanged in both cases. Flags and owner are not copied to the tail.
    pub fn split_off(&mut self, at: usize, pool: &MbufPool) -> Option<Mbuf> {
        if at > self.len() {
            return None;
        }
        let mut tail = pool.get();
        let cut = self.pos + at;
        let moved = &self.buf[cut..self.last];
        if moved.len() > tail.remaining() {
            pool.put(tail);
            return None;
        }
        tail.copy_from_slice(moved);
        self.last = cut;
        Some(tail)
    }

    /// Appends as many of `other`'s unread bytes as fit and returns the
    /// count; `other` itself is not modified.
    pub fn append(&mut self, other: &Mbuf) -> usize {
        let incoming = other.readable();
        self.recv(incoming)
    }

    /// Consumes `n` unread bytes without copying them anywhere.
    ///
    /// # Panics
    ///
    /// Panics if `n` exceeds [`Mbuf::len`].
    pub fn advance_pos(&mut self, n: usize) {
        assert!(
            n <= self.len(),
            "advance_pos {n} exceeds len {}",
            self.len()
        );
        self.pos += n;
    }

    /// Marks `n` bytes of the free space (see [`Mbuf::writable`]) as written.
    ///
    /// # Panics
    ///
    /// Panics if `n` exceeds [`Mbuf::remaining`].
    pub fn advance_last(&mut self, n: usize) {
        assert!(
            n <= self.remaining(),
            "advance_last {n} exceeds remaining {}",
            self.remaining()
        );
        self.last += n;
    }
}
/// Debug output shows the chunk size and cursor state, never the buffer contents.
impl std::fmt::Debug for Mbuf {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let chunk_size = self.buf.len();
        let mut out = f.debug_struct("Mbuf");
        out.field("chunk_size", &chunk_size);
        out.field("pos", &self.pos);
        out.field("last", &self.last);
        out.field("end", &self.end);
        out.field("flags", &self.flags);
        out.field("owner", &self.owner);
        out.finish()
    }
}
/// Double-ended queue of `Mbuf`s backed by a `VecDeque`.
#[derive(Debug, Default)]
pub struct MbufQueue {
/// Queued buffers, front first.
inner: VecDeque<Mbuf>,
}
impl MbufQueue {
    /// Creates an empty queue.
    pub fn new() -> Self {
        let inner = VecDeque::new();
        Self { inner }
    }

    /// Number of buffers currently queued (not the number of bytes).
    pub fn len(&self) -> usize {
        let queue = &self.inner;
        queue.len()
    }

    /// Returns `true` when no buffers are queued.
    pub fn is_empty(&self) -> bool {
        let queue = &self.inner;
        queue.is_empty()
    }

    /// Adds `mbuf` at the back of the queue.
    pub fn push_back(&mut self, mbuf: Mbuf) {
        let queue = &mut self.inner;
        queue.push_back(mbuf);
    }

    /// Adds `mbuf` at the front of the queue.
    pub fn push_front(&mut self, mbuf: Mbuf) {
        let queue = &mut self.inner;
        queue.push_front(mbuf);
    }

    /// Removes and returns the front buffer, or `None` if the queue is empty.
    pub fn pop_front(&mut self) -> Option<Mbuf> {
        let queue = &mut self.inner;
        queue.pop_front()
    }

    /// Removes and returns the back buffer, or `None` if the queue is empty.
    pub fn pop_back(&mut self) -> Option<Mbuf> {
        let queue = &mut self.inner;
        queue.pop_back()
    }

    /// Mutable access to the back buffer, or `None` if the queue is empty.
    pub fn back_mut(&mut self) -> Option<&mut Mbuf> {
        let queue = &mut self.inner;
        queue.back_mut()
    }

    /// Mutable access to the front buffer, or `None` if the queue is empty.
    pub fn front_mut(&mut self) -> Option<&mut Mbuf> {
        let queue = &mut self.inner;
        queue.front_mut()
    }

    /// Iterates over the queued buffers from front to back.
    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, Mbuf> {
        let queue = &self.inner;
        queue.iter()
    }
}
impl<'a> IntoIterator for &'a MbufQueue {
type Item = &'a Mbuf;
type IntoIter = std::collections::vec_deque::Iter<'a, Mbuf>;
fn into_iter(self) -> Self::IntoIter {
self.inner.iter()
}
}
impl MbufQueue {
pub fn total_len(&self) -> usize {
self.inner.iter().map(Mbuf::len).sum()
}
pub fn recycle(&mut self, pool: &MbufPool) {
while let Some(buf) = self.inner.pop_front() {
pool.put(buf);
}
}
}
/// Handle to a pool of fixed-size buffers.
///
/// Cloning the handle clones the inner `Arc`, so every clone refers to the same pool.
#[derive(Clone)]
pub struct MbufPool {
/// Shared configuration and state.
inner: Arc<MbufPoolInner>,
}
/// Configuration plus mutex-guarded state behind an `MbufPool` handle.
struct MbufPoolInner {
/// Size in bytes of every buffer the pool allocates; `put` discards buffers of any other size.
chunk_size: usize,
/// Upper bound on the number of idle buffers kept in `MbufPoolState::free`.
max_free: usize,
/// Mutable pool state; every access in this file goes through the mutex.
state: Mutex<MbufPoolState>,
}
/// Mutable part of the pool.
struct MbufPoolState {
/// Idle buffers awaiting reuse; pushed and popped at the end of the vector.
free: Vec<Box<[u8]>>,
/// Number of fresh allocations made because `free` was empty (reuses are not counted).
total_allocated: u64,
}
impl Default for MbufPool {
fn default() -> Self {
Self::new(MBUF_SIZE, MBUF_POOL_MAX_FREE)
}
}
impl MbufPool {
    /// Creates an empty pool that hands out `chunk_size`-byte buffers and
    /// keeps at most `max_free` idle buffers for reuse.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is outside `MBUF_MIN_SIZE..=MBUF_MAX_SIZE`.
    pub fn new(chunk_size: usize, max_free: usize) -> Self {
        assert!(
            (MBUF_MIN_SIZE..=MBUF_MAX_SIZE).contains(&chunk_size),
            "mbuf chunk_size {chunk_size} outside [{MBUF_MIN_SIZE}, {MBUF_MAX_SIZE}]"
        );
        let state = Mutex::new(MbufPoolState {
            free: Vec::new(),
            total_allocated: 0,
        });
        let shared = MbufPoolInner {
            chunk_size,
            max_free,
            state,
        };
        Self {
            inner: Arc::new(shared),
        }
    }

    /// Size in bytes of the buffers this pool manages.
    pub fn chunk_size(&self) -> usize {
        self.inner.chunk_size
    }

    /// Maximum number of idle buffers the pool will retain.
    pub fn max_free(&self) -> usize {
        self.inner.max_free
    }

    /// Returns an empty `Mbuf` (cursors at 0, no flags, no owner), reusing
    /// an idle buffer when one is available.
    ///
    /// A reused buffer is not cleared, so its bytes may hold earlier contents.
    pub fn get(&self) -> Mbuf {
        let storage = self.alloc_buffer();
        Mbuf {
            // Computed before `storage` is moved into `buf`.
            end: storage.len() - MBUF_ESIZE,
            buf: storage,
            pos: 0,
            last: 0,
            flags: 0,
            owner: None,
        }
    }

    /// Hands `mbuf` back to the pool.
    ///
    /// The buffer is simply dropped when its size differs from the pool's
    /// chunk size or when the pool already holds `max_free` idle buffers.
    pub fn put(&self, mut mbuf: Mbuf) {
        let shared = &*self.inner;
        if mbuf.buf.len() == shared.chunk_size {
            mbuf.rewind();
            mbuf.flags = 0;
            mbuf.owner = None;
            let mut state = shared.state.lock();
            if state.free.len() < shared.max_free {
                // Only the backing storage is retained.
                state.free.push(mbuf.buf);
            }
        }
    }

    /// Number of idle buffers currently held.
    pub fn free_count(&self) -> usize {
        let state = self.inner.state.lock();
        state.free.len()
    }

    /// Number of buffers freshly allocated by this pool so far.
    pub fn total_allocated(&self) -> u64 {
        let state = self.inner.state.lock();
        state.total_allocated
    }

    /// Pops an idle buffer, or counts and performs a new zero-filled allocation.
    fn alloc_buffer(&self) -> Box<[u8]> {
        {
            let mut state = self.inner.state.lock();
            match state.free.pop() {
                Some(recycled) => return recycled,
                None => state.total_allocated += 1,
            }
        }
        // The guard is gone here, so the allocation happens without the lock held.
        vec![0u8; self.inner.chunk_size].into_boxed_slice()
    }
}
/// Debug output reports the pool's configuration and counters, not the buffers.
impl std::fmt::Debug for MbufPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let shared = &self.inner;
        // The lock is held for the whole formatting call, so both counters
        // come from the same snapshot.
        let state = shared.state.lock();
        let mut out = f.debug_struct("MbufPool");
        out.field("chunk_size", &shared.chunk_size);
        out.field("max_free", &shared.max_free);
        out.field("free_count", &state.free.len());
        out.field("total_allocated", &state.total_allocated);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes stored with `recv` come back unchanged from `send`.
    #[test]
    fn mbuf_recv_send_round_trip() {
        let mut mbuf = Mbuf::with_chunk_size(1024);
        let written = mbuf.recv(b"abc");
        assert_eq!(written, 3);

        let mut sink = [0u8; 8];
        let read = mbuf.send(&mut sink);
        assert_eq!(read, 3);
        assert_eq!(&sink[..3], b"abc");
        assert!(mbuf.is_empty());
    }

    /// `recv` accepts only as many bytes as the data region can hold.
    #[test]
    fn mbuf_recv_truncates_to_remaining() {
        let mut mbuf = Mbuf::with_chunk_size(MBUF_MIN_SIZE);
        let oversized = vec![7u8; MBUF_MIN_SIZE * 2];
        let accepted = mbuf.recv(&oversized);
        assert_eq!(accepted, MBUF_MIN_SIZE - MBUF_ESIZE);
        assert!(mbuf.is_full());
    }

    /// Splitting and re-appending yields the original byte sequence.
    #[test]
    fn mbuf_split_then_append_reconstructs() {
        let pool = MbufPool::default();
        let mut front = pool.get();
        front.recv(b"hello world");

        let back = front.split_off(5, &pool).unwrap();
        assert_eq!(front.readable(), b"hello");
        assert_eq!(back.readable(), b" world");

        front.append(&back);
        assert_eq!(front.readable(), b"hello world");
    }

    /// A split point past the unread data is rejected.
    #[test]
    fn mbuf_split_off_out_of_range_returns_none() {
        let pool = MbufPool::default();
        let mut front = pool.get();
        front.recv(b"abc");
        let outcome = front.split_off(99, &pool);
        assert!(outcome.is_none());
    }

    /// Buffers returned to the pool are reused instead of reallocated.
    #[test]
    fn mbuf_pool_recycles_buffers_without_reallocation() {
        let pool = MbufPool::default();
        let count = 4;

        let mut held: Vec<Mbuf> = (0..count).map(|_| pool.get()).collect();
        assert_eq!(pool.total_allocated(), count as u64);

        held.drain(..).for_each(|mbuf| pool.put(mbuf));
        assert_eq!(pool.free_count(), count);

        held.extend((0..count).map(|_| pool.get()));
        assert_eq!(pool.total_allocated(), count as u64);
    }

    /// The free list never grows past `max_free`.
    #[test]
    fn mbuf_pool_drops_chunks_beyond_cap() {
        let cap = 4;
        let pool = MbufPool::new(MBUF_SIZE, cap);
        let total = cap * 2;

        let mut held: Vec<Mbuf> = (0..total).map(|_| pool.get()).collect();
        held.drain(..).for_each(|mbuf| pool.put(mbuf));

        assert_eq!(pool.free_count(), cap);
    }

    /// `push_back` followed by `pop_front` behaves as a FIFO.
    #[test]
    fn mbuf_queue_push_pop_fifo() {
        let pool = MbufPool::default();
        let mut queue = MbufQueue::new();

        for tag in 0u8..3 {
            let mut mbuf = pool.get();
            mbuf.recv(&[tag]);
            queue.push_back(mbuf);
        }
        for tag in 0u8..3 {
            let mbuf = queue.pop_front().unwrap();
            assert_eq!(mbuf.readable(), &[tag]);
        }

        assert!(queue.is_empty());
    }

    /// Setting and then clearing a flag leaves no bits behind.
    #[test]
    fn mbuf_flags_round_trip() {
        let mut mbuf = Mbuf::with_chunk_size(1024);

        mbuf.set_flag(MBUF_FLAG_READ_FLIP, true);
        assert_eq!(mbuf.flags() & MBUF_FLAG_READ_FLIP, MBUF_FLAG_READ_FLIP);

        mbuf.set_flag(MBUF_FLAG_READ_FLIP, false);
        assert_eq!(mbuf.flags(), 0);
    }

    /// The owner tag starts out unset and is returned as stored.
    #[test]
    fn mbuf_owner_round_trip() {
        let mut mbuf = Mbuf::with_chunk_size(1024);
        assert!(mbuf.owner().is_none());

        mbuf.set_owner(Some(42));
        assert_eq!(mbuf.owner(), Some(42));
    }

    /// Chunk sizes below the minimum are rejected.
    #[test]
    #[should_panic(expected = "outside")]
    fn mbuf_too_small_panics() {
        let undersized = MBUF_MIN_SIZE - 1;
        let _ = Mbuf::with_chunk_size(undersized);
    }

    /// Chunk sizes above the maximum are rejected.
    #[test]
    #[should_panic(expected = "outside")]
    fn mbuf_too_large_panics() {
        let oversized = MBUF_MAX_SIZE + 1;
        let _ = Mbuf::with_chunk_size(oversized);
    }

    /// A buffer whose size differs from the pool's chunk size is not retained.
    #[test]
    fn mbuf_put_drops_offsize_chunks() {
        let pool = MbufPool::default();
        let foreign = Mbuf::with_chunk_size(MBUF_MIN_SIZE);
        pool.put(foreign);
        assert_eq!(pool.free_count(), 0);
    }
}