// commonware_runtime/iobuf/mod.rs
//! Buffer types for I/O operations.
//!
//! Each buffer type is backed by one of three storage variants:
//! - [`Bytes`]/[`BytesMut`]: standard heap allocation (from `From` conversions)
//! - Aligned: untracked aligned allocation (from [`IoBufMut::with_alignment`],
//!   pool bypass for small requests, or fallback)
//! - Pooled: tracked aligned allocation returned to a [`BufferPool`] on drop
//!
//! Public types:
//! - [`IoBuf`]: Immutable byte buffer
//! - [`IoBufMut`]: Mutable byte buffer
//! - [`IoBufs`]: Container for one or more immutable buffers
//! - [`IoBufsMut`]: Container for one or more mutable buffers
//! - [`BufferPool`]: Pool of reusable, aligned buffers

mod aligned;
mod pool;

use aligned::{AlignedBuf, AlignedBufMut, AlignedBuffer, PooledBuf, PooledBufMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use commonware_codec::{util::at_least, BufsMut, EncodeSize, Error, RangeCfg, Read, Write};
pub use pool::{BufferPool, BufferPoolConfig, BufferPoolThreadCache, PoolError};
use std::{collections::VecDeque, io::IoSlice, num::NonZeroUsize, ops::RangeBounds};

/// Immutable byte buffer.
///
/// Backed by either [`Bytes`] or a pooled aligned allocation.
///
/// Use this for immutable payloads. To build or mutate data, use
/// [`IoBufMut`] and then [`IoBufMut::freeze`].
///
/// For pooled-backed values, the underlying buffer is returned to the pool
/// when the final reference is dropped.
///
/// All `From<*> for IoBuf` implementations are guaranteed to be non-copy
/// conversions. Use [`IoBuf::copy_from_slice`] when an explicit copy from
/// borrowed data is required.
///
/// Cloning is cheap and does not copy underlying bytes.
#[derive(Clone, Debug)]
pub struct IoBuf {
    // Backing storage variant; see [`IoBufInner`] for the semantics of each.
    inner: IoBufInner,
}

/// Internal storage variant for [`IoBuf`].
///
/// - `Bytes`: from `From<Bytes>`, `From<Vec<u8>>`, `From<&'static [u8]>`, etc.
/// - `Aligned`: from untracked aligned allocation ([`IoBufMut::with_alignment`],
///   pool bypass for small requests, or fallback)
/// - `Pooled`: from [`BufferPool`] allocation (returned to pool on final drop)
#[derive(Clone, Debug)]
enum IoBufInner {
    // Standard heap-allocated storage.
    Bytes(Bytes),
    // Untracked aligned allocation; deallocated when the last clone drops.
    Aligned(AlignedBuf),
    // Pool-tracked aligned allocation; returned to its pool on final drop.
    Pooled(PooledBuf),
}

58impl IoBuf {
59    /// Create a buffer by copying data from a slice.
60    ///
61    /// Use this when you have a non-static `&[u8]` that needs to be converted to an
62    /// [`IoBuf`]. For static slices, prefer [`IoBuf::from`] which is zero-copy.
63    pub fn copy_from_slice(data: &[u8]) -> Self {
64        Self {
65            inner: IoBufInner::Bytes(Bytes::copy_from_slice(data)),
66        }
67    }
68
69    /// Create a buffer from a pooled allocation.
70    #[inline]
71    const fn from_pooled(pooled: PooledBuf) -> Self {
72        Self {
73            inner: IoBufInner::Pooled(pooled),
74        }
75    }
76
77    /// Create a buffer from an untracked aligned allocation.
78    #[inline]
79    const fn from_aligned(aligned: AlignedBuf) -> Self {
80        Self {
81            inner: IoBufInner::Aligned(aligned),
82        }
83    }
84
85    /// Returns `true` if this buffer is tracked by a pool.
86    ///
87    /// Tracked buffers originate from [`BufferPool`] allocations and are
88    /// returned to the pool when the final reference is dropped.
89    ///
90    /// Buffers backed by [`Bytes`], and untracked aligned allocations (from
91    /// [`IoBufMut::with_alignment`], pool bypass for small requests, or
92    /// fallback), return `false`.
93    #[inline]
94    pub const fn is_pooled(&self) -> bool {
95        match &self.inner {
96            IoBufInner::Bytes(_) => false,
97            IoBufInner::Aligned(_) => false,
98            IoBufInner::Pooled(_) => true,
99        }
100    }
101
102    /// Number of bytes remaining in the buffer.
103    #[inline]
104    pub fn len(&self) -> usize {
105        self.remaining()
106    }
107
108    /// Whether the buffer is empty.
109    #[inline]
110    pub fn is_empty(&self) -> bool {
111        self.remaining() == 0
112    }
113
114    /// Get raw pointer to the buffer data.
115    #[inline]
116    pub fn as_ptr(&self) -> *const u8 {
117        match &self.inner {
118            IoBufInner::Bytes(b) => b.as_ptr(),
119            IoBufInner::Aligned(a) => a.as_ptr(),
120            IoBufInner::Pooled(p) => p.as_ptr(),
121        }
122    }
123
124    /// Returns a slice of self for the provided range (zero-copy).
125    ///
126    /// For pooled buffers, empty ranges return an empty detached buffer
127    /// ([`IoBuf::default`]) so the underlying pooled allocation is not retained.
128    #[inline]
129    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
130        match &self.inner {
131            IoBufInner::Bytes(b) => Self {
132                inner: IoBufInner::Bytes(b.slice(range)),
133            },
134            IoBufInner::Aligned(a) => a
135                .slice(range)
136                .map_or_else(Self::default, Self::from_aligned),
137            IoBufInner::Pooled(p) => p.slice(range).map_or_else(Self::default, Self::from_pooled),
138        }
139    }
140
141    /// Splits the buffer into two at the given index.
142    ///
143    /// Afterwards `self` contains bytes `[at, len)`, and the returned [`IoBuf`]
144    /// contains bytes `[0, at)`.
145    ///
146    /// This is an `O(1)` zero-copy operation.
147    ///
148    /// # Panics
149    ///
150    /// Panics if `at > len`.
151    pub fn split_to(&mut self, at: usize) -> Self {
152        if at == 0 {
153            return Self::default();
154        }
155
156        if at == self.remaining() {
157            return std::mem::take(self);
158        }
159
160        match &mut self.inner {
161            IoBufInner::Bytes(b) => Self {
162                inner: IoBufInner::Bytes(b.split_to(at)),
163            },
164            IoBufInner::Aligned(a) => Self::from_aligned(a.split_to(at)),
165            IoBufInner::Pooled(p) => Self::from_pooled(p.split_to(at)),
166        }
167    }
168
169    /// Try to convert this buffer into [`IoBufMut`] without copying.
170    ///
171    /// Succeeds when `self` holds exclusive ownership of the backing storage
172    /// and returns an [`IoBufMut`] with the same contents. Fails and returns
173    /// `self` unchanged when ownership is shared.
174    ///
175    /// For [`Bytes`]-backed buffers, this matches [`Bytes::try_into_mut`]
176    /// semantics: succeeds only for uniquely-owned full buffers, and always
177    /// fails for [`Bytes::from_owner`] and [`Bytes::from_static`] buffers. For
178    /// pooled buffers, this succeeds for any uniquely-owned view (including
179    /// slices) and fails when shared.
180    pub fn try_into_mut(self) -> Result<IoBufMut, Self> {
181        match self.inner {
182            IoBufInner::Bytes(bytes) => bytes
183                .try_into_mut()
184                .map(|mut_bytes| IoBufMut {
185                    inner: IoBufMutInner::Bytes(mut_bytes),
186                })
187                .map_err(|bytes| Self {
188                    inner: IoBufInner::Bytes(bytes),
189                }),
190            IoBufInner::Aligned(aligned) => aligned
191                .try_into_mut()
192                .map(|mut_aligned| IoBufMut {
193                    inner: IoBufMutInner::Aligned(mut_aligned),
194                })
195                .map_err(|aligned| Self {
196                    inner: IoBufInner::Aligned(aligned),
197                }),
198            IoBufInner::Pooled(pooled) => pooled
199                .try_into_mut()
200                .map(|mut_pooled| IoBufMut {
201                    inner: IoBufMutInner::Pooled(mut_pooled),
202                })
203                .map_err(|pooled| Self {
204                    inner: IoBufInner::Pooled(pooled),
205                }),
206        }
207    }
208}
209
210impl AsRef<[u8]> for IoBuf {
211    #[inline]
212    fn as_ref(&self) -> &[u8] {
213        match &self.inner {
214            IoBufInner::Bytes(b) => b.as_ref(),
215            IoBufInner::Aligned(a) => a.as_ref(),
216            IoBufInner::Pooled(p) => p.as_ref(),
217        }
218    }
219}
220
221impl Default for IoBuf {
222    fn default() -> Self {
223        Self {
224            inner: IoBufInner::Bytes(Bytes::new()),
225        }
226    }
227}
228
229impl PartialEq for IoBuf {
230    fn eq(&self, other: &Self) -> bool {
231        self.as_ref() == other.as_ref()
232    }
233}
234
235impl Eq for IoBuf {}
236
237impl PartialEq<[u8]> for IoBuf {
238    #[inline]
239    fn eq(&self, other: &[u8]) -> bool {
240        self.as_ref() == other
241    }
242}
243
244impl PartialEq<&[u8]> for IoBuf {
245    #[inline]
246    fn eq(&self, other: &&[u8]) -> bool {
247        self.as_ref() == *other
248    }
249}
250
251impl<const N: usize> PartialEq<[u8; N]> for IoBuf {
252    #[inline]
253    fn eq(&self, other: &[u8; N]) -> bool {
254        self.as_ref() == other
255    }
256}
257
258impl<const N: usize> PartialEq<&[u8; N]> for IoBuf {
259    #[inline]
260    fn eq(&self, other: &&[u8; N]) -> bool {
261        self.as_ref() == *other
262    }
263}
264
265impl Buf for IoBuf {
266    #[inline]
267    fn remaining(&self) -> usize {
268        match &self.inner {
269            IoBufInner::Bytes(b) => b.remaining(),
270            IoBufInner::Aligned(a) => a.remaining(),
271            IoBufInner::Pooled(p) => p.remaining(),
272        }
273    }
274
275    #[inline]
276    fn chunk(&self) -> &[u8] {
277        match &self.inner {
278            IoBufInner::Bytes(b) => b.chunk(),
279            IoBufInner::Aligned(a) => a.chunk(),
280            IoBufInner::Pooled(p) => p.chunk(),
281        }
282    }
283
284    #[inline]
285    fn advance(&mut self, cnt: usize) {
286        match &mut self.inner {
287            IoBufInner::Bytes(b) => b.advance(cnt),
288            IoBufInner::Aligned(a) => a.advance(cnt),
289            IoBufInner::Pooled(p) => p.advance(cnt),
290        }
291    }
292
293    #[inline]
294    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
295        match &mut self.inner {
296            IoBufInner::Bytes(b) => b.copy_to_bytes(len),
297            IoBufInner::Aligned(a) => a.copy_to_bytes(len),
298            IoBufInner::Pooled(p) => {
299                // Full non-empty drain: transfer ownership so the drained source no
300                // longer retains the pooled allocation. Keep len == 0 on the normal
301                // path to avoid creating an empty Bytes that still pins pool memory.
302                if len != 0 && len == p.remaining() {
303                    let inner = std::mem::replace(&mut self.inner, IoBufInner::Bytes(Bytes::new()));
304                    match inner {
305                        IoBufInner::Pooled(p) => p.into_bytes(),
306                        _ => unreachable!(),
307                    }
308                } else {
309                    p.copy_to_bytes(len)
310                }
311            }
312        }
313    }
314}
315
316impl From<Bytes> for IoBuf {
317    fn from(bytes: Bytes) -> Self {
318        Self {
319            inner: IoBufInner::Bytes(bytes),
320        }
321    }
322}
323
324impl From<Vec<u8>> for IoBuf {
325    fn from(vec: Vec<u8>) -> Self {
326        Self {
327            inner: IoBufInner::Bytes(Bytes::from(vec)),
328        }
329    }
330}
331
332impl<const N: usize> From<&'static [u8; N]> for IoBuf {
333    fn from(array: &'static [u8; N]) -> Self {
334        Self {
335            inner: IoBufInner::Bytes(Bytes::from_static(array)),
336        }
337    }
338}
339
340impl From<&'static [u8]> for IoBuf {
341    fn from(slice: &'static [u8]) -> Self {
342        Self {
343            inner: IoBufInner::Bytes(Bytes::from_static(slice)),
344        }
345    }
346}
347
348/// Convert an [`IoBuf`] into a [`Vec<u8>`].
349///
350/// This conversion may copy:
351/// - [`Bytes`]-backed buffers may reuse allocation when possible
352/// - pooled buffers copy readable bytes into a new [`Vec<u8>`]
353impl From<IoBuf> for Vec<u8> {
354    fn from(buf: IoBuf) -> Self {
355        match buf.inner {
356            IoBufInner::Bytes(bytes) => Self::from(bytes),
357            IoBufInner::Aligned(aligned) => aligned.as_ref().to_vec(),
358            IoBufInner::Pooled(pooled) => pooled.as_ref().to_vec(),
359        }
360    }
361}
362
363/// Convert an [`IoBuf`] into [`Bytes`] without copying readable data.
364///
365/// For pooled buffers, this wraps the pooled owner using [`Bytes::from_owner`].
366impl From<IoBuf> for Bytes {
367    fn from(buf: IoBuf) -> Self {
368        match buf.inner {
369            IoBufInner::Bytes(bytes) => bytes,
370            IoBufInner::Aligned(aligned) => Self::from_owner(aligned),
371            IoBufInner::Pooled(pooled) => Self::from_owner(pooled),
372        }
373    }
374}
375
376impl Write for IoBuf {
377    #[inline]
378    fn write(&self, buf: &mut impl BufMut) {
379        self.len().write(buf);
380        buf.put_slice(self.as_ref());
381    }
382
383    #[inline]
384    fn write_bufs(&self, buf: &mut impl BufsMut) {
385        self.len().write(buf);
386        buf.push(self.clone());
387    }
388}
389
390impl EncodeSize for IoBuf {
391    #[inline]
392    fn encode_size(&self) -> usize {
393        self.len().encode_size() + self.len()
394    }
395
396    #[inline]
397    fn encode_inline_size(&self) -> usize {
398        self.len().encode_size()
399    }
400}
401
402impl Read for IoBuf {
403    type Cfg = RangeCfg<usize>;
404
405    #[inline]
406    fn read_cfg(buf: &mut impl Buf, range: &Self::Cfg) -> Result<Self, Error> {
407        let len = usize::read_cfg(buf, range)?;
408        at_least(buf, len)?;
409        Ok(Self::from(buf.copy_to_bytes(len)))
410    }
411}
412
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for IoBuf {
    /// Generate a buffer of arbitrary bytes bounded by the unstructured input.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let len = u.arbitrary_len::<u8>()?;
        let mut data = Vec::with_capacity(len);
        for byte in u.arbitrary_iter()?.take(len) {
            data.push(byte?);
        }
        Ok(Self::from(data))
    }
}

/// Mutable byte buffer.
///
/// Backed by either [`BytesMut`] or a pooled aligned allocation.
///
/// Use this to build or mutate payloads before freezing into [`IoBuf`].
///
/// For pooled-backed values, dropping this buffer returns the underlying
/// allocation to the pool. After [`IoBufMut::freeze`], the frozen `IoBuf`
/// keeps the allocation alive until its final reference is dropped.
#[derive(Debug)]
pub struct IoBufMut {
    // Backing storage variant; see [`IoBufMutInner`] for the semantics.
    inner: IoBufMutInner,
}

/// Internal storage variant for [`IoBufMut`]. See [`IoBufInner`] for variant
/// semantics.
#[derive(Debug)]
enum IoBufMutInner {
    // Standard heap-allocated storage.
    Bytes(BytesMut),
    // Untracked aligned allocation; deallocated on drop.
    Aligned(AlignedBufMut),
    // Pool-tracked aligned allocation; returned to its pool on drop.
    Pooled(PooledBufMut),
}

445impl Default for IoBufMut {
446    fn default() -> Self {
447        Self {
448            inner: IoBufMutInner::Bytes(BytesMut::new()),
449        }
450    }
451}
452
impl IoBufMut {
    /// Create a buffer with the given capacity.
    ///
    /// Backed by [`BytesMut`]; length starts at zero.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            inner: IoBufMutInner::Bytes(BytesMut::with_capacity(capacity)),
        }
    }

    /// Create an untracked aligned buffer with the given capacity and alignment.
    ///
    /// This uses the aligned backing path directly rather than `BytesMut`.
    /// The returned buffer is not tracked by a [`BufferPool`], so dropping it
    /// deallocates the aligned allocation immediately.
    ///
    /// Use this when the caller needs a specific alignment but does not need
    /// pooled reuse.
    #[inline]
    pub fn with_alignment(capacity: usize, alignment: NonZeroUsize) -> Self {
        // Zero-capacity requests need no aligned allocation; fall back to the
        // (empty) Bytes-backed path.
        if capacity == 0 {
            return Self::with_capacity(0);
        }
        let buffer = AlignedBuffer::new(capacity, alignment.get());
        Self::from_aligned(AlignedBufMut::new(buffer))
    }

    /// Create a zero-initialized untracked aligned buffer with the given
    /// length and alignment.
    ///
    /// Unlike [`Self::with_alignment`], this initializes the full readable
    /// range to zero and sets `len == capacity`.
    #[inline]
    pub fn zeroed_with_alignment(len: usize, alignment: NonZeroUsize) -> Self {
        // Zero-length requests need no aligned allocation; fall back to the
        // (empty) Bytes-backed path.
        if len == 0 {
            return Self::zeroed(0);
        }
        let buffer = AlignedBuffer::new_zeroed(len, alignment.get());
        let mut buffer = Self::from_aligned(AlignedBufMut::new(buffer));
        // SAFETY: the aligned allocation was zero-initialized for `len` bytes.
        unsafe { buffer.set_len(len) };
        buffer
    }

    /// Create a buffer of `len` bytes, all initialized to zero.
    ///
    /// Unlike `with_capacity`, this sets both capacity and length to `len`,
    /// making the entire buffer immediately usable for read operations
    /// (e.g., `file.read_exact`).
    pub fn zeroed(len: usize) -> Self {
        Self {
            inner: IoBufMutInner::Bytes(BytesMut::zeroed(len)),
        }
    }

    /// Create a buffer from a pooled allocation.
    #[inline]
    const fn from_pooled(pooled: PooledBufMut) -> Self {
        Self {
            inner: IoBufMutInner::Pooled(pooled),
        }
    }

    /// Create a buffer from an untracked aligned allocation.
    #[inline]
    const fn from_aligned(aligned: AlignedBufMut) -> Self {
        Self {
            inner: IoBufMutInner::Aligned(aligned),
        }
    }

    /// Returns `true` if this buffer is tracked by a pool.
    ///
    /// Tracked buffers originate from [`BufferPool`] allocations and are
    /// returned to the pool when dropped.
    ///
    /// Buffers backed by [`BytesMut`], and untracked aligned allocations (from
    /// [`IoBufMut::with_alignment`], pool bypass for small requests, or
    /// fallback), return `false`.
    #[inline]
    pub const fn is_pooled(&self) -> bool {
        match &self.inner {
            IoBufMutInner::Bytes(_) => false,
            IoBufMutInner::Aligned(_) => false,
            IoBufMutInner::Pooled(_) => true,
        }
    }

    /// Sets the length of the buffer.
    ///
    /// This will explicitly set the size of the buffer without actually
    /// modifying the data, so it is up to the caller to ensure that the data
    /// has been initialized.
    ///
    /// # Safety
    ///
    /// Caller must ensure all bytes in `0..len` are initialized before any
    /// read operations.
    ///
    /// # Panics
    ///
    /// Panics if `len > capacity()`.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        // Checked here once so each backing variant's set_len can assume a
        // valid length.
        assert!(
            len <= self.capacity(),
            "set_len({len}) exceeds capacity({})",
            self.capacity()
        );
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.set_len(len),
            IoBufMutInner::Aligned(b) => b.set_len(len),
            IoBufMutInner::Pooled(b) => b.set_len(len),
        }
    }

    /// Number of bytes remaining in the buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.remaining()
    }

    /// Whether the buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        match &self.inner {
            IoBufMutInner::Bytes(b) => b.is_empty(),
            IoBufMutInner::Aligned(b) => b.is_empty(),
            IoBufMutInner::Pooled(b) => b.is_empty(),
        }
    }

    /// Freeze into immutable [`IoBuf`].
    #[inline]
    pub fn freeze(self) -> IoBuf {
        match self.inner {
            IoBufMutInner::Bytes(b) => b.freeze().into(),
            IoBufMutInner::Aligned(b) => b.freeze(),
            IoBufMutInner::Pooled(b) => b.freeze(),
        }
    }

    /// Returns the number of bytes the buffer can hold without reallocating.
    #[inline]
    pub fn capacity(&self) -> usize {
        match &self.inner {
            IoBufMutInner::Bytes(b) => b.capacity(),
            IoBufMutInner::Aligned(b) => b.capacity(),
            IoBufMutInner::Pooled(b) => b.capacity(),
        }
    }

    /// Returns an unsafe mutable pointer to the buffer's data.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.as_mut_ptr(),
            IoBufMutInner::Aligned(b) => b.as_mut_ptr(),
            IoBufMutInner::Pooled(b) => b.as_mut_ptr(),
        }
    }

    /// Truncates the buffer to `len` readable bytes.
    ///
    /// If `len` is greater than the current length, this has no effect.
    #[inline]
    pub fn truncate(&mut self, len: usize) {
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.truncate(len),
            IoBufMutInner::Aligned(b) => b.truncate(len),
            IoBufMutInner::Pooled(b) => b.truncate(len),
        }
    }

    /// Clears the buffer, removing all data. Existing capacity is preserved.
    #[inline]
    pub fn clear(&mut self) {
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.clear(),
            IoBufMutInner::Aligned(b) => b.clear(),
            IoBufMutInner::Pooled(b) => b.clear(),
        }
    }
}

