Skip to main content

bytesbuf/
buf.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use std::any::type_name;
5use std::mem::{self, MaybeUninit};
6use std::num::NonZero;
7
8use smallvec::SmallVec;
9
10use crate::mem::{Block, BlockMeta, BlockSize, Memory};
11use crate::{BytesBufWriter, BytesView, MAX_INLINE_SPANS, MemoryGuard, Span, SpanBuilder};
12
13/// Assembles byte sequences, exposing them as [`BytesView`]s.
14///
15/// The buffer owns some memory capacity into which it allows you to write a sequence of bytes that
16/// you can thereafter extract as one or more [`BytesView`]s over immutable data. Mutation of the
17/// buffer contents is append-only - once data has been written into the buffer, it cannot be modified.
18///
19/// Capacity must be reserved in advance (e.g. via [`reserve()`]) before you can write data into the buffer.
20/// The exception to this is when appending an existing [`BytesView`] via [`put_bytes()`] because
21/// appending a [`BytesView`] is a zero-copy operation that reuses the view's existing memory capacity.
22///
23/// # Memory capacity
24///
25/// A single `BytesBuf` can use memory capacity from any [memory provider], including a
26/// mix of different memory providers. All methods that extend the memory capacity require the caller
27/// to provide a reference to the memory provider to use.
28///
29/// To understand how to obtain access to a memory provider, see [Producing Byte Sequences].
30///
31/// When data is extracted from the buffer by consuming it (via [`consume()`] or [`consume_all()`]),
32/// ownership of the used memory capacity is transferred to the returned [`BytesView`]. Any leftover
33/// memory capacity remains in the buffer, ready to receive further writes.
34///
35/// # Conceptual design
36///
37/// The memory capacity owned by a `BytesBuf` can be viewed as two regions:
38///
39/// * Filled memory - data has been written into this memory but this data has not yet been consumed as a
40///   [`BytesView`]. Nevertheless, this data may already be in use because it may have been exposed via
41///   [`peek()`], which does not consume it from the buffer. Memory capacity is removed from this region
42///   when bytes are consumed from the buffer.
43/// * Available memory - no data has been written into this memory. Calling any of the write methods on
44///   `BytesBuf` will write data to the start of this region and transfer the affected capacity to the
45///   filled memory region.
46///
47/// Existing [`BytesView`]s can be appended to the `BytesBuf` via [`put_bytes()`] without
48/// consuming capacity as each appended [`BytesView`] brings its own backing memory capacity.
49#[doc = include_str!("../doc/snippets/sequence_memory_layout.md")]
50///
51/// # Example
52///
53/// ```
54/// # use bytesbuf::mem::{GlobalPool, Memory};
55/// use bytesbuf::BytesBuf;
56///
57/// const HEADER_MAGIC: &[u8] = b"HDR\x00";
58///
59/// # let memory = GlobalPool::new();
60/// let mut buf = memory.reserve(64);
61///
62/// // Build a message from various pieces.
63/// buf.put_slice(HEADER_MAGIC);
64/// buf.put_num_be(1_u16); // Version
65/// buf.put_num_be(42_u32); // Payload length
66/// buf.put_num_be(0xDEAD_BEEF_u64); // Checksum
67///
68/// // Consume the buffered data as an immutable BytesView.
69/// let message = buf.consume_all();
70/// assert_eq!(message.len(), 18);
71/// ```
72///
73/// [memory provider]: crate::mem::Memory
74/// [`reserve()`]: Self::reserve
75/// [`put_bytes()`]: Self::put_bytes
76/// [`consume()`]: Self::consume
77/// [`consume_all()`]: Self::consume_all
78/// [`peek()`]: Self::peek
79/// [Producing Byte Sequences]: crate#producing-byte-sequences
80#[derive(Default)]
pub struct BytesBuf {
    // Spans whose contents are already immutable, either because this buffer froze them or
    // because they arrived in that form (appended views). They sit at the front of the byte
    // sequence being built, are kept in content order and are the first to be consumed.
    //
    // Optimization: we might get slightly better performance by using a stack-preferring queue
    // here instead. No suitable crate was found at time of writing, may need to invent it.
    frozen_spans: SmallVec<[Span; MAX_INLINE_SPANS]>,

    // Span builders are "potential spans" that have not yet been materialized/frozen.
    // Only the logically first builder may be partially filled with data; all the others
    // are pure spare capacity.
    //
    // Exception: a vectored write may write to any number of span builders concurrently but when
    // the vectored write is committed we immediately restore the above situation (with only
    // the first span builder potentially containing data).
    //
    // When the capacity of a span builder is exhausted, we transform any data in it into a span
    // and move it to `frozen_spans`.
    //
    // Partially filled span builders may be split into a span and a builder over the remaining
    // memory. This happens on demand when the buffer needs to emit data from part of
    // a span builder's memory region.
    //
    // Note that we do not require the span builders to be of the same capacity.
    //
    // Storage order is reversed: the logically first span builder (the one that may hold
    // data) is the LAST element of this collection, so it is reached via `last()`/`pop()`.
    //
    // Optimization: we might get slightly better performance by using a stack-preferring queue
    // here instead. No suitable crate was found at time of writing, may need to invent it.
    span_builders_reversed: SmallVec<[SpanBuilder; MAX_INLINE_SPANS]>,

    /// Number of filled bytes in this buffer (frozen plus not-yet-frozen data).
    ///
    /// Cached so that we do not need to walk the spans every time the length is queried.
    len: usize,

    /// Number of bytes held in `frozen_spans`. The total `len` is this plus whatever
    /// may be partially filled in the (logically) first span builder.
    ///
    /// Cached so that we do not need to walk the frozen spans every time this is needed.
    frozen: usize,

