// commonware_runtime/iobuf/mod.rs
//! Buffer types for I/O operations.
//!
//! - [`IoBuf`]: Immutable byte buffer
//! - [`IoBufMut`]: Mutable byte buffer
//! - [`IoBufs`]: Container for one or more immutable buffers
//! - [`IoBufsMut`]: Container for one or more mutable buffers
//! - [`BufferPool`]: Pool of reusable, aligned buffers

mod pool;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use commonware_codec::{util::at_least, EncodeSize, Error, RangeCfg, Read, Write};
use std::{collections::VecDeque, io::IoSlice, ops::RangeBounds};

pub use pool::{BufferPool, BufferPoolConfig, PoolError};
use pool::{PooledBuf, PooledBufMut};
/// Immutable byte buffer.
///
/// Backed by either [`Bytes`] or a pooled aligned allocation.
///
/// Use this for immutable payloads. To build or mutate data, use
/// [`IoBufMut`] and then [`IoBufMut::freeze`].
///
/// For pooled-backed values, the underlying buffer is returned to the pool
/// when the final reference is dropped.
///
/// All `From<*> for IoBuf` implementations are guaranteed to be non-copy
/// conversions. Use [`IoBuf::copy_from_slice`] when an explicit copy from
/// borrowed data is required.
///
/// Cloning is cheap and does not copy underlying bytes.
#[derive(Clone, Debug)]
pub struct IoBuf {
    // Backing representation; see `IoBufInner` for the two variants.
    inner: IoBufInner,
}
36
/// Backing storage for [`IoBuf`].
#[derive(Clone, Debug)]
enum IoBufInner {
    /// Reference-counted buffer from the `bytes` crate.
    Bytes(Bytes),
    /// Pool-originated allocation (possibly an untracked fallback; see
    /// [`IoBuf::is_pooled`]).
    Pooled(PooledBuf),
}
42
43impl IoBuf {
44    /// Create a buffer by copying data from a slice.
45    ///
46    /// Use this when you have a non-static `&[u8]` that needs to be converted to an
47    /// [`IoBuf`]. For static slices, prefer [`IoBuf::from`] which is zero-copy.
48    pub fn copy_from_slice(data: &[u8]) -> Self {
49        Self {
50            inner: IoBufInner::Bytes(Bytes::copy_from_slice(data)),
51        }
52    }
53
54    /// Create a buffer from a pooled allocation.
55    const fn from_pooled(pooled: PooledBuf) -> Self {
56        Self {
57            inner: IoBufInner::Pooled(pooled),
58        }
59    }
60
61    /// Returns `true` if this buffer is tracked by a pool.
62    ///
63    /// Tracked buffers originate from [`BufferPool`] allocations and are
64    /// returned to the pool when the final reference is dropped.
65    ///
66    /// Buffers backed by [`Bytes`], and untracked fallback allocations from
67    /// [`BufferPool::alloc`], return `false`.
68    #[inline]
69    pub fn is_pooled(&self) -> bool {
70        match &self.inner {
71            IoBufInner::Bytes(_) => false,
72            IoBufInner::Pooled(p) => p.is_tracked(),
73        }
74    }
75
76    /// Number of bytes remaining in the buffer.
77    #[inline]
78    pub fn len(&self) -> usize {
79        self.remaining()
80    }
81
82    /// Whether the buffer is empty.
83    #[inline]
84    pub fn is_empty(&self) -> bool {
85        self.remaining() == 0
86    }
87
88    /// Get raw pointer to the buffer data.
89    #[inline]
90    pub fn as_ptr(&self) -> *const u8 {
91        match &self.inner {
92            IoBufInner::Bytes(b) => b.as_ptr(),
93            IoBufInner::Pooled(p) => p.as_ptr(),
94        }
95    }
96
97    /// Returns a slice of self for the provided range (zero-copy).
98    ///
99    /// For pooled buffers, empty ranges return an empty detached buffer
100    /// ([`IoBuf::default`]) so the underlying pooled allocation is not retained.
101    #[inline]
102    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
103        match &self.inner {
104            IoBufInner::Bytes(b) => Self {
105                inner: IoBufInner::Bytes(b.slice(range)),
106            },
107            IoBufInner::Pooled(p) => p.slice(range).map_or_else(Self::default, Self::from_pooled),
108        }
109    }
110
111    /// Splits the buffer into two at the given index.
112    ///
113    /// Afterwards `self` contains bytes `[at, len)`, and the returned [`IoBuf`]
114    /// contains bytes `[0, at)`.
115    ///
116    /// This is an `O(1)` zero-copy operation.
117    ///
118    /// # Panics
119    ///
120    /// Panics if `at > len`.
121    pub fn split_to(&mut self, at: usize) -> Self {
122        if at == 0 {
123            return Self::default();
124        }
125
126        if at == self.remaining() {
127            return std::mem::take(self);
128        }
129
130        match &mut self.inner {
131            IoBufInner::Bytes(b) => Self {
132                inner: IoBufInner::Bytes(b.split_to(at)),
133            },
134            IoBufInner::Pooled(p) => Self::from_pooled(p.split_to(at)),
135        }
136    }
137
138    /// Try to convert this buffer into [`IoBufMut`] without copying.
139    ///
140    /// Succeeds when `self` holds exclusive ownership of the backing storage
141    /// and returns an [`IoBufMut`] with the same contents. Fails and returns
142    /// `self` unchanged when ownership is shared.
143    ///
144    /// For [`Bytes`]-backed buffers, this matches [`Bytes::try_into_mut`]
145    /// semantics: succeeds only for uniquely-owned full buffers, and always
146    /// fails for [`Bytes::from_owner`] and [`Bytes::from_static`] buffers. For
147    /// pooled buffers, this succeeds for any uniquely-owned view (including
148    /// slices) and fails when shared.
149    pub fn try_into_mut(self) -> Result<IoBufMut, Self> {
150        match self.inner {
151            IoBufInner::Bytes(bytes) => bytes
152                .try_into_mut()
153                .map(|mut_bytes| IoBufMut {
154                    inner: IoBufMutInner::Bytes(mut_bytes),
155                })
156                .map_err(|bytes| Self {
157                    inner: IoBufInner::Bytes(bytes),
158                }),
159            IoBufInner::Pooled(pooled) => pooled
160                .try_into_mut()
161                .map(|mut_pooled| IoBufMut {
162                    inner: IoBufMutInner::Pooled(mut_pooled),
163                })
164                .map_err(|pooled| Self {
165                    inner: IoBufInner::Pooled(pooled),
166                }),
167        }
168    }
169}
170
171impl AsRef<[u8]> for IoBuf {
172    #[inline]
173    fn as_ref(&self) -> &[u8] {
174        match &self.inner {
175            IoBufInner::Bytes(b) => b.as_ref(),
176            IoBufInner::Pooled(p) => p.as_ref(),
177        }
178    }
179}
180
181impl Default for IoBuf {
182    fn default() -> Self {
183        Self {
184            inner: IoBufInner::Bytes(Bytes::new()),
185        }
186    }
187}
188
189impl PartialEq for IoBuf {
190    fn eq(&self, other: &Self) -> bool {
191        self.as_ref() == other.as_ref()
192    }
193}
194
195impl Eq for IoBuf {}
196
197impl PartialEq<[u8]> for IoBuf {
198    #[inline]
199    fn eq(&self, other: &[u8]) -> bool {
200        self.as_ref() == other
201    }
202}
203
204impl PartialEq<&[u8]> for IoBuf {
205    #[inline]
206    fn eq(&self, other: &&[u8]) -> bool {
207        self.as_ref() == *other
208    }
209}
210
211impl<const N: usize> PartialEq<[u8; N]> for IoBuf {
212    #[inline]
213    fn eq(&self, other: &[u8; N]) -> bool {
214        self.as_ref() == other
215    }
216}
217
218impl<const N: usize> PartialEq<&[u8; N]> for IoBuf {
219    #[inline]
220    fn eq(&self, other: &&[u8; N]) -> bool {
221        self.as_ref() == *other
222    }
223}
224
// `Buf` is implemented by delegating each call to the backing representation.
impl Buf for IoBuf {
    #[inline]
    fn remaining(&self) -> usize {
        match &self.inner {
            IoBufInner::Bytes(b) => b.remaining(),
            IoBufInner::Pooled(p) => p.remaining(),
        }
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        match &self.inner {
            IoBufInner::Bytes(b) => b.chunk(),
            IoBufInner::Pooled(p) => p.chunk(),
        }
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        match &mut self.inner {
            IoBufInner::Bytes(b) => b.advance(cnt),
            IoBufInner::Pooled(p) => p.advance(cnt),
        }
    }

    #[inline]
    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        match &mut self.inner {
            IoBufInner::Bytes(b) => b.copy_to_bytes(len),
            IoBufInner::Pooled(p) => {
                // Full non-empty drain: transfer ownership so the drained source no
                // longer retains the pooled allocation. Keep len == 0 on the normal
                // path to avoid creating an empty Bytes that still pins pool memory.
                if len != 0 && len == p.remaining() {
                    // Swap in an empty Bytes-backed state, then convert the pooled
                    // buffer into `Bytes` (presumably without copying, mirroring
                    // `From<IoBuf> for Bytes` — see `into_bytes`).
                    let inner = std::mem::replace(&mut self.inner, IoBufInner::Bytes(Bytes::new()));
                    match inner {
                        IoBufInner::Pooled(p) => p.into_bytes(),
                        // `inner` was observed to be `Pooled` just above.
                        IoBufInner::Bytes(_) => unreachable!(),
                    }
                } else {
                    // Partial (or zero-length) drain: fall back to a copy.
                    p.copy_to_bytes(len)
                }
            }
        }
    }
}
271
272impl From<Bytes> for IoBuf {
273    fn from(bytes: Bytes) -> Self {
274        Self {
275            inner: IoBufInner::Bytes(bytes),
276        }
277    }
278}
279
280impl From<Vec<u8>> for IoBuf {
281    fn from(vec: Vec<u8>) -> Self {
282        Self {
283            inner: IoBufInner::Bytes(Bytes::from(vec)),
284        }
285    }
286}
287
288impl<const N: usize> From<&'static [u8; N]> for IoBuf {
289    fn from(array: &'static [u8; N]) -> Self {
290        Self {
291            inner: IoBufInner::Bytes(Bytes::from_static(array)),
292        }
293    }
294}
295
296impl From<&'static [u8]> for IoBuf {
297    fn from(slice: &'static [u8]) -> Self {
298        Self {
299            inner: IoBufInner::Bytes(Bytes::from_static(slice)),
300        }
301    }
302}
303
304/// Convert an [`IoBuf`] into a [`Vec<u8>`].
305///
306/// This conversion may copy:
307/// - [`Bytes`]-backed buffers may reuse allocation when possible
308/// - pooled buffers copy readable bytes into a new [`Vec<u8>`]
309impl From<IoBuf> for Vec<u8> {
310    fn from(buf: IoBuf) -> Self {
311        match buf.inner {
312            IoBufInner::Bytes(bytes) => Self::from(bytes),
313            IoBufInner::Pooled(pooled) => pooled.as_ref().to_vec(),
314        }
315    }
316}
317
318/// Convert an [`IoBuf`] into [`Bytes`] without copying readable data.
319///
320/// For pooled buffers, this wraps the pooled owner using [`Bytes::from_owner`].
321impl From<IoBuf> for Bytes {
322    fn from(buf: IoBuf) -> Self {
323        match buf.inner {
324            IoBufInner::Bytes(bytes) => bytes,
325            IoBufInner::Pooled(pooled) => Self::from_owner(pooled),
326        }
327    }
328}
329
330impl Write for IoBuf {
331    #[inline]
332    fn write(&self, buf: &mut impl BufMut) {
333        self.len().write(buf);
334        buf.put_slice(self.as_ref());
335    }
336}
337
338impl EncodeSize for IoBuf {
339    #[inline]
340    fn encode_size(&self) -> usize {
341        self.len().encode_size() + self.len()
342    }
343}
344
345impl Read for IoBuf {
346    type Cfg = RangeCfg<usize>;
347
348    #[inline]
349    fn read_cfg(buf: &mut impl Buf, range: &Self::Cfg) -> Result<Self, Error> {
350        let len = usize::read_cfg(buf, range)?;
351        at_least(buf, len)?;
352        Ok(Self::from(buf.copy_to_bytes(len)))
353    }
354}
355
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for IoBuf {
    /// Draw a length hint, then up to that many bytes, from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let max = u.arbitrary_len::<u8>()?;
        let mut data = Vec::with_capacity(max);
        for byte in u.arbitrary_iter::<u8>()?.take(max) {
            data.push(byte?);
        }
        Ok(Self::from(data))
    }
}
364
/// Mutable byte buffer.
///
/// Backed by either [`BytesMut`] or a pooled aligned allocation.
///
/// Use this to build or mutate payloads before freezing into [`IoBuf`].
///
/// For pooled-backed values, dropping this buffer returns the underlying
/// allocation to the pool. After [`IoBufMut::freeze`], the frozen `IoBuf`
/// keeps the allocation alive until its final reference is dropped.
#[derive(Debug)]
pub struct IoBufMut {
    // Backing representation; see `IoBufMutInner` for the two variants.
    inner: IoBufMutInner,
}
378
/// Backing storage for [`IoBufMut`].
#[derive(Debug)]
enum IoBufMutInner {
    /// Growable buffer from the `bytes` crate.
    Bytes(BytesMut),
    /// Pool-originated allocation (possibly an untracked fallback; see
    /// [`IoBufMut::is_pooled`]).
    Pooled(PooledBufMut),
}
384
385impl Default for IoBufMut {
386    fn default() -> Self {
387        Self {
388            inner: IoBufMutInner::Bytes(BytesMut::new()),
389        }
390    }
391}
392
393impl IoBufMut {
394    /// Create a buffer with the given capacity.
395    pub fn with_capacity(capacity: usize) -> Self {
396        Self {
397            inner: IoBufMutInner::Bytes(BytesMut::with_capacity(capacity)),
398        }
399    }
400
401    /// Create a buffer of `len` bytes, all initialized to zero.
402    ///
403    /// Unlike `with_capacity`, this sets both capacity and length to `len`,
404    /// making the entire buffer immediately usable for read operations
405    /// (e.g., `file.read_exact`).
406    pub fn zeroed(len: usize) -> Self {
407        Self {
408            inner: IoBufMutInner::Bytes(BytesMut::zeroed(len)),
409        }
410    }
411
412    /// Create a buffer from a pooled allocation.
413    const fn from_pooled(pooled: PooledBufMut) -> Self {
414        Self {
415            inner: IoBufMutInner::Pooled(pooled),
416        }
417    }
418
419    /// Returns `true` if this buffer is tracked by a pool.
420    ///
421    /// Tracked buffers originate from [`BufferPool`] allocations and are
422    /// returned to the pool when dropped.
423    ///
424    /// Buffers backed by [`BytesMut`], and untracked fallback allocations from
425    /// [`BufferPool::alloc`], return `false`.
426    #[inline]
427    pub fn is_pooled(&self) -> bool {
428        match &self.inner {
429            IoBufMutInner::Bytes(_) => false,
430            IoBufMutInner::Pooled(p) => p.is_tracked(),
431        }
432    }
433
434    /// Sets the length of the buffer.
435    ///
436    /// This will explicitly set the size of the buffer without actually
437    /// modifying the data, so it is up to the caller to ensure that the data
438    /// has been initialized.
439    ///
440    /// # Safety
441    ///
442    /// Caller must ensure all bytes in `0..len` are initialized before any
443    /// read operations.
444    ///
445    /// # Panics
446    ///
447    /// Panics if `len > capacity()`.
448    #[inline]
449    pub unsafe fn set_len(&mut self, len: usize) {
450        assert!(
451            len <= self.capacity(),
452            "set_len({len}) exceeds capacity({})",
453            self.capacity()
454        );
455        match &mut self.inner {
456            IoBufMutInner::Bytes(b) => b.set_len(len),
457            IoBufMutInner::Pooled(b) => b.set_len(len),
458        }
459    }
460
461    /// Number of bytes remaining in the buffer.
462    #[inline]
463    pub fn len(&self) -> usize {
464        self.remaining()
465    }
466
467    /// Whether the buffer is empty.
468    #[inline]
469    pub fn is_empty(&self) -> bool {
470        match &self.inner {
471            IoBufMutInner::Bytes(b) => b.is_empty(),
472            IoBufMutInner::Pooled(b) => b.is_empty(),
473        }
474    }
475
476    /// Freeze into immutable [`IoBuf`].
477    #[inline]
478    pub fn freeze(self) -> IoBuf {
479        match self.inner {
480            IoBufMutInner::Bytes(b) => b.freeze().into(),
481            IoBufMutInner::Pooled(b) => b.freeze(),
482        }
483    }
484
485    /// Returns the number of bytes the buffer can hold without reallocating.
486    #[inline]
487    pub fn capacity(&self) -> usize {
488        match &self.inner {
489            IoBufMutInner::Bytes(b) => b.capacity(),
490            IoBufMutInner::Pooled(b) => b.capacity(),
491        }
492    }
493
494    /// Returns an unsafe mutable pointer to the buffer's data.
495    #[inline]
496    pub fn as_mut_ptr(&mut self) -> *mut u8 {
497        match &mut self.inner {
498            IoBufMutInner::Bytes(b) => b.as_mut_ptr(),
499            IoBufMutInner::Pooled(b) => b.as_mut_ptr(),
500        }
501    }
502
503    /// Truncates the buffer to `len` readable bytes.
504    ///
505    /// If `len` is greater than the current length, this has no effect.
506    #[inline]
507    pub fn truncate(&mut self, len: usize) {
508        match &mut self.inner {
509            IoBufMutInner::Bytes(b) => b.truncate(len),
510            IoBufMutInner::Pooled(b) => b.truncate(len),
511        }
512    }
513
514    /// Clears the buffer, removing all data. Existing capacity is preserved.
515    #[inline]
516    pub fn clear(&mut self) {
517        match &mut self.inner {
518            IoBufMutInner::Bytes(b) => b.clear(),
519            IoBufMutInner::Pooled(b) => b.clear(),
520        }
521    }
522}
523
524impl AsRef<[u8]> for IoBufMut {
525    #[inline]
526    fn as_ref(&self) -> &[u8] {
527        match &self.inner {
528            IoBufMutInner::Bytes(b) => b.as_ref(),
529            IoBufMutInner::Pooled(b) => b.as_ref(),
530        }
531    }
532}
533
534impl AsMut<[u8]> for IoBufMut {
535    #[inline]
536    fn as_mut(&mut self) -> &mut [u8] {
537        match &mut self.inner {
538            IoBufMutInner::Bytes(b) => b.as_mut(),
539            IoBufMutInner::Pooled(b) => b.as_mut(),
540        }
541    }
542}
543
544impl PartialEq<[u8]> for IoBufMut {
545    #[inline]
546    fn eq(&self, other: &[u8]) -> bool {
547        self.as_ref() == other
548    }
549}
550
551impl PartialEq<&[u8]> for IoBufMut {
552    #[inline]
553    fn eq(&self, other: &&[u8]) -> bool {
554        self.as_ref() == *other
555    }
556}
557
558impl<const N: usize> PartialEq<[u8; N]> for IoBufMut {
559    #[inline]
560    fn eq(&self, other: &[u8; N]) -> bool {
561        self.as_ref() == other
562    }
563}
564
565impl<const N: usize> PartialEq<&[u8; N]> for IoBufMut {
566    #[inline]
567    fn eq(&self, other: &&[u8; N]) -> bool {
568        self.as_ref() == *other
569    }
570}
571
// `Buf` is implemented by delegating each call to the backing representation.
impl Buf for IoBufMut {
    #[inline]
    fn remaining(&self) -> usize {
        match &self.inner {
            IoBufMutInner::Bytes(b) => b.remaining(),
            IoBufMutInner::Pooled(b) => b.remaining(),
        }
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        match &self.inner {
            IoBufMutInner::Bytes(b) => b.chunk(),
            IoBufMutInner::Pooled(b) => b.chunk(),
        }
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.advance(cnt),
            IoBufMutInner::Pooled(b) => b.advance(cnt),
        }
    }

    #[inline]
    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.copy_to_bytes(len),
            IoBufMutInner::Pooled(p) => {
                // Full non-empty drain: transfer ownership so the drained source no
                // longer retains the pooled allocation. Keep len == 0 on the normal
                // path to avoid creating an empty Bytes that still pins pool memory.
                if len != 0 && len == p.remaining() {
                    // Swap in an empty BytesMut-backed state, then convert the
                    // pooled buffer into `Bytes` directly.
                    let inner =
                        std::mem::replace(&mut self.inner, IoBufMutInner::Bytes(BytesMut::new()));
                    match inner {
                        IoBufMutInner::Pooled(p) => p.into_bytes(),
                        // `inner` was observed to be `Pooled` just above.
                        IoBufMutInner::Bytes(_) => unreachable!(),
                    }
                } else {
                    // Partial (or zero-length) drain: fall back to a copy.
                    p.copy_to_bytes(len)
                }
            }
        }
    }
}
619
// SAFETY: Delegates to BytesMut or PooledBufMut which implement BufMut safely.
unsafe impl BufMut for IoBufMut {
    // Writable capacity still available past the current length.
    #[inline]
    fn remaining_mut(&self) -> usize {
        match &self.inner {
            IoBufMutInner::Bytes(b) => b.remaining_mut(),
            IoBufMutInner::Pooled(b) => b.remaining_mut(),
        }
    }