636impl AsRef<[u8]> for IoBufMut {
637    #[inline]
638    fn as_ref(&self) -> &[u8] {
639        match &self.inner {
640            IoBufMutInner::Bytes(b) => b.as_ref(),
641            IoBufMutInner::Aligned(b) => b.as_ref(),
642            IoBufMutInner::Pooled(b) => b.as_ref(),
643        }
644    }
645}
646
647impl AsMut<[u8]> for IoBufMut {
648    #[inline]
649    fn as_mut(&mut self) -> &mut [u8] {
650        match &mut self.inner {
651            IoBufMutInner::Bytes(b) => b.as_mut(),
652            IoBufMutInner::Aligned(b) => b.as_mut(),
653            IoBufMutInner::Pooled(b) => b.as_mut(),
654        }
655    }
656}
657
658impl PartialEq<[u8]> for IoBufMut {
659    #[inline]
660    fn eq(&self, other: &[u8]) -> bool {
661        self.as_ref() == other
662    }
663}
664
665impl PartialEq<&[u8]> for IoBufMut {
666    #[inline]
667    fn eq(&self, other: &&[u8]) -> bool {
668        self.as_ref() == *other
669    }
670}
671
672impl<const N: usize> PartialEq<[u8; N]> for IoBufMut {
673    #[inline]
674    fn eq(&self, other: &[u8; N]) -> bool {
675        self.as_ref() == other
676    }
677}
678
679impl<const N: usize> PartialEq<&[u8; N]> for IoBufMut {
680    #[inline]
681    fn eq(&self, other: &&[u8; N]) -> bool {
682        self.as_ref() == *other
683    }
684}
685
686impl Buf for IoBufMut {
687    #[inline]
688    fn remaining(&self) -> usize {
689        match &self.inner {
690            IoBufMutInner::Bytes(b) => b.remaining(),
691            IoBufMutInner::Aligned(b) => b.remaining(),
692            IoBufMutInner::Pooled(b) => b.remaining(),
693        }
694    }
695
696    #[inline]
697    fn chunk(&self) -> &[u8] {
698        match &self.inner {
699            IoBufMutInner::Bytes(b) => b.chunk(),
700            IoBufMutInner::Aligned(b) => b.chunk(),
701            IoBufMutInner::Pooled(b) => b.chunk(),
702        }
703    }
704
705    #[inline]
706    fn advance(&mut self, cnt: usize) {
707        match &mut self.inner {
708            IoBufMutInner::Bytes(b) => b.advance(cnt),
709            IoBufMutInner::Aligned(b) => b.advance(cnt),
710            IoBufMutInner::Pooled(b) => b.advance(cnt),
711        }
712    }
713
714    #[inline]
715    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
716        match &mut self.inner {
717            IoBufMutInner::Bytes(b) => b.copy_to_bytes(len),
718            IoBufMutInner::Aligned(a) => a.copy_to_bytes(len),
719            IoBufMutInner::Pooled(p) => {
720                // Full non-empty drain: transfer ownership so the drained source no
721                // longer retains the pooled allocation. Keep len == 0 on the normal
722                // path to avoid creating an empty Bytes that still pins pool memory.
723                if len != 0 && len == p.remaining() {
724                    let inner =
725                        std::mem::replace(&mut self.inner, IoBufMutInner::Bytes(BytesMut::new()));
726                    match inner {
727                        IoBufMutInner::Pooled(p) => p.into_bytes(),
728                        _ => unreachable!(),
729                    }
730                } else {
731                    p.copy_to_bytes(len)
732                }
733            }
734        }
735    }
736}
737
// SAFETY: Delegates to BytesMut, AlignedBufMut, or PooledBufMut, which
// implement BufMut safely.
unsafe impl BufMut for IoBufMut {
    #[inline]
    fn remaining_mut(&self) -> usize {
        match &self.inner {
            IoBufMutInner::Bytes(b) => b.remaining_mut(),
            IoBufMutInner::Aligned(b) => b.remaining_mut(),
            IoBufMutInner::Pooled(b) => b.remaining_mut(),
        }
    }

    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.advance_mut(cnt),
            IoBufMutInner::Aligned(b) => b.advance_mut(cnt),
            IoBufMutInner::Pooled(b) => b.advance_mut(cnt),
        }
    }

    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        match &mut self.inner {
            IoBufMutInner::Bytes(b) => b.chunk_mut(),
            IoBufMutInner::Aligned(b) => b.chunk_mut(),
            IoBufMutInner::Pooled(b) => b.chunk_mut(),
        }
    }
}

768impl From<Vec<u8>> for IoBufMut {
769    fn from(vec: Vec<u8>) -> Self {
770        Self::from(Bytes::from(vec))
771    }
772}
773
774impl From<&[u8]> for IoBufMut {
775    fn from(slice: &[u8]) -> Self {
776        Self {
777            inner: IoBufMutInner::Bytes(BytesMut::from(slice)),
778        }
779    }
780}
781
782impl<const N: usize> From<[u8; N]> for IoBufMut {
783    fn from(array: [u8; N]) -> Self {
784        Self::from(array.as_ref())
785    }
786}
787
788impl<const N: usize> From<&[u8; N]> for IoBufMut {
789    fn from(array: &[u8; N]) -> Self {
790        Self::from(array.as_ref())
791    }
792}
793
794impl From<BytesMut> for IoBufMut {
795    fn from(bytes: BytesMut) -> Self {
796        Self {
797            inner: IoBufMutInner::Bytes(bytes),
798        }
799    }
800}
801
802impl From<Bytes> for IoBufMut {
803    /// Zero-copy if `bytes` is unique for the entire original buffer (refcount is 1),
804    /// copies otherwise. Always copies if the [`Bytes`] was constructed via
805    /// [`Bytes::from_owner`] or [`Bytes::from_static`].
806    fn from(bytes: Bytes) -> Self {
807        Self {
808            inner: IoBufMutInner::Bytes(BytesMut::from(bytes)),
809        }
810    }
811}
812
813impl From<IoBuf> for IoBufMut {
814    /// Zero-copy when exclusive ownership can be recovered, copies otherwise.
815    fn from(buf: IoBuf) -> Self {
816        match buf.try_into_mut() {
817            Ok(buf) => buf,
818            Err(buf) => Self::from(buf.as_ref()),
819        }
820    }
821}
822
/// Container for one or more immutable buffers.
#[derive(Clone, Debug)]
pub struct IoBufs {
    // Canonical chunk storage; see [`IoBufsInner`] for the invariants.
    inner: IoBufsInner,
}

/// Internal immutable representation.
///
/// - Representation is canonical and minimal for readable data:
///   - `Single` is the only representation for empty data and one-chunk data.
///   - `Chunked` is used only when four or more readable chunks remain.
/// - `Pair`, `Triple`, and `Chunked` never store empty chunks.
#[derive(Clone, Debug)]
enum IoBufsInner {
    /// Single buffer (fast path).
    Single(IoBuf),
    /// Two buffers (fast path).
    Pair([IoBuf; 2]),
    /// Three buffers (fast path).
    Triple([IoBuf; 3]),
    /// Four or more buffers.
    Chunked(VecDeque<IoBuf>),
}

847impl Default for IoBufs {
848    fn default() -> Self {
849        Self {
850            inner: IoBufsInner::Single(IoBuf::default()),
851        }
852    }
853}
854
impl IoBufs {
856    /// Build canonical immutable chunk storage from readable chunks.
857    ///
858    /// Empty chunks are removed before representation selection.
859    fn from_chunks_iter(chunks: impl IntoIterator<Item = IoBuf>) -> Self {
860        let mut iter = chunks.into_iter().filter(|buf| !buf.is_empty());
861        let first = match iter.next() {
862            Some(first) => first,
863            None => return Self::default(),
864        };
865        let second = match iter.next() {
866            Some(second) => second,
867            None => {
868                return Self {
869                    inner: IoBufsInner::Single(first),
870                };
871            }
872        };
873        let third = match iter.next() {
874            Some(third) => third,
875            None => {
876                return Self {
877                    inner: IoBufsInner::Pair([first, second]),
878                };
879            }
880        };
881        let fourth = match iter.next() {
882            Some(fourth) => fourth,
883            None => {
884                return Self {
885                    inner: IoBufsInner::Triple([first, second, third]),
886                };
887            }
888        };
889
890        let mut bufs = VecDeque::with_capacity(4);
891        bufs.push_back(first);
892        bufs.push_back(second);
893        bufs.push_back(third);
894        bufs.push_back(fourth);
895        bufs.extend(iter);
896
897        Self {
898            inner: IoBufsInner::Chunked(bufs),
899        }
900    }
901
902    /// Re-establish canonical immutable representation invariants.
903    fn canonicalize(&mut self) {
904        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
905        self.inner = match inner {
906            IoBufsInner::Single(buf) => {
907                if buf.is_empty() {
908                    IoBufsInner::Single(IoBuf::default())
909                } else {
910                    IoBufsInner::Single(buf)
911                }
912            }
913            IoBufsInner::Pair([a, b]) => Self::from_chunks_iter([a, b]).inner,
914            IoBufsInner::Triple([a, b, c]) => Self::from_chunks_iter([a, b, c]).inner,
915            IoBufsInner::Chunked(bufs) => Self::from_chunks_iter(bufs).inner,
916        };
917    }
918
919    /// Returns a reference to the single contiguous buffer, if present.
920    ///
921    /// Returns `Some` only when all remaining data is in one contiguous buffer.
922    pub const fn as_single(&self) -> Option<&IoBuf> {
923        match &self.inner {
924            IoBufsInner::Single(buf) => Some(buf),
925            _ => None,
926        }
927    }
928
929    /// Consume this container and return the single buffer if present.
930    ///
931    /// Returns `Ok(IoBuf)` only when all remaining data is already contained in
932    /// a single chunk. Returns `Err(Self)` with the original container
933    /// otherwise.
934    pub fn try_into_single(self) -> Result<IoBuf, Self> {
935        match self.inner {
936            IoBufsInner::Single(buf) => Ok(buf),
937            inner => Err(Self { inner }),
938        }
939    }
940
941    /// Number of bytes remaining across all buffers.
942    #[inline]
943    pub fn len(&self) -> usize {
944        self.remaining()
945    }
946
947    /// Number of non-empty readable chunks.
948    #[inline]
949    pub fn chunk_count(&self) -> usize {
950        // This assumes canonical form.
951        match &self.inner {
952            IoBufsInner::Single(buf) => {
953                if buf.is_empty() {
954                    0
955                } else {
956                    1
957                }
958            }
959            IoBufsInner::Pair(_) => 2,
960            IoBufsInner::Triple(_) => 3,
961            IoBufsInner::Chunked(bufs) => bufs.len(),
962        }
963    }
964
965    /// Whether all buffers are empty.
966    #[inline]
967    pub fn is_empty(&self) -> bool {
968        self.remaining() == 0
969    }
970
971    /// Whether this contains a single contiguous buffer.
972    ///
973    /// When true, `chunk()` returns all remaining bytes.
974    #[inline]
975    pub const fn is_single(&self) -> bool {
976        matches!(self.inner, IoBufsInner::Single(_))
977    }
978
979    /// Visit each readable chunk in order without coalescing.
980    #[inline]
981    pub fn for_each_chunk(&self, mut f: impl FnMut(&[u8])) {
982        match &self.inner {
983            IoBufsInner::Single(buf) => {
984                let chunk = buf.as_ref();
985                if !chunk.is_empty() {
986                    f(chunk);
987                }
988            }
989            IoBufsInner::Pair(pair) => {
990                for buf in pair {
991                    let chunk = buf.as_ref();
992                    if !chunk.is_empty() {
993                        f(chunk);
994                    }
995                }
996            }
997            IoBufsInner::Triple(triple) => {
998                for buf in triple {
999                    let chunk = buf.as_ref();
1000                    if !chunk.is_empty() {
1001                        f(chunk);
1002                    }
1003                }
1004            }
1005            IoBufsInner::Chunked(bufs) => {
1006                for buf in bufs {
1007                    let chunk = buf.as_ref();
1008                    if !chunk.is_empty() {
1009                        f(chunk);
1010                    }
1011                }
1012            }
1013        }
1014    }
1015
1016    /// Prepend a buffer to the front.
1017    ///
1018    /// Empty input buffers are ignored.
1019    pub fn prepend(&mut self, buf: IoBuf) {
1020        if buf.is_empty() {
1021            return;
1022        }
1023        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
1024        self.inner = match inner {
1025            IoBufsInner::Single(existing) if existing.is_empty() => IoBufsInner::Single(buf),
1026            IoBufsInner::Single(existing) => IoBufsInner::Pair([buf, existing]),
1027            IoBufsInner::Pair([a, b]) => IoBufsInner::Triple([buf, a, b]),
1028            IoBufsInner::Triple([a, b, c]) => {
1029                let mut bufs = VecDeque::with_capacity(4);
1030                bufs.push_back(buf);
1031                bufs.push_back(a);
1032                bufs.push_back(b);
1033                bufs.push_back(c);
1034                IoBufsInner::Chunked(bufs)
1035            }
1036            IoBufsInner::Chunked(mut bufs) => {
1037                bufs.push_front(buf);
1038                IoBufsInner::Chunked(bufs)
1039            }
1040        };
1041    }
1042
1043    /// Append a buffer to the back.
1044    ///
1045    /// Empty input buffers are ignored.
1046    pub fn append(&mut self, buf: IoBuf) {
1047        if buf.is_empty() {
1048            return;
1049        }
1050        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
1051        self.inner = match inner {
1052            IoBufsInner::Single(existing) if existing.is_empty() => IoBufsInner::Single(buf),
1053            IoBufsInner::Single(existing) => IoBufsInner::Pair([existing, buf]),
1054            IoBufsInner::Pair([a, b]) => IoBufsInner::Triple([a, b, buf]),
1055            IoBufsInner::Triple([a, b, c]) => {
1056                let mut bufs = VecDeque::with_capacity(4);
1057                bufs.push_back(a);
1058                bufs.push_back(b);
1059                bufs.push_back(c);
1060                bufs.push_back(buf);
1061                IoBufsInner::Chunked(bufs)
1062            }
1063            IoBufsInner::Chunked(mut bufs) => {
1064                bufs.push_back(buf);
1065                IoBufsInner::Chunked(bufs)
1066            }
1067        };
1068    }
1069
    /// Splits the buffer(s) into two at the given index.
    ///
    /// Afterwards `self` contains bytes `[at, len)`, and the returned
    /// [`IoBufs`] contains bytes `[0, at)`.
    ///
    /// Whole chunks are moved without copying. If the split point lands inside
    /// a chunk, the chunk is split zero-copy via [`IoBuf::split_to`].
    ///
    /// # Panics
    ///
    /// Panics if `at > len`.
    pub fn split_to(&mut self, at: usize) -> Self {
        // Fast path: empty prefix requested; `self` is untouched.
        if at == 0 {
            return Self::default();
        }

        let remaining = self.remaining();
        assert!(
            at <= remaining,
            "split_to out of bounds: {:?} <= {:?}",
            at,
            remaining,
        );

        // Fast path: the entire contents become the prefix and `self` is left
        // in its default (empty, canonical) state.
        if at == remaining {
            return std::mem::take(self);
        }

        // Move the storage out so chunks can be rearranged by value.
        // Invariant below: 0 < at < remaining, so both sides end non-empty.
        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
        match inner {
            IoBufsInner::Single(mut buf) => {
                // Delegate directly and keep remainder as single
                let prefix = buf.split_to(at);
                self.inner = IoBufsInner::Single(buf);
                Self::from(prefix)
            }
            IoBufsInner::Pair([mut a, mut b]) => {
                let a_len = a.remaining();
                if at < a_len {
                    // Split stays entirely in chunk `a`.
                    let prefix = a.split_to(at);
                    self.inner = IoBufsInner::Pair([a, b]);
                    return Self::from(prefix);
                }
                if at == a_len {
                    // Exact chunk boundary: move `a` out, keep `b`.
                    self.inner = IoBufsInner::Single(b);
                    return Self::from(a);
                }

                // Split crosses from `a` into `b`.
                let b_prefix_len = at - a_len;
                let b_prefix = b.split_to(b_prefix_len);
                self.inner = IoBufsInner::Single(b);
                Self {
                    inner: IoBufsInner::Pair([a, b_prefix]),
                }
            }
            IoBufsInner::Triple([mut a, mut b, mut c]) => {
                let a_len = a.remaining();
                if at < a_len {
                    // Split stays entirely in chunk `a`.
                    let prefix = a.split_to(at);
                    self.inner = IoBufsInner::Triple([a, b, c]);
                    return Self::from(prefix);
                }
                if at == a_len {
                    // Exact boundary after `a`.
                    self.inner = IoBufsInner::Pair([b, c]);
                    return Self::from(a);
                }

                let mut remaining = at - a_len;
                let b_len = b.remaining();
                if remaining < b_len {
                    // Split lands inside `b`.
                    let b_prefix = b.split_to(remaining);
                    self.inner = IoBufsInner::Pair([b, c]);
                    return Self {
                        inner: IoBufsInner::Pair([a, b_prefix]),
                    };
                }
                if remaining == b_len {
                    // Exact boundary after `b`.
                    self.inner = IoBufsInner::Single(c);
                    return Self {
                        inner: IoBufsInner::Pair([a, b]),
                    };
                }

                // Split reaches into `c`.
                remaining -= b_len;
                let c_prefix = c.split_to(remaining);
                self.inner = IoBufsInner::Single(c);
                Self {
                    inner: IoBufsInner::Triple([a, b, c_prefix]),
                }
            }
            IoBufsInner::Chunked(mut bufs) => {
                let mut remaining = at;
                let mut out = VecDeque::new();

                // Move whole chunks into `out` until the split point is reached.
                while remaining > 0 {
                    // `at < self.remaining()` was checked above, so a chunk must exist.
                    let mut front = bufs.pop_front().expect("split_to out of bounds");
                    let avail = front.remaining();
                    if avail == 0 {
                        // Canonical chunked state should not contain empties.
                        continue;
                    }
                    if remaining < avail {
                        // Split inside this chunk: keep suffix in `self`, move prefix to output.
                        let prefix = front.split_to(remaining);
                        out.push_back(prefix);
                        bufs.push_front(front);
                        break;
                    }

                    // Consume this full chunk into the output prefix.
                    out.push_back(front);
                    remaining -= avail;
                }

                // Collapse either side to the canonical small variant when it
                // dropped below four chunks.
                self.inner = if bufs.len() >= 4 {
                    IoBufsInner::Chunked(bufs)
                } else {
                    Self::from_chunks_iter(bufs).inner
                };

                if out.len() >= 4 {
                    Self {
                        inner: IoBufsInner::Chunked(out),
                    }
                } else {
                    Self::from_chunks_iter(out)
                }
            }
        }
    }
1208
1209    /// Coalesce all remaining bytes into a single contiguous [`IoBuf`].
1210    ///
1211    /// Zero-copy if only one buffer. Copies if multiple buffers.
1212    #[inline]
1213    pub fn coalesce(mut self) -> IoBuf {
1214        match self.inner {
1215            IoBufsInner::Single(buf) => buf,
1216            _ => self.copy_to_bytes(self.remaining()).into(),
1217        }
1218    }
1219
1220    /// Coalesce all remaining bytes into a single contiguous [`IoBuf`], using the pool
1221    /// for allocation if multiple buffers need to be merged.
1222    ///
1223    /// Zero-copy if only one buffer. Uses pool allocation if multiple buffers.
1224    pub fn coalesce_with_pool(self, pool: &BufferPool) -> IoBuf {
1225        match self.inner {
1226            IoBufsInner::Single(buf) => buf,
1227            IoBufsInner::Pair([a, b]) => {
1228                let total_len = a.remaining().saturating_add(b.remaining());
1229                let mut result = pool.alloc(total_len);
1230                result.put_slice(a.as_ref());
1231                result.put_slice(b.as_ref());
1232                result.freeze()
1233            }
1234            IoBufsInner::Triple([a, b, c]) => {
1235                let total_len = a
1236                    .remaining()
1237                    .saturating_add(b.remaining())
1238                    .saturating_add(c.remaining());
1239                let mut result = pool.alloc(total_len);
1240                result.put_slice(a.as_ref());
1241                result.put_slice(b.as_ref());
1242                result.put_slice(c.as_ref());
1243                result.freeze()
1244            }
1245            IoBufsInner::Chunked(bufs) => {
1246                let total_len: usize = bufs
1247                    .iter()
1248                    .map(|b| b.remaining())
1249                    .fold(0, usize::saturating_add);
1250                let mut result = pool.alloc(total_len);
1251                for buf in bufs {
1252                    result.put_slice(buf.as_ref());
1253                }
1254                result.freeze()
1255            }
1256        }
1257    }
1258}
1259
impl Buf for IoBufs {
    fn remaining(&self) -> usize {
        // Total readable bytes; per-chunk lengths are summed with saturation
        // so pathological chunk sizes cannot overflow.
        match &self.inner {
            IoBufsInner::Single(buf) => buf.remaining(),
            IoBufsInner::Pair([a, b]) => a.remaining().saturating_add(b.remaining()),
            IoBufsInner::Triple([a, b, c]) => a
                .remaining()
                .saturating_add(b.remaining())
                .saturating_add(c.remaining()),
            IoBufsInner::Chunked(bufs) => bufs
                .iter()
                .map(|b| b.remaining())
                .fold(0, usize::saturating_add),
        }
    }

    fn chunk(&self) -> &[u8] {
        // Return the first chunk with readable bytes, or an empty slice when
        // everything is drained.
        match &self.inner {
            IoBufsInner::Single(buf) => buf.chunk(),
            IoBufsInner::Pair([a, b]) => {
                if a.remaining() > 0 {
                    a.chunk()
                } else if b.remaining() > 0 {
                    b.chunk()
                } else {
                    &[]
                }
            }
            IoBufsInner::Triple([a, b, c]) => {
                if a.remaining() > 0 {
                    a.chunk()
                } else if b.remaining() > 0 {
                    b.chunk()
                } else if c.remaining() > 0 {
                    c.chunk()
                } else {
                    &[]
                }
            }
            IoBufsInner::Chunked(bufs) => {
                for buf in bufs.iter() {
                    if buf.remaining() > 0 {
                        return buf.chunk();
                    }
                }
                &[]
            }
        }
    }

    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        // Fill `dst` with up to `dst.len()` non-empty chunks for vectored I/O;
        // returns how many slots were written.
        if dst.is_empty() {
            return 0;
        }