    /// Number of bytes of capacity that can still accept data (summed over all span builders).
    /// The total capacity is `len + available`.
    ///
    /// Cached so that we do not need to walk the span builders every time this is needed.
    available: usize,
}
129
130impl BytesBuf {
131    /// Creates an instance without any memory capacity.
132    #[must_use]
133    pub fn new() -> Self {
134        Self::default()
135    }
136
137    /// Creates an instance that owns the provided memory blocks.
138    ///
139    /// This is the API used by memory providers to issue rented memory capacity to callers.
140    /// Unless you are implementing a memory provider, you will not need to call this function.
141    /// Instead, use either [`Memory::reserve()`] or [`BytesBuf::reserve()`].
142    ///
143    /// # Blocks are unordered
144    ///
145    /// There is no guarantee that the `BytesBuf` uses the blocks in the order provided to
146    /// this function. Blocks may be used in any order.
147    ///
148    /// [`Memory::reserve()`]: Memory::reserve
149    /// [`BytesBuf::reserve()`]: Self::reserve
150    #[must_use]
151    pub fn from_blocks<I>(blocks: I) -> Self
152    where
153        I: IntoIterator<Item = Block>,
154    {
155        Self::from_span_builders(blocks.into_iter().map(Block::into_span_builder))
156    }
157
158    pub(crate) fn from_span_builders<I>(span_builders: I) -> Self
159    where
160        I: IntoIterator<Item = SpanBuilder>,
161    {
162        let span_builders: SmallVec<[SpanBuilder; MAX_INLINE_SPANS]> = span_builders.into_iter().collect();
163
164        let available = span_builders.iter().map(SpanBuilder::remaining_capacity).sum();
165
166        Self {
167            frozen_spans: SmallVec::new_const(),
168            // We do not expect the order that we use the span builders to matter,
169            // so we do not reverse them here before storing.
170            span_builders_reversed: span_builders,
171            len: 0,
172            frozen: 0,
173            available,
174        }
175    }
176
177    /// Adds enough memory capacity to accommodate at least `additional_bytes` of content.
178    ///
179    /// After this call, [`remaining_capacity()`] will be at least `additional_bytes`.
180    ///
181    /// The memory provider may provide more capacity than requested - `additional_bytes` is only a lower bound.
182    ///
183    /// # Example
184    ///
185    /// ```
186    /// use bytesbuf::BytesBuf;
187    /// # use bytesbuf::mem::GlobalPool;
188    ///
189    /// # let memory = GlobalPool::new();
190    /// let mut buf = BytesBuf::new();
191    ///
192    /// // Must reserve capacity before writing.
193    /// buf.reserve(16, &memory);
194    /// assert!(buf.remaining_capacity() >= 16);
195    ///
196    /// buf.put_num_be(0x1234_5678_u32);
197    ///
198    /// // Can reserve more capacity at any time.
199    /// buf.reserve(100, &memory);
200    /// assert!(buf.remaining_capacity() >= 100);
201    /// ```
202    ///
203    /// # Panics
204    ///
205    /// Panics if the resulting total buffer capacity would be greater than `usize::MAX`.
206    ///
207    /// [`remaining_capacity()`]: Self::remaining_capacity
208    pub fn reserve<M: Memory + ?Sized>(&mut self, additional_bytes: usize, memory_provider: &M) {
209        let bytes_needed = additional_bytes.saturating_sub(self.remaining_capacity());
210
211        let Some(bytes_needed) = NonZero::new(bytes_needed) else {
212            return;
213        };
214
215        self.extend_capacity_by_at_least(bytes_needed, memory_provider);
216    }
217
218    fn extend_capacity_by_at_least<M: Memory + ?Sized>(&mut self, bytes: NonZero<usize>, memory_provider: &M) {
219        let additional_memory = memory_provider.reserve(bytes.get());
220
221        // For extra paranoia. We expect a memory provider to return an empty buffer.
222        debug_assert!(additional_memory.capacity() >= bytes.get());
223        debug_assert!(additional_memory.is_empty());
224
225        self.available = self
226            .available
227            .checked_add(additional_memory.capacity())
228            .expect("buffer capacity cannot exceed usize::MAX");
229
230        // We put the new ones in front (existing content needs to stay at the end).
231        self.span_builders_reversed.insert_many(0, additional_memory.span_builders_reversed);
232    }
233
234    /// Appends the contents of an existing [`BytesView`] to the end of the buffer.
235    ///
236    /// Memory capacity of the existing [`BytesView`] is reused without copying.
237    ///
238    /// This is a private API to keep the nitty-gritty of span bookkeeping contained in this file
239    /// while the public API lives in another file for ease of maintenance. The equivalent
240    /// public API is `put_bytes()`.
241    ///
242    /// # Panics
243    ///
244    /// Panics if the resulting total buffer capacity would be greater than `usize::MAX`.
245    pub(crate) fn append(&mut self, bytes: BytesView) {
246        if bytes.is_empty() {
247            return;
248        }
249
250        let bytes_len = bytes.len();
251
252        // Only the first span builder may hold unfrozen data (the rest are for spare capacity).
253        let total_unfrozen_bytes = NonZero::new(self.span_builders_reversed.last().map_or(0, SpanBuilder::len));
254
255        if let Some(total_unfrozen_bytes) = total_unfrozen_bytes {
256            // If there is any unfrozen data, we freeze it now to ensure we append after all
257            // existing data already in the sequence builder.
258            self.freeze_from_first(total_unfrozen_bytes);
259
260            // Debug build paranoia: nothing remains in the span builder, right?
261            debug_assert!(self.span_builders_reversed.last().map_or(0, SpanBuilder::len) == 0);
262        }
263
264        // We do this first so if we do panic, we have not performed any incomplete operations.
265        // The freezing above is safe even if we panic here - freezing is an atomic operation.
266        self.len = self.len.checked_add(bytes_len).expect("buffer capacity cannot exceed usize::MAX");
267
268        // Any appended BytesView is frozen by definition, as contents of a BytesView are immutable.
269        // This cannot wrap because we verified `len` is in-bounds and `frozen <= len` is a type invariant.
270        self.frozen = self.frozen.wrapping_add(bytes_len);
271
272        self.frozen_spans.extend(bytes.into_spans_reversed().into_iter().rev());
273    }
274
275    /// Peeks at the contents of the filled bytes region.
276    ///
277    /// The returned [`BytesView`] covers all data in the buffer but does not consume any of the data.
278    ///
279    /// Functionally similar to [`consume_all()`] except all the data remains in the
280    /// buffer and can still be consumed later.
281    ///
282    /// # Example
283    ///
284    /// ```
285    /// # let memory = bytesbuf::mem::GlobalPool::new();
286    /// use bytesbuf::mem::Memory;
287    ///
288    /// let mut buf = memory.reserve(16);
289    /// buf.put_num_be(0x1234_u16);
290    /// buf.put_num_be(0x5678_u16);
291    ///
292    /// // Peek at the data without consuming it.
293    /// let mut peeked = buf.peek();
294    /// assert_eq!(peeked.get_num_be::<u16>(), 0x1234);
295    /// assert_eq!(peeked.get_num_be::<u16>(), 0x5678);
296    ///
297    /// // Despite consuming from peeked, the buffer still contains all data.
298    /// assert_eq!(buf.len(), 4);
299    ///
300    /// let consumed = buf.consume_all();
301    /// assert_eq!(consumed.len(), 4);
302    /// ```
303    ///
304    /// [`consume_all()`]: Self::consume_all
305    #[must_use]
306    pub fn peek(&self) -> BytesView {
307        // Build a list of all spans to include in the result, in reverse order for efficient construction.
308        let mut result_spans_reversed: SmallVec<[Span; MAX_INLINE_SPANS]> = SmallVec::new();
309
310        // Add any filled data from the first (potentially partially filled) span builder.
311        if let Some(first_builder) = self.span_builders_reversed.last() {
312            // We only peek the span builder, as well. This is to avoid freezing it because freezing
313            // has security/performance implications and the motivating idea behind peeking is to
314            // verify the contents are ready for processing before we commit to freezing them.
315            let span = first_builder.peek();
316
317            // It might just be empty - that's also fine.
318            if !span.is_empty() {
319                result_spans_reversed.push(span);
320            }
321        }
322
323        // Add all the frozen spans. They are stored in content order in our storage,
324        // so we reverse them when adding to the result_spans_reversed collection.
325        result_spans_reversed.extend(self.frozen_spans.iter().rev().cloned());
326
327        BytesView::from_spans_reversed(result_spans_reversed)
328    }
329
330    /// How many bytes of data are in the buffer, ready to be consumed.
331    ///
332    /// # Example
333    ///
334    /// ```
335    /// # let memory = bytesbuf::mem::GlobalPool::new();
336    /// use bytesbuf::mem::Memory;
337    ///
338    /// let mut buf = memory.reserve(32);
339    /// assert_eq!(buf.len(), 0);
340    ///
341    /// buf.put_num_be(0x1234_5678_u32);
342    /// assert_eq!(buf.len(), 4);
343    ///
344    /// buf.put_slice(*b"Hello");
345    /// assert_eq!(buf.len(), 9);
346    ///
347    /// _ = buf.consume(4);
348    /// assert_eq!(buf.len(), 5);
349    /// ```
350    #[must_use]
351    #[cfg_attr(debug_assertions, expect(clippy::missing_panics_doc, reason = "only unreachable panics"))]
352    pub fn len(&self) -> usize {
353        #[cfg(debug_assertions)]
354        assert_eq!(self.len, self.calculate_len());
355
356        self.len
357    }
358
359    #[cfg(debug_assertions)]
360    fn calculate_len(&self) -> usize {
361        let frozen_len = self.frozen_spans.iter().map(|x| x.len() as usize).sum::<usize>();
362        let unfrozen_len = self.span_builders_reversed.last().map_or(0, SpanBuilder::len) as usize;
363
364        // Will not overflow - `capacity <= usize::MAX` is a type invariant and obviously `len < capacity`.
365        frozen_len.wrapping_add(unfrozen_len)
366    }
367
368    /// Whether the buffer is empty (contains no data).
369    ///
370    /// This does not imply that the buffer has no remaining memory capacity.
371    #[must_use]
372    pub fn is_empty(&self) -> bool {
373        self.len() == 0
374    }
375
376    /// The total capacity of the buffer.
377    ///
378    /// This is the total length of the filled bytes and the available bytes regions.
379    ///
380    /// # Example
381    ///
382    /// ```
383    /// use bytesbuf::BytesBuf;
384    /// # use bytesbuf::mem::GlobalPool;
385    ///
386    /// # let memory = GlobalPool::new();
387    /// let mut buf = BytesBuf::new();
388    /// assert_eq!(buf.capacity(), 0);
389    ///
390    /// buf.reserve(100, &memory);
391    /// let initial_capacity = buf.capacity();
392    /// assert!(initial_capacity >= 100);
393    ///
394    /// // Writing does not change capacity.
395    /// buf.put_slice(*b"Hello");
396    /// assert_eq!(buf.capacity(), initial_capacity);
397    ///
398    /// // Consuming reduces capacity (memory is transferred to the BytesView).
399    /// _ = buf.consume(5);
400    /// assert!(buf.capacity() < initial_capacity);
401    /// ```
402    #[must_use]
403    pub fn capacity(&self) -> usize {
404        // Will not overflow - `capacity <= usize::MAX` is a type invariant.
405        self.len().wrapping_add(self.remaining_capacity())
406    }
407
408    /// How many more bytes can be written into the buffer
409    /// before its memory capacity is exhausted.
410    ///
411    /// # Example
412    ///
413    /// ```
414    /// use bytesbuf::BytesBuf;
415    /// # use bytesbuf::mem::GlobalPool;
416    ///
417    /// # let memory = GlobalPool::new();
418    /// let mut buf = BytesBuf::new();
419    ///
420    /// buf.reserve(100, &memory);
421    /// let initial_remaining = buf.remaining_capacity();
422    /// assert!(initial_remaining >= 100);
423    ///
424    /// // Writing reduces remaining capacity.
425    /// buf.put_slice(*b"Hello");
426    /// assert_eq!(buf.remaining_capacity(), initial_remaining - 5);
427    ///
428    /// // Reserving more increases remaining capacity.
429    /// buf.reserve(200, &memory);
430    /// assert!(buf.remaining_capacity() >= 200);
431    ///
432    /// // Consuming buffered data does NOT affect remaining capacity.
433    /// let remaining_before_consume = buf.remaining_capacity();
434    /// _ = buf.consume(5);
435    /// assert_eq!(buf.remaining_capacity(), remaining_before_consume);
436    /// ```
437    #[cfg_attr(test, mutants::skip)] // Lying about buffer sizes is an easy way to infinite loops.
438    pub fn remaining_capacity(&self) -> usize {
439        // The remaining capacity is the sum of the remaining capacity of all span builders.
440        debug_assert_eq!(
441            self.available,
442            self.span_builders_reversed
443                .iter()
444                .map(SpanBuilder::remaining_capacity)
445                .sum::<usize>()
446        );
447
448        self.available
449    }
450
451    /// Consumes `len` bytes from the beginning of the buffer.
452    ///
453    /// The consumed bytes and the memory capacity that backs them are removed from the buffer.
454    ///
455    /// # Example
456    ///
457    /// ```
458    /// # let memory = bytesbuf::mem::GlobalPool::new();
459    /// use bytesbuf::mem::Memory;
460    ///
461    /// let mut buf = memory.reserve(32);
462    ///
463    /// buf.put_num_be(0x1111_u16);
464    /// buf.put_num_be(0x2222_u16);
465    ///
466    /// // Consume first part.
467    /// let mut first = buf.consume(2);
468    /// assert_eq!(first.get_num_be::<u16>(), 0x1111);
469    ///
470    /// // Write more data.
471    /// buf.put_num_be(0x3333_u16);
472    ///
473    /// // Consume remaining data.
474    /// let mut rest = buf.consume(4);
475    /// assert_eq!(rest.get_num_be::<u16>(), 0x2222);
476    /// assert_eq!(rest.get_num_be::<u16>(), 0x3333);
477    /// ```
478    ///
479    /// # Panics
480    ///
481    /// Panics if the buffer does not contain at least `len` bytes.
482    pub fn consume(&mut self, len: usize) -> BytesView {
483        self.consume_checked(len)
484            .expect("attempted to consume more bytes than available in buffer")
485    }
486
487    /// Consumes `len` bytes from the beginning of the buffer.
488    ///
489    /// Returns `None` if the buffer does not contain at least `len` bytes.
490    ///
491    /// The consumed bytes and the memory capacity that backs them are removed from the buffer.
492    #[expect(clippy::missing_panics_doc, reason = "only unreachable panics")]
493    #[cfg_attr(test, mutants::skip)] // Mutating the bounds check causes UB via unwrap_unchecked in consume_all or infinite loops in prepare_consume.
494    pub fn consume_checked(&mut self, len: usize) -> Option<BytesView> {
495        if len > self.len() {
496            return None;
497        }
498
499        self.ensure_frozen(len);
500
501        let manifest = self.prepare_consume(len);
502
503        // We build the result spans collection up in storage order.
504        // The first piece of content goes last into the result spans.
505        let mut result_spans_reversed: SmallVec<[Span; MAX_INLINE_SPANS]> = SmallVec::with_capacity(manifest.required_spans_capacity());
506
507        // The content-order last span goes first into the result, so if we have a partial
508        // span, shove it in there first. The fully detached spans get processed together.
509        if manifest.consume_partial_span_bytes != 0 {
510            // We also need some bytes from the first frozen span that now remains
511            // but not the entire frozen span.
512            let partially_consumed_frozen_span = self
513                .frozen_spans
514                .get_mut(manifest.detach_complete_frozen_spans)
515                .expect("guarded by ensure_frozen()");
516
517            let take = partially_consumed_frozen_span.slice(0..manifest.consume_partial_span_bytes);
518            result_spans_reversed.push(take);
519
520            // SAFETY: We must guarantee that we do not try to advance out of bounds. This is guaranteed
521            // by the manifest calculation, the job of which is to determine the right in-bounds value.
522            unsafe { partially_consumed_frozen_span.advance(manifest.consume_partial_span_bytes as usize) };
523        }
524
525        // We extend the result spans with the (storage-order) fully detached spans.
526        // BytesBuf stores the frozen spans in content order, so we must reverse.
527        result_spans_reversed.extend(self.frozen_spans.drain(..manifest.detach_complete_frozen_spans).rev());
528
529        // Will not wrap because we verified bounds above.
530        self.len = self.len.wrapping_sub(len);
531
532        // Will not wrap because all consumed data must first have been frozen,
533        // which we guarantee via ensure_frozen() above.
534        self.frozen = self.frozen.wrapping_sub(len);
535
536        Some(BytesView::from_spans_reversed(result_spans_reversed))
537    }
538
539    fn prepare_consume(&self, mut len: usize) -> ConsumeManifest {
540        debug_assert!(len <= self.frozen);
541
542        let mut detach_complete_frozen_spans: usize = 0;
543
544        for span in &self.frozen_spans {
545            let span_len = span.len();
546
547            if span_len as usize <= len {
548                // Will not wrap because a type invariant is `capacity <= usize::MAX`, so if
549                // capacity is in-bounds, the number of spans could not possibly be greater.
550                detach_complete_frozen_spans = detach_complete_frozen_spans.wrapping_add(1);
551
552                len = len
553                    .checked_sub(span_len as usize)
554                    .expect("somehow ended up with negative bytes remaining - algorithm defect");
555
556                if len != 0 {
557                    // We will consume this whole span and need more - go to next one.
558                    continue;
559                }
560            }
561
562            // This span satisfied our needs, either in full or in part.
563            break;
564        }
565
566        ConsumeManifest {
567            detach_complete_frozen_spans,
568            // If any `len` was left, it was not a full span.
569            consume_partial_span_bytes: len.try_into().expect("we are supposed to have less than one memory block worth of data remaining but its length does not fit into a single memory block - algorithm defect"),
570        }
571    }
572
573    /// Consumes all bytes in the buffer.
574    ///
575    /// The consumed bytes and the memory capacity that backs them are removed from the buffer.
576    ///
577    /// # Example
578    ///
579    /// ```
580    /// # let memory = bytesbuf::mem::GlobalPool::new();
581    /// use bytesbuf::mem::Memory;
582    ///
583    /// let mut buf = memory.reserve(32);
584    /// buf.put_slice(*b"Hello, ");
585    /// buf.put_slice(*b"world!");
586    /// buf.put_num_be(0x2121_u16); // "!!"
587    ///
588    /// let message = buf.consume_all();
589    ///
590    /// assert_eq!(message, b"Hello, world!!!");
591    /// assert!(buf.is_empty());
592    /// ```
593    pub fn consume_all(&mut self) -> BytesView {
594        // SAFETY: Consuming len() bytes from self cannot possibly be out of bounds.
595        unsafe { self.consume_checked(self.len()).unwrap_unchecked() }
596    }
597
598    /// Splits off `count` bytes of remaining capacity from the buffer.
599    ///
600    /// Returns a new `BytesBuf` that owns that capacity and has no contents.
601    ///
602    /// The buffer's filled data (length) is not affected.
603    ///
604    /// # Example
605    ///
606    /// ```
607    /// # let memory = bytesbuf::mem::GlobalPool::new();
608    /// use bytesbuf::mem::Memory;
609    ///
610    /// let mut buf = memory.reserve(64);
611    ///
612    /// buf.put_num_be(0xDEAD_u16);
613    ///
614    /// let remaining_before = buf.remaining_capacity();
615    /// let mut split = buf.split_off_remaining(20);
616    ///
617    /// // The split-off buffer has the requested capacity and no data.
618    /// assert!(split.remaining_capacity() >= 20);
619    /// assert!(split.is_empty());
620    ///
621    /// // The original buffer lost that capacity but kept its data.
622    /// assert_eq!(buf.remaining_capacity(), remaining_before - 20);
623    /// assert_eq!(buf.len(), 2);
624    ///
625    /// // The split-off buffer can be used independently.
626    /// split.put_num_be(0xBEEF_u16);
627    /// assert_eq!(split.len(), 2);
628    /// ```
629    ///
630    /// # Panics
631    ///
632    /// Panics if `count` is greater than the [`remaining_capacity()`] of the buffer.
633    ///
634    /// [`remaining_capacity()`]: Self::remaining_capacity
635    #[must_use]
636    pub fn split_off_remaining(&mut self, count: usize) -> Self {
637        self.split_off_remaining_checked(count)
638            .expect("attempted to split off more remaining capacity than available in buffer")
639    }
640
641    /// Splits off `count` bytes of remaining capacity from the buffer.
642    ///
643    /// Returns a new `BytesBuf` that owns that capacity and has no contents, or `None`
644    /// if `count` is greater than the [`remaining_capacity()`] of the buffer.
645    ///
646    /// The buffer's filled data (length) is not affected.
647    ///
648    /// [`remaining_capacity()`]: Self::remaining_capacity
649    #[must_use]
650    #[expect(clippy::missing_panics_doc, reason = "only unreachable panics")]
651    #[cfg_attr(test, mutants::skip)] // Mutating the bounds check produces invalid SpanBuilders.
652    pub fn split_off_remaining_checked(&mut self, count: usize) -> Option<Self> {
653        if count > self.remaining_capacity() {
654            return None;
655        }
656
657        if count == 0 {
658            return Some(Self::new());
659        }
660
661        let mut result_builders: SmallVec<[SpanBuilder; MAX_INLINE_SPANS]> = SmallVec::new();
662        let mut remaining = count;
663
664        // Walk span_builders_reversed from front (logically last builders, which are empty
665        // spare-capacity builders per type invariant). Drain as many whole builders as feasible first.
666        let mut whole_span_builders_to_take: usize = 0;
667
668        for span_builder in &self.span_builders_reversed {
669            if !span_builder.is_empty() {
670                // We have reached the last span builder (logically first) because it is the
671                // only one allowed to have data in it. As it is not empty, we exit the loop
672                // and will later split this span builder to get any remaining bytes.
673                break;
674            }
675
676            let capacity_in_span_builder = span_builder.remaining_capacity();
677
678            if capacity_in_span_builder > remaining {
679                // This span builder is too big - we have to split it, cannot take entirely.
680                break;
681            }
682
683            // Will not wrap - guarded by if-statement above.
684            remaining = remaining.wrapping_sub(capacity_in_span_builder);
685            // Will not wrap as that would imply that there are more span builders than there is virtual memory.
686            whole_span_builders_to_take = whole_span_builders_to_take.wrapping_add(1);
687        }
688
689        // Drain whole builders from the front (logically last builders).
690        result_builders.extend(self.span_builders_reversed.drain(..whole_span_builders_to_take));
691
692        // If we still need more capacity, split the next builder's available capacity.
693        if remaining > 0 {
694            let span_builder = self.span_builders_reversed.first_mut().expect(
695                "remaining_capacity() check at the top ensures a builder is available because we have not received enough capacity yet",
696            );
697
698            let remaining: u32 = remaining.try_into()
699                .expect("the span builder drain loop ensures that remaining capacity comes from one memory block yet the value is too big to fit into a memory block - impossible");
700            let split_count = NonZero::new(remaining).expect("guarded by if-statement above");
701
702            result_builders.push(span_builder.split_off_available(split_count));
703        }
704
705        self.available = self.available.checked_sub(count).expect("guarded by bounds check above");
706
707        Some(Self::from_span_builders(result_builders))
708    }
709
710    /// Consumes `len` bytes from the first span builder and moves it to the frozen spans list.
711    fn freeze_from_first(&mut self, len: NonZero<BlockSize>) {
712        let span_builder = self
713            .span_builders_reversed
714            .last_mut()
715            .expect("there must be at least one span builder for it to be possible to freeze bytes");
716
717        debug_assert!(len.get() <= span_builder.len());
718
719        let span = span_builder.consume(len);
720        self.frozen_spans.push(span);
721
722        if span_builder.remaining_capacity() == 0 {
723            // No more capacity left in this builder, so drop it.
724            self.span_builders_reversed.pop();
725        }
726
727        self.frozen = self
728            .frozen
729            .checked_add(len.get() as usize)
730            .expect("usize overflow should be impossible here because the sequence builder capacity would exceed virtual memory size");
731    }
732
733    /// Ensures that the frozen spans list contains at least `len` bytes of data, freezing
734    /// additional data from the span builders if necessary.
735    ///
736    /// # Panics
737    ///
738    /// Panics if there is not enough data in the span builders to fulfill the request.
739    fn ensure_frozen(&mut self, len: usize) {
740        let must_freeze_bytes: BlockSize = len
741            .saturating_sub(self.frozen)
742            .try_into()
743            .expect("requested to freeze more bytes from the first block than can actually fit into one block");
744
745        let Some(must_freeze_bytes) = NonZero::new(must_freeze_bytes) else {
746            return;
747        };
748
749        // We only need to freeze from the first span builder because a type invariant is that
750        // only the first span builder may contain data. The others are just spare capacity.
751        self.freeze_from_first(must_freeze_bytes);
752    }
753
754    /// The first slice of memory in the remaining capacity of the buffer.
755    ///
756    /// This allows you to manually write into the buffer instead of using the various
757    /// provided convenience methods. Only the first slice of the remaining capacity is
758    /// exposed at any given time by this API.
759    ///
760    /// After writing data to the start of this slice, call [`advance()`] to indicate
761    /// how many bytes have been filled with data. The next call to `first_unfilled_slice()`
762    /// will return the next slice of memory you can write into. This slice must be
763    /// completely filled before the next slice is exposed (a partial fill will simply
764    /// return the remaining range from the same slice in the next call).
765    ///
766    /// To write to multiple slices concurrently, use [`begin_vectored_write()`].
767    #[doc = include_str!("../doc/snippets/sequence_memory_layout.md")]
768    ///
769    /// # Example
770    ///
771    /// ```
772    /// # let memory = bytesbuf::mem::GlobalPool::new();
773    /// use bytesbuf::mem::Memory;
774    ///
775    /// let mut buf = memory.reserve(64);
776    /// let data_to_write: &[u8] = b"0123456789";
777    ///
778    /// // Write data without assuming the length of first_unfilled_slice().
779    /// let mut written = 0;
780    ///
781    /// while written < data_to_write.len() {
782    ///     let dst = buf.first_unfilled_slice();
783    ///
784    ///     let bytes_to_write = dst.len().min(data_to_write.len() - written);
785    ///
786    ///     for i in 0..bytes_to_write {
787    ///         dst[i].write(data_to_write[written + i]);
788    ///     }
789    ///
790    ///     // SAFETY: We just initialized `bytes_to_write` bytes.
791    ///     unsafe {
792    ///         buf.advance(bytes_to_write);
793    ///     }
794    ///     written += bytes_to_write;
795    /// }
796    ///
797    /// assert_eq!(buf.consume_all(), b"0123456789");
798    /// ```
799    ///
800    /// [`advance()`]: Self::advance
801    /// [`begin_vectored_write()`]: Self::begin_vectored_write
802    pub fn first_unfilled_slice(&mut self) -> &mut [MaybeUninit<u8>] {
803        if let Some(last) = self.span_builders_reversed.last_mut() {
804            last.unfilled_slice_mut()
805        } else {
806            // We are required to always return something, even if we have no span builders!
807            &mut []
808        }
809    }
810
811    /// Inspects the metadata of the memory block backing [`first_unfilled_slice()`].
812    ///
813    /// `None` if there is no metadata associated with the memory block or
814    /// if the buffer has no remaining capacity.
815    ///
816    /// # Example
817    ///
818    /// ```
819    /// # let memory = bytesbuf::mem::GlobalPool::new();
820    /// # struct PageAlignedMemory;
821    /// use bytesbuf::mem::Memory;
822    ///
823    /// let mut buf = memory.reserve(64);
824    ///
825    /// let is_page_aligned = buf
826    ///     .first_unfilled_slice_meta()
827    ///     .is_some_and(|meta| meta.is::<PageAlignedMemory>());
828    ///
829    /// println!("First unfilled slice is page-aligned: {is_page_aligned}");
830    /// ```
831    ///
832    /// [`first_unfilled_slice()`]: Self::first_unfilled_slice
833    #[must_use]
834    pub fn first_unfilled_slice_meta(&self) -> Option<&dyn BlockMeta> {
835        self.span_builders_reversed.last().and_then(|sb| sb.block().meta())
836    }
837
838    /// Signals that `count` bytes have been written to the start of [`first_unfilled_slice()`].
839    ///
840    /// The next call to [`first_unfilled_slice()`] will return the next slice of memory that
841    /// can be filled with data.
842    ///
843    /// # Example
844    ///
845    /// ```
846    /// # let memory = bytesbuf::mem::GlobalPool::new();
847    /// use bytesbuf::mem::Memory;
848    ///
849    /// let mut buf = memory.reserve(64);
850    /// let data_to_write: &[u8] = b"0123456789";
851    ///
852    /// // Write data without assuming the length of first_unfilled_slice().
853    /// let mut written = 0;
854    ///
855    /// while written < data_to_write.len() {
856    ///     let dst = buf.first_unfilled_slice();
857    ///
858    ///     let bytes_to_write = dst.len().min(data_to_write.len() - written);
859    ///
860    ///     for i in 0..bytes_to_write {
861    ///         dst[i].write(data_to_write[written + i]);
862    ///     }
863    ///
864    ///     // SAFETY: We just initialized `bytes_to_write` bytes.
865    ///     unsafe {
866    ///         buf.advance(bytes_to_write);
867    ///     }
868    ///     written += bytes_to_write;
869    /// }
870    ///
871    /// assert_eq!(buf.consume_all(), b"0123456789");
872    /// ```
873    ///
874    /// # Safety
875    ///
876    /// The caller must guarantee that `count` bytes from the beginning of [`first_unfilled_slice()`]
877    /// have been initialized.
878    ///
879    /// [`first_unfilled_slice()`]: Self::first_unfilled_slice
880    #[expect(clippy::missing_panics_doc, reason = "only unreachable panics")]
881    pub unsafe fn advance(&mut self, count: usize) {
882        if count == 0 {
883            return;
884        }
885
886        // The write head can only be advanced (via this method) up to the end of the first slice, no further.
887        // This is guaranteed by our safety requirements, so we only assert this in debug builds for extra validation.
888        debug_assert!(count <= self.span_builders_reversed.last().map_or(0, SpanBuilder::remaining_capacity));
889
890        let span_builder = self
891            .span_builders_reversed
892            .last_mut()
893            .expect("there must be at least one span builder if we wrote nonzero bytes");
894
895        // SAFETY: We simply rely on the caller's safety promises here, "forwarding" them.
896        unsafe { span_builder.advance(count) };
897
898        if span_builder.remaining_capacity() == 0 {
899            // The span builder is full, so we need to freeze it and move it to the frozen spans.
900            let len = NonZero::new(span_builder.len())
901                .expect("there is no capacity left in the span builder so there must be at least one byte to consume unless we somehow left an empty span builder in the queue");
902
903            self.freeze_from_first(len);
904
905            // Debug build paranoia: no full span remains after freeze, right?
906            debug_assert!(
907                self.span_builders_reversed
908                    .last()
909                    .map_or(usize::MAX, SpanBuilder::remaining_capacity)
910                    > 0
911            );
912        }
913
914        self.len = self
915            .len
916            .checked_add(count)
917            .expect("usize overflow should be impossible here because the sequence builder capacity would exceed virtual memory size");
918
919        self.available = self
920            .available
921            .checked_sub(count)
922            .expect("guarded by assertion above - we must have at least this much capacity still available");
923    }
924
925    /// Concurrently writes data into all the byte slices that make up the buffer.
926    ///
927    /// The vectored write takes exclusive ownership of the buffer for the duration of the operation
928    /// and allows individual slices of the remaining capacity to be filled concurrently, up to an
929    /// optional limit of `max_len` bytes.
930    ///
931    /// Some I/O operations are naturally limited to a maximum number of bytes that can be
932    /// transferred, so the length limit here allows you to project a restricted view of the
933    /// available capacity without having to limit the true capacity of the buffer.
934    ///
935    /// # Example
936    ///
937    /// ```
938    /// # let memory = bytesbuf::mem::GlobalPool::new();
939    /// use std::ptr;
940    ///
941    /// use bytesbuf::mem::Memory;
942    ///
943    /// let mut buf = memory.reserve(64);
944    /// let capacity = buf.remaining_capacity();
945    ///
946    /// let mut vectored = buf.begin_vectored_write(None);
947    /// let mut slices: Vec<_> = vectored.slices_mut().map(|(s, _)| s).collect();
948    ///
949    /// // Fill all slices with 0xAE bytes.
950    /// // In practice, these could be filled concurrently by vectored I/O APIs.
951    /// let mut total_written = 0;
952    /// for slice in &mut slices {
953    ///     // SAFETY: Writing valid u8 values to the entire slice.
954    ///     unsafe {
955    ///         ptr::write_bytes(slice.as_mut_ptr(), 0xAE, slice.len());
956    ///     }
957    ///     total_written += slice.len();
958    /// }
959    ///
960    /// // SAFETY: We initialized `total_written` bytes sequentially.
961    /// unsafe {
962    ///     vectored.commit(total_written);
963    /// }
964    ///
965    /// assert_eq!(buf.len(), capacity);
966    /// ```
967    ///
968    /// # Panics
969    ///
970    /// Panics if `max_len` is greater than the remaining capacity of the buffer.
971    pub fn begin_vectored_write(&mut self, max_len: Option<usize>) -> BytesBufVectoredWrite<'_> {
972        self.begin_vectored_write_checked(max_len)
973            .expect("attempted to begin a vectored write with a max_len that was greater than the remaining capacity")
974    }
975
976    /// Concurrently writes data into all the byte slices that make up the buffer.
977    ///
978    /// The vectored write takes exclusive ownership of the buffer for the duration of the operation
979    /// and allows individual slices of the remaining capacity to be filled concurrently, up to an
980    /// optional limit of `max_len` bytes.
981    ///
982    /// Some I/O operations are naturally limited to a maximum number of bytes that can be
983    /// transferred, so the length limit here allows you to project a restricted view of the
984    /// available capacity without having to limit the true capacity of the buffer.
985    ///
986    /// # Returns
987    ///
988    /// Returns `None` if `max_len` is greater than the remaining capacity of the buffer.
989    pub fn begin_vectored_write_checked(&mut self, max_len: Option<usize>) -> Option<BytesBufVectoredWrite<'_>> {
990        if let Some(max_len) = max_len
991            && max_len > self.remaining_capacity()
992        {
993            return None;
994        }
995
996        Some(BytesBufVectoredWrite { buf: self, max_len })
997    }
998
999    fn iter_available_capacity(&mut self, max_len: Option<usize>) -> BytesBufRemaining<'_> {
1000        let next_span_builder_index = if self.span_builders_reversed.is_empty() { None } else { Some(0) };
1001
1002        BytesBufRemaining {
1003            buf: self,
1004            next_span_builder_index,
1005            max_len,
1006        }
1007    }
1008
1009    /// Extends the lifetime of the memory capacity backing this buffer.
1010    ///
1011    /// This can be useful when unsafe code is used to reference the contents of a `BytesBuf` and it
1012    /// is possible to reach a condition where the `BytesBuf` itself no longer exists, even though
1013    /// the contents are referenced (e.g. because the remaining references are in non-Rust code).
1014    pub fn extend_lifetime(&self) -> MemoryGuard {
1015        MemoryGuard::new(
1016            self.span_builders_reversed
1017                .iter()
1018                .map(SpanBuilder::block)
1019                .map(Clone::clone)
1020                .chain(self.frozen_spans.iter().map(Span::block_ref).map(Clone::clone)),
1021        )
1022    }
1023
    /// Converts this instance into a [`Write`][std::io::Write] adapter.
    ///
    /// The buffer is consumed and wrapped, together with `memory`, into a [`BytesBufWriter`].
    ///
    /// The memory capacity of the `BytesBuf` will be automatically extended on demand
    /// with additional capacity from the supplied memory provider.
    ///
    /// # Example
    ///
    /// ```
    /// # let memory = bytesbuf::mem::GlobalPool::new();
    /// use std::io::Write;
    ///
    /// use bytesbuf::mem::Memory;
    ///
    /// let buf = memory.reserve(32);
    /// let mut writer = buf.into_writer(&memory);
    /// writer.write_all(b"Hello, ")?;
    /// writer.write_all(b"world!")?;
    /// let mut buf = writer.into_inner();
    ///
    /// assert_eq!(buf.consume_all(), b"Hello, world!");
    /// # Ok::<(), std::io::Error>(())
    /// ```
    #[inline]
    pub fn into_writer<M: Memory>(self, memory: M) -> BytesBufWriter<M> {
        BytesBufWriter::new(self, memory)
    }