    // SAFETY contract inherited from `BufMut::advance_mut`: the caller must
    // have initialized the next `cnt` bytes of the backing storage.
    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.advance_mut(cnt),
            IoBufMutInner::Pooled(b) => b.advance_mut(cnt),
        }
    }

    // Uninitialized tail of the buffer, ready for writing.
    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.chunk_mut(),
            IoBufMutInner::Pooled(b) => b.chunk_mut(),
        }
    }
}
646
647impl From<Vec<u8>> for IoBufMut {
648    fn from(vec: Vec<u8>) -> Self {
649        Self::from(Bytes::from(vec))
650    }
651}
652
653impl From<&[u8]> for IoBufMut {
654    fn from(slice: &[u8]) -> Self {
655        Self {
656            inner: IoBufMutInner::Bytes(BytesMut::from(slice)),
657        }
658    }
659}
660
661impl<const N: usize> From<[u8; N]> for IoBufMut {
662    fn from(array: [u8; N]) -> Self {
663        Self::from(array.as_ref())
664    }
665}
666
667impl<const N: usize> From<&[u8; N]> for IoBufMut {
668    fn from(array: &[u8; N]) -> Self {
669        Self::from(array.as_ref())
670    }
671}
672
673impl From<BytesMut> for IoBufMut {
674    fn from(bytes: BytesMut) -> Self {
675        Self {
676            inner: IoBufMutInner::Bytes(bytes),
677        }
678    }
679}
680
681impl From<Bytes> for IoBufMut {
682    /// Zero-copy if `bytes` is unique for the entire original buffer (refcount is 1),
683    /// copies otherwise. Always copies if the [`Bytes`] was constructed via
684    /// [`Bytes::from_owner`] or [`Bytes::from_static`].
685    fn from(bytes: Bytes) -> Self {
686        Self {
687            inner: IoBufMutInner::Bytes(BytesMut::from(bytes)),
688        }
689    }
690}
691
692impl From<IoBuf> for IoBufMut {
693    /// Zero-copy when exclusive ownership can be recovered, copies otherwise.
694    fn from(buf: IoBuf) -> Self {
695        match buf.try_into_mut() {
696            Ok(buf) => buf,
697            Err(buf) => Self::from(buf.as_ref()),
698        }
699    }
700}
701
/// Container for one or more immutable buffers.
#[derive(Clone, Debug)]
pub struct IoBufs {
    // Canonical chunk storage; see `IoBufsInner` for the representation
    // invariants.
    inner: IoBufsInner,
}
707
/// Internal immutable representation.
///
/// - Representation is canonical and minimal for readable data:
///   - `Single` is the only representation for empty data and one-chunk data.
///   - `Chunked` is used only when four or more readable chunks remain.
/// - `Pair`, `Triple`, and `Chunked` never store empty chunks.
#[derive(Clone, Debug)]
enum IoBufsInner {
    /// Single buffer (fast path).
    Single(IoBuf),
    /// Two buffers (fast path).
    Pair([IoBuf; 2]),
    /// Three buffers (fast path).
    Triple([IoBuf; 3]),
    /// Four or more buffers.
    Chunked(VecDeque<IoBuf>),
}
725
726impl Default for IoBufs {
727    fn default() -> Self {
728        Self {
729            inner: IoBufsInner::Single(IoBuf::default()),
730        }
731    }
732}
733
734impl IoBufs {
735    /// Build canonical immutable chunk storage from readable chunks.
736    ///
737    /// Empty chunks are removed before representation selection.
738    fn from_chunks_iter(chunks: impl IntoIterator<Item = IoBuf>) -> Self {
739        let mut iter = chunks.into_iter().filter(|buf| !buf.is_empty());
740        let first = match iter.next() {
741            Some(first) => first,
742            None => return Self::default(),
743        };
744        let second = match iter.next() {
745            Some(second) => second,
746            None => {
747                return Self {
748                    inner: IoBufsInner::Single(first),
749                };
750            }
751        };
752        let third = match iter.next() {
753            Some(third) => third,
754            None => {
755                return Self {
756                    inner: IoBufsInner::Pair([first, second]),
757                };
758            }
759        };
760        let fourth = match iter.next() {
761            Some(fourth) => fourth,
762            None => {
763                return Self {
764                    inner: IoBufsInner::Triple([first, second, third]),
765                };
766            }
767        };
768
769        let mut bufs = VecDeque::with_capacity(4);
770        bufs.push_back(first);
771        bufs.push_back(second);
772        bufs.push_back(third);
773        bufs.push_back(fourth);
774        bufs.extend(iter);
775
776        Self {
777            inner: IoBufsInner::Chunked(bufs),
778        }
779    }
780
781    /// Re-establish canonical immutable representation invariants.
782    fn canonicalize(&mut self) {
783        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
784        self.inner = match inner {
785            IoBufsInner::Single(buf) => {
786                if buf.is_empty() {
787                    IoBufsInner::Single(IoBuf::default())
788                } else {
789                    IoBufsInner::Single(buf)
790                }
791            }
792            IoBufsInner::Pair([a, b]) => Self::from_chunks_iter([a, b]).inner,
793            IoBufsInner::Triple([a, b, c]) => Self::from_chunks_iter([a, b, c]).inner,
794            IoBufsInner::Chunked(bufs) => Self::from_chunks_iter(bufs).inner,
795        };
796    }
797
798    /// Returns a reference to the single contiguous buffer, if present.
799    ///
800    /// Returns `Some` only when all remaining data is in one contiguous buffer.
801    pub const fn as_single(&self) -> Option<&IoBuf> {
802        match &self.inner {
803            IoBufsInner::Single(buf) => Some(buf),
804            _ => None,
805        }
806    }
807
808    /// Consume this container and return the single buffer if present.
809    ///
810    /// Returns `Ok(IoBuf)` only when all remaining data is already contained in
811    /// a single chunk. Returns `Err(Self)` with the original container
812    /// otherwise.
813    pub fn try_into_single(self) -> Result<IoBuf, Self> {
814        match self.inner {
815            IoBufsInner::Single(buf) => Ok(buf),
816            inner => Err(Self { inner }),
817        }
818    }
819
    /// Number of bytes remaining across all buffers.
    ///
    /// Equivalent to [`Buf::remaining`].
    #[inline]
    pub fn len(&self) -> usize {
        self.remaining()
    }
825
826    /// Number of non-empty readable chunks.
827    #[inline]
828    pub fn chunk_count(&self) -> usize {
829        // This assumes canonical form.
830        match &self.inner {
831            IoBufsInner::Single(buf) => {
832                if buf.is_empty() {
833                    0
834                } else {
835                    1
836                }
837            }
838            IoBufsInner::Pair(_) => 2,
839            IoBufsInner::Triple(_) => 3,
840            IoBufsInner::Chunked(bufs) => bufs.len(),
841        }
842    }
843
    /// Whether all buffers are empty (no readable bytes remain).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.remaining() == 0
    }
849
    /// Whether this contains a single contiguous buffer.
    ///
    /// When true, `chunk()` returns all remaining bytes. Note that an empty
    /// container is canonically `Single`, so this is also true when empty.
    #[inline]
    pub const fn is_single(&self) -> bool {
        matches!(self.inner, IoBufsInner::Single(_))
    }
857
858    /// Prepend a buffer to the front.
859    ///
860    /// Empty input buffers are ignored.
861    pub fn prepend(&mut self, buf: IoBuf) {
862        if buf.is_empty() {
863            return;
864        }
865        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
866        self.inner = match inner {
867            IoBufsInner::Single(existing) if existing.is_empty() => IoBufsInner::Single(buf),
868            IoBufsInner::Single(existing) => IoBufsInner::Pair([buf, existing]),
869            IoBufsInner::Pair([a, b]) => IoBufsInner::Triple([buf, a, b]),
870            IoBufsInner::Triple([a, b, c]) => {
871                let mut bufs = VecDeque::with_capacity(4);
872                bufs.push_back(buf);
873                bufs.push_back(a);
874                bufs.push_back(b);
875                bufs.push_back(c);
876                IoBufsInner::Chunked(bufs)
877            }
878            IoBufsInner::Chunked(mut bufs) => {
879                bufs.push_front(buf);
880                IoBufsInner::Chunked(bufs)
881            }
882        };
883    }
884
885    /// Append a buffer to the back.
886    ///
887    /// Empty input buffers are ignored.
888    pub fn append(&mut self, buf: IoBuf) {
889        if buf.is_empty() {
890            return;
891        }
892        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
893        self.inner = match inner {
894            IoBufsInner::Single(existing) if existing.is_empty() => IoBufsInner::Single(buf),
895            IoBufsInner::Single(existing) => IoBufsInner::Pair([existing, buf]),
896            IoBufsInner::Pair([a, b]) => IoBufsInner::Triple([a, b, buf]),
897            IoBufsInner::Triple([a, b, c]) => {
898                let mut bufs = VecDeque::with_capacity(4);
899                bufs.push_back(a);
900                bufs.push_back(b);
901                bufs.push_back(c);
902                bufs.push_back(buf);
903                IoBufsInner::Chunked(bufs)
904            }
905            IoBufsInner::Chunked(mut bufs) => {
906                bufs.push_back(buf);
907                IoBufsInner::Chunked(bufs)
908            }
909        };
910    }
911
912    /// Splits the buffer(s) into two at the given index.
913    ///
914    /// Afterwards `self` contains bytes `[at, len)`, and the returned
915    /// [`IoBufs`] contains bytes `[0, at)`.
916    ///
917    /// Whole chunks are moved without copying. If the split point lands inside
918    /// a chunk, the chunk is split zero-copy via [`IoBuf::split_to`].
919    ///
920    /// # Panics
921    ///
922    /// Panics if `at > len`.
    pub fn split_to(&mut self, at: usize) -> Self {
        if at == 0 {
            // Empty prefix: nothing moves out of `self`.
            return Self::default();
        }

        let remaining = self.remaining();
        assert!(
            at <= remaining,
            "split_to out of bounds: {:?} <= {:?}",
            at,
            remaining,
        );

        if at == remaining {
            // The whole container becomes the prefix; `self` is left empty.
            return std::mem::take(self);
        }

        // Take ownership of the representation; `self` temporarily holds an
        // empty single buffer until it is reassigned in each arm below.
        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
        match inner {
            IoBufsInner::Single(mut buf) => {
                // Delegate directly and keep remainder as single
                let prefix = buf.split_to(at);
                self.inner = IoBufsInner::Single(buf);
                Self::from(prefix)
            }
            IoBufsInner::Pair([mut a, mut b]) => {
                let a_len = a.remaining();
                if at < a_len {
                    // Split stays entirely in chunk `a`.
                    let prefix = a.split_to(at);
                    self.inner = IoBufsInner::Pair([a, b]);
                    return Self::from(prefix);
                }
                if at == a_len {
                    // Exact chunk boundary: move `a` out, keep `b`.
                    self.inner = IoBufsInner::Single(b);
                    return Self::from(a);
                }

                // Split crosses from `a` into `b`.
                let b_prefix_len = at - a_len;
                let b_prefix = b.split_to(b_prefix_len);
                self.inner = IoBufsInner::Single(b);
                Self {
                    inner: IoBufsInner::Pair([a, b_prefix]),
                }
            }
            IoBufsInner::Triple([mut a, mut b, mut c]) => {
                let a_len = a.remaining();
                if at < a_len {
                    // Split stays entirely in chunk `a`.
                    let prefix = a.split_to(at);
                    self.inner = IoBufsInner::Triple([a, b, c]);
                    return Self::from(prefix);
                }
                if at == a_len {
                    // Exact boundary after `a`.
                    self.inner = IoBufsInner::Pair([b, c]);
                    return Self::from(a);
                }

                let mut remaining = at - a_len;
                let b_len = b.remaining();
                if remaining < b_len {
                    // Split lands inside `b`.
                    let b_prefix = b.split_to(remaining);
                    self.inner = IoBufsInner::Pair([b, c]);
                    return Self {
                        inner: IoBufsInner::Pair([a, b_prefix]),
                    };
                }
                if remaining == b_len {
                    // Exact boundary after `b`.
                    self.inner = IoBufsInner::Single(c);
                    return Self {
                        inner: IoBufsInner::Pair([a, b]),
                    };
                }

                // Split reaches into `c`.
                remaining -= b_len;
                let c_prefix = c.split_to(remaining);
                self.inner = IoBufsInner::Single(c);
                Self {
                    inner: IoBufsInner::Triple([a, b, c_prefix]),
                }
            }
            IoBufsInner::Chunked(mut bufs) => {
                let mut remaining = at;
                let mut out = VecDeque::new();

                while remaining > 0 {
                    // Cannot run dry: the bounds assert above guarantees at
                    // least `remaining` readable bytes are left in `bufs`.
                    let mut front = bufs.pop_front().expect("split_to out of bounds");
                    let avail = front.remaining();
                    if avail == 0 {
                        // Canonical chunked state should not contain empties.
                        continue;
                    }
                    if remaining < avail {
                        // Split inside this chunk: keep suffix in `self`, move prefix to output.
                        let prefix = front.split_to(remaining);
                        out.push_back(prefix);
                        bufs.push_front(front);
                        break;
                    }

                    // Consume this full chunk into the output prefix.
                    out.push_back(front);
                    remaining -= avail;
                }

                // Restore canonical shape on both halves: four or more chunks
                // stay `Chunked`; fewer collapse to Single/Pair/Triple.
                self.inner = if bufs.len() >= 4 {
                    IoBufsInner::Chunked(bufs)
                } else {
                    Self::from_chunks_iter(bufs).inner
                };

                if out.len() >= 4 {
                    Self {
                        inner: IoBufsInner::Chunked(out),
                    }
                } else {
                    Self::from_chunks_iter(out)
                }
            }
        }
    }
1050
1051    /// Coalesce all remaining bytes into a single contiguous [`IoBuf`].
1052    ///
1053    /// Zero-copy if only one buffer. Copies if multiple buffers.
1054    #[inline]
1055    pub fn coalesce(mut self) -> IoBuf {
1056        match self.inner {
1057            IoBufsInner::Single(buf) => buf,
1058            _ => self.copy_to_bytes(self.remaining()).into(),
1059        }
1060    }
1061
1062    /// Coalesce all remaining bytes into a single contiguous [`IoBuf`], using the pool
1063    /// for allocation if multiple buffers need to be merged.
1064    ///
1065    /// Zero-copy if only one buffer. Uses pool allocation if multiple buffers.
1066    pub fn coalesce_with_pool(self, pool: &BufferPool) -> IoBuf {
1067        match self.inner {
1068            IoBufsInner::Single(buf) => buf,
1069            IoBufsInner::Pair([a, b]) => {
1070                let total_len = a.remaining().saturating_add(b.remaining());
1071                let mut result = pool.alloc(total_len);
1072                result.put_slice(a.as_ref());
1073                result.put_slice(b.as_ref());
1074                result.freeze()
1075            }
1076            IoBufsInner::Triple([a, b, c]) => {
1077                let total_len = a
1078                    .remaining()
1079                    .saturating_add(b.remaining())
1080                    .saturating_add(c.remaining());
1081                let mut result = pool.alloc(total_len);
1082                result.put_slice(a.as_ref());
1083                result.put_slice(b.as_ref());
1084                result.put_slice(c.as_ref());
1085                result.freeze()
1086            }
1087            IoBufsInner::Chunked(bufs) => {
1088                let total_len: usize = bufs
1089                    .iter()
1090                    .map(|b| b.remaining())
1091                    .fold(0, usize::saturating_add);
1092                let mut result = pool.alloc(total_len);
1093                for buf in bufs {
1094                    result.put_slice(buf.as_ref());
1095                }
1096                result.freeze()
1097            }
1098        }
1099    }
1100}
1101
impl Buf for IoBufs {
    fn remaining(&self) -> usize {
        // Total readable bytes across all chunks; saturating addition keeps
        // the sum panic-free even for pathological chunk sizes.
        match &self.inner {
            IoBufsInner::Single(buf) => buf.remaining(),
            IoBufsInner::Pair([a, b]) => a.remaining().saturating_add(b.remaining()),
            IoBufsInner::Triple([a, b, c]) => a
                .remaining()
                .saturating_add(b.remaining())
                .saturating_add(c.remaining()),
            IoBufsInner::Chunked(bufs) => bufs
                .iter()
                .map(|b| b.remaining())
                .fold(0, usize::saturating_add),
        }
    }

    fn chunk(&self) -> &[u8] {
        // Per the `Buf` contract: return the first non-empty contiguous
        // region, or an empty slice when nothing is readable.
        match &self.inner {
            IoBufsInner::Single(buf) => buf.chunk(),
            IoBufsInner::Pair([a, b]) => {
                if a.remaining() > 0 {
                    a.chunk()
                } else if b.remaining() > 0 {
                    b.chunk()
                } else {
                    &[]
                }
            }
            IoBufsInner::Triple([a, b, c]) => {
                if a.remaining() > 0 {
                    a.chunk()
                } else if b.remaining() > 0 {
                    b.chunk()
                } else if c.remaining() > 0 {
                    c.chunk()
                } else {
                    &[]
                }
            }
            IoBufsInner::Chunked(bufs) => {
                for buf in bufs.iter() {
                    if buf.remaining() > 0 {
                        return buf.chunk();
                    }
                }
                &[]
            }
        }
    }

    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        // No output slots: nothing can be filled.
        if dst.is_empty() {
            return 0;
        }