        match &self.inner {
            IoBufsInner::Single(buf) => {
                let chunk = buf.chunk();
                if !chunk.is_empty() {
                    dst[0] = IoSlice::new(chunk);
                    return 1;
                }
                0
            }
            IoBufsInner::Pair([a, b]) => fill_vectored_from_chunks(dst, [a.chunk(), b.chunk()]),
            IoBufsInner::Triple([a, b, c]) => {
                fill_vectored_from_chunks(dst, [a.chunk(), b.chunk(), c.chunk()])
            }
            IoBufsInner::Chunked(bufs) => {
                fill_vectored_from_chunks(dst, bufs.iter().map(|buf| buf.chunk()))
            }
        }
    }

    fn advance(&mut self, cnt: usize) {
        // Advance across chunk boundaries, then restore canonical form when
        // the helpers report the chunk shape may have changed. A Single stays
        // canonical no matter how far it advances.
        let should_canonicalize = match &mut self.inner {
            IoBufsInner::Single(buf) => {
                buf.advance(cnt);
                false
            }
            IoBufsInner::Pair(pair) => advance_small_chunks(pair.as_mut_slice(), cnt),
            IoBufsInner::Triple(triple) => advance_small_chunks(triple.as_mut_slice(), cnt),
            IoBufsInner::Chunked(bufs) => {
                advance_chunked_front(bufs, cnt);
                // Collapse to a small variant once at most three chunks remain.
                bufs.len() <= 3
            }
        };

        if should_canonicalize {
            self.canonicalize();
        }
    }

    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        // Single delegates directly (zero-copy when possible); multi-chunk
        // variants use helpers that also report whether the remaining chunk
        // shape needs re-canonicalizing.
        let (result, needs_canonicalize) = match &mut self.inner {
            IoBufsInner::Single(buf) => return buf.copy_to_bytes(len),
            IoBufsInner::Pair(pair) => {
                copy_to_bytes_small_chunks(pair, len, "IoBufs::copy_to_bytes: not enough data")
            }
            IoBufsInner::Triple(triple) => {
                copy_to_bytes_small_chunks(triple, len, "IoBufs::copy_to_bytes: not enough data")
            }
            IoBufsInner::Chunked(bufs) => {
                copy_to_bytes_chunked(bufs, len, "IoBufs::copy_to_bytes: not enough data")
            }
        };

        if needs_canonicalize {
            self.canonicalize();
        }

        result
    }
}
1374
1375impl From<IoBuf> for IoBufs {
1376    fn from(buf: IoBuf) -> Self {
1377        Self {
1378            inner: IoBufsInner::Single(buf),
1379        }
1380    }
1381}
1382
1383impl From<IoBufMut> for IoBufs {
1384    fn from(buf: IoBufMut) -> Self {
1385        Self {
1386            inner: IoBufsInner::Single(buf.freeze()),
1387        }
1388    }
1389}
1390
1391impl From<Bytes> for IoBufs {
1392    fn from(bytes: Bytes) -> Self {
1393        Self::from(IoBuf::from(bytes))
1394    }
1395}
1396
1397impl From<BytesMut> for IoBufs {
1398    fn from(bytes: BytesMut) -> Self {
1399        Self::from(IoBuf::from(bytes.freeze()))
1400    }
1401}
1402
1403impl From<Vec<u8>> for IoBufs {
1404    fn from(vec: Vec<u8>) -> Self {
1405        Self::from(IoBuf::from(vec))
1406    }
1407}
1408
1409impl From<Vec<IoBuf>> for IoBufs {
1410    fn from(bufs: Vec<IoBuf>) -> Self {
1411        Self::from_chunks_iter(bufs)
1412    }
1413}
1414
1415impl<const N: usize> From<&'static [u8; N]> for IoBufs {
1416    fn from(array: &'static [u8; N]) -> Self {
1417        Self::from(IoBuf::from(array))
1418    }
1419}
1420
1421impl From<&'static [u8]> for IoBufs {
1422    fn from(slice: &'static [u8]) -> Self {
1423        Self::from(IoBuf::from(slice))
1424    }
1425}
1426
/// Container for one or more mutable buffers.
///
/// The writable counterpart of [`IoBufs`]: storage collapses to the smallest
/// variant holding the current chunks (see [`IoBufsMutInner`]). Freeze into
/// an immutable [`IoBufs`] with [`IoBufsMut::freeze`].
#[derive(Debug)]
pub struct IoBufsMut {
    inner: IoBufsMutInner,
}
1432
/// Internal mutable representation (mirrors the immutable `IoBufsInner`
/// shape ladder: single, pair, triple, then heap-backed deque).
///
/// Filtering invariants differ by construction path:
/// - Construction from caller-provided writable chunks keeps chunks with
///   non-zero capacity, even when `remaining() == 0`.
/// - Read-canonicalization paths remove drained chunks (`remaining() == 0`)
///   and collapse shape as readable chunk count shrinks.
#[derive(Debug)]
enum IoBufsMutInner {
    /// Single buffer (common case, no allocation).
    Single(IoBufMut),
    /// Two buffers (fast path, no VecDeque allocation).
    Pair([IoBufMut; 2]),
    /// Three buffers (fast path, no VecDeque allocation).
    Triple([IoBufMut; 3]),
    /// Four or more buffers.
    Chunked(VecDeque<IoBufMut>),
}
1450
1451impl Default for IoBufsMut {
1452    fn default() -> Self {
1453        Self {
1454            inner: IoBufsMutInner::Single(IoBufMut::default()),
1455        }
1456    }
1457}
1458
1459impl IoBufsMut {
1460    /// Build mutable chunk storage from already-filtered chunks.
1461    ///
1462    /// This helper intentionally does not filter.
1463    /// Callers choose filter policy first:
1464    /// - [`Self::from_writable_chunks_iter`] for construction from writable chunks (`capacity() > 0`)
1465    /// - [`Self::from_readable_chunks_iter`] for read-canonicalization (`remaining() > 0`)
1466    fn from_chunks_iter(chunks: impl IntoIterator<Item = IoBufMut>) -> Self {
1467        let mut iter = chunks.into_iter();
1468        let first = match iter.next() {
1469            Some(first) => first,
1470            None => return Self::default(),
1471        };
1472        let second = match iter.next() {
1473            Some(second) => second,
1474            None => {
1475                return Self {
1476                    inner: IoBufsMutInner::Single(first),
1477                };
1478            }
1479        };
1480        let third = match iter.next() {
1481            Some(third) => third,
1482            None => {
1483                return Self {
1484                    inner: IoBufsMutInner::Pair([first, second]),
1485                };
1486            }
1487        };
1488        let fourth = match iter.next() {
1489            Some(fourth) => fourth,
1490            None => {
1491                return Self {
1492                    inner: IoBufsMutInner::Triple([first, second, third]),
1493                };
1494            }
1495        };
1496
1497        let mut bufs = VecDeque::with_capacity(4);
1498        bufs.push_back(first);
1499        bufs.push_back(second);
1500        bufs.push_back(third);
1501        bufs.push_back(fourth);
1502        bufs.extend(iter);
1503        Self {
1504            inner: IoBufsMutInner::Chunked(bufs),
1505        }
1506    }
1507
1508    /// Build canonical mutable chunk storage from writable chunks.
1509    ///
1510    /// Chunks with zero capacity are removed.
1511    fn from_writable_chunks_iter(chunks: impl IntoIterator<Item = IoBufMut>) -> Self {
1512        // Keep chunks that can hold data (including len == 0 writable buffers).
1513        Self::from_chunks_iter(chunks.into_iter().filter(|buf| buf.capacity() > 0))
1514    }
1515
1516    /// Build canonical mutable chunk storage from readable chunks.
1517    ///
1518    /// Chunks with no remaining readable bytes are removed.
1519    fn from_readable_chunks_iter(chunks: impl IntoIterator<Item = IoBufMut>) -> Self {
1520        Self::from_chunks_iter(chunks.into_iter().filter(|buf| buf.remaining() > 0))
1521    }
1522
1523    /// Re-establish canonical mutable representation invariants.
1524    fn canonicalize(&mut self) {
1525        let inner = std::mem::replace(&mut self.inner, IoBufsMutInner::Single(IoBufMut::default()));
1526        self.inner = match inner {
1527            IoBufsMutInner::Single(buf) => IoBufsMutInner::Single(buf),
1528            IoBufsMutInner::Pair([a, b]) => Self::from_readable_chunks_iter([a, b]).inner,
1529            IoBufsMutInner::Triple([a, b, c]) => Self::from_readable_chunks_iter([a, b, c]).inner,
1530            IoBufsMutInner::Chunked(bufs) => Self::from_readable_chunks_iter(bufs).inner,
1531        };
1532    }
1533
1534    #[inline]
1535    fn for_each_chunk_mut(&mut self, mut f: impl FnMut(&mut IoBufMut)) {
1536        match &mut self.inner {
1537            IoBufsMutInner::Single(buf) => f(buf),
1538            IoBufsMutInner::Pair(pair) => {
1539                for buf in pair.iter_mut() {
1540                    f(buf);
1541                }
1542            }
1543            IoBufsMutInner::Triple(triple) => {
1544                for buf in triple.iter_mut() {
1545                    f(buf);
1546                }
1547            }
1548            IoBufsMutInner::Chunked(bufs) => {
1549                for buf in bufs.iter_mut() {
1550                    f(buf);
1551                }
1552            }
1553        }
1554    }
1555
1556    /// Returns a reference to the single contiguous buffer, if present.
1557    ///
1558    /// Returns `Some` only when this is currently represented as one chunk.
1559    pub const fn as_single(&self) -> Option<&IoBufMut> {
1560        match &self.inner {
1561            IoBufsMutInner::Single(buf) => Some(buf),
1562            _ => None,
1563        }
1564    }
1565
1566    /// Returns a mutable reference to the single contiguous buffer, if present.
1567    ///
1568    /// Returns `Some` only when this is currently represented as one chunk.
1569    pub const fn as_single_mut(&mut self) -> Option<&mut IoBufMut> {
1570        match &mut self.inner {
1571            IoBufsMutInner::Single(buf) => Some(buf),
1572            _ => None,
1573        }
1574    }
1575
1576    /// Consume this container and return the single buffer if present.
1577    ///
1578    /// Returns `Ok(IoBufMut)` only when readable data is represented as one
1579    /// chunk. Returns `Err(Self)` with the original container otherwise.
1580    #[allow(clippy::result_large_err)]
1581    pub fn try_into_single(self) -> Result<IoBufMut, Self> {
1582        match self.inner {
1583            IoBufsMutInner::Single(buf) => Ok(buf),
1584            inner => Err(Self { inner }),
1585        }
1586    }
1587
1588    /// Number of bytes remaining across all buffers.
1589    #[inline]
1590    pub fn len(&self) -> usize {
1591        self.remaining()
1592    }
1593
1594    /// Whether all buffers are empty.
1595    #[inline]
1596    pub fn is_empty(&self) -> bool {
1597        self.remaining() == 0
1598    }
1599
1600    /// Whether this contains a single contiguous buffer.
1601    ///
1602    /// When true, `chunk()` returns all remaining bytes.
1603    #[inline]
1604    pub const fn is_single(&self) -> bool {
1605        matches!(self.inner, IoBufsMutInner::Single(_))
1606    }
1607
1608    /// Freeze into immutable [`IoBufs`].
1609    pub fn freeze(self) -> IoBufs {
1610        match self.inner {
1611            IoBufsMutInner::Single(buf) => IoBufs::from(buf.freeze()),
1612            IoBufsMutInner::Pair([a, b]) => IoBufs::from_chunks_iter([a.freeze(), b.freeze()]),
1613            IoBufsMutInner::Triple([a, b, c]) => {
1614                IoBufs::from_chunks_iter([a.freeze(), b.freeze(), c.freeze()])
1615            }
1616            IoBufsMutInner::Chunked(bufs) => {
1617                IoBufs::from_chunks_iter(bufs.into_iter().map(IoBufMut::freeze))
1618            }
1619        }
1620    }
1621
1622    fn coalesce_with<F>(self, allocate: F) -> IoBufMut
1623    where
1624        F: FnOnce(usize) -> IoBufMut,
1625    {
1626        match self.inner {
1627            IoBufsMutInner::Single(buf) => buf,
1628            IoBufsMutInner::Pair([a, b]) => {
1629                let total_len = a.len().saturating_add(b.len());
1630                let mut result = allocate(total_len);
1631                result.put_slice(a.as_ref());
1632                result.put_slice(b.as_ref());
1633                result
1634            }
1635            IoBufsMutInner::Triple([a, b, c]) => {
1636                let total_len = a.len().saturating_add(b.len()).saturating_add(c.len());
1637                let mut result = allocate(total_len);
1638                result.put_slice(a.as_ref());
1639                result.put_slice(b.as_ref());
1640                result.put_slice(c.as_ref());
1641                result
1642            }
1643            IoBufsMutInner::Chunked(bufs) => {
1644                let total_len: usize = bufs.iter().map(|b| b.len()).fold(0, usize::saturating_add);
1645                let mut result = allocate(total_len);
1646                for buf in bufs {
1647                    result.put_slice(buf.as_ref());
1648                }
1649                result
1650            }
1651        }
1652    }
1653
1654    /// Coalesce all buffers into a single contiguous [`IoBufMut`].
1655    ///
1656    /// Zero-copy if only one buffer. Copies if multiple buffers.
1657    pub fn coalesce(self) -> IoBufMut {
1658        self.coalesce_with(IoBufMut::with_capacity)
1659    }
1660
1661    /// Coalesce all buffers into a single contiguous [`IoBufMut`], using the pool
1662    /// for allocation if multiple buffers need to be merged.
1663    ///
1664    /// Zero-copy if only one buffer. Uses pool allocation if multiple buffers.
1665    pub fn coalesce_with_pool(self, pool: &BufferPool) -> IoBufMut {
1666        self.coalesce_with(|len| pool.alloc(len))
1667    }
1668
1669    /// Coalesce all buffers into a single contiguous [`IoBufMut`] with extra
1670    /// capacity, using the pool for allocation.
1671    ///
1672    /// Zero-copy if single buffer with sufficient spare capacity.
1673    pub fn coalesce_with_pool_extra(self, pool: &BufferPool, extra: usize) -> IoBufMut {
1674        match self.inner {
1675            IoBufsMutInner::Single(buf) if buf.capacity() - buf.len() >= extra => buf,
1676            IoBufsMutInner::Single(buf) => {
1677                let mut result = pool.alloc(buf.len() + extra);
1678                result.put_slice(buf.as_ref());
1679                result
1680            }
1681            IoBufsMutInner::Pair([a, b]) => {
1682                let total = a.len().saturating_add(b.len());
1683                let mut result = pool.alloc(total + extra);
1684                result.put_slice(a.as_ref());
1685                result.put_slice(b.as_ref());
1686                result
1687            }
1688            IoBufsMutInner::Triple([a, b, c]) => {
1689                let total = a.len().saturating_add(b.len()).saturating_add(c.len());
1690                let mut result = pool.alloc(total + extra);
1691                result.put_slice(a.as_ref());
1692                result.put_slice(b.as_ref());
1693                result.put_slice(c.as_ref());
1694                result
1695            }
1696            IoBufsMutInner::Chunked(bufs) => {
1697                let total: usize = bufs.iter().map(|b| b.len()).fold(0, usize::saturating_add);
1698                let mut result = pool.alloc(total + extra);
1699                for buf in bufs {
1700                    result.put_slice(buf.as_ref());
1701                }
1702                result
1703            }
1704        }
1705    }
1706
1707    /// Returns the total capacity across all buffers.
1708    pub fn capacity(&self) -> usize {
1709        match &self.inner {
1710            IoBufsMutInner::Single(buf) => buf.capacity(),
1711            IoBufsMutInner::Pair([a, b]) => a.capacity().saturating_add(b.capacity()),
1712            IoBufsMutInner::Triple([a, b, c]) => a
1713                .capacity()
1714                .saturating_add(b.capacity())
1715                .saturating_add(c.capacity()),
1716            IoBufsMutInner::Chunked(bufs) => bufs
1717                .iter()
1718                .map(|b| b.capacity())
1719                .fold(0, usize::saturating_add),
1720        }
1721    }
1722
1723    /// Sets the length of the buffer(s) to `len`, distributing across chunks
1724    /// while preserving the current chunk layout.
1725    ///
1726    /// This is useful for APIs that must fill caller-provided buffer structure
1727    /// in place (for example [`Blob::read_at_buf`](crate::Blob::read_at_buf)).
1728    ///
1729    /// # Safety
1730    ///
1731    /// Caller must initialize all `len` bytes before the buffer is read.
1732    ///
1733    /// # Panics
1734    ///
1735    /// Panics if `len` exceeds total capacity.
1736    pub(crate) unsafe fn set_len(&mut self, len: usize) {
1737        let capacity = self.capacity();
1738        assert!(
1739            len <= capacity,
1740            "set_len({len}) exceeds capacity({capacity})"
1741        );
1742        let mut remaining = len;
1743        self.for_each_chunk_mut(|buf| {
1744            let cap = buf.capacity();
1745            let to_set = remaining.min(cap);
1746            buf.set_len(to_set);
1747            remaining -= to_set;
1748        });
1749    }
1750
1751    /// Copy data from a slice into the buffers.
1752    ///
1753    /// Panics if the slice length doesn't match the total buffer length.
1754    pub fn copy_from_slice(&mut self, src: &[u8]) {
1755        assert_eq!(
1756            src.len(),
1757            self.len(),
1758            "source slice length must match buffer length"
1759        );
1760        let mut offset = 0;
1761        self.for_each_chunk_mut(|buf| {
1762            let len = buf.len();
1763            buf.as_mut().copy_from_slice(&src[offset..offset + len]);
1764            offset += len;
1765        });
1766    }
1767}
1768
impl Buf for IoBufsMut {
    // Total readable bytes across all chunks. Saturating addition keeps a
    // pathological sum from overflowing `usize`.
    fn remaining(&self) -> usize {
        match &self.inner {
            IoBufsMutInner::Single(buf) => buf.remaining(),
            IoBufsMutInner::Pair([a, b]) => a.remaining().saturating_add(b.remaining()),
            IoBufsMutInner::Triple([a, b, c]) => a
                .remaining()
                .saturating_add(b.remaining())
                .saturating_add(c.remaining()),
            IoBufsMutInner::Chunked(bufs) => bufs
                .iter()
                .map(|b| b.remaining())
                .fold(0, usize::saturating_add),
        }
    }

    // Returns the first chunk that still has readable bytes, or an empty
    // slice once everything has been consumed (per the `Buf` contract).
    fn chunk(&self) -> &[u8] {
        match &self.inner {
            IoBufsMutInner::Single(buf) => buf.chunk(),
            IoBufsMutInner::Pair([a, b]) => {
                if a.remaining() > 0 {
                    a.chunk()
                } else if b.remaining() > 0 {
                    b.chunk()
                } else {
                    &[]
                }
            }
            IoBufsMutInner::Triple([a, b, c]) => {
                if a.remaining() > 0 {
                    a.chunk()
                } else if b.remaining() > 0 {
                    b.chunk()
                } else if c.remaining() > 0 {
                    c.chunk()
                } else {
                    &[]
                }
            }
            IoBufsMutInner::Chunked(bufs) => {
                for buf in bufs.iter() {
                    if buf.remaining() > 0 {
                        return buf.chunk();
                    }
                }
                &[]
            }
        }
    }

    // Expose all non-empty chunks as `IoSlice`s for vectored I/O, capped at
    // `dst.len()` entries.
    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        if dst.is_empty() {
            return 0;
        }

        match &self.inner {
            IoBufsMutInner::Single(buf) => {
                let chunk = buf.chunk();
                if !chunk.is_empty() {
                    dst[0] = IoSlice::new(chunk);
                    return 1;
                }
                0
            }
            IoBufsMutInner::Pair([a, b]) => fill_vectored_from_chunks(dst, [a.chunk(), b.chunk()]),
            IoBufsMutInner::Triple([a, b, c]) => {
                fill_vectored_from_chunks(dst, [a.chunk(), b.chunk(), c.chunk()])
            }
            IoBufsMutInner::Chunked(bufs) => {
                fill_vectored_from_chunks(dst, bufs.iter().map(|buf| buf.chunk()))
            }
        }
    }

    // Consuming bytes can empty leading chunks, so after advancing we may
    // need to collapse the representation back to its canonical shape.
    fn advance(&mut self, cnt: usize) {
        let should_canonicalize = match &mut self.inner {
            IoBufsMutInner::Single(buf) => {
                buf.advance(cnt);
                false
            }
            IoBufsMutInner::Pair(pair) => advance_small_chunks(pair.as_mut_slice(), cnt),
            IoBufsMutInner::Triple(triple) => advance_small_chunks(triple.as_mut_slice(), cnt),
            IoBufsMutInner::Chunked(bufs) => {
                advance_chunked_front(bufs, cnt);
                // Fewer than four chunks left: a smaller variant may now fit.
                bufs.len() <= 3
            }
        };

        if should_canonicalize {
            self.canonicalize();
        }
    }

    // Like `advance`, draining bytes may empty chunks, so canonicalize when
    // the helper reports the shape may have changed.
    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        let (result, needs_canonicalize) = match &mut self.inner {
            IoBufsMutInner::Single(buf) => return buf.copy_to_bytes(len),
            IoBufsMutInner::Pair(pair) => {
                copy_to_bytes_small_chunks(pair, len, "IoBufsMut::copy_to_bytes: not enough data")
            }
            IoBufsMutInner::Triple(triple) => {
                copy_to_bytes_small_chunks(triple, len, "IoBufsMut::copy_to_bytes: not enough data")
            }
            IoBufsMutInner::Chunked(bufs) => {
                copy_to_bytes_chunked(bufs, len, "IoBufsMut::copy_to_bytes: not enough data")
            }
        };

        if needs_canonicalize {
            self.canonicalize();
        }

        result
    }
}
1883
// SAFETY: Delegates to IoBufMut which implements BufMut safely.
unsafe impl BufMut for IoBufsMut {
    // Total writable capacity across all chunks, with saturating addition to
    // guard against overflow.
    #[inline]
    fn remaining_mut(&self) -> usize {
        match &self.inner {
            IoBufsMutInner::Single(buf) => buf.remaining_mut(),
            IoBufsMutInner::Pair([a, b]) => a.remaining_mut().saturating_add(b.remaining_mut()),
            IoBufsMutInner::Triple([a, b, c]) => a
                .remaining_mut()
                .saturating_add(b.remaining_mut())
                .saturating_add(c.remaining_mut()),
            IoBufsMutInner::Chunked(bufs) => bufs
                .iter()
                .map(|b| b.remaining_mut())
                .fold(0, usize::saturating_add),
        }
    }