1050}
1051
1052impl std::fmt::Debug for BytesBuf {
1053    #[cfg_attr(test, mutants::skip)] // We have no API contract here.
1054    #[cfg_attr(coverage_nightly, coverage(off))] // We have no API contract here.
1055    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1056        let frozen_spans = self.frozen_spans.iter().map(|x| x.len().to_string()).collect::<Vec<_>>().join(", ");
1057
1058        let span_builders = self
1059            .span_builders_reversed
1060            .iter()
1061            .rev()
1062            .map(|x| {
1063                if x.is_empty() {
1064                    x.remaining_capacity().to_string()
1065                } else {
1066                    format!("{} + {}", x.len(), x.remaining_capacity())
1067                }
1068            })
1069            .collect::<Vec<_>>()
1070            .join(", ");
1071
1072        f.debug_struct(type_name::<Self>())
1073            .field("len", &self.len)
1074            .field("frozen", &self.frozen)
1075            .field("available", &self.available)
1076            .field("frozen_spans", &frozen_spans)
1077            .field("span_builders", &span_builders)
1078            .finish()
1079    }
1080}
1081
1082/// A prepared "consume bytes" operation, identifying what must be done to perform the operation.
1083#[derive(Debug, Clone, Copy)]
1084struct ConsumeManifest {
1085    /// How many frozen spans are to be fully detached from the front of the collection.
1086    detach_complete_frozen_spans: usize,
1087
1088    /// How many bytes of data to consume from the first remaining frozen span.
1089    /// Any remainder is left within that span - the span itself is not detached.
1090    consume_partial_span_bytes: BlockSize,
1091}
1092
1093impl ConsumeManifest {
1094    const fn required_spans_capacity(&self) -> usize {
1095        if self.consume_partial_span_bytes != 0 {
1096            // This will not wrap because a type invariant is `capacity <= usize::MAX`, so if
1097            // capacity is already in-bounds, the count of spans certainly is not a greater number.
1098            self.detach_complete_frozen_spans.wrapping_add(1)
1099        } else {
1100            self.detach_complete_frozen_spans
1101        }
1102    }
1103}
1104
/// Coordinates concurrent write operations into a buffer's memory capacity.
///
/// The operation takes exclusive ownership of the `BytesBuf`. During the vectored write,
/// the remaining capacity of the `BytesBuf` is exposed as `MaybeUninit<u8>` slices.
/// By the time the operation is committed, the slices must have been filled sequentially
/// and in order, without gaps, in any desired amount (from 0 bytes written to all slices filled).
///
/// All slices may be written to concurrently and/or in any order - consistency of the contents
/// is only required at the moment the write is committed.
///
/// The capacity exposed during the operation can optionally be limited to `max_len` bytes.
///
/// The operation is completed by calling `.commit()` on the instance, after which the operation is
/// consumed and the exclusive ownership of the `BytesBuf` released.
///
/// If the instance is dropped without committing, the operation is aborted and all remaining capacity
/// is left in a potentially uninitialized state.
#[derive(Debug)]
pub struct BytesBufVectoredWrite<'a> {
    // The buffer being written into, exclusively borrowed for as long as this instance exists.
    buf: &'a mut BytesBuf,

    // Optional limit on how many bytes of capacity `slices_mut()` exposes.
    max_len: Option<usize>,
}
1127
1128impl BytesBufVectoredWrite<'_> {
1129    /// Iterates over the slices of available capacity of the buffer,
1130    /// together with the metadata of the memory block backing each slice.
1131    ///
1132    /// The slices returned from this iterator have the lifetime of the vectored
1133    /// write operation itself, allowing them to be mutated concurrently.
1134    pub fn slices_mut(&mut self) -> BytesBufRemaining<'_> {
1135        self.buf.iter_available_capacity(self.max_len)
1136    }
1137
1138    /// Extends the lifetime of the memory capacity backing this buffer.
1139    ///
1140    /// This can be useful when unsafe code is used to reference the contents of a `BytesBuf` and it
1141    /// is possible to reach a condition where the `BytesBuf` itself no longer exists, even though
1142    /// the contents are referenced (e.g. because the remaining references are in non-Rust code).
1143    pub fn extend_lifetime(&self) -> MemoryGuard {
1144        self.buf.extend_lifetime()
1145    }
1146
1147    /// Completes the vectored write operation, committing `bytes_written` bytes of data that
1148    /// sequentially and completely fills slices from the start of the provided slices.
1149    ///
1150    /// # Safety
1151    ///
1152    /// The caller must ensure that `bytes_written` bytes of data have actually been written
1153    /// into the slices of memory returned from `slices_mut()`, sequentially from the start.
1154    #[expect(clippy::missing_panics_doc, reason = "only unreachable panics")]
1155    pub unsafe fn commit(self, bytes_written: usize) {
1156        debug_assert!(bytes_written <= self.buf.remaining_capacity());
1157
1158        if let Some(max_len) = self.max_len {
1159            debug_assert!(bytes_written <= max_len);
1160        }
1161
1162        // Ordinarily, we have a type invariant that only the first span builder may contain data,
1163        // with the others being spare capacity. For the duration of a vectored write, this
1164        // invariant is suspended (because the vectored write has an exclusive reference which makes
1165        // the suspension of this invariant invisible to any other caller). We must now restore this
1166        // invariant. We do this by advancing the write head slice by slice, triggering the normal
1167        // freezing logic as we go (to avoid implementing two versions of the same logic), until we
1168        // have run out of written bytes to commit.
1169
1170        let mut bytes_remaining = bytes_written;
1171
1172        while bytes_remaining > 0 {
1173            let span_builder = self
1174                .buf
1175                .span_builders_reversed
1176                .last_mut()
1177                .expect("there must be at least one span builder because we still have filled capacity remaining to freeze");
1178
1179            let bytes_available = span_builder.remaining_capacity();
1180            let bytes_to_commit = bytes_available.min(bytes_remaining);
1181
1182            // SAFETY: We forward the promise from our own safety requirements to guarantee that
1183            // the specified number of bytes really has been written.
1184            unsafe { self.buf.advance(bytes_to_commit) };
1185
1186            bytes_remaining = bytes_remaining
1187                .checked_sub(bytes_to_commit)
1188                .expect("we somehow advanced the write head more than the count of written bytes");
1189        }
1190    }
1191}
1192
/// Exposes the remaining memory capacity of a `BytesBuf` for concurrent writes.
///
/// This is used during a vectored write operation, iterating over a sequence
/// of `MaybeUninit<u8>` slices that the caller can concurrently write into.
/// Each slice is accompanied by the metadata (if any) of the memory block backing it.
///
/// The slices may be mutated for as long as the vectored write operation exists.
#[derive(Debug)]
pub struct BytesBufRemaining<'a> {
    // The buffer whose span builders are being iterated over.
    buf: &'a mut BytesBuf,

    // Cursor into the span builders, counted in content order (not storage order).
    // `None` once the iteration has completed.
    next_span_builder_index: Option<usize>,

    // Self-imposed constraint on how much of the available capacity is made visible through
    // this iterator. This can be useful to limit the amount of data that can be written into
    // a `BytesBuf` during a vectored write operation without having to limit the
    // actual capacity of the `BytesBuf`.
    max_len: Option<usize>,
}
1210
impl<'a> Iterator for BytesBufRemaining<'a> {
    type Item = (&'a mut [MaybeUninit<u8>], Option<&'a dyn BlockMeta>);