        match &self.inner {
            IoBufsInner::Single(buf) => {
                let chunk = buf.chunk();
                if !chunk.is_empty() {
                    dst[0] = IoSlice::new(chunk);
                    return 1;
                }
                0
            }
            IoBufsInner::Pair([a, b]) => fill_vectored_from_chunks(dst, [a.chunk(), b.chunk()]),
            IoBufsInner::Triple([a, b, c]) => {
                fill_vectored_from_chunks(dst, [a.chunk(), b.chunk(), c.chunk()])
            }
            IoBufsInner::Chunked(bufs) => {
                fill_vectored_from_chunks(dst, bufs.iter().map(|buf| buf.chunk()))
            }
        }
    }

    fn advance(&mut self, cnt: usize) {
        // Multi-chunk variants may fully drain chunks and then need their
        // shape re-collapsed; a single buffer never does.
        let should_canonicalize = match &mut self.inner {
            IoBufsInner::Single(buf) => {
                buf.advance(cnt);
                false
            }
            IoBufsInner::Pair(pair) => advance_small_chunks(pair.as_mut_slice(), cnt),
            IoBufsInner::Triple(triple) => advance_small_chunks(triple.as_mut_slice(), cnt),
            IoBufsInner::Chunked(bufs) => {
                advance_chunked_front(bufs, cnt);
                // Below four chunks, `Chunked` is no longer canonical.
                bufs.len() <= 3
            }
        };

        if should_canonicalize {
            self.canonicalize();
        }
    }

    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        // A single chunk delegates directly to the inner buffer.
        let (result, needs_canonicalize) = match &mut self.inner {
            IoBufsInner::Single(buf) => return buf.copy_to_bytes(len),
            IoBufsInner::Pair(pair) => {
                copy_to_bytes_small_chunks(pair, len, "IoBufs::copy_to_bytes: not enough data")
            }
            IoBufsInner::Triple(triple) => {
                copy_to_bytes_small_chunks(triple, len, "IoBufs::copy_to_bytes: not enough data")
            }
            IoBufsInner::Chunked(bufs) => {
                copy_to_bytes_chunked(bufs, len, "IoBufs::copy_to_bytes: not enough data")
            }
        };

        if needs_canonicalize {
            self.canonicalize();
        }

        result
    }
}
1216
1217impl From<IoBuf> for IoBufs {
1218    fn from(buf: IoBuf) -> Self {
1219        Self {
1220            inner: IoBufsInner::Single(buf),
1221        }
1222    }
1223}
1224
1225impl From<IoBufMut> for IoBufs {
1226    fn from(buf: IoBufMut) -> Self {
1227        Self {
1228            inner: IoBufsInner::Single(buf.freeze()),
1229        }
1230    }
1231}
1232
1233impl From<Bytes> for IoBufs {
1234    fn from(bytes: Bytes) -> Self {
1235        Self::from(IoBuf::from(bytes))
1236    }
1237}
1238
1239impl From<BytesMut> for IoBufs {
1240    fn from(bytes: BytesMut) -> Self {
1241        Self::from(IoBuf::from(bytes.freeze()))
1242    }
1243}
1244
1245impl From<Vec<u8>> for IoBufs {
1246    fn from(vec: Vec<u8>) -> Self {
1247        Self::from(IoBuf::from(vec))
1248    }
1249}
1250
impl From<Vec<IoBuf>> for IoBufs {
    fn from(bufs: Vec<IoBuf>) -> Self {
        // NOTE(review): this forwards to `from_chunks_iter` without filtering,
        // whereas `IoBufsMut`'s `From<Vec<IoBufMut>>` filters through
        // `from_writable_chunks_iter`. Confirm that `IoBufs::from_chunks_iter`
        // establishes the canonical no-empty-chunks invariant that `split_to`'s
        // `Chunked` arm relies on.
        Self::from_chunks_iter(bufs)
    }
}
1256
1257impl<const N: usize> From<&'static [u8; N]> for IoBufs {
1258    fn from(array: &'static [u8; N]) -> Self {
1259        Self::from(IoBuf::from(array))
1260    }
1261}
1262
1263impl From<&'static [u8]> for IoBufs {
1264    fn from(slice: &'static [u8]) -> Self {
1265        Self::from(IoBuf::from(slice))
1266    }
1267}
1268
/// Container for one or more mutable buffers.
///
/// The writable counterpart of [`IoBufs`]; freezing via
/// [`IoBufsMut::freeze`] yields the immutable form.
#[derive(Debug)]
pub struct IoBufsMut {
    // Canonical chunk representation; see `IoBufsMutInner` for invariants.
    inner: IoBufsMutInner,
}
1274
/// Internal mutable representation.
///
/// - Construction from caller-provided writable chunks keeps chunks with
///   non-zero capacity, even when `remaining() == 0`.
/// - Read-canonicalization paths remove drained chunks (`remaining() == 0`)
///   and collapse shape as readable chunk count shrinks.
///
/// The variant is chosen purely by chunk count (see `from_chunks_iter`);
/// `Chunked` is only used for four or more chunks.
#[derive(Debug)]
enum IoBufsMutInner {
    /// Single buffer (common case, no allocation).
    Single(IoBufMut),
    /// Two buffers (fast path, no VecDeque allocation).
    Pair([IoBufMut; 2]),
    /// Three buffers (fast path, no VecDeque allocation).
    Triple([IoBufMut; 3]),
    /// Four or more buffers.
    Chunked(VecDeque<IoBufMut>),
}
1292
1293impl Default for IoBufsMut {
1294    fn default() -> Self {
1295        Self {
1296            inner: IoBufsMutInner::Single(IoBufMut::default()),
1297        }
1298    }
1299}
1300
impl IoBufsMut {
    /// Build mutable chunk storage from already-filtered chunks.
    ///
    /// This helper intentionally does not filter.
    /// Callers choose filter policy first:
    /// - [`Self::from_writable_chunks_iter`] for construction from writable chunks (`capacity() > 0`)
    /// - [`Self::from_readable_chunks_iter`] for read-canonicalization (`remaining() > 0`)
    fn from_chunks_iter(chunks: impl IntoIterator<Item = IoBufMut>) -> Self {
        // Pull up to four chunks eagerly to select the smallest matching
        // variant without allocating.
        let mut iter = chunks.into_iter();
        let first = match iter.next() {
            Some(first) => first,
            None => return Self::default(),
        };
        let second = match iter.next() {
            Some(second) => second,
            None => {
                return Self {
                    inner: IoBufsMutInner::Single(first),
                };
            }
        };
        let third = match iter.next() {
            Some(third) => third,
            None => {
                return Self {
                    inner: IoBufsMutInner::Pair([first, second]),
                };
            }
        };
        let fourth = match iter.next() {
            Some(fourth) => fourth,
            None => {
                return Self {
                    inner: IoBufsMutInner::Triple([first, second, third]),
                };
            }
        };

        // Four or more chunks: spill into a VecDeque.
        let mut bufs = VecDeque::with_capacity(4);
        bufs.push_back(first);
        bufs.push_back(second);
        bufs.push_back(third);
        bufs.push_back(fourth);
        bufs.extend(iter);
        Self {
            inner: IoBufsMutInner::Chunked(bufs),
        }
    }

    /// Build canonical mutable chunk storage from writable chunks.
    ///
    /// Chunks with zero capacity are removed.
    fn from_writable_chunks_iter(chunks: impl IntoIterator<Item = IoBufMut>) -> Self {
        // Keep chunks that can hold data (including len == 0 writable buffers).
        Self::from_chunks_iter(chunks.into_iter().filter(|buf| buf.capacity() > 0))
    }

    /// Build canonical mutable chunk storage from readable chunks.
    ///
    /// Chunks with no remaining readable bytes are removed.
    fn from_readable_chunks_iter(chunks: impl IntoIterator<Item = IoBufMut>) -> Self {
        Self::from_chunks_iter(chunks.into_iter().filter(|buf| buf.remaining() > 0))
    }

    /// Re-establish canonical mutable representation invariants.
    fn canonicalize(&mut self) {
        // Park an empty buffer in `self` so the old representation can be
        // consumed by value and rebuilt from its readable chunks.
        let inner = std::mem::replace(&mut self.inner, IoBufsMutInner::Single(IoBufMut::default()));
        self.inner = match inner {
            IoBufsMutInner::Single(buf) => IoBufsMutInner::Single(buf),
            IoBufsMutInner::Pair([a, b]) => Self::from_readable_chunks_iter([a, b]).inner,
            IoBufsMutInner::Triple([a, b, c]) => Self::from_readable_chunks_iter([a, b, c]).inner,
            IoBufsMutInner::Chunked(bufs) => Self::from_readable_chunks_iter(bufs).inner,
        };
    }

    /// Apply `f` to every chunk in order, regardless of representation.
    #[inline]
    fn for_each_chunk_mut(&mut self, mut f: impl FnMut(&mut IoBufMut)) {
        match &mut self.inner {
            IoBufsMutInner::Single(buf) => f(buf),
            IoBufsMutInner::Pair(pair) => {
                for buf in pair.iter_mut() {
                    f(buf);
                }
            }
            IoBufsMutInner::Triple(triple) => {
                for buf in triple.iter_mut() {
                    f(buf);
                }
            }
            IoBufsMutInner::Chunked(bufs) => {
                for buf in bufs.iter_mut() {
                    f(buf);
                }
            }
        }
    }

    /// Returns a reference to the single contiguous buffer, if present.
    ///
    /// Returns `Some` only when this is currently represented as one chunk.
    pub const fn as_single(&self) -> Option<&IoBufMut> {
        match &self.inner {
            IoBufsMutInner::Single(buf) => Some(buf),
            _ => None,
        }
    }

    /// Returns a mutable reference to the single contiguous buffer, if present.
    ///
    /// Returns `Some` only when this is currently represented as one chunk.
    pub const fn as_single_mut(&mut self) -> Option<&mut IoBufMut> {
        match &mut self.inner {
            IoBufsMutInner::Single(buf) => Some(buf),
            _ => None,
        }
    }

    /// Consume this container and return the single buffer if present.
    ///
    /// Returns `Ok(IoBufMut)` only when readable data is represented as one
    /// chunk. Returns `Err(Self)` with the original container otherwise.
    #[allow(clippy::result_large_err)]
    pub fn try_into_single(self) -> Result<IoBufMut, Self> {
        match self.inner {
            IoBufsMutInner::Single(buf) => Ok(buf),
            inner => Err(Self { inner }),
        }
    }

    /// Number of bytes remaining across all buffers.
    #[inline]
    pub fn len(&self) -> usize {
        self.remaining()
    }

    /// Whether all buffers are empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.remaining() == 0
    }

    /// Whether this contains a single contiguous buffer.
    ///
    /// When true, `chunk()` returns all remaining bytes.
    #[inline]
    pub const fn is_single(&self) -> bool {
        matches!(self.inner, IoBufsMutInner::Single(_))
    }

    /// Freeze into immutable [`IoBufs`].
    pub fn freeze(self) -> IoBufs {
        match self.inner {
            IoBufsMutInner::Single(buf) => IoBufs::from(buf.freeze()),
            IoBufsMutInner::Pair([a, b]) => IoBufs::from_chunks_iter([a.freeze(), b.freeze()]),
            IoBufsMutInner::Triple([a, b, c]) => {
                IoBufs::from_chunks_iter([a.freeze(), b.freeze(), c.freeze()])
            }
            IoBufsMutInner::Chunked(bufs) => {
                IoBufs::from_chunks_iter(bufs.into_iter().map(IoBufMut::freeze))
            }
        }
    }

    /// Merge all chunks into one buffer obtained from `allocate`.
    ///
    /// `allocate` receives the exact total byte count to merge; the
    /// single-chunk case returns the buffer without calling `allocate`.
    fn coalesce_with<F>(self, allocate: F) -> IoBufMut
    where
        F: FnOnce(usize) -> IoBufMut,
    {
        match self.inner {
            IoBufsMutInner::Single(buf) => buf,
            IoBufsMutInner::Pair([a, b]) => {
                let total_len = a.len().saturating_add(b.len());
                let mut result = allocate(total_len);
                result.put_slice(a.as_ref());
                result.put_slice(b.as_ref());
                result
            }
            IoBufsMutInner::Triple([a, b, c]) => {
                let total_len = a.len().saturating_add(b.len()).saturating_add(c.len());
                let mut result = allocate(total_len);
                result.put_slice(a.as_ref());
                result.put_slice(b.as_ref());
                result.put_slice(c.as_ref());
                result
            }
            IoBufsMutInner::Chunked(bufs) => {
                let total_len: usize = bufs.iter().map(|b| b.len()).fold(0, usize::saturating_add);
                let mut result = allocate(total_len);
                for buf in bufs {
                    result.put_slice(buf.as_ref());
                }
                result
            }
        }
    }

    /// Coalesce all buffers into a single contiguous [`IoBufMut`].
    ///
    /// Zero-copy if only one buffer. Copies if multiple buffers.
    pub fn coalesce(self) -> IoBufMut {
        self.coalesce_with(IoBufMut::with_capacity)
    }

    /// Coalesce all buffers into a single contiguous [`IoBufMut`], using the pool
    /// for allocation if multiple buffers need to be merged.
    ///
    /// Zero-copy if only one buffer. Uses pool allocation if multiple buffers.
    pub fn coalesce_with_pool(self, pool: &BufferPool) -> IoBufMut {
        self.coalesce_with(|len| pool.alloc(len))
    }

    /// Coalesce all buffers into a single contiguous [`IoBufMut`] with extra
    /// capacity, using the pool for allocation.
    ///
    /// Zero-copy if single buffer with sufficient spare capacity.
    pub fn coalesce_with_pool_extra(self, pool: &BufferPool, extra: usize) -> IoBufMut {
        match self.inner {
            // Already contiguous with enough spare room: hand it back untouched.
            IoBufsMutInner::Single(buf) if buf.capacity() - buf.len() >= extra => buf,
            IoBufsMutInner::Single(buf) => {
                let mut result = pool.alloc(buf.len() + extra);
                result.put_slice(buf.as_ref());
                result
            }
            IoBufsMutInner::Pair([a, b]) => {
                let total = a.len().saturating_add(b.len());
                let mut result = pool.alloc(total + extra);
                result.put_slice(a.as_ref());
                result.put_slice(b.as_ref());
                result
            }
            IoBufsMutInner::Triple([a, b, c]) => {
                let total = a.len().saturating_add(b.len()).saturating_add(c.len());
                let mut result = pool.alloc(total + extra);
                result.put_slice(a.as_ref());
                result.put_slice(b.as_ref());
                result.put_slice(c.as_ref());
                result
            }
            IoBufsMutInner::Chunked(bufs) => {
                let total: usize = bufs.iter().map(|b| b.len()).fold(0, usize::saturating_add);
                let mut result = pool.alloc(total + extra);
                for buf in bufs {
                    result.put_slice(buf.as_ref());
                }
                result
            }
        }
    }

    /// Returns the total capacity across all buffers.
    pub fn capacity(&self) -> usize {
        match &self.inner {
            IoBufsMutInner::Single(buf) => buf.capacity(),
            IoBufsMutInner::Pair([a, b]) => a.capacity().saturating_add(b.capacity()),
            IoBufsMutInner::Triple([a, b, c]) => a
                .capacity()
                .saturating_add(b.capacity())
                .saturating_add(c.capacity()),
            IoBufsMutInner::Chunked(bufs) => bufs
                .iter()
                .map(|b| b.capacity())
                .fold(0, usize::saturating_add),
        }
    }

    /// Sets the length of the buffer(s) to `len`, distributing across chunks
    /// while preserving the current chunk layout.
    ///
    /// This is useful for APIs that must fill caller-provided buffer structure
    /// in place (for example [`Blob::read_at_buf`](crate::Blob::read_at_buf)).
    ///
    /// # Safety
    ///
    /// Caller must initialize all `len` bytes before the buffer is read.
    ///
    /// # Panics
    ///
    /// Panics if `len` exceeds total capacity.
    pub(crate) unsafe fn set_len(&mut self, len: usize) {
        let capacity = self.capacity();
        assert!(
            len <= capacity,
            "set_len({len}) exceeds capacity({capacity})"
        );
        // Fill chunks greedily front-to-back; once `remaining` reaches zero,
        // every later chunk has its length set to zero.
        let mut remaining = len;
        self.for_each_chunk_mut(|buf| {
            let cap = buf.capacity();
            let to_set = remaining.min(cap);
            buf.set_len(to_set);
            remaining -= to_set;
        });
    }

    /// Copy data from a slice into the buffers.
    ///
    /// Panics if the slice length doesn't match the total buffer length.
    pub fn copy_from_slice(&mut self, src: &[u8]) {
        assert_eq!(
            src.len(),
            self.len(),
            "source slice length must match buffer length"
        );
        // Walk `src` in lockstep with the chunks, copying chunk-sized pieces.
        let mut offset = 0;
        self.for_each_chunk_mut(|buf| {
            let len = buf.len();
            buf.as_mut().copy_from_slice(&src[offset..offset + len]);
            offset += len;
        });
    }
}
1610
impl Buf for IoBufsMut {
    fn remaining(&self) -> usize {
        // Total readable bytes across all chunks; saturating addition keeps
        // the sum panic-free.
        match &self.inner {
            IoBufsMutInner::Single(buf) => buf.remaining(),
            IoBufsMutInner::Pair([a, b]) => a.remaining().saturating_add(b.remaining()),
            IoBufsMutInner::Triple([a, b, c]) => a
                .remaining()
                .saturating_add(b.remaining())
                .saturating_add(c.remaining()),
            IoBufsMutInner::Chunked(bufs) => bufs
                .iter()
                .map(|b| b.remaining())
                .fold(0, usize::saturating_add),
        }
    }

    fn chunk(&self) -> &[u8] {
        // Per the `Buf` contract: return the first non-empty contiguous
        // region, or an empty slice when nothing is readable.
        match &self.inner {
            IoBufsMutInner::Single(buf) => buf.chunk(),
            IoBufsMutInner::Pair([a, b]) => {
                if a.remaining() > 0 {
                    a.chunk()
                } else if b.remaining() > 0 {
                    b.chunk()
                } else {
                    &[]
                }
            }
            IoBufsMutInner::Triple([a, b, c]) => {
                if a.remaining() > 0 {
                    a.chunk()
                } else if b.remaining() > 0 {
                    b.chunk()
                } else if c.remaining() > 0 {
                    c.chunk()
                } else {
                    &[]
                }
            }
            IoBufsMutInner::Chunked(bufs) => {
                for buf in bufs.iter() {
                    if buf.remaining() > 0 {
                        return buf.chunk();
                    }
                }
                &[]
            }
        }
    }

    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        // No output slots: nothing can be filled.
        if dst.is_empty() {
            return 0;
        }