    // Advance write cursors across chunks in order; panics if `cnt` exceeds
    // the total writable space (matching the `BufMut` contract).
    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        match &mut self.inner {
            IoBufsMutInner::Single(buf) => buf.advance_mut(cnt),
            IoBufsMutInner::Pair(pair) => {
                let mut remaining = cnt;
                if advance_mut_in_chunks(pair, &mut remaining) {
                    return;
                }
                panic!("cannot advance past end of buffer");
            }
            IoBufsMutInner::Triple(triple) => {
                let mut remaining = cnt;
                if advance_mut_in_chunks(triple, &mut remaining) {
                    return;
                }
                panic!("cannot advance past end of buffer");
            }
            IoBufsMutInner::Chunked(bufs) => {
                let mut remaining = cnt;
                // A VecDeque's storage is a ring buffer, so process both
                // contiguous halves in order.
                let (first, second) = bufs.as_mut_slices();
                if advance_mut_in_chunks(first, &mut remaining)
                    || advance_mut_in_chunks(second, &mut remaining)
                {
                    return;
                }
                panic!("cannot advance past end of buffer");
            }
        }
    }

    // Returns the first chunk with writable space, or an empty slice when
    // the container is full.
    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        match &mut self.inner {
            IoBufsMutInner::Single(buf) => buf.chunk_mut(),
            IoBufsMutInner::Pair(pair) => {
                if pair[0].remaining_mut() > 0 {
                    pair[0].chunk_mut()
                } else if pair[1].remaining_mut() > 0 {
                    pair[1].chunk_mut()
                } else {
                    bytes::buf::UninitSlice::new(&mut [])
                }
            }
            IoBufsMutInner::Triple(triple) => {
                if triple[0].remaining_mut() > 0 {
                    triple[0].chunk_mut()
                } else if triple[1].remaining_mut() > 0 {
                    triple[1].chunk_mut()
                } else if triple[2].remaining_mut() > 0 {
                    triple[2].chunk_mut()
                } else {
                    bytes::buf::UninitSlice::new(&mut [])
                }
            }
            IoBufsMutInner::Chunked(bufs) => {
                for buf in bufs.iter_mut() {
                    if buf.remaining_mut() > 0 {
                        return buf.chunk_mut();
                    }
                }
                bytes::buf::UninitSlice::new(&mut [])
            }
        }
    }
}
1968
1969impl From<IoBufMut> for IoBufsMut {
1970    fn from(buf: IoBufMut) -> Self {
1971        Self {
1972            inner: IoBufsMutInner::Single(buf),
1973        }
1974    }
1975}
1976
1977impl From<Vec<u8>> for IoBufsMut {
1978    fn from(vec: Vec<u8>) -> Self {
1979        Self {
1980            inner: IoBufsMutInner::Single(IoBufMut::from(vec)),
1981        }
1982    }
1983}
1984
1985impl From<BytesMut> for IoBufsMut {
1986    fn from(bytes: BytesMut) -> Self {
1987        Self {
1988            inner: IoBufsMutInner::Single(IoBufMut::from(bytes)),
1989        }
1990    }
1991}
1992
impl From<Vec<IoBufMut>> for IoBufsMut {
    // Delegates to the chunk-iterator constructor, which presumably selects
    // the internal representation matching the number of buffers — verify
    // against `from_writable_chunks_iter`.
    fn from(bufs: Vec<IoBufMut>) -> Self {
        Self::from_writable_chunks_iter(bufs)
    }
}
1998
1999impl<const N: usize> From<[u8; N]> for IoBufsMut {
2000    fn from(array: [u8; N]) -> Self {
2001        Self {
2002            inner: IoBufsMutInner::Single(IoBufMut::from(array)),
2003        }
2004    }
2005}
2006
2007/// Drain `len` readable bytes from a small fixed chunk array (`Pair`/`Triple`).
2008///
2009/// Returns drained bytes plus whether the caller should canonicalize afterward.
2010#[inline]
2011fn copy_to_bytes_small_chunks<B: Buf, const N: usize>(
2012    chunks: &mut [B; N],
2013    len: usize,
2014    not_enough_data_msg: &str,
2015) -> (Bytes, bool) {
2016    let total = chunks
2017        .iter()
2018        .map(|buf| buf.remaining())
2019        .fold(0, usize::saturating_add);
2020    assert!(total >= len, "{not_enough_data_msg}");
2021
2022    if chunks[0].remaining() >= len {
2023        let bytes = chunks[0].copy_to_bytes(len);
2024        return (bytes, chunks[0].remaining() == 0);
2025    }
2026
2027    let mut out = BytesMut::with_capacity(len);
2028    let mut remaining = len;
2029    for buf in chunks.iter_mut() {
2030        if remaining == 0 {
2031            break;
2032        }
2033        let to_copy = remaining.min(buf.remaining());
2034        out.extend_from_slice(&buf.chunk()[..to_copy]);
2035        buf.advance(to_copy);
2036        remaining -= to_copy;
2037    }
2038
2039    // Slow path always consumes past chunk 0, so canonicalization is required.
2040    (out.freeze(), true)
2041}
2042
2043/// Drain `len` readable bytes from a deque-backed chunk representation.
2044///
2045/// Returns drained bytes plus whether the caller should canonicalize afterward.
2046#[inline]
2047fn copy_to_bytes_chunked<B: Buf>(
2048    bufs: &mut VecDeque<B>,
2049    len: usize,
2050    not_enough_data_msg: &str,
2051) -> (Bytes, bool) {
2052    while bufs.front().is_some_and(|buf| buf.remaining() == 0) {
2053        bufs.pop_front();
2054    }
2055
2056    if bufs.front().is_none() {
2057        assert_eq!(len, 0, "{not_enough_data_msg}");
2058        return (Bytes::new(), false);
2059    }
2060
2061    if bufs.front().is_some_and(|front| front.remaining() >= len) {
2062        let front = bufs.front_mut().expect("front checked above");
2063        let bytes = front.copy_to_bytes(len);
2064        if front.remaining() == 0 {
2065            bufs.pop_front();
2066        }
2067        return (bytes, bufs.len() <= 3);
2068    }
2069
2070    let total = bufs
2071        .iter()
2072        .map(|buf| buf.remaining())
2073        .fold(0, usize::saturating_add);
2074    assert!(total >= len, "{not_enough_data_msg}");
2075
2076    let mut out = BytesMut::with_capacity(len);
2077    let mut remaining = len;
2078    while remaining > 0 {
2079        let front = bufs
2080            .front_mut()
2081            .expect("remaining > 0 implies non-empty bufs");
2082        let to_copy = remaining.min(front.remaining());
2083        out.extend_from_slice(&front.chunk()[..to_copy]);
2084        front.advance(to_copy);
2085        if front.remaining() == 0 {
2086            bufs.pop_front();
2087        }
2088        remaining -= to_copy;
2089    }
2090
2091    (out.freeze(), bufs.len() <= 3)
2092}
2093
2094/// Advance across a [`VecDeque`] of chunks by consuming from the front.
2095#[inline]
2096fn advance_chunked_front<B: Buf>(bufs: &mut VecDeque<B>, mut cnt: usize) {
2097    while cnt > 0 {
2098        let front = bufs.front_mut().expect("cannot advance past end of buffer");
2099        let avail = front.remaining();
2100        if avail == 0 {
2101            bufs.pop_front();
2102            continue;
2103        }
2104        if cnt < avail {
2105            front.advance(cnt);
2106            break;
2107        }
2108        front.advance(avail);
2109        bufs.pop_front();
2110        cnt -= avail;
2111    }
2112}
2113
2114/// Advance across a small fixed set of chunks (`Pair`/`Triple`).
2115///
2116/// Returns `true` when one or more chunks became (or were) empty, so callers
2117/// can canonicalize once after the operation.
2118#[inline]
2119fn advance_small_chunks<B: Buf>(chunks: &mut [B], mut cnt: usize) -> bool {
2120    let mut idx = 0;
2121    let mut needs_canonicalize = false;
2122
2123    while cnt > 0 {
2124        let chunk = chunks
2125            .get_mut(idx)
2126            .expect("cannot advance past end of buffer");
2127        let avail = chunk.remaining();
2128        if avail == 0 {
2129            idx += 1;
2130            needs_canonicalize = true;
2131            continue;
2132        }
2133        if cnt < avail {
2134            chunk.advance(cnt);
2135            return needs_canonicalize;
2136        }
2137        chunk.advance(avail);
2138        cnt -= avail;
2139        idx += 1;
2140        needs_canonicalize = true;
2141    }
2142
2143    needs_canonicalize
2144}
2145
/// Advance writable cursors across `chunks` by up to `*remaining` bytes.
///
/// Returns `true` when the full request has been satisfied.
///
/// # Safety
///
/// Forwards to [`BufMut::advance_mut`], so callers must ensure the advanced
/// region has been initialized according to [`BufMut`]'s contract.
#[inline]
unsafe fn advance_mut_in_chunks<B: BufMut>(chunks: &mut [B], remaining: &mut usize) -> bool {
    if *remaining == 0 {
        return true;
    }

    // NOTE(review): `chunk_mut().len()` is used as the per-chunk writable
    // capacity, i.e. each `B` is assumed to expose its full writable region
    // as one contiguous chunk (true for `IoBufMut`). A `BufMut` with multiple
    // writable chunks would be under-advanced here — confirm if `B` ever
    // becomes something else.
    for buf in chunks.iter_mut() {
        let avail = buf.chunk_mut().len();
        if avail == 0 {
            continue;
        }
        if *remaining <= avail {
            // SAFETY: Upheld by this function's safety contract.
            unsafe { buf.advance_mut(*remaining) };
            *remaining = 0;
            return true;
        }
        // SAFETY: Upheld by this function's safety contract.
        unsafe { buf.advance_mut(avail) };
        *remaining -= avail;
    }
    false
}
2177
/// Fill `dst` with `IoSlice`s built from `chunks`.
///
/// Empty chunks are skipped. At most `dst.len()` slices are written.
/// Returns the number of slices written.
#[inline]
fn fill_vectored_from_chunks<'a, I>(dst: &mut [IoSlice<'a>], chunks: I) -> usize
where
    I: IntoIterator<Item = &'a [u8]>,
{
    // Zipping with `dst` caps the output at `dst.len()` entries.
    let nonempty = chunks.into_iter().filter(|chunk| !chunk.is_empty());
    let mut count = 0;
    for (slot, chunk) in dst.iter_mut().zip(nonempty) {
        *slot = IoSlice::new(chunk);
        count += 1;
    }
    count
}
2198
/// Assembles [`IoBufs`] from a mix of inline writes and zero-copy pieces.
///
/// All inline writes go into a single pool-backed buffer. [`BufsMut::push`]
/// records boundaries without flushing. [`Builder::finish`] freezes the buffer
/// once and uses [`IoBuf::slice`] to carve it into pieces at the recorded
/// boundaries, interleaved with the pushed [`Bytes`].
///
/// The inline buffer has a fixed capacity set at construction and will not
/// grow. Callers must ensure the capacity accounts for all inline
/// (non-pushed) bytes that will be written. Exceeding it will panic.
///
/// ```text
/// builder.put_u16(99);                        // inline
/// builder.push(shard_payload.clone());        // zero-copy (Arc clone)
/// builder.put_u32(checksum);                  // inline
/// let output = builder.finish();
///
/// // output: [ 99 | --- 1 MB shard --- | checksum ]
/// //           pool    Arc clone          pool
/// //            \________________________/
/// //             slices of one allocation
/// ```
pub struct Builder {
    // Single working buffer for all inline writes.
    buf: IoBufMut,
    // Each entry is (offset_in_buf, pushed_bytes) recording where a push
    // interrupts the inline byte stream. Offsets are non-decreasing because
    // `push` records the inline buffer's current length.
    pushes: Vec<(usize, Bytes)>,
}
2228
2229impl Builder {
2230    /// Creates a new builder with a fixed-capacity inline buffer.
2231    ///
2232    /// `capacity` is the minimum number of inline bytes the buffer can hold.
2233    /// The pool may round up to a larger size class. Writing more inline
2234    /// bytes than the allocated capacity will panic.
2235    pub fn new(pool: &BufferPool, capacity: NonZeroUsize) -> Self {
2236        Self {
2237            buf: pool.alloc(capacity.get()),
2238            pushes: Vec::new(),
2239        }
2240    }
2241
2242    /// Freezes the inline buffer and assembles [`IoBufs`] by slicing at
2243    /// the recorded push boundaries.
2244    pub fn finish(self) -> IoBufs {
2245        if self.pushes.is_empty() {
2246            return IoBufs::from(self.buf.freeze());
2247        }
2248
2249        let frozen = self.buf.freeze();
2250        let mut result = IoBufs::default();
2251        let mut pos = 0;
2252
2253        for (offset, pushed) in self.pushes {
2254            if offset > pos {
2255                result.append(frozen.slice(pos..offset));
2256            }
2257            result.append(IoBuf::from(pushed));
2258            pos = offset;
2259        }
2260
2261        if pos < frozen.len() {
2262            result.append(frozen.slice(pos..));
2263        }
2264
2265        result
2266    }
2267}
2268
// SAFETY: All methods delegate directly to `self.buf`, a pool-backed
// `IoBufMut` with a sound `BufMut` implementation. The inline buffer has
// fixed capacity; writes that exceed it will panic via the underlying
// `IoBufMut` implementation.
unsafe impl BufMut for Builder {
    // Writable space left in the inline buffer.
    #[inline]
    fn remaining_mut(&self) -> usize {
        self.buf.remaining_mut()
    }

    // Forwarded as-is; the caller's initialization obligation transfers to
    // `IoBufMut::advance_mut`.
    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        self.buf.advance_mut(cnt);
    }

    // The inline buffer's current writable region.
    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        self.buf.chunk_mut()
    }
}
2289
2290impl BufsMut for Builder {
2291    fn push(&mut self, bytes: impl Into<Bytes>) {
2292        let bytes = bytes.into();
2293        if !bytes.is_empty() {
2294            self.pushes.push((self.buf.len(), bytes));
2295        }
2296    }
2297}
2298
2299/// Extension trait for encoding values into pooled I/O buffers.
2300///
2301/// This is useful for hot paths that need to avoid frequent heap allocations
2302/// when serializing values that implement [`Write`] and [`EncodeSize`].
2303pub trait EncodeExt: EncodeSize + Write {
2304    /// Encode this value into an [`IoBufMut`] allocated from `pool`.
2305    ///
2306    /// # Panics
2307    ///
2308    /// Panics if [`EncodeSize::encode_size`] does not match the number of
2309    /// bytes written by [`Write::write`].
2310    fn encode_with_pool_mut(&self, pool: &BufferPool) -> IoBufMut {
2311        let len = self.encode_size();
2312        let mut buf = pool.alloc(len);
2313        self.write(&mut buf);
2314        assert_eq!(
2315            buf.len(),
2316            len,
2317            "write() did not write expected bytes into pooled buffer"
2318        );
2319        buf
2320    }
2321
2322    /// Encode into [`IoBufs`] using pool allocation.
2323    ///
2324    /// Override [`Write::write_bufs`] to avoid copying large [`Bytes`] fields.
2325    ///
2326    /// # Panics
2327    ///
2328    /// Panics if [`EncodeSize::encode_inline_size`] underreports the number
2329    /// of inline bytes written by [`Write::write_bufs`], or if
2330    /// [`EncodeSize::encode_size`] does not match the total bytes written.
2331    fn encode_with_pool(&self, pool: &BufferPool) -> IoBufs {
2332        let len = self.encode_size();
2333        let capacity = NonZeroUsize::new(self.encode_inline_size()).unwrap_or(NonZeroUsize::MIN);
2334        let mut builder = Builder::new(pool, capacity);
2335        self.write_bufs(&mut builder);
2336        let bufs = builder.finish();
2337        assert_eq!(
2338            bufs.remaining(),
2339            len,
2340            "write_bufs() did not write expected bytes"
2341        );
2342        bufs
2343    }
2344}
2345
2346impl<T: EncodeSize + Write> EncodeExt for T {}
2347
2348#[cfg(test)]
2349mod tests {
2350    use super::*;
2351    use bytes::{Bytes, BytesMut};
2352    use commonware_codec::{types::lazy::Lazy, Decode, Encode, RangeCfg};
2353    use core::ops::{Range, RangeFrom, RangeInclusive, RangeToInclusive};
2354    use std::collections::{BTreeMap, HashMap};
2355
    /// Builds a network-profile pool for tests, with a reduced per-class
    /// limit under miri.
    fn test_pool() -> BufferPool {
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                // Reduce max_per_class to avoid slow atomics under miri.
                let pool_config = BufferPoolConfig {
                    pool_min_size: 0,
                    max_per_class: commonware_utils::NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
            } else {
                // pool_min_size of 0 so even small allocations go through the
                // pool in tests.
                let pool_config = BufferPoolConfig::for_network().with_pool_min_size(0);
            }
        }
        let mut registry = prometheus_client::registry::Registry::default();
        BufferPool::new(pool_config, &mut registry)
    }
2372
2373    fn assert_encode_with_pool_matches_encode<T: Encode + EncodeExt>(value: &T) {
2374        let pool = test_pool();
2375        let mut pooled = value.encode_with_pool(&pool);
2376        let baseline = value.encode();
2377        let mut pooled_bytes = vec![0u8; pooled.remaining()];
2378        pooled.copy_to_slice(&mut pooled_bytes);
2379        assert_eq!(pooled_bytes, baseline.as_ref());
2380    }
2381
    /// Smoke-tests `IoBuf` basics: zero-copy clone, owning copy, equality,
    /// `Buf` operations, and zero-copy slicing.
    #[test]
    fn test_iobuf_core_behaviors() {
        // Clone stays zero-copy for immutable buffers.
        let buf1 = IoBuf::from(vec![1u8; 1000]);
        let buf2 = buf1.clone();
        // Same backing pointer proves no bytes were copied.
        assert_eq!(buf1.as_ref().as_ptr(), buf2.as_ref().as_ptr());

        // copy_from_slice creates an owned immutable buffer.
        let data = vec![1u8, 2, 3, 4, 5];
        let copied = IoBuf::copy_from_slice(&data);
        assert_eq!(copied, [1, 2, 3, 4, 5]);
        assert_eq!(copied.len(), 5);
        let empty = IoBuf::copy_from_slice(&[]);
        assert!(empty.is_empty());

        // Equality works against both arrays and slices.
        let eq = IoBuf::from(b"hello");
        assert_eq!(eq, *b"hello");
        assert_eq!(eq, b"hello");
        assert_ne!(eq, *b"world");
        assert_ne!(eq, b"world");
        assert_eq!(IoBuf::from(b"hello"), IoBuf::from(b"hello"));
        assert_ne!(IoBuf::from(b"hello"), IoBuf::from(b"world"));
        let bytes: Bytes = IoBuf::from(b"bytes").into();
        assert_eq!(bytes.as_ref(), b"bytes");

        // Buf trait operations keep `len()` and `remaining()` in sync.
        let mut buf = IoBuf::from(b"hello world");
        assert_eq!(buf.len(), buf.remaining());
        assert_eq!(buf.as_ref(), buf.chunk());
        assert_eq!(buf.remaining(), 11);
        buf.advance(6);
        assert_eq!(buf.chunk(), b"world");
        assert_eq!(buf.len(), buf.remaining());

        // copy_to_bytes drains in-order and advances the source.
        let first = buf.copy_to_bytes(2);
        assert_eq!(&first[..], b"wo");
        let rest = buf.copy_to_bytes(3);
        assert_eq!(&rest[..], b"rld");
        assert_eq!(buf.remaining(), 0);

        // Slicing remains zero-copy and supports all common range forms.
        let src = IoBuf::from(b"hello world");
        assert_eq!(src.slice(..5), b"hello");
        assert_eq!(src.slice(6..), b"world");
        assert_eq!(src.slice(3..8), b"lo wo");
        assert!(src.slice(5..5).is_empty());
    }
2431
    /// Round-trips `IoBuf` through the codec: normal, empty, and large
    /// payloads, plus truncated input and a direct `read_cfg` success.
    #[test]
    fn test_iobuf_codec_roundtrip() {
        let cfg: RangeCfg<usize> = (0..=1024).into();

        let original = IoBuf::from(b"hello world");
        let encoded = original.encode();
        let decoded = IoBuf::decode_cfg(encoded, &cfg).unwrap();
        assert_eq!(original, decoded);

        // Empty buffer survives the round trip.
        let empty = IoBuf::default();
        let encoded = empty.encode();
        let decoded = IoBuf::decode_cfg(encoded, &cfg).unwrap();
        assert_eq!(empty, decoded);

        // Payload larger than the default cfg range.
        let large_cfg: RangeCfg<usize> = (0..=20000).into();
        let large = IoBuf::from(vec![42u8; 10000]);
        let encoded = large.encode();
        let decoded = IoBuf::decode_cfg(encoded, &large_cfg).unwrap();
        assert_eq!(large, decoded);

        // Length prefix promises 4 bytes but only 2 follow: must error.
        let mut truncated = BytesMut::new();
        4usize.write(&mut truncated);
        truncated.extend_from_slice(b"xy");
        let mut truncated = truncated.freeze();
        assert!(IoBuf::read_cfg(&mut truncated, &cfg).is_err());

        // Directly exercise the successful `read_cfg` path, not just decode helpers.
        let mut direct = BytesMut::new();
        4usize.write(&mut direct);
        direct.extend_from_slice(b"wxyz");
        let mut direct = direct.freeze();
        let decoded = IoBuf::read_cfg(&mut direct, &cfg).unwrap();
        assert_eq!(decoded, b"wxyz");
    }
2466
2467    #[test]
2468    #[should_panic(expected = "cannot advance")]
2469    fn test_iobuf_advance_past_end() {
2470        let mut buf = IoBuf::from(b"hello");
2471        buf.advance(10);
2472    }
2473
    /// Verifies `split_to` behaves identically whether the `IoBuf` is backed
    /// by a pooled allocation or by `Bytes`.
    #[test]
    fn test_iobuf_split_to_consistent_across_backings() {
        // split_to on pooled and Bytes-backed IoBufs should produce identical results.
        let pool = test_pool();
        let mut pooled = pool.try_alloc(256).expect("pooled allocation");
        pooled.put_slice(b"hello world");
        let mut pooled_buf = pooled.freeze();
        let mut bytes_buf = IoBuf::from(b"hello world");

        assert!(pooled_buf.is_pooled());
        assert!(!bytes_buf.is_pooled());

        // Zero-length split: the empty prefix is not pooled.
        let pooled_empty = pooled_buf.split_to(0);
        let bytes_empty = bytes_buf.split_to(0);
        assert_eq!(pooled_empty, bytes_empty);
        assert_eq!(pooled_buf, bytes_buf);
        assert!(!pooled_empty.is_pooled());

        // Mid split: the non-empty prefix keeps the pooled backing.
        let pooled_prefix = pooled_buf.split_to(5);
        let bytes_prefix = bytes_buf.split_to(5);
        assert_eq!(pooled_prefix, bytes_prefix);
        assert_eq!(pooled_buf, bytes_buf);
        assert!(pooled_prefix.is_pooled());

        // Full split: the drained remainder no longer reports pooled.
        let pooled_rest = pooled_buf.split_to(pooled_buf.len());
        let bytes_rest = bytes_buf.split_to(bytes_buf.len());
        assert_eq!(pooled_rest, bytes_rest);
        assert_eq!(pooled_buf, bytes_buf);
        assert!(pooled_buf.is_empty());
        assert!(bytes_buf.is_empty());
        assert!(!pooled_buf.is_pooled());
    }
2506
2507    #[test]
2508    #[should_panic(expected = "split_to out of bounds")]
2509    fn test_iobuf_split_to_out_of_bounds() {
2510        let mut buf = IoBuf::from(b"abc");
2511        let _ = buf.split_to(4);
2512    }
2513
    /// Smoke-tests `IoBufMut`: incremental writes, freezing, `zeroed` plus
    /// `set_len` shrinking, and pooled `is_empty` behavior.
    #[test]
    fn test_iobufmut_core_behaviors() {
        // Build mutable buffers incrementally and freeze to immutable.
        let mut buf = IoBufMut::with_capacity(100);
        // Capacity may be rounded up; only a lower bound is guaranteed.
        assert!(buf.capacity() >= 100);
        assert_eq!(buf.len(), 0);
        buf.put_slice(b"hello");
        buf.put_slice(b" world");
        assert_eq!(buf, b"hello world");
        assert_eq!(buf, &b"hello world"[..]);
        assert_eq!(buf.freeze(), b"hello world");

        // `zeroed` creates readable initialized bytes; `set_len` can shrink safely.
        let mut zeroed = IoBufMut::zeroed(10);
        assert_eq!(zeroed, &[0u8; 10]);
        // SAFETY: shrinking readable length to initialized region.
        unsafe { zeroed.set_len(5) };
        assert_eq!(zeroed, &[0u8; 5]);
        zeroed.as_mut()[..5].copy_from_slice(b"hello");
        assert_eq!(&zeroed.as_ref()[..5], b"hello");
        let frozen = zeroed.freeze();
        let vec: Vec<u8> = frozen.into();
        assert_eq!(&vec[..5], b"hello");

        // Exercise pooled branch behavior for `is_empty`.
        let pool = test_pool();
        let mut pooled = pool.alloc(8);
        assert!(pooled.is_empty());
        pooled.put_slice(b"x");
        assert!(!pooled.is_empty());
    }
2545
    /// Exercises `IoBufs` shape transitions (Single/Pair/Triple/Chunked) and
    /// the read paths, including prepend/append ordering and canonicalize.
    #[test]
    fn test_iobufs_shapes_and_read_paths() {
        // Empty construction normalizes to an empty single chunk.
        let empty = IoBufs::from(Vec::<u8>::new());
        assert!(empty.is_empty());
        assert!(empty.is_single());
        assert!(empty.as_single().is_some());

        // Single-buffer read path.
        let mut single = IoBufs::from(b"hello world");
        assert!(single.is_single());
        assert_eq!(single.chunk(), b"hello world");
        single.advance(6);
        assert_eq!(single.chunk(), b"world");
        assert_eq!(single.copy_to_bytes(5).as_ref(), b"world");
        assert_eq!(single.remaining(), 0);

        // Fast-path shapes (Pair/Triple/Chunked).
        let mut pair = IoBufs::from(IoBuf::from(b"a"));
        pair.append(IoBuf::from(b"b"));
        assert!(matches!(pair.inner, IoBufsInner::Pair(_)));
        // Multi-chunk shapes cannot be viewed as a single buffer.
        assert!(pair.as_single().is_none());

        let mut triple = IoBufs::from(IoBuf::from(b"a"));
        triple.append(IoBuf::from(b"b"));
        triple.append(IoBuf::from(b"c"));
        assert!(matches!(triple.inner, IoBufsInner::Triple(_)));

        // A fourth append upgrades to the general Chunked representation.
        let mut chunked = IoBufs::from(IoBuf::from(b"a"));
        chunked.append(IoBuf::from(b"b"));
        chunked.append(IoBuf::from(b"c"));
        chunked.append(IoBuf::from(b"d"));
        assert!(matches!(chunked.inner, IoBufsInner::Chunked(_)));

        // prepend + append preserve ordering.
        let mut joined = IoBufs::from(b"middle");
        joined.prepend(IoBuf::from(b"start "));
        joined.append(IoBuf::from(b" end"));
        assert_eq!(joined.coalesce(), b"start middle end");

        // prepending empty is a no-op, and prepending into pair upgrades to triple.
        let mut prepend_noop = IoBufs::from(b"x");
        prepend_noop.prepend(IoBuf::default());
        assert_eq!(prepend_noop.coalesce(), b"x");

        // Prepending into an empty aggregate should stay on the single-buffer fast path.
        let mut prepend_into_empty = IoBufs::default();
        prepend_into_empty.prepend(IoBuf::from(b"z"));
        assert!(prepend_into_empty.is_single());
        assert_eq!(prepend_into_empty.coalesce(), b"z");

        let mut prepend_pair = IoBufs::from(vec![IoBuf::from(b"b"), IoBuf::from(b"c")]);
        prepend_pair.prepend(IoBuf::from(b"a"));
        assert!(matches!(prepend_pair.inner, IoBufsInner::Triple(_)));
        assert_eq!(prepend_pair.coalesce(), b"abc");

        // canonicalizing a non-empty single should keep the same representation.
        let mut canonical_single = IoBufs::from(b"q");
        canonical_single.canonicalize();
        assert!(canonical_single.is_single());
        assert_eq!(canonical_single.coalesce(), b"q");
    }
