Skip to main content

commonware_runtime/iobuf/
mod.rs

1//! Buffer types for I/O operations.
2//!
3//! Each buffer type is backed by one of three storage variants:
4//! - [`Bytes`]/[`BytesMut`]: standard heap allocation (from `From` conversions)
5//! - Aligned: untracked aligned allocation (from [`IoBufMut::with_alignment`],
6//!   pool bypass for small requests, or fallback)
7//! - Pooled: tracked aligned allocation returned to its originating [`BufferPool`]
8//!   on drop
9//!
10//! Public types:
11//! - [`IoBuf`]: Immutable byte buffer
12//! - [`IoBufMut`]: Mutable byte buffer
13//! - [`IoBufs`]: Container for one or more immutable buffers
14//! - [`IoBufsMut`]: Container for one or more mutable buffers
15//! - [`BufferPool`]: Pool of reusable, aligned buffers
16
17mod buffer;
18mod freelist;
19mod pool;
20
21pub(crate) use buffer::AlignedBuffer;
22use buffer::{AlignedBuf, AlignedBufMut, PooledBuf, PooledBufMut};
23use bytes::{Buf, BufMut, Bytes, BytesMut};
24use commonware_codec::{util::at_least, BufsMut, EncodeSize, Error, RangeCfg, Read, Write};
25use crossbeam_utils::CachePadded;
26pub use pool::{BufferPool, BufferPoolConfig, BufferPoolThreadCache, PoolError};
27use std::{collections::VecDeque, io::IoSlice, mem::align_of, num::NonZeroUsize, ops::RangeBounds};
28
29/// Returns the system page size.
30///
31/// On Unix systems, queries the actual page size via `sysconf`.
32/// On other systems (Windows), defaults to 4KB.
33#[allow(clippy::missing_const_for_fn)]
34pub fn page_size() -> usize {
35    #[cfg(unix)]
36    {
37        // SAFETY: sysconf is safe to call.
38        let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
39        if size <= 0 {
40            4096 // Safe fallback if sysconf fails
41        } else {
42            size as usize
43        }
44    }
45
46    #[cfg(not(unix))]
47    {
48        4096
49    }
50}
51
52/// Returns the cache line size for the current architecture.
53pub const fn cache_line_size() -> usize {
54    align_of::<CachePadded<u8>>()
55}
56
/// Internal types re-exported for benchmarks (requires the `bench` feature).
#[cfg(feature = "bench")]
pub mod bench {
    pub use super::buffer::PooledBuffer;
    pub use super::freelist::Freelist;
}
61
62/// Immutable byte buffer.
63///
64/// Backed by [`Bytes`], an untracked aligned allocation, or a pooled
65/// allocation.
66///
67/// Use this for immutable payloads. To build or mutate data, use
68/// [`IoBufMut`] and then [`IoBufMut::freeze`].
69///
70/// For pooled-backed values, the underlying buffer is returned to the pool
71/// when the final reference is dropped.
72///
73/// All `From<*> for IoBuf` implementations are guaranteed to be non-copy
74/// conversions. Use [`IoBuf::copy_from_slice`] when an explicit copy from
75/// borrowed data is required.
76///
77/// Cloning is cheap and does not copy underlying bytes.
78#[derive(Clone, Debug)]
79pub struct IoBuf {
80    inner: IoBufInner,
81}
82
83/// Internal storage variant for [`IoBuf`].
84///
85/// - `Bytes`: from `From<Bytes>`, `From<Vec<u8>>`, `From<&'static [u8]>`, etc.
86/// - `Aligned`: from untracked aligned allocation ([`IoBufMut::with_alignment`],
87///   pool bypass for small requests, or fallback)
88/// - `Pooled`: from [`BufferPool`] allocation (returned to pool on final drop)
89#[derive(Clone, Debug)]
90enum IoBufInner {
91    Bytes(Bytes),
92    Aligned(AlignedBuf),
93    Pooled(PooledBuf),
94}
95
96impl IoBuf {
97    /// Create a buffer by copying data from a slice.
98    ///
99    /// Use this when you have a non-static `&[u8]` that needs to be converted to an
100    /// [`IoBuf`]. For static slices, prefer [`IoBuf::from`] which is zero-copy.
101    pub fn copy_from_slice(data: &[u8]) -> Self {
102        Self {
103            inner: IoBufInner::Bytes(Bytes::copy_from_slice(data)),
104        }
105    }
106
107    /// Create a buffer from a pooled allocation.
108    #[inline]
109    const fn from_pooled(pooled: PooledBuf) -> Self {
110        Self {
111            inner: IoBufInner::Pooled(pooled),
112        }
113    }
114
115    /// Create a buffer from an untracked aligned allocation.
116    #[inline]
117    const fn from_aligned(aligned: AlignedBuf) -> Self {
118        Self {
119            inner: IoBufInner::Aligned(aligned),
120        }
121    }
122
123    /// Returns `true` if this buffer is tracked by a pool.
124    ///
125    /// Tracked buffers originate from [`BufferPool`] allocations and are
126    /// returned to the pool when the final reference is dropped.
127    ///
128    /// Buffers backed by [`Bytes`], and untracked aligned allocations (from
129    /// [`IoBufMut::with_alignment`], pool bypass for small requests, or
130    /// fallback), return `false`.
131    #[inline]
132    pub const fn is_pooled(&self) -> bool {
133        match &self.inner {
134            IoBufInner::Bytes(_) => false,
135            IoBufInner::Aligned(_) => false,
136            IoBufInner::Pooled(_) => true,
137        }
138    }
139
140    /// Number of bytes remaining in the buffer.
141    #[inline]
142    pub fn len(&self) -> usize {
143        self.remaining()
144    }
145
146    /// Whether the buffer is empty.
147    #[inline]
148    pub fn is_empty(&self) -> bool {
149        self.remaining() == 0
150    }
151
152    /// Get raw pointer to the buffer data.
153    #[inline]
154    pub fn as_ptr(&self) -> *const u8 {
155        match &self.inner {
156            IoBufInner::Bytes(b) => b.as_ptr(),
157            IoBufInner::Aligned(a) => a.as_ptr(),
158            IoBufInner::Pooled(p) => p.as_ptr(),
159        }
160    }
161
162    /// Returns a slice of self for the provided range (zero-copy).
163    ///
164    /// For pooled buffers, empty ranges return an empty detached buffer
165    /// ([`IoBuf::default`]) so the underlying pooled allocation is not retained.
166    #[inline]
167    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
168        match &self.inner {
169            IoBufInner::Bytes(b) => Self {
170                inner: IoBufInner::Bytes(b.slice(range)),
171            },
172            IoBufInner::Aligned(a) => a
173                .slice(range)
174                .map_or_else(Self::default, Self::from_aligned),
175            IoBufInner::Pooled(p) => p.slice(range).map_or_else(Self::default, Self::from_pooled),
176        }
177    }
178
179    /// Splits the buffer into two at the given index.
180    ///
181    /// Afterwards `self` contains bytes `[at, len)`, and the returned [`IoBuf`]
182    /// contains bytes `[0, at)`.
183    ///
184    /// This is an `O(1)` zero-copy operation.
185    ///
186    /// # Panics
187    ///
188    /// Panics if `at > len`.
189    pub fn split_to(&mut self, at: usize) -> Self {
190        if at == 0 {
191            return Self::default();
192        }
193
194        if at == self.remaining() {
195            return std::mem::take(self);
196        }
197
198        match &mut self.inner {
199            IoBufInner::Bytes(b) => Self {
200                inner: IoBufInner::Bytes(b.split_to(at)),
201            },
202            IoBufInner::Aligned(a) => Self::from_aligned(a.split_to(at)),
203            IoBufInner::Pooled(p) => Self::from_pooled(p.split_to(at)),
204        }
205    }
206
207    /// Try to convert this buffer into [`IoBufMut`] without copying.
208    ///
209    /// Succeeds when `self` holds exclusive ownership of the backing storage
210    /// and returns an [`IoBufMut`] with the same contents. Fails and returns
211    /// `self` unchanged when ownership is shared.
212    ///
213    /// For [`Bytes`]-backed buffers, this matches [`Bytes::try_into_mut`]
214    /// semantics: succeeds only for uniquely-owned full buffers, and always
215    /// fails for [`Bytes::from_owner`] and [`Bytes::from_static`] buffers. For
216    /// pooled buffers, this succeeds for any uniquely-owned view (including
217    /// slices) and fails when shared.
218    pub fn try_into_mut(self) -> Result<IoBufMut, Self> {
219        match self.inner {
220            IoBufInner::Bytes(bytes) => bytes
221                .try_into_mut()
222                .map(|mut_bytes| IoBufMut {
223                    inner: IoBufMutInner::Bytes(mut_bytes),
224                })
225                .map_err(|bytes| Self {
226                    inner: IoBufInner::Bytes(bytes),
227                }),
228            IoBufInner::Aligned(aligned) => aligned
229                .try_into_mut()
230                .map(|mut_aligned| IoBufMut {
231                    inner: IoBufMutInner::Aligned(mut_aligned),
232                })
233                .map_err(|aligned| Self {
234                    inner: IoBufInner::Aligned(aligned),
235                }),
236            IoBufInner::Pooled(pooled) => pooled
237                .try_into_mut()
238                .map(|mut_pooled| IoBufMut {
239                    inner: IoBufMutInner::Pooled(mut_pooled),
240                })
241                .map_err(|pooled| Self {
242                    inner: IoBufInner::Pooled(pooled),
243                }),
244        }
245    }
246}
247
248impl AsRef<[u8]> for IoBuf {
249    #[inline]
250    fn as_ref(&self) -> &[u8] {
251        match &self.inner {
252            IoBufInner::Bytes(b) => b.as_ref(),
253            IoBufInner::Aligned(a) => a.as_ref(),
254            IoBufInner::Pooled(p) => p.as_ref(),
255        }
256    }
257}
258
259impl Default for IoBuf {
260    fn default() -> Self {
261        Self {
262            inner: IoBufInner::Bytes(Bytes::new()),
263        }
264    }
265}
266
267impl PartialEq for IoBuf {
268    fn eq(&self, other: &Self) -> bool {
269        self.as_ref() == other.as_ref()
270    }
271}
272
273impl Eq for IoBuf {}
274
275impl PartialEq<[u8]> for IoBuf {
276    #[inline]
277    fn eq(&self, other: &[u8]) -> bool {
278        self.as_ref() == other
279    }
280}
281
282impl PartialEq<&[u8]> for IoBuf {
283    #[inline]
284    fn eq(&self, other: &&[u8]) -> bool {
285        self.as_ref() == *other
286    }
287}
288
289impl<const N: usize> PartialEq<[u8; N]> for IoBuf {
290    #[inline]
291    fn eq(&self, other: &[u8; N]) -> bool {
292        self.as_ref() == other
293    }
294}
295
296impl<const N: usize> PartialEq<&[u8; N]> for IoBuf {
297    #[inline]
298    fn eq(&self, other: &&[u8; N]) -> bool {
299        self.as_ref() == *other
300    }
301}
302
303impl Buf for IoBuf {
304    #[inline]
305    fn remaining(&self) -> usize {
306        match &self.inner {
307            IoBufInner::Bytes(b) => b.remaining(),
308            IoBufInner::Aligned(a) => a.remaining(),
309            IoBufInner::Pooled(p) => p.remaining(),
310        }
311    }
312
313    #[inline]
314    fn chunk(&self) -> &[u8] {
315        match &self.inner {
316            IoBufInner::Bytes(b) => b.chunk(),
317            IoBufInner::Aligned(a) => a.chunk(),
318            IoBufInner::Pooled(p) => p.chunk(),
319        }
320    }
321
322    #[inline]
323    fn advance(&mut self, cnt: usize) {
324        match &mut self.inner {
325            IoBufInner::Bytes(b) => b.advance(cnt),
326            IoBufInner::Aligned(a) => a.advance(cnt),
327            IoBufInner::Pooled(p) => p.advance(cnt),
328        }
329    }
330
331    #[inline]
332    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
333        match &mut self.inner {
334            IoBufInner::Bytes(b) => b.copy_to_bytes(len),
335            IoBufInner::Aligned(a) => a.copy_to_bytes(len),
336            IoBufInner::Pooled(p) => {
337                // Full non-empty drain: transfer ownership so the drained source no
338                // longer retains the pooled allocation. Keep len == 0 on the normal
339                // path to avoid creating an empty Bytes that still pins pool memory.
340                if len != 0 && len == p.remaining() {
341                    let inner = std::mem::replace(&mut self.inner, IoBufInner::Bytes(Bytes::new()));
342                    match inner {
343                        IoBufInner::Pooled(p) => p.into_bytes(),
344                        _ => unreachable!(),
345                    }
346                } else {
347                    p.copy_to_bytes(len)
348                }
349            }
350        }
351    }
352}
353
354impl From<Bytes> for IoBuf {
355    fn from(bytes: Bytes) -> Self {
356        Self {
357            inner: IoBufInner::Bytes(bytes),
358        }
359    }
360}
361
362impl From<Vec<u8>> for IoBuf {
363    fn from(vec: Vec<u8>) -> Self {
364        Self {
365            inner: IoBufInner::Bytes(Bytes::from(vec)),
366        }
367    }
368}
369
370impl<const N: usize> From<&'static [u8; N]> for IoBuf {
371    fn from(array: &'static [u8; N]) -> Self {
372        Self {
373            inner: IoBufInner::Bytes(Bytes::from_static(array)),
374        }
375    }
376}
377
378impl From<&'static [u8]> for IoBuf {
379    fn from(slice: &'static [u8]) -> Self {
380        Self {
381            inner: IoBufInner::Bytes(Bytes::from_static(slice)),
382        }
383    }
384}
385
386/// Convert an [`IoBuf`] into a [`Vec<u8>`].
387///
388/// This conversion may copy:
389/// - [`Bytes`]-backed buffers may reuse allocation when possible
390/// - pooled buffers copy readable bytes into a new [`Vec<u8>`]
391impl From<IoBuf> for Vec<u8> {
392    fn from(buf: IoBuf) -> Self {
393        match buf.inner {
394            IoBufInner::Bytes(bytes) => Self::from(bytes),
395            IoBufInner::Aligned(aligned) => aligned.as_ref().to_vec(),
396            IoBufInner::Pooled(pooled) => pooled.as_ref().to_vec(),
397        }
398    }
399}
400
401/// Convert an [`IoBuf`] into [`Bytes`] without copying readable data.
402///
403/// For pooled buffers, this wraps the pooled owner using [`Bytes::from_owner`].
404impl From<IoBuf> for Bytes {
405    fn from(buf: IoBuf) -> Self {
406        match buf.inner {
407            IoBufInner::Bytes(bytes) => bytes,
408            IoBufInner::Aligned(aligned) => Self::from_owner(aligned),
409            IoBufInner::Pooled(pooled) => Self::from_owner(pooled),
410        }
411    }
412}
413
414impl Write for IoBuf {
415    #[inline]
416    fn write(&self, buf: &mut impl BufMut) {
417        self.len().write(buf);
418        buf.put_slice(self.as_ref());
419    }
420
421    #[inline]
422    fn write_bufs(&self, buf: &mut impl BufsMut) {
423        self.len().write(buf);
424        buf.push(self.clone());
425    }
426}
427
428impl EncodeSize for IoBuf {
429    #[inline]
430    fn encode_size(&self) -> usize {
431        self.len().encode_size() + self.len()
432    }
433
434    #[inline]
435    fn encode_inline_size(&self) -> usize {
436        self.len().encode_size()
437    }
438}
439
440impl Read for IoBuf {
441    type Cfg = RangeCfg<usize>;
442
443    #[inline]
444    fn read_cfg(buf: &mut impl Buf, range: &Self::Cfg) -> Result<Self, Error> {
445        let len = usize::read_cfg(buf, range)?;
446        at_least(buf, len)?;
447        Ok(Self::from(buf.copy_to_bytes(len)))
448    }
449}
450
/// Generates a [`Bytes`]-backed buffer from arbitrary input (requires the
/// `arbitrary` feature).
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for IoBuf {
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let count = u.arbitrary_len::<u8>()?;
        let bytes = u
            .arbitrary_iter()?
            .take(count)
            .collect::<arbitrary::Result<Vec<u8>>>()?;
        Ok(bytes.into())
    }
}
459
460/// Mutable byte buffer.
461///
462/// Backed by [`BytesMut`], an untracked aligned allocation, or a pooled
463/// allocation.
464///
465/// Use this to build or mutate payloads before freezing into [`IoBuf`].
466///
467/// For pooled-backed values, dropping this buffer returns the underlying
468/// allocation to the pool. After [`IoBufMut::freeze`], the frozen `IoBuf`
469/// keeps the allocation alive until its final reference is dropped.
470#[derive(Debug)]
471pub struct IoBufMut {
472    inner: IoBufMutInner,
473}
474
475/// Internal storage variant for [`IoBufMut`]. See [`IoBufInner`] for variant
476/// semantics.
477#[derive(Debug)]
478enum IoBufMutInner {
479    Bytes(BytesMut),
480    Aligned(AlignedBufMut),
481    Pooled(PooledBufMut),
482}
483
484impl Default for IoBufMut {
485    fn default() -> Self {
486        Self {
487            inner: IoBufMutInner::Bytes(BytesMut::new()),
488        }
489    }
490}
491
492impl IoBufMut {
493    /// Create a buffer with the given capacity.
494    #[inline]
495    pub fn with_capacity(capacity: usize) -> Self {
496        Self {
497            inner: IoBufMutInner::Bytes(BytesMut::with_capacity(capacity)),
498        }
499    }
500
501    /// Create an untracked aligned buffer with the given capacity and alignment.
502    ///
503    /// This uses the aligned backing path directly rather than `BytesMut`.
504    /// The returned buffer is not tracked by a [`BufferPool`], so dropping it
505    /// deallocates the aligned allocation immediately.
506    ///
507    /// Use this when the caller needs a specific alignment but does not need
508    /// pooled reuse.
509    #[inline]
510    pub fn with_alignment(capacity: usize, alignment: NonZeroUsize) -> Self {
511        if capacity == 0 {
512            return Self::with_capacity(0);
513        }
514        let buffer = AlignedBuffer::new(capacity, alignment.get());
515        Self::from_aligned(AlignedBufMut::new(buffer))
516    }
517
518    /// Create a zero-initialized untracked aligned buffer with the given
519    /// length and alignment.
520    ///
521    /// Unlike [`Self::with_alignment`], this initializes the full readable
522    /// range to zero and sets `len == capacity == len`.
523    #[inline]
524    pub fn zeroed_with_alignment(len: usize, alignment: NonZeroUsize) -> Self {
525        if len == 0 {
526            return Self::zeroed(0);
527        }
528        let buffer = AlignedBuffer::new_zeroed(len, alignment.get());
529        let mut buffer = Self::from_aligned(AlignedBufMut::new(buffer));
530        // SAFETY: the aligned allocation was zero-initialized for `len` bytes.
531        unsafe { buffer.set_len(len) };
532        buffer
533    }
534
535    /// Create a buffer of `len` bytes, all initialized to zero.
536    ///
537    /// Unlike `with_capacity`, this sets both capacity and length to `len`,
538    /// making the entire buffer immediately usable for read operations
539    /// (e.g., `file.read_exact`).
540    #[inline]
541    pub fn zeroed(len: usize) -> Self {
542        Self {
543            inner: IoBufMutInner::Bytes(BytesMut::zeroed(len)),
544        }
545    }
546
547    /// Create a buffer from a pooled allocation.
548    #[inline]
549    const fn from_pooled(pooled: PooledBufMut) -> Self {
550        Self {
551            inner: IoBufMutInner::Pooled(pooled),
552        }
553    }
554
555    /// Create a buffer from an untracked aligned allocation.
556    #[inline]
557    const fn from_aligned(aligned: AlignedBufMut) -> Self {
558        Self {
559            inner: IoBufMutInner::Aligned(aligned),
560        }
561    }
562
563    /// Returns `true` if this buffer is tracked by a pool.
564    ///
565    /// Tracked buffers originate from [`BufferPool`] allocations and are
566    /// returned to the pool when dropped.
567    ///
568    /// Buffers backed by [`BytesMut`], and untracked aligned allocations (from
569    /// [`IoBufMut::with_alignment`], pool bypass for small requests, or
570    /// fallback), return `false`.
571    #[inline]
572    pub const fn is_pooled(&self) -> bool {
573        match &self.inner {
574            IoBufMutInner::Bytes(_) => false,
575            IoBufMutInner::Aligned(_) => false,
576            IoBufMutInner::Pooled(_) => true,
577        }
578    }
579
580    /// Sets the length of the buffer.
581    ///
582    /// This will explicitly set the size of the buffer without actually
583    /// modifying the data, so it is up to the caller to ensure that the data
584    /// has been initialized.
585    ///
586    /// # Safety
587    ///
588    /// Caller must ensure all bytes in `0..len` are initialized before any
589    /// read operations.
590    ///
591    /// # Panics
592    ///
593    /// Panics if `len > capacity()`.
594    #[inline]
595    pub unsafe fn set_len(&mut self, len: usize) {
596        assert!(
597            len <= self.capacity(),
598            "set_len({len}) exceeds capacity({})",
599            self.capacity()
600        );
601        match &mut self.inner {
602            IoBufMutInner::Bytes(b) => b.set_len(len),
603            IoBufMutInner::Aligned(b) => b.set_len(len),
604            IoBufMutInner::Pooled(b) => b.set_len(len),
605        }
606    }
607
608    /// Number of bytes remaining in the buffer.
609    #[inline]
610    pub fn len(&self) -> usize {
611        self.remaining()
612    }
613
614    /// Whether the buffer is empty.
615    #[inline]
616    pub fn is_empty(&self) -> bool {
617        match &self.inner {
618            IoBufMutInner::Bytes(b) => b.is_empty(),
619            IoBufMutInner::Aligned(b) => b.is_empty(),
620            IoBufMutInner::Pooled(b) => b.is_empty(),
621        }
622    }
623
624    /// Freeze into immutable [`IoBuf`].
625    #[inline]
626    pub fn freeze(self) -> IoBuf {
627        match self.inner {
628            IoBufMutInner::Bytes(b) => b.freeze().into(),
629            IoBufMutInner::Aligned(b) => b.freeze(),
630            IoBufMutInner::Pooled(b) => b.freeze(),
631        }
632    }
633
634    /// Returns the number of bytes the buffer can hold without reallocating.
635    #[inline]
636    pub fn capacity(&self) -> usize {
637        match &self.inner {
638            IoBufMutInner::Bytes(b) => b.capacity(),
639            IoBufMutInner::Aligned(b) => b.capacity(),
640            IoBufMutInner::Pooled(b) => b.capacity(),
641        }
642    }
643
644    /// Returns an unsafe mutable pointer to the buffer's data.
645    #[inline]
646    pub fn as_mut_ptr(&mut self) -> *mut u8 {
647        match &mut self.inner {
648            IoBufMutInner::Bytes(b) => b.as_mut_ptr(),
649            IoBufMutInner::Aligned(b) => b.as_mut_ptr(),
650            IoBufMutInner::Pooled(b) => b.as_mut_ptr(),
651        }
652    }
653
654    /// Truncates the buffer to `len` readable bytes.
655    ///
656    /// If `len` is greater than the current length, this has no effect.
657    #[inline]
658    pub fn truncate(&mut self, len: usize) {
659        match &mut self.inner {
660            IoBufMutInner::Bytes(b) => b.truncate(len),
661            IoBufMutInner::Aligned(b) => b.truncate(len),
662            IoBufMutInner::Pooled(b) => b.truncate(len),
663        }
664    }
665
666    /// Clears the buffer, removing all data. Existing capacity is preserved.
667    #[inline]
668    pub fn clear(&mut self) {
669        match &mut self.inner {
670            IoBufMutInner::Bytes(b) => b.clear(),
671            IoBufMutInner::Aligned(b) => b.clear(),
672            IoBufMutInner::Pooled(b) => b.clear(),
673        }
674    }
675}
676
677impl AsRef<[u8]> for IoBufMut {
678    #[inline]
679    fn as_ref(&self) -> &[u8] {
680        match &self.inner {
681            IoBufMutInner::Bytes(b) => b.as_ref(),
682            IoBufMutInner::Aligned(b) => b.as_ref(),
683            IoBufMutInner::Pooled(b) => b.as_ref(),
684        }
685    }
686}
687
688impl AsMut<[u8]> for IoBufMut {
689    #[inline]
690    fn as_mut(&mut self) -> &mut [u8] {
691        match &mut self.inner {
692            IoBufMutInner::Bytes(b) => b.as_mut(),
693            IoBufMutInner::Aligned(b) => b.as_mut(),
694            IoBufMutInner::Pooled(b) => b.as_mut(),
695        }
696    }
697}
698
699impl PartialEq<[u8]> for IoBufMut {
700    #[inline]
701    fn eq(&self, other: &[u8]) -> bool {
702        self.as_ref() == other
703    }
704}
705
706impl PartialEq<&[u8]> for IoBufMut {
707    #[inline]
708    fn eq(&self, other: &&[u8]) -> bool {
709        self.as_ref() == *other
710    }
711}
712
713impl<const N: usize> PartialEq<[u8; N]> for IoBufMut {
714    #[inline]
715    fn eq(&self, other: &[u8; N]) -> bool {
716        self.as_ref() == other
717    }
718}
719
720impl<const N: usize> PartialEq<&[u8; N]> for IoBufMut {
721    #[inline]
722    fn eq(&self, other: &&[u8; N]) -> bool {
723        self.as_ref() == *other
724    }
725}
726
727impl Buf for IoBufMut {
728    #[inline]
729    fn remaining(&self) -> usize {
730        match &self.inner {
731            IoBufMutInner::Bytes(b) => b.remaining(),
732            IoBufMutInner::Aligned(b) => b.remaining(),
733            IoBufMutInner::Pooled(b) => b.remaining(),
734        }
735    }
736
737    #[inline]
738    fn chunk(&self) -> &[u8] {
739        match &self.inner {
740            IoBufMutInner::Bytes(b) => b.chunk(),
741            IoBufMutInner::Aligned(b) => b.chunk(),
742            IoBufMutInner::Pooled(b) => b.chunk(),
743        }
744    }
745
746    #[inline]
747    fn advance(&mut self, cnt: usize) {
748        match &mut self.inner {
749            IoBufMutInner::Bytes(b) => b.advance(cnt),
750            IoBufMutInner::Aligned(b) => b.advance(cnt),
751            IoBufMutInner::Pooled(b) => b.advance(cnt),
752        }
753    }
754
755    #[inline]
756    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
757        match &mut self.inner {
758            IoBufMutInner::Bytes(b) => b.copy_to_bytes(len),
759            IoBufMutInner::Aligned(a) => a.copy_to_bytes(len),
760            IoBufMutInner::Pooled(p) => {
761                // Full non-empty drain: transfer ownership so the drained source no
762                // longer retains the pooled allocation. Keep len == 0 on the normal
763                // path to avoid creating an empty Bytes that still pins pool memory.
764                if len != 0 && len == p.remaining() {
765                    let inner =
766                        std::mem::replace(&mut self.inner, IoBufMutInner::Bytes(BytesMut::new()));
767                    match inner {
768                        IoBufMutInner::Pooled(p) => p.into_bytes(),
769                        _ => unreachable!(),
770                    }
771                } else {
772                    p.copy_to_bytes(len)
773                }
774            }
775        }
776    }
777}
778
779// SAFETY: Delegates to BytesMut or PooledBufMut which implement BufMut safely.
780unsafe impl BufMut for IoBufMut {
781    #[inline]
782    fn remaining_mut(&self) -> usize {
783        match &self.inner {
784            IoBufMutInner::Bytes(b) => b.remaining_mut(),
785            IoBufMutInner::Aligned(b) => b.remaining_mut(),
786            IoBufMutInner::Pooled(b) => b.remaining_mut(),
787        }
788    }
789
790    #[inline]
791    unsafe fn advance_mut(&mut self, cnt: usize) {
792        match &mut self.inner {
793            IoBufMutInner::Bytes(b) => b.advance_mut(cnt),
794            IoBufMutInner::Aligned(b) => b.advance_mut(cnt),
795            IoBufMutInner::Pooled(b) => b.advance_mut(cnt),
796        }
797    }
798
799    #[inline]
800    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
801        match &mut self.inner {
802            IoBufMutInner::Bytes(b) => b.chunk_mut(),
803            IoBufMutInner::Aligned(b) => b.chunk_mut(),
804            IoBufMutInner::Pooled(b) => b.chunk_mut(),
805        }
806    }
807}
808
809impl From<Vec<u8>> for IoBufMut {
810    fn from(vec: Vec<u8>) -> Self {
811        Self::from(Bytes::from(vec))
812    }
813}
814
815impl From<&[u8]> for IoBufMut {
816    fn from(slice: &[u8]) -> Self {
817        Self {
818            inner: IoBufMutInner::Bytes(BytesMut::from(slice)),
819        }
820    }
821}
822
823impl<const N: usize> From<[u8; N]> for IoBufMut {
824    fn from(array: [u8; N]) -> Self {
825        Self::from(array.as_ref())
826    }
827}
828
829impl<const N: usize> From<&[u8; N]> for IoBufMut {
830    fn from(array: &[u8; N]) -> Self {
831        Self::from(array.as_ref())
832    }
833}
834
835impl From<BytesMut> for IoBufMut {
836    fn from(bytes: BytesMut) -> Self {
837        Self {
838            inner: IoBufMutInner::Bytes(bytes),
839        }
840    }
841}
842
843impl From<Bytes> for IoBufMut {
844    /// Zero-copy if `bytes` is unique for the entire original buffer (refcount is 1),
845    /// copies otherwise. Always copies if the [`Bytes`] was constructed via
846    /// [`Bytes::from_owner`] or [`Bytes::from_static`].
847    fn from(bytes: Bytes) -> Self {
848        Self {
849            inner: IoBufMutInner::Bytes(BytesMut::from(bytes)),
850        }
851    }
852}
853
854impl From<IoBuf> for IoBufMut {
855    /// Zero-copy when exclusive ownership can be recovered, copies otherwise.
856    fn from(buf: IoBuf) -> Self {
857        match buf.try_into_mut() {
858            Ok(buf) => buf,
859            Err(buf) => Self::from(buf.as_ref()),
860        }
861    }
862}
863
864/// Container for one or more immutable buffers.
865#[derive(Clone, Debug)]
866pub struct IoBufs {
867    inner: IoBufsInner,
868}
869
870/// Internal immutable representation.
871///
872/// - Representation is canonical and minimal for readable data:
873///   - `Single` is the only representation for empty data and one-chunk data.
874///   - `Chunked` is used only when four or more readable chunks remain.
875/// - `Pair`, `Triple`, and `Chunked` never store empty chunks.
876#[derive(Clone, Debug)]
877enum IoBufsInner {
878    /// Single buffer (fast path).
879    Single(IoBuf),
880    /// Two buffers (fast path).
881    Pair([IoBuf; 2]),
882    /// Three buffers (fast path).
883    Triple([IoBuf; 3]),
884    /// Four or more buffers.
885    Chunked(VecDeque<IoBuf>),
886}
887
888impl Default for IoBufs {
889    fn default() -> Self {
890        Self {
891            inner: IoBufsInner::Single(IoBuf::default()),
892        }
893    }
894}
895
896impl IoBufs {
897    /// Build canonical immutable chunk storage from readable chunks.
898    ///
899    /// Empty chunks are removed before representation selection.
900    fn from_chunks_iter(chunks: impl IntoIterator<Item = IoBuf>) -> Self {
901        let mut iter = chunks.into_iter().filter(|buf| !buf.is_empty());
902        let first = match iter.next() {
903            Some(first) => first,
904            None => return Self::default(),
905        };
906        let second = match iter.next() {
907            Some(second) => second,
908            None => {
909                return Self {
910                    inner: IoBufsInner::Single(first),
911                };
912            }
913        };
914        let third = match iter.next() {
915            Some(third) => third,
916            None => {
917                return Self {
918                    inner: IoBufsInner::Pair([first, second]),
919                };
920            }
921        };
922        let fourth = match iter.next() {
923            Some(fourth) => fourth,
924            None => {
925                return Self {
926                    inner: IoBufsInner::Triple([first, second, third]),
927                };
928            }
929        };
930
931        let mut bufs = VecDeque::with_capacity(4);
932        bufs.push_back(first);
933        bufs.push_back(second);
934        bufs.push_back(third);
935        bufs.push_back(fourth);
936        bufs.extend(iter);
937
938        Self {
939            inner: IoBufsInner::Chunked(bufs),
940        }
941    }
942
943    /// Re-establish canonical immutable representation invariants.
944    fn canonicalize(&mut self) {
945        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
946        self.inner = match inner {
947            IoBufsInner::Single(buf) => {
948                if buf.is_empty() {
949                    IoBufsInner::Single(IoBuf::default())
950                } else {
951                    IoBufsInner::Single(buf)
952                }
953            }
954            IoBufsInner::Pair([a, b]) => Self::from_chunks_iter([a, b]).inner,
955            IoBufsInner::Triple([a, b, c]) => Self::from_chunks_iter([a, b, c]).inner,
956            IoBufsInner::Chunked(bufs) => Self::from_chunks_iter(bufs).inner,
957        };
958    }
959
960    /// Returns a reference to the single contiguous buffer, if present.
961    ///
962    /// Returns `Some` only when all remaining data is in one contiguous buffer.
963    pub const fn as_single(&self) -> Option<&IoBuf> {
964        match &self.inner {
965            IoBufsInner::Single(buf) => Some(buf),
966            _ => None,
967        }
968    }
969
970    /// Consume this container and return the single buffer if present.
971    ///
972    /// Returns `Ok(IoBuf)` only when all remaining data is already contained in
973    /// a single chunk. Returns `Err(Self)` with the original container
974    /// otherwise.
975    pub fn try_into_single(self) -> Result<IoBuf, Self> {
976        match self.inner {
977            IoBufsInner::Single(buf) => Ok(buf),
978            inner => Err(Self { inner }),
979        }
980    }
981
982    /// Number of bytes remaining across all buffers.
983    #[inline]
984    pub fn len(&self) -> usize {
985        self.remaining()
986    }
987
988    /// Number of non-empty readable chunks.
989    #[inline]
990    pub fn chunk_count(&self) -> usize {
991        // This assumes canonical form.
992        match &self.inner {
993            IoBufsInner::Single(buf) => {
994                if buf.is_empty() {
995                    0
996                } else {
997                    1
998                }
999            }
1000            IoBufsInner::Pair(_) => 2,
1001            IoBufsInner::Triple(_) => 3,
1002            IoBufsInner::Chunked(bufs) => bufs.len(),
1003        }
1004    }
1005
1006    /// Whether all buffers are empty.
1007    #[inline]
1008    pub fn is_empty(&self) -> bool {
1009        self.remaining() == 0
1010    }
1011
1012    /// Whether this contains a single contiguous buffer.
1013    ///
1014    /// When true, `chunk()` returns all remaining bytes.
1015    #[inline]
1016    pub const fn is_single(&self) -> bool {
1017        matches!(self.inner, IoBufsInner::Single(_))
1018    }
1019
1020    /// Visit each readable chunk in order without coalescing.
1021    #[inline]
1022    pub fn for_each_chunk(&self, mut f: impl FnMut(&[u8])) {
1023        match &self.inner {
1024            IoBufsInner::Single(buf) => {
1025                let chunk = buf.as_ref();
1026                if !chunk.is_empty() {
1027                    f(chunk);
1028                }
1029            }
1030            IoBufsInner::Pair(pair) => {
1031                for buf in pair {
1032                    let chunk = buf.as_ref();
1033                    if !chunk.is_empty() {
1034                        f(chunk);
1035                    }
1036                }
1037            }
1038            IoBufsInner::Triple(triple) => {
1039                for buf in triple {
1040                    let chunk = buf.as_ref();
1041                    if !chunk.is_empty() {
1042                        f(chunk);
1043                    }
1044                }
1045            }
1046            IoBufsInner::Chunked(bufs) => {
1047                for buf in bufs {
1048                    let chunk = buf.as_ref();
1049                    if !chunk.is_empty() {
1050                        f(chunk);
1051                    }
1052                }
1053            }
1054        }
1055    }
1056
1057    /// Prepend a buffer to the front.
1058    ///
1059    /// Empty input buffers are ignored.
1060    pub fn prepend(&mut self, buf: IoBuf) {
1061        if buf.is_empty() {
1062            return;
1063        }
1064        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
1065        self.inner = match inner {
1066            IoBufsInner::Single(existing) if existing.is_empty() => IoBufsInner::Single(buf),
1067            IoBufsInner::Single(existing) => IoBufsInner::Pair([buf, existing]),
1068            IoBufsInner::Pair([a, b]) => IoBufsInner::Triple([buf, a, b]),
1069            IoBufsInner::Triple([a, b, c]) => {
1070                let mut bufs = VecDeque::with_capacity(4);
1071                bufs.push_back(buf);
1072                bufs.push_back(a);
1073                bufs.push_back(b);
1074                bufs.push_back(c);
1075                IoBufsInner::Chunked(bufs)
1076            }
1077            IoBufsInner::Chunked(mut bufs) => {
1078                bufs.push_front(buf);
1079                IoBufsInner::Chunked(bufs)
1080            }
1081        };
1082    }
1083
1084    /// Append a buffer to the back.
1085    ///
1086    /// Empty input buffers are ignored.
1087    pub fn append(&mut self, buf: IoBuf) {
1088        if buf.is_empty() {
1089            return;
1090        }
1091        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
1092        self.inner = match inner {
1093            IoBufsInner::Single(existing) if existing.is_empty() => IoBufsInner::Single(buf),
1094            IoBufsInner::Single(existing) => IoBufsInner::Pair([existing, buf]),
1095            IoBufsInner::Pair([a, b]) => IoBufsInner::Triple([a, b, buf]),
1096            IoBufsInner::Triple([a, b, c]) => {
1097                let mut bufs = VecDeque::with_capacity(4);
1098                bufs.push_back(a);
1099                bufs.push_back(b);
1100                bufs.push_back(c);
1101                bufs.push_back(buf);
1102                IoBufsInner::Chunked(bufs)
1103            }
1104            IoBufsInner::Chunked(mut bufs) => {
1105                bufs.push_back(buf);
1106                IoBufsInner::Chunked(bufs)
1107            }
1108        };
1109    }
1110
1111    /// Splits the buffer(s) into two at the given index.
1112    ///
1113    /// Afterwards `self` contains bytes `[at, len)`, and the returned
1114    /// [`IoBufs`] contains bytes `[0, at)`.
1115    ///
1116    /// Whole chunks are moved without copying. If the split point lands inside
1117    /// a chunk, the chunk is split zero-copy via [`IoBuf::split_to`].
1118    ///
1119    /// # Panics
1120    ///
1121    /// Panics if `at > len`.
1122    pub fn split_to(&mut self, at: usize) -> Self {
1123        if at == 0 {
1124            return Self::default();
1125        }
1126
1127        let remaining = self.remaining();
1128        assert!(
1129            at <= remaining,
1130            "split_to out of bounds: {:?} <= {:?}",
1131            at,
1132            remaining,
1133        );
1134
1135        if at == remaining {
1136            return std::mem::take(self);
1137        }
1138
1139        let inner = std::mem::replace(&mut self.inner, IoBufsInner::Single(IoBuf::default()));
1140        match inner {
1141            IoBufsInner::Single(mut buf) => {
1142                // Delegate directly and keep remainder as single
1143                let prefix = buf.split_to(at);
1144                self.inner = IoBufsInner::Single(buf);
1145                Self::from(prefix)
1146            }
1147            IoBufsInner::Pair([mut a, mut b]) => {
1148                let a_len = a.remaining();
1149                if at < a_len {
1150                    // Split stays entirely in chunk `a`.
1151                    let prefix = a.split_to(at);
1152                    self.inner = IoBufsInner::Pair([a, b]);
1153                    return Self::from(prefix);
1154                }
1155                if at == a_len {
1156                    // Exact chunk boundary: move `a` out, keep `b`.
1157                    self.inner = IoBufsInner::Single(b);
1158                    return Self::from(a);
1159                }
1160
1161                // Split crosses from `a` into `b`.
1162                let b_prefix_len = at - a_len;
1163                let b_prefix = b.split_to(b_prefix_len);
1164                self.inner = IoBufsInner::Single(b);
1165                Self {
1166                    inner: IoBufsInner::Pair([a, b_prefix]),
1167                }
1168            }
1169            IoBufsInner::Triple([mut a, mut b, mut c]) => {
1170                let a_len = a.remaining();
1171                if at < a_len {
1172                    // Split stays entirely in chunk `a`.
1173                    let prefix = a.split_to(at);
1174                    self.inner = IoBufsInner::Triple([a, b, c]);
1175                    return Self::from(prefix);
1176                }
1177                if at == a_len {
1178                    // Exact boundary after `a`.
1179                    self.inner = IoBufsInner::Pair([b, c]);
1180                    return Self::from(a);
1181                }
1182
1183                let mut remaining = at - a_len;
1184                let b_len = b.remaining();
1185                if remaining < b_len {
1186                    // Split lands inside `b`.
1187                    let b_prefix = b.split_to(remaining);
1188                    self.inner = IoBufsInner::Pair([b, c]);
1189                    return Self {
1190                        inner: IoBufsInner::Pair([a, b_prefix]),
1191                    };
1192                }
1193                if remaining == b_len {
1194                    // Exact boundary after `b`.
1195                    self.inner = IoBufsInner::Single(c);
1196                    return Self {
1197                        inner: IoBufsInner::Pair([a, b]),
1198                    };
1199                }
1200
1201                // Split reaches into `c`.
1202                remaining -= b_len;
1203                let c_prefix = c.split_to(remaining);
1204                self.inner = IoBufsInner::Single(c);
1205                Self {
1206                    inner: IoBufsInner::Triple([a, b, c_prefix]),
1207                }
1208            }
1209            IoBufsInner::Chunked(mut bufs) => {
1210                let mut remaining = at;
1211                let mut out = VecDeque::new();
1212
1213                while remaining > 0 {
1214                    let mut front = bufs.pop_front().expect("split_to out of bounds");
1215                    let avail = front.remaining();
1216                    if avail == 0 {
1217                        // Canonical chunked state should not contain empties.
1218                        continue;
1219                    }
1220                    if remaining < avail {
1221                        // Split inside this chunk: keep suffix in `self`, move prefix to output.
1222                        let prefix = front.split_to(remaining);
1223                        out.push_back(prefix);
1224                        bufs.push_front(front);
1225                        break;
1226                    }
1227
1228                    // Consume this full chunk into the output prefix.
1229                    out.push_back(front);
1230                    remaining -= avail;
1231                }
1232
1233                self.inner = if bufs.len() >= 4 {
1234                    IoBufsInner::Chunked(bufs)
1235                } else {
1236                    Self::from_chunks_iter(bufs).inner
1237                };
1238
1239                if out.len() >= 4 {
1240                    Self {
1241                        inner: IoBufsInner::Chunked(out),
1242                    }
1243                } else {
1244                    Self::from_chunks_iter(out)
1245                }
1246            }
1247        }
1248    }
1249
1250    /// Coalesce all remaining bytes into a single contiguous [`IoBuf`].
1251    ///
1252    /// Zero-copy if only one buffer. Copies if multiple buffers.
1253    #[inline]
1254    pub fn coalesce(mut self) -> IoBuf {
1255        match self.inner {
1256            IoBufsInner::Single(buf) => buf,
1257            _ => self.copy_to_bytes(self.remaining()).into(),
1258        }
1259    }
1260
1261    /// Coalesce all remaining bytes into a single contiguous [`IoBuf`], using the pool
1262    /// for allocation if multiple buffers need to be merged.
1263    ///
1264    /// Zero-copy if only one buffer. Uses pool allocation if multiple buffers.
1265    pub fn coalesce_with_pool(self, pool: &BufferPool) -> IoBuf {
1266        match self.inner {
1267            IoBufsInner::Single(buf) => buf,
1268            IoBufsInner::Pair([a, b]) => {
1269                let total_len = a.remaining().saturating_add(b.remaining());
1270                let mut result = pool.alloc(total_len);
1271                result.put_slice(a.as_ref());
1272                result.put_slice(b.as_ref());
1273                result.freeze()
1274            }
1275            IoBufsInner::Triple([a, b, c]) => {
1276                let total_len = a
1277                    .remaining()
1278                    .saturating_add(b.remaining())
1279                    .saturating_add(c.remaining());
1280                let mut result = pool.alloc(total_len);
1281                result.put_slice(a.as_ref());
1282                result.put_slice(b.as_ref());
1283                result.put_slice(c.as_ref());
1284                result.freeze()
1285            }
1286            IoBufsInner::Chunked(bufs) => {
1287                let total_len: usize = bufs
1288                    .iter()
1289                    .map(|b| b.remaining())
1290                    .fold(0, usize::saturating_add);
1291                let mut result = pool.alloc(total_len);
1292                for buf in bufs {
1293                    result.put_slice(buf.as_ref());
1294                }
1295                result.freeze()
1296            }
1297        }
1298    }
1299}
1300
1301impl Buf for IoBufs {
1302    fn remaining(&self) -> usize {
1303        match &self.inner {
1304            IoBufsInner::Single(buf) => buf.remaining(),
1305            IoBufsInner::Pair([a, b]) => a.remaining().saturating_add(b.remaining()),
1306            IoBufsInner::Triple([a, b, c]) => a
1307                .remaining()
1308                .saturating_add(b.remaining())
1309                .saturating_add(c.remaining()),
1310            IoBufsInner::Chunked(bufs) => bufs
1311                .iter()
1312                .map(|b| b.remaining())
1313                .fold(0, usize::saturating_add),
1314        }
1315    }
1316
1317    fn chunk(&self) -> &[u8] {
1318        match &self.inner {
1319            IoBufsInner::Single(buf) => buf.chunk(),
1320            IoBufsInner::Pair([a, b]) => {
1321                if a.remaining() > 0 {
1322                    a.chunk()
1323                } else if b.remaining() > 0 {
1324                    b.chunk()
1325                } else {
1326                    &[]
1327                }
1328            }
1329            IoBufsInner::Triple([a, b, c]) => {
1330                if a.remaining() > 0 {
1331                    a.chunk()
1332                } else if b.remaining() > 0 {
1333                    b.chunk()
1334                } else if c.remaining() > 0 {
1335                    c.chunk()
1336                } else {
1337                    &[]
1338                }
1339            }
1340            IoBufsInner::Chunked(bufs) => {
1341                for buf in bufs.iter() {
1342                    if buf.remaining() > 0 {
1343                        return buf.chunk();
1344                    }
1345                }
1346                &[]
1347            }
1348        }
1349    }
1350
1351    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
1352        if dst.is_empty() {
1353            return 0;
1354        }
1355
1356        match &self.inner {
1357            IoBufsInner::Single(buf) => {
1358                let chunk = buf.chunk();
1359                if !chunk.is_empty() {
1360                    dst[0] = IoSlice::new(chunk);
1361                    return 1;
1362                }
1363                0
1364            }
1365            IoBufsInner::Pair([a, b]) => fill_vectored_from_chunks(dst, [a.chunk(), b.chunk()]),
1366            IoBufsInner::Triple([a, b, c]) => {
1367                fill_vectored_from_chunks(dst, [a.chunk(), b.chunk(), c.chunk()])
1368            }
1369            IoBufsInner::Chunked(bufs) => {
1370                fill_vectored_from_chunks(dst, bufs.iter().map(|buf| buf.chunk()))
1371            }
1372        }
1373    }
1374
1375    fn advance(&mut self, cnt: usize) {
1376        let should_canonicalize = match &mut self.inner {
1377            IoBufsInner::Single(buf) => {
1378                buf.advance(cnt);
1379                false
1380            }
1381            IoBufsInner::Pair(pair) => advance_small_chunks(pair.as_mut_slice(), cnt),
1382            IoBufsInner::Triple(triple) => advance_small_chunks(triple.as_mut_slice(), cnt),
1383            IoBufsInner::Chunked(bufs) => {
1384                advance_chunked_front(bufs, cnt);
1385                bufs.len() <= 3
1386            }
1387        };
1388
1389        if should_canonicalize {
1390            self.canonicalize();
1391        }
1392    }
1393
1394    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
1395        let (result, needs_canonicalize) = match &mut self.inner {
1396            IoBufsInner::Single(buf) => return buf.copy_to_bytes(len),
1397            IoBufsInner::Pair(pair) => {
1398                copy_to_bytes_small_chunks(pair, len, "IoBufs::copy_to_bytes: not enough data")
1399            }
1400            IoBufsInner::Triple(triple) => {
1401                copy_to_bytes_small_chunks(triple, len, "IoBufs::copy_to_bytes: not enough data")
1402            }
1403            IoBufsInner::Chunked(bufs) => {
1404                copy_to_bytes_chunked(bufs, len, "IoBufs::copy_to_bytes: not enough data")
1405            }
1406        };
1407
1408        if needs_canonicalize {
1409            self.canonicalize();
1410        }
1411
1412        result
1413    }
1414}
1415
1416impl From<IoBuf> for IoBufs {
1417    fn from(buf: IoBuf) -> Self {
1418        Self {
1419            inner: IoBufsInner::Single(buf),
1420        }
1421    }
1422}
1423
1424impl From<IoBufMut> for IoBufs {
1425    fn from(buf: IoBufMut) -> Self {
1426        Self {
1427            inner: IoBufsInner::Single(buf.freeze()),
1428        }
1429    }
1430}
1431
1432impl From<Bytes> for IoBufs {
1433    fn from(bytes: Bytes) -> Self {
1434        Self::from(IoBuf::from(bytes))
1435    }
1436}
1437
1438impl From<BytesMut> for IoBufs {
1439    fn from(bytes: BytesMut) -> Self {
1440        Self::from(IoBuf::from(bytes.freeze()))
1441    }
1442}
1443
1444impl From<Vec<u8>> for IoBufs {
1445    fn from(vec: Vec<u8>) -> Self {
1446        Self::from(IoBuf::from(vec))
1447    }
1448}
1449
1450impl From<Vec<IoBuf>> for IoBufs {
1451    fn from(bufs: Vec<IoBuf>) -> Self {
1452        Self::from_chunks_iter(bufs)
1453    }
1454}
1455
1456impl<const N: usize> From<&'static [u8; N]> for IoBufs {
1457    fn from(array: &'static [u8; N]) -> Self {
1458        Self::from(IoBuf::from(array))
1459    }
1460}
1461
1462impl From<&'static [u8]> for IoBufs {
1463    fn from(slice: &'static [u8]) -> Self {
1464        Self::from(IoBuf::from(slice))
1465    }
1466}
1467
/// Container for one or more mutable buffers.
///
/// The chunks are stored in a private representation that distinguishes
/// one, two, three, and four-or-more buffers.
#[derive(Debug)]
pub struct IoBufsMut {
    /// Current chunk layout.
    inner: IoBufsMutInner,
}
1473
/// Internal mutable representation.
///
/// Two filtering policies apply, depending on how a value is produced:
///
/// - Construction from caller-provided writable chunks keeps every chunk
///   with non-zero capacity, even when `remaining() == 0`.
/// - Read-canonicalization paths remove drained chunks (`remaining() == 0`)
///   and collapse the shape as the readable chunk count shrinks.
#[derive(Debug)]
enum IoBufsMutInner {
    /// Single buffer (common case, no allocation).
    Single(IoBufMut),
    /// Two buffers (fast path, no `VecDeque` allocation).
    Pair([IoBufMut; 2]),
    /// Three buffers (fast path, no `VecDeque` allocation).
    Triple([IoBufMut; 3]),
    /// Four or more buffers.
    Chunked(VecDeque<IoBufMut>),
}
1491
1492impl Default for IoBufsMut {
1493    fn default() -> Self {
1494        Self {
1495            inner: IoBufsMutInner::Single(IoBufMut::default()),
1496        }
1497    }
1498}
1499
1500impl IoBufsMut {
1501    /// Build mutable chunk storage from already-filtered chunks.
1502    ///
1503    /// This helper intentionally does not filter.
1504    /// Callers choose filter policy first:
1505    /// - [`Self::from_writable_chunks_iter`] for construction from writable chunks (`capacity() > 0`)
1506    /// - [`Self::from_readable_chunks_iter`] for read-canonicalization (`remaining() > 0`)
1507    fn from_chunks_iter(chunks: impl IntoIterator<Item = IoBufMut>) -> Self {
1508        let mut iter = chunks.into_iter();
1509        let first = match iter.next() {
1510            Some(first) => first,
1511            None => return Self::default(),
1512        };
1513        let second = match iter.next() {
1514            Some(second) => second,
1515            None => {
1516                return Self {
1517                    inner: IoBufsMutInner::Single(first),
1518                };
1519            }
1520        };
1521        let third = match iter.next() {
1522            Some(third) => third,
1523            None => {
1524                return Self {
1525                    inner: IoBufsMutInner::Pair([first, second]),
1526                };
1527            }
1528        };
1529        let fourth = match iter.next() {
1530            Some(fourth) => fourth,
1531            None => {
1532                return Self {
1533                    inner: IoBufsMutInner::Triple([first, second, third]),
1534                };
1535            }
1536        };
1537
1538        let mut bufs = VecDeque::with_capacity(4);
1539        bufs.push_back(first);
1540        bufs.push_back(second);
1541        bufs.push_back(third);
1542        bufs.push_back(fourth);
1543        bufs.extend(iter);
1544        Self {
1545            inner: IoBufsMutInner::Chunked(bufs),
1546        }
1547    }
1548
1549    /// Build canonical mutable chunk storage from writable chunks.
1550    ///
1551    /// Chunks with zero capacity are removed.
1552    fn from_writable_chunks_iter(chunks: impl IntoIterator<Item = IoBufMut>) -> Self {
1553        // Keep chunks that can hold data (including len == 0 writable buffers).
1554        Self::from_chunks_iter(chunks.into_iter().filter(|buf| buf.capacity() > 0))
1555    }
1556
1557    /// Build canonical mutable chunk storage from readable chunks.
1558    ///
1559    /// Chunks with no remaining readable bytes are removed.
1560    fn from_readable_chunks_iter(chunks: impl IntoIterator<Item = IoBufMut>) -> Self {
1561        Self::from_chunks_iter(chunks.into_iter().filter(|buf| buf.remaining() > 0))
1562    }
1563
1564    /// Re-establish canonical mutable representation invariants.
1565    fn canonicalize(&mut self) {
1566        let inner = std::mem::replace(&mut self.inner, IoBufsMutInner::Single(IoBufMut::default()));
1567        self.inner = match inner {
1568            IoBufsMutInner::Single(buf) => IoBufsMutInner::Single(buf),
1569            IoBufsMutInner::Pair([a, b]) => Self::from_readable_chunks_iter([a, b]).inner,
1570            IoBufsMutInner::Triple([a, b, c]) => Self::from_readable_chunks_iter([a, b, c]).inner,
1571            IoBufsMutInner::Chunked(bufs) => Self::from_readable_chunks_iter(bufs).inner,
1572        };
1573    }
1574
1575    #[inline]
1576    fn for_each_chunk_mut(&mut self, mut f: impl FnMut(&mut IoBufMut)) {
1577        match &mut self.inner {
1578            IoBufsMutInner::Single(buf) => f(buf),
1579            IoBufsMutInner::Pair(pair) => {
1580                for buf in pair.iter_mut() {
1581                    f(buf);
1582                }
1583            }
1584            IoBufsMutInner::Triple(triple) => {
1585                for buf in triple.iter_mut() {
1586                    f(buf);
1587                }
1588            }
1589            IoBufsMutInner::Chunked(bufs) => {
1590                for buf in bufs.iter_mut() {
1591                    f(buf);
1592                }
1593            }
1594        }
1595    }
1596
1597    /// Returns a reference to the single contiguous buffer, if present.
1598    ///
1599    /// Returns `Some` only when this is currently represented as one chunk.
1600    pub const fn as_single(&self) -> Option<&IoBufMut> {
1601        match &self.inner {
1602            IoBufsMutInner::Single(buf) => Some(buf),
1603            _ => None,
1604        }
1605    }
1606
1607    /// Returns a mutable reference to the single contiguous buffer, if present.
1608    ///
1609    /// Returns `Some` only when this is currently represented as one chunk.
1610    pub const fn as_single_mut(&mut self) -> Option<&mut IoBufMut> {
1611        match &mut self.inner {
1612            IoBufsMutInner::Single(buf) => Some(buf),
1613            _ => None,
1614        }
1615    }
1616
1617    /// Consume this container and return the single buffer if present.
1618    ///
1619    /// Returns `Ok(IoBufMut)` only when readable data is represented as one
1620    /// chunk. Returns `Err(Self)` with the original container otherwise.
1621    #[allow(clippy::result_large_err)]
1622    pub fn try_into_single(self) -> Result<IoBufMut, Self> {
1623        match self.inner {
1624            IoBufsMutInner::Single(buf) => Ok(buf),
1625            inner => Err(Self { inner }),
1626        }
1627    }
1628
1629    /// Number of bytes remaining across all buffers.
1630    #[inline]
1631    pub fn len(&self) -> usize {
1632        self.remaining()
1633    }
1634
1635    /// Whether all buffers are empty.
1636    #[inline]
1637    pub fn is_empty(&self) -> bool {
1638        self.remaining() == 0
1639    }
1640
1641    /// Whether this contains a single contiguous buffer.
1642    ///
1643    /// When true, `chunk()` returns all remaining bytes.
1644    #[inline]
1645    pub const fn is_single(&self) -> bool {
1646        matches!(self.inner, IoBufsMutInner::Single(_))
1647    }
1648
1649    /// Freeze into immutable [`IoBufs`].
1650    pub fn freeze(self) -> IoBufs {
1651        match self.inner {
1652            IoBufsMutInner::Single(buf) => IoBufs::from(buf.freeze()),
1653            IoBufsMutInner::Pair([a, b]) => IoBufs::from_chunks_iter([a.freeze(), b.freeze()]),
1654            IoBufsMutInner::Triple([a, b, c]) => {
1655                IoBufs::from_chunks_iter([a.freeze(), b.freeze(), c.freeze()])
1656            }
1657            IoBufsMutInner::Chunked(bufs) => {
1658                IoBufs::from_chunks_iter(bufs.into_iter().map(IoBufMut::freeze))
1659            }
1660        }
1661    }
1662
1663    fn coalesce_with<F>(self, allocate: F) -> IoBufMut
1664    where
1665        F: FnOnce(usize) -> IoBufMut,
1666    {
1667        match self.inner {
1668            IoBufsMutInner::Single(buf) => buf,
1669            IoBufsMutInner::Pair([a, b]) => {
1670                let total_len = a.len().saturating_add(b.len());
1671                let mut result = allocate(total_len);
1672                result.put_slice(a.as_ref());
1673                result.put_slice(b.as_ref());
1674                result
1675            }
1676            IoBufsMutInner::Triple([a, b, c]) => {
1677                let total_len = a.len().saturating_add(b.len()).saturating_add(c.len());
1678                let mut result = allocate(total_len);
1679                result.put_slice(a.as_ref());
1680                result.put_slice(b.as_ref());
1681                result.put_slice(c.as_ref());
1682                result
1683            }
1684            IoBufsMutInner::Chunked(bufs) => {
1685                let total_len: usize = bufs.iter().map(|b| b.len()).fold(0, usize::saturating_add);
1686                let mut result = allocate(total_len);
1687                for buf in bufs {
1688                    result.put_slice(buf.as_ref());
1689                }
1690                result
1691            }
1692        }
1693    }
1694
1695    /// Coalesce all buffers into a single contiguous [`IoBufMut`].
1696    ///
1697    /// Zero-copy if only one buffer. Copies if multiple buffers.
1698    pub fn coalesce(self) -> IoBufMut {
1699        self.coalesce_with(IoBufMut::with_capacity)
1700    }
1701
1702    /// Coalesce all buffers into a single contiguous [`IoBufMut`], using the pool
1703    /// for allocation if multiple buffers need to be merged.
1704    ///
1705    /// Zero-copy if only one buffer. Uses pool allocation if multiple buffers.
1706    pub fn coalesce_with_pool(self, pool: &BufferPool) -> IoBufMut {
1707        self.coalesce_with(|len| pool.alloc(len))
1708    }
1709
1710    /// Coalesce all buffers into a single contiguous [`IoBufMut`] with extra
1711    /// capacity, using the pool for allocation.
1712    ///
1713    /// Zero-copy if single buffer with sufficient spare capacity.
1714    pub fn coalesce_with_pool_extra(self, pool: &BufferPool, extra: usize) -> IoBufMut {
1715        match self.inner {
1716            IoBufsMutInner::Single(buf) if buf.capacity() - buf.len() >= extra => buf,
1717            IoBufsMutInner::Single(buf) => {
1718                let mut result = pool.alloc(buf.len() + extra);
1719                result.put_slice(buf.as_ref());
1720                result
1721            }
1722            IoBufsMutInner::Pair([a, b]) => {
1723                let total = a.len().saturating_add(b.len());
1724                let mut result = pool.alloc(total + extra);
1725                result.put_slice(a.as_ref());
1726                result.put_slice(b.as_ref());
1727                result
1728            }
1729            IoBufsMutInner::Triple([a, b, c]) => {
1730                let total = a.len().saturating_add(b.len()).saturating_add(c.len());
1731                let mut result = pool.alloc(total + extra);
1732                result.put_slice(a.as_ref());
1733                result.put_slice(b.as_ref());
1734                result.put_slice(c.as_ref());
1735                result
1736            }
1737            IoBufsMutInner::Chunked(bufs) => {
1738                let total: usize = bufs.iter().map(|b| b.len()).fold(0, usize::saturating_add);
1739                let mut result = pool.alloc(total + extra);
1740                for buf in bufs {
1741                    result.put_slice(buf.as_ref());
1742                }
1743                result
1744            }
1745        }
1746    }
1747
1748    /// Returns the total capacity across all buffers.
1749    pub fn capacity(&self) -> usize {
1750        match &self.inner {
1751            IoBufsMutInner::Single(buf) => buf.capacity(),
1752            IoBufsMutInner::Pair([a, b]) => a.capacity().saturating_add(b.capacity()),
1753            IoBufsMutInner::Triple([a, b, c]) => a
1754                .capacity()
1755                .saturating_add(b.capacity())
1756                .saturating_add(c.capacity()),
1757            IoBufsMutInner::Chunked(bufs) => bufs
1758                .iter()
1759                .map(|b| b.capacity())
1760                .fold(0, usize::saturating_add),
1761        }
1762    }
1763
1764    /// Sets the length of the buffer(s) to `len`, distributing across chunks
1765    /// while preserving the current chunk layout.
1766    ///
1767    /// This is useful for APIs that must fill caller-provided buffer structure
1768    /// in place (for example [`Blob::read_at_buf`](crate::Blob::read_at_buf)).
1769    ///
1770    /// # Safety
1771    ///
1772    /// Caller must initialize all `len` bytes before the buffer is read.
1773    ///
1774    /// # Panics
1775    ///
1776    /// Panics if `len` exceeds total capacity.
1777    pub(crate) unsafe fn set_len(&mut self, len: usize) {
1778        let capacity = self.capacity();
1779        assert!(
1780            len <= capacity,
1781            "set_len({len}) exceeds capacity({capacity})"
1782        );
1783        let mut remaining = len;
1784        self.for_each_chunk_mut(|buf| {
1785            let cap = buf.capacity();
1786            let to_set = remaining.min(cap);
1787            buf.set_len(to_set);
1788            remaining -= to_set;
1789        });
1790    }
1791
1792    /// Copy data from a slice into the buffers.
1793    ///
1794    /// Panics if the slice length doesn't match the total buffer length.
1795    pub fn copy_from_slice(&mut self, src: &[u8]) {
1796        assert_eq!(
1797            src.len(),
1798            self.len(),
1799            "source slice length must match buffer length"
1800        );
1801        let mut offset = 0;
1802        self.for_each_chunk_mut(|buf| {
1803            let len = buf.len();
1804            buf.as_mut().copy_from_slice(&src[offset..offset + len]);
1805            offset += len;
1806        });
1807    }
1808}
1809
1810impl Buf for IoBufsMut {
1811    fn remaining(&self) -> usize {
1812        match &self.inner {
1813            IoBufsMutInner::Single(buf) => buf.remaining(),
1814            IoBufsMutInner::Pair([a, b]) => a.remaining().saturating_add(b.remaining()),
1815            IoBufsMutInner::Triple([a, b, c]) => a
1816                .remaining()
1817                .saturating_add(b.remaining())
1818                .saturating_add(c.remaining()),
1819            IoBufsMutInner::Chunked(bufs) => bufs
1820                .iter()
1821                .map(|b| b.remaining())
1822                .fold(0, usize::saturating_add),
1823        }
1824    }
1825
1826    fn chunk(&self) -> &[u8] {
1827        match &self.inner {
1828            IoBufsMutInner::Single(buf) => buf.chunk(),
1829            IoBufsMutInner::Pair([a, b]) => {
1830                if a.remaining() > 0 {
1831                    a.chunk()
1832                } else if b.remaining() > 0 {
1833                    b.chunk()
1834                } else {
1835                    &[]
1836                }
1837            }
1838            IoBufsMutInner::Triple([a, b, c]) => {
1839                if a.remaining() > 0 {
1840                    a.chunk()
1841                } else if b.remaining() > 0 {
1842                    b.chunk()
1843                } else if c.remaining() > 0 {
1844                    c.chunk()
1845                } else {
1846                    &[]
1847                }
1848            }
1849            IoBufsMutInner::Chunked(bufs) => {
1850                for buf in bufs.iter() {
1851                    if buf.remaining() > 0 {
1852                        return buf.chunk();
1853                    }
1854                }
1855                &[]
1856            }
1857        }
1858    }
1859
1860    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
1861        if dst.is_empty() {
1862            return 0;
1863        }
1864
1865        match &self.inner {
1866            IoBufsMutInner::Single(buf) => {
1867                let chunk = buf.chunk();
1868                if !chunk.is_empty() {
1869                    dst[0] = IoSlice::new(chunk);
1870                    return 1;
1871                }
1872                0
1873            }
1874            IoBufsMutInner::Pair([a, b]) => fill_vectored_from_chunks(dst, [a.chunk(), b.chunk()]),
1875            IoBufsMutInner::Triple([a, b, c]) => {
1876                fill_vectored_from_chunks(dst, [a.chunk(), b.chunk(), c.chunk()])
1877            }
1878            IoBufsMutInner::Chunked(bufs) => {
1879                fill_vectored_from_chunks(dst, bufs.iter().map(|buf| buf.chunk()))
1880            }
1881        }
1882    }
1883
1884    fn advance(&mut self, cnt: usize) {
1885        let should_canonicalize = match &mut self.inner {
1886            IoBufsMutInner::Single(buf) => {
1887                buf.advance(cnt);
1888                false
1889            }
1890            IoBufsMutInner::Pair(pair) => advance_small_chunks(pair.as_mut_slice(), cnt),
1891            IoBufsMutInner::Triple(triple) => advance_small_chunks(triple.as_mut_slice(), cnt),
1892            IoBufsMutInner::Chunked(bufs) => {
1893                advance_chunked_front(bufs, cnt);
1894                bufs.len() <= 3
1895            }
1896        };
1897
1898        if should_canonicalize {
1899            self.canonicalize();
1900        }
1901    }
1902
1903    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
1904        let (result, needs_canonicalize) = match &mut self.inner {
1905            IoBufsMutInner::Single(buf) => return buf.copy_to_bytes(len),
1906            IoBufsMutInner::Pair(pair) => {
1907                copy_to_bytes_small_chunks(pair, len, "IoBufsMut::copy_to_bytes: not enough data")
1908            }
1909            IoBufsMutInner::Triple(triple) => {
1910                copy_to_bytes_small_chunks(triple, len, "IoBufsMut::copy_to_bytes: not enough data")
1911            }
1912            IoBufsMutInner::Chunked(bufs) => {
1913                copy_to_bytes_chunked(bufs, len, "IoBufsMut::copy_to_bytes: not enough data")
1914            }
1915        };
1916
1917        if needs_canonicalize {
1918            self.canonicalize();
1919        }
1920
1921        result
1922    }
1923}
1924
1925// SAFETY: Delegates to IoBufMut which implements BufMut safely.
1926unsafe impl BufMut for IoBufsMut {
1927    #[inline]
1928    fn remaining_mut(&self) -> usize {
1929        match &self.inner {
1930            IoBufsMutInner::Single(buf) => buf.remaining_mut(),
1931            IoBufsMutInner::Pair([a, b]) => a.remaining_mut().saturating_add(b.remaining_mut()),
1932            IoBufsMutInner::Triple([a, b, c]) => a
1933                .remaining_mut()
1934                .saturating_add(b.remaining_mut())
1935                .saturating_add(c.remaining_mut()),
1936            IoBufsMutInner::Chunked(bufs) => bufs
1937                .iter()
1938                .map(|b| b.remaining_mut())
1939                .fold(0, usize::saturating_add),
1940        }
1941    }
1942
1943    #[inline]
1944    unsafe fn advance_mut(&mut self, cnt: usize) {
1945        match &mut self.inner {
1946            IoBufsMutInner::Single(buf) => buf.advance_mut(cnt),
1947            IoBufsMutInner::Pair(pair) => {
1948                let mut remaining = cnt;
1949                if advance_mut_in_chunks(pair, &mut remaining) {
1950                    return;
1951                }
1952                panic!("cannot advance past end of buffer");
1953            }
1954            IoBufsMutInner::Triple(triple) => {
1955                let mut remaining = cnt;
1956                if advance_mut_in_chunks(triple, &mut remaining) {
1957                    return;
1958                }
1959                panic!("cannot advance past end of buffer");
1960            }
1961            IoBufsMutInner::Chunked(bufs) => {
1962                let mut remaining = cnt;
1963                let (first, second) = bufs.as_mut_slices();
1964                if advance_mut_in_chunks(first, &mut remaining)
1965                    || advance_mut_in_chunks(second, &mut remaining)
1966                {
1967                    return;
1968                }
1969                panic!("cannot advance past end of buffer");
1970            }
1971        }
1972    }
1973
1974    #[inline]
1975    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
1976        match &mut self.inner {
1977            IoBufsMutInner::Single(buf) => buf.chunk_mut(),
1978            IoBufsMutInner::Pair(pair) => {
1979                if pair[0].remaining_mut() > 0 {
1980                    pair[0].chunk_mut()
1981                } else if pair[1].remaining_mut() > 0 {
1982                    pair[1].chunk_mut()
1983                } else {
1984                    bytes::buf::UninitSlice::new(&mut [])
1985                }
1986            }
1987            IoBufsMutInner::Triple(triple) => {
1988                if triple[0].remaining_mut() > 0 {
1989                    triple[0].chunk_mut()
1990                } else if triple[1].remaining_mut() > 0 {
1991                    triple[1].chunk_mut()
1992                } else if triple[2].remaining_mut() > 0 {
1993                    triple[2].chunk_mut()
1994                } else {
1995                    bytes::buf::UninitSlice::new(&mut [])
1996                }
1997            }
1998            IoBufsMutInner::Chunked(bufs) => {
1999                for buf in bufs.iter_mut() {
2000                    if buf.remaining_mut() > 0 {
2001                        return buf.chunk_mut();
2002                    }
2003                }
2004                bytes::buf::UninitSlice::new(&mut [])
2005            }
2006        }
2007    }
2008}
2009
2010impl From<IoBufMut> for IoBufsMut {
2011    fn from(buf: IoBufMut) -> Self {
2012        Self {
2013            inner: IoBufsMutInner::Single(buf),
2014        }
2015    }
2016}
2017
2018impl From<Vec<u8>> for IoBufsMut {
2019    fn from(vec: Vec<u8>) -> Self {
2020        Self {
2021            inner: IoBufsMutInner::Single(IoBufMut::from(vec)),
2022        }
2023    }
2024}
2025
2026impl From<BytesMut> for IoBufsMut {
2027    fn from(bytes: BytesMut) -> Self {
2028        Self {
2029            inner: IoBufsMutInner::Single(IoBufMut::from(bytes)),
2030        }
2031    }
2032}
2033
2034impl From<Vec<IoBufMut>> for IoBufsMut {
2035    fn from(bufs: Vec<IoBufMut>) -> Self {
2036        Self::from_writable_chunks_iter(bufs)
2037    }
2038}
2039
2040impl<const N: usize> From<[u8; N]> for IoBufsMut {
2041    fn from(array: [u8; N]) -> Self {
2042        Self {
2043            inner: IoBufsMutInner::Single(IoBufMut::from(array)),
2044        }
2045    }
2046}
2047
2048/// Drain `len` readable bytes from a small fixed chunk array (`Pair`/`Triple`).
2049///
2050/// Returns drained bytes plus whether the caller should canonicalize afterward.
2051#[inline]
2052fn copy_to_bytes_small_chunks<B: Buf, const N: usize>(
2053    chunks: &mut [B; N],
2054    len: usize,
2055    not_enough_data_msg: &str,
2056) -> (Bytes, bool) {
2057    let total = chunks
2058        .iter()
2059        .map(|buf| buf.remaining())
2060        .fold(0, usize::saturating_add);
2061    assert!(total >= len, "{not_enough_data_msg}");
2062
2063    if chunks[0].remaining() >= len {
2064        let bytes = chunks[0].copy_to_bytes(len);
2065        return (bytes, chunks[0].remaining() == 0);
2066    }
2067
2068    let mut out = BytesMut::with_capacity(len);
2069    let mut remaining = len;
2070    for buf in chunks.iter_mut() {
2071        if remaining == 0 {
2072            break;
2073        }
2074        let to_copy = remaining.min(buf.remaining());
2075        out.extend_from_slice(&buf.chunk()[..to_copy]);
2076        buf.advance(to_copy);
2077        remaining -= to_copy;
2078    }
2079
2080    // Slow path always consumes past chunk 0, so canonicalization is required.
2081    (out.freeze(), true)
2082}
2083
2084/// Drain `len` readable bytes from a deque-backed chunk representation.
2085///
2086/// Returns drained bytes plus whether the caller should canonicalize afterward.
2087#[inline]
2088fn copy_to_bytes_chunked<B: Buf>(
2089    bufs: &mut VecDeque<B>,
2090    len: usize,
2091    not_enough_data_msg: &str,
2092) -> (Bytes, bool) {
2093    while bufs.front().is_some_and(|buf| buf.remaining() == 0) {
2094        bufs.pop_front();
2095    }
2096
2097    if bufs.front().is_none() {
2098        assert_eq!(len, 0, "{not_enough_data_msg}");
2099        return (Bytes::new(), false);
2100    }
2101
2102    if bufs.front().is_some_and(|front| front.remaining() >= len) {
2103        let front = bufs.front_mut().expect("front checked above");
2104        let bytes = front.copy_to_bytes(len);
2105        if front.remaining() == 0 {
2106            bufs.pop_front();
2107        }
2108        return (bytes, bufs.len() <= 3);
2109    }
2110
2111    let total = bufs
2112        .iter()
2113        .map(|buf| buf.remaining())
2114        .fold(0, usize::saturating_add);
2115    assert!(total >= len, "{not_enough_data_msg}");
2116
2117    let mut out = BytesMut::with_capacity(len);
2118    let mut remaining = len;
2119    while remaining > 0 {
2120        let front = bufs
2121            .front_mut()
2122            .expect("remaining > 0 implies non-empty bufs");
2123        let to_copy = remaining.min(front.remaining());
2124        out.extend_from_slice(&front.chunk()[..to_copy]);
2125        front.advance(to_copy);
2126        if front.remaining() == 0 {
2127            bufs.pop_front();
2128        }
2129        remaining -= to_copy;
2130    }
2131
2132    (out.freeze(), bufs.len() <= 3)
2133}
2134
2135/// Advance across a [`VecDeque`] of chunks by consuming from the front.
2136#[inline]
2137fn advance_chunked_front<B: Buf>(bufs: &mut VecDeque<B>, mut cnt: usize) {
2138    while cnt > 0 {
2139        let front = bufs.front_mut().expect("cannot advance past end of buffer");
2140        let avail = front.remaining();
2141        if avail == 0 {
2142            bufs.pop_front();
2143            continue;
2144        }
2145        if cnt < avail {
2146            front.advance(cnt);
2147            break;
2148        }
2149        front.advance(avail);
2150        bufs.pop_front();
2151        cnt -= avail;
2152    }
2153}
2154
2155/// Advance across a small fixed set of chunks (`Pair`/`Triple`).
2156///
2157/// Returns `true` when one or more chunks became (or were) empty, so callers
2158/// can canonicalize once after the operation.
2159#[inline]
2160fn advance_small_chunks<B: Buf>(chunks: &mut [B], mut cnt: usize) -> bool {
2161    let mut idx = 0;
2162    let mut needs_canonicalize = false;
2163
2164    while cnt > 0 {
2165        let chunk = chunks
2166            .get_mut(idx)
2167            .expect("cannot advance past end of buffer");
2168        let avail = chunk.remaining();
2169        if avail == 0 {
2170            idx += 1;
2171            needs_canonicalize = true;
2172            continue;
2173        }
2174        if cnt < avail {
2175            chunk.advance(cnt);
2176            return needs_canonicalize;
2177        }
2178        chunk.advance(avail);
2179        cnt -= avail;
2180        idx += 1;
2181        needs_canonicalize = true;
2182    }
2183
2184    needs_canonicalize
2185}
2186
2187/// Advance writable cursors across `chunks` by up to `*remaining` bytes.
2188///
2189/// Returns `true` when the full request has been satisfied.
2190///
2191/// # Safety
2192///
2193/// Forwards to [`BufMut::advance_mut`], so callers must ensure the advanced
2194/// region has been initialized according to [`BufMut`]'s contract.
2195#[inline]
2196unsafe fn advance_mut_in_chunks<B: BufMut>(chunks: &mut [B], remaining: &mut usize) -> bool {
2197    if *remaining == 0 {
2198        return true;
2199    }
2200
2201    for buf in chunks.iter_mut() {
2202        let avail = buf.chunk_mut().len();
2203        if avail == 0 {
2204            continue;
2205        }
2206        if *remaining <= avail {
2207            // SAFETY: Upheld by this function's safety contract.
2208            unsafe { buf.advance_mut(*remaining) };
2209            *remaining = 0;
2210            return true;
2211        }
2212        // SAFETY: Upheld by this function's safety contract.
2213        unsafe { buf.advance_mut(avail) };
2214        *remaining -= avail;
2215    }
2216    false
2217}
2218
/// Populate the leading slots of `dst` with `IoSlice`s over `chunks`.
///
/// Empty chunks are skipped and writing stops once `dst` is full or
/// `chunks` is exhausted. Returns the number of slots written; slots past
/// that count are left untouched.
#[inline]
fn fill_vectored_from_chunks<'a, I>(dst: &mut [IoSlice<'a>], chunks: I) -> usize
where
    I: IntoIterator<Item = &'a [u8]>,
{
    let non_empty = chunks.into_iter().filter(|bytes| !bytes.is_empty());

    let mut filled = 0;
    for (slot, bytes) in dst.iter_mut().zip(non_empty) {
        *slot = IoSlice::new(bytes);
        filled += 1;
    }
    filled
}
2239
2240/// Assembles [`IoBufs`] from a mix of inline writes and zero-copy pieces.
2241///
2242/// All inline writes go into a single pool-backed buffer. [`BufsMut::push`]
2243/// records boundaries without flushing. [`Builder::finish`] freezes the buffer
2244/// once and uses [`IoBuf::slice`] to carve it into pieces at the recorded
2245/// boundaries, interleaved with the pushed [`Bytes`].
2246///
2247/// The inline buffer has a fixed capacity set at construction and will not
2248/// grow. Callers must ensure the capacity accounts for all inline
2249/// (non-pushed) bytes that will be written. Exceeding it will panic.
2250///
2251/// ```text
2252/// builder.put_u16(99);                        // inline
2253/// builder.push(shard_payload.clone());        // zero-copy (Arc clone)
2254/// builder.put_u32(checksum);                  // inline
2255/// let output = builder.finish();
2256///
2257/// // output: [ 99 | --- 1 MB shard --- | checksum ]
2258/// //           pool    Arc clone          pool
2259/// //            \________________________/
2260/// //             slices of one allocation
2261/// ```
2262pub struct Builder {
2263    // Single working buffer for all inline writes.
2264    buf: IoBufMut,
2265    // Each entry is (offset_in_buf, pushed_bytes) recording where a push
2266    // interrupts the inline byte stream.
2267    pushes: Vec<(usize, Bytes)>,
2268}
2269
2270impl Builder {
2271    /// Creates a new builder with a fixed-capacity inline buffer.
2272    ///
2273    /// `capacity` is the minimum number of inline bytes the buffer can hold.
2274    /// The pool may round up to a larger size class. Writing more inline
2275    /// bytes than the allocated capacity will panic.
2276    pub fn new(pool: &BufferPool, capacity: NonZeroUsize) -> Self {
2277        Self {
2278            buf: pool.alloc(capacity.get()),
2279            pushes: Vec::new(),
2280        }
2281    }
2282
2283    /// Freezes the inline buffer and assembles [`IoBufs`] by slicing at
2284    /// the recorded push boundaries.
2285    pub fn finish(self) -> IoBufs {
2286        if self.pushes.is_empty() {
2287            return IoBufs::from(self.buf.freeze());
2288        }
2289
2290        let frozen = self.buf.freeze();
2291        let mut result = IoBufs::default();
2292        let mut pos = 0;
2293
2294        for (offset, pushed) in self.pushes {
2295            if offset > pos {
2296                result.append(frozen.slice(pos..offset));
2297            }
2298            result.append(IoBuf::from(pushed));
2299            pos = offset;
2300        }
2301
2302        if pos < frozen.len() {
2303            result.append(frozen.slice(pos..));
2304        }
2305
2306        result
2307    }
2308}
2309
2310// SAFETY: All methods delegate directly to `self.buf`, a pool-backed
2311// `IoBufMut` with a sound `BufMut` implementation. The inline buffer has
2312// fixed capacity; writes that exceed it will panic via the underlying
2313// `IoBufMut` implementation.
2314unsafe impl BufMut for Builder {
2315    #[inline]
2316    fn remaining_mut(&self) -> usize {
2317        self.buf.remaining_mut()
2318    }
2319
2320    #[inline]
2321    unsafe fn advance_mut(&mut self, cnt: usize) {
2322        self.buf.advance_mut(cnt);
2323    }
2324
2325    #[inline]
2326    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
2327        self.buf.chunk_mut()
2328    }
2329}
2330
2331impl BufsMut for Builder {
2332    fn push(&mut self, bytes: impl Into<Bytes>) {
2333        let bytes = bytes.into();
2334        if !bytes.is_empty() {
2335            self.pushes.push((self.buf.len(), bytes));
2336        }
2337    }
2338}
2339
2340/// Extension trait for encoding values into pooled I/O buffers.
2341///
2342/// This is useful for hot paths that need to avoid frequent heap allocations
2343/// when serializing values that implement [`Write`] and [`EncodeSize`].
2344pub trait EncodeExt: EncodeSize + Write {
2345    /// Encode this value into an [`IoBufMut`] allocated from `pool`.
2346    ///
2347    /// # Panics
2348    ///
2349    /// Panics if [`EncodeSize::encode_size`] does not match the number of
2350    /// bytes written by [`Write::write`].
2351    fn encode_with_pool_mut(&self, pool: &BufferPool) -> IoBufMut {
2352        let len = self.encode_size();
2353        let mut buf = pool.alloc(len);
2354        self.write(&mut buf);
2355        assert_eq!(
2356            buf.len(),
2357            len,
2358            "write() did not write expected bytes into pooled buffer"
2359        );
2360        buf
2361    }
2362
2363    /// Encode into [`IoBufs`] using pool allocation.
2364    ///
2365    /// Override [`Write::write_bufs`] to avoid copying large [`Bytes`] fields.
2366    ///
2367    /// # Panics
2368    ///
2369    /// Panics if [`EncodeSize::encode_inline_size`] underreports the number
2370    /// of inline bytes written by [`Write::write_bufs`], or if
2371    /// [`EncodeSize::encode_size`] does not match the total bytes written.
2372    fn encode_with_pool(&self, pool: &BufferPool) -> IoBufs {
2373        let len = self.encode_size();
2374        let capacity = NonZeroUsize::new(self.encode_inline_size()).unwrap_or(NonZeroUsize::MIN);
2375        let mut builder = Builder::new(pool, capacity);
2376        self.write_bufs(&mut builder);
2377        let bufs = builder.finish();
2378        assert_eq!(
2379            bufs.remaining(),
2380            len,
2381            "write_bufs() did not write expected bytes"
2382        );
2383        bufs
2384    }
2385}
2386
2387impl<T: EncodeSize + Write> EncodeExt for T {}
2388
2389#[cfg(test)]
2390mod tests {
2391    use super::*;
2392    use bytes::{Bytes, BytesMut};
2393    use commonware_codec::{types::lazy::Lazy, Decode, Encode, RangeCfg};
2394    use core::ops::{Range, RangeFrom, RangeInclusive, RangeToInclusive};
2395    use std::collections::{BTreeMap, HashMap};
2396
2397    fn test_pool() -> BufferPool {
2398        cfg_if::cfg_if! {
2399            if #[cfg(miri)] {
2400                // Reduce max_per_class to avoid slow atomics under miri.
2401                let pool_config = BufferPoolConfig {
2402                    pool_min_size: 0,
2403                    max_per_class: commonware_utils::NZU32!(32),
2404                    ..BufferPoolConfig::for_network()
2405                };
2406            } else {
2407                let pool_config = BufferPoolConfig::for_network().with_pool_min_size(0);
2408            }
2409        }
2410        let mut registry = crate::telemetry::metrics::Registry::default();
2411        BufferPool::new(pool_config, &mut registry)
2412    }
2413
2414    fn assert_encode_with_pool_matches_encode<T: Encode + EncodeExt>(value: &T) {
2415        let pool = test_pool();
2416        let mut pooled = value.encode_with_pool(&pool);
2417        let baseline = value.encode();
2418        let mut pooled_bytes = vec![0u8; pooled.remaining()];
2419        pooled.copy_to_slice(&mut pooled_bytes);
2420        assert_eq!(pooled_bytes, baseline.as_ref());
2421    }
2422
#[test]
fn test_iobuf_core_behaviors() {
    // Cloning an immutable buffer shares storage instead of copying.
    let original = IoBuf::from(vec![1u8; 1000]);
    let cloned = original.clone();
    assert_eq!(original.as_ref().as_ptr(), cloned.as_ref().as_ptr());

    // `copy_from_slice` produces an owned immutable buffer.
    let payload = vec![1u8, 2, 3, 4, 5];
    let owned = IoBuf::copy_from_slice(&payload);
    assert_eq!(owned, [1, 2, 3, 4, 5]);
    assert_eq!(owned.len(), 5);
    let nothing = IoBuf::copy_from_slice(&[]);
    assert!(nothing.is_empty());

    // Comparisons are supported against arrays, slices, and other buffers.
    let hello = IoBuf::from(b"hello");
    assert_eq!(hello, *b"hello");
    assert_eq!(hello, b"hello");
    assert_ne!(hello, *b"world");
    assert_ne!(hello, b"world");
    assert_eq!(IoBuf::from(b"hello"), IoBuf::from(b"hello"));
    assert_ne!(IoBuf::from(b"hello"), IoBuf::from(b"world"));
    let as_bytes: Bytes = IoBuf::from(b"bytes").into();
    assert_eq!(as_bytes.as_ref(), b"bytes");

    // `len()` tracks `remaining()` while the `Buf` cursor moves.
    let mut reader = IoBuf::from(b"hello world");
    assert_eq!(reader.len(), reader.remaining());
    assert_eq!(reader.as_ref(), reader.chunk());
    assert_eq!(reader.remaining(), 11);
    reader.advance(6);
    assert_eq!(reader.chunk(), b"world");
    assert_eq!(reader.len(), reader.remaining());

    // `copy_to_bytes` hands out bytes in order and consumes the source.
    let head = reader.copy_to_bytes(2);
    assert_eq!(&head[..], b"wo");
    let tail = reader.copy_to_bytes(3);
    assert_eq!(&tail[..], b"rld");
    assert_eq!(reader.remaining(), 0);

    // Slicing accepts the usual range forms.
    let text = IoBuf::from(b"hello world");
    assert_eq!(text.slice(..5), b"hello");
    assert_eq!(text.slice(6..), b"world");
    assert_eq!(text.slice(3..8), b"lo wo");
    assert!(text.slice(5..5).is_empty());
}
2472
#[test]
fn test_iobuf_codec_roundtrip() {
    let limits: RangeCfg<usize> = (0..=1024).into();

    // Typical payload.
    let hello = IoBuf::from(b"hello world");
    let wire = hello.encode();
    let back = IoBuf::decode_cfg(wire, &limits).unwrap();
    assert_eq!(hello, back);

    // Empty payload.
    let nothing = IoBuf::default();
    let wire = nothing.encode();
    let back = IoBuf::decode_cfg(wire, &limits).unwrap();
    assert_eq!(nothing, back);

    // Larger payload under a wider length limit.
    let wide_limits: RangeCfg<usize> = (0..=20000).into();
    let big = IoBuf::from(vec![42u8; 10000]);
    let wire = big.encode();
    let back = IoBuf::decode_cfg(wire, &wide_limits).unwrap();
    assert_eq!(big, back);

    // A length prefix of 4 followed by only 2 bytes must be rejected.
    let mut short = BytesMut::new();
    4usize.write(&mut short);
    short.extend_from_slice(b"xy");
    let mut short = short.freeze();
    assert!(IoBuf::read_cfg(&mut short, &limits).is_err());

    // Exercise the successful `read_cfg` path directly, not only via decode helpers.
    let mut exact = BytesMut::new();
    4usize.write(&mut exact);
    exact.extend_from_slice(b"wxyz");
    let mut exact = exact.freeze();
    let parsed = IoBuf::read_cfg(&mut exact, &limits).unwrap();
    assert_eq!(parsed, b"wxyz");
}
2507
#[test]
#[should_panic(expected = "cannot advance")]
fn test_iobuf_advance_past_end() {
    // Advancing further than the five available bytes must panic.
    let mut five_bytes = IoBuf::from(b"hello");
    five_bytes.advance(10);
}
2514
#[test]
fn test_iobuf_split_to_consistent_across_backings() {
    // `split_to` must behave the same for pooled and `Bytes`-backed buffers.
    let pool = test_pool();
    let mut scratch = pool.try_alloc(256).expect("pooled allocation");
    scratch.put_slice(b"hello world");
    let mut from_pool = scratch.freeze();
    let mut from_bytes = IoBuf::from(b"hello world");

    assert!(from_pool.is_pooled());
    assert!(!from_bytes.is_pooled());

    // Zero-length split.
    let pool_none = from_pool.split_to(0);
    let bytes_none = from_bytes.split_to(0);
    assert_eq!(pool_none, bytes_none);
    assert_eq!(from_pool, from_bytes);
    assert!(!pool_none.is_pooled());

    // Split in the middle.
    let pool_head = from_pool.split_to(5);
    let bytes_head = from_bytes.split_to(5);
    assert_eq!(pool_head, bytes_head);
    assert_eq!(from_pool, from_bytes);
    assert!(pool_head.is_pooled());

    // Split off everything that is left.
    let pool_tail = from_pool.split_to(from_pool.len());
    let bytes_tail = from_bytes.split_to(from_bytes.len());
    assert_eq!(pool_tail, bytes_tail);
    assert_eq!(from_pool, from_bytes);
    assert!(from_pool.is_empty());
    assert!(from_bytes.is_empty());
    assert!(!from_pool.is_pooled());
}
2547
#[test]
#[should_panic(expected = "split_to out of bounds")]
fn test_iobuf_split_to_out_of_bounds() {
    // Splitting at 4 on a three-byte buffer must panic.
    let mut three_bytes = IoBuf::from(b"abc");
    let _ = three_bytes.split_to(4);
}
2554
#[test]
fn test_iobufmut_core_behaviors() {
    // Append incrementally, then freeze into an immutable buffer.
    let mut writer = IoBufMut::with_capacity(100);
    assert!(writer.capacity() >= 100);
    assert_eq!(writer.len(), 0);
    writer.put_slice(b"hello");
    writer.put_slice(b" world");
    assert_eq!(writer, b"hello world");
    assert_eq!(writer, &b"hello world"[..]);
    assert_eq!(writer.freeze(), b"hello world");

    // `zeroed` yields readable initialized bytes; `set_len` may shrink them.
    let mut blank = IoBufMut::zeroed(10);
    assert_eq!(blank, &[0u8; 10]);
    // SAFETY: shrinking readable length to initialized region.
    unsafe { blank.set_len(5) };
    assert_eq!(blank, &[0u8; 5]);
    blank.as_mut()[..5].copy_from_slice(b"hello");
    assert_eq!(&blank.as_ref()[..5], b"hello");
    let immutable = blank.freeze();
    let as_vec: Vec<u8> = immutable.into();
    assert_eq!(&as_vec[..5], b"hello");

    // `is_empty` on a pool-allocated buffer.
    let pool = test_pool();
    let mut from_pool = pool.alloc(8);
    assert!(from_pool.is_empty());
    from_pool.put_slice(b"x");
    assert!(!from_pool.is_empty());
}
2586
#[test]
fn test_iobufs_shapes_and_read_paths() {
    // Building from empty input gives an empty single chunk.
    let nothing = IoBufs::from(Vec::<u8>::new());
    assert!(nothing.is_empty());
    assert!(nothing.is_single());
    assert!(nothing.as_single().is_some());

    // Read path for a single buffer.
    let mut one = IoBufs::from(b"hello world");
    assert!(one.is_single());
    assert_eq!(one.chunk(), b"hello world");
    one.advance(6);
    assert_eq!(one.chunk(), b"world");
    assert_eq!(one.copy_to_bytes(5).as_ref(), b"world");
    assert_eq!(one.remaining(), 0);

    // Appending moves through the Pair, Triple and Chunked shapes.
    let mut two = IoBufs::from(IoBuf::from(b"a"));
    two.append(IoBuf::from(b"b"));
    assert!(matches!(two.inner, IoBufsInner::Pair(_)));
    assert!(two.as_single().is_none());

    let mut three = IoBufs::from(IoBuf::from(b"a"));
    three.append(IoBuf::from(b"b"));
    three.append(IoBuf::from(b"c"));
    assert!(matches!(three.inner, IoBufsInner::Triple(_)));

    let mut four = IoBufs::from(IoBuf::from(b"a"));
    four.append(IoBuf::from(b"b"));
    four.append(IoBuf::from(b"c"));
    four.append(IoBuf::from(b"d"));
    assert!(matches!(four.inner, IoBufsInner::Chunked(_)));

    // `prepend` and `append` keep byte order.
    let mut sentence = IoBufs::from(b"middle");
    sentence.prepend(IoBuf::from(b"start "));
    sentence.append(IoBuf::from(b" end"));
    assert_eq!(sentence.coalesce(), b"start middle end");

    // Prepending an empty buffer changes nothing.
    let mut unchanged = IoBufs::from(b"x");
    unchanged.prepend(IoBuf::default());
    assert_eq!(unchanged.coalesce(), b"x");

    // Prepending into an empty aggregate stays on the single-buffer path.
    let mut was_empty = IoBufs::default();
    was_empty.prepend(IoBuf::from(b"z"));
    assert!(was_empty.is_single());
    assert_eq!(was_empty.coalesce(), b"z");

    // Prepending into a pair upgrades it to a triple.
    let mut grown = IoBufs::from(vec![IoBuf::from(b"b"), IoBuf::from(b"c")]);
    grown.prepend(IoBuf::from(b"a"));
    assert!(matches!(grown.inner, IoBufsInner::Triple(_)));
    assert_eq!(grown.coalesce(), b"abc");

    // Canonicalizing a non-empty single keeps the representation.
    let mut already_canonical = IoBufs::from(b"q");
    already_canonical.canonicalize();
    assert!(already_canonical.is_single());
    assert_eq!(already_canonical.coalesce(), b"q");
}
2649
2650    #[test]
2651    fn test_iobufs_split_to_cases() {
2652        // Zero and full split on a single chunk.
2653        let mut bufs = IoBufs::from(b"hello");
2654
2655        let empty = bufs.split_to(0);
2656        assert!(empty.is_empty());
2657        assert_eq!(bufs.coalesce(), b"hello");
2658
2659        let mut bufs = IoBufs::from(b"hello");
2660        let all = bufs.split_to(5);
2661        assert_eq!(all.coalesce(), b"hello");
2662        assert!(bufs.is_single());
2663        assert!(bufs.is_empty());
2664
2665        // Single split in the middle.
2666        let mut single_mid = IoBufs::from(b"hello");
2667        let single_prefix = single_mid.split_to(2);
2668        assert!(single_prefix.is_single());
2669        assert_eq!(single_prefix.coalesce(), b"he");
2670        assert_eq!(single_mid.coalesce(), b"llo");
2671
2672        // Pair split paths: in-first, boundary-after-first, crossing-into-second.
2673        let mut pair = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
2674        let pair_prefix = pair.split_to(1);
2675        assert!(pair_prefix.is_single());
2676        assert_eq!(pair_prefix.coalesce(), b"a");
2677        assert!(matches!(pair.inner, IoBufsInner::Pair(_)));
2678        assert_eq!(pair.coalesce(), b"bcd");
2679
2680        let mut pair = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
2681        let pair_prefix = pair.split_to(2);
2682        assert!(pair_prefix.is_single());
2683        assert_eq!(pair_prefix.coalesce(), b"ab");
2684        assert!(pair.is_single());
2685        assert_eq!(pair.coalesce(), b"cd");
2686
2687        let mut pair = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
2688        let pair_prefix = pair.split_to(3);
2689        assert!(matches!(pair_prefix.inner, IoBufsInner::Pair(_)));
2690        assert_eq!(pair_prefix.coalesce(), b"abc");
2691        assert!(pair.is_single());
2692        assert_eq!(pair.coalesce(), b"d");
2693
2694        // Triple split paths: in-first, boundary-after-first, in-second, boundary-after-second,
2695        // and reaching into third.
2696        let mut triple = IoBufs::from(vec![
2697            IoBuf::from(b"ab"),
2698            IoBuf::from(b"cd"),
2699            IoBuf::from(b"ef"),
2700        ]);
2701        let triple_prefix = triple.split_to(1);
2702        assert!(triple_prefix.is_single());
2703        assert_eq!(triple_prefix.coalesce(), b"a");
2704        assert!(matches!(triple.inner, IoBufsInner::Triple(_)));
2705        assert_eq!(triple.coalesce(), b"bcdef");
2706
2707        let mut triple = IoBufs::from(vec![
2708            IoBuf::from(b"ab"),
2709            IoBuf::from(b"cd"),
2710            IoBuf::from(b"ef"),
2711        ]);
2712        let triple_prefix = triple.split_to(2);
2713        assert!(triple_prefix.is_single());
2714        assert_eq!(triple_prefix.coalesce(), b"ab");
2715        assert!(matches!(triple.inner, IoBufsInner::Pair(_)));
2716        assert_eq!(triple.coalesce(), b"cdef");
2717
2718        let mut triple = IoBufs::from(vec![
2719            IoBuf::from(b"ab"),
2720            IoBuf::from(b"cd"),
2721            IoBuf::from(b"ef"),
2722        ]);
2723        let triple_prefix = triple.split_to(3);
2724        assert!(matches!(triple_prefix.inner, IoBufsInner::Pair(_)));
2725        assert_eq!(triple_prefix.coalesce(), b"abc");
2726        assert!(matches!(triple.inner, IoBufsInner::Pair(_)));
2727        assert_eq!(triple.coalesce(), b"def");
2728
2729        let mut triple = IoBufs::from(vec![
2730            IoBuf::from(b"ab"),
2731            IoBuf::from(b"cd"),
2732            IoBuf::from(b"ef"),
2733        ]);
2734        let triple_prefix = triple.split_to(4);
2735        assert!(matches!(triple_prefix.inner, IoBufsInner::Pair(_)));
2736        assert_eq!(triple_prefix.coalesce(), b"abcd");
2737        assert!(triple.is_single());
2738        assert_eq!(triple.coalesce(), b"ef");
2739
2740        let mut triple = IoBufs::from(vec![
2741            IoBuf::from(b"ab"),
2742            IoBuf::from(b"cd"),
2743            IoBuf::from(b"ef"),
2744        ]);
2745        let triple_prefix = triple.split_to(5);
2746        assert!(matches!(triple_prefix.inner, IoBufsInner::Triple(_)));
2747        assert_eq!(triple_prefix.coalesce(), b"abcde");
2748        assert!(triple.is_single());
2749        assert_eq!(triple.coalesce(), b"f");
2750
2751        // Chunked split can canonicalize remainder/prefix shapes.
2752        let mut bufs = IoBufs::from(vec![
2753            IoBuf::from(b"ab"),
2754            IoBuf::from(b"cd"),
2755            IoBuf::from(b"ef"),
2756            IoBuf::from(b"gh"),
2757        ]);
2758        let prefix = bufs.split_to(4);
2759        assert!(matches!(prefix.inner, IoBufsInner::Pair(_)));
2760        assert_eq!(prefix.coalesce(), b"abcd");
2761        assert!(matches!(bufs.inner, IoBufsInner::Pair(_)));
2762        assert_eq!(bufs.coalesce(), b"efgh");
2763
2764        // Chunked split inside a chunk.
2765        let mut bufs = IoBufs::from(vec![
2766            IoBuf::from(b"ab"),
2767            IoBuf::from(b"cd"),
2768            IoBuf::from(b"ef"),
2769            IoBuf::from(b"gh"),
2770        ]);
2771        let prefix = bufs.split_to(5);
2772        assert!(matches!(prefix.inner, IoBufsInner::Triple(_)));
2773        assert_eq!(prefix.coalesce(), b"abcde");
2774        assert!(matches!(bufs.inner, IoBufsInner::Pair(_)));
2775        assert_eq!(bufs.coalesce(), b"fgh");
2776
2777        // Chunked split can remain chunked on both sides when both have >= 4 chunks.
2778        let mut bufs = IoBufs::from(vec![
2779            IoBuf::from(b"a"),
2780            IoBuf::from(b"b"),
2781            IoBuf::from(b"c"),
2782            IoBuf::from(b"d"),
2783            IoBuf::from(b"e"),
2784            IoBuf::from(b"f"),
2785            IoBuf::from(b"g"),
2786            IoBuf::from(b"h"),
2787        ]);
2788        let prefix = bufs.split_to(4);
2789        assert!(matches!(prefix.inner, IoBufsInner::Chunked(_)));
2790        assert_eq!(prefix.coalesce(), b"abcd");
2791        assert!(matches!(bufs.inner, IoBufsInner::Chunked(_)));
2792        assert_eq!(bufs.coalesce(), b"efgh");
2793
2794        // Defensive path: tolerate accidental empty chunks in non-canonical chunked input.
2795        let mut bufs = IoBufs {
2796            inner: IoBufsInner::Chunked(VecDeque::from([
2797                IoBuf::default(),
2798                IoBuf::from(b"ab"),
2799                IoBuf::from(b"cd"),
2800                IoBuf::from(b"ef"),
2801                IoBuf::from(b"gh"),
2802            ])),
2803        };
2804        let prefix = bufs.split_to(3);
2805        assert_eq!(prefix.coalesce(), b"abc");
2806        assert_eq!(bufs.coalesce(), b"defgh");
2807    }
2808
2809    #[test]
2810    #[should_panic(expected = "split_to out of bounds")]
2811    fn test_iobufs_split_to_out_of_bounds() {
2812        let mut bufs = IoBufs::from(b"abc");
2813        let _ = bufs.split_to(4);
2814    }
2815
2816    #[test]
2817    fn test_iobufs_chunk_count() {
2818        assert_eq!(IoBufs::default().chunk_count(), 0);
2819        assert_eq!(IoBufs::from(IoBuf::from(b"a")).chunk_count(), 1);
2820        assert_eq!(
2821            IoBufs::from(vec![IoBuf::from(b"b"), IoBuf::from(b"c")]).chunk_count(),
2822            2
2823        );
2824        assert_eq!(
2825            IoBufs::from(vec![
2826                IoBuf::from(b"a"),
2827                IoBuf::from(b"b"),
2828                IoBuf::from(b"c")
2829            ])
2830            .chunk_count(),
2831            3
2832        );
2833        assert_eq!(
2834            IoBufs::from(vec![
2835                IoBuf::from(b"a"),
2836                IoBuf::from(b"b"),
2837                IoBuf::from(b"c"),
2838                IoBuf::from(b"d")
2839            ])
2840            .chunk_count(),
2841            4
2842        );
2843    }
2844
2845    #[test]
2846    fn test_iobufs_coalesce_after_advance() {
2847        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
2848        bufs.append(IoBuf::from(b" world"));
2849
2850        assert_eq!(bufs.len(), 11);
2851
2852        bufs.advance(3);
2853        assert_eq!(bufs.len(), 8);
2854
2855        assert_eq!(bufs.coalesce(), b"lo world");
2856    }
2857
2858    #[test]
2859    fn test_iobufs_coalesce_with_pool() {
2860        let pool = test_pool();
2861
2862        // Single buffer: zero-copy (same pointer)
2863        let buf = IoBuf::from(vec![1u8, 2, 3, 4, 5]);
2864        let original_ptr = buf.as_ptr();
2865        let bufs = IoBufs::from(buf);
2866        let coalesced = bufs.coalesce_with_pool(&pool);
2867        assert_eq!(coalesced, [1, 2, 3, 4, 5]);
2868        assert_eq!(coalesced.as_ptr(), original_ptr);
2869
2870        // Multiple buffers: merged using pool
2871        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
2872        bufs.append(IoBuf::from(b" world"));
2873        let coalesced = bufs.coalesce_with_pool(&pool);
2874        assert_eq!(coalesced, b"hello world");
2875
2876        // Multiple buffers after advance: only remaining data coalesced
2877        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
2878        bufs.append(IoBuf::from(b" world"));
2879        bufs.advance(3);
2880        let coalesced = bufs.coalesce_with_pool(&pool);
2881        assert_eq!(coalesced, b"lo world");
2882
2883        // Empty buffers in the middle
2884        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
2885        bufs.append(IoBuf::default());
2886        bufs.append(IoBuf::from(b" world"));
2887        let coalesced = bufs.coalesce_with_pool(&pool);
2888        assert_eq!(coalesced, b"hello world");
2889
2890        // Empty IoBufs
2891        let bufs = IoBufs::default();
2892        let coalesced = bufs.coalesce_with_pool(&pool);
2893        assert!(coalesced.is_empty());
2894
2895        // 4+ buffers: exercise chunked coalesce-with-pool path.
2896        let bufs = IoBufs::from(vec![
2897            IoBuf::from(b"ab"),
2898            IoBuf::from(b"cd"),
2899            IoBuf::from(b"ef"),
2900            IoBuf::from(b"gh"),
2901        ]);
2902        let coalesced = bufs.coalesce_with_pool(&pool);
2903        assert_eq!(coalesced, b"abcdefgh");
2904        assert!(coalesced.is_pooled());
2905    }
2906
2907    #[test]
2908    fn test_iobufs_empty_chunks_and_copy_to_bytes_paths() {
2909        // Empty chunks are skipped while reading across multiple chunks.
2910        let mut bufs = IoBufs::default();
2911        bufs.append(IoBuf::from(b"hello"));
2912        bufs.append(IoBuf::default());
2913        bufs.append(IoBuf::from(b" "));
2914        bufs.append(IoBuf::default());
2915        bufs.append(IoBuf::from(b"world"));
2916        assert_eq!(bufs.len(), 11);
2917        assert_eq!(bufs.chunk(), b"hello");
2918        bufs.advance(5);
2919        assert_eq!(bufs.chunk(), b" ");
2920        bufs.advance(1);
2921        assert_eq!(bufs.chunk(), b"world");
2922
2923        // Single-buffer copy_to_bytes path.
2924        let mut single = IoBufs::from(b"hello world");
2925        assert_eq!(single.copy_to_bytes(5).as_ref(), b"hello");
2926        assert_eq!(single.remaining(), 6);
2927
2928        // Multi-buffer copy_to_bytes path across boundaries.
2929        let mut multi = IoBufs::from(b"hello");
2930        multi.prepend(IoBuf::from(b"say "));
2931        assert_eq!(multi.copy_to_bytes(7).as_ref(), b"say hel");
2932        assert_eq!(multi.copy_to_bytes(2).as_ref(), b"lo");
2933    }
2934
2935    #[test]
2936    fn test_iobufs_copy_to_bytes_pair_and_triple() {
2937        // Pair: crossing one boundary should collapse to the trailing single chunk.
2938        let mut pair = IoBufs::from(IoBuf::from(b"ab"));
2939        pair.append(IoBuf::from(b"cd"));
2940        let first = pair.copy_to_bytes(3);
2941        assert_eq!(&first[..], b"abc");
2942        assert!(pair.is_single());
2943        assert_eq!(pair.chunk(), b"d");
2944
2945        // Triple: draining across two chunks leaves the final chunk readable.
2946        let mut triple = IoBufs::from(IoBuf::from(b"ab"));
2947        triple.append(IoBuf::from(b"cd"));
2948        triple.append(IoBuf::from(b"ef"));
2949        let first = triple.copy_to_bytes(5);
2950        assert_eq!(&first[..], b"abcde");
2951        assert!(triple.is_single());
2952        assert_eq!(triple.chunk(), b"f");
2953    }
2954
2955    #[test]
2956    fn test_iobufs_copy_to_bytes_chunked_four_plus() {
2957        let mut bufs = IoBufs::from(vec![
2958            IoBuf::from(b"ab"),
2959            IoBuf::from(b"cd"),
2960            IoBuf::from(b"ef"),
2961            IoBuf::from(b"gh"),
2962        ]);
2963
2964        // Chunked fast-path: first chunk alone satisfies request.
2965        let first = bufs.copy_to_bytes(1);
2966        assert_eq!(&first[..], b"a");
2967
2968        // Chunked slow-path: request crosses chunk boundaries.
2969        let second = bufs.copy_to_bytes(4);
2970        assert_eq!(&second[..], b"bcde");
2971
2972        let rest = bufs.copy_to_bytes(3);
2973        assert_eq!(&rest[..], b"fgh");
2974        assert_eq!(bufs.remaining(), 0);
2975    }
2976
2977    #[test]
2978    fn test_iobufs_copy_to_bytes_edge_cases() {
2979        // Leading empty chunk should not affect copied payload.
2980        let mut iobufs = IoBufs::from(IoBuf::from(b""));
2981        iobufs.append(IoBuf::from(b"hello"));
2982        assert_eq!(iobufs.copy_to_bytes(5).as_ref(), b"hello");
2983
2984        // Boundary-aligned reads should return exact chunk payloads in-order.
2985        let mut boundary = IoBufs::from(IoBuf::from(b"hello"));
2986        boundary.append(IoBuf::from(b"world"));
2987        assert_eq!(boundary.copy_to_bytes(5).as_ref(), b"hello");
2988        assert_eq!(boundary.copy_to_bytes(5).as_ref(), b"world");
2989        assert_eq!(boundary.remaining(), 0);
2990    }
2991
2992    #[test]
2993    #[should_panic(expected = "cannot advance past end of buffer")]
2994    fn test_iobufs_advance_past_end() {
2995        let mut bufs = IoBufs::from(b"hel");
2996        bufs.append(IoBuf::from(b"lo"));
2997        bufs.advance(10);
2998    }
2999
3000    #[test]
3001    #[should_panic(expected = "not enough data")]
3002    fn test_iobufs_copy_to_bytes_past_end() {
3003        let mut bufs = IoBufs::from(b"hel");
3004        bufs.append(IoBuf::from(b"lo"));
3005        bufs.copy_to_bytes(10);
3006    }
3007
3008    #[test]
3009    fn test_iobufs_matches_bytes_chain() {
3010        let b1 = Bytes::from_static(b"hello");
3011        let b2 = Bytes::from_static(b" ");
3012        let b3 = Bytes::from_static(b"world");
3013
3014        // Buf parity for remaining/chunk/advance should match `Bytes::chain`.
3015        let mut chain = b1.clone().chain(b2.clone()).chain(b3.clone());
3016        let mut iobufs = IoBufs::from(IoBuf::from(b1.clone()));
3017        iobufs.append(IoBuf::from(b2.clone()));
3018        iobufs.append(IoBuf::from(b3.clone()));
3019
3020        assert_eq!(chain.remaining(), iobufs.remaining());
3021        assert_eq!(chain.chunk(), iobufs.chunk());
3022
3023        chain.advance(3);
3024        iobufs.advance(3);
3025        assert_eq!(chain.remaining(), iobufs.remaining());
3026        assert_eq!(chain.chunk(), iobufs.chunk());
3027
3028        chain.advance(3);
3029        iobufs.advance(3);
3030        assert_eq!(chain.remaining(), iobufs.remaining());
3031        assert_eq!(chain.chunk(), iobufs.chunk());
3032
3033        // Test copy_to_bytes
3034        let mut chain = b1.clone().chain(b2.clone()).chain(b3.clone());
3035        let mut iobufs = IoBufs::from(IoBuf::from(b1));
3036        iobufs.append(IoBuf::from(b2));
3037        iobufs.append(IoBuf::from(b3));
3038
3039        assert_eq!(chain.copy_to_bytes(3), iobufs.copy_to_bytes(3));
3040        assert_eq!(chain.copy_to_bytes(4), iobufs.copy_to_bytes(4));
3041        assert_eq!(
3042            chain.copy_to_bytes(chain.remaining()),
3043            iobufs.copy_to_bytes(iobufs.remaining())
3044        );
3045        assert_eq!(chain.remaining(), 0);
3046        assert_eq!(iobufs.remaining(), 0);
3047    }
3048
3049    #[test]
3050    fn test_iobufs_try_into_single() {
3051        let single = IoBufs::from(IoBuf::from(b"hello"));
3052        let single = single.try_into_single().expect("single expected");
3053        assert_eq!(single, b"hello");
3054
3055        let multi = IoBufs::from(vec![IoBuf::from(b"ab"), IoBuf::from(b"cd")]);
3056        let multi = multi.try_into_single().expect_err("multi expected");
3057        assert_eq!(multi.coalesce(), b"abcd");
3058    }
3059
3060    #[test]
3061    fn test_iobufs_chunks_vectored_multiple_slices() {
3062        // Single non-empty buffers should export exactly one slice.
3063        let single = IoBufs::from(IoBuf::from(b"xy"));
3064        let mut single_dst = [IoSlice::new(&[]); 2];
3065        let count = single.chunks_vectored(&mut single_dst);
3066        assert_eq!(count, 1);
3067        assert_eq!(&single_dst[0][..], b"xy");
3068
3069        // Single empty buffers should export no slices.
3070        let empty_single = IoBufs::default();
3071        let mut empty_single_dst = [IoSlice::new(&[]); 1];
3072        assert_eq!(empty_single.chunks_vectored(&mut empty_single_dst), 0);
3073
3074        let bufs = IoBufs::from(vec![
3075            IoBuf::from(b"ab"),
3076            IoBuf::from(b"cd"),
3077            IoBuf::from(b"ef"),
3078            IoBuf::from(b"gh"),
3079        ]);
3080
3081        // Destination capacity should cap how many chunks we export.
3082        let mut small = [IoSlice::new(&[]); 2];
3083        let count = bufs.chunks_vectored(&mut small);
3084        assert_eq!(count, 2);
3085        assert_eq!(&small[0][..], b"ab");
3086        assert_eq!(&small[1][..], b"cd");
3087
3088        // Larger destination should include every readable chunk.
3089        let mut large = [IoSlice::new(&[]); 8];
3090        let count = bufs.chunks_vectored(&mut large);
3091        assert_eq!(count, 4);
3092        assert_eq!(&large[0][..], b"ab");
3093        assert_eq!(&large[1][..], b"cd");
3094        assert_eq!(&large[2][..], b"ef");
3095        assert_eq!(&large[3][..], b"gh");
3096
3097        // Empty destination cannot accept any slices.
3098        let mut empty_dst: [IoSlice<'_>; 0] = [];
3099        assert_eq!(bufs.chunks_vectored(&mut empty_dst), 0);
3100
3101        // Non-canonical shapes should skip empty leading chunks.
3102        let sparse = IoBufs {
3103            inner: IoBufsInner::Pair([IoBuf::default(), IoBuf::from(b"x")]),
3104        };
3105        let mut dst = [IoSlice::new(&[]); 2];
3106        let count = sparse.chunks_vectored(&mut dst);
3107        assert_eq!(count, 1);
3108        assert_eq!(&dst[0][..], b"x");
3109
3110        // Triple should skip empty chunks and preserve readable order.
3111        let sparse_triple = IoBufs {
3112            inner: IoBufsInner::Triple([IoBuf::default(), IoBuf::from(b"y"), IoBuf::from(b"z")]),
3113        };
3114        let mut dst = [IoSlice::new(&[]); 3];
3115        let count = sparse_triple.chunks_vectored(&mut dst);
3116        assert_eq!(count, 2);
3117        assert_eq!(&dst[0][..], b"y");
3118        assert_eq!(&dst[1][..], b"z");
3119
3120        // Chunked shapes with only empty buffers should export no slices.
3121        let empty_chunked = IoBufs {
3122            inner: IoBufsInner::Chunked(VecDeque::from([IoBuf::default(), IoBuf::default()])),
3123        };
3124        let mut dst = [IoSlice::new(&[]); 2];
3125        assert_eq!(empty_chunked.chunks_vectored(&mut dst), 0);
3126    }
3127
3128    #[test]
3129    fn test_iobufsmut_freeze_chunked() {
3130        // Multiple non-empty buffers stay multi-chunk.
3131        let buf1 = IoBufMut::from(b"hello".as_ref());
3132        let buf2 = IoBufMut::from(b" world".as_ref());
3133        let bufs = IoBufsMut::from(vec![buf1, buf2]);
3134        let mut frozen = bufs.freeze();
3135        assert!(!frozen.is_single());
3136        assert_eq!(frozen.chunk(), b"hello");
3137        frozen.advance(5);
3138        assert_eq!(frozen.chunk(), b" world");
3139        frozen.advance(6);
3140        assert_eq!(frozen.remaining(), 0);
3141
3142        // Empty buffers are filtered out.
3143        let buf1 = IoBufMut::from(b"hello".as_ref());
3144        let empty = IoBufMut::default();
3145        let buf2 = IoBufMut::from(b" world".as_ref());
3146        let bufs = IoBufsMut::from(vec![buf1, empty, buf2]);
3147        let mut frozen = bufs.freeze();
3148        assert!(!frozen.is_single());
3149        assert_eq!(frozen.chunk(), b"hello");
3150        frozen.advance(5);
3151        assert_eq!(frozen.chunk(), b" world");
3152        frozen.advance(6);
3153        assert_eq!(frozen.remaining(), 0);
3154
3155        // Collapses to Single when one non-empty buffer remains
3156        let empty1 = IoBufMut::default();
3157        let buf = IoBufMut::from(b"only one".as_ref());
3158        let empty2 = IoBufMut::default();
3159        let bufs = IoBufsMut::from(vec![empty1, buf, empty2]);
3160        let frozen = bufs.freeze();
3161        assert!(frozen.is_single());
3162        assert_eq!(frozen.coalesce(), b"only one");
3163
3164        // All empty buffers -> Single with empty buffer
3165        let empty1 = IoBufMut::default();
3166        let empty2 = IoBufMut::default();
3167        let bufs = IoBufsMut::from(vec![empty1, empty2]);
3168        let frozen = bufs.freeze();
3169        assert!(frozen.is_single());
3170        assert!(frozen.is_empty());
3171    }
3172
3173    #[test]
3174    fn test_iobufsmut_coalesce() {
3175        let buf1 = IoBufMut::from(b"hello");
3176        let buf2 = IoBufMut::from(b" world");
3177        let bufs = IoBufsMut::from(vec![buf1, buf2]);
3178        let coalesced = bufs.coalesce();
3179        assert_eq!(coalesced, b"hello world");
3180    }
3181
3182    #[test]
3183    fn test_iobufsmut_from_vec() {
3184        // Empty Vec becomes Single with empty buffer
3185        let bufs = IoBufsMut::from(Vec::<IoBufMut>::new());
3186        assert!(bufs.is_single());
3187        assert!(bufs.is_empty());
3188
3189        // Vec with one element becomes Single
3190        let buf = IoBufMut::from(b"test");
3191        let bufs = IoBufsMut::from(vec![buf]);
3192        assert!(bufs.is_single());
3193        assert_eq!(bufs.chunk(), b"test");
3194
3195        // Vec with multiple elements becomes multi-chunk.
3196        let buf1 = IoBufMut::from(b"hello");
3197        let buf2 = IoBufMut::from(b" world");
3198        let bufs = IoBufsMut::from(vec![buf1, buf2]);
3199        assert!(!bufs.is_single());
3200    }
3201
3202    #[test]
3203    fn test_iobufsmut_from_vec_filters_empty_chunks() {
3204        let mut bufs = IoBufsMut::from(vec![
3205            IoBufMut::default(),
3206            IoBufMut::from(b"hello"),
3207            IoBufMut::default(),
3208            IoBufMut::from(b" world"),
3209            IoBufMut::default(),
3210        ]);
3211        assert_eq!(bufs.chunk(), b"hello");
3212        bufs.advance(5);
3213        assert_eq!(bufs.chunk(), b" world");
3214        bufs.advance(6);
3215        assert_eq!(bufs.remaining(), 0);
3216    }
3217
3218    #[test]
3219    fn test_iobufsmut_fast_path_shapes() {
3220        let pair = IoBufsMut::from(vec![IoBufMut::from(b"a"), IoBufMut::from(b"b")]);
3221        assert!(matches!(pair.inner, IoBufsMutInner::Pair(_)));
3222
3223        let triple = IoBufsMut::from(vec![
3224            IoBufMut::from(b"a"),
3225            IoBufMut::from(b"b"),
3226            IoBufMut::from(b"c"),
3227        ]);
3228        assert!(matches!(triple.inner, IoBufsMutInner::Triple(_)));
3229
3230        let chunked = IoBufsMut::from(vec![
3231            IoBufMut::from(b"a"),
3232            IoBufMut::from(b"b"),
3233            IoBufMut::from(b"c"),
3234            IoBufMut::from(b"d"),
3235        ]);
3236        assert!(matches!(chunked.inner, IoBufsMutInner::Chunked(_)));
3237    }
3238
3239    #[test]
3240    fn test_iobufsmut_default() {
3241        // Default IoBufsMut should be a single empty chunk.
3242        let bufs = IoBufsMut::default();
3243        assert!(bufs.is_single());
3244        assert!(bufs.is_empty());
3245        assert_eq!(bufs.len(), 0);
3246    }
3247
3248    #[test]
3249    fn test_iobufsmut_from_array() {
3250        // From<[u8; N]> should create a single-chunk container with the array data.
3251        let bufs = IoBufsMut::from([1u8, 2, 3, 4, 5]);
3252        assert!(bufs.is_single());
3253        assert_eq!(bufs.len(), 5);
3254        assert_eq!(bufs.chunk(), &[1, 2, 3, 4, 5]);
3255    }
3256
3257    #[test]
3258    fn test_iobufmut_buf_trait() {
3259        // Buf trait on IoBufMut: remaining/chunk/advance should work like BytesMut.
3260        let mut buf = IoBufMut::from(b"hello world");
3261        assert_eq!(buf.remaining(), 11);
3262        assert_eq!(buf.chunk(), b"hello world");
3263
3264        buf.advance(6);
3265        assert_eq!(buf.remaining(), 5);
3266        assert_eq!(buf.chunk(), b"world");
3267
3268        buf.advance(5);
3269        assert_eq!(buf.remaining(), 0);
3270        assert!(buf.chunk().is_empty());
3271    }
3272
3273    #[test]
3274    #[should_panic(expected = "cannot advance")]
3275    fn test_iobufmut_advance_past_end() {
3276        let mut buf = IoBufMut::from(b"hello");
3277        buf.advance(10);
3278    }
3279
3280    #[test]
3281    fn test_iobufsmut_buf_trait_chunked() {
3282        let buf1 = IoBufMut::from(b"hello");
3283        let buf2 = IoBufMut::from(b" ");
3284        let buf3 = IoBufMut::from(b"world");
3285        let mut bufs = IoBufsMut::from(vec![buf1, buf2, buf3]);
3286
3287        assert_eq!(bufs.remaining(), 11);
3288        assert_eq!(bufs.chunk(), b"hello");
3289
3290        // Advance within first buffer
3291        bufs.advance(3);
3292        assert_eq!(bufs.remaining(), 8);
3293        assert_eq!(bufs.chunk(), b"lo");
3294
3295        // Advance past first buffer (should pop_front)
3296        bufs.advance(2);
3297        assert_eq!(bufs.remaining(), 6);
3298        assert_eq!(bufs.chunk(), b" ");
3299
3300        // Advance exactly one buffer
3301        bufs.advance(1);
3302        assert_eq!(bufs.remaining(), 5);
3303        assert_eq!(bufs.chunk(), b"world");
3304
3305        // Advance to end
3306        bufs.advance(5);
3307        assert_eq!(bufs.remaining(), 0);
3308    }
3309
3310    #[test]
3311    #[should_panic(expected = "cannot advance past end of buffer")]
3312    fn test_iobufsmut_advance_past_end() {
3313        let buf1 = IoBufMut::from(b"hello");
3314        let buf2 = IoBufMut::from(b" world");
3315        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3316        bufs.advance(20);
3317    }
3318
3319    #[test]
3320    fn test_iobufsmut_bufmut_trait_single() {
3321        let mut bufs = IoBufsMut::from(IoBufMut::with_capacity(20));
3322        // BytesMut can grow, so remaining_mut is very large
3323        assert!(bufs.remaining_mut() > 1000);
3324
3325        bufs.put_slice(b"hello");
3326        assert_eq!(bufs.chunk(), b"hello");
3327        assert_eq!(bufs.len(), 5);
3328
3329        bufs.put_slice(b" world");
3330        assert_eq!(bufs.coalesce(), b"hello world");
3331    }
3332
3333    #[test]
3334    fn test_iobufsmut_zeroed_write() {
3335        // Use zeroed buffers which have a fixed length
3336        let bufs = IoBufsMut::from(IoBufMut::zeroed(20));
3337        assert_eq!(bufs.len(), 20);
3338
3339        // Can write using as_mut on coalesced buffer
3340        let mut coalesced = bufs.coalesce();
3341        coalesced.as_mut()[..5].copy_from_slice(b"hello");
3342        assert_eq!(&coalesced.as_ref()[..5], b"hello");
3343    }
3344
3345    #[test]
3346    fn test_iobufsmut_bufmut_put_slice() {
3347        // Test writing across multiple buffers
3348        let buf1 = IoBufMut::with_capacity(5);
3349        let buf2 = IoBufMut::with_capacity(6);
3350        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3351
3352        // Write data
3353        bufs.put_slice(b"hello");
3354        bufs.put_slice(b" world");
3355        assert_eq!(bufs.coalesce(), b"hello world");
3356    }
3357
3358    #[test]
3359    fn test_iobufs_advance_drains_buffers() {
3360        let mut bufs = IoBufs::from(IoBuf::from(b"hello"));
3361        bufs.append(IoBuf::from(b" "));
3362        bufs.append(IoBuf::from(b"world"));
3363
3364        // Advance exactly past first buffer
3365        bufs.advance(5);
3366        assert_eq!(bufs.remaining(), 6);
3367        assert_eq!(bufs.chunk(), b" ");
3368
3369        // Advance across multiple buffers
3370        bufs.advance(4);
3371        assert_eq!(bufs.remaining(), 2);
3372        assert_eq!(bufs.chunk(), b"ld");
3373    }
3374
3375    #[test]
3376    fn test_iobufs_advance_exactly_to_boundary() {
3377        let mut bufs = IoBufs::from(IoBuf::from(b"abc"));
3378        bufs.append(IoBuf::from(b"def"));
3379
3380        // Advance exactly to first buffer boundary
3381        bufs.advance(3);
3382        assert_eq!(bufs.remaining(), 3);
3383        assert_eq!(bufs.chunk(), b"def");
3384
3385        // Advance exactly to end
3386        bufs.advance(3);
3387        assert_eq!(bufs.remaining(), 0);
3388    }
3389
3390    #[test]
3391    fn test_iobufs_advance_canonicalizes_pair_to_single() {
3392        let mut bufs = IoBufs::from(IoBuf::from(b"ab"));
3393        bufs.append(IoBuf::from(b"cd"));
3394        bufs.advance(2);
3395        assert!(bufs.is_single());
3396        assert_eq!(bufs.chunk(), b"cd");
3397    }
3398
3399    #[test]
3400    fn test_iobufsmut_with_empty_buffers() {
3401        let buf1 = IoBufMut::from(b"hello");
3402        let buf2 = IoBufMut::default();
3403        let buf3 = IoBufMut::from(b" world");
3404        let mut bufs = IoBufsMut::from(vec![buf1, buf2, buf3]);
3405
3406        assert_eq!(bufs.remaining(), 11);
3407        assert_eq!(bufs.chunk(), b"hello");
3408
3409        // Advance past first buffer
3410        bufs.advance(5);
3411        // Empty buffer should be skipped
3412        assert_eq!(bufs.chunk(), b" world");
3413        assert_eq!(bufs.remaining(), 6);
3414    }
3415
3416    #[test]
3417    fn test_iobufsmut_advance_skips_leading_writable_empty_chunk() {
3418        // A leading chunk with capacity but no readable bytes (len == 0) should
3419        // be skipped during advance, reaching the next readable chunk.
3420        let empty_writable = IoBufMut::with_capacity(4);
3421        let payload = IoBufMut::from(b"xy");
3422        let mut bufs = IoBufsMut::from(vec![empty_writable, payload]);
3423
3424        bufs.advance(1);
3425        assert_eq!(bufs.chunk(), b"y");
3426        assert_eq!(bufs.remaining(), 1);
3427    }
3428
3429    #[test]
3430    fn test_iobufsmut_coalesce_after_advance() {
3431        // Advance mid-chunk: advance 3 of 11 bytes
3432        let buf1 = IoBufMut::from(b"hello");
3433        let buf2 = IoBufMut::from(b" world");
3434        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3435
3436        bufs.advance(3);
3437        assert_eq!(bufs.coalesce(), b"lo world");
3438
3439        // Advance to exact chunk boundary: advance 5 of 11 bytes
3440        let buf1 = IoBufMut::from(b"hello");
3441        let buf2 = IoBufMut::from(b" world");
3442        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3443
3444        bufs.advance(5);
3445        assert_eq!(bufs.coalesce(), b" world");
3446    }
3447
3448    #[test]
3449    fn test_iobufsmut_copy_to_bytes() {
3450        let buf1 = IoBufMut::from(b"hello");
3451        let buf2 = IoBufMut::from(b" world");
3452        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3453
3454        // First read spans chunks and leaves unread suffix.
3455        let first = bufs.copy_to_bytes(7);
3456        assert_eq!(&first[..], b"hello w");
3457        assert_eq!(bufs.remaining(), 4);
3458
3459        // Second read drains the remainder.
3460        let rest = bufs.copy_to_bytes(4);
3461        assert_eq!(&rest[..], b"orld");
3462        assert_eq!(bufs.remaining(), 0);
3463    }
3464
3465    #[test]
3466    fn test_iobufsmut_copy_to_bytes_chunked_four_plus() {
3467        let mut bufs = IoBufsMut::from(vec![
3468            IoBufMut::from(b"ab"),
3469            IoBufMut::from(b"cd"),
3470            IoBufMut::from(b"ef"),
3471            IoBufMut::from(b"gh"),
3472        ]);
3473
3474        // Exercise chunked advance path before copy_to_bytes.
3475        bufs.advance(1);
3476        assert_eq!(bufs.chunk(), b"b");
3477        bufs.advance(1);
3478        assert_eq!(bufs.chunk(), b"cd");
3479
3480        // Chunked fast-path: first chunk alone satisfies request.
3481        let first = bufs.copy_to_bytes(1);
3482        assert_eq!(&first[..], b"c");
3483
3484        // Chunked slow-path: request crosses chunk boundaries.
3485        let second = bufs.copy_to_bytes(4);
3486        assert_eq!(&second[..], b"defg");
3487
3488        let rest = bufs.copy_to_bytes(1);
3489        assert_eq!(&rest[..], b"h");
3490        assert_eq!(bufs.remaining(), 0);
3491
3492        // Enter copy_to_bytes while still in chunked representation.
3493        let mut bufs = IoBufsMut::from(vec![
3494            IoBufMut::from(b"a"),
3495            IoBufMut::from(b"b"),
3496            IoBufMut::from(b"c"),
3497            IoBufMut::from(b"d"),
3498            IoBufMut::from(b"e"),
3499        ]);
3500        assert!(matches!(bufs.inner, IoBufsMutInner::Chunked(_)));
3501        let first = bufs.copy_to_bytes(1);
3502        assert_eq!(&first[..], b"a");
3503        // Stay chunked while consuming across multiple tiny chunks.
3504        let next = bufs.copy_to_bytes(3);
3505        assert_eq!(&next[..], b"bcd");
3506        assert_eq!(bufs.chunk(), b"e");
3507        assert_eq!(bufs.remaining(), 1);
3508    }
3509
3510    #[test]
3511    fn test_iobufsmut_copy_to_bytes_canonicalizes_pair() {
3512        let mut bufs = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
3513        assert!(matches!(bufs.inner, IoBufsMutInner::Pair(_)));
3514
3515        let first = bufs.copy_to_bytes(2);
3516        assert_eq!(&first[..], b"ab");
3517
3518        assert!(bufs.is_single());
3519        assert_eq!(bufs.chunk(), b"cd");
3520        assert_eq!(bufs.remaining(), 2);
3521    }
3522
3523    #[test]
3524    fn test_iobufsmut_copy_from_slice_single() {
3525        let mut bufs = IoBufsMut::from(IoBufMut::zeroed(11));
3526        bufs.copy_from_slice(b"hello world");
3527        assert_eq!(bufs.coalesce(), b"hello world");
3528    }
3529
3530    #[test]
3531    fn test_iobufsmut_copy_from_slice_chunked() {
3532        let buf1 = IoBufMut::zeroed(5);
3533        let buf2 = IoBufMut::zeroed(6);
3534        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3535
3536        bufs.copy_from_slice(b"hello world");
3537
3538        // Verify each chunk was filled correctly.
3539        assert_eq!(bufs.chunk(), b"hello");
3540        bufs.advance(5);
3541        assert_eq!(bufs.chunk(), b" world");
3542        bufs.advance(6);
3543        assert_eq!(bufs.remaining(), 0);
3544    }
3545
3546    #[test]
3547    #[should_panic(expected = "source slice length must match buffer length")]
3548    fn test_iobufsmut_copy_from_slice_wrong_length() {
3549        let mut bufs = IoBufsMut::from(IoBufMut::zeroed(5));
3550        bufs.copy_from_slice(b"hello world"); // 11 bytes into 5-byte buffer
3551    }
3552
3553    #[test]
3554    fn test_iobufsmut_matches_bytesmut_chain() {
3555        // Create three BytesMut with capacity
3556        let mut bm1 = BytesMut::with_capacity(5);
3557        let mut bm2 = BytesMut::with_capacity(6);
3558        let mut bm3 = BytesMut::with_capacity(7);
3559
3560        // Create matching IoBufsMut
3561        let mut iobufs = IoBufsMut::from(vec![
3562            IoBufMut::with_capacity(5),
3563            IoBufMut::with_capacity(6),
3564            IoBufMut::with_capacity(7),
3565        ]);
3566
3567        // Test initial chunk_mut length matches (spare capacity)
3568        let chain_len = (&mut bm1)
3569            .chain_mut(&mut bm2)
3570            .chain_mut(&mut bm3)
3571            .chunk_mut()
3572            .len();
3573        let iobufs_len = iobufs.chunk_mut().len();
3574        assert_eq!(chain_len, iobufs_len);
3575
3576        // Write some data
3577        (&mut bm1)
3578            .chain_mut(&mut bm2)
3579            .chain_mut(&mut bm3)
3580            .put_slice(b"hel");
3581        iobufs.put_slice(b"hel");
3582
3583        // Verify chunk_mut matches after partial write
3584        let chain_len = (&mut bm1)
3585            .chain_mut(&mut bm2)
3586            .chain_mut(&mut bm3)
3587            .chunk_mut()
3588            .len();
3589        let iobufs_len = iobufs.chunk_mut().len();
3590        assert_eq!(chain_len, iobufs_len);
3591
3592        // Write more data
3593        (&mut bm1)
3594            .chain_mut(&mut bm2)
3595            .chain_mut(&mut bm3)
3596            .put_slice(b"lo world!");
3597        iobufs.put_slice(b"lo world!");
3598
3599        // Verify chunk_mut matches after more writes
3600        let chain_len = (&mut bm1)
3601            .chain_mut(&mut bm2)
3602            .chain_mut(&mut bm3)
3603            .chunk_mut()
3604            .len();
3605        let iobufs_len = iobufs.chunk_mut().len();
3606        assert_eq!(chain_len, iobufs_len);
3607
3608        // Verify final content matches
3609        let frozen = iobufs.freeze().coalesce();
3610        let mut chain_content = bm1.to_vec();
3611        chain_content.extend_from_slice(&bm2);
3612        chain_content.extend_from_slice(&bm3);
3613        assert_eq!(frozen, chain_content.as_slice());
3614        assert_eq!(frozen, b"hello world!");
3615    }
3616
3617    #[test]
3618    fn test_iobufsmut_buf_matches_bytes_chain() {
3619        // Create pre-filled Bytes buffers
3620        let mut b1 = Bytes::from_static(b"hello");
3621        let mut b2 = Bytes::from_static(b" world");
3622        let b3 = Bytes::from_static(b"!");
3623
3624        // Create matching IoBufsMut
3625        let mut iobufs = IoBufsMut::from(vec![
3626            IoBufMut::from(b"hello"),
3627            IoBufMut::from(b" world"),
3628            IoBufMut::from(b"!"),
3629        ]);
3630
3631        // Test Buf::remaining matches
3632        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
3633        assert_eq!(chain_remaining, iobufs.remaining());
3634
3635        // Test Buf::chunk matches
3636        let chain_chunk = b1
3637            .clone()
3638            .chain(b2.clone())
3639            .chain(b3.clone())
3640            .chunk()
3641            .to_vec();
3642        assert_eq!(chain_chunk, iobufs.chunk().to_vec());
3643
3644        // Advance and test again
3645        b1.advance(3);
3646        iobufs.advance(3);
3647
3648        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
3649        assert_eq!(chain_remaining, iobufs.remaining());
3650
3651        let chain_chunk = b1
3652            .clone()
3653            .chain(b2.clone())
3654            .chain(b3.clone())
3655            .chunk()
3656            .to_vec();
3657        assert_eq!(chain_chunk, iobufs.chunk().to_vec());
3658
3659        // Advance past first buffer boundary into second
3660        b1.advance(2);
3661        iobufs.advance(2);
3662
3663        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
3664        assert_eq!(chain_remaining, iobufs.remaining());
3665
3666        // Now we should be in the second buffer
3667        let chain_chunk = b1
3668            .clone()
3669            .chain(b2.clone())
3670            .chain(b3.clone())
3671            .chunk()
3672            .to_vec();
3673        assert_eq!(chain_chunk, iobufs.chunk().to_vec());
3674
3675        // Advance past second buffer boundary into third
3676        b2.advance(6);
3677        iobufs.advance(6);
3678
3679        let chain_remaining = b1.clone().chain(b2.clone()).chain(b3.clone()).remaining();
3680        assert_eq!(chain_remaining, iobufs.remaining());
3681
3682        // Now we should be in the third buffer
3683        let chain_chunk = b1.chain(b2).chain(b3).chunk().to_vec();
3684        assert_eq!(chain_chunk, iobufs.chunk().to_vec());
3685
3686        // Test copy_to_bytes
3687        let b1 = Bytes::from_static(b"hello");
3688        let b2 = Bytes::from_static(b" world");
3689        let b3 = Bytes::from_static(b"!");
3690        let mut iobufs = IoBufsMut::from(vec![
3691            IoBufMut::from(b"hello"),
3692            IoBufMut::from(b" world"),
3693            IoBufMut::from(b"!"),
3694        ]);
3695
3696        let chain_bytes = b1.chain(b2).chain(b3).copy_to_bytes(8);
3697        let iobufs_bytes = iobufs.copy_to_bytes(8);
3698        assert_eq!(chain_bytes, iobufs_bytes);
3699        assert_eq!(chain_bytes.as_ref(), b"hello wo");
3700    }
3701
3702    #[test]
3703    fn test_iobufsmut_chunks_vectored_multiple_slices() {
3704        // Single non-empty buffers should export exactly one slice.
3705        let single = IoBufsMut::from(IoBufMut::from(b"xy"));
3706        let mut single_dst = [IoSlice::new(&[]); 2];
3707        let count = single.chunks_vectored(&mut single_dst);
3708        assert_eq!(count, 1);
3709        assert_eq!(&single_dst[0][..], b"xy");
3710
3711        // Single empty buffers should export no slices.
3712        let empty_single = IoBufsMut::default();
3713        let mut empty_single_dst = [IoSlice::new(&[]); 1];
3714        assert_eq!(empty_single.chunks_vectored(&mut empty_single_dst), 0);
3715
3716        let bufs = IoBufsMut::from(vec![
3717            IoBufMut::from(b"ab"),
3718            IoBufMut::from(b"cd"),
3719            IoBufMut::from(b"ef"),
3720            IoBufMut::from(b"gh"),
3721        ]);
3722
3723        // Destination capacity should cap how many chunks we export.
3724        let mut small = [IoSlice::new(&[]); 2];
3725        let count = bufs.chunks_vectored(&mut small);
3726        assert_eq!(count, 2);
3727        assert_eq!(&small[0][..], b"ab");
3728        assert_eq!(&small[1][..], b"cd");
3729
3730        // Larger destination should include every readable chunk.
3731        let mut large = [IoSlice::new(&[]); 8];
3732        let count = bufs.chunks_vectored(&mut large);
3733        assert_eq!(count, 4);
3734        assert_eq!(&large[0][..], b"ab");
3735        assert_eq!(&large[1][..], b"cd");
3736        assert_eq!(&large[2][..], b"ef");
3737        assert_eq!(&large[3][..], b"gh");
3738
3739        // Empty destination cannot accept any slices.
3740        let mut empty_dst: [IoSlice<'_>; 0] = [];
3741        assert_eq!(bufs.chunks_vectored(&mut empty_dst), 0);
3742
3743        // Non-canonical shapes should skip empty leading chunks.
3744        let sparse = IoBufsMut {
3745            inner: IoBufsMutInner::Pair([IoBufMut::default(), IoBufMut::from(b"y")]),
3746        };
3747        let mut dst = [IoSlice::new(&[]); 2];
3748        let count = sparse.chunks_vectored(&mut dst);
3749        assert_eq!(count, 1);
3750        assert_eq!(&dst[0][..], b"y");
3751
3752        // Triple should skip empty chunks and preserve readable order.
3753        let sparse_triple = IoBufsMut {
3754            inner: IoBufsMutInner::Triple([
3755                IoBufMut::default(),
3756                IoBufMut::from(b"z"),
3757                IoBufMut::from(b"w"),
3758            ]),
3759        };
3760        let mut dst = [IoSlice::new(&[]); 3];
3761        let count = sparse_triple.chunks_vectored(&mut dst);
3762        assert_eq!(count, 2);
3763        assert_eq!(&dst[0][..], b"z");
3764        assert_eq!(&dst[1][..], b"w");
3765
3766        // Chunked shapes with only empty buffers should export no slices.
3767        let empty_chunked = IoBufsMut {
3768            inner: IoBufsMutInner::Chunked(VecDeque::from([
3769                IoBufMut::default(),
3770                IoBufMut::default(),
3771            ])),
3772        };
3773        let mut dst = [IoSlice::new(&[]); 2];
3774        assert_eq!(empty_chunked.chunks_vectored(&mut dst), 0);
3775    }
3776
3777    #[test]
3778    fn test_iobufsmut_try_into_single() {
3779        let single = IoBufsMut::from(IoBufMut::from(b"hello"));
3780        let single = single.try_into_single().expect("single expected");
3781        assert_eq!(single, b"hello");
3782
3783        let multi = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
3784        let multi = multi.try_into_single().expect_err("multi expected");
3785        assert_eq!(multi.coalesce(), b"abcd");
3786    }
3787
3788    #[test]
3789    fn test_iobufsmut_freeze_after_advance() {
3790        // Partial advance: advance 3 of 11 bytes
3791        let buf1 = IoBufMut::from(b"hello");
3792        let buf2 = IoBufMut::from(b" world");
3793        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3794
3795        bufs.advance(3);
3796        assert_eq!(bufs.len(), 8);
3797
3798        let frozen = bufs.freeze();
3799        assert_eq!(frozen.len(), 8);
3800        assert_eq!(frozen.coalesce(), b"lo world");
3801
3802        // Exact boundary advance: advance 5 of 11 bytes (first buf is 5 bytes)
3803        let buf1 = IoBufMut::from(b"hello");
3804        let buf2 = IoBufMut::from(b" world");
3805        let mut bufs = IoBufsMut::from(vec![buf1, buf2]);
3806
3807        bufs.advance(5);
3808        assert_eq!(bufs.len(), 6);
3809
3810        // First buffer should be fully consumed (empty after advance)
3811        // freeze() filters empty buffers, so result should be Single
3812        let frozen = bufs.freeze();
3813        assert!(frozen.is_single());
3814        assert_eq!(frozen.coalesce(), b" world");
3815    }
3816
3817    #[test]
3818    fn test_iobufsmut_coalesce_with_pool() {
3819        let pool = test_pool();
3820
3821        // Single buffer: zero-copy (same pointer)
3822        let mut buf = IoBufMut::from(b"hello");
3823        let original_ptr = buf.as_mut_ptr();
3824        let bufs = IoBufsMut::from(buf);
3825        let coalesced = bufs.coalesce_with_pool(&pool);
3826        assert_eq!(coalesced, b"hello");
3827        assert_eq!(coalesced.as_ref().as_ptr(), original_ptr);
3828
3829        // Multiple buffers: merged using pool
3830        let bufs = IoBufsMut::from(vec![IoBufMut::from(b"hello"), IoBufMut::from(b" world")]);
3831        let coalesced = bufs.coalesce_with_pool(&pool);
3832        assert_eq!(coalesced, b"hello world");
3833        assert!(coalesced.is_pooled());
3834
3835        // Four chunks force the deque-backed coalesce path instead of pair/triple fast paths.
3836        let bufs = IoBufsMut::from(vec![
3837            IoBufMut::from(b"a"),
3838            IoBufMut::from(b"b"),
3839            IoBufMut::from(b"c"),
3840            IoBufMut::from(b"d"),
3841        ]);
3842        let coalesced = bufs.coalesce_with_pool(&pool);
3843        assert_eq!(coalesced, b"abcd");
3844        assert!(coalesced.is_pooled());
3845
3846        // With extra capacity: zero-copy if sufficient spare capacity
3847        let mut buf = IoBufMut::with_capacity(100);
3848        buf.put_slice(b"hello");
3849        let original_ptr = buf.as_mut_ptr();
3850        let bufs = IoBufsMut::from(buf);
3851        let coalesced = bufs.coalesce_with_pool_extra(&pool, 10);
3852        assert_eq!(coalesced, b"hello");
3853        assert_eq!(coalesced.as_ref().as_ptr(), original_ptr);
3854
3855        // With extra capacity: reallocates if insufficient
3856        let mut buf = IoBufMut::with_capacity(5);
3857        buf.put_slice(b"hello");
3858        let bufs = IoBufsMut::from(buf);
3859        let coalesced = bufs.coalesce_with_pool_extra(&pool, 100);
3860        assert_eq!(coalesced, b"hello");
3861        assert!(coalesced.capacity() >= 105);
3862    }
3863
3864    #[test]
3865    fn test_iobuf_additional_conversion_and_trait_paths() {
3866        let pool = test_pool();
3867
3868        let mut pooled_mut = pool.alloc(4);
3869        pooled_mut.put_slice(b"data");
3870        let pooled = pooled_mut.freeze();
3871        assert!(!pooled.as_ptr().is_null());
3872
3873        let unique = IoBuf::from(Bytes::from(vec![1u8, 2, 3]));
3874        let unique_mut = unique.try_into_mut().expect("unique bytes should convert");
3875        assert_eq!(unique_mut.as_ref(), &[1u8, 2, 3]);
3876
3877        let shared = IoBuf::from(Bytes::from(vec![4u8, 5, 6]));
3878        let _shared_clone = shared.clone();
3879        assert!(shared.try_into_mut().is_err());
3880
3881        let expected: &[u8] = &[9u8, 8];
3882        let eq_buf = IoBuf::from(vec![9u8, 8]);
3883        assert!(PartialEq::<[u8]>::eq(&eq_buf, expected));
3884
3885        let static_slice: &'static [u8] = b"static";
3886        assert_eq!(IoBuf::from(static_slice), b"static");
3887
3888        let mut pooled_mut = pool.alloc(3);
3889        pooled_mut.put_slice(b"xyz");
3890        let pooled = pooled_mut.freeze();
3891        let vec_out: Vec<u8> = pooled.clone().into();
3892        let bytes_out: Bytes = pooled.into();
3893        assert_eq!(vec_out, b"xyz");
3894        assert_eq!(bytes_out.as_ref(), b"xyz");
3895    }
3896
3897    #[test]
3898    fn test_iobufmut_additional_conversion_and_trait_paths() {
3899        // Basic mutable operations should keep readable bytes consistent.
3900        let mut buf = IoBufMut::from(vec![1u8, 2, 3, 4]);
3901        assert!(!buf.is_empty());
3902        buf.truncate(2);
3903        assert_eq!(buf.as_ref(), &[1u8, 2]);
3904        buf.clear();
3905        assert!(buf.is_empty());
3906        buf.put_slice(b"xyz");
3907
3908        // Equality should work across slice, array, and byte-string forms.
3909        let expected: &[u8] = b"xyz";
3910        assert!(PartialEq::<[u8]>::eq(&buf, expected));
3911        assert!(buf == b"xyz"[..]);
3912        assert!(buf == [b'x', b'y', b'z']);
3913        assert!(buf == b"xyz");
3914
3915        // Conversions from common owned/shared containers preserve contents.
3916        let from_vec = IoBufMut::from(vec![7u8, 8]);
3917        assert_eq!(from_vec.as_ref(), &[7u8, 8]);
3918
3919        let from_bytesmut = IoBufMut::from(BytesMut::from(&b"hi"[..]));
3920        assert_eq!(from_bytesmut.as_ref(), b"hi");
3921
3922        let from_bytes = IoBufMut::from(Bytes::from_static(b"ok"));
3923        assert_eq!(from_bytes.as_ref(), b"ok");
3924
3925        // `Bytes::from_static` cannot be converted to mutable without copy.
3926        let from_iobuf = IoBufMut::from(IoBuf::from(Bytes::from_static(b"io")));
3927        assert_eq!(from_iobuf.as_ref(), b"io");
3928    }
3929
3930    #[test]
3931    fn test_iobuf_aligned_public_paths() {
3932        // Exercise the public IoBuf/IoBufMut API through the untracked aligned
3933        // backing: write, advance, copy_to_bytes, freeze, slice, split_to,
3934        // try_into_mut, and From/Into conversions.
3935        static ARRAY: &[u8; 4] = b"wxyz";
3936
3937        let alignment = NonZeroUsize::new(64).expect("non-zero alignment");
3938
3939        // Start from a non-zero untracked aligned buffer to cover the public mutable API.
3940        let mut aligned_mut = IoBufMut::with_alignment(8, alignment);
3941        assert!(!aligned_mut.is_pooled());
3942        assert!(aligned_mut.is_empty());
3943        assert_eq!(aligned_mut.capacity(), 8);
3944        assert!((aligned_mut.as_mut_ptr() as usize).is_multiple_of(64));
3945
3946        aligned_mut.put_slice(b"abcdefgh");
3947        assert_eq!(aligned_mut.as_mut(), b"abcdefgh");
3948        assert_eq!(aligned_mut.chunk(), b"abcdefgh");
3949        aligned_mut.advance(2);
3950        assert_eq!(aligned_mut.chunk(), b"cdefgh");
3951
3952        let partial = aligned_mut.copy_to_bytes(2);
3953        assert_eq!(partial.as_ref(), b"cd");
3954        assert_eq!(aligned_mut.as_ref(), b"efgh");
3955        let empty = aligned_mut.copy_to_bytes(0);
3956        assert!(empty.is_empty());
3957        assert_eq!(aligned_mut.as_ref(), b"efgh");
3958
3959        aligned_mut.clear();
3960        assert!(aligned_mut.is_empty());
3961        aligned_mut.put_slice(ARRAY);
3962        assert!(aligned_mut == ARRAY);
3963
3964        // Full aligned drains should use the owner-transfer path, including len == 0 first.
3965        let mut fully_drained = IoBufMut::with_alignment(4, alignment);
3966        fully_drained.put_slice(b"lmno");
3967        let empty = fully_drained.copy_to_bytes(0);
3968        assert!(empty.is_empty());
3969        assert_eq!(fully_drained.as_ref(), b"lmno");
3970        let drained = fully_drained.copy_to_bytes(4);
3971        assert_eq!(drained.as_ref(), b"lmno");
3972        assert!(fully_drained.is_empty());
3973
3974        // Freeze to an immutable aligned `IoBuf` and exercise its view/Buf dispatch.
3975        let aligned = aligned_mut.freeze();
3976        assert!(!aligned.is_pooled());
3977        assert_eq!(aligned.as_ref(), &ARRAY[..]);
3978        assert!(aligned == ARRAY);
3979        assert!(!aligned.as_ptr().is_null());
3980        assert_eq!(aligned.slice(..2), b"wx");
3981        assert_eq!(aligned.slice(1..), b"xyz");
3982        assert_eq!(aligned.slice(1..=2), b"xy");
3983        assert_eq!(aligned.chunk(), b"wxyz");
3984
3985        let mut split = aligned.clone();
3986        let prefix = split.split_to(2);
3987        assert_eq!(prefix, b"wx");
3988        assert_eq!(split, b"yz");
3989
3990        let mut advanced = aligned.clone();
3991        advanced.advance(2);
3992        assert_eq!(advanced.chunk(), b"yz");
3993
3994        // Partial and full immutable drains should preserve the aligned backing behavior.
3995        let mut drained = aligned.clone();
3996        let empty = drained.copy_to_bytes(0);
3997        assert!(empty.is_empty());
3998        assert_eq!(drained.as_ref(), &ARRAY[..]);
3999        let first = drained.copy_to_bytes(1);
4000        assert_eq!(first.as_ref(), b"w");
4001        let rest = drained.copy_to_bytes(3);
4002        assert_eq!(rest.as_ref(), b"xyz");
4003        assert_eq!(drained.remaining(), 0);
4004
4005        // Unique aligned immutable buffers can become mutable again.
4006        let mut unique_source = IoBufMut::zeroed_with_alignment(4, alignment);
4007        unique_source.as_mut().copy_from_slice(b"pqrs");
4008        let unique = unique_source.freeze();
4009        let recovered = unique
4010            .try_into_mut()
4011            .expect("unique aligned iobuf should recover mutability");
4012        assert_eq!(recovered.as_ref(), b"pqrs");
4013
4014        // Shared aligned immutable buffers must reject the mutable conversion.
4015        let mut shared_source = IoBufMut::zeroed_with_alignment(4, alignment);
4016        shared_source.as_mut().copy_from_slice(b"tuvw");
4017        let shared = shared_source.freeze();
4018        let _shared_clone = shared.clone();
4019        assert!(shared.try_into_mut().is_err());
4020
4021        // Owned/container conversions should preserve bytes for aligned backings.
4022        let vec_out: Vec<u8> = aligned.clone().into();
4023        let bytes_out: Bytes = aligned.into();
4024        assert_eq!(vec_out, ARRAY.to_vec());
4025        assert_eq!(bytes_out.as_ref(), &ARRAY[..]);
4026
4027        let from_array = IoBuf::from(ARRAY);
4028        assert_eq!(from_array, b"wxyz");
4029
4030        let iobufs = IoBufs::from(ARRAY);
4031        assert_eq!(iobufs.chunk(), b"wxyz");
4032    }
4033
4034    #[test]
4035    fn test_iobufmut_aligned_zero_length_constructors() {
4036        let alignment = NonZeroUsize::new(64).expect("non-zero alignment");
4037
4038        let with_alignment = IoBufMut::with_alignment(0, alignment);
4039        assert!(with_alignment.is_empty());
4040        assert_eq!(with_alignment.len(), 0);
4041        assert_eq!(with_alignment.capacity(), 0);
4042
4043        let zeroed = IoBufMut::zeroed_with_alignment(0, alignment);
4044        assert!(zeroed.is_empty());
4045        assert_eq!(zeroed.len(), 0);
4046        assert_eq!(zeroed.capacity(), 0);
4047    }
4048
4049    #[test]
4050    fn test_iobufs_additional_shape_and_conversion_paths() {
4051        let pool = test_pool();
4052
4053        // Constructor coverage for mutable/immutable/slice-backed inputs.
4054        let from_mut = IoBufs::from(IoBufMut::from(b"m"));
4055        assert_eq!(from_mut.chunk(), b"m");
4056        let from_bytes = IoBufs::from(Bytes::from_static(b"b"));
4057        assert_eq!(from_bytes.chunk(), b"b");
4058        let from_bytesmut = IoBufs::from(BytesMut::from(&b"bm"[..]));
4059        assert_eq!(from_bytesmut.chunk(), b"bm");
4060        let from_vec = IoBufs::from(vec![1u8, 2u8]);
4061        assert_eq!(from_vec.chunk(), &[1u8, 2]);
4062        let static_slice: &'static [u8] = b"slice";
4063        let from_static = IoBufs::from(static_slice);
4064        assert_eq!(from_static.chunk(), b"slice");
4065
4066        // Canonicalizing an already-empty buffer remains a single empty chunk.
4067        let mut single_empty = IoBufs::default();
4068        single_empty.canonicalize();
4069        assert!(single_empty.is_single());
4070
4071        // Triple path: prepend/append can promote into chunked while preserving order.
4072        let mut triple = IoBufs::from(vec![
4073            IoBuf::from(b"a".to_vec()),
4074            IoBuf::from(b"b".to_vec()),
4075            IoBuf::from(b"c".to_vec()),
4076        ]);
4077        assert!(triple.as_single().is_none());
4078        triple.prepend(IoBuf::from(vec![b'0']));
4079        triple.prepend(IoBuf::from(vec![b'1']));
4080        triple.append(IoBuf::from(vec![b'2']));
4081        assert_eq!(triple.copy_to_bytes(triple.remaining()).as_ref(), b"10abc2");
4082
4083        // Appending to an existing triple keeps byte order stable.
4084        let mut triple_append = IoBufs::from(vec![
4085            IoBuf::from(b"x".to_vec()),
4086            IoBuf::from(b"y".to_vec()),
4087            IoBuf::from(b"z".to_vec()),
4088        ]);
4089        triple_append.append(IoBuf::from(vec![b'w']));
4090        assert_eq!(triple_append.coalesce(), b"xyzw");
4091
4092        // coalesce_with_pool on a triple should preserve contents.
4093        let triple_pool = IoBufs::from(vec![
4094            IoBuf::from(b"a".to_vec()),
4095            IoBuf::from(b"b".to_vec()),
4096            IoBuf::from(b"c".to_vec()),
4097        ]);
4098        assert_eq!(triple_pool.coalesce_with_pool(&pool), b"abc");
4099
4100        // coalesce_with_pool on 4+ chunks should read only remaining bytes.
4101        let mut chunked_pool = IoBufs::from(vec![
4102            IoBuf::from(b"a".to_vec()),
4103            IoBuf::from(b"b".to_vec()),
4104            IoBuf::from(b"c".to_vec()),
4105            IoBuf::from(b"d".to_vec()),
4106        ]);
4107        assert_eq!(chunked_pool.remaining(), 4);
4108        chunked_pool.advance(1);
4109        assert_eq!(chunked_pool.coalesce_with_pool(&pool), b"bcd");
4110
4111        // Non-canonical Pair/Triple/Chunked shapes should still expose the first readable chunk.
4112        let pair_second = IoBufs {
4113            inner: IoBufsInner::Pair([IoBuf::default(), IoBuf::from(vec![1u8])]),
4114        };
4115        assert_eq!(pair_second.chunk(), &[1u8]);
4116        let pair_empty = IoBufs {
4117            inner: IoBufsInner::Pair([IoBuf::default(), IoBuf::default()]),
4118        };
4119        assert_eq!(pair_empty.chunk(), b"");
4120
4121        let triple_third = IoBufs {
4122            inner: IoBufsInner::Triple([
4123                IoBuf::default(),
4124                IoBuf::default(),
4125                IoBuf::from(vec![3u8]),
4126            ]),
4127        };
4128        assert_eq!(triple_third.chunk(), &[3u8]);
4129        let triple_second = IoBufs {
4130            inner: IoBufsInner::Triple([
4131                IoBuf::default(),
4132                IoBuf::from(vec![2u8]),
4133                IoBuf::default(),
4134            ]),
4135        };
4136        assert_eq!(triple_second.chunk(), &[2u8]);
4137        let triple_empty = IoBufs {
4138            inner: IoBufsInner::Triple([IoBuf::default(), IoBuf::default(), IoBuf::default()]),
4139        };
4140        assert_eq!(triple_empty.chunk(), b"");
4141
4142        let chunked_second = IoBufs {
4143            inner: IoBufsInner::Chunked(VecDeque::from([IoBuf::default(), IoBuf::from(vec![9u8])])),
4144        };
4145        assert_eq!(chunked_second.chunk(), &[9u8]);
4146        let chunked_empty = IoBufs {
4147            inner: IoBufsInner::Chunked(VecDeque::from([IoBuf::default()])),
4148        };
4149        assert_eq!(chunked_empty.chunk(), b"");
4150    }
4151
4152    #[test]
4153    fn test_iobufsmut_additional_shape_and_conversion_paths() {
4154        // `as_single` accessors should work only for single-shape containers.
4155        let mut single = IoBufsMut::from(IoBufMut::from(b"x"));
4156        assert!(single.as_single().is_some());
4157        assert!(single.as_single_mut().is_some());
4158        single.canonicalize();
4159        assert!(single.is_single());
4160
4161        let mut pair = IoBufsMut::from(vec![IoBufMut::from(b"a"), IoBufMut::from(b"b")]);
4162        assert!(pair.as_single().is_none());
4163        assert!(pair.as_single_mut().is_none());
4164
4165        // Constructor coverage for raw vec and BytesMut sources.
4166        let from_vec = IoBufsMut::from(vec![1u8, 2u8]);
4167        assert_eq!(from_vec.chunk(), &[1u8, 2]);
4168        let from_bytesmut = IoBufsMut::from(BytesMut::from(&b"cd"[..]));
4169        assert_eq!(from_bytesmut.chunk(), b"cd");
4170
4171        // Chunked write path: set_len + copy_from_slice + freeze round-trip.
4172        let mut chunked = IoBufsMut::from(vec![
4173            IoBufMut::with_capacity(1),
4174            IoBufMut::with_capacity(1),
4175            IoBufMut::with_capacity(1),
4176            IoBufMut::with_capacity(1),
4177        ]);
4178        // SAFETY: We only write/read initialized bytes after `copy_from_slice`.
4179        unsafe { chunked.set_len(4) };
4180        chunked.copy_from_slice(b"wxyz");
4181        assert_eq!(chunked.capacity(), 4);
4182        assert_eq!(chunked.remaining(), 4);
4183        let frozen = chunked.freeze();
4184        assert_eq!(frozen.coalesce(), b"wxyz");
4185    }
4186
4187    #[test]
4188    fn test_iobufsmut_coalesce_multi_shape_paths() {
4189        let pool = test_pool();
4190
4191        // Pair: plain coalesce and pool-backed coalesce-with-extra.
4192        let pair = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
4193        assert_eq!(pair.coalesce(), b"abcd");
4194        let pair = IoBufsMut::from(vec![IoBufMut::from(b"ab"), IoBufMut::from(b"cd")]);
4195        let pair_extra = pair.coalesce_with_pool_extra(&pool, 3);
4196        assert_eq!(pair_extra, b"abcd");
4197        assert!(pair_extra.capacity() >= 7);
4198
4199        // Triple: both coalesce paths should preserve payload and requested spare capacity.
4200        let triple = IoBufsMut::from(vec![
4201            IoBufMut::from(b"a"),
4202            IoBufMut::from(b"b"),
4203            IoBufMut::from(b"c"),
4204        ]);
4205        assert_eq!(triple.coalesce(), b"abc");
4206        let triple = IoBufsMut::from(vec![
4207            IoBufMut::from(b"a"),
4208            IoBufMut::from(b"b"),
4209            IoBufMut::from(b"c"),
4210        ]);
4211        let triple_extra = triple.coalesce_with_pool_extra(&pool, 2);
4212        assert_eq!(triple_extra, b"abc");
4213        assert!(triple_extra.capacity() >= 5);
4214
4215        // Chunked (4+): same expectations as pair/triple for content + capacity.
4216        let chunked = IoBufsMut::from(vec![
4217            IoBufMut::from(b"1"),
4218            IoBufMut::from(b"2"),
4219            IoBufMut::from(b"3"),
4220            IoBufMut::from(b"4"),
4221        ]);
4222        assert_eq!(chunked.coalesce(), b"1234");
4223        let chunked = IoBufsMut::from(vec![
4224            IoBufMut::from(b"1"),
4225            IoBufMut::from(b"2"),
4226            IoBufMut::from(b"3"),
4227            IoBufMut::from(b"4"),
4228        ]);
4229        let chunked_extra = chunked.coalesce_with_pool_extra(&pool, 5);
4230        assert_eq!(chunked_extra, b"1234");
4231        assert!(chunked_extra.capacity() >= 9);
4232    }
4233
4234    #[test]
4235    fn test_iobufsmut_noncanonical_chunk_and_chunk_mut_paths() {
4236        fn no_spare_capacity_buf(pool: &BufferPool) -> IoBufMut {
4237            let mut buf = pool.alloc(1);
4238            let cap = buf.capacity();
4239            // SAFETY: We never read from this buffer in this helper.
4240            unsafe { buf.set_len(cap) };
4241            buf
4242        }
4243        let pool = test_pool();
4244
4245        // `chunk()` should skip empty front buffers across all shapes.
4246        let pair_second = IoBufsMut {
4247            inner: IoBufsMutInner::Pair([IoBufMut::default(), IoBufMut::from(b"b")]),
4248        };
4249        assert_eq!(pair_second.chunk(), b"b");
4250        let pair_empty = IoBufsMut {
4251            inner: IoBufsMutInner::Pair([IoBufMut::default(), IoBufMut::default()]),
4252        };
4253        assert_eq!(pair_empty.chunk(), b"");
4254
4255        let triple_third = IoBufsMut {
4256            inner: IoBufsMutInner::Triple([
4257                IoBufMut::default(),
4258                IoBufMut::default(),
4259                IoBufMut::from(b"c"),
4260            ]),
4261        };
4262        assert_eq!(triple_third.chunk(), b"c");
4263        let triple_second = IoBufsMut {
4264            inner: IoBufsMutInner::Triple([
4265                IoBufMut::default(),
4266                IoBufMut::from(b"b"),
4267                IoBufMut::default(),
4268            ]),
4269        };
4270        assert_eq!(triple_second.chunk(), b"b");
4271        let triple_empty = IoBufsMut {
4272            inner: IoBufsMutInner::Triple([
4273                IoBufMut::default(),
4274                IoBufMut::default(),
4275                IoBufMut::default(),
4276            ]),
4277        };
4278        assert_eq!(triple_empty.chunk(), b"");
4279
4280        let chunked_second = IoBufsMut {
4281            inner: IoBufsMutInner::Chunked(VecDeque::from([
4282                IoBufMut::default(),
4283                IoBufMut::from(b"d"),
4284            ])),
4285        };
4286        assert_eq!(chunked_second.chunk(), b"d");
4287        let chunked_empty = IoBufsMut {
4288            inner: IoBufsMutInner::Chunked(VecDeque::from([IoBufMut::default()])),
4289        };
4290        assert_eq!(chunked_empty.chunk(), b"");
4291
4292        // `chunk_mut()` should skip non-writable fronts and return first writable chunk.
4293        let mut pair_chunk_mut = IoBufsMut {
4294            inner: IoBufsMutInner::Pair([no_spare_capacity_buf(&pool), IoBufMut::with_capacity(2)]),
4295        };
4296        assert!(pair_chunk_mut.chunk_mut().len() >= 2);
4297
4298        let mut pair_chunk_mut_empty = IoBufsMut {
4299            inner: IoBufsMutInner::Pair([
4300                no_spare_capacity_buf(&pool),
4301                no_spare_capacity_buf(&pool),
4302            ]),
4303        };
4304        assert_eq!(pair_chunk_mut_empty.chunk_mut().len(), 0);
4305
4306        let mut triple_chunk_mut = IoBufsMut {
4307            inner: IoBufsMutInner::Triple([
4308                no_spare_capacity_buf(&pool),
4309                no_spare_capacity_buf(&pool),
4310                IoBufMut::with_capacity(3),
4311            ]),
4312        };
4313        assert!(triple_chunk_mut.chunk_mut().len() >= 3);
4314        let mut triple_chunk_mut_second = IoBufsMut {
4315            inner: IoBufsMutInner::Triple([
4316                no_spare_capacity_buf(&pool),
4317                IoBufMut::with_capacity(2),
4318                no_spare_capacity_buf(&pool),
4319            ]),
4320        };
4321        assert!(triple_chunk_mut_second.chunk_mut().len() >= 2);
4322
4323        let mut triple_chunk_mut_empty = IoBufsMut {
4324            inner: IoBufsMutInner::Triple([
4325                no_spare_capacity_buf(&pool),
4326                no_spare_capacity_buf(&pool),
4327                no_spare_capacity_buf(&pool),
4328            ]),
4329        };
4330        assert_eq!(triple_chunk_mut_empty.chunk_mut().len(), 0);
4331
4332        let mut chunked_chunk_mut = IoBufsMut {
4333            inner: IoBufsMutInner::Chunked(VecDeque::from([
4334                IoBufMut::default(),
4335                IoBufMut::with_capacity(4),
4336            ])),
4337        };
4338        assert!(chunked_chunk_mut.chunk_mut().len() >= 4);
4339
4340        let mut chunked_chunk_mut_empty = IoBufsMut {
4341            inner: IoBufsMutInner::Chunked(VecDeque::from([no_spare_capacity_buf(&pool)])),
4342        };
4343        assert_eq!(chunked_chunk_mut_empty.chunk_mut().len(), 0);
4344    }
4345
4346    #[test]
4347    fn test_iobuf_internal_chunk_helpers() {
4348        // `copy_to_bytes_chunked` should drop leading empties on zero-length reads.
4349        let mut empty_with_leading = VecDeque::from([IoBuf::default()]);
4350        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut empty_with_leading, 0, "x");
4351        assert!(bytes.is_empty());
4352        assert!(!needs_canonicalize);
4353        assert!(empty_with_leading.is_empty());
4354
4355        // Fast path: front chunk can fully satisfy the request.
4356        let mut fast = VecDeque::from([
4357            IoBuf::from(b"ab".to_vec()),
4358            IoBuf::from(b"cd".to_vec()),
4359            IoBuf::from(b"ef".to_vec()),
4360            IoBuf::from(b"gh".to_vec()),
4361        ]);
4362        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut fast, 2, "x");
4363        assert_eq!(bytes.as_ref(), b"ab");
4364        assert!(needs_canonicalize);
4365        assert_eq!(fast.front().expect("front exists").as_ref(), b"cd");
4366
4367        // Slow path: request spans multiple chunks.
4368        let mut slow = VecDeque::from([
4369            IoBuf::from(b"a".to_vec()),
4370            IoBuf::from(b"bc".to_vec()),
4371            IoBuf::from(b"d".to_vec()),
4372            IoBuf::from(b"e".to_vec()),
4373        ]);
4374        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut slow, 3, "x");
4375        assert_eq!(bytes.as_ref(), b"abc");
4376        assert!(needs_canonicalize);
4377
4378        let mut empty_with_leading_mut = VecDeque::from([IoBufMut::default()]);
4379        let (bytes, needs_canonicalize) =
4380            copy_to_bytes_chunked(&mut empty_with_leading_mut, 0, "x");
4381        assert!(bytes.is_empty());
4382        assert!(!needs_canonicalize);
4383        assert!(empty_with_leading_mut.is_empty());
4384
4385        // Mirror the fast/slow chunked helper paths for mutable chunks too.
4386        let mut fast_mut = VecDeque::from([
4387            IoBufMut::from(b"ab"),
4388            IoBufMut::from(b"cd"),
4389            IoBufMut::from(b"ef"),
4390            IoBufMut::from(b"gh"),
4391        ]);
4392        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut fast_mut, 2, "x");
4393        assert_eq!(bytes.as_ref(), b"ab");
4394        assert!(needs_canonicalize);
4395        assert_eq!(fast_mut.front().expect("front exists").as_ref(), b"cd");
4396
4397        let mut slow_mut = VecDeque::from([
4398            IoBufMut::from(b"a"),
4399            IoBufMut::from(b"bc"),
4400            IoBufMut::from(b"de"),
4401            IoBufMut::from(b"f"),
4402        ]);
4403        let (bytes, needs_canonicalize) = copy_to_bytes_chunked(&mut slow_mut, 4, "x");
4404        assert_eq!(bytes.as_ref(), b"abcd");
4405        assert!(needs_canonicalize);
4406        assert_eq!(slow_mut.front().expect("front exists").as_ref(), b"e");
4407
4408        // `advance_chunked_front` should skip empties and drain in linear order.
4409        let mut advance_chunked = VecDeque::from([
4410            IoBuf::default(),
4411            IoBuf::from(b"abc".to_vec()),
4412            IoBuf::from(b"d".to_vec()),
4413        ]);
4414        advance_chunked_front(&mut advance_chunked, 2);
4415        assert_eq!(
4416            advance_chunked.front().expect("front exists").as_ref(),
4417            b"c"
4418        );
4419        advance_chunked_front(&mut advance_chunked, 2);
4420        assert!(advance_chunked.is_empty());
4421
4422        // The front-advance helper also has a separate mutable monomorphization.
4423        let mut advance_chunked_mut = VecDeque::from([
4424            IoBufMut::default(),
4425            IoBufMut::from(b"abc"),
4426            IoBufMut::from(b"d"),
4427        ]);
4428        advance_chunked_front(&mut advance_chunked_mut, 2);
4429        assert_eq!(
4430            advance_chunked_mut.front().expect("front exists").as_ref(),
4431            b"c"
4432        );
4433        advance_chunked_front(&mut advance_chunked_mut, 2);
4434        assert!(advance_chunked_mut.is_empty());
4435
4436        // `advance_small_chunks` signals canonicalization when front chunks are exhausted.
4437        let mut small = [IoBuf::default(), IoBuf::from(b"abc".to_vec())];
4438        let needs_canonicalize = advance_small_chunks(&mut small, 2);
4439        assert!(needs_canonicalize);
4440        assert_eq!(small[1].as_ref(), b"c");
4441
4442        let mut small_exact = [
4443            IoBuf::from(b"a".to_vec()),
4444            IoBuf::from(b"b".to_vec()),
4445            IoBuf::from(b"c".to_vec()),
4446        ];
4447        let needs_canonicalize = advance_small_chunks(&mut small_exact, 3);
4448        assert!(needs_canonicalize);
4449        assert_eq!(small_exact[0].remaining(), 0);
4450        assert_eq!(small_exact[1].remaining(), 0);
4451        assert_eq!(small_exact[2].remaining(), 0);
4452
4453        // Small-chunk copy canonicalization is also instantiated for mutable chunks.
4454        let mut small_mut = [
4455            IoBufMut::from(b"a"),
4456            IoBufMut::from(b"bc"),
4457            IoBufMut::from(b"d"),
4458        ];
4459        let (bytes, needs_canonicalize) = copy_to_bytes_small_chunks(&mut small_mut, 3, "x");
4460        assert_eq!(bytes.as_ref(), b"abc");
4461        assert!(needs_canonicalize);
4462        assert_eq!(small_mut[2].as_ref(), b"d");
4463
4464        // `advance_mut_in_chunks` returns whether the request fully fit in writable chunks.
4465        let mut writable = [IoBufMut::with_capacity(2), IoBufMut::with_capacity(1)];
4466        let mut remaining = 3usize;
4467        // SAFETY: We do not read from advanced bytes in this test.
4468        let all_advanced = unsafe { advance_mut_in_chunks(&mut writable, &mut remaining) };
4469        assert!(all_advanced);
4470        assert_eq!(remaining, 0);
4471
4472        // `advance_mut_in_chunks` should skip non-writable chunks.
4473        let pool = test_pool();
4474        let mut full = pool.alloc(1);
4475        // SAFETY: We only mark initialized capacity; bytes are not read.
4476        unsafe { full.set_len(full.capacity()) };
4477        let mut writable_after_full = [full, IoBufMut::with_capacity(2)];
4478        let mut remaining = 2usize;
4479        // SAFETY: We do not read from advanced bytes in this test.
4480        let all_advanced =
4481            unsafe { advance_mut_in_chunks(&mut writable_after_full, &mut remaining) };
4482        assert!(all_advanced);
4483        assert_eq!(remaining, 0);
4484
4485        let mut writable_short = [IoBufMut::with_capacity(1), IoBufMut::with_capacity(1)];
4486        let mut remaining = 3usize;
4487        // SAFETY: We do not read from advanced bytes in this test.
4488        let all_advanced = unsafe { advance_mut_in_chunks(&mut writable_short, &mut remaining) };
4489        assert!(!all_advanced);
4490        assert_eq!(remaining, 1);
4491    }
4492
4493    #[test]
4494    fn test_iobufsmut_advance_mut_success_paths() {
4495        // Pair path.
4496        let mut pair = IoBufsMut {
4497            inner: IoBufsMutInner::Pair([IoBufMut::with_capacity(2), IoBufMut::with_capacity(2)]),
4498        };
4499        // SAFETY: We only verify cursor movement (`remaining`) and do not read bytes.
4500        unsafe { pair.advance_mut(3) };
4501        assert_eq!(pair.remaining(), 3);
4502
4503        // Triple path.
4504        let mut triple = IoBufsMut {
4505            inner: IoBufsMutInner::Triple([
4506                IoBufMut::with_capacity(1),
4507                IoBufMut::with_capacity(1),
4508                IoBufMut::with_capacity(1),
4509            ]),
4510        };
4511        // SAFETY: We only verify cursor movement (`remaining`) and do not read bytes.
4512        unsafe { triple.advance_mut(2) };
4513        assert_eq!(triple.remaining(), 2);
4514
4515        // Chunked wrapped-VecDeque path.
4516        let mut wrapped = VecDeque::with_capacity(5);
4517        wrapped.push_back(IoBufMut::with_capacity(1));
4518        wrapped.push_back(IoBufMut::with_capacity(1));
4519        wrapped.push_back(IoBufMut::with_capacity(1));
4520        wrapped.push_back(IoBufMut::with_capacity(1));
4521        wrapped.push_back(IoBufMut::with_capacity(1));
4522        let _ = wrapped.pop_front();
4523        wrapped.push_back(IoBufMut::with_capacity(1));
4524        let (first, second) = wrapped.as_slices();
4525        assert!(!first.is_empty());
4526        assert!(!second.is_empty());
4527
4528        // Force `advance_mut` to consume across the wrapped second slice as well.
4529        let to_advance = first.len() + 1;
4530        let mut chunked = IoBufsMut {
4531            inner: IoBufsMutInner::Chunked(wrapped),
4532        };
4533        // SAFETY: We only verify cursor movement (`remaining`) and do not read bytes.
4534        unsafe { chunked.advance_mut(to_advance) };
4535        assert_eq!(chunked.remaining(), to_advance);
4536        assert!(chunked.remaining_mut() > 0);
4537    }
4538
4539    #[test]
4540    fn test_iobufsmut_advance_mut_zero_noop_when_full() {
4541        fn full_chunk(pool: &BufferPool) -> IoBufMut {
4542            // Pooled buffers have bounded class capacity (unlike growable Bytes),
4543            // so force len == capacity to make remaining_mut() == 0.
4544            let mut buf = pool.alloc(1);
4545            let cap = buf.capacity();
4546            // SAFETY: We never read from this buffer in this test.
4547            unsafe { buf.set_len(cap) };
4548            buf
4549        }
4550
4551        let pool = test_pool();
4552
4553        // Pair path: fully-written chunks should allow advance_mut(0) as a no-op.
4554        let mut pair = IoBufsMut::from(vec![full_chunk(&pool), full_chunk(&pool)]);
4555        assert!(matches!(pair.inner, IoBufsMutInner::Pair(_)));
4556        assert_eq!(pair.remaining_mut(), 0);
4557        let before = pair.remaining();
4558        // SAFETY: Advancing by 0 does not expose uninitialized bytes.
4559        unsafe { pair.advance_mut(0) };
4560        assert_eq!(pair.remaining(), before);
4561
4562        // Triple path: same no-op behavior.
4563        let mut triple = IoBufsMut::from(vec![
4564            full_chunk(&pool),
4565            full_chunk(&pool),
4566            full_chunk(&pool),
4567        ]);
4568        assert!(matches!(triple.inner, IoBufsMutInner::Triple(_)));
4569        assert_eq!(triple.remaining_mut(), 0);
4570        let before = triple.remaining();
4571        // SAFETY: Advancing by 0 does not expose uninitialized bytes.
4572        unsafe { triple.advance_mut(0) };
4573        assert_eq!(triple.remaining(), before);
4574
4575        // Chunked path: 4+ fully-written chunks should also no-op.
4576        let mut chunked = IoBufsMut::from(vec![
4577            full_chunk(&pool),
4578            full_chunk(&pool),
4579            full_chunk(&pool),
4580            full_chunk(&pool),
4581        ]);
4582        assert!(matches!(chunked.inner, IoBufsMutInner::Chunked(_)));
4583        assert_eq!(chunked.remaining_mut(), 0);
4584        let before = chunked.remaining();
4585        // SAFETY: Advancing by 0 does not expose uninitialized bytes.
4586        unsafe { chunked.advance_mut(0) };
4587        assert_eq!(chunked.remaining(), before);
4588    }
4589
4590    #[test]
4591    #[should_panic(expected = "cannot advance past end of buffer")]
4592    fn test_iobufsmut_advance_mut_past_end_pair() {
4593        let mut pair = IoBufsMut {
4594            inner: IoBufsMutInner::Pair([IoBufMut::with_capacity(1), IoBufMut::with_capacity(1)]),
4595        };
4596        // SAFETY: Intentional panic path coverage.
4597        unsafe { pair.advance_mut(3) };
4598    }
4599
4600    #[test]
4601    #[should_panic(expected = "cannot advance past end of buffer")]
4602    fn test_iobufsmut_advance_mut_past_end_triple() {
4603        let mut triple = IoBufsMut {
4604            inner: IoBufsMutInner::Triple([
4605                IoBufMut::with_capacity(1),
4606                IoBufMut::with_capacity(1),
4607                IoBufMut::with_capacity(1),
4608            ]),
4609        };
4610        // SAFETY: Intentional panic path coverage.
4611        unsafe { triple.advance_mut(4) };
4612    }
4613
4614    #[test]
4615    #[should_panic(expected = "cannot advance past end of buffer")]
4616    fn test_iobufsmut_advance_mut_past_end_chunked() {
4617        let mut chunked = IoBufsMut {
4618            inner: IoBufsMutInner::Chunked(VecDeque::from([
4619                IoBufMut::with_capacity(1),
4620                IoBufMut::with_capacity(1),
4621                IoBufMut::with_capacity(1),
4622                IoBufMut::with_capacity(1),
4623            ])),
4624        };
4625        // SAFETY: Intentional panic path coverage.
4626        unsafe { chunked.advance_mut(5) };
4627    }
4628
4629    #[test]
4630    fn test_iobufsmut_set_len() {
4631        // SAFETY: we don't read the uninitialized bytes.
4632        unsafe {
4633            // Single buffer
4634            let mut bufs = IoBufsMut::from(IoBufMut::with_capacity(16));
4635            bufs.set_len(10);
4636            assert_eq!(bufs.len(), 10);
4637
4638            // Chunked: distributes across chunks [cap 5, cap 10], set 12 -> [5, 7]
4639            let mut bufs = IoBufsMut::from(vec![
4640                IoBufMut::with_capacity(5),
4641                IoBufMut::with_capacity(10),
4642            ]);
4643            bufs.set_len(12);
4644            assert_eq!(bufs.len(), 12);
4645            assert_eq!(bufs.chunk().len(), 5);
4646            bufs.advance(5);
4647            assert_eq!(bufs.chunk().len(), 7);
4648            bufs.advance(7);
4649            assert_eq!(bufs.remaining(), 0);
4650
4651            // Uneven capacities [3, 20, 2], set 18 -> [3, 15, 0].
4652            let mut bufs = IoBufsMut::from(vec![
4653                IoBufMut::with_capacity(3),
4654                IoBufMut::with_capacity(20),
4655                IoBufMut::with_capacity(2),
4656            ]);
4657            bufs.set_len(18);
4658            assert_eq!(bufs.chunk().len(), 3);
4659            bufs.advance(3);
4660            assert_eq!(bufs.chunk().len(), 15);
4661            bufs.advance(15);
4662            assert_eq!(bufs.remaining(), 0);
4663
4664            // Exact total capacity [4, 4], set 8 -> [4, 4]
4665            let mut bufs =
4666                IoBufsMut::from(vec![IoBufMut::with_capacity(4), IoBufMut::with_capacity(4)]);
4667            bufs.set_len(8);
4668            assert_eq!(bufs.chunk().len(), 4);
4669            bufs.advance(4);
4670            assert_eq!(bufs.chunk().len(), 4);
4671            bufs.advance(4);
4672            assert_eq!(bufs.remaining(), 0);
4673
4674            // Zero length preserves caller-provided layout.
4675            let mut bufs =
4676                IoBufsMut::from(vec![IoBufMut::with_capacity(4), IoBufMut::with_capacity(4)]);
4677            bufs.set_len(0);
4678            assert_eq!(bufs.len(), 0);
4679            assert_eq!(bufs.chunk(), b"");
4680        }
4681    }
4682
4683    #[test]
4684    #[should_panic(expected = "set_len(9) exceeds capacity(8)")]
4685    fn test_iobufsmut_set_len_overflow() {
4686        let mut bufs =
4687            IoBufsMut::from(vec![IoBufMut::with_capacity(4), IoBufMut::with_capacity(4)]);
4688        // SAFETY: this will panic before any read.
4689        unsafe { bufs.set_len(9) };
4690    }
4691
4692    #[test]
4693    #[should_panic(expected = "set_len(9) exceeds capacity(8)")]
4694    fn test_iobufmut_set_len_overflow() {
4695        let mut buf = IoBufMut::with_capacity(8);
4696        // SAFETY: this will panic before any read.
4697        unsafe { buf.set_len(9) };
4698    }
4699
4700    #[test]
4701    fn test_encode_with_pool_matches_encode() {
4702        let value = vec![1u8, 2, 3, 4, 5, 6];
4703        assert_encode_with_pool_matches_encode(&value);
4704    }
4705
4706    #[test]
4707    fn test_encode_with_pool_mut_len_matches_encode_size() {
4708        let pool = test_pool();
4709        let value = vec![9u8, 8, 7, 6];
4710
4711        let buf = value.encode_with_pool_mut(&pool);
4712        assert_eq!(buf.len(), value.encode_size());
4713    }
4714
4715    #[test]
4716    fn test_iobuf_encode_with_pool_matches_encode() {
4717        let value = IoBuf::from(vec![0xAB; 512]);
4718        assert_encode_with_pool_matches_encode(&value);
4719    }
4720
4721    #[test]
4722    fn test_nested_container_encode_with_pool_matches_encode() {
4723        let value = (
4724            Some(Bytes::from(vec![0xAA; 256])),
4725            vec![Bytes::from(vec![0xBB; 128]), Bytes::from(vec![0xCC; 64])],
4726        );
4727        assert_encode_with_pool_matches_encode(&value);
4728    }
4729
4730    #[test]
4731    fn test_map_encode_with_pool_matches_encode() {
4732        let mut btree = BTreeMap::new();
4733        btree.insert(2u8, Bytes::from(vec![0xDD; 96]));
4734        btree.insert(1u8, Bytes::from(vec![0xEE; 48]));
4735        assert_encode_with_pool_matches_encode(&btree);
4736
4737        let mut hash = HashMap::new();
4738        hash.insert(2u8, Bytes::from(vec![0x11; 96]));
4739        hash.insert(1u8, Bytes::from(vec![0x22; 48]));
4740        assert_encode_with_pool_matches_encode(&hash);
4741    }
4742
4743    #[test]
4744    fn test_lazy_encode_with_pool_matches_encode() {
4745        let value = Lazy::new(Bytes::from(vec![0x44; 200]));
4746        assert_encode_with_pool_matches_encode(&value);
4747    }
4748
4749    #[test]
4750    fn test_range_encode_with_pool_matches_encode() {
4751        let range: Range<Bytes> = Bytes::from(vec![0x10; 32])..Bytes::from(vec![0x20; 48]);
4752        assert_encode_with_pool_matches_encode(&range);
4753
4754        let inclusive: RangeInclusive<Bytes> =
4755            Bytes::from(vec![0x30; 16])..=Bytes::from(vec![0x40; 24]);
4756        assert_encode_with_pool_matches_encode(&inclusive);
4757
4758        let from: RangeFrom<IoBuf> = IoBuf::from(vec![0x50; 40])..;
4759        assert_encode_with_pool_matches_encode(&from);
4760
4761        let to_inclusive: RangeToInclusive<IoBuf> = ..=IoBuf::from(vec![0x60; 56]);
4762        assert_encode_with_pool_matches_encode(&to_inclusive);
4763    }
4764
4765    #[cfg(feature = "arbitrary")]
4766    mod conformance {
4767        use super::IoBuf;
4768        use commonware_codec::conformance::CodecConformance;
4769
4770        commonware_conformance::conformance_tests! {
4771            CodecConformance<IoBuf>
4772        }
4773    }
4774
4775    mod builder_tests {
4776        use super::*;
4777        use commonware_codec::{BufsMut, Encode, Write};
4778
4779        fn builder(capacity: usize) -> Builder {
4780            Builder::new(&test_pool(), NonZeroUsize::new(capacity).unwrap())
4781        }
4782
4783        // Only inline writes, no pushes.
4784        #[test]
4785        fn test_inline_only() {
4786            let mut b = builder(64);
4787            b.put_u32(42);
4788            b.put_u8(7);
4789            let mut r = b.finish();
4790            assert_eq!(r.remaining(), 5);
4791            assert_eq!(r.get_u32(), 42);
4792            assert_eq!(r.get_u8(), 7);
4793        }
4794
4795        // Only zero-copy pushes, no inline writes.
4796        #[test]
4797        fn test_push_only() {
4798            let mut b = builder(64);
4799            let data = Bytes::from(vec![0xAA; 1024]);
4800            b.push(data.clone());
4801            let mut r = b.finish();
4802            assert_eq!(r.remaining(), 1024);
4803            assert_eq!(r.copy_to_bytes(1024), data);
4804        }
4805
4806        // Interleaved: inline header, zero-copy push, inline trailer.
4807        #[test]
4808        fn test_inline_push_inline() {
4809            let mut b = builder(64);
4810            b.put_u16(99);
4811            let payload = Bytes::from(vec![0xBB; 512]);
4812            b.push(payload.clone());
4813            b.put_u8(1);
4814            let mut r = b.finish();
4815            assert_eq!(r.remaining(), 2 + 512 + 1);
4816            assert_eq!(r.get_u16(), 99);
4817            assert_eq!(r.copy_to_bytes(512), payload);
4818            assert_eq!(r.get_u8(), 1);
4819        }
4820
4821        // Bytes::write_bufs produces identical wire format to Bytes::write.
4822        #[test]
4823        fn test_write_bufs_matches_write() {
4824            let data = Bytes::from(vec![0xCC; 256]);
4825            let mut b = builder(64);
4826            data.write_bufs(&mut b);
4827            let mut bufs = b.finish();
4828
4829            let mut out = vec![0u8; bufs.remaining()];
4830            bufs.copy_to_slice(&mut out);
4831            assert_eq!(out, data.encode().as_ref());
4832        }
4833
4834        // Finishing an unused builder produces empty IoBufs.
4835        #[test]
4836        fn test_empty() {
4837            let bufs = builder(64).finish();
4838            assert_eq!(bufs.remaining(), 0);
4839        }
4840
4841        // Inline writes exceeding capacity panic.
4842        #[test]
4843        #[should_panic]
4844        fn test_inline_overflow_panics() {
4845            let mut b = builder(1);
4846            let cap = b.remaining_mut();
4847            b.put_slice(&vec![0xFF; cap]);
4848            b.put_u8(1); // exceeds capacity
4849        }
4850
4851        // Pushing empty Bytes is a no-op.
4852        #[test]
4853        fn test_empty_push_ignored() {
4854            let mut b = builder(64);
4855            b.push(Bytes::new());
4856            b.put_u8(1);
4857            let bufs = b.finish();
4858            assert_eq!(bufs.remaining(), 1);
4859        }
4860
4861        // Consecutive pushes without inline writes between them.
4862        #[test]
4863        fn test_multiple_pushes() {
4864            let mut b = builder(64);
4865            let a = Bytes::from(vec![0xAA; 100]);
4866            let c = Bytes::from(vec![0xCC; 200]);
4867            b.push(a.clone());
4868            b.push(c.clone());
4869            let mut r = b.finish();
4870            assert_eq!(r.remaining(), 300);
4871            assert_eq!(r.copy_to_bytes(100), a);
4872            assert_eq!(r.copy_to_bytes(200), c);
4873        }
4874
4875        // put() exceeding capacity panics.
4876        #[test]
4877        #[should_panic]
4878        fn test_put_exceeding_capacity_panics() {
4879            let mut b = builder(1);
4880            let cap = b.remaining_mut();
4881            let src = Bytes::from(vec![0xAB; cap + 1]);
4882            b.put(src);
4883        }
4884
4885        // put_slice() exceeding capacity panics.
4886        #[test]
4887        #[should_panic]
4888        fn test_put_slice_exceeding_capacity_panics() {
4889            let mut b = builder(1);
4890            let cap = b.remaining_mut();
4891            b.put_slice(&vec![0xFE; cap + 1]);
4892        }
4893
4894        // Simulates a multi-field struct: [u16 | Bytes (via push) | u32].
4895        // Verifies write_bufs produces identical wire format to write.
4896        #[test]
4897        fn test_multi_field_struct_equivalence() {
4898            let header: u16 = 0xCAFE;
4899            let payload = Bytes::from(vec![0xDD; 1024]);
4900            let trailer: u32 = 0xDEADBEEF;
4901
4902            // Flat encoding via write.
4903            let size = header.encode_size() + payload.encode_size() + trailer.encode_size();
4904            let mut flat = BytesMut::with_capacity(size);
4905            header.write(&mut flat);
4906            payload.write(&mut flat);
4907            trailer.write(&mut flat);
4908
4909            // Multi-buffer encoding via write_bufs.
4910            let mut b = builder(64);
4911            header.write(&mut b);
4912            payload.write_bufs(&mut b);
4913            trailer.write(&mut b);
4914            let mut bufs = b.finish();
4915
4916            let mut out = vec![0u8; bufs.remaining()];
4917            bufs.copy_to_slice(&mut out);
4918            assert_eq!(out, flat.as_ref());
4919        }
4920
4921        // encode_with_pool (Builder path) matches encode (flat BytesMut path).
4922        #[test]
4923        fn test_encode_with_pool_matches_encode() {
4924            let pool = test_pool();
4925            let data = Bytes::from(vec![0xEE; 500]);
4926            let mut pooled = data.encode_with_pool(&pool);
4927            let baseline = data.encode();
4928            let mut out = vec![0u8; pooled.remaining()];
4929            pooled.copy_to_slice(&mut out);
4930            assert_eq!(out, baseline.as_ref());
4931        }
4932
4933        // Exercise remaining_mut, chunk_mut, and advance_mut directly.
4934        #[test]
4935        fn test_chunk_mut_and_advance_mut() {
4936            let mut b = builder(64);
4937            let initial = b.remaining_mut();
4938            assert!(initial >= 64);
4939            let chunk = b.chunk_mut();
4940            chunk[0..1].copy_from_slice(&[0xAB]);
4941            // SAFETY: We just wrote 1 byte into chunk_mut above.
4942            unsafe { b.advance_mut(1) };
4943            assert_eq!(b.remaining_mut(), initial - 1);
4944            let mut r = b.finish();
4945            assert_eq!(r.remaining(), 1);
4946            assert_eq!(r.get_u8(), 0xAB);
4947        }
4948
4949        // Writing past a full buffer panics (fixed capacity).
4950        #[test]
4951        #[should_panic]
4952        fn test_write_past_full_panics() {
4953            let mut b = builder(1);
4954            let cap = b.remaining_mut();
4955            b.put_slice(&vec![0xFF; cap]); // fill the buffer completely
4956            assert_eq!(b.remaining_mut(), 0);
4957            b.put_u8(0x42); // panics
4958        }
4959
4960        // Push at offset 0 with inline trailer exercises finish branch
4961        // where offset == pos (no inline prefix before push).
4962        #[test]
4963        fn test_push_at_start_with_trailer() {
4964            let mut b = builder(64);
4965            let payload = Bytes::from(vec![0xCC; 32]);
4966            b.push(payload.clone());
4967            b.put_u8(0x01);
4968            let mut r = b.finish();
4969            assert_eq!(r.remaining(), 33);
4970            assert_eq!(r.copy_to_bytes(32), payload);
4971            assert_eq!(r.get_u8(), 0x01);
4972        }
4973    }
4974}