        match &self.inner {
            IoBufsMutInner::Single(buf) => {
                let chunk = buf.chunk();
                if !chunk.is_empty() {
                    dst[0] = IoSlice::new(chunk);
                    return 1;
                }
                0
            }
            IoBufsMutInner::Pair([a, b]) => fill_vectored_from_chunks(dst, [a.chunk(), b.chunk()]),
            IoBufsMutInner::Triple([a, b, c]) => {
                fill_vectored_from_chunks(dst, [a.chunk(), b.chunk(), c.chunk()])
            }
            IoBufsMutInner::Chunked(bufs) => {
                fill_vectored_from_chunks(dst, bufs.iter().map(|buf| buf.chunk()))
            }
        }
    }

    fn advance(&mut self, cnt: usize) {
        // Multi-chunk variants may fully drain chunks and then need their
        // shape re-collapsed; a single buffer never does.
        let should_canonicalize = match &mut self.inner {
            IoBufsMutInner::Single(buf) => {
                buf.advance(cnt);
                false
            }
            IoBufsMutInner::Pair(pair) => advance_small_chunks(pair.as_mut_slice(), cnt),
            IoBufsMutInner::Triple(triple) => advance_small_chunks(triple.as_mut_slice(), cnt),
            IoBufsMutInner::Chunked(bufs) => {
                advance_chunked_front(bufs, cnt);
                // Below four chunks, `Chunked` is no longer canonical.
                bufs.len() <= 3
            }
        };

        if should_canonicalize {
            self.canonicalize();
        }
    }

    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        // A single chunk delegates directly to the inner buffer.
        let (result, needs_canonicalize) = match &mut self.inner {
            IoBufsMutInner::Single(buf) => return buf.copy_to_bytes(len),
            IoBufsMutInner::Pair(pair) => {
                copy_to_bytes_small_chunks(pair, len, "IoBufsMut::copy_to_bytes: not enough data")
            }
            IoBufsMutInner::Triple(triple) => {
                copy_to_bytes_small_chunks(triple, len, "IoBufsMut::copy_to_bytes: not enough data")
            }
            IoBufsMutInner::Chunked(bufs) => {
                copy_to_bytes_chunked(bufs, len, "IoBufsMut::copy_to_bytes: not enough data")
            }
        };

        if needs_canonicalize {
            self.canonicalize();
        }

        result
    }
}
1725
// SAFETY: Delegates to IoBufMut which implements BufMut safely.
unsafe impl BufMut for IoBufsMut {
    #[inline]
    fn remaining_mut(&self) -> usize {
        // Total writable space left across all chunks.
        match &self.inner {
            IoBufsMutInner::Single(buf) => buf.remaining_mut(),
            IoBufsMutInner::Pair([a, b]) => a.remaining_mut().saturating_add(b.remaining_mut()),
            IoBufsMutInner::Triple([a, b, c]) => a
                .remaining_mut()
                .saturating_add(b.remaining_mut())
                .saturating_add(c.remaining_mut()),
            IoBufsMutInner::Chunked(bufs) => bufs
                .iter()
                .map(|b| b.remaining_mut())
                .fold(0, usize::saturating_add),
        }
    }

    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        match &mut self.inner {
            IoBufsMutInner::Single(buf) => buf.advance_mut(cnt),
            IoBufsMutInner::Pair(pair) => {
                let mut remaining = cnt;
                // The helper signals success (true) when `remaining` was
                // fully consumed; otherwise the advance overran the buffers.
                if advance_mut_in_chunks(pair, &mut remaining) {
                    return;
                }
                panic!("cannot advance past end of buffer");
            }
            IoBufsMutInner::Triple(triple) => {
                let mut remaining = cnt;
                if advance_mut_in_chunks(triple, &mut remaining) {
                    return;
                }
                panic!("cannot advance past end of buffer");
            }
            IoBufsMutInner::Chunked(bufs) => {
                let mut remaining = cnt;
                // A VecDeque exposes its storage as up to two contiguous
                // slices; advance through both in order.
                let (first, second) = bufs.as_mut_slices();
                if advance_mut_in_chunks(first, &mut remaining)
                    || advance_mut_in_chunks(second, &mut remaining)
                {
                    return;
                }
                panic!("cannot advance past end of buffer");
            }
        }
    }

    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        // First chunk with spare capacity; an empty slice if none remains.
        match &mut self.inner {
            IoBufsMutInner::Single(buf) => buf.chunk_mut(),
            IoBufsMutInner::Pair(pair) => {
                if pair[0].remaining_mut() > 0 {
                    pair[0].chunk_mut()
                } else if pair[1].remaining_mut() > 0 {
                    pair[1].chunk_mut()
                } else {
                    bytes::buf::UninitSlice::new(&mut [])
                }
            }
            IoBufsMutInner::Triple(triple) => {
                if triple[0].remaining_mut() > 0 {
                    triple[0].chunk_mut()
                } else if triple[1].remaining_mut() > 0 {
                    triple[1].chunk_mut()
                } else if triple[2].remaining_mut() > 0 {
                    triple[2].chunk_mut()
                } else {
                    bytes::buf::UninitSlice::new(&mut [])
                }
            }
            IoBufsMutInner::Chunked(bufs) => {
                for buf in bufs.iter_mut() {
                    if buf.remaining_mut() > 0 {
                        return buf.chunk_mut();
                    }
                }
                bytes::buf::UninitSlice::new(&mut [])
            }
        }
    }
}
1810
/// Wrap a single mutable buffer as the `Single` representation.
impl From<IoBufMut> for IoBufsMut {
    fn from(buf: IoBufMut) -> Self {
        Self {
            inner: IoBufsMutInner::Single(buf),
        }
    }
}
1818
1819impl From<Vec<u8>> for IoBufsMut {
1820    fn from(vec: Vec<u8>) -> Self {
1821        Self {
1822            inner: IoBufsMutInner::Single(IoBufMut::from(vec)),
1823        }
1824    }
1825}
1826
1827impl From<BytesMut> for IoBufsMut {
1828    fn from(bytes: BytesMut) -> Self {
1829        Self {
1830            inner: IoBufsMutInner::Single(IoBufMut::from(bytes)),
1831        }
1832    }
1833}
1834
/// Build a container from multiple writable chunks via
/// [`Self::from_writable_chunks_iter`], which selects the internal
/// representation for the given chunks.
impl From<Vec<IoBufMut>> for IoBufsMut {
    fn from(bufs: Vec<IoBufMut>) -> Self {
        Self::from_writable_chunks_iter(bufs)
    }
}
1840
1841impl<const N: usize> From<[u8; N]> for IoBufsMut {
1842    fn from(array: [u8; N]) -> Self {
1843        Self {
1844            inner: IoBufsMutInner::Single(IoBufMut::from(array)),
1845        }
1846    }
1847}
1848
1849/// Drain `len` readable bytes from a small fixed chunk array (`Pair`/`Triple`).
1850///
1851/// Returns drained bytes plus whether the caller should canonicalize afterward.
1852#[inline]
1853fn copy_to_bytes_small_chunks<B: Buf, const N: usize>(
1854    chunks: &mut [B; N],
1855    len: usize,
1856    not_enough_data_msg: &str,
1857) -> (Bytes, bool) {
1858    let total = chunks
1859        .iter()
1860        .map(|buf| buf.remaining())
1861        .fold(0, usize::saturating_add);
1862    assert!(total >= len, "{not_enough_data_msg}");
1863
1864    if chunks[0].remaining() >= len {
1865        let bytes = chunks[0].copy_to_bytes(len);
1866        return (bytes, chunks[0].remaining() == 0);
1867    }
1868
1869    let mut out = BytesMut::with_capacity(len);
1870    let mut remaining = len;
1871    for buf in chunks.iter_mut() {
1872        if remaining == 0 {
1873            break;
1874        }
1875        let to_copy = remaining.min(buf.remaining());
1876        out.extend_from_slice(&buf.chunk()[..to_copy]);
1877        buf.advance(to_copy);
1878        remaining -= to_copy;
1879    }
1880
1881    // Slow path always consumes past chunk 0, so canonicalization is required.
1882    (out.freeze(), true)
1883}
1884
1885/// Drain `len` readable bytes from a deque-backed chunk representation.
1886///
1887/// Returns drained bytes plus whether the caller should canonicalize afterward.
1888#[inline]
1889fn copy_to_bytes_chunked<B: Buf>(
1890    bufs: &mut VecDeque<B>,
1891    len: usize,
1892    not_enough_data_msg: &str,
1893) -> (Bytes, bool) {
1894    while bufs.front().is_some_and(|buf| buf.remaining() == 0) {
1895        bufs.pop_front();
1896    }
1897
1898    if bufs.front().is_none() {
1899        assert_eq!(len, 0, "{not_enough_data_msg}");
1900        return (Bytes::new(), false);
1901    }
1902
1903    if bufs.front().is_some_and(|front| front.remaining() >= len) {
1904        let front = bufs.front_mut().expect("front checked above");
1905        let bytes = front.copy_to_bytes(len);
1906        if front.remaining() == 0 {
1907            bufs.pop_front();
1908        }
1909        return (bytes, bufs.len() <= 3);
1910    }
1911
1912    let total = bufs
1913        .iter()
1914        .map(|buf| buf.remaining())
1915        .fold(0, usize::saturating_add);
1916    assert!(total >= len, "{not_enough_data_msg}");
1917
1918    let mut out = BytesMut::with_capacity(len);
1919    let mut remaining = len;
1920    while remaining > 0 {
1921        let front = bufs
1922            .front_mut()
1923            .expect("remaining > 0 implies non-empty bufs");
1924        let to_copy = remaining.min(front.remaining());
1925        out.extend_from_slice(&front.chunk()[..to_copy]);
1926        front.advance(to_copy);
1927        if front.remaining() == 0 {
1928            bufs.pop_front();
1929        }
1930        remaining -= to_copy;
1931    }
1932
1933    (out.freeze(), bufs.len() <= 3)
1934}
1935
1936/// Advance across a [`VecDeque`] of chunks by consuming from the front.
1937#[inline]
1938fn advance_chunked_front<B: Buf>(bufs: &mut VecDeque<B>, mut cnt: usize) {
1939    while cnt > 0 {
1940        let front = bufs.front_mut().expect("cannot advance past end of buffer");
1941        let avail = front.remaining();
1942        if avail == 0 {
1943            bufs.pop_front();
1944            continue;
1945        }
1946        if cnt < avail {
1947            front.advance(cnt);
1948            break;
1949        }
1950        front.advance(avail);
1951        bufs.pop_front();
1952        cnt -= avail;
1953    }
1954}
1955
1956/// Advance across a small fixed set of chunks (`Pair`/`Triple`).
1957///
1958/// Returns `true` when one or more chunks became (or were) empty, so callers
1959/// can canonicalize once after the operation.
1960#[inline]
1961fn advance_small_chunks<B: Buf>(chunks: &mut [B], mut cnt: usize) -> bool {
1962    let mut idx = 0;
1963    let mut needs_canonicalize = false;
1964
1965    while cnt > 0 {
1966        let chunk = chunks
1967            .get_mut(idx)
1968            .expect("cannot advance past end of buffer");
1969        let avail = chunk.remaining();
1970        if avail == 0 {
1971            idx += 1;
1972            needs_canonicalize = true;
1973            continue;
1974        }
1975        if cnt < avail {
1976            chunk.advance(cnt);
1977            return needs_canonicalize;
1978        }
1979        chunk.advance(avail);
1980        cnt -= avail;
1981        idx += 1;
1982        needs_canonicalize = true;
1983    }
1984
1985    needs_canonicalize
1986}
1987
1988/// Advance writable cursors across `chunks` by up to `*remaining` bytes.
1989///
1990/// Returns `true` when the full request has been satisfied.
1991///
1992/// # Safety
1993///
1994/// Forwards to [`BufMut::advance_mut`], so callers must ensure the advanced
1995/// region has been initialized according to [`BufMut`]'s contract.
1996#[inline]
1997unsafe fn advance_mut_in_chunks<B: BufMut>(chunks: &mut [B], remaining: &mut usize) -> bool {
1998    if *remaining == 0 {
1999        return true;
2000    }
2001
2002    for buf in chunks.iter_mut() {
2003        let avail = buf.chunk_mut().len();
2004        if avail == 0 {
2005            continue;
2006        }
2007        if *remaining <= avail {
2008            // SAFETY: Upheld by this function's safety contract.
2009            unsafe { buf.advance_mut(*remaining) };
2010            *remaining = 0;
2011            return true;
2012        }
2013        // SAFETY: Upheld by this function's safety contract.
2014        unsafe { buf.advance_mut(avail) };
2015        *remaining -= avail;
2016    }
2017    false
2018}
2019
/// Fill `dst` with `IoSlice`s built from `chunks`.
///
/// Empty chunks are skipped. At most `dst.len()` slices are written.
/// Returns the number of slices written.
#[inline]
fn fill_vectored_from_chunks<'a, I>(dst: &mut [IoSlice<'a>], chunks: I) -> usize
where
    I: IntoIterator<Item = &'a [u8]>,
{
    // Pair each destination slot with the next non-empty chunk; `zip` stops
    // as soon as either side runs out, bounding output to `dst.len()`.
    let nonempty = chunks.into_iter().filter(|chunk| !chunk.is_empty());
    let mut written = 0;
    for (slot, chunk) in dst.iter_mut().zip(nonempty) {
        *slot = IoSlice::new(chunk);
        written += 1;
    }
    written
}
2040
2041/// Extension trait for encoding values into pooled I/O buffers.
2042///
2043/// This is useful for hot paths that need to avoid frequent heap allocations
2044/// when serializing values that implement [`Write`] and [`EncodeSize`].
2045pub trait EncodeExt: EncodeSize + Write {
2046    /// Encode this value into an [`IoBufMut`] allocated from `pool`.
2047    ///
2048    /// # Panics
2049    ///
2050    /// Panics if [`EncodeSize::encode_size`] does not match the number of
2051    /// bytes written by [`Write::write`].
2052    fn encode_with_pool_mut(&self, pool: &BufferPool) -> IoBufMut {
2053        let len = self.encode_size();
2054        let mut buf = pool.alloc(len);
2055        self.write(&mut buf);
2056        assert_eq!(
2057            buf.len(),
2058            len,
2059            "write() did not write expected bytes into pooled buffer"
2060        );
2061        buf
2062    }
2063
2064    /// Encode this value into an immutable [`IoBuf`] allocated from `pool`.
2065    ///
2066    /// # Panics
2067    ///
2068    /// Panics if [`EncodeSize::encode_size`] does not match the number of
2069    /// bytes written by [`Write::write`].
2070    fn encode_with_pool(&self, pool: &BufferPool) -> IoBuf {
2071        self.encode_with_pool_mut(pool).freeze()
2072    }
2073}
2074
// Blanket impl: any type that knows its encoded size and can write itself
// gains the pooled-encoding helpers for free.
impl<T: EncodeSize + Write> EncodeExt for T {}
2076
2077#[cfg(test)]
2078mod tests {
2079    use super::*;
2080    use bytes::BytesMut;
2081    use commonware_codec::{Decode, Encode, RangeCfg};
2082
    /// Construct a network-profile [`BufferPool`] backed by a throwaway
    /// metrics registry for use in tests.
    fn test_pool() -> BufferPool {
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                // Reduce max_per_class to avoid slow atomics under miri.
                let pool_config = BufferPoolConfig {
                    max_per_class: commonware_utils::NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
            } else {
                let pool_config = BufferPoolConfig::for_network();
            }
        }
        let mut registry = prometheus_client::registry::Registry::default();
        BufferPool::new(pool_config, &mut registry)
    }
2098
    #[test]
    fn test_iobuf_core_behaviors() {
        // Clone stays zero-copy for immutable buffers.
        let buf1 = IoBuf::from(vec![1u8; 1000]);
        let buf2 = buf1.clone();
        // Same backing pointer proves no bytes were copied.
        assert_eq!(buf1.as_ref().as_ptr(), buf2.as_ref().as_ptr());

        // copy_from_slice creates an owned immutable buffer.
        let data = vec![1u8, 2, 3, 4, 5];
        let copied = IoBuf::copy_from_slice(&data);
        assert_eq!(copied, [1, 2, 3, 4, 5]);
        assert_eq!(copied.len(), 5);
        let empty = IoBuf::copy_from_slice(&[]);
        assert!(empty.is_empty());

        // Equality works against both arrays and slices.
        let eq = IoBuf::from(b"hello");
        assert_eq!(eq, *b"hello");
        assert_eq!(eq, b"hello");
        assert_ne!(eq, *b"world");
        assert_ne!(eq, b"world");
        assert_eq!(IoBuf::from(b"hello"), IoBuf::from(b"hello"));
        assert_ne!(IoBuf::from(b"hello"), IoBuf::from(b"world"));
        let bytes: Bytes = IoBuf::from(b"bytes").into();
        assert_eq!(bytes.as_ref(), b"bytes");

        // Buf trait operations keep `len()` and `remaining()` in sync.
        let mut buf = IoBuf::from(b"hello world");
        assert_eq!(buf.len(), buf.remaining());
        assert_eq!(buf.as_ref(), buf.chunk());
        assert_eq!(buf.remaining(), 11);
        buf.advance(6);
        assert_eq!(buf.chunk(), b"world");
        assert_eq!(buf.len(), buf.remaining());

        // copy_to_bytes drains in-order and advances the source.
        let first = buf.copy_to_bytes(2);
        assert_eq!(&first[..], b"wo");
        let rest = buf.copy_to_bytes(3);
        assert_eq!(&rest[..], b"rld");
        assert_eq!(buf.remaining(), 0);

        // Slicing remains zero-copy and supports all common range forms.
        let src = IoBuf::from(b"hello world");
        assert_eq!(src.slice(..5), b"hello");
        assert_eq!(src.slice(6..), b"world");
        assert_eq!(src.slice(3..8), b"lo wo");
        // An empty range yields an empty buffer rather than panicking.
        assert!(src.slice(5..5).is_empty());
    }
2148
    #[test]
    fn test_iobuf_codec_roundtrip() {
        // Length range accepted by the decoder.
        let cfg: RangeCfg<usize> = (0..=1024).into();

        // Non-empty buffer survives encode/decode unchanged.
        let original = IoBuf::from(b"hello world");
        let encoded = original.encode();
        let decoded = IoBuf::decode_cfg(encoded, &cfg).unwrap();
        assert_eq!(original, decoded);

        // Empty buffer round-trips too.
        let empty = IoBuf::default();
        let encoded = empty.encode();
        let decoded = IoBuf::decode_cfg(encoded, &cfg).unwrap();
        assert_eq!(empty, decoded);

        // Large payloads round-trip with a wider range config.
        let large_cfg: RangeCfg<usize> = (0..=20000).into();
        let large = IoBuf::from(vec![42u8; 10000]);
        let encoded = large.encode();
        let decoded = IoBuf::decode_cfg(encoded, &large_cfg).unwrap();
        assert_eq!(large, decoded);
    }
2169
    // Advancing past the readable region must panic per the `Buf` contract.
    #[test]
    #[should_panic(expected = "cannot advance")]
    fn test_iobuf_advance_past_end() {
        let mut buf = IoBuf::from(b"hello");
        buf.advance(10);
    }
2176
    #[test]
    fn test_iobuf_split_to_consistent_across_backings() {
        // Build the same payload on both backings: pooled and `Bytes`.
        let pool = test_pool();
        let mut pooled = pool.try_alloc(256).expect("pooled allocation");
        pooled.put_slice(b"hello world");
        let mut pooled_buf = pooled.freeze();
        let mut bytes_buf = IoBuf::from(b"hello world");

        assert!(pooled_buf.is_pooled());
        assert!(!bytes_buf.is_pooled());

        // Zero-length split yields an unpooled empty prefix on both backings.
        let pooled_empty = pooled_buf.split_to(0);
        let bytes_empty = bytes_buf.split_to(0);
        assert_eq!(pooled_empty, bytes_empty);
        assert_eq!(pooled_buf, bytes_buf);
        assert!(!pooled_empty.is_pooled());

        // Mid-buffer split keeps the pooled backing for the prefix.
        let pooled_prefix = pooled_buf.split_to(5);
        let bytes_prefix = bytes_buf.split_to(5);
        assert_eq!(pooled_prefix, bytes_prefix);
        assert_eq!(pooled_buf, bytes_buf);
        assert!(pooled_prefix.is_pooled());

        // Full-length split leaves an empty, unpooled remainder.
        let pooled_rest = pooled_buf.split_to(pooled_buf.len());
        let bytes_rest = bytes_buf.split_to(bytes_buf.len());
        assert_eq!(pooled_rest, bytes_rest);
        assert_eq!(pooled_buf, bytes_buf);
        assert!(pooled_buf.is_empty());
        assert!(bytes_buf.is_empty());
        assert!(!pooled_buf.is_pooled());
    }
2208
    // Splitting beyond the readable length must panic.
    #[test]
    #[should_panic(expected = "split_to out of bounds")]
    fn test_iobuf_split_to_out_of_bounds() {
        let mut buf = IoBuf::from(b"abc");
        let _ = buf.split_to(4);
    }
2215
    #[test]
    fn test_iobufmut_core_behaviors() {
        // Build mutable buffers incrementally and freeze to immutable.
        let mut buf = IoBufMut::with_capacity(100);
        assert!(buf.capacity() >= 100);
        assert_eq!(buf.len(), 0);
        buf.put_slice(b"hello");
        buf.put_slice(b" world");
        assert_eq!(buf, b"hello world");
        assert_eq!(buf, &b"hello world"[..]);
        assert_eq!(buf.freeze(), b"hello world");

        // `zeroed` creates readable initialized bytes; `set_len` can shrink safely.
        let mut zeroed = IoBufMut::zeroed(10);
        assert_eq!(zeroed, &[0u8; 10]);
        // SAFETY: shrinking readable length to initialized region.
        unsafe { zeroed.set_len(5) };
        assert_eq!(zeroed, &[0u8; 5]);
        zeroed.as_mut()[..5].copy_from_slice(b"hello");
        assert_eq!(&zeroed.as_ref()[..5], b"hello");
        // A frozen buffer converts into a plain `Vec<u8>` for interop.
        let frozen = zeroed.freeze();
        let vec: Vec<u8> = frozen.into();
        assert_eq!(&vec[..5], b"hello");

        // Exercise pooled branch behavior for `is_empty`.
        let pool = test_pool();
        let mut pooled = pool.alloc(8);
        assert!(pooled.is_empty());
        pooled.put_slice(b"x");
        assert!(!pooled.is_empty());
    }