2608
    #[test]
    fn test_iobufs_split_to_cases() {
        // Covers `split_to` across every representation (Single, Pair, Triple,
        // Chunked) and every offset class (zero, interior, chunk boundary,
        // full length), asserting both the split payloads and the resulting
        // representation shapes on each side.

        // Zero and full split on a single chunk.
        let mut bufs = IoBufs::from(b"hello");

        // Zero-length prefix: nothing moves and the source is untouched.
        let empty = bufs.split_to(0);
        assert!(empty.is_empty());
        assert_eq!(bufs.coalesce(), b"hello");

        // Full-length split: everything moves; the source stays Single but empty.
        let mut bufs = IoBufs::from(b"hello");
        let all = bufs.split_to(5);
        assert_eq!(all.coalesce(), b"hello");
        assert!(bufs.is_single());
        assert!(bufs.is_empty());

        // Single split in the middle.
        let mut single_mid = IoBufs::from(b"hello");
        let single_prefix = single_mid.split_to(2);
        assert!(single_prefix.is_single());
        assert_eq!(single_prefix.coalesce(), b"he");
        assert_eq!(single_mid.coalesce(), b"llo");

        // Pair split paths: in-first, boundary-after-first, crossing-into-second.
        let mut pair = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
        let pair_prefix = pair.split_to(1);
        assert!(pair_prefix.is_single());
        assert_eq!(pair_prefix.coalesce(), b"a");
        assert!(matches!(pair.inner, IoBufsInner::Pair(_)));
        assert_eq!(pair.coalesce(), b"bcd");

        // Boundary split after the first chunk: both halves become Single.
        let mut pair = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
        let pair_prefix = pair.split_to(2);
        assert!(pair_prefix.is_single());
        assert_eq!(pair_prefix.coalesce(), b"ab");
        assert!(pair.is_single());
        assert_eq!(pair.coalesce(), b"cd");

        // Crossing into the second chunk: Pair prefix, Single remainder.
        let mut pair = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
        let pair_prefix = pair.split_to(3);
        assert!(matches!(pair_prefix.inner, IoBufsInner::Pair(_)));
        assert_eq!(pair_prefix.coalesce(), b"abc");
        assert!(pair.is_single());
        assert_eq!(pair.coalesce(), b"d");

        // Triple split paths: in-first, boundary-after-first, in-second, boundary-after-second,
        // and reaching into third.
        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(1);
        assert!(triple_prefix.is_single());
        assert_eq!(triple_prefix.coalesce(), b"a");
        assert!(matches!(triple.inner, IoBufsInner::Triple(_)));
        assert_eq!(triple.coalesce(), b"bcdef");

        // Boundary after the first chunk: the remainder downgrades Triple -> Pair.
        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(2);
        assert!(triple_prefix.is_single());
        assert_eq!(triple_prefix.coalesce(), b"ab");
        assert!(matches!(triple.inner, IoBufsInner::Pair(_)));
        assert_eq!(triple.coalesce(), b"cdef");

        // Interior of the second chunk: both sides become Pairs.
        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(3);
        assert!(matches!(triple_prefix.inner, IoBufsInner::Pair(_)));
        assert_eq!(triple_prefix.coalesce(), b"abc");
        assert!(matches!(triple.inner, IoBufsInner::Pair(_)));
        assert_eq!(triple.coalesce(), b"def");

        // Boundary after the second chunk: Pair prefix, Single remainder.
        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(4);
        assert!(matches!(triple_prefix.inner, IoBufsInner::Pair(_)));
        assert_eq!(triple_prefix.coalesce(), b"abcd");
        assert!(triple.is_single());
        assert_eq!(triple.coalesce(), b"ef");

        // Reaching into the third chunk: Triple prefix, Single remainder.
        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
        ]);
        let triple_prefix = triple.split_to(5);
        assert!(matches!(triple_prefix.inner, IoBufsInner::Triple(_)));
        assert_eq!(triple_prefix.coalesce(), b"abcde");
        assert!(triple.is_single());
        assert_eq!(triple.coalesce(), b"f");

        // Chunked split can canonicalize remainder/prefix shapes.
        let mut bufs = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
            IoBuf::from(b"gh"),
        ]);
        let prefix = bufs.split_to(4);
        assert!(matches!(prefix.inner, IoBufsInner::Pair(_)));
        assert_eq!(prefix.coalesce(), b"abcd");
        assert!(matches!(bufs.inner, IoBufsInner::Pair(_)));
        assert_eq!(bufs.coalesce(), b"efgh");

        // Chunked split inside a chunk.
        let mut bufs = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
            IoBuf::from(b"gh"),
        ]);
        let prefix = bufs.split_to(5);
        assert!(matches!(prefix.inner, IoBufsInner::Triple(_)));
        assert_eq!(prefix.coalesce(), b"abcde");
        assert!(matches!(bufs.inner, IoBufsInner::Pair(_)));
        assert_eq!(bufs.coalesce(), b"fgh");

        // Chunked split can remain chunked on both sides when both have >= 4 chunks.
        let mut bufs = IoBufs::from(vec![
            IoBuf::from(b"a"),
            IoBuf::from(b"b"),
            IoBuf::from(b"c"),
            IoBuf::from(b"d"),
            IoBuf::from(b"e"),
            IoBuf::from(b"f"),
            IoBuf::from(b"g"),
            IoBuf::from(b"h"),
        ]);
        let prefix = bufs.split_to(4);
        assert!(matches!(prefix.inner, IoBufsInner::Chunked(_)));
        assert_eq!(prefix.coalesce(), b"abcd");
        assert!(matches!(bufs.inner, IoBufsInner::Chunked(_)));
        assert_eq!(bufs.coalesce(), b"efgh");

        // Defensive path: tolerate accidental empty chunks in non-canonical chunked input.
        // The shape is constructed by hand because the public constructors never
        // produce an empty chunk inside a Chunked value.
        let mut bufs = IoBufs {
            inner: IoBufsInner::Chunked(VecDeque::from([
                IoBuf::default(),
                IoBuf::from(b"ab"),
                IoBuf::from(b"cd"),
                IoBuf::from(b"ef"),
                IoBuf::from(b"gh"),
            ])),
        };
        let prefix = bufs.split_to(3);
        assert_eq!(prefix.coalesce(), b"abc");
        assert_eq!(bufs.coalesce(), b"defgh");
    }
2767
2768    #[test]
2769    #[should_panic(expected = "split_to out of bounds")]
2770    fn test_iobufs_split_to_out_of_bounds() {
2771        let mut bufs = IoBufs::from(b"abc");
2772        let _ = bufs.split_to(4);
2773    }
2774
2775    #[test]
2776    fn test_iobufs_chunk_count() {
2777        assert_eq!(IoBufs::default().chunk_count(), 0);
2778        assert_eq!(IoBufs::from(IoBuf::from(b"a")).chunk_count(), 1);
2779        assert_eq!(
2780            IoBufs::from(vec![IoBuf::from(b"b"), IoBuf::from(b"c")]).chunk_count(),
2781            2
2782        );
2783        assert_eq!(
2784            IoBufs::from(vec![
2785                IoBuf::from(b"a"),
2786                IoBuf::from(b"b"),
2787                IoBuf::from(b"c")
2788            ])
2789            .chunk_count(),
2790            3
2791        );
2792        assert_eq!(
2793            IoBufs::from(vec![
2794                IoBuf::from(b"a"),
2795                IoBuf::from(b"b"),
2796                IoBuf::from(b"c"),
2797                IoBuf::from(b"d")
2798            ])
2799            .chunk_count(),
2800            4
2801        );
2802    }
2803
2804    #[test]
2805    fn test_iobufs_coalesce_after_advance() {
2806        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
2807        bufs.append(IoBuf::from(b" world"));
2808
2809        assert_eq!(bufs.len(), 11);
2810
2811        bufs.advance(3);
2812        assert_eq!(bufs.len(), 8);
2813
2814        assert_eq!(bufs.coalesce(), b"lo world");
2815    }
2816
    #[test]
    fn test_iobufs_coalesce_with_pool() {
        // Verifies `coalesce_with_pool`: a single-buffer aggregate is returned
        // zero-copy, while multi-buffer aggregates are merged into one buffer
        // using the pool.
        let pool = test_pool();

        // Single buffer: zero-copy (same pointer)
        let buf = IoBuf::from(vec![1u8, 2, 3, 4, 5]);
        let original_ptr = buf.as_ptr();
        let bufs = IoBufs::from(buf);
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, [1, 2, 3, 4, 5]);
        // Pointer equality proves no copy happened on the single-chunk path.
        assert_eq!(coalesced.as_ptr(), original_ptr);

        // Multiple buffers: merged using pool
        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
        bufs.append(IoBuf::from(b" world"));
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"hello world");

        // Multiple buffers after advance: only remaining data coalesced
        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
        bufs.append(IoBuf::from(b" world"));
        bufs.advance(3);
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"lo world");

        // Empty buffers in the middle
        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
        bufs.append(IoBuf::default());
        bufs.append(IoBuf::from(b" world"));
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"hello world");

        // Empty IoBufs
        let bufs = IoBufs::default();
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert!(coalesced.is_empty());

        // 4+ buffers: exercise chunked coalesce-with-pool path.
        let bufs = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
            IoBuf::from(b"gh"),
        ]);
        let coalesced = bufs.coalesce_with_pool(&pool);
        assert_eq!(coalesced, b"abcdefgh");
        // The merged result should come from the pool, not an ad-hoc allocation.
        assert!(coalesced.is_pooled());
    }
2865
2866    #[test]
2867    fn test_iobufs_empty_chunks_and_copy_to_bytes_paths() {
2868        // Empty chunks are skipped while reading across multiple chunks.
2869        let mut bufs = IoBufs::default();
2870        bufs.append(IoBuf::from(b"hello"));
2871        bufs.append(IoBuf::default());
2872        bufs.append(IoBuf::from(b" "));
2873        bufs.append(IoBuf::default());
2874        bufs.append(IoBuf::from(b"world"));
2875        assert_eq!(bufs.len(), 11);
2876        assert_eq!(bufs.chunk(), b"hello");
2877        bufs.advance(5);
2878        assert_eq!(bufs.chunk(), b" ");
2879        bufs.advance(1);
2880        assert_eq!(bufs.chunk(), b"world");
2881
2882        // Single-buffer copy_to_bytes path.
2883        let mut single = IoBufs::from(b"hello world");
2884        assert_eq!(single.copy_to_bytes(5).as_ref(), b"hello");
2885        assert_eq!(single.remaining(), 6);
2886
2887        // Multi-buffer copy_to_bytes path across boundaries.
2888        let mut multi = IoBufs::from(b"hello");
2889        multi.prepend(IoBuf::from(b"say "));
2890        assert_eq!(multi.copy_to_bytes(7).as_ref(), b"say hel");
2891        assert_eq!(multi.copy_to_bytes(2).as_ref(), b"lo");
2892    }
2893
2894    #[test]
2895    fn test_iobufs_copy_to_bytes_pair_and_triple() {
2896        // Pair: crossing one boundary should collapse to the trailing single chunk.
2897        let mut pair = IoBufs::from(IoBuf::from(b"ab"));
2898        pair.append(IoBuf::from(b"cd"));
2899        let first = pair.copy_to_bytes(3);
2900        assert_eq!(&first[..], b"abc");
2901        assert!(pair.is_single());
2902        assert_eq!(pair.chunk(), b"d");
2903
2904        // Triple: draining across two chunks leaves the final chunk readable.
2905        let mut triple = IoBufs::from(IoBuf::from(b"ab"));
2906        triple.append(IoBuf::from(b"cd"));
2907        triple.append(IoBuf::from(b"ef"));
2908        let first = triple.copy_to_bytes(5);
2909        assert_eq!(&first[..], b"abcde");
2910        assert!(triple.is_single());
2911        assert_eq!(triple.chunk(), b"f");
2912    }
2913
2914    #[test]
2915    fn test_iobufs_copy_to_bytes_chunked_four_plus() {
2916        let mut bufs = IoBufs::from(vec![
2917            IoBuf::from(b"ab"),
2918            IoBuf::from(b"cd"),
2919            IoBuf::from(b"ef"),
2920            IoBuf::from(b"gh"),
2921        ]);
2922
2923        // Chunked fast-path: first chunk alone satisfies request.
2924        let first = bufs.copy_to_bytes(1);
2925        assert_eq!(&first[..], b"a");
2926
2927        // Chunked slow-path: request crosses chunk boundaries.
2928        let second = bufs.copy_to_bytes(4);
2929        assert_eq!(&second[..], b"bcde");
2930
2931        let rest = bufs.copy_to_bytes(3);
2932        assert_eq!(&rest[..], b"fgh");
2933        assert_eq!(bufs.remaining(), 0);
2934    }
2935
2936    #[test]
2937    fn test_iobufs_copy_to_bytes_edge_cases() {
2938        // Leading empty chunk should not affect copied payload.
2939        let mut iobufs = IoBufs::from(IoBuf::from(b""));
2940        iobufs.append(IoBuf::from(b"hello"));
2941        assert_eq!(iobufs.copy_to_bytes(5).as_ref(), b"hello");
2942
2943        // Boundary-aligned reads should return exact chunk payloads in-order.
2944        let mut boundary = IoBufs::from(IoBuf::from(b"hello"));
2945        boundary.append(IoBuf::from(b"world"));
2946        assert_eq!(boundary.copy_to_bytes(5).as_ref(), b"hello");
2947        assert_eq!(boundary.copy_to_bytes(5).as_ref(), b"world");
2948        assert_eq!(boundary.remaining(), 0);
2949    }
2950
2951    #[test]
2952    #[should_panic(expected = "cannot advance past end of buffer")]
2953    fn test_iobufs_advance_past_end() {
2954        let mut bufs = IoBufs::from(b"hel");
2955        bufs.append(IoBuf::from(b"lo"));
2956        bufs.advance(10);
2957    }
2958
2959    #[test]
2960    #[should_panic(expected = "not enough data")]
2961    fn test_iobufs_copy_to_bytes_past_end() {
2962        let mut bufs = IoBufs::from(b"hel");
2963        bufs.append(IoBuf::from(b"lo"));
2964        bufs.copy_to_bytes(10);
2965    }
2966
    #[test]
    fn test_iobufs_matches_bytes_chain() {
        // Differential test: `IoBufs` must implement the `Buf` trait with the
        // same observable behavior as the reference `Bytes::chain` combinator,
        // stepping both in lockstep.
        let b1 = Bytes::from_static(b"hello");
        let b2 = Bytes::from_static(b" ");
        let b3 = Bytes::from_static(b"world");

        // Buf parity for remaining/chunk/advance should match `Bytes::chain`.
        let mut chain = b1.clone().chain(b2.clone()).chain(b3.clone());
        let mut iobufs = IoBufs::from(IoBuf::from(b1.clone()));
        iobufs.append(IoBuf::from(b2.clone()));
        iobufs.append(IoBuf::from(b3.clone()));

        assert_eq!(chain.remaining(), iobufs.remaining());
        assert_eq!(chain.chunk(), iobufs.chunk());

        // Advance within the first chunk.
        chain.advance(3);
        iobufs.advance(3);
        assert_eq!(chain.remaining(), iobufs.remaining());
        assert_eq!(chain.chunk(), iobufs.chunk());

        // Advance across the first chunk boundary.
        chain.advance(3);
        iobufs.advance(3);
        assert_eq!(chain.remaining(), iobufs.remaining());
        assert_eq!(chain.chunk(), iobufs.chunk());

        // Test copy_to_bytes
        let mut chain = b1.clone().chain(b2.clone()).chain(b3.clone());
        let mut iobufs = IoBufs::from(IoBuf::from(b1));
        iobufs.append(IoBuf::from(b2));
        iobufs.append(IoBuf::from(b3));

        // Mid-chunk, boundary-crossing, and drain-to-end reads must all agree.
        assert_eq!(chain.copy_to_bytes(3), iobufs.copy_to_bytes(3));
        assert_eq!(chain.copy_to_bytes(4), iobufs.copy_to_bytes(4));
        assert_eq!(
            chain.copy_to_bytes(chain.remaining()),
            iobufs.copy_to_bytes(iobufs.remaining())
        );
        assert_eq!(chain.remaining(), 0);
        assert_eq!(iobufs.remaining(), 0);
    }
