Skip to main content

commonware_runtime/iobuf/
mod.rs

1//! Buffer types for I/O operations.
2//!
3//! - [`IoBuf`]: Immutable byte buffer
4//! - [`IoBufMut`]: Mutable byte buffer
5//! - [`IoBufs`]: Container for one or more immutable buffers
6//! - [`IoBufsMut`]: Container for one or more mutable buffers
7//! - [`BufferPool`]: Pool of reusable, aligned buffers
8
9mod pool;
10
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use commonware_codec::{util::at_least, EncodeSize, Error, RangeCfg, Read, Write};
13use pool::PooledBufMut;
14pub use pool::{BufferPool, BufferPoolConfig, PoolError};
15use std::{collections::VecDeque, ops::RangeBounds};
16
17/// Immutable byte buffer.
18///
19/// Cloning is cheap and does not copy.
20#[derive(Clone, Debug, Default, PartialEq, Eq)]
21pub struct IoBuf {
22    inner: Bytes,
23}
24
25impl IoBuf {
26    /// Create a buffer by copying data from a slice.
27    ///
28    /// Use this when you have a non-static `&[u8]` that needs to be converted to an
29    /// `IoBuf`. For static slices, prefer `IoBuf::from(b"...")` which is zero-copy.
30    pub fn copy_from_slice(data: &[u8]) -> Self {
31        Self {
32            inner: Bytes::copy_from_slice(data),
33        }
34    }
35
36    /// Number of bytes remaining in the buffer.
37    #[inline]
38    pub fn len(&self) -> usize {
39        self.remaining()
40    }
41
42    /// Whether the buffer is empty.
43    #[inline]
44    pub fn is_empty(&self) -> bool {
45        self.remaining() == 0
46    }
47
48    /// Get raw pointer to the buffer data.
49    #[inline]
50    pub fn as_ptr(&self) -> *const u8 {
51        self.inner.as_ptr()
52    }
53
54    /// Returns a slice of self for the provided range (zero-copy).
55    #[inline]
56    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
57        Self {
58            inner: self.inner.slice(range),
59        }
60    }
61}
62
63impl AsRef<[u8]> for IoBuf {
64    #[inline]
65    fn as_ref(&self) -> &[u8] {
66        self.inner.as_ref()
67    }
68}
69
70impl PartialEq<[u8]> for IoBuf {
71    #[inline]
72    fn eq(&self, other: &[u8]) -> bool {
73        self.as_ref() == other
74    }
75}
76
77impl PartialEq<&[u8]> for IoBuf {
78    #[inline]
79    fn eq(&self, other: &&[u8]) -> bool {
80        self.as_ref() == *other
81    }
82}
83
84impl<const N: usize> PartialEq<[u8; N]> for IoBuf {
85    #[inline]
86    fn eq(&self, other: &[u8; N]) -> bool {
87        self.as_ref() == other
88    }
89}
90
91impl<const N: usize> PartialEq<&[u8; N]> for IoBuf {
92    #[inline]
93    fn eq(&self, other: &&[u8; N]) -> bool {
94        self.as_ref() == *other
95    }
96}
97
98impl Buf for IoBuf {
99    #[inline]
100    fn remaining(&self) -> usize {
101        self.inner.remaining()
102    }
103
104    #[inline]
105    fn chunk(&self) -> &[u8] {
106        self.inner.chunk()
107    }
108
109    #[inline]
110    fn advance(&mut self, cnt: usize) {
111        self.inner.advance(cnt);
112    }
113
114    #[inline]
115    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
116        self.inner.copy_to_bytes(len)
117    }
118}
119
120impl From<Bytes> for IoBuf {
121    fn from(bytes: Bytes) -> Self {
122        Self { inner: bytes }
123    }
124}
125
126impl From<Vec<u8>> for IoBuf {
127    fn from(vec: Vec<u8>) -> Self {
128        Self {
129            inner: Bytes::from(vec),
130        }
131    }
132}
133
134impl<const N: usize> From<&'static [u8; N]> for IoBuf {
135    fn from(array: &'static [u8; N]) -> Self {
136        Self {
137            inner: Bytes::from_static(array),
138        }
139    }
140}
141
142impl From<&'static [u8]> for IoBuf {
143    fn from(slice: &'static [u8]) -> Self {
144        Self {
145            inner: Bytes::from_static(slice),
146        }
147    }
148}
149
150impl From<IoBuf> for Vec<u8> {
151    fn from(buf: IoBuf) -> Self {
152        Self::from(buf.inner)
153    }
154}
155
156impl From<IoBuf> for Bytes {
157    fn from(buf: IoBuf) -> Self {
158        buf.inner
159    }
160}
161
162impl Write for IoBuf {
163    #[inline]
164    fn write(&self, buf: &mut impl BufMut) {
165        self.len().write(buf);
166        buf.put_slice(self.as_ref());
167    }
168}
169
170impl EncodeSize for IoBuf {
171    #[inline]
172    fn encode_size(&self) -> usize {
173        self.len().encode_size() + self.len()
174    }
175}
176
177impl Read for IoBuf {
178    type Cfg = RangeCfg<usize>;
179
180    #[inline]
181    fn read_cfg(buf: &mut impl Buf, range: &Self::Cfg) -> Result<Self, Error> {
182        let len = usize::read_cfg(buf, range)?;
183        at_least(buf, len)?;
184        Ok(Self::from(buf.copy_to_bytes(len)))
185    }
186}
187
188#[cfg(feature = "arbitrary")]
189impl arbitrary::Arbitrary<'_> for IoBuf {
190    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
191        let len = u.arbitrary_len::<u8>()?;
192        let data: Vec<u8> = u.arbitrary_iter()?.take(len).collect::<Result<_, _>>()?;
193        Ok(Self::from(data))
194    }
195}
196
197/// Mutable byte buffer.
198///
199/// Use this to build or mutate payloads before freezing into `IoBuf`.
200///
201/// Can be either an owned buffer (backed by `BytesMut`) or a pooled buffer
202/// (allocated from a `BufferPool`). Pooled buffers are automatically returned
203/// to the pool when dropped. Freezing transfers ownership to the resulting
204/// `IoBuf`, which returns the buffer to the pool when all references are dropped.
205#[derive(Debug)]
206pub struct IoBufMut {
207    inner: IoBufMutInner,
208}
209
210#[derive(Debug)]
211enum IoBufMutInner {
212    Owned(BytesMut),
213    Pooled(PooledBufMut),
214}
215
216impl Default for IoBufMut {
217    fn default() -> Self {
218        Self {
219            inner: IoBufMutInner::Owned(BytesMut::new()),
220        }
221    }
222}
223
224impl IoBufMut {
225    /// Create a buffer with the given capacity.
226    pub fn with_capacity(capacity: usize) -> Self {
227        Self {
228            inner: IoBufMutInner::Owned(BytesMut::with_capacity(capacity)),
229        }
230    }
231
232    /// Create a buffer of `len` bytes, all initialized to zero.
233    ///
234    /// Unlike `with_capacity`, this sets both capacity and length to `len`,
235    /// making the entire buffer immediately usable for read operations
236    /// (e.g., `file.read_exact`).
237    pub fn zeroed(len: usize) -> Self {
238        Self {
239            inner: IoBufMutInner::Owned(BytesMut::zeroed(len)),
240        }
241    }
242
243    /// Create a buffer from a pooled allocation.
244    pub(crate) const fn from_pooled(pooled: PooledBufMut) -> Self {
245        Self {
246            inner: IoBufMutInner::Pooled(pooled),
247        }
248    }
249
250    /// Returns `true` if this buffer is tracked by a pool.
251    ///
252    /// Tracked buffers will be returned to their pool when dropped. Fallback
253    /// allocations from [`BufferPool::alloc`] when the pool is exhausted or
254    /// oversized are aligned but untracked, so this returns `false`.
255    #[inline]
256    pub fn is_pooled(&self) -> bool {
257        match &self.inner {
258            IoBufMutInner::Owned(_) => false,
259            IoBufMutInner::Pooled(p) => p.is_tracked(),
260        }
261    }
262
263    /// Sets the length of the buffer.
264    ///
265    /// This will explicitly set the size of the buffer without actually
266    /// modifying the data, so it is up to the caller to ensure that the data
267    /// has been initialized.
268    ///
269    /// # Safety
270    ///
271    /// Caller must ensure all bytes in `0..len` are initialized before any
272    /// read operations.
273    ///
274    /// # Panics
275    ///
276    /// Panics if `len > capacity()`.
277    #[inline]
278    pub unsafe fn set_len(&mut self, len: usize) {
279        assert!(
280            len <= self.capacity(),
281            "set_len({len}) exceeds capacity({})",
282            self.capacity()
283        );
284        match &mut self.inner {
285            IoBufMutInner::Owned(b) => b.set_len(len),
286            IoBufMutInner::Pooled(b) => b.set_len(len),
287        }
288    }
289
290    /// Number of bytes remaining in the buffer.
291    #[inline]
292    pub fn len(&self) -> usize {
293        self.remaining()
294    }
295
296    /// Whether the buffer is empty.
297    #[inline]
298    pub fn is_empty(&self) -> bool {
299        match &self.inner {
300            IoBufMutInner::Owned(b) => b.is_empty(),
301            IoBufMutInner::Pooled(b) => b.is_empty(),
302        }
303    }
304
305    /// Freeze into immutable `IoBuf`.
306    #[inline]
307    pub fn freeze(self) -> IoBuf {
308        match self.inner {
309            IoBufMutInner::Owned(b) => b.freeze().into(),
310            IoBufMutInner::Pooled(b) => b.freeze(),
311        }
312    }
313
314    /// Returns the number of bytes the buffer can hold without reallocating.
315    #[inline]
316    pub fn capacity(&self) -> usize {
317        match &self.inner {
318            IoBufMutInner::Owned(b) => b.capacity(),
319            IoBufMutInner::Pooled(b) => b.capacity(),
320        }
321    }
322
323    /// Returns an unsafe mutable pointer to the buffer's data.
324    #[inline]
325    pub fn as_mut_ptr(&mut self) -> *mut u8 {
326        match &mut self.inner {
327            IoBufMutInner::Owned(b) => b.as_mut_ptr(),
328            IoBufMutInner::Pooled(b) => b.as_mut_ptr(),
329        }
330    }
331
332    /// Truncates the buffer to `len` readable bytes.
333    ///
334    /// If `len` is greater than the current length, this has no effect.
335    #[inline]
336    pub fn truncate(&mut self, len: usize) {
337        match &mut self.inner {
338            IoBufMutInner::Owned(b) => b.truncate(len),
339            IoBufMutInner::Pooled(b) => b.truncate(len),
340        }
341    }
342
343    /// Clears the buffer, removing all data. Existing capacity is preserved.
344    #[inline]
345    pub fn clear(&mut self) {
346        match &mut self.inner {
347            IoBufMutInner::Owned(b) => b.clear(),
348            IoBufMutInner::Pooled(b) => b.clear(),
349        }
350    }
351}
352
353impl AsRef<[u8]> for IoBufMut {
354    #[inline]
355    fn as_ref(&self) -> &[u8] {
356        match &self.inner {
357            IoBufMutInner::Owned(b) => b.as_ref(),
358            IoBufMutInner::Pooled(b) => b.as_ref(),
359        }
360    }
361}
362
363impl AsMut<[u8]> for IoBufMut {
364    #[inline]
365    fn as_mut(&mut self) -> &mut [u8] {
366        match &mut self.inner {
367            IoBufMutInner::Owned(b) => b.as_mut(),
368            IoBufMutInner::Pooled(b) => b.as_mut(),
369        }
370    }
371}
372
373impl PartialEq<[u8]> for IoBufMut {
374    #[inline]
375    fn eq(&self, other: &[u8]) -> bool {
376        self.as_ref() == other
377    }
378}
379
380impl PartialEq<&[u8]> for IoBufMut {
381    #[inline]
382    fn eq(&self, other: &&[u8]) -> bool {
383        self.as_ref() == *other
384    }
385}
386
387impl<const N: usize> PartialEq<[u8; N]> for IoBufMut {
388    #[inline]
389    fn eq(&self, other: &[u8; N]) -> bool {
390        self.as_ref() == other
391    }
392}
393
394impl<const N: usize> PartialEq<&[u8; N]> for IoBufMut {
395    #[inline]
396    fn eq(&self, other: &&[u8; N]) -> bool {
397        self.as_ref() == *other
398    }
399}
400
401impl Buf for IoBufMut {
402    #[inline]
403    fn remaining(&self) -> usize {
404        match &self.inner {
405            IoBufMutInner::Owned(b) => b.remaining(),
406            IoBufMutInner::Pooled(b) => b.remaining(),
407        }
408    }
409
410    #[inline]
411    fn chunk(&self) -> &[u8] {
412        match &self.inner {
413            IoBufMutInner::Owned(b) => b.chunk(),
414            IoBufMutInner::Pooled(b) => b.chunk(),
415        }
416    }
417
418    #[inline]
419    fn advance(&mut self, cnt: usize) {
420        match &mut self.inner {
421            IoBufMutInner::Owned(b) => b.advance(cnt),
422            IoBufMutInner::Pooled(b) => b.advance(cnt),
423        }
424    }
425}
426
427// SAFETY: Delegates to BytesMut or PooledBufMut which implement BufMut safely.
428unsafe impl BufMut for IoBufMut {
429    #[inline]
430    fn remaining_mut(&self) -> usize {
431        match &self.inner {
432            IoBufMutInner::Owned(b) => b.remaining_mut(),
433            IoBufMutInner::Pooled(b) => b.remaining_mut(),
434        }
435    }
436
437    #[inline]
438    unsafe fn advance_mut(&mut self, cnt: usize) {
439        match &mut self.inner {
440            IoBufMutInner::Owned(b) => b.advance_mut(cnt),
441            IoBufMutInner::Pooled(b) => b.advance_mut(cnt),
442        }
443    }
444
445    #[inline]
446    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
447        match &mut self.inner {
448            IoBufMutInner::Owned(b) => b.chunk_mut(),
449            IoBufMutInner::Pooled(b) => b.chunk_mut(),
450        }
451    }
452}
453
454impl From<Vec<u8>> for IoBufMut {
455    fn from(vec: Vec<u8>) -> Self {
456        Self::from(Bytes::from(vec))
457    }
458}
459
460impl From<&[u8]> for IoBufMut {
461    fn from(slice: &[u8]) -> Self {
462        Self {
463            inner: IoBufMutInner::Owned(BytesMut::from(slice)),
464        }
465    }
466}
467
468impl<const N: usize> From<[u8; N]> for IoBufMut {
469    fn from(array: [u8; N]) -> Self {
470        Self::from(array.as_ref())
471    }
472}
473
474impl<const N: usize> From<&[u8; N]> for IoBufMut {
475    fn from(array: &[u8; N]) -> Self {
476        Self::from(array.as_ref())
477    }
478}
479
480impl From<BytesMut> for IoBufMut {
481    fn from(bytes: BytesMut) -> Self {
482        Self {
483            inner: IoBufMutInner::Owned(bytes),
484        }
485    }
486}
487
488impl From<Bytes> for IoBufMut {
489    /// Zero-copy if `bytes` is unique for the entire original buffer (refcount is 1),
490    /// copies otherwise. Always copies if the `Bytes` was constructed via `from_owner` or
491    /// `from_static`.
492    fn from(bytes: Bytes) -> Self {
493        Self {
494            inner: IoBufMutInner::Owned(BytesMut::from(bytes)),
495        }
496    }
497}
498
499impl From<IoBuf> for IoBufMut {
500    /// Zero-copy if `buf` is unique for the entire original buffer (refcount is 1),
501    /// copies otherwise. Always copies for pooled buffers and static slices.
502    fn from(buf: IoBuf) -> Self {
503        Self::from(buf.inner)
504    }
505}
506
507/// Container for one or more immutable buffers.
508#[derive(Debug)]
509pub enum IoBufs {
510    /// Single buffer (common case, no VecDeque allocation).
511    Single(IoBuf),
512    /// Multiple buffers.
513    Chunked(VecDeque<IoBuf>),
514}
515
516impl Default for IoBufs {
517    fn default() -> Self {
518        Self::Single(IoBuf::default())
519    }
520}
521
522impl IoBufs {
523    /// Number of bytes remaining across all buffers.
524    #[inline]
525    pub fn len(&self) -> usize {
526        self.remaining()
527    }
528
529    /// Whether all buffers are empty.
530    #[inline]
531    pub fn is_empty(&self) -> bool {
532        self.remaining() == 0
533    }
534
535    /// Whether this contains a single contiguous buffer.
536    ///
537    /// When true, `chunk()` returns all remaining bytes.
538    #[inline]
539    pub const fn is_single(&self) -> bool {
540        matches!(self, Self::Single(_))
541    }
542
543    /// Prepend a buffer to the front.
544    pub fn prepend(&mut self, buf: IoBuf) {
545        if buf.is_empty() {
546            return;
547        }
548        match std::mem::take(self) {
549            Self::Single(existing) if existing.is_empty() => {
550                *self = Self::Single(buf);
551            }
552            Self::Single(existing) => {
553                *self = Self::Chunked(VecDeque::from([buf, existing]));
554            }
555            Self::Chunked(mut bufs) => {
556                bufs.push_front(buf);
557                *self = Self::Chunked(bufs);
558            }
559        }
560    }
561
562    /// Append a buffer to the back.
563    pub fn append(&mut self, buf: IoBuf) {
564        if buf.is_empty() {
565            return;
566        }
567        match std::mem::take(self) {
568            Self::Single(existing) if existing.is_empty() => {
569                *self = Self::Single(buf);
570            }
571            Self::Single(existing) => {
572                *self = Self::Chunked(VecDeque::from([existing, buf]));
573            }
574            Self::Chunked(mut bufs) => {
575                bufs.push_back(buf);
576                *self = Self::Chunked(bufs);
577            }
578        }
579    }
580
581    /// Coalesce all remaining bytes into a single contiguous `IoBuf`.
582    ///
583    /// Zero-copy if only one buffer. Copies if multiple buffers.
584    #[inline]
585    pub fn coalesce(mut self) -> IoBuf {
586        match self {
587            Self::Single(buf) => buf,
588            Self::Chunked(_) => self.copy_to_bytes(self.remaining()).into(),
589        }
590    }
591
592    /// Coalesce all remaining bytes into a single contiguous `IoBuf`, using the pool
593    /// for allocation if multiple buffers need to be merged.
594    ///
595    /// Zero-copy if only one buffer. Uses pool allocation if multiple buffers.
596    pub fn coalesce_with_pool(self, pool: &BufferPool) -> IoBuf {
597        match self {
598            Self::Single(buf) => buf,
599            Self::Chunked(bufs) => {
600                let total_len: usize = bufs
601                    .iter()
602                    .map(|b| b.remaining())
603                    .fold(0, usize::saturating_add);
604                let mut result = pool.alloc(total_len);
605                for buf in bufs {
606                    result.put_slice(buf.as_ref());
607                }
608                result.freeze()
609            }
610        }
611    }
612}
613
614impl Buf for IoBufs {
615    fn remaining(&self) -> usize {
616        match self {
617            Self::Single(buf) => buf.remaining(),
618            Self::Chunked(bufs) => bufs
619                .iter()
620                .map(|b| b.remaining())
621                .fold(0, usize::saturating_add),
622        }
623    }
624
625    fn chunk(&self) -> &[u8] {
626        match self {
627            Self::Single(buf) => buf.chunk(),
628            Self::Chunked(bufs) => {
629                for buf in bufs.iter() {
630                    if buf.remaining() > 0 {
631                        return buf.chunk();
632                    }
633                }
634                &[]
635            }
636        }
637    }
638
639    fn advance(&mut self, mut cnt: usize) {
640        let bufs = match self {
641            Self::Single(buf) => return buf.advance(cnt),
642            Self::Chunked(bufs) => bufs,
643        };
644
645        while cnt > 0 {
646            let front = bufs.front_mut().expect("cannot advance past end of buffer");
647            let avail = front.remaining();
648            if cnt >= avail {
649                bufs.pop_front();
650                cnt -= avail;
651            } else {
652                front.advance(cnt);
653                return;
654            }
655        }
656    }
657
658    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
659        let bufs = match self {
660            Self::Single(buf) => return buf.copy_to_bytes(len),
661            Self::Chunked(bufs) => bufs,
662        };
663
664        // Remove exhausted buffers from front
665        while bufs.front().is_some_and(|b| b.remaining() == 0) {
666            bufs.pop_front();
667        }
668
669        // If the first buffer has all the data we need, use its optimized copy_to_bytes
670        if let Some(front) = bufs.front_mut() {
671            if front.remaining() >= len {
672                return front.copy_to_bytes(len);
673            }
674        }
675
676        // Otherwise, copy from multiple buffers
677        let total: usize = bufs
678            .iter()
679            .map(|b| b.remaining())
680            .fold(0, usize::saturating_add);
681
682        assert!(total >= len, "IoBufs::copy_to_bytes: not enough data");
683
684        let mut result = BytesMut::with_capacity(len);
685        let mut remaining = len;
686        while remaining > 0 {
687            let front = bufs
688                .front_mut()
689                .expect("remaining > 0 implies non-empty bufs");
690            let avail = front.remaining();
691            let to_copy = remaining.min(avail);
692            result.extend_from_slice(&front.chunk()[..to_copy]);
693            front.advance(to_copy);
694            if front.remaining() == 0 {
695                bufs.pop_front();
696            }
697            remaining -= to_copy;
698        }
699
700        result.freeze()
701    }
702}
703
704impl From<IoBuf> for IoBufs {
705    fn from(buf: IoBuf) -> Self {
706        Self::Single(buf)
707    }
708}
709
710impl From<IoBufMut> for IoBufs {
711    fn from(buf: IoBufMut) -> Self {
712        Self::Single(buf.freeze())
713    }
714}
715
716impl From<Bytes> for IoBufs {
717    fn from(bytes: Bytes) -> Self {
718        Self::from(IoBuf::from(bytes))
719    }
720}
721
722impl From<BytesMut> for IoBufs {
723    fn from(bytes: BytesMut) -> Self {
724        Self::from(IoBuf::from(bytes.freeze()))
725    }
726}
727
728impl From<Vec<u8>> for IoBufs {
729    fn from(vec: Vec<u8>) -> Self {
730        Self::from(IoBuf::from(vec))
731    }
732}
733
734impl<const N: usize> From<&'static [u8; N]> for IoBufs {
735    fn from(array: &'static [u8; N]) -> Self {
736        Self::from(IoBuf::from(array))
737    }
738}
739
740impl From<&'static [u8]> for IoBufs {
741    fn from(slice: &'static [u8]) -> Self {
742        Self::from(IoBuf::from(slice))
743    }
744}
745
746/// Container for one or more mutable buffers.
747#[derive(Debug)]
748pub enum IoBufsMut {
749    /// Single buffer (common case, no VecDeque allocation).
750    Single(IoBufMut),
751    /// Multiple buffers for vectored reads.
752    Chunked(VecDeque<IoBufMut>),
753}
754
755impl Default for IoBufsMut {
756    fn default() -> Self {
757        Self::Single(IoBufMut::default())
758    }
759}
760
761impl IoBufsMut {
762    /// Number of bytes remaining across all buffers.
763    #[inline]
764    pub fn len(&self) -> usize {
765        self.remaining()
766    }
767
768    /// Whether all buffers are empty.
769    #[inline]
770    pub fn is_empty(&self) -> bool {
771        self.remaining() == 0
772    }
773
774    /// Whether this contains a single contiguous buffer.
775    ///
776    /// When true, `chunk()` returns all remaining bytes.
777    #[inline]
778    pub const fn is_single(&self) -> bool {
779        matches!(self, Self::Single(_))
780    }
781
782    /// Freeze into immutable `IoBufs`.
783    pub fn freeze(self) -> IoBufs {
784        match self {
785            Self::Single(buf) => IoBufs::Single(buf.freeze()),
786            Self::Chunked(bufs) => {
787                let mut frozen: VecDeque<IoBuf> = bufs
788                    .into_iter()
789                    .map(|b| b.freeze())
790                    .filter(|b| !b.is_empty())
791                    .collect();
792                if frozen.len() == 1 {
793                    IoBufs::Single(frozen.pop_front().unwrap())
794                } else if frozen.is_empty() {
795                    IoBufs::Single(IoBuf::default())
796                } else {
797                    IoBufs::Chunked(frozen)
798                }
799            }
800        }
801    }
802
803    fn coalesce_with<F>(self, allocate: F) -> IoBufMut
804    where
805        F: FnOnce(usize) -> IoBufMut,
806    {
807        match self {
808            Self::Single(buf) => buf,
809            Self::Chunked(bufs) => {
810                let total_len: usize = bufs.iter().map(|b| b.len()).fold(0, usize::saturating_add);
811                let mut result = allocate(total_len);
812                for buf in bufs {
813                    result.put_slice(buf.as_ref());
814                }
815                result
816            }
817        }
818    }
819
820    /// Coalesce all buffers into a single contiguous `IoBufMut`.
821    ///
822    /// Zero-copy if only one buffer. Copies if multiple buffers.
823    pub fn coalesce(self) -> IoBufMut {
824        self.coalesce_with(IoBufMut::with_capacity)
825    }
826
827    /// Coalesce all buffers into a single contiguous `IoBufMut`, using the pool
828    /// for allocation if multiple buffers need to be merged.
829    ///
830    /// Zero-copy if only one buffer. Uses pool allocation if multiple buffers.
831    pub fn coalesce_with_pool(self, pool: &BufferPool) -> IoBufMut {
832        self.coalesce_with(|len| pool.alloc(len))
833    }
834
835    /// Coalesce all buffers into a single contiguous `IoBufMut` with extra
836    /// capacity, using the pool for allocation.
837    ///
838    /// Zero-copy if single buffer with sufficient spare capacity.
839    pub fn coalesce_with_pool_extra(self, pool: &BufferPool, extra: usize) -> IoBufMut {
840        match self {
841            Self::Single(buf) if buf.capacity() - buf.len() >= extra => buf,
842            Self::Single(buf) => {
843                let mut result = pool.alloc(buf.len() + extra);
844                result.put_slice(buf.as_ref());
845                result
846            }
847            Self::Chunked(bufs) => {
848                let total: usize = bufs.iter().map(|b| b.len()).fold(0, usize::saturating_add);
849                let mut result = pool.alloc(total + extra);
850                for buf in bufs {
851                    result.put_slice(buf.as_ref());
852                }
853                result
854            }
855        }
856    }
857
858    /// Copy data from a slice into the buffers.
859    ///
860    /// Panics if the slice length doesn't match the total buffer length.
861    pub fn copy_from_slice(&mut self, src: &[u8]) {
862        assert_eq!(
863            src.len(),
864            self.len(),
865            "source slice length must match buffer length"
866        );
867        match self {
868            Self::Single(buf) => buf.as_mut().copy_from_slice(src),
869            Self::Chunked(bufs) => {
870                let mut offset = 0;
871                for buf in bufs.iter_mut() {
872                    let len = buf.len();
873                    buf.as_mut().copy_from_slice(&src[offset..offset + len]);
874                    offset += len;
875                }
876            }
877        }
878    }
879}
880
881impl Buf for IoBufsMut {
882    fn remaining(&self) -> usize {
883        match self {
884            Self::Single(buf) => buf.remaining(),
885            Self::Chunked(bufs) => bufs
886                .iter()
887                .map(|b| b.remaining())
888                .fold(0, usize::saturating_add),
889        }
890    }
891
892    fn chunk(&self) -> &[u8] {
893        match self {
894            Self::Single(buf) => buf.chunk(),
895            Self::Chunked(bufs) => {
896                for buf in bufs.iter() {
897                    if buf.remaining() > 0 {
898                        return buf.chunk();
899                    }
900                }
901                &[]
902            }
903        }
904    }
905
906    fn advance(&mut self, mut cnt: usize) {
907        let bufs = match self {
908            Self::Single(buf) => return buf.advance(cnt),
909            Self::Chunked(bufs) => bufs,
910        };
911
912        while cnt > 0 {
913            let front = bufs.front_mut().expect("cannot advance past end of buffer");
914            let avail = front.remaining();
915            if cnt >= avail {
916                bufs.pop_front();
917                cnt -= avail;
918            } else {
919                front.advance(cnt);
920                return;
921            }
922        }
923    }
924}
925
926// SAFETY: Delegates to IoBufMut which implements BufMut safely.
927unsafe impl BufMut for IoBufsMut {
928    #[inline]
929    fn remaining_mut(&self) -> usize {
930        match self {
931            Self::Single(buf) => buf.remaining_mut(),
932            Self::Chunked(bufs) => bufs
933                .iter()
934                .map(|b| b.remaining_mut())
935                .fold(0, usize::saturating_add),
936        }
937    }
938
939    #[inline]
940    unsafe fn advance_mut(&mut self, cnt: usize) {
941        match self {
942            Self::Single(buf) => buf.advance_mut(cnt),
943            Self::Chunked(bufs) => {
944                let mut remaining = cnt;
945                for buf in bufs.iter_mut() {
946                    let avail = buf.remaining_mut();
947                    if remaining <= avail {
948                        buf.advance_mut(remaining);
949                        return;
950                    }
951                    buf.advance_mut(avail);
952                    remaining -= avail;
953                }
954                panic!("cannot advance past end of buffer");
955            }
956        }
957    }
958
959    #[inline]
960    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
961        match self {
962            Self::Single(buf) => buf.chunk_mut(),
963            Self::Chunked(bufs) => {
964                for buf in bufs.iter_mut() {
965                    if buf.remaining_mut() > 0 {
966                        return buf.chunk_mut();
967                    }
968                }
969                bytes::buf::UninitSlice::new(&mut [])
970            }
971        }
972    }
973}
974
975impl From<IoBufMut> for IoBufsMut {
976    fn from(buf: IoBufMut) -> Self {
977        Self::Single(buf)
978    }
979}
980
981impl From<Vec<u8>> for IoBufsMut {
982    fn from(vec: Vec<u8>) -> Self {
983        Self::Single(IoBufMut::from(vec))
984    }
985}
986
987impl From<BytesMut> for IoBufsMut {
988    fn from(bytes: BytesMut) -> Self {
989        Self::Single(IoBufMut::from(bytes))
990    }
991}
992
993impl From<Vec<IoBufMut>> for IoBufsMut {
994    fn from(mut bufs: Vec<IoBufMut>) -> Self {
995        match bufs.len() {
996            0 => Self::default(),
997            1 => Self::Single(bufs.pop().unwrap()),
998            _ => Self::Chunked(bufs.into()),
999        }
1000    }
1001}
1002
1003impl<const N: usize> From<[u8; N]> for IoBufsMut {
1004    fn from(array: [u8; N]) -> Self {
1005        Self::Single(IoBufMut::from(array))
1006    }
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011    use super::*;
1012
1013    #[test]
1014    fn test_iobuf_clone_doesnt_copy() {
1015        let buf1 = IoBuf::from(vec![1u8; 1000]);
1016        let buf2 = buf1.clone();
1017        assert_eq!(buf1.as_ref().as_ptr(), buf2.as_ref().as_ptr());
1018    }
1019
1020    #[test]
1021    fn test_iobuf_copy_from_slice() {
1022        let data = vec![1u8, 2, 3, 4, 5];
1023        let buf = IoBuf::copy_from_slice(&data);
1024        assert_eq!(buf, [1, 2, 3, 4, 5]);
1025        assert_eq!(buf.len(), 5);
1026
1027        drop(data);
1028        assert_eq!(buf, [1, 2, 3, 4, 5]);
1029
1030        let empty = IoBuf::copy_from_slice(&[]);
1031        assert!(empty.is_empty());
1032    }
1033
1034    #[test]
1035    fn test_iobuf_buf_trait() {
1036        let mut buf = IoBuf::from(b"hello");
1037        assert_eq!(buf.remaining(), 5);
1038        buf.advance(2);
1039        assert_eq!(buf.chunk(), b"llo");
1040    }
1041
1042    #[test]
1043    fn test_iobuf_empty() {
1044        let buf = IoBuf::from(Vec::new());
1045        assert!(buf.is_empty());
1046        assert_eq!(buf.len(), 0);
1047    }
1048
1049    #[test]
1050    fn test_iobuf_equality() {
1051        let buf1 = IoBuf::from(b"hello");
1052        let buf2 = IoBuf::from(b"hello");
1053        let buf3 = IoBuf::from(b"world");
1054        assert_eq!(buf1, buf2);
1055        assert_ne!(buf1, buf3);
1056    }
1057
1058    #[test]
1059    fn test_iobuf_equality_with_slice() {
1060        let buf = IoBuf::from(b"hello");
1061        assert_eq!(buf, *b"hello");
1062        assert_eq!(buf, b"hello");
1063        assert_ne!(buf, *b"world");
1064        assert_ne!(buf, b"world");
1065    }
1066
1067    #[test]
1068    fn test_iobuf_codec_roundtrip() {
1069        use commonware_codec::{Decode, Encode, RangeCfg};
1070
1071        let cfg: RangeCfg<usize> = (0..=1024).into();
1072
1073        let original = IoBuf::from(b"hello world");
1074        let encoded = original.encode();
1075        let decoded = IoBuf::decode_cfg(encoded, &cfg).unwrap();
1076        assert_eq!(original, decoded);
1077
1078        let empty = IoBuf::default();
1079        let encoded = empty.encode();
1080        let decoded = IoBuf::decode_cfg(encoded, &cfg).unwrap();
1081        assert_eq!(empty, decoded);
1082
1083        let large_cfg: RangeCfg<usize> = (0..=20000).into();
1084        let large = IoBuf::from(vec![42u8; 10000]);
1085        let encoded = large.encode();
1086        let decoded = IoBuf::decode_cfg(encoded, &large_cfg).unwrap();
1087        assert_eq!(large, decoded);
1088    }
1089
1090    #[test]
1091    fn test_iobuf_copy_to_bytes() {
1092        let mut buf = IoBuf::from(b"hello world");
1093        let first = buf.copy_to_bytes(5);
1094        assert_eq!(&first[..], b"hello");
1095        assert_eq!(buf.remaining(), 6);
1096        let rest = buf.copy_to_bytes(6);
1097        assert_eq!(&rest[..], b" world");
1098        assert_eq!(buf.remaining(), 0);
1099    }
1100
1101    #[test]
1102    fn test_iobuf_slice() {
1103        let buf = IoBuf::from(b"hello world");
1104
1105        let slice = buf.slice(..5);
1106        assert_eq!(slice, b"hello");
1107
1108        let slice = buf.slice(6..);
1109        assert_eq!(slice, b"world");
1110
1111        let slice = buf.slice(3..8);
1112        assert_eq!(slice, b"lo wo");
1113
1114        let slice = buf.slice(5..5);
1115        assert!(slice.is_empty());
1116
1117        assert_eq!(buf, b"hello world");
1118    }
1119
1120    #[test]
1121    #[should_panic(expected = "cannot advance")]
1122    fn test_iobuf_advance_past_end() {
1123        let mut buf = IoBuf::from(b"hello");
1124        buf.advance(10);
1125    }
1126
1127    #[test]
1128    fn test_iobuf_mut_build_and_freeze() {
1129        let mut buf = IoBufMut::with_capacity(100);
1130        buf.put_slice(b"hello");
1131        assert_eq!(buf, b"hello");
1132
1133        buf.put_slice(b" world");
1134        assert_eq!(buf, b"hello world");
1135
1136        let frozen = buf.freeze();
1137        assert_eq!(frozen, b"hello world");
1138    }
1139
1140    #[test]
1141    fn test_iobuf_mut_capacity() {
1142        let buf = IoBufMut::with_capacity(100);
1143        assert!(buf.capacity() >= 100);
1144        assert_eq!(buf.len(), 0);
1145    }
1146
1147    #[test]
1148    fn test_iobuf_mut_set_len() {
1149        let mut buf = IoBufMut::zeroed(10);
1150        assert_eq!(buf.len(), 10);
1151
1152        // Test shrinking via set_len
1153        // SAFETY: Shrinking to 5 bytes, all of which are initialized (zeros from zeroed()).
1154        unsafe {
1155            buf.set_len(5);
1156        }
1157        assert_eq!(buf.len(), 5);
1158        assert_eq!(buf, &[0u8; 5]);
1159
1160        // Modify the content and verify
1161        buf.as_mut()[..5].copy_from_slice(&[0xAB; 5]);
1162        assert_eq!(buf, &[0xAB; 5]);
1163    }
1164
1165    #[test]
1166    fn test_iobuf_mut_zeroed() {
1167        let mut buf = IoBufMut::zeroed(10);
1168        assert_eq!(buf.len(), 10);
1169        assert!(buf.capacity() >= 10);
1170        assert_eq!(buf, &[0u8; 10]);
1171
1172        // Can write into it via as_mut
1173        buf.as_mut()[..5].copy_from_slice(b"hello");
1174        assert_eq!(&buf.as_ref()[..5], b"hello");
1175        assert_eq!(&buf.as_ref()[5..], &[0u8; 5]);
1176
1177        // Freeze and convert to Vec
1178        let frozen = buf.freeze();
1179        assert_eq!(frozen.len(), 10);
1180        let vec: Vec<u8> = frozen.into();
1181        assert_eq!(&vec[..5], b"hello");
1182        assert_eq!(&vec[5..], &[0u8; 5]);
1183    }
1184
1185    #[test]
1186    fn test_iobuf_len_equals_remaining_after_advance() {
1187        let mut buf = IoBuf::from(b"hello world");
1188
1189        // Before advance
1190        assert_eq!(buf.len(), buf.remaining());
1191        assert_eq!(buf.as_ref(), buf.chunk());
1192
1193        // After advance
1194        buf.advance(6);
1195        assert_eq!(buf.len(), buf.remaining());
1196        assert_eq!(buf.as_ref(), buf.chunk());
1197        assert_eq!(buf.len(), 5);
1198    }
1199
1200    #[test]
1201    fn test_iobufs_empty() {
1202        let bufs = IoBufs::from(Vec::new());
1203        assert!(bufs.is_empty());
1204        assert_eq!(bufs.len(), 0);
1205    }
1206
1207    #[test]
1208    fn test_iobufs_single_buffer() {
1209        let mut bufs = IoBufs::from(b"hello world");
1210        assert!(bufs.is_single());
1211
1212        assert_eq!(bufs.remaining(), 11);
1213        assert_eq!(bufs.chunk(), b"hello world");
1214
1215        bufs.advance(6);
1216        assert_eq!(bufs.remaining(), 5);
1217        assert_eq!(bufs.chunk(), b"world");
1218
1219        let bytes = bufs.copy_to_bytes(5);
1220        assert_eq!(&bytes[..], b"world");
1221        assert_eq!(bufs.remaining(), 0);
1222    }
1223
1224    #[test]
1225    fn test_iobufs_is_single() {
1226        let bufs = IoBufs::from(b"hello");
1227        assert!(bufs.is_single());
1228
1229        let mut bufs = IoBufs::from(b"world");
1230        assert!(bufs.is_single());
1231        bufs.prepend(IoBuf::from(b"hello "));
1232        assert!(!bufs.is_single());
1233
1234        let mut bufs = IoBufs::from(b"hello");
1235        assert!(bufs.is_single());
1236        bufs.append(IoBuf::from(b" world"));
1237        assert!(!bufs.is_single());
1238
1239        let bufs = IoBufs::default();
1240        assert!(bufs.is_single());
1241    }
1242
1243    #[test]
1244    fn test_iobufs_prepend_and_append() {
1245        let mut bufs = IoBufs::from(b"middle");
1246        bufs.prepend(IoBuf::from(b"start "));
1247        bufs.append(IoBuf::from(b" end"));
1248        assert_eq!(bufs.coalesce(), b"start middle end");
1249    }
1250
1251    #[test]
1252    fn test_iobufs_coalesce_after_advance() {
1253        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
1254        bufs.append(IoBuf::from(b" world"));
1255
1256        assert_eq!(bufs.len(), 11);
1257
1258        bufs.advance(3);
1259        assert_eq!(bufs.len(), 8);
1260
1261        assert_eq!(bufs.coalesce(), b"lo world");
1262    }
1263
1264    #[test]
1265    fn test_iobufs_coalesce_with_pool() {
1266        let mut registry = prometheus_client::registry::Registry::default();
1267        let pool = BufferPool::new(BufferPoolConfig::for_network(), &mut registry);
1268
1269        // Single buffer: zero-copy (same pointer)
1270        let buf = IoBuf::from(vec![1u8, 2, 3, 4, 5]);
1271        let original_ptr = buf.as_ptr();
1272        let bufs = IoBufs::from(buf);
1273        let coalesced = bufs.coalesce_with_pool(&pool);
1274        assert_eq!(coalesced, [1, 2, 3, 4, 5]);
1275        assert_eq!(coalesced.as_ptr(), original_ptr);
1276
1277        // Multiple buffers: merged using pool
1278        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
1279        bufs.append(IoBuf::from(b" world"));
1280        let coalesced = bufs.coalesce_with_pool(&pool);
1281        assert_eq!(coalesced, b"hello world");
1282
1283        // Multiple buffers after advance: only remaining data coalesced
1284        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
1285        bufs.append(IoBuf::from(b" world"));
1286        bufs.advance(3);
1287        let coalesced = bufs.coalesce_with_pool(&pool);
1288        assert_eq!(coalesced, b"lo world");
1289
1290        // Empty buffers in the middle
1291        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
1292        bufs.append(IoBuf::default());
1293        bufs.append(IoBuf::from(b" world"));
1294        let coalesced = bufs.coalesce_with_pool(&pool);
1295        assert_eq!(coalesced, b"hello world");
1296
1297        // Empty IoBufs
1298        let bufs = IoBufs::default();
1299        let coalesced = bufs.coalesce_with_pool(&pool);
1300        assert!(coalesced.is_empty());
1301    }
1302
1303    #[test]
1304    fn test_iobufs_with_empty_buffers() {
1305        let mut bufs = IoBufs::default();
1306        bufs.append(IoBuf::from(b"hello"));
1307        bufs.append(IoBuf::default());
1308        bufs.append(IoBuf::from(b" "));
1309        bufs.append(IoBuf::default());
1310        bufs.append(IoBuf::from(b"world"));
1311
1312        assert_eq!(bufs.len(), 11);
1313        assert_eq!(bufs.chunk(), b"hello");
1314
1315        bufs.advance(5);
1316        assert_eq!(bufs.chunk(), b" ");
1317
1318        bufs.advance(1);
1319        assert_eq!(bufs.chunk(), b"world");
1320
1321        assert_eq!(bufs.coalesce(), b"world");
1322    }
1323
1324    #[test]
1325    fn test_iobufs_copy_to_bytes_single_buffer() {
1326        let mut bufs = IoBufs::from(b"hello world");
1327        let first = bufs.copy_to_bytes(5);
1328        assert_eq!(&first[..], b"hello");
1329        assert_eq!(bufs.remaining(), 6);
1330    }
1331
1332    #[test]
1333    fn test_iobufs_copy_to_bytes_multiple_buffers() {
1334        let mut bufs = IoBufs::from(b"hello");
1335        bufs.prepend(IoBuf::from(b"say "));
1336
1337        let first = bufs.copy_to_bytes(7);
1338        assert_eq!(&first[..], b"say hel");
1339        assert_eq!(bufs.remaining(), 2);
1340
1341        let rest = bufs.copy_to_bytes(2);
1342        assert_eq!(&rest[..], b"lo");
1343    }
1344
1345    #[test]
1346    fn test_iobufs_copy_to_bytes_edge_cases() {
1347        // Empty first buffer
1348        let mut iobufs = IoBufs::from(IoBuf::from(b""));
1349        iobufs.append(IoBuf::from(b"hello"));
1350        let bytes = iobufs.copy_to_bytes(5);
1351        assert_eq!(&bytes[..], b"hello");
1352
1353        // Exact buffer boundary
1354        let mut iobufs = IoBufs::from(IoBuf::from(b"hello"));
1355        iobufs.append(IoBuf::from(b"world"));
1356
1357        let bytes = iobufs.copy_to_bytes(5);
1358        assert_eq!(&bytes[..], b"hello");
1359        assert_eq!(iobufs.remaining(), 5);
1360
1361        let bytes = iobufs.copy_to_bytes(5);
1362        assert_eq!(&bytes[..], b"world");
1363        assert_eq!(iobufs.remaining(), 0);
1364    }
1365
1366    #[test]
1367    #[should_panic(expected = "cannot advance past end of buffer")]
1368    fn test_iobufs_advance_past_end() {
1369        let mut bufs = IoBufs::from(b"hel");
1370        bufs.append(IoBuf::from(b"lo"));
1371        bufs.advance(10);
1372    }
1373
1374    #[test]
1375    #[should_panic(expected = "not enough data")]
1376    fn test_iobufs_copy_to_bytes_past_end() {
1377        let mut bufs = IoBufs::from(b"hel");
1378        bufs.append(IoBuf::from(b"lo"));
1379        bufs.copy_to_bytes(10);
1380    }
1381
1382    #[test]
1383    fn test_iobufs_matches_bytes_chain() {
1384        let b1 = Bytes::from_static(b"hello");
1385        let b2 = Bytes::from_static(b" ");
1386        let b3 = Bytes::from_static(b"world");
1387
1388        let mut chain = b1.clone().chain(b2.clone()).chain(b3.clone());
1389        let mut iobufs = IoBufs::from(IoBuf::from(b1.clone()));
1390        iobufs.append(IoBuf::from(b2.clone()));
1391        iobufs.append(IoBuf::from(b3.clone()));
1392
1393        assert_eq!(chain.remaining(), iobufs.remaining());
1394        assert_eq!(chain.chunk(), iobufs.chunk());
1395
1396        chain.advance(3);
1397        iobufs.advance(3);
1398        assert_eq!(chain.remaining(), iobufs.remaining());
1399        assert_eq!(chain.chunk(), iobufs.chunk());
1400
1401        chain.advance(3);
1402        iobufs.advance(3);
1403        assert_eq!(chain.remaining(), iobufs.remaining());
1404        assert_eq!(chain.chunk(), iobufs.chunk());
1405
1406        // Test copy_to_bytes
1407        let mut chain = b1.clone().chain(b2.clone()).chain(b3.clone());
1408        let mut iobufs = IoBufs::from(IoBuf::from(b1));
1409        iobufs.append(IoBuf::from(b2));
1410        iobufs.append(IoBuf::from(b3));
1411
1412        assert_eq!(chain.copy_to_bytes(3), iobufs.copy_to_bytes(3));
1413        assert_eq!(chain.copy_to_bytes(4), iobufs.copy_to_bytes(4));
1414        assert_eq!(
1415            chain.copy_to_bytes(chain.remaining()),
1416            iobufs.copy_to_bytes(iobufs.remaining())
1417        );
1418        assert_eq!(chain.remaining(), 0);
1419        assert_eq!(iobufs.remaining(), 0);
1420    }
1421
1422    #[test]
1423    fn test_iobufsmut_single() {
1424        let buf = IoBufMut::from(b"hello".as_ref());
1425        let bufs = IoBufsMut::from(buf);
1426        assert!(bufs.is_single());
1427        assert_eq!(bufs.len(), 5);
1428        assert_eq!(bufs.chunk(), b"hello");
1429    }
1430
1431    #[test]
1432    fn test_iobufsmut_chunked() {
1433        let buf1 = IoBufMut::from(b"hello");
1434        let buf2 = IoBufMut::from(b" world");
1435        let bufs = IoBufsMut::from(vec![buf1, buf2]);
1436        assert!(!bufs.is_single());
1437        assert_eq!(bufs.len(), 11);
1438        assert_eq!(bufs.chunk(), b"hello");
1439    }
1440
1441    #[test]
1442    fn test_iobufsmut_freeze_single() {
1443        let buf = IoBufMut::from(b"hello");
1444        let bufs = IoBufsMut::from(buf);
1445        let frozen = bufs.freeze();
1446        assert!(frozen.is_single());
1447        assert_eq!(frozen.chunk(), b"hello");
1448    }
1449
1450    #[test]
1451    fn test_iobufsmut_freeze_chunked() {
1452        // Multiple non-empty buffers stays Chunked
1453        let buf1 = IoBufMut::from(b"hello".as_ref());
1454        let buf2 = IoBufMut::from(b" world".as_ref());
1455        let bufs = IoBufsMut::from(vec![buf1, buf2]);
1456        let frozen = bufs.freeze();
1457        assert!(!frozen.is_single());
1458        match frozen {
1459            IoBufs::Chunked(ref chunks) => {
1460                assert_eq!(chunks.len(), 2);
1461                assert_eq!(chunks[0], b"hello");
1462                assert_eq!(chunks[1], b" world");
1463            }
1464            _ => unreachable!(),
1465        }
1466
1467        // Empty buffers are filtered out
1468        let buf1 = IoBufMut::from(b"hello".as_ref());
1469        let empty = IoBufMut::default();
1470        let buf2 = IoBufMut::from(b" world".as_ref());
1471        let bufs = IoBufsMut::from(vec![buf1, empty, buf2]);
1472        let frozen = bufs.freeze();
1473        assert!(!frozen.is_single());
1474        match frozen {
1475            IoBufs::Chunked(ref chunks) => {
1476                assert_eq!(chunks.len(), 2);
1477                assert_eq!(chunks[0], b"hello");
1478                assert_eq!(chunks[1], b" world");
1479            }
1480            _ => unreachable!(),
1481        }
1482
1483        // Collapses to Single when one non-empty buffer remains
1484        let empty1 = IoBufMut::default();
1485        let buf = IoBufMut::from(b"only one".as_ref());
1486        let empty2 = IoBufMut::default();
1487        let bufs = IoBufsMut::from(vec![empty1, buf, empty2]);
1488        let frozen = bufs.freeze();
1489        assert!(frozen.is_single());
1490        assert_eq!(frozen.coalesce(), b"only one");
1491
1492        // All empty buffers -> Single with empty buffer
1493        let empty1 = IoBufMut::default();
1494        let empty2 = IoBufMut::default();
1495        let bufs = IoBufsMut::from(vec![empty1, empty2]);
1496        let frozen = bufs.freeze();
1497        assert!(frozen.is_single());
1498        assert!(frozen.is_empty());
1499    }
1500
1501    #[test]
1502    fn test_iobufsmut_coalesce() {
1503        let buf1 = IoBufMut::from(b"hello");
1504        let buf2 = IoBufMut::from(b" world");
1505        let bufs = IoBufsMut::from(vec![buf1, buf2]);
1506        let coalesced = bufs.coalesce();
1507        assert_eq!(coalesced, b"hello world");
1508    }
1509
1510    #[test]
1511    fn test_iobufsmut_from_vec() {
1512        // Empty Vec becomes Single with empty buffer
1513        let bufs = IoBufsMut::from(Vec::<IoBufMut>::new());
1514        assert!(bufs.is_single());
1515        assert!(bufs.is_empty());
1516
1517        // Vec with one element becomes Single
1518        let buf = IoBufMut::from(b"test");
1519        let bufs = IoBufsMut::from(vec![buf]);
1520        assert!(bufs.is_single());
1521        assert_eq!(bufs.chunk(), b"test");
1522
1523        // Vec with multiple elements becomes Chunked
1524        let buf1 = IoBufMut::from(b"hello");
1525        let buf2 = IoBufMut::from(b" world");
1526        let bufs = IoBufsMut::from(vec![buf1, buf2]);
1527        assert!(!bufs.is_single());
1528    }
1529
1530    #[test]
1531    fn test_iobufsmut_default() {
1532        let bufs = IoBufsMut::default();
1533        assert!(bufs.is_single());
1534        assert!(bufs.is_empty());
1535        assert_eq!(bufs.len(), 0);
1536    }
1537
1538    #[test]
1539    fn test_iobufsmut_from_array() {
1540        let bufs = IoBufsMut::from([1u8, 2, 3, 4, 5]);
1541        assert!(bufs.is_single());
1542        assert_eq!(bufs.len(), 5);
1543        assert_eq!(bufs.chunk(), &[1, 2, 3, 4, 5]);
1544    }
1545
1546    #[test]
1547    fn test_iobufmut_buf_trait() {
1548        let mut buf = IoBufMut::from(b"hello world");
1549        assert_eq!(buf.remaining(), 11);
1550        assert_eq!(buf.chunk(), b"hello world");
1551
1552        buf.advance(6);
1553        assert_eq!(buf.remaining(), 5);
1554        assert_eq!(buf.chunk(), b"world");
1555
1556        buf.advance(5);
1557        assert_eq!(buf.remaining(), 0);
1558        assert!(buf.chunk().is_empty());
1559    }
1560
1561    #[test]
1562    #[should_panic(expected = "cannot advance")]
1563    fn test_iobufmut_advance_past_end() {
1564        let mut buf = IoBufMut::from(b"hello");
1565        buf.advance(10);
1566    }
1567
1568    #[test]
1569    fn test_iobufmut_len_equals_remaining_after_advance() {
1570        let mut buf = IoBufMut::from(b"hello world");
1571
1572        // Before advance
1573        assert_eq!(buf.len(), buf.remaining());
1574        assert_eq!(buf.as_ref(), buf.chunk());
1575
1576        // After partial advance
1577        buf.advance(6);
1578        assert_eq!(buf.len(), buf.remaining());
1579        assert_eq!(buf.as_ref(), buf.chunk());
1580        assert_eq!(buf.len(), 5);
1581        assert_eq!(buf.as_ref(), b"world");
1582
1583        // After advancing to end
1584        buf.advance(5);
1585        assert_eq!(buf.len(), buf.remaining());
1586        assert_eq!(buf.as_ref(), buf.chunk());
1587        assert_eq!(buf.len(), 0);
1588    }
1589
1590    #[test]
1591    fn test_iobufsmut_buf_trait_single() {
1592        let mut bufs = IoBufsMut::from(IoBufMut::from(b"hello world"));
1593        assert_eq!(bufs.remaining(), 11);
1594        assert_eq!(bufs.chunk(), b"hello world");
1595
1596        bufs.advance(6);
1597        assert_eq!(bufs.remaining(), 5);
1598        assert_eq!(bufs.chunk(), b"world");
1599    }
1600
1601    #[test]
1602    fn test_iobufsmut_buf_trait_chunked() {
1603        let buf1 = IoBufMut::from(b"hello");
1604        let buf2 = IoBufMut::from(b" ");
1605        let buf3 = IoBufMut::from(b"world");
1606        let mut bufs = IoBufsMut::from(vec![buf1, buf2, buf3]);
1607
1608        assert_eq!(bufs.remaining(), 11);
1609        assert_eq!(bufs.chunk(), b"hello");
1610
1611        // Advance within first buffer
1612        bufs.advance(3);
1613        assert_eq!(bufs.remaining(), 8);
1614        assert_eq!(bufs.chunk(), b"lo");
1615
1616        // Advance past first buffer (should pop_front)
1617        bufs.advance(2);
1618        assert_eq!(bufs.remaining(), 6);
1619        assert_eq!(bufs.chunk(), b" ");
1620
1621        // Advance exactly one buffer
1622        bufs.advance(1);
1623        assert_eq!(bufs.remaining(), 5);
1624        assert_eq!(bufs.chunk(), b"world");
1625
1626        // Advance to end
1627        bufs.advance(5);
1628        assert_eq!(bufs.remaining(), 0);
1629    }
1630
1631    #[test]
1632    fn test_iobufsmut_advance_across_multiple_buffers() {
1633        let buf1 = IoBufMut::from(b"ab");
1634        let buf2 = IoBufMut::from(b"cd");
1635        let buf3 = IoBufMut::from(b"ef");
1636        let mut bufs = IoBufsMut::from(vec![buf1, buf2, buf3]);
1637
1638        // Advance across two buffers at once
1639        bufs.advance(5);
1640        assert_eq!(bufs.remaining(), 1);
1641        assert_eq!(bufs.chunk(), b"f");
1642    }
1643
1644    #[test]
1645    #[should_panic(expected = "cannot advance past end of buffer")]
1646    fn test_iobufsmut_advance_past_end() {
1647        let buf1 = IoBufMut::from(b"hello");
1648        let buf2 = IoBufMut::from(b" world");
1649        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
1650        bufs.advance(20);
1651    }
1652
1653    #[test]
1654    fn test_iobufsmut_bufmut_trait_single() {
1655        let mut bufs = IoBufsMut::from(IoBufMut::with_capacity(20));
1656        // BytesMut can grow, so remaining_mut is very large
1657        assert!(bufs.remaining_mut() > 1000);
1658
1659        bufs.put_slice(b"hello");
1660        assert_eq!(bufs.chunk(), b"hello");
1661        assert_eq!(bufs.len(), 5);
1662
1663        bufs.put_slice(b" world");
1664        assert_eq!(bufs.coalesce(), b"hello world");
1665    }
1666
1667    #[test]
1668    fn test_iobufsmut_zeroed_write() {
1669        // Use zeroed buffers which have a fixed length
1670        let bufs = IoBufsMut::from(IoBufMut::zeroed(20));
1671        assert_eq!(bufs.len(), 20);
1672
1673        // Can write using as_mut on coalesced buffer
1674        let mut coalesced = bufs.coalesce();
1675        coalesced.as_mut()[..5].copy_from_slice(b"hello");
1676        assert_eq!(&coalesced.as_ref()[..5], b"hello");
1677    }
1678
1679    #[test]
1680    fn test_iobufsmut_bufmut_put_slice() {
1681        // Test writing across multiple buffers
1682        let buf1 = IoBufMut::with_capacity(5);
1683        let buf2 = IoBufMut::with_capacity(6);
1684        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
1685
1686        // Write data
1687        bufs.put_slice(b"hello");
1688        bufs.put_slice(b" world");
1689        assert_eq!(bufs.coalesce(), b"hello world");
1690    }
1691
1692    #[test]
1693    fn test_iobufs_advance_drains_buffers() {
1694        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
1695        bufs.append(IoBuf::from(b" "));
1696        bufs.append(IoBuf::from(b"world"));
1697
1698        // Advance exactly past first buffer
1699        bufs.advance(5);
1700        assert_eq!(bufs.remaining(), 6);
1701        assert_eq!(bufs.chunk(), b" ");
1702
1703        // Advance across multiple buffers
1704        bufs.advance(4);
1705        assert_eq!(bufs.remaining(), 2);
1706        assert_eq!(bufs.chunk(), b"ld");
1707    }
1708
1709    #[test]
1710    fn test_iobufs_advance_exactly_to_boundary() {
1711        let mut bufs = IoBufs::from(IoBuf::from(b"abc"));
1712        bufs.append(IoBuf::from(b"def"));
1713
1714        // Advance exactly to first buffer boundary
1715        bufs.advance(3);
1716        assert_eq!(bufs.remaining(), 3);
1717        assert_eq!(bufs.chunk(), b"def");
1718
1719        // Advance exactly to end
1720        bufs.advance(3);
1721        assert_eq!(bufs.remaining(), 0);
1722    }
1723
1724    #[test]
1725    fn test_iobufsmut_with_empty_buffers() {
1726        let buf1 = IoBufMut::from(b"hello");
1727        let buf2 = IoBufMut::default();
1728        let buf3 = IoBufMut::from(b" world");
1729        let mut bufs = IoBufsMut::from(vec![buf1, buf2, buf3]);
1730
1731        assert_eq!(bufs.remaining(), 11);
1732        assert_eq!(bufs.chunk(), b"hello");
1733
1734        // Advance past first buffer
1735        bufs.advance(5);
1736        // Empty buffer should be skipped
1737        assert_eq!(bufs.chunk(), b" world");
1738        assert_eq!(bufs.remaining(), 6);
1739    }
1740
1741    #[test]
1742    fn test_iobufsmut_coalesce_after_advance() {
1743        let buf1 = IoBufMut::from(b"hello");
1744        let buf2 = IoBufMut::from(b" world");
1745        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
1746
1747        bufs.advance(3);
1748        assert_eq!(bufs.coalesce(), b"lo world");
1749    }
1750
1751    #[test]
1752    fn test_iobufsmut_copy_to_bytes() {
1753        let buf1 = IoBufMut::from(b"hello");
1754        let buf2 = IoBufMut::from(b" world");
1755        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
1756
1757        let first = bufs.copy_to_bytes(7);
1758        assert_eq!(&first[..], b"hello w");
1759        assert_eq!(bufs.remaining(), 4);
1760
1761        let rest = bufs.copy_to_bytes(4);
1762        assert_eq!(&rest[..], b"orld");
1763        assert_eq!(bufs.remaining(), 0);
1764    }
1765
1766    #[test]
1767    fn test_iobufsmut_copy_from_slice_single() {
1768        let mut bufs = IoBufsMut::from(IoBufMut::zeroed(11));
1769        bufs.copy_from_slice(b"hello world");
1770        assert_eq!(bufs.coalesce(), b"hello world");
1771    }
1772
1773    #[test]
1774    fn test_iobufsmut_copy_from_slice_chunked() {
1775        let buf1 = IoBufMut::zeroed(5);
1776        let buf2 = IoBufMut::zeroed(6);
1777        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
1778
1779        bufs.copy_from_slice(b"hello world");
1780
1781        // Verify each chunk was filled correctly
1782        match &bufs {
1783            IoBufsMut::Chunked(chunks) => {
1784                assert_eq!(chunks[0], b"hello");
1785                assert_eq!(chunks[1], b" world");
1786            }
1787            _ => panic!("expected Chunked variant"),
1788        }
1789    }
1790
1791    #[test]
1792    #[should_panic(expected = "source slice length must match buffer length")]
1793    fn test_iobufsmut_copy_from_slice_wrong_length() {
1794        let mut bufs = IoBufsMut::from(IoBufMut::zeroed(5));
1795        bufs.copy_from_slice(b"hello world"); // 11 bytes into 5-byte buffer
1796    }
1797
1798    #[test]
1799    fn test_iobufsmut_matches_bytesmut_chain() {
1800        use bytes::BytesMut;
1801
1802        // Create three BytesMut with capacity
1803        let mut bm1 = BytesMut::with_capacity(5);
1804        let mut bm2 = BytesMut::with_capacity(6);
1805        let mut bm3 = BytesMut::with_capacity(7);
1806
1807        // Create matching IoBufsMut
1808        let mut iobufs = IoBufsMut::from(vec![
1809            IoBufMut::with_capacity(5),
1810            IoBufMut::with_capacity(6),
1811            IoBufMut::with_capacity(7),
1812        ]);
1813
1814        // Test initial chunk_mut length matches (spare capacity)
1815        let chain_len = (&mut bm1)
1816            .chain_mut(&mut bm2)
1817            .chain_mut(&mut bm3)
1818            .chunk_mut()
1819            .len();
1820        let iobufs_len = iobufs.chunk_mut().len();
1821        assert_eq!(chain_len, iobufs_len);
1822
1823        // Write some data
1824        (&mut bm1)
1825            .chain_mut(&mut bm2)
1826            .chain_mut(&mut bm3)
1827            .put_slice(b"hel");
1828        iobufs.put_slice(b"hel");
1829
1830        // Verify chunk_mut matches after partial write
1831        let chain_len = (&mut bm1)
1832            .chain_mut(&mut bm2)
1833            .chain_mut(&mut bm3)
1834            .chunk_mut()
1835            .len();
1836        let iobufs_len = iobufs.chunk_mut().len();
1837        assert_eq!(chain_len, iobufs_len);
1838
1839        // Write more data
1840        (&mut bm1)
1841            .chain_mut(&mut bm2)
1842            .chain_mut(&mut bm3)
1843            .put_slice(b"lo world!");
1844        iobufs.put_slice(b"lo world!");
1845
1846        // Verify chunk_mut matches after more writes
1847        let chain_len = (&mut bm1)
1848            .chain_mut(&mut bm2)
1849            .chain_mut(&mut bm3)
1850            .chunk_mut()
1851            .len();
1852        let iobufs_len = iobufs.chunk_mut().len();
1853        assert_eq!(chain_len, iobufs_len);
1854
1855        // Verify final content matches
1856        let frozen = iobufs.freeze().coalesce();
1857        let mut chain_content = bm1.to_vec();
1858        chain_content.extend_from_slice(&bm2);
1859        chain_content.extend_from_slice(&bm3);
1860        assert_eq!(frozen, chain_content.as_slice());
1861        assert_eq!(frozen, b"hello world!");
1862    }
1863
1864    #[test]
1865    fn test_iobufsmut_buf_matches_bytes_chain() {
1866        // Create pre-filled Bytes buffers
1867        let mut b1 = Bytes::from_static(b"hello");
1868        let mut b2 = Bytes::from_static(b" world");
1869        let b3 = Bytes::from_static(b"!");
1870
1871        // Create matching IoBufsMut
1872        let mut iobufs = IoBufsMut::from(vec![
1873            IoBufMut::from(b"hello"),
1874            IoBufMut::from(b" world"),
1875            IoBufMut::from(b"!"),
1876        ]);
1877
1878        // Test Buf::remaining matches
1879        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
1880        assert_eq!(chain_remaining, iobufs.remaining());
1881
1882        // Test Buf::chunk matches
1883        let chain_chunk = b1
1884            .clone()
1885            .chain(b2.clone())
1886            .chain(b3.clone())
1887            .chunk()
1888            .to_vec();
1889        assert_eq!(chain_chunk, iobufs.chunk().to_vec());
1890
1891        // Advance and test again
1892        b1.advance(3);
1893        iobufs.advance(3);
1894
1895        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
1896        assert_eq!(chain_remaining, iobufs.remaining());
1897
1898        let chain_chunk = b1
1899            .clone()
1900            .chain(b2.clone())
1901            .chain(b3.clone())
1902            .chunk()
1903            .to_vec();
1904        assert_eq!(chain_chunk, iobufs.chunk().to_vec());
1905
1906        // Advance past first buffer boundary into second
1907        b1.advance(2);
1908        iobufs.advance(2);
1909
1910        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
1911        assert_eq!(chain_remaining, iobufs.remaining());
1912
1913        // Now we should be in the second buffer
1914        let chain_chunk = b1
1915            .clone()
1916            .chain(b2.clone())
1917            .chain(b3.clone())
1918            .chunk()
1919            .to_vec();
1920        assert_eq!(chain_chunk, iobufs.chunk().to_vec());
1921
1922        // Advance past second buffer boundary into third
1923        b2.advance(6);
1924        iobufs.advance(6);
1925
1926        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
1927        assert_eq!(chain_remaining, iobufs.remaining());
1928
1929        // Now we should be in the third buffer
1930        let chain_chunk = b1.chain(b2).chain(b3).chunk().to_vec();
1931        assert_eq!(chain_chunk, iobufs.chunk().to_vec());
1932
1933        // Test copy_to_bytes
1934        let b1 = Bytes::from_static(b"hello");
1935        let b2 = Bytes::from_static(b" world");
1936        let b3 = Bytes::from_static(b"!");
1937        let mut iobufs = IoBufsMut::from(vec![
1938            IoBufMut::from(b"hello"),
1939            IoBufMut::from(b" world"),
1940            IoBufMut::from(b"!"),
1941        ]);
1942
1943        let chain_bytes = b1.chain(b2).chain(b3).copy_to_bytes(8);
1944        let iobufs_bytes = iobufs.copy_to_bytes(8);
1945        assert_eq!(chain_bytes, iobufs_bytes);
1946        assert_eq!(chain_bytes.as_ref(), b"hello wo");
1947    }
1948
1949    #[test]
1950    fn test_iobufsmut_len_equals_remaining_after_advance() {
1951        let buf1 = IoBufMut::from(b"hello");
1952        let buf2 = IoBufMut::from(b" world");
1953        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
1954
1955        // Before advance
1956        assert_eq!(bufs.len(), bufs.remaining());
1957        assert_eq!(bufs.len(), 11);
1958
1959        // After partial advance (within first buffer)
1960        bufs.advance(3);
1961        assert_eq!(bufs.len(), bufs.remaining());
1962        assert_eq!(bufs.len(), 8);
1963
1964        // After advance past first buffer
1965        bufs.advance(4);
1966        assert_eq!(bufs.len(), bufs.remaining());
1967        assert_eq!(bufs.len(), 4);
1968    }
1969
1970    #[test]
1971    fn test_iobufsmut_freeze_after_advance() {
1972        let buf1 = IoBufMut::from(b"hello");
1973        let buf2 = IoBufMut::from(b" world");
1974        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
1975
1976        // Advance partway through first buffer
1977        bufs.advance(3);
1978        assert_eq!(bufs.len(), 8);
1979
1980        // Freeze and verify only remaining data is preserved
1981        let frozen = bufs.freeze();
1982        assert_eq!(frozen.len(), 8);
1983        assert_eq!(frozen.coalesce(), b"lo world");
1984    }
1985
1986    #[test]
1987    fn test_iobufsmut_freeze_after_advance_to_boundary() {
1988        let buf1 = IoBufMut::from(b"hello");
1989        let buf2 = IoBufMut::from(b" world");
1990        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
1991
1992        // Advance exactly to first buffer boundary
1993        bufs.advance(5);
1994        assert_eq!(bufs.len(), 6);
1995
1996        // First buffer should be fully consumed (empty after advance)
1997        // freeze() filters empty buffers, so result should be Single
1998        let frozen = bufs.freeze();
1999        assert!(frozen.is_single());
2000        assert_eq!(frozen.coalesce(), b" world");
2001    }
2002
2003    #[test]
2004    fn test_iobufsmut_coalesce_after_advance_to_boundary() {
2005        let buf1 = IoBufMut::from(b"hello");
2006        let buf2 = IoBufMut::from(b" world");
2007        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
2008
2009        // Advance exactly past first buffer
2010        bufs.advance(5);
2011
2012        // Coalesce should only include second buffer's data
2013        assert_eq!(bufs.coalesce(), b" world");
2014    }
2015
2016    #[test]
2017    fn test_iobufsmut_coalesce_with_pool() {
2018        let mut registry = prometheus_client::registry::Registry::default();
2019        let pool = BufferPool::new(BufferPoolConfig::for_network(), &mut registry);
2020
2021        // Single buffer: zero-copy (same pointer)
2022        let mut buf = IoBufMut::from(b"hello");
2023        let original_ptr = buf.as_mut_ptr();
2024        let bufs = IoBufsMut::from(buf);
2025        let coalesced = bufs.coalesce_with_pool(&pool);
2026        assert_eq!(coalesced, b"hello");
2027        assert_eq!(coalesced.as_ref().as_ptr(), original_ptr);
2028
2029        // Multiple buffers: merged using pool
2030        let bufs = IoBufsMut::from(vec![IoBufMut::from(b"hello"), IoBufMut::from(b" world")]);
2031        let coalesced = bufs.coalesce_with_pool(&pool);
2032        assert_eq!(coalesced, b"hello world");
2033        assert!(coalesced.is_pooled());
2034
2035        // With extra capacity: zero-copy if sufficient spare capacity
2036        let mut buf = IoBufMut::with_capacity(100);
2037        buf.put_slice(b"hello");
2038        let original_ptr = buf.as_mut_ptr();
2039        let bufs = IoBufsMut::from(buf);
2040        let coalesced = bufs.coalesce_with_pool_extra(&pool, 10);
2041        assert_eq!(coalesced, b"hello");
2042        assert_eq!(coalesced.as_ref().as_ptr(), original_ptr);
2043
2044        // With extra capacity: reallocates if insufficient
2045        let mut buf = IoBufMut::with_capacity(5);
2046        buf.put_slice(b"hello");
2047        let bufs = IoBufsMut::from(buf);
2048        let coalesced = bufs.coalesce_with_pool_extra(&pool, 100);
2049        assert_eq!(coalesced, b"hello");
2050        assert!(coalesced.capacity() >= 105);
2051    }
2052
2053    #[cfg(feature = "arbitrary")]
2054    mod conformance {
2055        use super::IoBuf;
2056        use commonware_codec::conformance::CodecConformance;
2057
2058        commonware_conformance::conformance_tests! {
2059            CodecConformance<IoBuf>
2060        }
2061    }
2062}