2247
    #[test]
    fn test_iobufs_shapes_and_read_paths() {
        // Empty construction normalizes to an empty single chunk.
        let empty = IoBufs::from(Vec::<u8>::new());
        assert!(empty.is_empty());
        assert!(empty.is_single());
        assert!(empty.as_single().is_some());

        // Single-buffer read path.
        let mut single = IoBufs::from(b"hello world");
        assert!(single.is_single());
        assert_eq!(single.chunk(), b"hello world");
        single.advance(6);
        assert_eq!(single.chunk(), b"world");
        assert_eq!(single.copy_to_bytes(5).as_ref(), b"world");
        assert_eq!(single.remaining(), 0);

        // Fast-path shapes (Pair/Triple/Chunked).
        let mut pair = IoBufs::from(IoBuf::from(b"a"));
        pair.append(IoBuf::from(b"b"));
        assert!(matches!(pair.inner, IoBufsInner::Pair(_)));
        assert!(pair.as_single().is_none());

        let mut triple = IoBufs::from(IoBuf::from(b"a"));
        triple.append(IoBuf::from(b"b"));
        triple.append(IoBuf::from(b"c"));
        assert!(matches!(triple.inner, IoBufsInner::Triple(_)));

        // A fourth chunk overflows the fixed-arity shapes into `Chunked`.
        let mut chunked = IoBufs::from(IoBuf::from(b"a"));
        chunked.append(IoBuf::from(b"b"));
        chunked.append(IoBuf::from(b"c"));
        chunked.append(IoBuf::from(b"d"));
        assert!(matches!(chunked.inner, IoBufsInner::Chunked(_)));

        // prepend + append preserve ordering.
        let mut joined = IoBufs::from(b"middle");
        joined.prepend(IoBuf::from(b"start "));
        joined.append(IoBuf::from(b" end"));
        assert_eq!(joined.coalesce(), b"start middle end");

        // prepending empty is a no-op, and prepending into pair upgrades to triple.
        let mut prepend_noop = IoBufs::from(b"x");
        prepend_noop.prepend(IoBuf::default());
        assert_eq!(prepend_noop.coalesce(), b"x");

        let mut prepend_pair = IoBufs::from(vec![IoBuf::from(b"b"), IoBuf::from(b"c")]);
        prepend_pair.prepend(IoBuf::from(b"a"));
        assert!(matches!(prepend_pair.inner, IoBufsInner::Triple(_)));
        assert_eq!(prepend_pair.coalesce(), b"abc");

        // canonicalizing a non-empty single should keep the same representation.
        let mut canonical_single = IoBufs::from(b"q");
        canonical_single.canonicalize();
        assert!(canonical_single.is_single());
        assert_eq!(canonical_single.coalesce(), b"q");
    }
2304
    #[test]
    fn test_iobufs_split_to_cases() {
        // Zero and full split on a single chunk.
        let mut bufs = IoBufs::from(b"hello");

        let empty = bufs.split_to(0);
        assert!(empty.is_empty());
        assert_eq!(bufs.coalesce(), b"hello");

        // Full-length split leaves an empty single remainder.
        let mut bufs = IoBufs::from(b"hello");
        let all = bufs.split_to(5);
        assert_eq!(all.coalesce(), b"hello");
        assert!(bufs.is_single());
        assert!(bufs.is_empty());

        // Single split in the middle.
        let mut single_mid = IoBufs::from(b"hello");
        let single_prefix = single_mid.split_to(2);
        assert!(single_prefix.is_single());
        assert_eq!(single_prefix.coalesce(), b"he");
        assert_eq!(single_mid.coalesce(), b"llo");

        // Pair split paths: in-first, boundary-after-first, crossing-into-second.
        let mut pair = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
        let pair_prefix = pair.split_to(1);
        assert!(pair_prefix.is_single());
        assert_eq!(pair_prefix.coalesce(), b"a");
        assert!(matches!(pair.inner, IoBufsInner::Pair(_)));
        assert_eq!(pair.coalesce(), b"bcd");

        let mut pair = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
        let pair_prefix = pair.split_to(2);
        assert!(pair_prefix.is_single());
        assert_eq!(pair_prefix.coalesce(), b"ab");
        assert!(pair.is_single());
        assert_eq!(pair.coalesce(), b"cd");

        let mut pair = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
        let pair_prefix = pair.split_to(3);
        assert!(matches!(pair_prefix.inner, IoBufsInner::Pair(_)));
        assert_eq!(pair_prefix.coalesce(), b"abc");
        assert!(pair.is_single());
        assert_eq!(pair.coalesce(), b"d");

        // Triple split paths: in-first, boundary-after-first, in-second, boundary-after-second,
        // and reaching into third.
        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(1);
        assert!(triple_prefix.is_single());
        assert_eq!(triple_prefix.coalesce(), b"a");
        assert!(matches!(triple.inner, IoBufsInner::Triple(_)));
        assert_eq!(triple.coalesce(), b"bcdef");

        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(2);
        assert!(triple_prefix.is_single());
        assert_eq!(triple_prefix.coalesce(), b"ab");
        assert!(matches!(triple.inner, IoBufsInner::Pair(_)));
        assert_eq!(triple.coalesce(), b"cdef");

        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(3);
        assert!(matches!(triple_prefix.inner, IoBufsInner::Pair(_)));
        assert_eq!(triple_prefix.coalesce(), b"abc");
        assert!(matches!(triple.inner, IoBufsInner::Pair(_)));
        assert_eq!(triple.coalesce(), b"def");

        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(4);
        assert!(matches!(triple_prefix.inner, IoBufsInner::Pair(_)));
        assert_eq!(triple_prefix.coalesce(), b"abcd");
        assert!(triple.is_single());
        assert_eq!(triple.coalesce(), b"ef");

        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(5);
        assert!(matches!(triple_prefix.inner, IoBufsInner::Triple(_)));
        assert_eq!(triple_prefix.coalesce(), b"abcde");
        assert!(triple.is_single());
        assert_eq!(triple.coalesce(), b"f");

        // Chunked split can canonicalize remainder/prefix shapes.
        let mut bufs = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
            IoBuf::from(b"gh"),
        ]);
        let prefix = bufs.split_to(4);
        assert!(matches!(prefix.inner, IoBufsInner::Pair(_)));
        assert_eq!(prefix.coalesce(), b"abcd");
        assert!(matches!(bufs.inner, IoBufsInner::Pair(_)));
        assert_eq!(bufs.coalesce(), b"efgh");

        // Chunked split inside a chunk.
        let mut bufs = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
            IoBuf::from(b"gh"),
        ]);
        let prefix = bufs.split_to(5);
        assert!(matches!(prefix.inner, IoBufsInner::Triple(_)));
        assert_eq!(prefix.coalesce(), b"abcde");
        assert!(matches!(bufs.inner, IoBufsInner::Pair(_)));
        assert_eq!(bufs.coalesce(), b"fgh");

        // Chunked split can remain chunked on both sides when both have >= 4 chunks.
        let mut bufs = IoBufs::from(vec![
            IoBuf::from(b"a"),
            IoBuf::from(b"b"),
            IoBuf::from(b"c"),
            IoBuf::from(b"d"),
            IoBuf::from(b"e"),
            IoBuf::from(b"f"),
            IoBuf::from(b"g"),
            IoBuf::from(b"h"),
        ]);
        let prefix = bufs.split_to(4);
        assert!(matches!(prefix.inner, IoBufsInner::Chunked(_)));
        assert_eq!(prefix.coalesce(), b"abcd");
        assert!(matches!(bufs.inner, IoBufsInner::Chunked(_)));
        assert_eq!(bufs.coalesce(), b"efgh");

        // Defensive path: tolerate accidental empty chunks in non-canonical chunked input.
        let mut bufs = IoBufs {
            inner: IoBufsInner::Chunked(VecDeque::from([
                IoBuf::default(),
                IoBuf::from(b"ab"),
                IoBuf::from(b"cd"),
                IoBuf::from(b"ef"),
                IoBuf::from(b"gh"),
            ])),
        };
        let prefix = bufs.split_to(3);
        assert_eq!(prefix.coalesce(), b"abc");
        assert_eq!(bufs.coalesce(), b"defgh");
    }
2463
    // Splitting beyond the total readable length must panic.
    #[test]
    #[should_panic(expected = "split_to out of bounds")]
    fn test_iobufs_split_to_out_of_bounds() {
        let mut bufs = IoBufs::from(b"abc");
        let _ = bufs.split_to(4);
    }
2470
    #[test]
    fn test_iobufs_chunk_count() {
        // Chunk count tracks each shape: empty, Single, Pair, Triple, Chunked.
        assert_eq!(IoBufs::default().chunk_count(), 0);
        assert_eq!(IoBufs::from(IoBuf::from(b"a")).chunk_count(), 1);
        assert_eq!(
            IoBufs::from(vec![IoBuf::from(b"b"), IoBuf::from(b"c")]).chunk_count(),
            2
        );
        assert_eq!(
            IoBufs::from(vec![
                IoBuf::from(b"a"),
                IoBuf::from(b"b"),
                IoBuf::from(b"c")
            ])
            .chunk_count(),
            3
        );
        assert_eq!(
            IoBufs::from(vec![
                IoBuf::from(b"a"),
                IoBuf::from(b"b"),
                IoBuf::from(b"c"),
                IoBuf::from(b"d")
            ])
            .chunk_count(),
            4
        );
    }
2499
    #[test]
    fn test_iobufs_coalesce_after_advance() {
        // Advancing consumes bytes from the front; coalesce sees only the rest.
        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
        bufs.append(IoBuf::from(b" world"));

        assert_eq!(bufs.len(), 11);

        bufs.advance(3);
        assert_eq!(bufs.len(), 8);

        assert_eq!(bufs.coalesce(), b"lo world");
    }
2512
    #[test]
    fn test_iobufs_coalesce_with_pool() {
        let pool = test_pool();

        // Single buffer: zero-copy (same pointer)
        let buf = IoBuf::from(vec![1u8, 2, 3, 4, 5]);
        let original_ptr = buf.as_ptr();
        let bufs = IoBufs::from(buf);
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, [1, 2, 3, 4, 5]);
        assert_eq!(coalesced.as_ptr(), original_ptr);

        // Multiple buffers: merged using pool
        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
        bufs.append(IoBuf::from(b" world"));
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"hello world");

        // Multiple buffers after advance: only remaining data coalesced
        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
        bufs.append(IoBuf::from(b" world"));
        bufs.advance(3);
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"lo world");

        // Empty buffers in the middle
        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
        bufs.append(IoBuf::default());
        bufs.append(IoBuf::from(b" world"));
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"hello world");

        // Empty IoBufs
        let bufs = IoBufs::default();
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert!(coalesced.is_empty());

        // 4+ buffers: exercise chunked coalesce-with-pool path.
        let bufs = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
            IoBuf::from(b"gh"),
        ]);
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"abcdefgh");
        // The merged result comes from the pool, not a plain allocation.
        assert!(coalesced.is_pooled());
    }
2561
    #[test]
    fn test_iobufs_empty_chunks_and_copy_to_bytes_paths() {
        // Empty chunks are skipped while reading across multiple chunks.
        let mut bufs = IoBufs::default();
        bufs.append(IoBuf::from(b"hello"));
        bufs.append(IoBuf::default());
        bufs.append(IoBuf::from(b" "));
        bufs.append(IoBuf::default());
        bufs.append(IoBuf::from(b"world"));
        assert_eq!(bufs.len(), 11);
        assert_eq!(bufs.chunk(), b"hello");
        bufs.advance(5);
        assert_eq!(bufs.chunk(), b" ");
        bufs.advance(1);
        assert_eq!(bufs.chunk(), b"world");

        // Single-buffer copy_to_bytes path.
        let mut single = IoBufs::from(b"hello world");
        assert_eq!(single.copy_to_bytes(5).as_ref(), b"hello");
        assert_eq!(single.remaining(), 6);

        // Multi-buffer copy_to_bytes path across boundaries.
        let mut multi = IoBufs::from(b"hello");
        multi.prepend(IoBuf::from(b"say "));
        assert_eq!(multi.copy_to_bytes(7).as_ref(), b"say hel");
        // A second read continues exactly where the first stopped.
        assert_eq!(multi.copy_to_bytes(2).as_ref(), b"lo");
    }
2589
    #[test]
    fn test_iobufs_copy_to_bytes_pair_and_triple() {
        // Pair: crossing one boundary should collapse to the trailing single chunk.
        let mut pair = IoBufs::from(IoBuf::from(b"ab"));
        pair.append(IoBuf::from(b"cd"));
        let first = pair.copy_to_bytes(3);
        assert_eq!(&first[..], b"abc");
        assert!(pair.is_single());
        assert_eq!(pair.chunk(), b"d");

        // Triple: draining across two chunks leaves the final chunk readable.
        let mut triple = IoBufs::from(IoBuf::from(b"ab"));
        triple.append(IoBuf::from(b"cd"));
        triple.append(IoBuf::from(b"ef"));
        let first = triple.copy_to_bytes(5);
        assert_eq!(&first[..], b"abcde");
        // The representation canonicalizes down to Single as chunks drain.
        assert!(triple.is_single());
        assert_eq!(triple.chunk(), b"f");
    }
2609
    #[test]
    fn test_iobufs_copy_to_bytes_chunked_four_plus() {
        let mut bufs = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
            IoBuf::from(b"gh"),
        ]);

        // Chunked fast-path: first chunk alone satisfies request.
        let first = bufs.copy_to_bytes(1);
        assert_eq!(&first[..], b"a");

        // Chunked slow-path: request crosses chunk boundaries.
        let second = bufs.copy_to_bytes(4);
        assert_eq!(&second[..], b"bcde");

        // Draining the tail leaves nothing readable.
        let rest = bufs.copy_to_bytes(3);
        assert_eq!(&rest[..], b"fgh");
        assert_eq!(bufs.remaining(), 0);
    }
2631
    #[test]
    fn test_iobufs_copy_to_bytes_edge_cases() {
        // Leading empty chunk should not affect copied payload.
        let mut iobufs = IoBufs::from(IoBuf::from(b""));
        iobufs.append(IoBuf::from(b"hello"));
        assert_eq!(iobufs.copy_to_bytes(5).as_ref(), b"hello");

        // Boundary-aligned reads should return exact chunk payloads in-order.
        let mut boundary = IoBufs::from(IoBuf::from(b"hello"));
        boundary.append(IoBuf::from(b"world"));
        assert_eq!(boundary.copy_to_bytes(5).as_ref(), b"hello");
        assert_eq!(boundary.copy_to_bytes(5).as_ref(), b"world");
        // Both chunks are now fully consumed.
        assert_eq!(boundary.remaining(), 0);
    }
2646
2647    #[test]
2648    #[should_panic(expected = "cannot advance past end of buffer")]
2649    fn test_iobufs_advance_past_end() {
2650        let mut bufs = IoBufs::from(b"hel");
2651        bufs.append(IoBuf::from(b"lo"));
2652        bufs.advance(10);
2653    }
2654
2655    #[test]
2656    #[should_panic(expected = "not enough data")]
2657    fn test_iobufs_copy_to_bytes_past_end() {
2658        let mut bufs = IoBufs::from(b"hel");
2659        bufs.append(IoBuf::from(b"lo"));
2660        bufs.copy_to_bytes(10);
2661    }
2662
    // Differential test: `IoBufs` must be observationally identical to
    // `bytes::Buf::chain` over the same three chunks, for
    // remaining/chunk/advance and for `copy_to_bytes`.
    #[test]
    fn test_iobufs_matches_bytes_chain() {
        let b1 = Bytes::from_static(b"hello");
        let b2 = Bytes::from_static(b" ");
        let b3 = Bytes::from_static(b"world");

        // Buf parity for remaining/chunk/advance should match `Bytes::chain`.
        let mut chain = b1.clone().chain(b2.clone()).chain(b3.clone());
        let mut iobufs = IoBufs::from(IoBuf::from(b1.clone()));
        iobufs.append(IoBuf::from(b2.clone()));
        iobufs.append(IoBuf::from(b3.clone()));

        assert_eq!(chain.remaining(), iobufs.remaining());
        assert_eq!(chain.chunk(), iobufs.chunk());

        // Advance within the first chunk, then exactly to its boundary.
        chain.advance(3);
        iobufs.advance(3);
        assert_eq!(chain.remaining(), iobufs.remaining());
        assert_eq!(chain.chunk(), iobufs.chunk());

        chain.advance(3);
        iobufs.advance(3);
        assert_eq!(chain.remaining(), iobufs.remaining());
        assert_eq!(chain.chunk(), iobufs.chunk());

        // Test copy_to_bytes
        let mut chain = b1.clone().chain(b2.clone()).chain(b3.clone());
        let mut iobufs = IoBufs::from(IoBuf::from(b1));
        iobufs.append(IoBuf::from(b2));
        iobufs.append(IoBuf::from(b3));

        assert_eq!(chain.copy_to_bytes(3), iobufs.copy_to_bytes(3));
        assert_eq!(chain.copy_to_bytes(4), iobufs.copy_to_bytes(4));
        assert_eq!(
            chain.copy_to_bytes(chain.remaining()),
            iobufs.copy_to_bytes(iobufs.remaining())
        );
        assert_eq!(chain.remaining(), 0);
        assert_eq!(iobufs.remaining(), 0);
    }
2703
2704    #[test]
2705    fn test_iobufs_try_into_single() {
2706        let single = IoBufs::from(IoBuf::from(b"hello"));
2707        let single = single.try_into_single().expect("single expected");
2708        assert_eq!(single, b"hello");
2709
2710        let multi = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
2711        let multi = multi.try_into_single().expect_err("multi expected");
2712        assert_eq!(multi.coalesce(), b"abcd");
2713    }
2714
    // chunks_vectored must export one IoSlice per non-empty readable chunk,
    // capped by destination capacity, across every internal representation —
    // including non-canonical Pair/Triple/Chunked shapes built directly.
    #[test]
    fn test_iobufs_chunks_vectored_multiple_slices() {
        // Single non-empty buffers should export exactly one slice.
        let single = IoBufs::from(IoBuf::from(b"xy"));
        let mut single_dst = [IoSlice::new(&[]); 2];
        let count = single.chunks_vectored(&mut single_dst);
        assert_eq!(count, 1);
        assert_eq!(&single_dst[0][..], b"xy");

        // Single empty buffers should export no slices.
        let empty_single = IoBufs::default();
        let mut empty_single_dst = [IoSlice::new(&[]); 1];
        assert_eq!(empty_single.chunks_vectored(&mut empty_single_dst), 0);

        let bufs = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
            IoBuf::from(b"gh"),
        ]);

        // Destination capacity should cap how many chunks we export.
        let mut small = [IoSlice::new(&[]); 2];
        let count = bufs.chunks_vectored(&mut small);
        assert_eq!(count, 2);
        assert_eq!(&small[0][..], b"ab");
        assert_eq!(&small[1][..], b"cd");

        // Larger destination should include every readable chunk.
        let mut large = [IoSlice::new(&[]); 8];
        let count = bufs.chunks_vectored(&mut large);
        assert_eq!(count, 4);
        assert_eq!(&large[0][..], b"ab");
        assert_eq!(&large[1][..], b"cd");
        assert_eq!(&large[2][..], b"ef");
        assert_eq!(&large[3][..], b"gh");

        // Empty destination cannot accept any slices.
        let mut empty_dst: [IoSlice<'_>; 0] = [];
        assert_eq!(bufs.chunks_vectored(&mut empty_dst), 0);

        // Non-canonical shapes should skip empty leading chunks.
        let sparse = IoBufs {
            inner: IoBufsInner::Pair([IoBuf::default(), IoBuf::from(b"x")]),
        };
        let mut dst = [IoSlice::new(&[]); 2];
        let count = sparse.chunks_vectored(&mut dst);
        assert_eq!(count, 1);
        assert_eq!(&dst[0][..], b"x");

        // Triple should skip empty chunks and preserve readable order.
        let sparse_triple = IoBufs {
            inner: IoBufsInner::Triple([IoBuf::default(), IoBuf::from(b"y"), IoBuf::from(b"z")]),
        };
        let mut dst = [IoSlice::new(&[]); 3];
        let count = sparse_triple.chunks_vectored(&mut dst);
        assert_eq!(count, 2);
        assert_eq!(&dst[0][..], b"y");
        assert_eq!(&dst[1][..], b"z");

        // Chunked shapes with only empty buffers should export no slices.
        let empty_chunked = IoBufs {
            inner: IoBufsInner::Chunked(VecDeque::from([IoBuf::default(), IoBuf::default()])),
        };
        let mut dst = [IoSlice::new(&[]); 2];
        assert_eq!(empty_chunked.chunks_vectored(&mut dst), 0);
    }