3007
3008    #[test]
3009    fn test_iobufs_try_into_single() {
3010        let single = IoBufs::from(IoBuf::from(b"hello"));
3011        let single = single.try_into_single().expect("single expected");
3012        assert_eq!(single, b"hello");
3013
3014        let multi = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
3015        let multi = multi.try_into_single().expect_err("multi expected");
3016        assert_eq!(multi.coalesce(), b"abcd");
3017    }
3018
    #[test]
    fn test_iobufs_chunks_vectored_multiple_slices() {
        // Validates `chunks_vectored` slice export: slice count is bounded by
        // both the destination capacity and the number of non-empty chunks,
        // and empty chunks never produce slices.

        // Single non-empty buffers should export exactly one slice.
        let single = IoBufs::from(IoBuf::from(b"xy"));
        let mut single_dst = [IoSlice::new(&[]); 2];
        let count = single.chunks_vectored(&mut single_dst);
        assert_eq!(count, 1);
        assert_eq!(&single_dst[0][..], b"xy");

        // Single empty buffers should export no slices.
        let empty_single = IoBufs::default();
        let mut empty_single_dst = [IoSlice::new(&[]); 1];
        assert_eq!(empty_single.chunks_vectored(&mut empty_single_dst), 0);

        let bufs = IoBufs::from(vec![
            IoBuf::from(b"ab"),
            IoBuf::from(b"cd"),
            IoBuf::from(b"ef"),
            IoBuf::from(b"gh"),
        ]);

        // Destination capacity should cap how many chunks we export.
        let mut small = [IoSlice::new(&[]); 2];
        let count = bufs.chunks_vectored(&mut small);
        assert_eq!(count, 2);
        assert_eq!(&small[0][..], b"ab");
        assert_eq!(&small[1][..], b"cd");

        // Larger destination should include every readable chunk.
        let mut large = [IoSlice::new(&[]); 8];
        let count = bufs.chunks_vectored(&mut large);
        assert_eq!(count, 4);
        assert_eq!(&large[0][..], b"ab");
        assert_eq!(&large[1][..], b"cd");
        assert_eq!(&large[2][..], b"ef");
        assert_eq!(&large[3][..], b"gh");

        // Empty destination cannot accept any slices.
        let mut empty_dst: [IoSlice<'_>; 0] = [];
        assert_eq!(bufs.chunks_vectored(&mut empty_dst), 0);

        // Non-canonical shapes should skip empty leading chunks. These shapes
        // are built by hand because public constructors filter empty chunks.
        let sparse = IoBufs {
            inner: IoBufsInner::Pair([IoBuf::default(), IoBuf::from(b"x")]),
        };
        let mut dst = [IoSlice::new(&[]); 2];
        let count = sparse.chunks_vectored(&mut dst);
        assert_eq!(count, 1);
        assert_eq!(&dst[0][..], b"x");

        // Triple should skip empty chunks and preserve readable order.
        let sparse_triple = IoBufs {
            inner: IoBufsInner::Triple([IoBuf::default(), IoBuf::from(b"y"), IoBuf::from(b"z")]),
        };
        let mut dst = [IoSlice::new(&[]); 3];
        let count = sparse_triple.chunks_vectored(&mut dst);
        assert_eq!(count, 2);
        assert_eq!(&dst[0][..], b"y");
        assert_eq!(&dst[1][..], b"z");

        // Chunked shapes with only empty buffers should export no slices.
        let empty_chunked = IoBufs {
            inner: IoBufsInner::Chunked(VecDeque::from([IoBuf::default(), IoBuf::default()])),
        };
        let mut dst = [IoSlice::new(&[]); 2];
        assert_eq!(empty_chunked.chunks_vectored(&mut dst), 0);
    }
3086
    #[test]
    fn test_iobufsmut_freeze_chunked() {
        // Covers the four `freeze` collapse cases: multi-chunk stays
        // multi-chunk, empties are filtered, a lone survivor collapses to
        // Single, and all-empty input yields a Single empty buffer.

        // Multiple non-empty buffers stay multi-chunk.
        let buf1 = IoBufMut::from(b"hello".as_ref());
        let buf2 = IoBufMut::from(b" world".as_ref());
        let bufs = IoBufsMut::from(vec![buf1, buf2]);
        let mut frozen = bufs.freeze();
        assert!(!frozen.is_single());
        assert_eq!(frozen.chunk(), b"hello");
        frozen.advance(5);
        assert_eq!(frozen.chunk(), b" world");
        frozen.advance(6);
        assert_eq!(frozen.remaining(), 0);

        // Empty buffers are filtered out.
        let buf1 = IoBufMut::from(b"hello".as_ref());
        let empty = IoBufMut::default();
        let buf2 = IoBufMut::from(b" world".as_ref());
        let bufs = IoBufsMut::from(vec![buf1, empty, buf2]);
        let mut frozen = bufs.freeze();
        assert!(!frozen.is_single());
        assert_eq!(frozen.chunk(), b"hello");
        frozen.advance(5);
        assert_eq!(frozen.chunk(), b" world");
        frozen.advance(6);
        assert_eq!(frozen.remaining(), 0);

        // Collapses to Single when one non-empty buffer remains
        let empty1 = IoBufMut::default();
        let buf = IoBufMut::from(b"only one".as_ref());
        let empty2 = IoBufMut::default();
        let bufs = IoBufsMut::from(vec![empty1, buf, empty2]);
        let frozen = bufs.freeze();
        assert!(frozen.is_single());
        assert_eq!(frozen.coalesce(), b"only one");

        // All empty buffers -> Single with empty buffer
        let empty1 = IoBufMut::default();
        let empty2 = IoBufMut::default();
        let bufs = IoBufsMut::from(vec![empty1, empty2]);
        let frozen = bufs.freeze();
        assert!(frozen.is_single());
        assert!(frozen.is_empty());
    }
3131
3132    #[test]
3133    fn test_iobufsmut_coalesce() {
3134        let buf1 = IoBufMut::from(b"hello");
3135        let buf2 = IoBufMut::from(b" world");
3136        let bufs = IoBufsMut::from(vec![buf1, buf2]);
3137        let coalesced = bufs.coalesce();
3138        assert_eq!(coalesced, b"hello world");
3139    }
3140
3141    #[test]
3142    fn test_iobufsmut_from_vec() {
3143        // Empty Vec becomes Single with empty buffer
3144        let bufs = IoBufsMut::from(Vec::<IoBufMut>::new());
3145        assert!(bufs.is_single());
3146        assert!(bufs.is_empty());
3147
3148        // Vec with one element becomes Single
3149        let buf = IoBufMut::from(b"test");
3150        let bufs = IoBufsMut::from(vec![buf]);
3151        assert!(bufs.is_single());
3152        assert_eq!(bufs.chunk(), b"test");
3153
3154        // Vec with multiple elements becomes multi-chunk.
3155        let buf1 = IoBufMut::from(b"hello");
3156        let buf2 = IoBufMut::from(b" world");
3157        let bufs = IoBufsMut::from(vec![buf1, buf2]);
3158        assert!(!bufs.is_single());
3159    }
3160
3161    #[test]
3162    fn test_iobufsmut_from_vec_filters_empty_chunks() {
3163        let mut bufs = IoBufsMut::from(vec![
3164            IoBufMut::default(),
3165            IoBufMut::from(b"hello"),
3166            IoBufMut::default(),
3167            IoBufMut::from(b" world"),
3168            IoBufMut::default(),
3169        ]);
3170        assert_eq!(bufs.chunk(), b"hello");
3171        bufs.advance(5);
3172        assert_eq!(bufs.chunk(), b" world");
3173        bufs.advance(6);
3174        assert_eq!(bufs.remaining(), 0);
3175    }
3176
3177    #[test]
3178    fn test_iobufsmut_fast_path_shapes() {
3179        let pair = IoBufsMut::from(vec![IoBufMut::from(b"a"), IoBufMut::from(b"b")]);
3180        assert!(matches!(pair.inner, IoBufsMutInner::Pair(_)));
3181
3182        let triple = IoBufsMut::from(vec![
3183            IoBufMut::from(b"a"),
3184            IoBufMut::from(b"b"),
3185            IoBufMut::from(b"c"),
3186        ]);
3187        assert!(matches!(triple.inner, IoBufsMutInner::Triple(_)));
3188
3189        let chunked = IoBufsMut::from(vec![
3190            IoBufMut::from(b"a"),
3191            IoBufMut::from(b"b"),
3192            IoBufMut::from(b"c"),
3193            IoBufMut::from(b"d"),
3194        ]);
3195        assert!(matches!(chunked.inner, IoBufsMutInner::Chunked(_)));
3196    }
3197
3198    #[test]
3199    fn test_iobufsmut_default() {
3200        // Default IoBufsMut should be a single empty chunk.
3201        let bufs = IoBufsMut::default();
3202        assert!(bufs.is_single());
3203        assert!(bufs.is_empty());
3204        assert_eq!(bufs.len(), 0);
3205    }
3206
3207    #[test]
3208    fn test_iobufsmut_from_array() {
3209        // From<[u8; N]> should create a single-chunk container with the array data.
3210        let bufs = IoBufsMut::from([1u8, 2, 3, 4, 5]);
3211        assert!(bufs.is_single());
3212        assert_eq!(bufs.len(), 5);
3213        assert_eq!(bufs.chunk(), &[1, 2, 3, 4, 5]);
3214    }
3215
3216    #[test]
3217    fn test_iobufmut_buf_trait() {
3218        // Buf trait on IoBufMut: remaining/chunk/advance should work like BytesMut.
3219        let mut buf = IoBufMut::from(b"hello world");
3220        assert_eq!(buf.remaining(), 11);
3221        assert_eq!(buf.chunk(), b"hello world");
3222
3223        buf.advance(6);
3224        assert_eq!(buf.remaining(), 5);
3225        assert_eq!(buf.chunk(), b"world");
3226
3227        buf.advance(5);
3228        assert_eq!(buf.remaining(), 0);
3229        assert!(buf.chunk().is_empty());
3230    }
3231
3232    #[test]
3233    #[should_panic(expected = "cannot advance")]
3234    fn test_iobufmut_advance_past_end() {
3235        let mut buf = IoBufMut::from(b"hello");
3236        buf.advance(10);
3237    }
3238
3239    #[test]
3240    fn test_iobufsmut_buf_trait_chunked() {
3241        let buf1 = IoBufMut::from(b"hello");
3242        let buf2 = IoBufMut::from(b" ");
3243        let buf3 = IoBufMut::from(b"world");
3244        let mut bufs = IoBufsMut::from(vec![buf1, buf2, buf3]);
3245
3246        assert_eq!(bufs.remaining(), 11);
3247        assert_eq!(bufs.chunk(), b"hello");
3248
3249        // Advance within first buffer
3250        bufs.advance(3);
3251        assert_eq!(bufs.remaining(), 8);
3252        assert_eq!(bufs.chunk(), b"lo");
3253
3254        // Advance past first buffer (should pop_front)
3255        bufs.advance(2);
3256        assert_eq!(bufs.remaining(), 6);
3257        assert_eq!(bufs.chunk(), b" ");
3258
3259        // Advance exactly one buffer
3260        bufs.advance(1);
3261        assert_eq!(bufs.remaining(), 5);
3262        assert_eq!(bufs.chunk(), b"world");
3263
3264        // Advance to end
3265        bufs.advance(5);
3266        assert_eq!(bufs.remaining(), 0);
3267    }
3268
3269    #[test]
3270    #[should_panic(expected = "cannot advance past end of buffer")]
3271    fn test_iobufsmut_advance_past_end() {
3272        let buf1 = IoBufMut::from(b"hello");
3273        let buf2 = IoBufMut::from(b" world");
3274        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3275        bufs.advance(20);
3276    }
3277
3278    #[test]
3279    fn test_iobufsmut_bufmut_trait_single() {
3280        let mut bufs = IoBufsMut::from(IoBufMut::with_capacity(20));
3281        // BytesMut can grow, so remaining_mut is very large
3282        assert!(bufs.remaining_mut() > 1000);
3283
3284        bufs.put_slice(b"hello");
3285        assert_eq!(bufs.chunk(), b"hello");
3286        assert_eq!(bufs.len(), 5);
3287
3288        bufs.put_slice(b" world");
3289        assert_eq!(bufs.coalesce(), b"hello world");
3290    }
3291
3292    #[test]
3293    fn test_iobufsmut_zeroed_write() {
3294        // Use zeroed buffers which have a fixed length
3295        let bufs = IoBufsMut::from(IoBufMut::zeroed(20));
3296        assert_eq!(bufs.len(), 20);
3297
3298        // Can write using as_mut on coalesced buffer
3299        let mut coalesced = bufs.coalesce();
3300        coalesced.as_mut()[..5].copy_from_slice(b"hello");
3301        assert_eq!(&coalesced.as_ref()[..5], b"hello");
3302    }
3303
3304    #[test]
3305    fn test_iobufsmut_bufmut_put_slice() {
3306        // Test writing across multiple buffers
3307        let buf1 = IoBufMut::with_capacity(5);
3308        let buf2 = IoBufMut::with_capacity(6);
3309        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3310
3311        // Write data
3312        bufs.put_slice(b"hello");
3313        bufs.put_slice(b" world");
3314        assert_eq!(bufs.coalesce(), b"hello world");
3315    }
3316
3317    #[test]
3318    fn test_iobufs_advance_drains_buffers() {
3319        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
3320        bufs.append(IoBuf::from(b" "));
3321        bufs.append(IoBuf::from(b"world"));
3322
3323        // Advance exactly past first buffer
3324        bufs.advance(5);
3325        assert_eq!(bufs.remaining(), 6);
3326        assert_eq!(bufs.chunk(), b" ");
3327
3328        // Advance across multiple buffers
3329        bufs.advance(4);
3330        assert_eq!(bufs.remaining(), 2);
3331        assert_eq!(bufs.chunk(), b"ld");
3332    }
3333
3334    #[test]
3335    fn test_iobufs_advance_exactly_to_boundary() {
3336        let mut bufs = IoBufs::from(IoBuf::from(b"abc"));
3337        bufs.append(IoBuf::from(b"def"));
3338
3339        // Advance exactly to first buffer boundary
3340        bufs.advance(3);
3341        assert_eq!(bufs.remaining(), 3);
3342        assert_eq!(bufs.chunk(), b"def");
3343
3344        // Advance exactly to end
3345        bufs.advance(3);
3346        assert_eq!(bufs.remaining(), 0);
3347    }
3348
3349    #[test]
3350    fn test_iobufs_advance_canonicalizes_pair_to_single() {
3351        let mut bufs = IoBufs::from(IoBuf::from(b"ab"));
3352        bufs.append(IoBuf::from(b"cd"));
3353        bufs.advance(2);
3354        assert!(bufs.is_single());
3355        assert_eq!(bufs.chunk(), b"cd");
3356    }
3357
3358    #[test]
3359    fn test_iobufsmut_with_empty_buffers() {
3360        let buf1 = IoBufMut::from(b"hello");
3361        let buf2 = IoBufMut::default();
3362        let buf3 = IoBufMut::from(b" world");
3363        let mut bufs = IoBufsMut::from(vec![buf1, buf2, buf3]);
3364
3365        assert_eq!(bufs.remaining(), 11);
3366        assert_eq!(bufs.chunk(), b"hello");
3367
3368        // Advance past first buffer
3369        bufs.advance(5);
3370        // Empty buffer should be skipped
3371        assert_eq!(bufs.chunk(), b" world");
3372        assert_eq!(bufs.remaining(), 6);
3373    }
3374
3375    #[test]
3376    fn test_iobufsmut_advance_skips_leading_writable_empty_chunk() {
3377        // A leading chunk with capacity but no readable bytes (len == 0) should
3378        // be skipped during advance, reaching the next readable chunk.
3379        let empty_writable = IoBufMut::with_capacity(4);
3380        let payload = IoBufMut::from(b"xy");
3381        let mut bufs = IoBufsMut::from(vec![empty_writable, payload]);
3382
3383        bufs.advance(1);
3384        assert_eq!(bufs.chunk(), b"y");
3385        assert_eq!(bufs.remaining(), 1);
3386    }
3387
3388    #[test]
3389    fn test_iobufsmut_coalesce_after_advance() {
3390        // Advance mid-chunk: advance 3 of 11 bytes
3391        let buf1 = IoBufMut::from(b"hello");
3392        let buf2 = IoBufMut::from(b" world");
3393        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3394
3395        bufs.advance(3);
3396        assert_eq!(bufs.coalesce(), b"lo world");
3397
3398        // Advance to exact chunk boundary: advance 5 of 11 bytes
3399        let buf1 = IoBufMut::from(b"hello");
3400        let buf2 = IoBufMut::from(b" world");
3401        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3402
3403        bufs.advance(5);
3404        assert_eq!(bufs.coalesce(), b" world");
3405    }
3406
3407    #[test]
3408    fn test_iobufsmut_copy_to_bytes() {
3409        let buf1 = IoBufMut::from(b"hello");
3410        let buf2 = IoBufMut::from(b" world");
3411        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3412
3413        // First read spans chunks and leaves unread suffix.
3414        let first = bufs.copy_to_bytes(7);
3415        assert_eq!(&first[..], b"hello w");
3416        assert_eq!(bufs.remaining(), 4);
3417
3418        // Second read drains the remainder.
3419        let rest = bufs.copy_to_bytes(4);
3420        assert_eq!(&rest[..], b"orld");
3421        assert_eq!(bufs.remaining(), 0);
3422    }
3423
    #[test]
    fn test_iobufsmut_copy_to_bytes_chunked_four_plus() {
        // Exercises `copy_to_bytes` on the Chunked (4+) representation, with
        // preceding advances to position the read cursor mid-stream.
        let mut bufs = IoBufsMut::from(vec![
            IoBufMut::from(b"ab"),
            IoBufMut::from(b"cd"),
            IoBufMut::from(b"ef"),
            IoBufMut::from(b"gh"),
        ]);

        // Exercise chunked advance path before copy_to_bytes.
        bufs.advance(1);
        assert_eq!(bufs.chunk(), b"b");
        bufs.advance(1);
        assert_eq!(bufs.chunk(), b"cd");

        // Chunked fast-path: first chunk alone satisfies request.
        let first = bufs.copy_to_bytes(1);
        assert_eq!(&first[..], b"c");

        // Chunked slow-path: request crosses chunk boundaries.
        let second = bufs.copy_to_bytes(4);
        assert_eq!(&second[..], b"defg");

        // Drain the final byte.
        let rest = bufs.copy_to_bytes(1);
        assert_eq!(&rest[..], b"h");
        assert_eq!(bufs.remaining(), 0);

        // Enter copy_to_bytes while still in chunked representation.
        let mut bufs = IoBufsMut::from(vec![
            IoBufMut::from(b"a"),
            IoBufMut::from(b"b"),
            IoBufMut::from(b"c"),
            IoBufMut::from(b"d"),
            IoBufMut::from(b"e"),
        ]);
        assert!(matches!(bufs.inner, IoBufsMutInner::Chunked(_)));
        let first = bufs.copy_to_bytes(1);
        assert_eq!(&first[..], b"a");
        // Stay chunked while consuming across multiple tiny chunks.
        let next = bufs.copy_to_bytes(3);
        assert_eq!(&next[..], b"bcd");
        assert_eq!(bufs.chunk(), b"e");
        assert_eq!(bufs.remaining(), 1);
    }
3468
3469    #[test]
3470    fn test_iobufsmut_copy_to_bytes_canonicalizes_pair() {
3471        let mut bufs = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
3472        assert!(matches!(bufs.inner, IoBufsMutInner::Pair(_)));
3473
3474        let first = bufs.copy_to_bytes(2);
3475        assert_eq!(&first[..], b"ab");
3476
3477        assert!(bufs.is_single());
3478        assert_eq!(bufs.chunk(), b"cd");
3479        assert_eq!(bufs.remaining(), 2);
3480    }
3481
3482    #[test]
3483    fn test_iobufsmut_copy_from_slice_single() {
3484        let mut bufs = IoBufsMut::from(IoBufMut::zeroed(11));
3485        bufs.copy_from_slice(b"hello world");
3486        assert_eq!(bufs.coalesce(), b"hello world");
3487    }
3488
3489    #[test]
3490    fn test_iobufsmut_copy_from_slice_chunked() {
3491        let buf1 = IoBufMut::zeroed(5);
3492        let buf2 = IoBufMut::zeroed(6);
3493        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3494
3495        bufs.copy_from_slice(b"hello world");
3496
3497        // Verify each chunk was filled correctly.
3498        assert_eq!(bufs.chunk(), b"hello");
3499        bufs.advance(5);
3500        assert_eq!(bufs.chunk(), b" world");
3501        bufs.advance(6);
3502        assert_eq!(bufs.remaining(), 0);
3503    }
3504
3505    #[test]
3506    #[should_panic(expected = "source slice length must match buffer length")]
3507    fn test_iobufsmut_copy_from_slice_wrong_length() {
3508        let mut bufs = IoBufsMut::from(IoBufMut::zeroed(5));
3509        bufs.copy_from_slice(b"hello world"); // 11 bytes into 5-byte buffer
3510    }
3511
    #[test]
    fn test_iobufsmut_matches_bytesmut_chain() {
        // Differential test: `IoBufsMut` must report the same `chunk_mut`
        // length (spare write capacity) as an equivalent `BytesMut` chain at
        // every step of a multi-write sequence, and end with the same bytes.
        // The chain is rebuilt before each comparison because `chain_mut`
        // consumes mutable borrows.

        // Create three BytesMut with capacity
        let mut bm1 = BytesMut::with_capacity(5);
        let mut bm2 = BytesMut::with_capacity(6);
        let mut bm3 = BytesMut::with_capacity(7);

        // Create matching IoBufsMut
        let mut iobufs = IoBufsMut::from(vec![
            IoBufMut::with_capacity(5),
            IoBufMut::with_capacity(6),
            IoBufMut::with_capacity(7),
        ]);

        // Test initial chunk_mut length matches (spare capacity)
        let chain_len = (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .chunk_mut()
            .len();
        let iobufs_len = iobufs.chunk_mut().len();
        assert_eq!(chain_len, iobufs_len);

        // Write some data (partial fill of the first 5-byte buffer).
        (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .put_slice(b"hel");
        iobufs.put_slice(b"hel");

        // Verify chunk_mut matches after partial write
        let chain_len = (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .chunk_mut()
            .len();
        let iobufs_len = iobufs.chunk_mut().len();
        assert_eq!(chain_len, iobufs_len);

        // Write more data (crosses buffer boundaries: 12 bytes total written).
        (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .put_slice(b"lo world!");
        iobufs.put_slice(b"lo world!");

        // Verify chunk_mut matches after more writes
        let chain_len = (&mut bm1)
            .chain_mut(&mut bm2)
            .chain_mut(&mut bm3)
            .chunk_mut()
            .len();
        let iobufs_len = iobufs.chunk_mut().len();
        assert_eq!(chain_len, iobufs_len);

        // Verify final content matches: coalesce the frozen IoBufs and
        // concatenate the three BytesMut in order.
        let frozen = iobufs.freeze().coalesce();
        let mut chain_content = bm1.to_vec();
        chain_content.extend_from_slice(&bm2);
        chain_content.extend_from_slice(&bm3);
        assert_eq!(frozen, chain_content.as_slice());
        assert_eq!(frozen, b"hello world!");
    }
3575
    #[test]
    fn test_iobufsmut_buf_matches_bytes_chain() {
        // Differential test: the `Buf` impl of `IoBufsMut` must agree with an
        // equivalent `Bytes` chain for `remaining`, `chunk`, and
        // `copy_to_bytes`, including after advances that land mid-buffer and
        // at buffer boundaries. The chain is rebuilt from clones for each
        // comparison; advances are applied to the underlying `Bytes` directly
        // (e.g. `b1.advance(3)`) to mirror the cursor position.

        // Create pre-filled Bytes buffers
        let mut b1 = Bytes::from_static(b"hello");
        let mut b2 = Bytes::from_static(b" world");
        let b3 = Bytes::from_static(b"!");

        // Create matching IoBufsMut
        let mut iobufs = IoBufsMut::from(vec![
            IoBufMut::from(b"hello"),
            IoBufMut::from(b" world"),
            IoBufMut::from(b"!"),
        ]);

        // Test Buf::remaining matches
        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
        assert_eq!(chain_remaining, iobufs.remaining());

        // Test Buf::chunk matches
        let chain_chunk = b1
            .clone()
            .chain(b2.clone())
            .chain(b3.clone())
            .chunk()
            .to_vec();
        assert_eq!(chain_chunk, iobufs.chunk().to_vec());

        // Advance mid-first-buffer and test again
        b1.advance(3);
        iobufs.advance(3);

        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
        assert_eq!(chain_remaining, iobufs.remaining());

        let chain_chunk = b1
            .clone()
            .chain(b2.clone())
            .chain(b3.clone())
            .chunk()
            .to_vec();
        assert_eq!(chain_chunk, iobufs.chunk().to_vec());

        // Advance past first buffer boundary into second
        b1.advance(2);
        iobufs.advance(2);

        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
        assert_eq!(chain_remaining, iobufs.remaining());

        // Now we should be in the second buffer
        let chain_chunk = b1
            .clone()
            .chain(b2.clone())
            .chain(b3.clone())
            .chunk()
            .to_vec();
        assert_eq!(chain_chunk, iobufs.chunk().to_vec());

        // Advance past second buffer boundary into third
        b2.advance(6);
        iobufs.advance(6);

        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
        assert_eq!(chain_remaining, iobufs.remaining());

        // Now we should be in the third buffer
        let chain_chunk = b1.chain(b2).chain(b3).chunk().to_vec();
        assert_eq!(chain_chunk, iobufs.chunk().to_vec());

        // Test copy_to_bytes on fresh buffers: an 8-byte read spans the
        // first two buffers in both representations.
        let b1 = Bytes::from_static(b"hello");
        let b2 = Bytes::from_static(b" world");
        let b3 = Bytes::from_static(b"!");
        let mut iobufs = IoBufsMut::from(vec![
            IoBufMut::from(b"hello"),
            IoBufMut::from(b" world"),
            IoBufMut::from(b"!"),
        ]);

        let chain_bytes = b1.chain(b2).chain(b3).copy_to_bytes(8);
        let iobufs_bytes = iobufs.copy_to_bytes(8);
        assert_eq!(chain_bytes, iobufs_bytes);
        assert_eq!(chain_bytes.as_ref(), b"hello wo");
    }
3660
    #[test]
    fn test_iobufsmut_chunks_vectored_multiple_slices() {
        // Verify `chunks_vectored` across every container shape (Single,
        // Pair, Triple, Chunked): the returned count reflects non-empty
        // chunks only, is capped by destination capacity, and empty leading
        // chunks in non-canonical shapes are skipped.

        // Single non-empty buffers should export exactly one slice.
        let single = IoBufsMut::from(IoBufMut::from(b"xy"));
        let mut single_dst = [IoSlice::new(&[]); 2];
        let count = single.chunks_vectored(&mut single_dst);
        assert_eq!(count, 1);
        assert_eq!(&single_dst[0][..], b"xy");

        // Single empty buffers should export no slices.
        let empty_single = IoBufsMut::default();
        let mut empty_single_dst = [IoSlice::new(&[]); 1];
        assert_eq!(empty_single.chunks_vectored(&mut empty_single_dst), 0);

        let bufs = IoBufsMut::from(vec![
            IoBufMut::from(b"ab"),
            IoBufMut::from(b"cd"),
            IoBufMut::from(b"ef"),
            IoBufMut::from(b"gh"),
        ]);

        // Destination capacity should cap how many chunks we export.
        let mut small = [IoSlice::new(&[]); 2];
        let count = bufs.chunks_vectored(&mut small);
        assert_eq!(count, 2);
        assert_eq!(&small[0][..], b"ab");
        assert_eq!(&small[1][..], b"cd");

        // Larger destination should include every readable chunk.
        let mut large = [IoSlice::new(&[]); 8];
        let count = bufs.chunks_vectored(&mut large);
        assert_eq!(count, 4);
        assert_eq!(&large[0][..], b"ab");
        assert_eq!(&large[1][..], b"cd");
        assert_eq!(&large[2][..], b"ef");
        assert_eq!(&large[3][..], b"gh");

        // Empty destination cannot accept any slices.
        let mut empty_dst: [IoSlice<'_>; 0] = [];
        assert_eq!(bufs.chunks_vectored(&mut empty_dst), 0);

        // Non-canonical shapes should skip empty leading chunks.
        // (Constructed via the internal `inner` field to bypass the
        // canonicalizing constructors.)
        let sparse = IoBufsMut {
            inner: IoBufsMutInner::Pair([IoBufMut::default(), IoBufMut::from(b"y")]),
        };
        let mut dst = [IoSlice::new(&[]); 2];
        let count = sparse.chunks_vectored(&mut dst);
        assert_eq!(count, 1);
        assert_eq!(&dst[0][..], b"y");

        // Triple should skip empty chunks and preserve readable order.
        let sparse_triple = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                IoBufMut::default(),
                IoBufMut::from(b"z"),
                IoBufMut::from(b"w"),
            ]),
        };
        let mut dst = [IoSlice::new(&[]); 3];
        let count = sparse_triple.chunks_vectored(&mut dst);
        assert_eq!(count, 2);
        assert_eq!(&dst[0][..], b"z");
        assert_eq!(&dst[1][..], b"w");

        // Chunked shapes with only empty buffers should export no slices.
        let empty_chunked = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([
                IoBufMut::default(),
                IoBufMut::default(),
            ])),
        };
        let mut dst = [IoSlice::new(&[]); 2];
        assert_eq!(empty_chunked.chunks_vectored(&mut dst), 0);
    }
3735
3736    #[test]
3737    fn test_iobufsmut_try_into_single() {
3738        let single = IoBufsMut::from(IoBufMut::from(b"hello"));
3739        let single = single.try_into_single().expect("single expected");
3740        assert_eq!(single, b"hello");
3741
3742        let multi = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
3743        let multi = multi.try_into_single().expect_err("multi expected");
3744        assert_eq!(multi.coalesce(), b"abcd");
3745    }
3746
3747    #[test]
3748    fn test_iobufsmut_freeze_after_advance() {
3749        // Partial advance: advance 3 of 11 bytes
3750        let buf1 = IoBufMut::from(b"hello");
3751        let buf2 = IoBufMut::from(b" world");
3752        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3753
3754        bufs.advance(3);
3755        assert_eq!(bufs.len(), 8);
3756
3757        let frozen = bufs.freeze();
3758        assert_eq!(frozen.len(), 8);
3759        assert_eq!(frozen.coalesce(), b"lo world");
3760
3761        // Exact boundary advance: advance 5 of 11 bytes (first buf is 5 bytes)
3762        let buf1 = IoBufMut::from(b"hello");
3763        let buf2 = IoBufMut::from(b" world");
3764        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3765
3766        bufs.advance(5);
3767        assert_eq!(bufs.len(), 6);
3768
3769        // First buffer should be fully consumed (empty after advance)
3770        // freeze() filters empty buffers, so result should be Single
3771        let frozen = bufs.freeze();
3772        assert!(frozen.is_single());
3773        assert_eq!(frozen.coalesce(), b" world");
3774    }
3775
3776    #[test]
3777    fn test_iobufsmut_coalesce_with_pool() {
3778        let pool = test_pool();
3779
3780        // Single buffer: zero-copy (same pointer)
3781        let mut buf = IoBufMut::from(b"hello");
3782        let original_ptr = buf.as_mut_ptr();
3783        let bufs = IoBufsMut::from(buf);
3784        let coalesced = bufs.coalesce_with_pool(&pool);
3785        assert_eq!(coalesced, b"hello");
3786        assert_eq!(coalesced.as_ref().as_ptr(), original_ptr);
3787
3788        // Multiple buffers: merged using pool
3789        let bufs = IoBufsMut::from(vec![IoBufMut::from(b"hello"), IoBufMut::from(b" world")]);
3790        let coalesced = bufs.coalesce_with_pool(&pool);
3791        assert_eq!(coalesced, b"hello world");
3792        assert!(coalesced.is_pooled());
3793
3794        // Four chunks force the deque-backed coalesce path instead of pair/triple fast paths.
3795        let bufs = IoBufsMut::from(vec![
3796            IoBufMut::from(b"a"),
3797            IoBufMut::from(b"b"),
3798            IoBufMut::from(b"c"),
3799            IoBufMut::from(b"d"),
3800        ]);
3801        let coalesced = bufs.coalesce_with_pool(&pool);
3802        assert_eq!(coalesced, b"abcd");
3803        assert!(coalesced.is_pooled());
3804
3805        // With extra capacity: zero-copy if sufficient spare capacity
3806        let mut buf = IoBufMut::with_capacity(100);
3807        buf.put_slice(b"hello");
3808        let original_ptr = buf.as_mut_ptr();
3809        let bufs = IoBufsMut::from(buf);
3810        let coalesced = bufs.coalesce_with_pool_extra(&pool, 10);
3811        assert_eq!(coalesced, b"hello");
3812        assert_eq!(coalesced.as_ref().as_ptr(), original_ptr);
3813
3814        // With extra capacity: reallocates if insufficient
3815        let mut buf = IoBufMut::with_capacity(5);
3816        buf.put_slice(b"hello");
3817        let bufs = IoBufsMut::from(buf);
3818        let coalesced = bufs.coalesce_with_pool_extra(&pool, 100);
3819        assert_eq!(coalesced, b"hello");
3820        assert!(coalesced.capacity() >= 105);
3821    }
3822
3823    #[test]
3824    fn test_iobuf_additional_conversion_and_trait_paths() {
3825        let pool = test_pool();
3826
3827        let mut pooled_mut = pool.alloc(4);
3828        pooled_mut.put_slice(b"data");
3829        let pooled = pooled_mut.freeze();
3830        assert!(!pooled.as_ptr().is_null());
3831
3832        let unique = IoBuf::from(Bytes::from(vec![1u8, 2, 3]));
3833        let unique_mut = unique.try_into_mut().expect("unique bytes should convert");
3834        assert_eq!(unique_mut.as_ref(), &[1u8, 2, 3]);
3835
3836        let shared = IoBuf::from(Bytes::from(vec![4u8, 5, 6]));
3837        let _shared_clone = shared.clone();
3838        assert!(shared.try_into_mut().is_err());
3839
3840        let expected: &[u8] = &[9u8, 8];
3841        let eq_buf = IoBuf::from(vec![9u8, 8]);
3842        assert!(PartialEq::<[u8]>::eq(&eq_buf, expected));
3843
3844        let static_slice: &'static [u8] = b"static";
3845        assert_eq!(IoBuf::from(static_slice), b"static");
3846
3847        let mut pooled_mut = pool.alloc(3);
3848        pooled_mut.put_slice(b"xyz");
3849        let pooled = pooled_mut.freeze();
3850        let vec_out: Vec<u8> = pooled.clone().into();
3851        let bytes_out: Bytes = pooled.into();
3852        assert_eq!(vec_out, b"xyz");
3853        assert_eq!(bytes_out.as_ref(), b"xyz");
3854    }
3855
3856    #[test]
3857    fn test_iobufmut_additional_conversion_and_trait_paths() {
3858        // Basic mutable operations should keep readable bytes consistent.
3859        let mut buf = IoBufMut::from(vec![1u8, 2, 3, 4]);
3860        assert!(!buf.is_empty());
3861        buf.truncate(2);
3862        assert_eq!(buf.as_ref(), &[1u8, 2]);
3863        buf.clear();
3864        assert!(buf.is_empty());
3865        buf.put_slice(b"xyz");
3866
3867        // Equality should work across slice, array, and byte-string forms.
3868        let expected: &[u8] = b"xyz";
3869        assert!(PartialEq::<[u8]>::eq(&buf, expected));
3870        assert!(buf == b"xyz"[..]);
3871        assert!(buf == [b'x', b'y', b'z']);
3872        assert!(buf == b"xyz");
3873
3874        // Conversions from common owned/shared containers preserve contents.
3875        let from_vec = IoBufMut::from(vec![7u8, 8]);
3876        assert_eq!(from_vec.as_ref(), &[7u8, 8]);
3877
3878        let from_bytesmut = IoBufMut::from(BytesMut::from(&b"hi"[..]));
3879        assert_eq!(from_bytesmut.as_ref(), b"hi");
3880
3881        let from_bytes = IoBufMut::from(Bytes::from_static(b"ok"));
3882        assert_eq!(from_bytes.as_ref(), b"ok");
3883
3884        // `Bytes::from_static` cannot be converted to mutable without copy.
3885        let from_iobuf = IoBufMut::from(IoBuf::from(Bytes::from_static(b"io")));
3886        assert_eq!(from_iobuf.as_ref(), b"io");
3887    }
3888
    #[test]
    fn test_iobuf_aligned_public_paths() {
        // Exercise the public IoBuf/IoBufMut API through the untracked aligned
        // backing: write, advance, copy_to_bytes, freeze, slice, split_to,
        // try_into_mut, and From/Into conversions.
        static ARRAY: &[u8; 4] = b"wxyz";

        let alignment = NonZeroUsize::new(64).expect("non-zero alignment");

        // Start from a non-zero untracked aligned buffer to cover the public mutable API.
        let mut aligned_mut = IoBufMut::with_alignment(8, alignment);
        assert!(!aligned_mut.is_pooled());
        assert!(aligned_mut.is_empty());
        assert_eq!(aligned_mut.capacity(), 8);
        // The allocation must honor the requested 64-byte alignment.
        assert!((aligned_mut.as_mut_ptr() as usize).is_multiple_of(64));

        aligned_mut.put_slice(b"abcdefgh");
        assert_eq!(aligned_mut.as_mut(), b"abcdefgh");
        assert_eq!(aligned_mut.chunk(), b"abcdefgh");
        aligned_mut.advance(2);
        assert_eq!(aligned_mut.chunk(), b"cdefgh");

        // Partial drain copies out the prefix; a zero-length read is a no-op.
        let partial = aligned_mut.copy_to_bytes(2);
        assert_eq!(partial.as_ref(), b"cd");
        assert_eq!(aligned_mut.as_ref(), b"efgh");
        let empty = aligned_mut.copy_to_bytes(0);
        assert!(empty.is_empty());
        assert_eq!(aligned_mut.as_ref(), b"efgh");

        aligned_mut.clear();
        assert!(aligned_mut.is_empty());
        aligned_mut.put_slice(ARRAY);
        assert!(aligned_mut == ARRAY);

        // Full aligned drains should use the owner-transfer path, including len == 0 first.
        let mut fully_drained = IoBufMut::with_alignment(4, alignment);
        fully_drained.put_slice(b"lmno");
        let empty = fully_drained.copy_to_bytes(0);
        assert!(empty.is_empty());
        assert_eq!(fully_drained.as_ref(), b"lmno");
        let drained = fully_drained.copy_to_bytes(4);
        assert_eq!(drained.as_ref(), b"lmno");
        assert!(fully_drained.is_empty());

        // Freeze to an immutable aligned `IoBuf` and exercise its view/Buf dispatch.
        let aligned = aligned_mut.freeze();
        assert!(!aligned.is_pooled());
        assert_eq!(aligned.as_ref(), &ARRAY[..]);
        assert!(aligned == ARRAY);
        assert!(!aligned.as_ptr().is_null());
        // Range, open-ended, and inclusive slice forms.
        assert_eq!(aligned.slice(..2), b"wx");
        assert_eq!(aligned.slice(1..), b"xyz");
        assert_eq!(aligned.slice(1..=2), b"xy");
        assert_eq!(aligned.chunk(), b"wxyz");

        // split_to detaches the prefix and leaves the suffix in place.
        let mut split = aligned.clone();
        let prefix = split.split_to(2);
        assert_eq!(prefix, b"wx");
        assert_eq!(split, b"yz");

        let mut advanced = aligned.clone();
        advanced.advance(2);
        assert_eq!(advanced.chunk(), b"yz");

        // Partial and full immutable drains should preserve the aligned backing behavior.
        let mut drained = aligned.clone();
        let empty = drained.copy_to_bytes(0);
        assert!(empty.is_empty());
        assert_eq!(drained.as_ref(), &ARRAY[..]);
        let first = drained.copy_to_bytes(1);
        assert_eq!(first.as_ref(), b"w");
        let rest = drained.copy_to_bytes(3);
        assert_eq!(rest.as_ref(), b"xyz");
        assert_eq!(drained.remaining(), 0);

        // Unique aligned immutable buffers can become mutable again.
        let mut unique_source = IoBufMut::zeroed_with_alignment(4, alignment);
        unique_source.as_mut().copy_from_slice(b"pqrs");
        let unique = unique_source.freeze();
        let recovered = unique
            .try_into_mut()
            .expect("unique aligned iobuf should recover mutability");
        assert_eq!(recovered.as_ref(), b"pqrs");

        // Shared aligned immutable buffers must reject the mutable conversion.
        let mut shared_source = IoBufMut::zeroed_with_alignment(4, alignment);
        shared_source.as_mut().copy_from_slice(b"tuvw");
        let shared = shared_source.freeze();
        let _shared_clone = shared.clone();
        assert!(shared.try_into_mut().is_err());

        // Owned/container conversions should preserve bytes for aligned backings.
        let vec_out: Vec<u8> = aligned.clone().into();
        let bytes_out: Bytes = aligned.into();
        assert_eq!(vec_out, ARRAY.to_vec());
        assert_eq!(bytes_out.as_ref(), &ARRAY[..]);

        // Array-reference conversions for both single and multi containers.
        let from_array = IoBuf::from(ARRAY);
        assert_eq!(from_array, b"wxyz");

        let iobufs = IoBufs::from(ARRAY);
        assert_eq!(iobufs.chunk(), b"wxyz");
    }
3992
3993    #[test]
3994    fn test_iobufmut_aligned_zero_length_constructors() {
3995        let alignment = NonZeroUsize::new(64).expect("non-zero alignment");
3996
3997        let with_alignment = IoBufMut::with_alignment(0, alignment);
3998        assert!(with_alignment.is_empty());
3999        assert_eq!(with_alignment.len(), 0);
4000        assert_eq!(with_alignment.capacity(), 0);
4001
4002        let zeroed = IoBufMut::zeroed_with_alignment(0, alignment);
4003        assert!(zeroed.is_empty());
4004        assert_eq!(zeroed.len(), 0);
4005        assert_eq!(zeroed.capacity(), 0);
4006    }
4007
    #[test]
    fn test_iobufs_additional_shape_and_conversion_paths() {
        // Broad coverage for `IoBufs`: constructors from every supported
        // source type, shape transitions (triple -> chunked via
        // prepend/append), pool-backed coalescing, and `chunk()` on
        // non-canonical shapes that contain empty buffers.
        let pool = test_pool();

        // Constructor coverage for mutable/immutable/slice-backed inputs.
        let from_mut = IoBufs::from(IoBufMut::from(b"m"));
        assert_eq!(from_mut.chunk(), b"m");
        let from_bytes = IoBufs::from(Bytes::from_static(b"b"));
        assert_eq!(from_bytes.chunk(), b"b");
        let from_bytesmut = IoBufs::from(BytesMut::from(&b"bm"[..]));
        assert_eq!(from_bytesmut.chunk(), b"bm");
        let from_vec = IoBufs::from(vec![1u8, 2u8]);
        assert_eq!(from_vec.chunk(), &[1u8, 2]);
        let static_slice: &'static [u8] = b"slice";
        let from_static = IoBufs::from(static_slice);
        assert_eq!(from_static.chunk(), b"slice");

        // Canonicalizing an already-empty buffer remains a single empty chunk.
        let mut single_empty = IoBufs::default();
        single_empty.canonicalize();
        assert!(single_empty.is_single());

        // Triple path: prepend/append can promote into chunked while preserving order.
        let mut triple = IoBufs::from(vec![
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"b".to_vec()),
            IoBuf::from(b"c".to_vec()),
        ]);
        assert!(triple.as_single().is_none());
        triple.prepend(IoBuf::from(vec![b'0']));
        triple.prepend(IoBuf::from(vec![b'1']));
        triple.append(IoBuf::from(vec![b'2']));
        // Prepends stack in reverse call order ("1" then "0"), appends in call order.
        assert_eq!(triple.copy_to_bytes(triple.remaining()).as_ref(), b"10abc2");

        // Appending to an existing triple keeps byte order stable.
        let mut triple_append = IoBufs::from(vec![
            IoBuf::from(b"x".to_vec()),
            IoBuf::from(b"y".to_vec()),
            IoBuf::from(b"z".to_vec()),
        ]);
        triple_append.append(IoBuf::from(vec![b'w']));
        assert_eq!(triple_append.coalesce(), b"xyzw");

        // coalesce_with_pool on a triple should preserve contents.
        let triple_pool = IoBufs::from(vec![
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"b".to_vec()),
            IoBuf::from(b"c".to_vec()),
        ]);
        assert_eq!(triple_pool.coalesce_with_pool(&pool), b"abc");

        // coalesce_with_pool on 4+ chunks should read only remaining bytes.
        let mut chunked_pool = IoBufs::from(vec![
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"b".to_vec()),
            IoBuf::from(b"c".to_vec()),
            IoBuf::from(b"d".to_vec()),
        ]);
        assert_eq!(chunked_pool.remaining(), 4);
        chunked_pool.advance(1);
        assert_eq!(chunked_pool.coalesce_with_pool(&pool), b"bcd");

        // Non-canonical Pair/Triple/Chunked shapes should still expose the first readable chunk.
        // (Built via the internal `inner` field to bypass canonicalization.)
        let pair_second = IoBufs {
            inner: IoBufsInner::Pair([IoBuf::default(), IoBuf::from(vec![1u8])]),
        };
        assert_eq!(pair_second.chunk(), &[1u8]);
        let pair_empty = IoBufs {
            inner: IoBufsInner::Pair([IoBuf::default(), IoBuf::default()]),
        };
        assert_eq!(pair_empty.chunk(), b"");

        let triple_third = IoBufs {
            inner: IoBufsInner::Triple([
                IoBuf::default(),
                IoBuf::default(),
                IoBuf::from(vec![3u8]),
            ]),
        };
        assert_eq!(triple_third.chunk(), &[3u8]);
        let triple_second = IoBufs {
            inner: IoBufsInner::Triple([
                IoBuf::default(),
                IoBuf::from(vec![2u8]),
                IoBuf::default(),
            ]),
        };
        assert_eq!(triple_second.chunk(), &[2u8]);
        let triple_empty = IoBufs {
            inner: IoBufsInner::Triple([IoBuf::default(), IoBuf::default(), IoBuf::default()]),
        };
        assert_eq!(triple_empty.chunk(), b"");

        let chunked_second = IoBufs {
            inner: IoBufsInner::Chunked(VecDeque::from([IoBuf::default(), IoBuf::from(vec![9u8])])),
        };
        assert_eq!(chunked_second.chunk(), &[9u8]);
        let chunked_empty = IoBufs {
            inner: IoBufsInner::Chunked(VecDeque::from([IoBuf::default()])),
        };
        assert_eq!(chunked_empty.chunk(), b"");
    }