    /// Yields the unfilled slice of the next span builder (in content order), together with
    /// the metadata of its memory block, truncated to whatever remains of `max_len`.
    ///
    /// Iteration ends after the last span builder or once the `max_len` budget is used up.
    #[cfg_attr(test, mutants::skip)] // This gets mutated into an infinite loop which is not very helpful.
    fn next(&mut self) -> Option<Self::Item> {
        let next_span_builder_index = self.next_span_builder_index?;

        // Move the cursor forward right away, marking the iteration as complete if the
        // span builder we are about to return is the final one.
        self.next_span_builder_index = Some(
            // Will not overflow because `capacity <= usize::MAX` is a type invariant,
            // so the count of span builders certainly cannot be greater.
            next_span_builder_index.wrapping_add(1),
        );
        if self.next_span_builder_index == Some(self.buf.span_builders_reversed.len()) {
            self.next_span_builder_index = None;
        }

        // The iterator iterates through things in content order but we need to access
        // the span builders in storage order.
        // NOTE(review): this uses a plain `+ 1` whereas the cursor update above uses
        // `wrapping_add(1)` - consider making the two consistent.
        let next_span_builder_index_storage_order = self
            .buf
            .span_builders_reversed
            .len()
            // Will not overflow because `capacity <= usize::MAX` is a type invariant,
            // so the count of span builders certainly cannot be greater.
            .wrapping_sub(next_span_builder_index + 1);

        let span_builder = self
            .buf
            .span_builders_reversed
            .get_mut(next_span_builder_index_storage_order)
            .expect("iterator cursor referenced a span builder that does not exist");

        let meta_with_a = {
            let meta = span_builder.block().meta();

            // SAFETY: The metadata reference points into the block's heap allocation, not into
            // the span builder's stack memory. We transmute it to 'a immediately so the immutable
            // borrow of `span_builder` is released before the mutable borrow below.
            // The metadata is valid for 'a because the BlockRef implementation guarantees metadata
            // lives as long as any clone of the memory block, and we hold an exclusive reference
            // to the BytesBuf for the lifetime 'a.
            unsafe { mem::transmute::<Option<&dyn BlockMeta>, Option<&'a dyn BlockMeta>>(meta) }
        };

        let uninit_slice_mut = span_builder.unfilled_slice_mut();

        // SAFETY: There is nothing Rust can do to promise the reference we return is valid for 'a
        // but we can make such a promise ourselves. In essence, returning the references with
        // lifetime 'a extends the exclusive ownership of the `BytesBuf` until all returned chunk
        // references are dropped, even if the iterator itself is dropped earlier. We can do this
        // because we know that to access the chunks requires a reference to the `BytesBuf`,
        // so as long as a chunk reference exists, access via the `BytesBuf` is blocked.
        let uninit_slice_mut = unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &'a mut [MaybeUninit<u8>]>(&mut *uninit_slice_mut) };

        let uninit_slice_mut = if let Some(max_len) = self.max_len {
            // Limit the visible range of the slice if we have a size limit.
            // Once the size limit has been used up entirely, we also terminate the iteration.
            let constrained_len = uninit_slice_mut.len().min(max_len);

            let adjusted_slice = uninit_slice_mut.get_mut(..constrained_len).expect("guarded by min() above");

            // Will not wrap because it is guarded by min() above.
            self.max_len = Some(max_len.wrapping_sub(constrained_len));

            if self.max_len == Some(0) {
                // Even if there are more span builders, we have returned all the capacity
                // we are allowed to return, so pretend there is nothing more to return.
                self.next_span_builder_index = None;
            }

            adjusted_slice
        } else {
            uninit_slice_mut
        };

        Some((uninit_slice_mut, meta_with_a))
    }
}
1290
impl From<BytesView> for BytesBuf {
    /// Creates a buffer by appending `value` to a newly created `BytesBuf`.
    fn from(value: BytesView) -> Self {
        let mut buf = Self::new();
        buf.append(value);
        buf
    }
}
1298
1299#[cfg_attr(coverage_nightly, coverage(off))]
1300#[cfg(test)]
1301mod tests {
1302    #![allow(clippy::indexing_slicing, reason = "Fine in test code, we prefer panic on error")]
1303
1304    use std::pin::pin;
1305
1306    use new_zealand::nz;
1307    use static_assertions::assert_impl_all;
1308    use testing_aids::assert_panic;
1309
1310    use super::*;
1311    use crate::mem::GlobalPool;
1312    use crate::mem::testing::{FixedBlockMemory, TestMemoryBlock};
1313
1314    const U64_SIZE: usize = size_of::<u64>();
1315    const TWO_U64_SIZE: usize = size_of::<u64>() + size_of::<u64>();
1316    const THREE_U64_SIZE: usize = size_of::<u64>() + size_of::<u64>() + size_of::<u64>();
1317
1318    assert_impl_all!(BytesBuf: Send, Sync);
1319
    /// Exercises the basic write -> consume cycle: reserve capacity, write numbers,
    /// consume them in pieces and verify the bookkeeping at every step.
    #[test]
    fn smoke_test() {
        let memory = FixedBlockMemory::new(nz!(1234));

        let min_length = 1000;

        let mut buf = memory.reserve(min_length);

        // A freshly reserved buffer has capacity but no contents.
        assert!(buf.capacity() >= min_length);
        assert!(buf.remaining_capacity() >= min_length);
        assert_eq!(buf.capacity(), buf.remaining_capacity());
        assert_eq!(buf.len(), 0);
        assert!(buf.is_empty());

        buf.put_num_ne(1234_u64);
        buf.put_num_ne(5678_u64);
        buf.put_num_ne(1234_u64);
        buf.put_num_ne(5678_u64);

        // Four u64 values of 8 bytes each.
        assert_eq!(buf.len(), 32);
        assert!(!buf.is_empty());

        // SAFETY: Writing 0 bytes is always valid.
        unsafe {
            buf.advance(0);
        }

        // Consume everything that was written, in two equal pieces.
        let mut first_two = buf.consume(TWO_U64_SIZE);
        let mut second_two = buf.consume(TWO_U64_SIZE);

        assert_eq!(first_two.len(), 16);
        assert_eq!(second_two.len(), 16);
        assert_eq!(buf.len(), 0);

        assert_eq!(first_two.get_num_ne::<u64>(), 1234);
        assert_eq!(first_two.get_num_ne::<u64>(), 5678);

        assert_eq!(second_two.get_num_ne::<u64>(), 1234);
        assert_eq!(second_two.get_num_ne::<u64>(), 5678);

        // The buffer remains usable for further writes after being consumed from.
        buf.put_num_ne(1111_u64);

        assert_eq!(buf.len(), 8);

        let mut last = buf.consume(U64_SIZE);

        assert_eq!(last.len(), 8);
        assert_eq!(buf.len(), 0);

        assert_eq!(last.get_num_ne::<u64>(), 1111);

        // With no data left, a checked consume fails and consume_all() yields an empty view.
        assert!(buf.consume_checked(1).is_none());
        assert!(buf.consume_all().is_empty());
    }
1374
    /// Verifies how `reserve()` extends the capacity of a buffer when the memory provider
    /// hands out fixed-size blocks of 100 bytes.
    #[test]
    fn extend_capacity() {
        let mut buf = BytesBuf::new();

        // A new buffer starts without any capacity.
        assert_eq!(buf.capacity(), 0);
        assert_eq!(buf.remaining_capacity(), 0);

        let memory = FixedBlockMemory::new(nz!(100));

        // Have 0, desired 10, requesting 10, will get 100.
        buf.reserve(10, &memory);

        assert_eq!(buf.capacity(), 100);
        assert_eq!(buf.remaining_capacity(), 100);

        // Write 10 bytes of data just to verify that it does not affect "capacity" logic.
        buf.put_num_ne(1234_u64);
        buf.put_num_ne(5678_u16);

        assert_eq!(buf.len(), 10);
        assert_eq!(buf.remaining_capacity(), 90);
        assert_eq!(buf.capacity(), 100);

        // Have 100, desired 10+140=150, requesting 50, will get another 100 for a total of 200.
        buf.reserve(140, &memory);

        assert_eq!(buf.len(), 10);
        assert_eq!(buf.remaining_capacity(), 190);
        assert_eq!(buf.capacity(), 200);

        // Have 200, desired 10+200=210, 210-200=10, will get another 100 for a total of 300.
        buf.reserve(200, &memory);

        assert_eq!(buf.len(), 10);
        assert_eq!(buf.remaining_capacity(), 290);
        assert_eq!(buf.capacity(), 300);
    }
1412
    /// Verifies that existing views can be appended to a buffer in between regular writes
    /// and that the data comes out in the order in which it went in.
    #[test]
    fn append_existing_view() {
        let memory = FixedBlockMemory::new(nz!(1234));

        let min_length = 1000;

        // This one we use to prepare some data to append.
        let mut payload_buffer = memory.reserve(min_length);

        // This is where we append the data to.
        let mut target_buffer = memory.reserve(min_length);

        // First we make a couple pieces to append.
        payload_buffer.put_num_ne(1111_u64);
        payload_buffer.put_num_ne(2222_u64);
        payload_buffer.put_num_ne(3333_u64);
        payload_buffer.put_num_ne(4444_u64);

        let payload1 = payload_buffer.consume(TWO_U64_SIZE);
        let payload2 = payload_buffer.consume(TWO_U64_SIZE);

        // Then we prefill some data to start us off.
        target_buffer.put_num_ne(5555_u64);
        target_buffer.put_num_ne(6666_u64);

        // Consume a little just for extra complexity (this removes the 5555).
        let _ = target_buffer.consume(U64_SIZE);

        // Append the payloads.
        target_buffer.put_bytes(payload1);
        target_buffer.put_bytes(payload2);

        // Appending an empty byte sequence does nothing.
        target_buffer.put_bytes(BytesView::default());

        // Add some custom data at the end.
        target_buffer.put_num_ne(7777_u64);

        // Six u64 values remain: 6666, the four payload values and 7777.
        assert_eq!(target_buffer.len(), 48);

        let mut result = target_buffer.consume(48);

        assert_eq!(result.get_num_ne::<u64>(), 6666);
        assert_eq!(result.get_num_ne::<u64>(), 1111);
        assert_eq!(result.get_num_ne::<u64>(), 2222);
        assert_eq!(result.get_num_ne::<u64>(), 3333);
        assert_eq!(result.get_num_ne::<u64>(), 4444);
        assert_eq!(result.get_num_ne::<u64>(), 7777);
    }
1462
    /// Verifies that `consume_all()` returns everything still in the buffer when the contents
    /// are a mix of directly written data and an appended view.
    #[test]
    fn consume_all_mixed() {
        let mut buf = BytesBuf::new();
        let memory = FixedBlockMemory::new(nz!(8));

        // Reserve some capacity and add initial data.
        buf.reserve(16, &memory);
        buf.put_num_ne(1111_u64);
        buf.put_num_ne(2222_u64);

        // Consume some data (the 1111).
        let _ = buf.consume(8);

        // Append a sequence (the 3333).
        let mut append_buf = BytesBuf::new();
        append_buf.reserve(8, &memory);
        append_buf.put_num_ne(3333_u64);
        let reused_bytes_to_append = append_buf.consume_all();
        buf.append(reused_bytes_to_append);

        // Add more data (the 4444).
        buf.reserve(8, &memory);
        buf.put_num_ne(4444_u64);

        // Consume all data and validate we got all the pieces.
        let mut result = buf.consume_all();

        // Three u64 values remain, as the 1111 was consumed earlier.
        assert_eq!(result.len(), 24);
        assert_eq!(result.get_num_ne::<u64>(), 2222);
        assert_eq!(result.get_num_ne::<u64>(), 3333);
        assert_eq!(result.get_num_ne::<u64>(), 4444);
    }
1495
    /// Walks `peek()` through the life of a buffer: empty, partially filled first block,
    /// data crossing several 10-byte blocks, and finally a completely full buffer.
    /// At every stage peeking must expose the filled bytes without changing `buf.len()`.
    #[test]
    #[expect(clippy::cognitive_complexity, reason = "test code")]
    fn peek_basic() {
        let mut buf = BytesBuf::new();

        assert_eq!(buf.capacity(), 0);
        assert_eq!(buf.remaining_capacity(), 0);

        let memory = FixedBlockMemory::new(nz!(10));

        // Peeking an empty buffer is fine, it is just an empty BytesView in that case.
        let peeked = buf.peek();
        assert_eq!(peeked.len(), 0);

        buf.reserve(100, &memory);

        assert_eq!(buf.capacity(), 100);

        buf.put_num_ne(1111_u64);

        // We have 0 frozen spans and 10 span builders,
        // the first of which has 8 bytes of filled content.
        let mut peeked = buf.peek();
        assert_eq!(peeked.first_slice().len(), 8);
        assert_eq!(peeked.get_num_ne::<u64>(), 1111);
        // Reading from the peeked view advances the view only.
        assert_eq!(peeked.len(), 0);

        buf.put_num_ne(2222_u64);
        buf.put_num_ne(3333_u64);
        buf.put_num_ne(4444_u64);
        buf.put_num_ne(5555_u64);
        buf.put_num_ne(6666_u64);
        buf.put_num_ne(7777_u64);
        buf.put_num_ne(8888_u64);
        // These will cross a span boundary so we can also observe
        // crossing that boundary during peeking.
        buf.put_byte_repeated(9, 8);

        // 8 x u64 (64 bytes) plus 8 single bytes.
        assert_eq!(buf.len(), 72);
        assert_eq!(buf.capacity(), 100);
        assert_eq!(buf.remaining_capacity(), 28);

        // We should have 7 frozen spans and 3 span builders,
        // the first of which has 2 bytes of filled content.
        let mut peeked = buf.peek();

        assert_eq!(peeked.len(), 72);

        // This should be the first frozen span of 10 bytes.
        assert_eq!(peeked.first_slice().len(), 10);

        assert_eq!(peeked.get_num_ne::<u64>(), 1111);
        assert_eq!(peeked.get_num_ne::<u64>(), 2222);

        // The length of the buffer does not change just because we peek at its data.
        assert_eq!(buf.len(), 72);

        // We consumed 16 bytes from the peeked view, so should be looking at the remaining 4 bytes in the 2nd span.
        assert_eq!(peeked.first_slice().len(), 4);

        assert_eq!(peeked.get_num_ne::<u64>(), 3333);
        assert_eq!(peeked.get_num_ne::<u64>(), 4444);
        assert_eq!(peeked.get_num_ne::<u64>(), 5555);
        assert_eq!(peeked.get_num_ne::<u64>(), 6666);
        assert_eq!(peeked.get_num_ne::<u64>(), 7777);
        assert_eq!(peeked.get_num_ne::<u64>(), 8888);

        for _ in 0..8 {
            assert_eq!(peeked.get_byte(), 9);
        }

        // The peeked view is exhausted once all 72 bytes have been read from it.
        assert_eq!(peeked.len(), 0);
        assert_eq!(peeked.first_slice().len(), 0);

        // Fill up the remaining 28 bytes of capacity so the buffer is completely full.
        buf.put_byte_repeated(88, 28);

        // Skip over the 72 bytes verified above and check only the newly written tail.
        let mut peeked = buf.peek();
        peeked.advance(72);

        assert_eq!(peeked.len(), 28);

        for _ in 0..28 {
            assert_eq!(peeked.get_byte(), 88);
        }
    }
1582
1583    #[test]
1584    fn consume_part_of_frozen_span() {
1585        let mut buf = BytesBuf::new();
1586
1587        assert_eq!(buf.capacity(), 0);
1588        assert_eq!(buf.remaining_capacity(), 0);
1589
1590        let memory = FixedBlockMemory::new(nz!(10));
1591
1592        buf.reserve(100, &memory);
1593
1594        assert_eq!(buf.capacity(), 100);
1595
1596        buf.put_num_ne(1111_u64);
1597        // This freezes the first span of 10, as we filled it all up.
1598        buf.put_num_ne(2222_u64);
1599
1600        let mut first8 = buf.consume(U64_SIZE);
1601        assert_eq!(first8.get_num_ne::<u64>(), 1111);
1602        assert!(first8.is_empty());
1603
1604        buf.put_num_ne(3333_u64);
1605
1606        let mut second16 = buf.consume(16);
1607        assert_eq!(second16.get_num_ne::<u64>(), 2222);
1608        assert_eq!(second16.get_num_ne::<u64>(), 3333);
1609        assert!(second16.is_empty());
1610    }
1611
1612    #[test]
1613    fn empty_buffer() {
1614        let mut buf = BytesBuf::new();
1615        assert!(buf.is_empty());
1616        assert!(buf.peek().is_empty());
1617        assert_eq!(0, buf.first_unfilled_slice().len());
1618
1619        let consumed = buf.consume(0);
1620        assert!(consumed.is_empty());
1621
1622        let consumed = buf.consume_all();
1623        assert!(consumed.is_empty());
1624    }
1625
1626    #[test]
1627    fn iter_available_empty_with_capacity() {
1628        let mut buf = BytesBuf::new();
1629
1630        assert_eq!(buf.capacity(), 0);
1631        assert_eq!(buf.remaining_capacity(), 0);
1632
1633        let memory = FixedBlockMemory::new(nz!(100));
1634
1635        // Capacity: 0 -> 1000 (10x100)
1636        buf.reserve(1000, &memory);
1637
1638        assert_eq!(buf.capacity(), 1000);
1639        assert_eq!(buf.remaining_capacity(), 1000);
1640
1641        let iter = buf.iter_available_capacity(None);
1642
1643        // Demonstrating that we can access slices concurrently, not only one by one.
1644        let slices: Vec<_> = iter.map(|(s, _)| s).collect();
1645
1646        assert_eq!(slices.len(), 10);
1647
1648        for slice in slices {
1649            assert_eq!(slice.len(), 100);
1650        }
1651
1652        // After we have dropped all slice references, it is again legal to access the buffer.
1653        // This is blocked by the borrow checker while slice references still exist.
1654        buf.reserve(100, &memory);
1655    }
1656
1657    #[test]
1658    fn iter_available_nonempty() {
1659        let mut buf = BytesBuf::new();
1660
1661        assert_eq!(buf.capacity(), 0);
1662        assert_eq!(buf.remaining_capacity(), 0);
1663
1664        let memory = FixedBlockMemory::new(nz!(8));
1665
1666        // Capacity: 0 -> 16 (2x8)
1667        buf.reserve(TWO_U64_SIZE, &memory);
1668
1669        assert_eq!(buf.capacity(), 16);
1670        assert_eq!(buf.remaining_capacity(), 16);
1671
1672        // We write an u64 - this fills half the capacity and should result in
1673        // the first span builder being frozen and the second remaining in its entirety.
1674        buf.put_num_ne(1234_u64);
1675
1676        assert_eq!(buf.len(), 8);
1677        assert_eq!(buf.remaining_capacity(), 8);
1678
1679        let available_slices: Vec<_> = buf.iter_available_capacity(None).map(|(s, _)| s).collect();
1680        assert_eq!(available_slices.len(), 1);
1681        assert_eq!(available_slices[0].len(), 8);
1682
1683        // We write a u32 - this fills half the remaining capacity, which results
1684        // in a half-filled span builder remaining in the buffer.
1685        buf.put_num_ne(5678_u32);
1686
1687        assert_eq!(buf.len(), 12);
1688        assert_eq!(buf.remaining_capacity(), 4);
1689
1690        let available_slices: Vec<_> = buf.iter_available_capacity(None).map(|(s, _)| s).collect();
1691        assert_eq!(available_slices.len(), 1);
1692        assert_eq!(available_slices[0].len(), 4);
1693
1694        // We write a final u32 to use up all the capacity.
1695        buf.put_num_ne(9012_u32);
1696
1697        assert_eq!(buf.len(), 16);
1698        assert_eq!(buf.remaining_capacity(), 0);
1699
1700        assert_eq!(buf.iter_available_capacity(None).count(), 0);
1701    }
1702
1703    #[test]
1704    fn iter_available_empty_no_capacity() {
1705        let mut buf = BytesBuf::new();
1706
1707        assert_eq!(buf.capacity(), 0);
1708        assert_eq!(buf.remaining_capacity(), 0);
1709        assert_eq!(buf.iter_available_capacity(None).count(), 0);
1710    }
1711
1712    #[test]
1713    fn vectored_write_zero() {
1714        let mut buf = BytesBuf::new();
1715
1716        assert_eq!(buf.capacity(), 0);
1717        assert_eq!(buf.remaining_capacity(), 0);
1718
1719        let memory = FixedBlockMemory::new(nz!(8));
1720
1721        // Capacity: 0 -> 16 (2x8)
1722        buf.reserve(TWO_U64_SIZE, &memory);
1723
1724        assert_eq!(buf.capacity(), 16);
1725        assert_eq!(buf.remaining_capacity(), 16);
1726
1727        let vectored_write = buf.begin_vectored_write(None);
1728
1729        // SAFETY: Yes, we really wrote 0 bytes.
1730        unsafe {
1731            vectored_write.commit(0);
1732        }
1733
1734        assert_eq!(buf.capacity(), 16);
1735        assert_eq!(buf.remaining_capacity(), 16);
1736    }
1737
1738    #[test]
1739    fn vectored_write_one_slice() {
1740        let mut buf = BytesBuf::new();
1741
1742        assert_eq!(buf.capacity(), 0);
1743        assert_eq!(buf.remaining_capacity(), 0);
1744
1745        let memory = FixedBlockMemory::new(nz!(8));
1746
1747        // Capacity: 0 -> 8 (1x8)
1748        buf.reserve(U64_SIZE, &memory);
1749
1750        assert_eq!(buf.capacity(), 8);
1751        assert_eq!(buf.remaining_capacity(), 8);
1752
1753        let mut vectored_write = buf.begin_vectored_write(None);
1754
1755        let mut slices: Vec<_> = vectored_write.slices_mut().map(|(s, _)| s).collect();
1756        assert_eq!(slices.len(), 1);
1757        assert_eq!(slices[0].len(), 8);
1758
1759        write_copy_of_slice(slices[0], &0x3333_3333_3333_3333_u64.to_ne_bytes());
1760
1761        // SAFETY: Yes, we really wrote 8 bytes.
1762        unsafe {
1763            vectored_write.commit(8);
1764        }
1765
1766        assert_eq!(buf.len(), 8);
1767        assert_eq!(buf.remaining_capacity(), 0);
1768        assert_eq!(buf.capacity(), 8);
1769
1770        let mut result = buf.consume(U64_SIZE);
1771        assert_eq!(result.get_num_ne::<u64>(), 0x3333_3333_3333_3333);
1772    }
1773
1774    #[test]
1775    fn vectored_write_multiple_slices() {
1776        let mut buf = BytesBuf::new();
1777
1778        assert_eq!(buf.capacity(), 0);
1779        assert_eq!(buf.remaining_capacity(), 0);
1780
1781        let memory = FixedBlockMemory::new(nz!(8));
1782
1783        // Capacity: 0 -> 24 (3x8)
1784        buf.reserve(THREE_U64_SIZE, &memory);
1785
1786        assert_eq!(buf.capacity(), 24);
1787        assert_eq!(buf.remaining_capacity(), 24);
1788
1789        let mut vectored_write = buf.begin_vectored_write(None);
1790
1791        let mut slices: Vec<_> = vectored_write.slices_mut().map(|(s, _)| s).collect();
1792        assert_eq!(slices.len(), 3);
1793        assert_eq!(slices[0].len(), 8);
1794        assert_eq!(slices[1].len(), 8);
1795        assert_eq!(slices[2].len(), 8);
1796
1797        // We fill 12 bytes, leaving middle chunk split in half between filled/available.
1798
1799        write_copy_of_slice(slices[0], &0x3333_3333_3333_3333_u64.to_ne_bytes());
1800        write_copy_of_slice(slices[1], &0x4444_4444_u32.to_ne_bytes());
1801
1802        // SAFETY: Yes, we really wrote 12 bytes.
1803        unsafe {
1804            vectored_write.commit(12);
1805        }
1806
1807        assert_eq!(buf.len(), 12);
1808        assert_eq!(buf.remaining_capacity(), 12);
1809        assert_eq!(buf.capacity(), 24);
1810
1811        let mut vectored_write = buf.begin_vectored_write(None);
1812
1813        let mut slices: Vec<_> = vectored_write.slices_mut().map(|(s, _)| s).collect();
1814        assert_eq!(slices.len(), 2);
1815        assert_eq!(slices[0].len(), 4);
1816        assert_eq!(slices[1].len(), 8);
1817
1818        // We fill the remaining 12 bytes.
1819
1820        write_copy_of_slice(slices[0], &0x5555_5555_u32.to_ne_bytes());
1821        write_copy_of_slice(slices[1], &0x6666_6666_6666_6666_u64.to_ne_bytes());
1822
1823        // SAFETY: Yes, we really wrote 12 bytes.
1824        unsafe {
1825            vectored_write.commit(12);
1826        }
1827
1828        assert_eq!(buf.len(), 24);
1829        assert_eq!(buf.remaining_capacity(), 0);
1830        assert_eq!(buf.capacity(), 24);
1831
1832        let mut result = buf.consume(THREE_U64_SIZE);
1833        assert_eq!(result.get_num_ne::<u64>(), 0x3333_3333_3333_3333);
1834        assert_eq!(result.get_num_ne::<u32>(), 0x4444_4444);
1835        assert_eq!(result.get_num_ne::<u32>(), 0x5555_5555);
1836        assert_eq!(result.get_num_ne::<u64>(), 0x6666_6666_6666_6666);
1837    }
1838
1839    #[test]
1840    fn vectored_write_max_len() {
1841        let mut buf = BytesBuf::new();
1842
1843        assert_eq!(buf.capacity(), 0);
1844        assert_eq!(buf.remaining_capacity(), 0);
1845
1846        let memory = FixedBlockMemory::new(nz!(8));
1847
1848        // Capacity: 0 -> 24 (3x8)
1849        buf.reserve(THREE_U64_SIZE, &memory);
1850
1851        assert_eq!(buf.capacity(), 24);
1852        assert_eq!(buf.remaining_capacity(), 24);
1853
1854        // We limit to 13 bytes of visible capacity, of which we will fill 12.
1855        let mut vectored_write = buf.begin_vectored_write(Some(13));
1856
1857        let mut slices: Vec<_> = vectored_write.slices_mut().map(|(s, _)| s).collect();
1858        assert_eq!(slices.len(), 2);
1859        assert_eq!(slices[0].len(), 8);
1860        assert_eq!(slices[1].len(), 5);
1861
1862        // We fill 12 bytes, leaving middle chunk split in half between filled/available.
1863
1864        write_copy_of_slice(slices[0], &0x3333_3333_3333_3333_u64.to_ne_bytes());
1865        write_copy_of_slice(slices[1], &0x4444_4444_u32.to_ne_bytes());
1866
1867        // SAFETY: Yes, we really wrote 12 bytes.
1868        unsafe {
1869            vectored_write.commit(12);
1870        }
1871
1872        assert_eq!(buf.len(), 12);
1873        assert_eq!(buf.remaining_capacity(), 12);
1874        assert_eq!(buf.capacity(), 24);
1875
1876        // There are 12 remaining and we set max_limit to exactly cover those 12
1877        let mut vectored_write = buf.begin_vectored_write(Some(12));
1878
1879        let mut slices: Vec<_> = vectored_write.slices_mut().map(|(s, _)| s).collect();
1880        assert_eq!(slices.len(), 2);
1881        assert_eq!(slices[0].len(), 4);
1882        assert_eq!(slices[1].len(), 8);
1883
1884        write_copy_of_slice(slices[0], &0x5555_5555_u32.to_ne_bytes());
1885        write_copy_of_slice(slices[1], &0x6666_6666_6666_6666_u64.to_ne_bytes());
1886
1887        // SAFETY: Yes, we really wrote 12 bytes.
1888        unsafe {
1889            vectored_write.commit(12);
1890        }
1891
1892        assert_eq!(buf.len(), 24);
1893        assert_eq!(buf.remaining_capacity(), 0);
1894        assert_eq!(buf.capacity(), 24);
1895
1896        let mut result = buf.consume(THREE_U64_SIZE);
1897        assert_eq!(result.get_num_ne::<u64>(), 0x3333_3333_3333_3333);
1898        assert_eq!(result.get_num_ne::<u32>(), 0x4444_4444);
1899        assert_eq!(result.get_num_ne::<u32>(), 0x5555_5555);
1900        assert_eq!(result.get_num_ne::<u64>(), 0x6666_6666_6666_6666);
1901    }
1902
1903    #[test]
1904    fn vectored_write_max_len_overflow() {
1905        let mut buf = BytesBuf::new();
1906
1907        let memory = FixedBlockMemory::new(nz!(8));
1908
1909        // Capacity: 0 -> 24 (3x8)
1910        buf.reserve(THREE_U64_SIZE, &memory);
1911
1912        assert_eq!(buf.capacity(), 24);
1913        assert_eq!(buf.remaining_capacity(), 24);
1914
1915        // We ask for 25 bytes of capacity but there are only 24 available. Oops!
1916        assert_panic!(buf.begin_vectored_write(Some(25)));
1917    }
1918
1919    #[test]
1920    fn vectored_write_overcommit() {
1921        let mut buf = BytesBuf::new();
1922
1923        assert_eq!(buf.capacity(), 0);
1924        assert_eq!(buf.remaining_capacity(), 0);
1925
1926        let memory = FixedBlockMemory::new(nz!(8));
1927
1928        // Capacity: 0 -> 16 (2x8)
1929        buf.reserve(TWO_U64_SIZE, &memory);
1930
1931        assert_eq!(buf.capacity(), 16);
1932        assert_eq!(buf.remaining_capacity(), 16);
1933
1934        let vectored_write = buf.begin_vectored_write(None);
1935
1936        assert_panic!(
1937            // SAFETY: Intentionally lying here to trigger a panic.
1938            unsafe {
1939                vectored_write.commit(17);
1940            }
1941        );
1942    }
1943
1944    #[test]
1945    fn vectored_write_abort() {
1946        let mut buf = BytesBuf::new();
1947
1948        assert_eq!(buf.capacity(), 0);
1949        assert_eq!(buf.remaining_capacity(), 0);
1950
1951        let memory = FixedBlockMemory::new(nz!(8));
1952
1953        // Capacity: 0 -> 8 (1x8)
1954        buf.reserve(U64_SIZE, &memory);
1955
1956        assert_eq!(buf.capacity(), 8);
1957        assert_eq!(buf.remaining_capacity(), 8);
1958
1959        let mut vectored_write = buf.begin_vectored_write(None);
1960
1961        let mut slices: Vec<_> = vectored_write.slices_mut().map(|(s, _)| s).collect();
1962        assert_eq!(slices.len(), 1);
1963        assert_eq!(slices[0].len(), 8);
1964
1965        write_copy_of_slice(slices[0], &0x3333_3333_3333_3333_u64.to_ne_bytes());
1966
1967        // Actually never mind - we drop it here.
1968        #[expect(clippy::drop_non_drop, reason = "Just being explicit for illustration")]
1969        drop(vectored_write);
1970
1971        assert_eq!(buf.len(), 0);
1972        assert_eq!(buf.remaining_capacity(), 8);
1973        assert_eq!(buf.capacity(), 8);
1974    }
1975
    /// The guard returned by `extend_lifetime()` must hold a reference to every block the
    /// buffer uses - both the block behind a frozen span and the block behind a span builder -
    /// and must release those references when it is dropped.
    #[test]
    fn extend_lifetime_references_all_blocks() {
        // We need to detect here whether a block is being released (i.e. ref count goes to zero).

        // SAFETY: We are not allowed to drop this until all BlockRef are gone. This is fine
        // because it is dropped at the end of the function, after all BlockRef instances.
        let block1 = unsafe { TestMemoryBlock::new(nz!(8), None) };
        let block1 = pin!(block1);

        // SAFETY: We are not allowed to drop this until all BlockRef are gone. This is fine
        // because it is dropped at the end of the function, after all BlockRef instances.
        let block2 = unsafe { TestMemoryBlock::new(nz!(8), None) };
        let block2 = pin!(block2);

        // The buffer lives only inside this scope; only the guard escapes it.
        let guard = {
            // SAFETY: We guarantee exclusive access to the memory capacity.
            let block1 = unsafe { block1.as_ref().to_block() };
            // SAFETY: We guarantee exclusive access to the memory capacity.
            let block2 = unsafe { block2.as_ref().to_block() };

            let mut buf = BytesBuf::from_blocks([block1, block2]);

            // Freezes first span of 8, retains one span builder.
            buf.put_num_ne(1234_u64);

            assert_eq!(buf.frozen_spans.len(), 1);
            assert_eq!(buf.span_builders_reversed.len(), 1);

            buf.extend_lifetime()
        };

        // The buffer was destroyed and all BlockRefs it was holding are gone.
        // However, the lifetime guard is still alive and has a BlockRef.

        assert_eq!(block1.ref_count(), 1);
        assert_eq!(block2.ref_count(), 1);

        drop(guard);

        // And now they should all be dead.
        assert_eq!(block1.ref_count(), 0);
        assert_eq!(block2.ref_count(), 0);
    }
2019
    /// Same check as for `BytesBuf::extend_lifetime()`, but the guard is obtained from an
    /// in-progress vectored write: it must still reference every block used by the buffer
    /// and release those references when it is dropped.
    #[test]
    fn extend_lifetime_during_vectored_write_references_all_blocks() {
        // We need to detect here whether a block is being released (i.e. ref count goes to zero).

        // SAFETY: We are not allowed to drop this until all BlockRef are gone. This is fine
        // because it is dropped at the end of the function, after all BlockRef instances.
        let block1 = unsafe { TestMemoryBlock::new(nz!(8), None) };
        let block1 = pin!(block1);

        // SAFETY: We are not allowed to drop this until all BlockRef are gone. This is fine
        // because it is dropped at the end of the function, after all BlockRef instances.
        let block2 = unsafe { TestMemoryBlock::new(nz!(8), None) };
        let block2 = pin!(block2);

        // The buffer and the vectored write live only inside this scope; only the guard escapes it.
        let guard = {
            // SAFETY: We guarantee exclusive access to the memory capacity.
            let block1 = unsafe { block1.as_ref().to_block() };
            // SAFETY: We guarantee exclusive access to the memory capacity.
            let block2 = unsafe { block2.as_ref().to_block() };

            let mut buf = BytesBuf::from_blocks([block1, block2]);

            // Freezes first span of 8, retains one span builder.
            buf.put_num_ne(1234_u64);

            assert_eq!(buf.frozen_spans.len(), 1);
            assert_eq!(buf.span_builders_reversed.len(), 1);

            let vectored_write = buf.begin_vectored_write(None);

            vectored_write.extend_lifetime()
        };

        // The buffer was destroyed and all BlockRefs it was holding are gone.
        // However, the lifetime guard is still alive and has a BlockRef.

        assert_eq!(block1.ref_count(), 1);
        assert_eq!(block2.ref_count(), 1);

        drop(guard);

        // And now they should all be dead.
        assert_eq!(block1.ref_count(), 0);
        assert_eq!(block2.ref_count(), 0);
    }
2065
2066    #[test]
2067    fn from_view() {
2068        let memory = GlobalPool::new();
2069
2070        let view1 = BytesView::copied_from_slice(b"bla bla bla", &memory);
2071
2072        let mut buf: BytesBuf = view1.clone().into();
2073
2074        let view2 = buf.consume_all();
2075
2076        assert_eq!(view1, view2);
2077    }
2078
2079    #[test]
2080    fn consume_manifest_correctly_calculated() {
2081        let memory = FixedBlockMemory::new(nz!(10));
2082
2083        let mut buf = BytesBuf::new();
2084        buf.reserve(100, &memory);
2085
2086        // 32 bytes in 3 spans.
2087        buf.put_num_ne(1111_u64);
2088        buf.put_num_ne(1111_u64);
2089        buf.put_num_ne(1111_u64);
2090        buf.put_num_ne(1111_u64);
2091
2092        // Freeze it all - a precondition to consuming is to freeze everything first.
2093        buf.ensure_frozen(32);
2094
2095        let consume8 = buf.prepare_consume(8);
2096
2097        assert_eq!(consume8.detach_complete_frozen_spans, 0);
2098        assert_eq!(consume8.consume_partial_span_bytes, 8);
2099        assert_eq!(consume8.required_spans_capacity(), 1);
2100
2101        let consume10 = buf.prepare_consume(10);
2102
2103        assert_eq!(consume10.detach_complete_frozen_spans, 1);
2104        assert_eq!(consume10.consume_partial_span_bytes, 0);
2105        assert_eq!(consume10.required_spans_capacity(), 1);
2106
2107        let consume11 = buf.prepare_consume(11);
2108
2109        assert_eq!(consume11.detach_complete_frozen_spans, 1);
2110        assert_eq!(consume11.consume_partial_span_bytes, 1);
2111        assert_eq!(consume11.required_spans_capacity(), 2);
2112
2113        let consume30 = buf.prepare_consume(30);
2114
2115        assert_eq!(consume30.detach_complete_frozen_spans, 3);
2116        assert_eq!(consume30.consume_partial_span_bytes, 0);
2117        assert_eq!(consume30.required_spans_capacity(), 3);
2118
2119        let consume31 = buf.prepare_consume(31);
2120
2121        assert_eq!(consume31.detach_complete_frozen_spans, 3);
2122        assert_eq!(consume31.consume_partial_span_bytes, 1);
2123        assert_eq!(consume31.required_spans_capacity(), 4);
2124
2125        let consume32 = buf.prepare_consume(32);
2126
2127        // Note that even though our memory comes in blocks of 10, there are only 2 bytes
2128        // in the last frozen span, for a total frozen of 10 + 10 + 10 + 2. We consume it all.
2129        // Frozen spans do not have to be full memory blocks!
2130        assert_eq!(consume32.detach_complete_frozen_spans, 4);
2131        assert_eq!(consume32.consume_partial_span_bytes, 0);
2132        assert_eq!(consume32.required_spans_capacity(), 4);
2133    }
2134
2135    #[test]
2136    fn size_change_detector() {
2137        // The point of this is not to say that we expect it to have a specific size but to allow
2138        // us to easily detect when the size changes and (if we choose to) bless the change.
2139        // We assume 64-bit pointers - any support for 32-bit is problem for the future.
2140        assert_eq!(size_of::<BytesBuf>(), 552);
2141    }
2142
2143    #[test]
2144    fn peek_empty_builder() {
2145        let buf = BytesBuf::new();
2146        let peeked = buf.peek();
2147
2148        assert!(peeked.is_empty());
2149        assert_eq!(peeked.len(), 0);
2150    }
2151
2152    #[test]
2153    fn peek_with_frozen_spans_only() {
2154        let memory = FixedBlockMemory::new(nz!(10));
2155        let mut buf = BytesBuf::new();
2156
2157        buf.reserve(20, &memory);
2158        buf.put_num_ne(0x1111_1111_1111_1111_u64);
2159        buf.put_num_ne(0x2222_2222_2222_2222_u64);
2160        // Both blocks are now frozen (filled completely)
2161        assert_eq!(buf.len(), 16);
2162
2163        let mut peeked = buf.peek();
2164
2165        assert_eq!(peeked.len(), 16);
2166        assert_eq!(peeked.get_num_ne::<u64>(), 0x1111_1111_1111_1111);
2167        assert_eq!(peeked.get_num_ne::<u64>(), 0x2222_2222_2222_2222);
2168
2169        // Original builder still has the data
2170        assert_eq!(buf.len(), 16);
2171    }
2172
2173    #[test]
2174    fn peek_with_partially_filled_span_builder() {
2175        let memory = FixedBlockMemory::new(nz!(10));
2176        let mut buf = BytesBuf::new();
2177
2178        buf.reserve(10, &memory);
2179        buf.put_num_ne(0x3333_3333_3333_3333_u64);
2180        buf.put_num_ne(0x4444_u16);
2181        // We have 10 bytes filled in a 10-byte block
2182        assert_eq!(buf.len(), 10);
2183
2184        let mut peeked = buf.peek();
2185
2186        assert_eq!(peeked.len(), 10);
2187        assert_eq!(peeked.get_num_ne::<u64>(), 0x3333_3333_3333_3333);
2188        assert_eq!(peeked.get_num_ne::<u16>(), 0x4444);
2189
2190        // Original builder still has the data
2191        assert_eq!(buf.len(), 10);
2192    }
2193
2194    #[test]
2195    fn peek_preserves_capacity_of_partial_span_builder() {
2196        let memory = FixedBlockMemory::new(nz!(20));
2197        let mut buf = BytesBuf::new();
2198
2199        buf.reserve(20, &memory);
2200        buf.put_num_ne(0x5555_5555_5555_5555_u64);
2201
2202        // We have 8 bytes filled and 12 bytes remaining capacity
2203        assert_eq!(buf.len(), 8);
2204        assert_eq!(buf.remaining_capacity(), 12);
2205
2206        let mut peeked = buf.peek();
2207
2208        assert_eq!(peeked.len(), 8);
2209        assert_eq!(peeked.get_num_ne::<u64>(), 0x5555_5555_5555_5555);
2210
2211        // CRITICAL TEST: Capacity should be preserved
2212        assert_eq!(buf.len(), 8);
2213        assert_eq!(buf.remaining_capacity(), 12);
2214
2215        // We should still be able to write more data
2216        buf.put_num_ne(0x6666_6666_u32);
2217        assert_eq!(buf.len(), 12);
2218        assert_eq!(buf.remaining_capacity(), 8);
2219
2220        // And we can peek again to see the updated data
2221        let mut peeked2 = buf.peek();
2222        assert_eq!(peeked2.len(), 12);
2223        assert_eq!(peeked2.get_num_ne::<u64>(), 0x5555_5555_5555_5555);
2224        assert_eq!(peeked2.get_num_ne::<u32>(), 0x6666_6666);
2225    }
2226
2227    #[test]
2228    fn peek_with_mixed_frozen_and_unfrozen() {
2229        let memory = FixedBlockMemory::new(nz!(10));
2230        let mut buf = BytesBuf::new();
2231
2232        buf.reserve(30, &memory);
2233
2234        // Fill first block completely (10 bytes) - will be frozen
2235        buf.put_num_ne(0x1111_1111_1111_1111_u64);
2236        buf.put_num_ne(0x2222_u16);
2237
2238        // Fill second block completely (10 bytes) - will be frozen
2239        buf.put_num_ne(0x3333_3333_3333_3333_u64);
2240        buf.put_num_ne(0x4444_u16);
2241
2242        // Partially fill third block (only 4 bytes) - will remain unfrozen
2243        buf.put_num_ne(0x5555_5555_u32);
2244
2245        assert_eq!(buf.len(), 24);
2246        assert_eq!(buf.remaining_capacity(), 6);
2247
2248        let mut peeked = buf.peek();
2249
2250        assert_eq!(peeked.len(), 24);
2251        assert_eq!(peeked.get_num_ne::<u64>(), 0x1111_1111_1111_1111);
2252        assert_eq!(peeked.get_num_ne::<u16>(), 0x2222);
2253        assert_eq!(peeked.get_num_ne::<u64>(), 0x3333_3333_3333_3333);
2254        assert_eq!(peeked.get_num_ne::<u16>(), 0x4444);
2255        assert_eq!(peeked.get_num_ne::<u32>(), 0x5555_5555);
2256        // Original builder still has all the data and capacity
2257        assert_eq!(buf.len(), 24);
2258        assert_eq!(buf.remaining_capacity(), 6);
2259    }
2260
2261    #[test]
2262    fn peek_then_consume() {
2263        let memory = FixedBlockMemory::new(nz!(20));
2264        let mut buf = BytesBuf::new();
2265
2266        buf.reserve(20, &memory);
2267        buf.put_num_ne(0x7777_7777_7777_7777_u64);
2268        buf.put_num_ne(0x8888_8888_u32);
2269        assert_eq!(buf.len(), 12);
2270
2271        // Peek at the data
2272        let mut peeked = buf.peek();
2273        assert_eq!(peeked.len(), 12);
2274        assert_eq!(peeked.get_num_ne::<u64>(), 0x7777_7777_7777_7777);
2275
2276        // Original builder still has the data
2277        assert_eq!(buf.len(), 12);
2278
2279        // Now consume some of it
2280        let mut consumed = buf.consume(8);
2281        assert_eq!(consumed.get_num_ne::<u64>(), 0x7777_7777_7777_7777);
2282
2283        // Builder should have less data now
2284        assert_eq!(buf.len(), 4);
2285
2286        // Peek again should show the remaining data
2287        let mut peeked2 = buf.peek();
2288        assert_eq!(peeked2.len(), 4);
2289        assert_eq!(peeked2.get_num_ne::<u32>(), 0x8888_8888);
2290    }
2291
2292    #[test]
2293    fn peek_multiple_times() {
2294        let memory = FixedBlockMemory::new(nz!(20));
2295        let mut buf = BytesBuf::new();
2296
2297        buf.reserve(20, &memory);
2298        buf.put_num_ne(0xAAAA_AAAA_AAAA_AAAA_u64);
2299
2300        // Peek multiple times - each should work independently
2301        let mut peeked1 = buf.peek();
2302        let mut peeked2 = buf.peek();
2303
2304        assert_eq!(peeked1.get_num_ne::<u64>(), 0xAAAA_AAAA_AAAA_AAAA);
2305        assert_eq!(peeked2.get_num_ne::<u64>(), 0xAAAA_AAAA_AAAA_AAAA);
2306
2307        // Original builder still intact
2308        assert_eq!(buf.len(), 8);
2309    }
2310
2311    #[test]
2312    fn first_unfilled_slice_meta_no_capacity() {
2313        let buf = BytesBuf::new();
2314        assert!(buf.first_unfilled_slice_meta().is_none());
2315    }
2316
2317    #[test]
2318    fn first_unfilled_slice_meta_no_meta() {
2319        let memory = FixedBlockMemory::new(nz!(64));
2320        let buf = memory.reserve(64);
2321        assert!(buf.first_unfilled_slice_meta().is_none());
2322    }
2323
2324    #[test]
2325    fn first_unfilled_slice_meta_with_meta() {
2326        #[derive(Debug)]
2327        struct CustomMeta;
2328
2329        impl BlockMeta for CustomMeta {}
2330
2331        // SAFETY: We are not allowed to drop this until all BlockRef are gone. This is fine
2332        // because it is dropped at the end of the function, after all BlockRef instances.
2333        let block = unsafe { TestMemoryBlock::new(nz!(100), Some(Box::new(CustomMeta))) };
2334        let block = pin!(block);
2335
2336        // SAFETY: We guarantee exclusive access to the memory capacity.
2337        let block = unsafe { block.as_ref().to_block() };
2338
2339        let buf = BytesBuf::from_blocks([block]);
2340        let meta = buf.first_unfilled_slice_meta().expect("should have metadata");
2341        assert!(meta.is::<CustomMeta>());
2342        assert!(!meta.is::<u8>());
2343    }
2344
2345    // To be stabilized soon: https://github.com/rust-lang/rust/issues/79995
2346    fn write_copy_of_slice(dst: &mut [MaybeUninit<u8>], src: &[u8]) {
2347        assert!(dst.len() >= src.len());
2348
2349        // SAFETY: We have verified that dst is large enough.
2350        unsafe {
2351            src.as_ptr().copy_to_nonoverlapping(dst.as_mut_ptr().cast(), src.len());
2352        }
2353    }
2354
2355    #[test]
2356    fn split_off_remaining_basic() {
2357        let memory = FixedBlockMemory::new(nz!(100));
2358        let mut buf = memory.reserve(100);
2359
2360        buf.put_num_ne(1234_u64);
2361
2362        let remaining_before_split = buf.remaining_capacity();
2363
2364        let split = buf.split_off_remaining(20);
2365
2366        // Original keeps its data and lost the split capacity.
2367        assert_eq!(buf.len(), U64_SIZE);
2368        assert_eq!(buf.remaining_capacity(), remaining_before_split - 20);
2369
2370        // Split-off buffer has the requested capacity and no data.
2371        assert!(split.remaining_capacity() >= 20);
2372        assert!(split.is_empty());
2373    }
2374
2375    #[test]
2376    fn split_off_remaining_write_to_split() {
2377        let memory = FixedBlockMemory::new(nz!(100));
2378        let mut buf = memory.reserve(100);
2379
2380        buf.put_num_ne(1111_u64);
2381
2382        let mut split = buf.split_off_remaining(40);
2383
2384        // Write into the split-off buffer.
2385        split.put_num_ne(2222_u64);
2386        split.put_num_ne(3333_u64);
2387
2388        assert_eq!(split.len(), TWO_U64_SIZE);
2389
2390        let mut split_view = split.consume_all();
2391        assert_eq!(split_view.get_num_ne::<u64>(), 2222);
2392        assert_eq!(split_view.get_num_ne::<u64>(), 3333);
2393
2394        // Original buffer data is unaffected.
2395        let mut original_view = buf.consume_all();
2396        assert_eq!(original_view.get_num_ne::<u64>(), 1111);
2397    }
2398
2399    #[test]
2400    fn split_off_remaining_all_capacity() {
2401        let memory = FixedBlockMemory::new(nz!(100));
2402        let mut buf = memory.reserve(100);
2403        let full_capacity = buf.remaining_capacity();
2404
2405        let split = buf.split_off_remaining(full_capacity);
2406
2407        assert_eq!(buf.remaining_capacity(), 0);
2408        assert_eq!(split.remaining_capacity(), full_capacity);
2409    }
2410
2411    #[test]
2412    fn split_off_remaining_zero() {
2413        let memory = FixedBlockMemory::new(nz!(100));
2414        let mut buf = memory.reserve(100);
2415        let original_capacity = buf.remaining_capacity();
2416
2417        let split = buf.split_off_remaining(0);
2418
2419        assert_eq!(buf.remaining_capacity(), original_capacity);
2420        assert!(split.is_empty());
2421        assert_eq!(split.remaining_capacity(), 0);
2422    }
2423
2424    #[test]
2425    fn split_off_remaining_multi_block() {
2426        let memory = FixedBlockMemory::new(nz!(10));
2427
2428        // Reserve enough to trigger multiple blocks.
2429        let mut buf = memory.reserve(30);
2430
2431        buf.put_num_ne(1234_u64);
2432
2433        let original_remaining = buf.remaining_capacity();
2434
2435        // Split off more than one block's capacity to exercise draining + partial split.
2436        let mut split = buf.split_off_remaining(15);
2437
2438        assert_eq!(buf.remaining_capacity(), original_remaining - 15);
2439        assert_eq!(buf.len(), U64_SIZE);
2440
2441        // The split-off buffer capacity is usable.
2442        split.put_byte_repeated(0xAA, 15);
2443        assert_eq!(split.len(), 15);
2444
2445        // Original data is unaffected.
2446        let mut original_view = buf.consume(U64_SIZE);
2447        assert_eq!(original_view.get_num_ne::<u64>(), 1234);
2448    }
2449
2450    #[test]
2451    fn split_off_remaining_empty_buffer_no_capacity() {
2452        let mut buf = BytesBuf::new();
2453        assert_eq!(buf.remaining_capacity(), 0);
2454
2455        let split = buf.split_off_remaining(0);
2456
2457        assert_eq!(buf.len(), 0);
2458        assert_eq!(buf.remaining_capacity(), 0);
2459        assert_eq!(split.len(), 0);
2460        assert_eq!(split.remaining_capacity(), 0);
2461    }
2462
2463    #[test]
2464    fn split_off_remaining_panics_on_overflow() {
2465        let memory = FixedBlockMemory::new(nz!(100));
2466        let mut buf = memory.reserve(100);
2467        let capacity = buf.remaining_capacity();
2468
2469        assert_panic!(buf.split_off_remaining(capacity + 1));
2470    }
2471
2472    #[test]
2473    fn split_off_remaining_checked_returns_none() {
2474        let memory = FixedBlockMemory::new(nz!(100));
2475        let mut buf = memory.reserve(100);
2476        let capacity = buf.remaining_capacity();
2477
2478        assert!(buf.split_off_remaining_checked(capacity + 1).is_none());
2479
2480        // Buffer is unmodified.
2481        assert_eq!(buf.remaining_capacity(), capacity);
2482    }
2483
2484    // Compile time test
2485    fn _can_use_in_dyn_traits(mem: &dyn Memory) {
2486        let buf = mem.reserve(123);
2487        let _ = buf.into_writer(mem);
2488    }
2489}