2782
    // freeze() on multi-chunk IoBufsMut: empty buffers are filtered out, and
    // the result collapses to the Single representation when at most one
    // non-empty buffer remains.
    #[test]
    fn test_iobufsmut_freeze_chunked() {
        // Multiple non-empty buffers stay multi-chunk.
        let buf1 = IoBufMut::from(b"hello".as_ref());
        let buf2 = IoBufMut::from(b" world".as_ref());
        let bufs = IoBufsMut::from(vec![buf1, buf2]);
        let mut frozen = bufs.freeze();
        assert!(!frozen.is_single());
        assert_eq!(frozen.chunk(), b"hello");
        frozen.advance(5);
        assert_eq!(frozen.chunk(), b" world");
        frozen.advance(6);
        assert_eq!(frozen.remaining(), 0);

        // Empty buffers are filtered out.
        let buf1 = IoBufMut::from(b"hello".as_ref());
        let empty = IoBufMut::default();
        let buf2 = IoBufMut::from(b" world".as_ref());
        let bufs = IoBufsMut::from(vec![buf1, empty, buf2]);
        let mut frozen = bufs.freeze();
        assert!(!frozen.is_single());
        assert_eq!(frozen.chunk(), b"hello");
        frozen.advance(5);
        assert_eq!(frozen.chunk(), b" world");
        frozen.advance(6);
        assert_eq!(frozen.remaining(), 0);

        // Collapses to Single when one non-empty buffer remains
        let empty1 = IoBufMut::default();
        let buf = IoBufMut::from(b"only one".as_ref());
        let empty2 = IoBufMut::default();
        let bufs = IoBufsMut::from(vec![empty1, buf, empty2]);
        let frozen = bufs.freeze();
        assert!(frozen.is_single());
        assert_eq!(frozen.coalesce(), b"only one");

        // All empty buffers -> Single with empty buffer
        let empty1 = IoBufMut::default();
        let empty2 = IoBufMut::default();
        let bufs = IoBufsMut::from(vec![empty1, empty2]);
        let frozen = bufs.freeze();
        assert!(frozen.is_single());
        assert!(frozen.is_empty());
    }
2827
2828    #[test]
2829    fn test_iobufsmut_coalesce() {
2830        let buf1 = IoBufMut::from(b"hello");
2831        let buf2 = IoBufMut::from(b" world");
2832        let bufs = IoBufsMut::from(vec![buf1, buf2]);
2833        let coalesced = bufs.coalesce();
2834        assert_eq!(coalesced, b"hello world");
2835    }
2836
2837    #[test]
2838    fn test_iobufsmut_from_vec() {
2839        // Empty Vec becomes Single with empty buffer
2840        let bufs = IoBufsMut::from(Vec::<IoBufMut>::new());
2841        assert!(bufs.is_single());
2842        assert!(bufs.is_empty());
2843
2844        // Vec with one element becomes Single
2845        let buf = IoBufMut::from(b"test");
2846        let bufs = IoBufsMut::from(vec![buf]);
2847        assert!(bufs.is_single());
2848        assert_eq!(bufs.chunk(), b"test");
2849
2850        // Vec with multiple elements becomes multi-chunk.
2851        let buf1 = IoBufMut::from(b"hello");
2852        let buf2 = IoBufMut::from(b" world");
2853        let bufs = IoBufsMut::from(vec![buf1, buf2]);
2854        assert!(!bufs.is_single());
2855    }
2856
2857    #[test]
2858    fn test_iobufsmut_from_vec_filters_empty_chunks() {
2859        let mut bufs = IoBufsMut::from(vec![
2860            IoBufMut::default(),
2861            IoBufMut::from(b"hello"),
2862            IoBufMut::default(),
2863            IoBufMut::from(b" world"),
2864            IoBufMut::default(),
2865        ]);
2866        assert_eq!(bufs.chunk(), b"hello");
2867        bufs.advance(5);
2868        assert_eq!(bufs.chunk(), b" world");
2869        bufs.advance(6);
2870        assert_eq!(bufs.remaining(), 0);
2871    }
2872
    // From<Vec<_>> should pick the fixed-arity fast-path representations
    // (Pair for 2 buffers, Triple for 3) and fall back to Chunked for 4+.
    #[test]
    fn test_iobufsmut_fast_path_shapes() {
        let pair = IoBufsMut::from(vec![IoBufMut::from(b"a"), IoBufMut::from(b"b")]);
        assert!(matches!(pair.inner, IoBufsMutInner::Pair(_)));

        let triple = IoBufsMut::from(vec![
            IoBufMut::from(b"a"),
            IoBufMut::from(b"b"),
            IoBufMut::from(b"c"),
        ]);
        assert!(matches!(triple.inner, IoBufsMutInner::Triple(_)));

        let chunked = IoBufsMut::from(vec![
            IoBufMut::from(b"a"),
            IoBufMut::from(b"b"),
            IoBufMut::from(b"c"),
            IoBufMut::from(b"d"),
        ]);
        assert!(matches!(chunked.inner, IoBufsMutInner::Chunked(_)));
    }
2893
2894    #[test]
2895    fn test_iobufsmut_default() {
2896        let bufs = IoBufsMut::default();
2897        assert!(bufs.is_single());
2898        assert!(bufs.is_empty());
2899        assert_eq!(bufs.len(), 0);
2900    }
2901
2902    #[test]
2903    fn test_iobufsmut_from_array() {
2904        let bufs = IoBufsMut::from([1u8, 2, 3, 4, 5]);
2905        assert!(bufs.is_single());
2906        assert_eq!(bufs.len(), 5);
2907        assert_eq!(bufs.chunk(), &[1, 2, 3, 4, 5]);
2908    }
2909
2910    #[test]
2911    fn test_iobufmut_buf_trait() {
2912        let mut buf = IoBufMut::from(b"hello world");
2913        assert_eq!(buf.remaining(), 11);
2914        assert_eq!(buf.chunk(), b"hello world");
2915
2916        buf.advance(6);
2917        assert_eq!(buf.remaining(), 5);
2918        assert_eq!(buf.chunk(), b"world");
2919
2920        buf.advance(5);
2921        assert_eq!(buf.remaining(), 0);
2922        assert!(buf.chunk().is_empty());
2923    }
2924
2925    #[test]
2926    #[should_panic(expected = "cannot advance")]
2927    fn test_iobufmut_advance_past_end() {
2928        let mut buf = IoBufMut::from(b"hello");
2929        buf.advance(10);
2930    }
2931
    // Buf traversal over three chunks: advances within a chunk, across a
    // chunk boundary, by exactly one chunk, and down to empty.
    #[test]
    fn test_iobufsmut_buf_trait_chunked() {
        let buf1 = IoBufMut::from(b"hello");
        let buf2 = IoBufMut::from(b" ");
        let buf3 = IoBufMut::from(b"world");
        let mut bufs = IoBufsMut::from(vec![buf1, buf2, buf3]);

        assert_eq!(bufs.remaining(), 11);
        assert_eq!(bufs.chunk(), b"hello");

        // Advance within first buffer
        bufs.advance(3);
        assert_eq!(bufs.remaining(), 8);
        assert_eq!(bufs.chunk(), b"lo");

        // Advance past first buffer (should pop_front)
        bufs.advance(2);
        assert_eq!(bufs.remaining(), 6);
        assert_eq!(bufs.chunk(), b" ");

        // Advance exactly one buffer
        bufs.advance(1);
        assert_eq!(bufs.remaining(), 5);
        assert_eq!(bufs.chunk(), b"world");

        // Advance to end
        bufs.advance(5);
        assert_eq!(bufs.remaining(), 0);
    }
2961
2962    #[test]
2963    #[should_panic(expected = "cannot advance past end of buffer")]
2964    fn test_iobufsmut_advance_past_end() {
2965        let buf1 = IoBufMut::from(b"hello");
2966        let buf2 = IoBufMut::from(b" world");
2967        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
2968        bufs.advance(20);
2969    }
2970
    // BufMut writes on a single growable buffer: written bytes become
    // readable via chunk()/len(), and further writes append in order.
    #[test]
    fn test_iobufsmut_bufmut_trait_single() {
        let mut bufs = IoBufsMut::from(IoBufMut::with_capacity(20));
        // BytesMut can grow, so remaining_mut is very large
        assert!(bufs.remaining_mut() > 1000);

        bufs.put_slice(b"hello");
        assert_eq!(bufs.chunk(), b"hello");
        assert_eq!(bufs.len(), 5);

        bufs.put_slice(b" world");
        assert_eq!(bufs.coalesce(), b"hello world");
    }
2984
    // A zeroed buffer has a fixed readable length up-front; its contents can
    // be written in place through the coalesced buffer's mutable view.
    #[test]
    fn test_iobufsmut_zeroed_write() {
        // Use zeroed buffers which have a fixed length
        let bufs = IoBufsMut::from(IoBufMut::zeroed(20));
        assert_eq!(bufs.len(), 20);

        // Can write using as_mut on coalesced buffer
        let mut coalesced = bufs.coalesce();
        coalesced.as_mut()[..5].copy_from_slice(b"hello");
        assert_eq!(&coalesced.as_ref()[..5], b"hello");
    }
2996
2997    #[test]
2998    fn test_iobufsmut_bufmut_put_slice() {
2999        // Test writing across multiple buffers
3000        let buf1 = IoBufMut::with_capacity(5);
3001        let buf2 = IoBufMut::with_capacity(6);
3002        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3003
3004        // Write data
3005        bufs.put_slice(b"hello");
3006        bufs.put_slice(b" world");
3007        assert_eq!(bufs.coalesce(), b"hello world");
3008    }
3009
3010    #[test]
3011    fn test_iobufs_advance_drains_buffers() {
3012        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
3013        bufs.append(IoBuf::from(b" "));
3014        bufs.append(IoBuf::from(b"world"));
3015
3016        // Advance exactly past first buffer
3017        bufs.advance(5);
3018        assert_eq!(bufs.remaining(), 6);
3019        assert_eq!(bufs.chunk(), b" ");
3020
3021        // Advance across multiple buffers
3022        bufs.advance(4);
3023        assert_eq!(bufs.remaining(), 2);
3024        assert_eq!(bufs.chunk(), b"ld");
3025    }
3026
3027    #[test]
3028    fn test_iobufs_advance_exactly_to_boundary() {
3029        let mut bufs = IoBufs::from(IoBuf::from(b"abc"));
3030        bufs.append(IoBuf::from(b"def"));
3031
3032        // Advance exactly to first buffer boundary
3033        bufs.advance(3);
3034        assert_eq!(bufs.remaining(), 3);
3035        assert_eq!(bufs.chunk(), b"def");
3036
3037        // Advance exactly to end
3038        bufs.advance(3);
3039        assert_eq!(bufs.remaining(), 0);
3040    }
3041
3042    #[test]
3043    fn test_iobufs_advance_canonicalizes_pair_to_single() {
3044        let mut bufs = IoBufs::from(IoBuf::from(b"ab"));
3045        bufs.append(IoBuf::from(b"cd"));
3046        bufs.advance(2);
3047        assert!(bufs.is_single());
3048        assert_eq!(bufs.chunk(), b"cd");
3049    }
3050
3051    #[test]
3052    fn test_iobufsmut_with_empty_buffers() {
3053        let buf1 = IoBufMut::from(b"hello");
3054        let buf2 = IoBufMut::default();
3055        let buf3 = IoBufMut::from(b" world");
3056        let mut bufs = IoBufsMut::from(vec![buf1, buf2, buf3]);
3057
3058        assert_eq!(bufs.remaining(), 11);
3059        assert_eq!(bufs.chunk(), b"hello");
3060
3061        // Advance past first buffer
3062        bufs.advance(5);
3063        // Empty buffer should be skipped
3064        assert_eq!(bufs.chunk(), b" world");
3065        assert_eq!(bufs.remaining(), 6);
3066    }
3067
3068    #[test]
3069    fn test_iobufsmut_advance_skips_leading_writable_empty_chunk() {
3070        let empty_writable = IoBufMut::with_capacity(4);
3071        let payload = IoBufMut::from(b"xy");
3072        let mut bufs = IoBufsMut::from(vec![empty_writable, payload]);
3073
3074        bufs.advance(1);
3075        assert_eq!(bufs.chunk(), b"y");
3076        assert_eq!(bufs.remaining(), 1);
3077    }
3078
3079    #[test]
3080    fn test_iobufsmut_coalesce_after_advance() {
3081        let buf1 = IoBufMut::from(b"hello");
3082        let buf2 = IoBufMut::from(b" world");
3083        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3084
3085        bufs.advance(3);
3086        assert_eq!(bufs.coalesce(), b"lo world");
3087    }
3088
3089    #[test]
3090    fn test_iobufsmut_copy_to_bytes() {
3091        let buf1 = IoBufMut::from(b"hello");
3092        let buf2 = IoBufMut::from(b" world");
3093        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3094
3095        // First read spans chunks and leaves unread suffix.
3096        let first = bufs.copy_to_bytes(7);
3097        assert_eq!(&first[..], b"hello w");
3098        assert_eq!(bufs.remaining(), 4);
3099
3100        // Second read drains the remainder.
3101        let rest = bufs.copy_to_bytes(4);
3102        assert_eq!(&rest[..], b"orld");
3103        assert_eq!(bufs.remaining(), 0);
3104    }
3105
    // `copy_to_bytes` on the Chunked representation after partial advances:
    // covers the fast path (request within the first chunk), the slow path
    // (request crossing chunk boundaries), and entering copy_to_bytes while
    // still in the Chunked shape.
    #[test]
    fn test_iobufsmut_copy_to_bytes_chunked_four_plus() {
        let mut bufs = IoBufsMut::from(vec![
            IoBufMut::from(b"ab"),
            IoBufMut::from(b"cd"),
            IoBufMut::from(b"ef"),
            IoBufMut::from(b"gh"),
        ]);

        // Exercise chunked advance path before copy_to_bytes.
        bufs.advance(1);
        assert_eq!(bufs.chunk(), b"b");
        bufs.advance(1);
        assert_eq!(bufs.chunk(), b"cd");

        // Chunked fast-path: first chunk alone satisfies request.
        let first = bufs.copy_to_bytes(1);
        assert_eq!(&first[..], b"c");

        // Chunked slow-path: request crosses chunk boundaries.
        let second = bufs.copy_to_bytes(4);
        assert_eq!(&second[..], b"defg");

        let rest = bufs.copy_to_bytes(1);
        assert_eq!(&rest[..], b"h");
        assert_eq!(bufs.remaining(), 0);

        // Enter copy_to_bytes while still in chunked representation.
        let mut bufs = IoBufsMut::from(vec![
            IoBufMut::from(b"a"),
            IoBufMut::from(b"b"),
            IoBufMut::from(b"c"),
            IoBufMut::from(b"d"),
            IoBufMut::from(b"e"),
        ]);
        assert!(matches!(bufs.inner, IoBufsMutInner::Chunked(_)));
        let first = bufs.copy_to_bytes(1);
        assert_eq!(&first[..], b"a");
        assert_eq!(bufs.remaining(), 4);
    }
3146
    // Draining exactly the first chunk of a Pair via copy_to_bytes must
    // canonicalize the container back to the Single representation.
    #[test]
    fn test_iobufsmut_copy_to_bytes_canonicalizes_pair() {
        let mut bufs = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
        assert!(matches!(bufs.inner, IoBufsMutInner::Pair(_)));

        let first = bufs.copy_to_bytes(2);
        assert_eq!(&first[..], b"ab");

        assert!(bufs.is_single());
        assert_eq!(bufs.chunk(), b"cd");
        assert_eq!(bufs.remaining(), 2);
    }