4110
    #[test]
    fn test_iobufsmut_additional_shape_and_conversion_paths() {
        // Covers `as_single`/`as_single_mut` shape guards, additional
        // constructors, and the chunked uninitialized-write path
        // (set_len + copy_from_slice + freeze).

        // `as_single` accessors should work only for single-shape containers.
        let mut single = IoBufsMut::from(IoBufMut::from(b"x"));
        assert!(single.as_single().is_some());
        assert!(single.as_single_mut().is_some());
        single.canonicalize();
        assert!(single.is_single());

        let mut pair = IoBufsMut::from(vec![IoBufMut::from(b"a"), IoBufMut::from(b"b")]);
        assert!(pair.as_single().is_none());
        assert!(pair.as_single_mut().is_none());

        // Constructor coverage for raw vec and BytesMut sources.
        let from_vec = IoBufsMut::from(vec![1u8, 2u8]);
        assert_eq!(from_vec.chunk(), &[1u8, 2]);
        let from_bytesmut = IoBufsMut::from(BytesMut::from(&b"cd"[..]));
        assert_eq!(from_bytesmut.chunk(), b"cd");

        // Chunked write path: set_len + copy_from_slice + freeze round-trip.
        // Four 1-byte-capacity chunks keep the container in the Chunked shape.
        let mut chunked = IoBufsMut::from(vec![
            IoBufMut::with_capacity(1),
            IoBufMut::with_capacity(1),
            IoBufMut::with_capacity(1),
            IoBufMut::with_capacity(1),
        ]);
        // SAFETY: We only write/read initialized bytes after `copy_from_slice`.
        unsafe { chunked.set_len(4) };
        chunked.copy_from_slice(b"wxyz");
        assert_eq!(chunked.capacity(), 4);
        assert_eq!(chunked.remaining(), 4);
        let frozen = chunked.freeze();
        assert_eq!(frozen.coalesce(), b"wxyz");
    }
4145
4146    #[test]
4147    fn test_iobufsmut_coalesce_multi_shape_paths() {
4148        let pool = test_pool();
4149
4150        // Pair: plain coalesce and pool-backed coalesce-with-extra.
4151        let pair = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
4152        assert_eq!(pair.coalesce(), b"abcd");
4153        let pair = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
4154        let pair_extra = pair.coalesce_with_pool_extra(&pool, 3);
4155        assert_eq!(pair_extra, b"abcd");
4156        assert!(pair_extra.capacity() >= 7);
4157
4158        // Triple: both coalesce paths should preserve payload and requested spare capacity.
4159        let triple = IoBufsMut::from(vec![
4160            IoBufMut::from(b"a"),
4161            IoBufMut::from(b"b"),
4162            IoBufMut::from(b"c"),
4163        ]);
4164        assert_eq!(triple.coalesce(), b"abc");
4165        let triple = IoBufsMut::from(vec![
4166            IoBufMut::from(b"a"),
4167            IoBufMut::from(b"b"),
4168            IoBufMut::from(b"c"),
4169        ]);
4170        let triple_extra = triple.coalesce_with_pool_extra(&pool, 2);
4171        assert_eq!(triple_extra, b"abc");
4172        assert!(triple_extra.capacity() >= 5);
4173
4174        // Chunked (4+): same expectations as pair/triple for content + capacity.
4175        let chunked = IoBufsMut::from(vec![
4176            IoBufMut::from(b"1"),
4177            IoBufMut::from(b"2"),
4178            IoBufMut::from(b"3"),
4179            IoBufMut::from(b"4"),
4180        ]);
4181        assert_eq!(chunked.coalesce(), b"1234");
4182        let chunked = IoBufsMut::from(vec![
4183            IoBufMut::from(b"1"),
4184            IoBufMut::from(b"2"),
4185            IoBufMut::from(b"3"),
4186            IoBufMut::from(b"4"),
4187        ]);
4188        let chunked_extra = chunked.coalesce_with_pool_extra(&pool, 5);
4189        assert_eq!(chunked_extra, b"1234");
4190        assert!(chunked_extra.capacity() >= 9);
4191    }
4192
    // Verifies `chunk()` and `chunk_mut()` on *non-canonical* containers, i.e.
    // containers whose front buffers are empty (for reads) or fully written
    // (for writes). Both accessors must skip such fronts for every storage
    // shape: Pair, Triple, and Chunked.
    #[test]
    fn test_iobufsmut_noncanonical_chunk_and_chunk_mut_paths() {
        // Build a pooled buffer with len == capacity, so it has no writable
        // spare capacity and `chunk_mut()` must skip past it.
        fn no_spare_capacity_buf(pool: &BufferPool) -> IoBufMut {
            let mut buf = pool.alloc(1);
            let cap = buf.capacity();
            // SAFETY: We never read from this buffer in this helper.
            unsafe { buf.set_len(cap) };
            buf
        }
        let pool = test_pool();

        // `chunk()` should skip empty front buffers across all shapes.
        // Pair shape: readable data only in the second slot, then fully empty.
        let pair_second = IoBufsMut {
            inner: IoBufsMutInner::Pair([IoBufMut::default(), IoBufMut::from(b"b")]),
        };
        assert_eq!(pair_second.chunk(), b"b");
        let pair_empty = IoBufsMut {
            inner: IoBufsMutInner::Pair([IoBufMut::default(), IoBufMut::default()]),
        };
        assert_eq!(pair_empty.chunk(), b"");

        // Triple shape: data in the last slot, the middle slot, and none at all.
        let triple_third = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                IoBufMut::default(),
                IoBufMut::default(),
                IoBufMut::from(b"c"),
            ]),
        };
        assert_eq!(triple_third.chunk(), b"c");
        let triple_second = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                IoBufMut::default(),
                IoBufMut::from(b"b"),
                IoBufMut::default(),
            ]),
        };
        assert_eq!(triple_second.chunk(), b"b");
        let triple_empty = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                IoBufMut::default(),
                IoBufMut::default(),
                IoBufMut::default(),
            ]),
        };
        assert_eq!(triple_empty.chunk(), b"");

        // Chunked shape: empty front followed by data, and all-empty deque.
        let chunked_second = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([
                IoBufMut::default(),
                IoBufMut::from(b"d"),
            ])),
        };
        assert_eq!(chunked_second.chunk(), b"d");
        let chunked_empty = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([IoBufMut::default()])),
        };
        assert_eq!(chunked_empty.chunk(), b"");

        // `chunk_mut()` should skip non-writable fronts and return first writable chunk.
        let mut pair_chunk_mut = IoBufsMut {
            inner: IoBufsMutInner::Pair([no_spare_capacity_buf(&pool), IoBufMut::with_capacity(2)]),
        };
        assert!(pair_chunk_mut.chunk_mut().len() >= 2);

        // No writable chunk at all: `chunk_mut()` must return an empty slice
        // rather than panic.
        let mut pair_chunk_mut_empty = IoBufsMut {
            inner: IoBufsMutInner::Pair([
                no_spare_capacity_buf(&pool),
                no_spare_capacity_buf(&pool),
            ]),
        };
        assert_eq!(pair_chunk_mut_empty.chunk_mut().len(), 0);

        let mut triple_chunk_mut = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                no_spare_capacity_buf(&pool),
                no_spare_capacity_buf(&pool),
                IoBufMut::with_capacity(3),
            ]),
        };
        assert!(triple_chunk_mut.chunk_mut().len() >= 3);
        let mut triple_chunk_mut_second = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                no_spare_capacity_buf(&pool),
                IoBufMut::with_capacity(2),
                no_spare_capacity_buf(&pool),
            ]),
        };
        assert!(triple_chunk_mut_second.chunk_mut().len() >= 2);

        let mut triple_chunk_mut_empty = IoBufsMut {
            inner: IoBufsMutInner::Triple([
                no_spare_capacity_buf(&pool),
                no_spare_capacity_buf(&pool),
                no_spare_capacity_buf(&pool),
            ]),
        };
        assert_eq!(triple_chunk_mut_empty.chunk_mut().len(), 0);

        let mut chunked_chunk_mut = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([
                IoBufMut::default(),
                IoBufMut::with_capacity(4),
            ])),
        };
        assert!(chunked_chunk_mut.chunk_mut().len() >= 4);

        let mut chunked_chunk_mut_empty = IoBufsMut {
            inner: IoBufsMutInner::Chunked(VecDeque::from([no_spare_capacity_buf(&pool)])),
        };
        assert_eq!(chunked_chunk_mut_empty.chunk_mut().len(), 0);
    }
4304
    // Exercises the free helper functions that back `IoBufs`/`IoBufsMut`
    // (`copy_to_bytes_chunked`, `advance_chunked_front`, `advance_small_chunks`,
    // `copy_to_bytes_small_chunks`, `advance_mut_in_chunks`) directly, covering
    // both the `IoBuf` and `IoBufMut` monomorphizations of each.
    #[test]
    fn test_iobuf_internal_chunk_helpers() {
        // `copy_to_bytes_chunked` should drop leading empties on zero-length reads.
        let mut empty_with_leading = VecDeque::from([IoBuf::default()]);
        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut empty_with_leading, 0, "x");
        assert!(bytes.is_empty());
        assert!(!needs_canonicalize);
        assert!(empty_with_leading.is_empty());

        // Fast path: front chunk can fully satisfy the request.
        let mut fast = VecDeque::from([
            IoBuf::from(b"ab".to_vec()),
            IoBuf::from(b"cd".to_vec()),
            IoBuf::from(b"ef".to_vec()),
            IoBuf::from(b"gh".to_vec()),
        ]);
        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut fast, 2, "x");
        assert_eq!(bytes.as_ref(), b"ab");
        assert!(needs_canonicalize);
        // The consumed front chunk is gone; "cd" is now at the front.
        assert_eq!(fast.front().expect("front exists").as_ref(), b"cd");

        // Slow path: request spans multiple chunks.
        let mut slow = VecDeque::from([
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"bc".to_vec()),
            IoBuf::from(b"d".to_vec()),
            IoBuf::from(b"e".to_vec()),
        ]);
        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut slow, 3, "x");
        assert_eq!(bytes.as_ref(), b"abc");
        assert!(needs_canonicalize);

        // Same zero-length behavior for the `IoBufMut` monomorphization.
        let mut empty_with_leading_mut = VecDeque::from([IoBufMut::default()]);
        let (bytes, needs_canonicalize) =
            copy_to_bytes_chunked(&mut empty_with_leading_mut, 0, "x");
        assert!(bytes.is_empty());
        assert!(!needs_canonicalize);
        assert!(empty_with_leading_mut.is_empty());

        // Mirror the fast/slow chunked helper paths for mutable chunks too.
        let mut fast_mut = VecDeque::from([
            IoBufMut::from(b"ab"),
            IoBufMut::from(b"cd"),
            IoBufMut::from(b"ef"),
            IoBufMut::from(b"gh"),
        ]);
        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut fast_mut, 2, "x");
        assert_eq!(bytes.as_ref(), b"ab");
        assert!(needs_canonicalize);
        assert_eq!(fast_mut.front().expect("front exists").as_ref(), b"cd");

        let mut slow_mut = VecDeque::from([
            IoBufMut::from(b"a"),
            IoBufMut::from(b"bc"),
            IoBufMut::from(b"de"),
            IoBufMut::from(b"f"),
        ]);
        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut slow_mut, 4, "x");
        assert_eq!(bytes.as_ref(), b"abcd");
        assert!(needs_canonicalize);
        // "de" was split: "d" consumed, "e" remains at the front.
        assert_eq!(slow_mut.front().expect("front exists").as_ref(), b"e");

        // `advance_chunked_front` should skip empties and drain in linear order.
        let mut advance_chunked = VecDeque::from([
            IoBuf::default(),
            IoBuf::from(b"abc".to_vec()),
            IoBuf::from(b"d".to_vec()),
        ]);
        advance_chunked_front(&mut advance_chunked, 2);
        assert_eq!(
            advance_chunked.front().expect("front exists").as_ref(),
            b"c"
        );
        // Advancing the remaining 2 bytes ("c" + "d") empties the deque.
        advance_chunked_front(&mut advance_chunked, 2);
        assert!(advance_chunked.is_empty());

        // The front-advance helper also has a separate mutable monomorphization.
        let mut advance_chunked_mut = VecDeque::from([
            IoBufMut::default(),
            IoBufMut::from(b"abc"),
            IoBufMut::from(b"d"),
        ]);
        advance_chunked_front(&mut advance_chunked_mut, 2);
        assert_eq!(
            advance_chunked_mut.front().expect("front exists").as_ref(),
            b"c"
        );
        advance_chunked_front(&mut advance_chunked_mut, 2);
        assert!(advance_chunked_mut.is_empty());

        // `advance_small_chunks` signals canonicalization when front chunks are exhausted.
        let mut small = [IoBuf::default(), IoBuf::from(b"abc".to_vec())];
        let needs_canonicalize = advance_small_chunks(&mut small, 2);
        assert!(needs_canonicalize);
        assert_eq!(small[1].as_ref(), b"c");

        // Advancing exactly the total length drains every chunk.
        let mut small_exact = [
            IoBuf::from(b"a".to_vec()),
            IoBuf::from(b"b".to_vec()),
            IoBuf::from(b"c".to_vec()),
        ];
        let needs_canonicalize = advance_small_chunks(&mut small_exact, 3);
        assert!(needs_canonicalize);
        assert_eq!(small_exact[0].remaining(), 0);
        assert_eq!(small_exact[1].remaining(), 0);
        assert_eq!(small_exact[2].remaining(), 0);

        // Small-chunk copy canonicalization is also instantiated for mutable chunks.
        let mut small_mut = [
            IoBufMut::from(b"a"),
            IoBufMut::from(b"bc"),
            IoBufMut::from(b"d"),
        ];
        let (bytes, needs_canonicalize) = copy_to_bytes_small_chunks(&mut small_mut, 3, "x");
        assert_eq!(bytes.as_ref(), b"abc");
        assert!(needs_canonicalize);
        assert_eq!(small_mut[2].as_ref(), b"d");

        // `advance_mut_in_chunks` returns whether the request fully fit in writable chunks.
        let mut writable = [IoBufMut::with_capacity(2), IoBufMut::with_capacity(1)];
        let mut remaining = 3usize;
        // SAFETY: We do not read from advanced bytes in this test.
        let all_advanced = unsafe { advance_mut_in_chunks(&mut writable, &mut remaining) };
        assert!(all_advanced);
        assert_eq!(remaining, 0);

        // `advance_mut_in_chunks` should skip non-writable chunks.
        let pool = test_pool();
        let mut full = pool.alloc(1);
        // SAFETY: We only mark initialized capacity; bytes are not read.
        unsafe { full.set_len(full.capacity()) };
        let mut writable_after_full = [full, IoBufMut::with_capacity(2)];
        let mut remaining = 2usize;
        // SAFETY: We do not read from advanced bytes in this test.
        let all_advanced =
            unsafe { advance_mut_in_chunks(&mut writable_after_full, &mut remaining) };
        assert!(all_advanced);
        assert_eq!(remaining, 0);

        // Insufficient capacity: the helper reports a partial advance and
        // leaves the unfulfilled remainder in `remaining`.
        let mut writable_short = [IoBufMut::with_capacity(1), IoBufMut::with_capacity(1)];
        let mut remaining = 3usize;
        // SAFETY: We do not read from advanced bytes in this test.
        let all_advanced = unsafe { advance_mut_in_chunks(&mut writable_short, &mut remaining) };
        assert!(!all_advanced);
        assert_eq!(remaining, 1);
    }
4451
4452    #[test]
4453    fn test_iobufsmut_advance_mut_success_paths() {
4454        // Pair path.
4455        let mut pair = IoBufsMut {
4456            inner: IoBufsMutInner::Pair([IoBufMut::with_capacity(2), IoBufMut::with_capacity(2)]),
4457        };
4458        // SAFETY: We only verify cursor movement (`remaining`) and do not read bytes.
4459        unsafe { pair.advance_mut(3) };
4460        assert_eq!(pair.remaining(), 3);
4461
4462        // Triple path.
4463        let mut triple = IoBufsMut {
4464            inner: IoBufsMutInner::Triple([
4465                IoBufMut::with_capacity(1),
4466                IoBufMut::with_capacity(1),
4467                IoBufMut::with_capacity(1),
4468            ]),
4469        };
4470        // SAFETY: We only verify cursor movement (`remaining`) and do not read bytes.
4471        unsafe { triple.advance_mut(2) };
4472        assert_eq!(triple.remaining(), 2);
4473
4474        // Chunked wrapped-VecDeque path.
4475        let mut wrapped = VecDeque::with_capacity(5);
4476        wrapped.push_back(IoBufMut::with_capacity(1));
4477        wrapped.push_back(IoBufMut::with_capacity(1));
4478        wrapped.push_back(IoBufMut::with_capacity(1));
4479        wrapped.push_back(IoBufMut::with_capacity(1));
4480        wrapped.push_back(IoBufMut::with_capacity(1));
4481        let _ = wrapped.pop_front();
4482        wrapped.push_back(IoBufMut::with_capacity(1));
4483        let (first, second) = wrapped.as_slices();
4484        assert!(!first.is_empty());
4485        assert!(!second.is_empty());
4486
4487        // Force `advance_mut` to consume across the wrapped second slice as well.
4488        let to_advance = first.len() + 1;
4489        let mut chunked = IoBufsMut {
4490            inner: IoBufsMutInner::Chunked(wrapped),
4491        };
4492        // SAFETY: We only verify cursor movement (`remaining`) and do not read bytes.
4493        unsafe { chunked.advance_mut(to_advance) };
4494        assert_eq!(chunked.remaining(), to_advance);
4495        assert!(chunked.remaining_mut() > 0);
4496    }
4497
4498    #[test]
4499    fn test_iobufsmut_advance_mut_zero_noop_when_full() {
4500        fn full_chunk(pool: &BufferPool) -> IoBufMut {
4501            // Pooled buffers have bounded class capacity (unlike growable Bytes),
4502            // so force len == capacity to make remaining_mut() == 0.
4503            let mut buf = pool.alloc(1);
4504            let cap = buf.capacity();
4505            // SAFETY: We never read from this buffer in this test.
4506            unsafe { buf.set_len(cap) };
4507            buf
4508        }
4509
4510        let pool = test_pool();
4511
4512        // Pair path: fully-written chunks should allow advance_mut(0) as a no-op.
4513        let mut pair = IoBufsMut::from(vec![full_chunk(&pool), full_chunk(&pool)]);
4514        assert!(matches!(pair.inner, IoBufsMutInner::Pair(_)));
4515        assert_eq!(pair.remaining_mut(), 0);
4516        let before = pair.remaining();
4517        // SAFETY: Advancing by 0 does not expose uninitialized bytes.
4518        unsafe { pair.advance_mut(0) };
4519        assert_eq!(pair.remaining(), before);
4520
4521        // Triple path: same no-op behavior.
4522        let mut triple = IoBufsMut::from(vec![
4523            full_chunk(&pool),
4524            full_chunk(&pool),
4525            full_chunk(&pool),
4526        ]);
4527        assert!(matches!(triple.inner, IoBufsMutInner::Triple(_)));
4528        assert_eq!(triple.remaining_mut(), 0);
4529        let before = triple.remaining();
4530        // SAFETY: Advancing by 0 does not expose uninitialized bytes.
4531        unsafe { triple.advance_mut(0) };
4532        assert_eq!(triple.remaining(), before);
4533
4534        // Chunked path: 4+ fully-written chunks should also no-op.
4535        let mut chunked = IoBufsMut::from(vec![
4536            full_chunk(&pool),
4537            full_chunk(&pool),
4538            full_chunk(&pool),
4539            full_chunk(&pool),
4540        ]);
4541        assert!(matches!(chunked.inner, IoBufsMutInner::Chunked(_)));
4542        assert_eq!(chunked.remaining_mut(), 0);
4543        let before = chunked.remaining();
4544        // SAFETY: Advancing by 0 does not expose uninitialized bytes.
4545        unsafe { chunked.advance_mut(0) };
4546        assert_eq!(chunked.remaining(), before);
4547    }
4548
4549    #[test]
4550    #[should_panic(expected = "cannot advance past end of buffer")]
4551    fn test_iobufsmut_advance_mut_past_end_pair() {
4552        let mut pair = IoBufsMut {
4553            inner: IoBufsMutInner::Pair([IoBufMut::with_capacity(1), IoBufMut::with_capacity(1)]),
4554        };
4555        // SAFETY: Intentional panic path coverage.
4556        unsafe { pair.advance_mut(3) };
4557    }
4558
4559    #[test]
4560    #[should_panic(expected = "cannot advance past end of buffer")]
4561    fn test_iobufsmut_advance_mut_past_end_triple() {
4562        let mut triple = IoBufsMut {
4563            inner: IoBufsMutInner::Triple([
4564                IoBufMut::with_capacity(1),
4565                IoBufMut::with_capacity(1),
4566                IoBufMut::with_capacity(1),
4567            ]),
4568        };
4569        // SAFETY: Intentional panic path coverage.
4570        unsafe { triple.advance_mut(4) };
4571    }
4572
4573    #[test]
4574    #[should_panic(expected = "cannot advance past end of buffer")]
4575    fn test_iobufsmut_advance_mut_past_end_chunked() {
4576        let mut chunked = IoBufsMut {
4577            inner: IoBufsMutInner::Chunked(VecDeque::from([
4578                IoBufMut::with_capacity(1),
4579                IoBufMut::with_capacity(1),
4580                IoBufMut::with_capacity(1),
4581                IoBufMut::with_capacity(1),
4582            ])),
4583        };
4584        // SAFETY: Intentional panic path coverage.
4585        unsafe { chunked.advance_mut(5) };
4586    }
4587
4588    #[test]
4589    fn test_iobufsmut_set_len() {
4590        // SAFETY: we don't read the uninitialized bytes.
4591        unsafe {
4592            // Single buffer
4593            let mut bufs = IoBufsMut::from(IoBufMut::with_capacity(16));
4594            bufs.set_len(10);
4595            assert_eq!(bufs.len(), 10);
4596
4597            // Chunked: distributes across chunks [cap 5, cap 10], set 12 -> [5, 7]
4598            let mut bufs = IoBufsMut::from(vec![
4599                IoBufMut::with_capacity(5),
4600                IoBufMut::with_capacity(10),
4601            ]);
4602            bufs.set_len(12);
4603            assert_eq!(bufs.len(), 12);
4604            assert_eq!(bufs.chunk().len(), 5);
4605            bufs.advance(5);
4606            assert_eq!(bufs.chunk().len(), 7);
4607            bufs.advance(7);
4608            assert_eq!(bufs.remaining(), 0);
4609
4610            // Uneven capacities [3, 20, 2], set 18 -> [3, 15, 0].
4611            let mut bufs = IoBufsMut::from(vec![
4612                IoBufMut::with_capacity(3),
4613                IoBufMut::with_capacity(20),
4614                IoBufMut::with_capacity(2),
4615            ]);
4616            bufs.set_len(18);
4617            assert_eq!(bufs.chunk().len(), 3);
4618            bufs.advance(3);
4619            assert_eq!(bufs.chunk().len(), 15);
4620            bufs.advance(15);
4621            assert_eq!(bufs.remaining(), 0);
4622
4623            // Exact total capacity [4, 4], set 8 -> [4, 4]
4624            let mut bufs =
4625                IoBufsMut::from(vec![IoBufMut::with_capacity(4), IoBufMut::with_capacity(4)]);
4626            bufs.set_len(8);
4627            assert_eq!(bufs.chunk().len(), 4);
4628            bufs.advance(4);
4629            assert_eq!(bufs.chunk().len(), 4);
4630            bufs.advance(4);
4631            assert_eq!(bufs.remaining(), 0);
4632
4633            // Zero length preserves caller-provided layout.
4634            let mut bufs =
4635                IoBufsMut::from(vec![IoBufMut::with_capacity(4), IoBufMut::with_capacity(4)]);
4636            bufs.set_len(0);
4637            assert_eq!(bufs.len(), 0);
4638            assert_eq!(bufs.chunk(), b"");
4639        }
4640    }
4641
4642    #[test]
4643    #[should_panic(expected = "set_len(9) exceeds capacity(8)")]
4644    fn test_iobufsmut_set_len_overflow() {
4645        let mut bufs =
4646            IoBufsMut::from(vec![IoBufMut::with_capacity(4), IoBufMut::with_capacity(4)]);
4647        // SAFETY: this will panic before any read.
4648        unsafe { bufs.set_len(9) };
4649    }
4650
4651    #[test]
4652    #[should_panic(expected = "set_len(9) exceeds capacity(8)")]
4653    fn test_iobufmut_set_len_overflow() {
4654        let mut buf = IoBufMut::with_capacity(8);
4655        // SAFETY: this will panic before any read.
4656        unsafe { buf.set_len(9) };
4657    }
4658
4659    #[test]
4660    fn test_encode_with_pool_matches_encode() {
4661        let value = vec![1u8, 2, 3, 4, 5, 6];
4662        assert_encode_with_pool_matches_encode(&value);
4663    }
4664
4665    #[test]
4666    fn test_encode_with_pool_mut_len_matches_encode_size() {
4667        let pool = test_pool();
4668        let value = vec![9u8, 8, 7, 6];
4669
4670        let buf = value.encode_with_pool_mut(&pool);
4671        assert_eq!(buf.len(), value.encode_size());
4672    }
4673
4674    #[test]
4675    fn test_iobuf_encode_with_pool_matches_encode() {
4676        let value = IoBuf::from(vec![0xAB; 512]);
4677        assert_encode_with_pool_matches_encode(&value);
4678    }
4679
4680    #[test]
4681    fn test_nested_container_encode_with_pool_matches_encode() {
4682        let value = (
4683            Some(Bytes::from(vec![0xAA; 256])),
4684            vec![Bytes::from(vec![0xBB; 128]), Bytes::from(vec![0xCC; 64])],
4685        );
4686        assert_encode_with_pool_matches_encode(&value);
4687    }
4688
4689    #[test]
4690    fn test_map_encode_with_pool_matches_encode() {
4691        let mut btree = BTreeMap::new();
4692        btree.insert(2u8, Bytes::from(vec![0xDD; 96]));
4693        btree.insert(1u8, Bytes::from(vec![0xEE; 48]));
4694        assert_encode_with_pool_matches_encode(&btree);
4695
4696        let mut hash = HashMap::new();
4697        hash.insert(2u8, Bytes::from(vec![0x11; 96]));
4698        hash.insert(1u8, Bytes::from(vec![0x22; 48]));
4699        assert_encode_with_pool_matches_encode(&hash);
4700    }
4701
4702    #[test]
4703    fn test_lazy_encode_with_pool_matches_encode() {
4704        let value = Lazy::new(Bytes::from(vec![0x44; 200]));
4705        assert_encode_with_pool_matches_encode(&value);
4706    }
4707
4708    #[test]
4709    fn test_range_encode_with_pool_matches_encode() {
4710        let range: Range<Bytes> = Bytes::from(vec![0x10; 32])..Bytes::from(vec![0x20; 48]);
4711        assert_encode_with_pool_matches_encode(&range);
4712
4713        let inclusive: RangeInclusive<Bytes> =
4714            Bytes::from(vec![0x30; 16])..=Bytes::from(vec![0x40; 24]);
4715        assert_encode_with_pool_matches_encode(&inclusive);
4716
4717        let from: RangeFrom<IoBuf> = IoBuf::from(vec![0x50; 40])..;
4718        assert_encode_with_pool_matches_encode(&from);
4719
4720        let to_inclusive: RangeToInclusive<IoBuf> = ..=IoBuf::from(vec![0x60; 56]);
4721        assert_encode_with_pool_matches_encode(&to_inclusive);
4722    }
4723
    // Codec conformance suite for [`IoBuf`], generated by the shared
    // conformance macro. Only compiled when the `arbitrary` feature is
    // enabled, since the generator relies on `arbitrary`-derived inputs.
    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::IoBuf;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<IoBuf>
        }
    }
4733
    // Tests for the pool-backed `Builder`: interleaving of inline writes
    // (`put_*`) with zero-copy pushes (`push`), equivalence of `write_bufs`
    // with flat `write`, and fixed-capacity overflow panics.
    mod builder_tests {
        use super::*;
        use commonware_codec::{BufsMut, Encode, Write};

        // Construct a Builder over a fresh test pool with the given inline
        // capacity hint.
        fn builder(capacity: usize) -> Builder {
            Builder::new(&test_pool(), NonZeroUsize::new(capacity).unwrap())
        }

        // Only inline writes, no pushes.
        #[test]
        fn test_inline_only() {
            let mut b = builder(64);
            b.put_u32(42);
            b.put_u8(7);
            let mut r = b.finish();
            // u32 (4 bytes) + u8 (1 byte) = 5 bytes total.
            assert_eq!(r.remaining(), 5);
            assert_eq!(r.get_u32(), 42);
            assert_eq!(r.get_u8(), 7);
        }

        // Only zero-copy pushes, no inline writes.
        #[test]
        fn test_push_only() {
            let mut b = builder(64);
            let data = Bytes::from(vec![0xAA; 1024]);
            b.push(data.clone());
            let mut r = b.finish();
            assert_eq!(r.remaining(), 1024);
            assert_eq!(r.copy_to_bytes(1024), data);
        }

        // Interleaved: inline header, zero-copy push, inline trailer.
        #[test]
        fn test_inline_push_inline() {
            let mut b = builder(64);
            b.put_u16(99);
            let payload = Bytes::from(vec![0xBB; 512]);
            b.push(payload.clone());
            b.put_u8(1);
            let mut r = b.finish();
            assert_eq!(r.remaining(), 2 + 512 + 1);
            assert_eq!(r.get_u16(), 99);
            assert_eq!(r.copy_to_bytes(512), payload);
            assert_eq!(r.get_u8(), 1);
        }

        // Bytes::write_bufs produces identical wire format to Bytes::write.
        #[test]
        fn test_write_bufs_matches_write() {
            let data = Bytes::from(vec![0xCC; 256]);
            let mut b = builder(64);
            data.write_bufs(&mut b);
            let mut bufs = b.finish();

            let mut out = vec![0u8; bufs.remaining()];
            bufs.copy_to_slice(&mut out);
            assert_eq!(out, data.encode().as_ref());
        }

        // Finishing an unused builder produces empty IoBufs.
        #[test]
        fn test_empty() {
            let bufs = builder(64).finish();
            assert_eq!(bufs.remaining(), 0);
        }

        // Inline writes exceeding capacity panic.
        #[test]
        #[should_panic]
        fn test_inline_overflow_panics() {
            let mut b = builder(1);
            let cap = b.remaining_mut();
            b.put_slice(&vec![0xFF; cap]);
            b.put_u8(1); // exceeds capacity
        }

        // Pushing empty Bytes is a no-op.
        #[test]
        fn test_empty_push_ignored() {
            let mut b = builder(64);
            b.push(Bytes::new());
            b.put_u8(1);
            let bufs = b.finish();
            // Only the inline byte survives; the empty push contributes nothing.
            assert_eq!(bufs.remaining(), 1);
        }

        // Consecutive pushes without inline writes between them.
        #[test]
        fn test_multiple_pushes() {
            let mut b = builder(64);
            let a = Bytes::from(vec![0xAA; 100]);
            let c = Bytes::from(vec![0xCC; 200]);
            b.push(a.clone());
            b.push(c.clone());
            let mut r = b.finish();
            assert_eq!(r.remaining(), 300);
            assert_eq!(r.copy_to_bytes(100), a);
            assert_eq!(r.copy_to_bytes(200), c);
        }

        // put() exceeding capacity panics.
        #[test]
        #[should_panic]
        fn test_put_exceeding_capacity_panics() {
            let mut b = builder(1);
            let cap = b.remaining_mut();
            let src = Bytes::from(vec![0xAB; cap + 1]);
            b.put(src);
        }

        // put_slice() exceeding capacity panics.
        #[test]
        #[should_panic]
        fn test_put_slice_exceeding_capacity_panics() {
            let mut b = builder(1);
            let cap = b.remaining_mut();
            b.put_slice(&vec![0xFE; cap + 1]);
        }

        // Simulates a multi-field struct: [u16 | Bytes (via push) | u32].
        // Verifies write_bufs produces identical wire format to write.
        #[test]
        fn test_multi_field_struct_equivalence() {
            let header: u16 = 0xCAFE;
            let payload = Bytes::from(vec![0xDD; 1024]);
            let trailer: u32 = 0xDEADBEEF;

            // Flat encoding via write.
            let size = header.encode_size() + payload.encode_size() + trailer.encode_size();
            let mut flat = BytesMut::with_capacity(size);
            header.write(&mut flat);
            payload.write(&mut flat);
            trailer.write(&mut flat);

            // Multi-buffer encoding via write_bufs.
            let mut b = builder(64);
            header.write(&mut b);
            payload.write_bufs(&mut b);
            trailer.write(&mut b);
            let mut bufs = b.finish();

            let mut out = vec![0u8; bufs.remaining()];
            bufs.copy_to_slice(&mut out);
            assert_eq!(out, flat.as_ref());
        }

        // encode_with_pool (Builder path) matches encode (flat BytesMut path).
        #[test]
        fn test_encode_with_pool_matches_encode() {
            let pool = test_pool();
            let data = Bytes::from(vec![0xEE; 500]);
            let mut pooled = data.encode_with_pool(&pool);
            let baseline = data.encode();
            let mut out = vec![0u8; pooled.remaining()];
            pooled.copy_to_slice(&mut out);
            assert_eq!(out, baseline.as_ref());
        }

        // Exercise remaining_mut, chunk_mut, and advance_mut directly.
        #[test]
        fn test_chunk_mut_and_advance_mut() {
            let mut b = builder(64);
            let initial = b.remaining_mut();
            assert!(initial >= 64);
            let chunk = b.chunk_mut();
            chunk[0..1].copy_from_slice(&[0xAB]);
            // SAFETY: We just wrote 1 byte into chunk_mut above.
            unsafe { b.advance_mut(1) };
            assert_eq!(b.remaining_mut(), initial - 1);
            let mut r = b.finish();
            assert_eq!(r.remaining(), 1);
            assert_eq!(r.get_u8(), 0xAB);
        }

        // Writing past a full buffer panics (fixed capacity).
        #[test]
        #[should_panic]
        fn test_write_past_full_panics() {
            let mut b = builder(1);
            let cap = b.remaining_mut();
            b.put_slice(&vec![0xFF; cap]); // fill the buffer completely
            assert_eq!(b.remaining_mut(), 0);
            b.put_u8(0x42); // panics
        }

        // Push at offset 0 with inline trailer exercises finish branch
        // where offset == pos (no inline prefix before push).
        #[test]
        fn test_push_at_start_with_trailer() {
            let mut b = builder(64);
            let payload = Bytes::from(vec![0xCC; 32]);
            b.push(payload.clone());
            b.put_u8(0x01);
            let mut r = b.finish();
            assert_eq!(r.remaining(), 33);
            assert_eq!(r.copy_to_bytes(32), payload);
            assert_eq!(r.get_u8(), 0x01);
        }
    }
4933}