3159
3160    #[test]
3161    fn test_iobufsmut_copy_from_slice_single() {
3162        let mut bufs = IoBufsMut::from(IoBufMut::zeroed(11));
3163        bufs.copy_from_slice(b"hello world");
3164        assert_eq!(bufs.coalesce(), b"hello world");
3165    }
3166
3167    #[test]
3168    fn test_iobufsmut_copy_from_slice_chunked() {
3169        let buf1 = IoBufMut::zeroed(5);
3170        let buf2 = IoBufMut::zeroed(6);
3171        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3172
3173        bufs.copy_from_slice(b"hello world");
3174
3175        // Verify each chunk was filled correctly.
3176        assert_eq!(bufs.chunk(), b"hello");
3177        bufs.advance(5);
3178        assert_eq!(bufs.chunk(), b" world");
3179        bufs.advance(6);
3180        assert_eq!(bufs.remaining(), 0);
3181    }
3182
3183    #[test]
3184    #[should_panic(expected = "source slice length must match buffer length")]
3185    fn test_iobufsmut_copy_from_slice_wrong_length() {
3186        let mut bufs = IoBufsMut::from(IoBufMut::zeroed(5));
3187        bufs.copy_from_slice(b"hello world"); // 11 bytes into 5-byte buffer
3188    }
3189
    // Differential test: BufMut behavior (chunk_mut length and put_slice)
    // must match `bytes::BufMut::chain_mut` over BytesMut buffers with the
    // same capacities, including the final contents after freezing.
    #[test]
    fn test_iobufsmut_matches_bytesmut_chain() {
        // Create three BytesMut with capacity
        let mut bm1 = BytesMut::with_capacity(5);
        let mut bm2 = BytesMut::with_capacity(6);
        let mut bm3 = BytesMut::with_capacity(7);

        // Create matching IoBufsMut
        let mut iobufs = IoBufsMut::from(vec![
            IoBufMut::with_capacity(5),
            IoBufMut::with_capacity(6),
            IoBufMut::with_capacity(7),
        ]);

        // Test initial chunk_mut length matches (spare capacity)
        let chain_len = (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .chunk_mut()
            .len();
        let iobufs_len = iobufs.chunk_mut().len();
        assert_eq!(chain_len, iobufs_len);

        // Write some data
        (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .put_slice(b"hel");
        iobufs.put_slice(b"hel");

        // Verify chunk_mut matches after partial write
        let chain_len = (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .chunk_mut()
            .len();
        let iobufs_len = iobufs.chunk_mut().len();
        assert_eq!(chain_len, iobufs_len);

        // Write more data (crosses into the second buffer's capacity)
        (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .put_slice(b"lo world!");
        iobufs.put_slice(b"lo world!");

        // Verify chunk_mut matches after more writes
        let chain_len = (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .chunk_mut()
            .len();
        let iobufs_len = iobufs.chunk_mut().len();
        assert_eq!(chain_len, iobufs_len);

        // Verify final content matches
        let frozen = iobufs.freeze().coalesce();
        let mut chain_content = bm1.to_vec();
        chain_content.extend_from_slice(&bm2);
        chain_content.extend_from_slice(&bm3);
        assert_eq!(frozen, chain_content.as_slice());
        assert_eq!(frozen, b"hello world!");
    }
3253
    // Differential test: Buf behavior of IoBufsMut (remaining, chunk,
    // advance, copy_to_bytes) must match `bytes::Buf::chain` over
    // equivalent pre-filled chunks at every traversal step.
    #[test]
    fn test_iobufsmut_buf_matches_bytes_chain() {
        // Create pre-filled Bytes buffers
        let mut b1 = Bytes::from_static(b"hello");
        let mut b2 = Bytes::from_static(b" world");
        let b3 = Bytes::from_static(b"!");

        // Create matching IoBufsMut
        let mut iobufs = IoBufsMut::from(vec![
            IoBufMut::from(b"hello"),
            IoBufMut::from(b" world"),
            IoBufMut::from(b"!"),
        ]);

        // Test Buf::remaining matches
        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
        assert_eq!(chain_remaining, iobufs.remaining());

        // Test Buf::chunk matches
        let chain_chunk = b1
            .clone()
            .chain(b2.clone())
            .chain(b3.clone())
            .chunk()
            .to_vec();
        assert_eq!(chain_chunk, iobufs.chunk().to_vec());

        // Advance and test again
        b1.advance(3);
        iobufs.advance(3);

        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
        assert_eq!(chain_remaining, iobufs.remaining());

        let chain_chunk = b1
            .clone()
            .chain(b2.clone())
            .chain(b3.clone())
            .chunk()
            .to_vec();
        assert_eq!(chain_chunk, iobufs.chunk().to_vec());

        // Advance past first buffer boundary into second
        b1.advance(2);
        iobufs.advance(2);

        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
        assert_eq!(chain_remaining, iobufs.remaining());

        // Now we should be in the second buffer
        let chain_chunk = b1
            .clone()
            .chain(b2.clone())
            .chain(b3.clone())
            .chunk()
            .to_vec();
        assert_eq!(chain_chunk, iobufs.chunk().to_vec());

        // Advance past second buffer boundary into third
        b2.advance(6);
        iobufs.advance(6);

        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
        assert_eq!(chain_remaining, iobufs.remaining());

        // Now we should be in the third buffer
        let chain_chunk = b1.chain(b2).chain(b3).chunk().to_vec();
        assert_eq!(chain_chunk, iobufs.chunk().to_vec());

        // Test copy_to_bytes
        let b1 = Bytes::from_static(b"hello");
        let b2 = Bytes::from_static(b" world");
        let b3 = Bytes::from_static(b"!");
        let mut iobufs = IoBufsMut::from(vec![
            IoBufMut::from(b"hello"),
            IoBufMut::from(b" world"),
            IoBufMut::from(b"!"),
        ]);

        let chain_bytes = b1.chain(b2).chain(b3).copy_to_bytes(8);
        let iobufs_bytes = iobufs.copy_to_bytes(8);
        assert_eq!(chain_bytes, iobufs_bytes);
        assert_eq!(chain_bytes.as_ref(), b"hello wo");
    }
3338
    // chunks_vectored on IoBufsMut must export one IoSlice per non-empty
    // readable chunk, capped by destination capacity, across every internal
    // representation — including non-canonical Pair/Triple/Chunked shapes.
    #[test]
    fn test_iobufsmut_chunks_vectored_multiple_slices() {
        // Single non-empty buffers should export exactly one slice.
        let single = IoBufsMut::from(IoBufMut::from(b"xy"));
        let mut single_dst = [IoSlice::new(&[]); 2];
        let count = single.chunks_vectored(&mut single_dst);
        assert_eq!(count, 1);
        assert_eq!(&single_dst[0][..], b"xy");

        // Single empty buffers should export no slices.
        let empty_single = IoBufsMut::default();
        let mut empty_single_dst = [IoSlice::new(&[]); 1];
        assert_eq!(empty_single.chunks_vectored(&mut empty_single_dst), 0);

        let bufs = IoBufsMut::from(vec![
            IoBufMut::from(b"ab"),
            IoBufMut::from(b"cd"),
            IoBufMut::from(b"ef"),
            IoBufMut::from(b"gh"),
        ]);

        // Destination capacity should cap how many chunks we export.
        let mut small = [IoSlice::new(&[]); 2];
        let count = bufs.chunks_vectored(&mut small);
        assert_eq!(count, 2);
        assert_eq!(&small[0][..], b"ab");
        assert_eq!(&small[1][..], b"cd");

        // Larger destination should include every readable chunk.
        let mut large = [IoSlice::new(&[]); 8];
        let count = bufs.chunks_vectored(&mut large);
        assert_eq!(count, 4);
        assert_eq!(&large[0][..], b"ab");
        assert_eq!(&large[1][..], b"cd");
        assert_eq!(&large[2][..], b"ef");
        assert_eq!(&large[3][..], b"gh");

        // Empty destination cannot accept any slices.
        let mut empty_dst: [IoSlice<'_>; 0] = [];
        assert_eq!(bufs.chunks_vectored(&mut empty_dst), 0);

        // Non-canonical shapes should skip empty leading chunks.
        let sparse = IoBufsMut {
            inner: IoBufsMutInner::Pair([IoBufMut::default(), IoBufMut::from(b"y")]),
        };
        let mut dst = [IoSlice::new(&[]); 2];
        let count = sparse.chunks_vectored(&mut dst);
        assert_eq!(count, 1);
        assert_eq!(&dst[0][..], b"y");

        // Triple should skip empty chunks and preserve readable order.
        let sparse_triple = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                IoBufMut::default(),
                IoBufMut::from(b"z"),
                IoBufMut::from(b"w"),
            ]),
        };
        let mut dst = [IoSlice::new(&[]); 3];
        let count = sparse_triple.chunks_vectored(&mut dst);
        assert_eq!(count, 2);
        assert_eq!(&dst[0][..], b"z");
        assert_eq!(&dst[1][..], b"w");

        // Chunked shapes with only empty buffers should export no slices.
        let empty_chunked = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([
                IoBufMut::default(),
                IoBufMut::default(),
            ])),
        };
        let mut dst = [IoSlice::new(&[]); 2];
        assert_eq!(empty_chunked.chunks_vectored(&mut dst), 0);
    }
3413
3414    #[test]
3415    fn test_iobufsmut_try_into_single() {
3416        let single = IoBufsMut::from(IoBufMut::from(b"hello"));
3417        let single = single.try_into_single().expect("single expected");
3418        assert_eq!(single, b"hello");
3419
3420        let multi = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
3421        let multi = multi.try_into_single().expect_err("multi expected");
3422        assert_eq!(multi.coalesce(), b"abcd");
3423    }
3424
3425    #[test]
3426    fn test_iobufsmut_freeze_after_advance() {
3427        let buf1 = IoBufMut::from(b"hello");
3428        let buf2 = IoBufMut::from(b" world");
3429        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3430
3431        // Advance partway through first buffer
3432        bufs.advance(3);
3433        assert_eq!(bufs.len(), 8);
3434
3435        // Freeze and verify only remaining data is preserved
3436        let frozen = bufs.freeze();
3437        assert_eq!(frozen.len(), 8);
3438        assert_eq!(frozen.coalesce(), b"lo world");
3439    }
3440
3441    #[test]
3442    fn test_iobufsmut_freeze_after_advance_to_boundary() {
3443        let buf1 = IoBufMut::from(b"hello");
3444        let buf2 = IoBufMut::from(b" world");
3445        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3446
3447        // Advance exactly to first buffer boundary
3448        bufs.advance(5);
3449        assert_eq!(bufs.len(), 6);
3450
3451        // First buffer should be fully consumed (empty after advance)
3452        // freeze() filters empty buffers, so result should be Single
3453        let frozen = bufs.freeze();
3454        assert!(frozen.is_single());
3455        assert_eq!(frozen.coalesce(), b" world");
3456    }
3457
3458    #[test]
3459    fn test_iobufsmut_coalesce_after_advance_to_boundary() {
3460        let buf1 = IoBufMut::from(b"hello");
3461        let buf2 = IoBufMut::from(b" world");
3462        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3463
3464        // Advance exactly past first buffer
3465        bufs.advance(5);
3466
3467        // Coalesce should only include second buffer's data
3468        assert_eq!(bufs.coalesce(), b" world");
3469    }
3470
    // coalesce_with_pool: single buffers are returned zero-copy (verified by
    // pointer identity), multi-chunk inputs are merged into a pooled
    // allocation, and the `_extra` variant reallocates only when spare
    // capacity is insufficient.
    #[test]
    fn test_iobufsmut_coalesce_with_pool() {
        let pool = test_pool();

        // Single buffer: zero-copy (same pointer)
        let mut buf = IoBufMut::from(b"hello");
        let original_ptr = buf.as_mut_ptr();
        let bufs = IoBufsMut::from(buf);
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"hello");
        assert_eq!(coalesced.as_ref().as_ptr(), original_ptr);

        // Multiple buffers: merged using pool
        let bufs = IoBufsMut::from(vec![IoBufMut::from(b"hello"), IoBufMut::from(b" world")]);
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"hello world");
        assert!(coalesced.is_pooled());

        // With extra capacity: zero-copy if sufficient spare capacity
        let mut buf = IoBufMut::with_capacity(100);
        buf.put_slice(b"hello");
        let original_ptr = buf.as_mut_ptr();
        let bufs = IoBufsMut::from(buf);
        let coalesced = bufs.coalesce_with_pool_extra(&pool, 10);
        assert_eq!(coalesced, b"hello");
        assert_eq!(coalesced.as_ref().as_ptr(), original_ptr);

        // With extra capacity: reallocates if insufficient
        let mut buf = IoBufMut::with_capacity(5);
        buf.put_slice(b"hello");
        let bufs = IoBufsMut::from(buf);
        let coalesced = bufs.coalesce_with_pool_extra(&pool, 100);
        assert_eq!(coalesced, b"hello");
        assert!(coalesced.capacity() >= 105);
    }
3506
3507    #[test]
3508    fn test_iobuf_additional_conversion_and_trait_paths() {
3509        let pool = test_pool();
3510
3511        let mut pooled_mut = pool.alloc(4);
3512        pooled_mut.put_slice(b"data");
3513        let pooled = pooled_mut.freeze();
3514        assert!(!pooled.as_ptr().is_null());
3515
3516        let unique = IoBuf::from(Bytes::from(vec![1u8, 2, 3]));
3517        let unique_mut = unique.try_into_mut().expect("unique bytes should convert");
3518        assert_eq!(unique_mut.as_ref(), &[1u8, 2, 3]);
3519
3520        let shared = IoBuf::from(Bytes::from(vec![4u8, 5, 6]));
3521        let _shared_clone = shared.clone();
3522        assert!(shared.try_into_mut().is_err());
3523
3524        let expected: &[u8] = &[9u8, 8];
3525        let eq_buf = IoBuf::from(vec![9u8, 8]);
3526        assert!(PartialEq::<[u8]>::eq(&eq_buf, expected));
3527
3528        let static_slice: &'static [u8] = b"static";
3529        assert_eq!(IoBuf::from(static_slice), b"static");
3530
3531        let mut pooled_mut = pool.alloc(3);
3532        pooled_mut.put_slice(b"xyz");
3533        let pooled = pooled_mut.freeze();
3534        let vec_out: Vec<u8> = pooled.clone().into();
3535        let bytes_out: Bytes = pooled.into();
3536        assert_eq!(vec_out, b"xyz");
3537        assert_eq!(bytes_out.as_ref(), b"xyz");
3538    }
3539
3540    #[test]
3541    fn test_iobufmut_additional_conversion_and_trait_paths() {
3542        // Basic mutable operations should keep readable bytes consistent.
3543        let mut buf = IoBufMut::from(vec![1u8, 2, 3, 4]);
3544        assert!(!buf.is_empty());
3545        buf.truncate(2);
3546        assert_eq!(buf.as_ref(), &[1u8, 2]);
3547        buf.clear();
3548        assert!(buf.is_empty());
3549        buf.put_slice(b"xyz");
3550
3551        // Equality should work across slice, array, and byte-string forms.
3552        let expected: &[u8] = b"xyz";
3553        assert!(PartialEq::<[u8]>::eq(&buf, expected));
3554        assert!(buf == b"xyz"[..]);
3555        assert!(buf == [b'x', b'y', b'z']);
3556        assert!(buf == b"xyz");
3557
3558        // Conversions from common owned/shared containers preserve contents.
3559        let from_vec = IoBufMut::from(vec![7u8, 8]);
3560        assert_eq!(from_vec.as_ref(), &[7u8, 8]);
3561
3562        let from_bytesmut = IoBufMut::from(BytesMut::from(&b"hi"[..]));
3563        assert_eq!(from_bytesmut.as_ref(), b"hi");
3564
3565        let from_bytes = IoBufMut::from(Bytes::from_static(b"ok"));
3566        assert_eq!(from_bytes.as_ref(), b"ok");
3567
3568        // `Bytes::from_static` cannot be converted to mutable without copy.
3569        let from_iobuf = IoBufMut::from(IoBuf::from(Bytes::from_static(b"io")));
3570        assert_eq!(from_iobuf.as_ref(), b"io");
3571    }
3572
    // Exercises `IoBufs` construction from each supported source type, shape
    // promotion (prepend/append growing Triple into Chunked), pool-backed
    // coalescing, and `chunk()` behavior on hand-built non-canonical shapes
    // that contain leading empty buffers.
    #[test]
    fn test_iobufs_additional_shape_and_conversion_paths() {
        let pool = test_pool();

        // Constructor coverage for mutable/immutable/slice-backed inputs.
        let from_mut = IoBufs::from(IoBufMut::from(b"m"));
        assert_eq!(from_mut.chunk(), b"m");
        let from_bytes = IoBufs::from(Bytes::from_static(b"b"));
        assert_eq!(from_bytes.chunk(), b"b");
        let from_bytesmut = IoBufs::from(BytesMut::from(&b"bm"[..]));
        assert_eq!(from_bytesmut.chunk(), b"bm");
        let from_vec = IoBufs::from(vec![1u8, 2u8]);
        assert_eq!(from_vec.chunk(), &[1u8, 2]);
        let static_slice: &'static [u8] = b"slice";
        let from_static = IoBufs::from(static_slice);
        assert_eq!(from_static.chunk(), b"slice");

        // Canonicalizing an already-empty buffer remains a single empty chunk.
        let mut single_empty = IoBufs::default();
        single_empty.canonicalize();
        assert!(single_empty.is_single());

        // Triple path: prepend/append can promote into chunked while preserving order.
        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"b".to_vec()),
            IoBuf::from(b"c".to_vec()),
        ]);
        assert!(triple.as_single().is_none());
        // Second prepend lands in front of the first: expect "1", "0", then "abc".
        triple.prepend(IoBuf::from(vec![b'0']));
        triple.prepend(IoBuf::from(vec![b'1']));
        triple.append(IoBuf::from(vec![b'2']));
        assert_eq!(triple.copy_to_bytes(triple.remaining()).as_ref(), b"10abc2");

        // Appending to an existing triple keeps byte order stable.
        let mut triple_append = IoBufs::from(vec![
            IoBuf::from(b"x".to_vec()),
            IoBuf::from(b"y".to_vec()),
            IoBuf::from(b"z".to_vec()),
        ]);
        triple_append.append(IoBuf::from(vec![b'w']));
        assert_eq!(triple_append.coalesce(), b"xyzw");

        // coalesce_with_pool on a triple should preserve contents.
        let triple_pool = IoBufs::from(vec![
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"b".to_vec()),
            IoBuf::from(b"c".to_vec()),
        ]);
        assert_eq!(triple_pool.coalesce_with_pool(&pool), b"abc");

        // coalesce_with_pool on 4+ chunks should read only remaining bytes.
        let mut chunked_pool = IoBufs::from(vec![
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"b".to_vec()),
            IoBuf::from(b"c".to_vec()),
            IoBuf::from(b"d".to_vec()),
        ]);
        assert_eq!(chunked_pool.remaining(), 4);
        chunked_pool.advance(1);
        assert_eq!(chunked_pool.coalesce_with_pool(&pool), b"bcd");

        // Non-canonical Pair/Triple/Chunked shapes should still expose the first readable chunk.
        // These bypass the constructors on purpose to hit the empty-skipping paths.
        let pair_second = IoBufs {
            inner: IoBufsInner::Pair([IoBuf::default(), IoBuf::from(vec![1u8])]),
        };
        assert_eq!(pair_second.chunk(), &[1u8]);
        let pair_empty = IoBufs {
            inner: IoBufsInner::Pair([IoBuf::default(), IoBuf::default()]),
        };
        assert_eq!(pair_empty.chunk(), b"");

        let triple_third = IoBufs {
            inner: IoBufsInner::Triple([
                IoBuf::default(),
                IoBuf::default(),
                IoBuf::from(vec![3u8]),
            ]),
        };
        assert_eq!(triple_third.chunk(), &[3u8]);
        let triple_second = IoBufs {
            inner: IoBufsInner::Triple([
                IoBuf::default(),
                IoBuf::from(vec![2u8]),
                IoBuf::default(),
            ]),
        };
        assert_eq!(triple_second.chunk(), &[2u8]);
        let triple_empty = IoBufs {
            inner: IoBufsInner::Triple([IoBuf::default(), IoBuf::default(), IoBuf::default()]),
        };
        assert_eq!(triple_empty.chunk(), b"");

        let chunked_second = IoBufs {
            inner: IoBufsInner::Chunked(VecDeque::from([IoBuf::default(), IoBuf::from(vec![9u8])])),
        };
        assert_eq!(chunked_second.chunk(), &[9u8]);
        let chunked_empty = IoBufs {
            inner: IoBufsInner::Chunked(VecDeque::from([IoBuf::default()])),
        };
        assert_eq!(chunked_empty.chunk(), b"");
    }
3675
    // Covers the `as_single`/`as_single_mut` accessors on single- vs
    // multi-chunk containers, constructors from raw `Vec<u8>`/`BytesMut`
    // sources, and a chunked write round-trip via `set_len` + `copy_from_slice`.
    #[test]
    fn test_iobufsmut_additional_shape_and_conversion_paths() {
        // `as_single` accessors should work only for single-shape containers.
        let mut single = IoBufsMut::from(IoBufMut::from(b"x"));
        assert!(single.as_single().is_some());
        assert!(single.as_single_mut().is_some());
        single.canonicalize();
        assert!(single.is_single());

        let mut pair = IoBufsMut::from(vec![IoBufMut::from(b"a"), IoBufMut::from(b"b")]);
        assert!(pair.as_single().is_none());
        assert!(pair.as_single_mut().is_none());

        // Constructor coverage for raw vec and BytesMut sources.
        let from_vec = IoBufsMut::from(vec![1u8, 2u8]);
        assert_eq!(from_vec.chunk(), &[1u8, 2]);
        let from_bytesmut = IoBufsMut::from(BytesMut::from(&b"cd"[..]));
        assert_eq!(from_bytesmut.chunk(), b"cd");

        // Chunked write path: set_len + copy_from_slice + freeze round-trip.
        // Four 1-byte chunks exercise the Chunked (4+) shape specifically.
        let mut chunked = IoBufsMut::from(vec![
            IoBufMut::with_capacity(1),
            IoBufMut::with_capacity(1),
            IoBufMut::with_capacity(1),
            IoBufMut::with_capacity(1),
        ]);
        // SAFETY: We only write/read initialized bytes after `copy_from_slice`.
        unsafe { chunked.set_len(4) };
        chunked.copy_from_slice(b"wxyz");
        assert_eq!(chunked.capacity(), 4);
        assert_eq!(chunked.remaining(), 4);
        let frozen = chunked.freeze();
        assert_eq!(frozen.coalesce(), b"wxyz");
    }
3710
3711    #[test]
3712    fn test_iobufsmut_coalesce_multi_shape_paths() {
3713        let pool = test_pool();
3714
3715        // Pair: plain coalesce and pool-backed coalesce-with-extra.
3716        let pair = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
3717        assert_eq!(pair.coalesce(), b"abcd");
3718        let pair = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
3719        let pair_extra = pair.coalesce_with_pool_extra(&pool, 3);
3720        assert_eq!(pair_extra, b"abcd");
3721        assert!(pair_extra.capacity() >= 7);
3722
3723        // Triple: both coalesce paths should preserve payload and requested spare capacity.
3724        let triple = IoBufsMut::from(vec![
3725            IoBufMut::from(b"a"),
3726            IoBufMut::from(b"b"),
3727            IoBufMut::from(b"c"),
3728        ]);
3729        assert_eq!(triple.coalesce(), b"abc");
3730        let triple = IoBufsMut::from(vec![
3731            IoBufMut::from(b"a"),
3732            IoBufMut::from(b"b"),
3733            IoBufMut::from(b"c"),
3734        ]);
3735        let triple_extra = triple.coalesce_with_pool_extra(&pool, 2);
3736        assert_eq!(triple_extra, b"abc");
3737        assert!(triple_extra.capacity() >= 5);
3738
3739        // Chunked (4+): same expectations as pair/triple for content + capacity.
3740        let chunked = IoBufsMut::from(vec![
3741            IoBufMut::from(b"1"),
3742            IoBufMut::from(b"2"),
3743            IoBufMut::from(b"3"),
3744            IoBufMut::from(b"4"),
3745        ]);
3746        assert_eq!(chunked.coalesce(), b"1234");
3747        let chunked = IoBufsMut::from(vec![
3748            IoBufMut::from(b"1"),
3749            IoBufMut::from(b"2"),
3750            IoBufMut::from(b"3"),
3751            IoBufMut::from(b"4"),
3752        ]);
3753        let chunked_extra = chunked.coalesce_with_pool_extra(&pool, 5);
3754        assert_eq!(chunked_extra, b"1234");
3755        assert!(chunked_extra.capacity() >= 9);
3756    }
3757
    // Verifies that `chunk()` skips leading empty buffers and `chunk_mut()`
    // skips leading full (no-spare-capacity) buffers across all non-canonical
    // shapes (Pair/Triple/Chunked), built directly via `inner` to bypass the
    // canonicalizing constructors.
    #[test]
    fn test_iobufsmut_noncanonical_chunk_and_chunk_mut_paths() {
        // Produce a pooled buffer with len == capacity, i.e. zero writable
        // spare capacity, so `chunk_mut()` must skip past it.
        fn no_spare_capacity_buf(pool: &BufferPool) -> IoBufMut {
            let mut buf = pool.alloc(1);
            let cap = buf.capacity();
            // SAFETY: We never read from this buffer in this helper.
            unsafe { buf.set_len(cap) };
            buf
        }
        let pool = test_pool();

        // `chunk()` should skip empty front buffers across all shapes.
        let pair_second = IoBufsMut {
            inner: IoBufsMutInner::Pair([IoBufMut::default(), IoBufMut::from(b"b")]),
        };
        assert_eq!(pair_second.chunk(), b"b");
        let pair_empty = IoBufsMut {
            inner: IoBufsMutInner::Pair([IoBufMut::default(), IoBufMut::default()]),
        };
        assert_eq!(pair_empty.chunk(), b"");

        let triple_third = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                IoBufMut::default(),
                IoBufMut::default(),
                IoBufMut::from(b"c"),
            ]),
        };
        assert_eq!(triple_third.chunk(), b"c");
        let triple_second = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                IoBufMut::default(),
                IoBufMut::from(b"b"),
                IoBufMut::default(),
            ]),
        };
        assert_eq!(triple_second.chunk(), b"b");
        let triple_empty = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                IoBufMut::default(),
                IoBufMut::default(),
                IoBufMut::default(),
            ]),
        };
        assert_eq!(triple_empty.chunk(), b"");

        let chunked_second = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([
                IoBufMut::default(),
                IoBufMut::from(b"d"),
            ])),
        };
        assert_eq!(chunked_second.chunk(), b"d");
        let chunked_empty = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([IoBufMut::default()])),
        };
        assert_eq!(chunked_empty.chunk(), b"");

        // `chunk_mut()` should skip non-writable fronts and return first writable chunk.
        let mut pair_chunk_mut = IoBufsMut {
            inner: IoBufsMutInner::Pair([no_spare_capacity_buf(&pool), IoBufMut::with_capacity(2)]),
        };
        assert!(pair_chunk_mut.chunk_mut().len() >= 2);

        let mut pair_chunk_mut_empty = IoBufsMut {
            inner: IoBufsMutInner::Pair([
                no_spare_capacity_buf(&pool),
                no_spare_capacity_buf(&pool),
            ]),
        };
        assert_eq!(pair_chunk_mut_empty.chunk_mut().len(), 0);

        let mut triple_chunk_mut = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                no_spare_capacity_buf(&pool),
                no_spare_capacity_buf(&pool),
                IoBufMut::with_capacity(3),
            ]),
        };
        assert!(triple_chunk_mut.chunk_mut().len() >= 3);
        let mut triple_chunk_mut_second = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                no_spare_capacity_buf(&pool),
                IoBufMut::with_capacity(2),
                no_spare_capacity_buf(&pool),
            ]),
        };
        assert!(triple_chunk_mut_second.chunk_mut().len() >= 2);

        let mut triple_chunk_mut_empty = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                no_spare_capacity_buf(&pool),
                no_spare_capacity_buf(&pool),
                no_spare_capacity_buf(&pool),
            ]),
        };
        assert_eq!(triple_chunk_mut_empty.chunk_mut().len(), 0);

        let mut chunked_chunk_mut = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([
                IoBufMut::default(),
                IoBufMut::with_capacity(4),
            ])),
        };
        assert!(chunked_chunk_mut.chunk_mut().len() >= 4);

        let mut chunked_chunk_mut_empty = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([no_spare_capacity_buf(&pool)])),
        };
        assert_eq!(chunked_chunk_mut_empty.chunk_mut().len(), 0);
    }
3869
    // Directly exercises the module-private chunk helpers
    // (`copy_to_bytes_chunked`, `advance_chunked_front`, `advance_small_chunks`,
    // `advance_mut_in_chunks`) on hand-built inputs, pinning both fast and
    // slow paths plus the "needs canonicalization" signal each returns.
    #[test]
    fn test_iobuf_internal_chunk_helpers() {
        // `copy_to_bytes_chunked` should drop leading empties on zero-length reads.
        let mut empty_with_leading = VecDeque::from([IoBuf::default()]);
        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut empty_with_leading, 0, "x");
        assert!(bytes.is_empty());
        assert!(!needs_canonicalize);
        assert!(empty_with_leading.is_empty());

        // Fast path: front chunk can fully satisfy the request.
        let mut fast = VecDeque::from([
            IoBuf::from(b"ab".to_vec()),
            IoBuf::from(b"cd".to_vec()),
            IoBuf::from(b"ef".to_vec()),
            IoBuf::from(b"gh".to_vec()),
        ]);
        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut fast, 2, "x");
        assert_eq!(bytes.as_ref(), b"ab");
        assert!(needs_canonicalize);
        assert_eq!(fast.front().expect("front exists").as_ref(), b"cd");

        // Slow path: request spans multiple chunks.
        let mut slow = VecDeque::from([
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"bc".to_vec()),
            IoBuf::from(b"d".to_vec()),
            IoBuf::from(b"e".to_vec()),
        ]);
        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut slow, 3, "x");
        assert_eq!(bytes.as_ref(), b"abc");
        assert!(needs_canonicalize);

        // `advance_chunked_front` should skip empties and drain in linear order.
        let mut advance_chunked = VecDeque::from([
            IoBuf::default(),
            IoBuf::from(b"abc".to_vec()),
            IoBuf::from(b"d".to_vec()),
        ]);
        advance_chunked_front(&mut advance_chunked, 2);
        assert_eq!(
            advance_chunked.front().expect("front exists").as_ref(),
            b"c"
        );
        // Advancing "c" + "d" consumes everything that remains.
        advance_chunked_front(&mut advance_chunked, 2);
        assert!(advance_chunked.is_empty());

        // `advance_small_chunks` signals canonicalization when front chunks are exhausted.
        let mut small = [IoBuf::default(), IoBuf::from(b"abc".to_vec())];
        let needs_canonicalize = advance_small_chunks(&mut small, 2);
        assert!(needs_canonicalize);
        assert_eq!(small[1].as_ref(), b"c");

        let mut small_exact = [
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"b".to_vec()),
            IoBuf::from(b"c".to_vec()),
        ];
        let needs_canonicalize = advance_small_chunks(&mut small_exact, 3);
        assert!(needs_canonicalize);
        assert_eq!(small_exact[0].remaining(), 0);
        assert_eq!(small_exact[1].remaining(), 0);
        assert_eq!(small_exact[2].remaining(), 0);

        // `advance_mut_in_chunks` returns whether the request fully fit in writable chunks.
        let mut writable = [IoBufMut::with_capacity(2), IoBufMut::with_capacity(1)];
        let mut remaining = 3usize;
        // SAFETY: We do not read from advanced bytes in this test.
        let all_advanced = unsafe { advance_mut_in_chunks(&mut writable, &mut remaining) };
        assert!(all_advanced);
        assert_eq!(remaining, 0);

        // `advance_mut_in_chunks` should skip non-writable chunks.
        let pool = test_pool();
        let mut full = pool.alloc(1);
        // SAFETY: We only mark initialized capacity; bytes are not read.
        unsafe { full.set_len(full.capacity()) };
        let mut writable_after_full = [full, IoBufMut::with_capacity(2)];
        let mut remaining = 2usize;
        // SAFETY: We do not read from advanced bytes in this test.
        let all_advanced =
            unsafe { advance_mut_in_chunks(&mut writable_after_full, &mut remaining) };
        assert!(all_advanced);
        assert_eq!(remaining, 0);

        // Capacity 1 + 1 cannot absorb a 3-byte advance: one byte is left over.
        let mut writable_short = [IoBufMut::with_capacity(1), IoBufMut::with_capacity(1)];
        let mut remaining = 3usize;
        // SAFETY: We do not read from advanced bytes in this test.
        let all_advanced = unsafe { advance_mut_in_chunks(&mut writable_short, &mut remaining) };
        assert!(!all_advanced);
        assert_eq!(remaining, 1);
    }
3961
    // `advance_mut` should move the write cursor across chunk boundaries for
    // every multi-chunk shape; `remaining()` afterwards reflects the bytes
    // marked as written.
    #[test]
    fn test_iobufsmut_advance_mut_success_paths() {
        // Pair path.
        let mut pair = IoBufsMut {
            inner: IoBufsMutInner::Pair([IoBufMut::with_capacity(2), IoBufMut::with_capacity(2)]),
        };
        // SAFETY: We only verify cursor movement (`remaining`) and do not read bytes.
        unsafe { pair.advance_mut(3) };
        assert_eq!(pair.remaining(), 3);

        // Triple path.
        let mut triple = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                IoBufMut::with_capacity(1),
                IoBufMut::with_capacity(1),
                IoBufMut::with_capacity(1),
            ]),
        };
        // SAFETY: We only verify cursor movement (`remaining`) and do not read bytes.
        unsafe { triple.advance_mut(2) };
        assert_eq!(triple.remaining(), 2);

        // Chunked wrapped-VecDeque path.
        // The pop/push sequence is deliberate: it presumably forces the deque's
        // ring storage to wrap so iteration crosses the internal boundary —
        // keep the construction order intact.
        let mut wrapped = VecDeque::with_capacity(5);
        wrapped.push_back(IoBufMut::with_capacity(1));
        wrapped.push_back(IoBufMut::with_capacity(1));
        wrapped.push_back(IoBufMut::with_capacity(1));
        let _ = wrapped.pop_front();
        wrapped.push_back(IoBufMut::with_capacity(1));
        wrapped.push_back(IoBufMut::with_capacity(1));
        let mut chunked = IoBufsMut {
            inner: IoBufsMutInner::Chunked(wrapped),
        };
        // SAFETY: We only verify cursor movement (`remaining`) and do not read bytes.
        unsafe { chunked.advance_mut(4) };
        assert_eq!(chunked.remaining(), 4);
        assert!(chunked.remaining_mut() > 0);
    }
4000
    // When every chunk is already fully written (`remaining_mut() == 0`),
    // `advance_mut(0)` must be a no-op rather than a panic, for each
    // multi-chunk shape.
    #[test]
    fn test_iobufsmut_advance_mut_zero_noop_when_full() {
        // Build a chunk with len == capacity so it contributes no writable space.
        fn full_chunk(pool: &BufferPool) -> IoBufMut {
            // Pooled buffers have bounded class capacity (unlike growable Bytes),
            // so force len == capacity to make remaining_mut() == 0.
            let mut buf = pool.alloc(1);
            let cap = buf.capacity();
            // SAFETY: We never read from this buffer in this test.
            unsafe { buf.set_len(cap) };
            buf
        }

        let pool = test_pool();

        // Pair path: fully-written chunks should allow advance_mut(0) as a no-op.
        let mut pair = IoBufsMut::from(vec![full_chunk(&pool), full_chunk(&pool)]);
        assert!(matches!(pair.inner, IoBufsMutInner::Pair(_)));
        assert_eq!(pair.remaining_mut(), 0);
        let before = pair.remaining();
        // SAFETY: Advancing by 0 does not expose uninitialized bytes.
        unsafe { pair.advance_mut(0) };
        assert_eq!(pair.remaining(), before);

        // Triple path: same no-op behavior.
        let mut triple = IoBufsMut::from(vec![
            full_chunk(&pool),
            full_chunk(&pool),
            full_chunk(&pool),
        ]);
        assert!(matches!(triple.inner, IoBufsMutInner::Triple(_)));
        assert_eq!(triple.remaining_mut(), 0);
        let before = triple.remaining();
        // SAFETY: Advancing by 0 does not expose uninitialized bytes.
        unsafe { triple.advance_mut(0) };
        assert_eq!(triple.remaining(), before);

        // Chunked path: 4+ fully-written chunks should also no-op.
        let mut chunked = IoBufsMut::from(vec![
            full_chunk(&pool),
            full_chunk(&pool),
            full_chunk(&pool),
            full_chunk(&pool),
        ]);
        assert!(matches!(chunked.inner, IoBufsMutInner::Chunked(_)));
        assert_eq!(chunked.remaining_mut(), 0);
        let before = chunked.remaining();
        // SAFETY: Advancing by 0 does not expose uninitialized bytes.
        unsafe { chunked.advance_mut(0) };
        assert_eq!(chunked.remaining(), before);
    }
4051
4052    #[test]
4053    #[should_panic(expected = "cannot advance past end of buffer")]
4054    fn test_iobufsmut_advance_mut_past_end_pair() {
4055        let mut pair = IoBufsMut {
4056            inner: IoBufsMutInner::Pair([IoBufMut::with_capacity(1), IoBufMut::with_capacity(1)]),
4057        };
4058        // SAFETY: Intentional panic path coverage.
4059        unsafe { pair.advance_mut(3) };
4060    }
4061
4062    #[test]
4063    #[should_panic(expected = "cannot advance past end of buffer")]
4064    fn test_iobufsmut_advance_mut_past_end_triple() {
4065        let mut triple = IoBufsMut {
4066            inner: IoBufsMutInner::Triple([
4067                IoBufMut::with_capacity(1),
4068                IoBufMut::with_capacity(1),
4069                IoBufMut::with_capacity(1),
4070            ]),
4071        };
4072        // SAFETY: Intentional panic path coverage.
4073        unsafe { triple.advance_mut(4) };
4074    }
4075
4076    #[test]
4077    #[should_panic(expected = "cannot advance past end of buffer")]
4078    fn test_iobufsmut_advance_mut_past_end_chunked() {
4079        let mut chunked = IoBufsMut {
4080            inner: IoBufsMutInner::Chunked(VecDeque::from([
4081                IoBufMut::with_capacity(1),
4082                IoBufMut::with_capacity(1),
4083                IoBufMut::with_capacity(1),
4084                IoBufMut::with_capacity(1),
4085            ])),
4086        };
4087        // SAFETY: Intentional panic path coverage.
4088        unsafe { chunked.advance_mut(5) };
4089    }
4090
    // `set_len` should distribute the requested length across chunks in order,
    // filling each chunk to capacity before spilling into the next; later
    // `advance` calls confirm the per-chunk split.
    #[test]
    fn test_iobufsmut_set_len() {
        // SAFETY: we don't read the uninitialized bytes.
        unsafe {
            // Single buffer
            let mut bufs = IoBufsMut::from(IoBufMut::with_capacity(16));
            bufs.set_len(10);
            assert_eq!(bufs.len(), 10);

            // Chunked: distributes across chunks [cap 5, cap 10], set 12 -> [5, 7]
            let mut bufs = IoBufsMut::from(vec![
                IoBufMut::with_capacity(5),
                IoBufMut::with_capacity(10),
            ]);
            bufs.set_len(12);
            assert_eq!(bufs.len(), 12);
            assert_eq!(bufs.chunk().len(), 5);
            bufs.advance(5);
            assert_eq!(bufs.chunk().len(), 7);
            bufs.advance(7);
            assert_eq!(bufs.remaining(), 0);

            // Uneven capacities [3, 20, 2], set 18 -> [3, 15, 0].
            let mut bufs = IoBufsMut::from(vec![
                IoBufMut::with_capacity(3),
                IoBufMut::with_capacity(20),
                IoBufMut::with_capacity(2),
            ]);
            bufs.set_len(18);
            assert_eq!(bufs.chunk().len(), 3);
            bufs.advance(3);
            assert_eq!(bufs.chunk().len(), 15);
            bufs.advance(15);
            assert_eq!(bufs.remaining(), 0);

            // Exact total capacity [4, 4], set 8 -> [4, 4]
            let mut bufs =
                IoBufsMut::from(vec![IoBufMut::with_capacity(4), IoBufMut::with_capacity(4)]);
            bufs.set_len(8);
            assert_eq!(bufs.chunk().len(), 4);
            bufs.advance(4);
            assert_eq!(bufs.chunk().len(), 4);
            bufs.advance(4);
            assert_eq!(bufs.remaining(), 0);

            // Zero length preserves caller-provided layout.
            let mut bufs =
                IoBufsMut::from(vec![IoBufMut::with_capacity(4), IoBufMut::with_capacity(4)]);
            bufs.set_len(0);
            assert_eq!(bufs.len(), 0);
            assert_eq!(bufs.chunk(), b"");
        }
    }
4144
4145    #[test]
4146    #[should_panic(expected = "set_len(9) exceeds capacity(8)")]
4147    fn test_iobufsmut_set_len_overflow() {
4148        let mut bufs =
4149            IoBufsMut::from(vec![IoBufMut::with_capacity(4), IoBufMut::with_capacity(4)]);
4150        // SAFETY: this will panic before any read.
4151        unsafe { bufs.set_len(9) };
4152    }
4153
4154    #[test]
4155    #[should_panic(expected = "set_len(9) exceeds capacity(8)")]
4156    fn test_iobufmut_set_len_overflow() {
4157        let mut buf = IoBufMut::with_capacity(8);
4158        // SAFETY: this will panic before any read.
4159        unsafe { buf.set_len(9) };
4160    }
4161
4162    #[test]
4163    fn test_encode_with_pool_matches_encode() {
4164        let pool = test_pool();
4165        let value = vec![1u8, 2, 3, 4, 5, 6];
4166
4167        let pooled = value.encode_with_pool(&pool);
4168        let baseline = value.encode();
4169        assert_eq!(pooled.as_ref(), baseline.as_ref());
4170    }
4171
4172    #[test]
4173    fn test_encode_with_pool_mut_len_matches_encode_size() {
4174        let pool = test_pool();
4175        let value = vec![9u8, 8, 7, 6];
4176
4177        let buf = value.encode_with_pool_mut(&pool);
4178        assert_eq!(buf.len(), value.encode_size());
4179    }
4180
    // Generated codec-conformance tests for `IoBuf`. Only compiled when the
    // optional `arbitrary` feature is enabled, since the conformance harness
    // relies on it to generate inputs.
    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::IoBuf;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<IoBuf>
        }
    }
4190}