bytesbuf 0.4.2

Types for creating and manipulating byte sequences.
Documentation
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use std::mem::MaybeUninit;
use std::num::NonZero;
use std::ptr::NonNull;
use std::sync::atomic::{self, AtomicUsize};

use bytes::Bytes;
use smallvec::SmallVec;

use crate::mem::{Block, BlockRef, BlockRefDynamic, BlockRefVTable, BlockSize};
use crate::{BytesView, MAX_INLINE_SPANS, Span};

impl From<Bytes> for BytesView {
    /// Builds a `BytesView` on top of the memory owned by a [`Bytes`] instance.
    ///
    /// The payload is never copied. Each chunk of at most `BlockSize::MAX` bytes costs one small
    /// dynamic allocation for its bookkeeping state; an empty `Bytes` produces no chunks at all.
    fn from(value: Bytes) -> Self {
        // A single memory block inside a BytesView holds at most BlockSize::MAX bytes, while a
        // Bytes instance has no such limit. BytesBlockIterator carves the input into pieces that
        // each fit into one block.
        let chunks = BytesBlockIterator::new(value);

        // NB! We cannot use `BytesBuf::from_blocks` because it is not guaranteed to use the
        // blocks in the same order as they are provided. Instead, we assemble the inner span
        // array of the BytesView ourselves, which also avoids temporary allocations and resizing.
        let mut spans_reversed: SmallVec<[Span; MAX_INLINE_SPANS]> = chunks
            .map(|chunk| {
                // SAFETY: We must treat the provided memory capacity as immutable. We do, only
                // using it to create a `BytesView` over the immutable data that already exists
                // within. This requirement also extends down the stack - no code that runs in
                // this function is allowed to create an exclusive reference over the data of the
                // `Bytes`, even if that exclusive reference is not used for writes (Miri will
                // tell you if you did it wrong).
                let block = unsafe { non_empty_bytes_to_immutable_block(chunk) };

                let mut builder = block.into_span_builder();

                #[expect(clippy::cast_possible_truncation, reason = "a span can never be larger than BlockSize")]
                let filled = NonZero::new(builder.remaining_capacity() as BlockSize)
                    .expect("splitting Bytes cannot yield zero-sized chunks");

                // SAFETY: The data is already initialized; we merely declare that to the
                // SpanBuilder so that it can emit one completed Span covering all its contents.
                unsafe {
                    builder.advance(filled.get() as usize);
                }

                builder.consume(filled)
            })
            .collect();

        // The spans were collected front-to-back but must be handed over back-to-front. In
        // practically every case this is a 1-element array, so the reversal costs nothing.
        spans_reversed.reverse();

        Self::from_spans_reversed(spans_reversed)
    }
}

/// An implementation of `BlockRef` that reuses immutable memory of an owned `Bytes` instance.
///
/// One heap-allocated instance is shared by all clones of a block reference. It is freed, together
/// with the `Bytes` it holds, when `ref_count` reaches zero.
struct BytesBlock {
    // This field exists to keep the Bytes alive. The data within is accessed directly via pointers.
    _inner: Bytes,

    // Number of live references to this state. Incremented by `clone` and decremented by `drop`
    // in the `BlockRefDynamic` implementation.
    ref_count: AtomicUsize,
}

impl BytesBlock {
    /// Takes ownership of `inner` and starts the reference count at one, representing the
    /// reference held by whoever is constructing the block.
    pub const fn new(inner: Bytes) -> Self {
        let ref_count = AtomicUsize::new(1);

        Self { _inner: inner, ref_count }
    }
}

// SAFETY: We must guarantee thread-safety. We do: the only state mutated through these functions
// is the atomic reference count, and the final deallocation is ordered after every other release
// of a reference (Release decrement paired with an Acquire fence in `drop`).
unsafe impl BlockRefDynamic for BytesBlock {
    // The `BytesBlock` itself is the shared state - there is no separate state object.
    type State = Self;

    /// Registers one more reference to the shared state and returns the same state pointer.
    fn clone(state_ptr: NonNull<Self::State>) -> NonNull<Self::State> {
        // SAFETY: The state pointer is always valid for reads.
        // We only ever create shared references to the block state - it exists just to track the
        // reference count.
        let state = unsafe { state_ptr.as_ref() };

        // Relaxed because incrementing reference count is independent of any other state.
        state.ref_count.fetch_add(1, atomic::Ordering::Relaxed);

        // We reuse the same state between all clones.
        state_ptr
    }

    /// Releases one reference. Releasing the last reference frees the state and, with it, the
    /// `Bytes` instance it kept alive.
    #[cfg_attr(test, mutants::skip)] // Impractical to test. Miri will inform about memory leaks.
    fn drop(state_ptr: NonNull<Self::State>) {
        // SAFETY: The state pointer is always valid for reads.
        // We only ever create shared references to the block state - it exists just to track the
        // reference count.
        let state = unsafe { state_ptr.as_ref() };

        // Release so that everything this thread did with the block happens-before the cleanup
        // performed by whichever thread ends up dropping the last reference.
        // `fetch_sub` returns the previous value, so anything other than 1 means that other
        // references are still alive and there is nothing more for us to do.
        if state.ref_count.fetch_sub(1, atomic::Ordering::Release) != 1 {
            return;
        }

        // This was the last reference, so we can deallocate the block.
        // All we need to do is deallocate the block object - dropping the Bytes field
        // will cleanup the memory capacity provided by the Bytes instance.

        // Ensure that we have observed all writes into the block from other threads.
        // On x86 this does nothing but on weaker memory models writes could be delayed.
        // This Acquire fence pairs with the Release decrements performed by the other threads.
        atomic::fence(atomic::Ordering::Acquire);

        // SAFETY: No more references exist, we can resurrect the object inside a Box and drop.
        // The pointer originates from `Box::into_raw` in `non_empty_bytes_to_immutable_block`.
        drop(unsafe { Box::from_raw(state_ptr.as_ptr()) });
    }
}

/// # Panics
///
/// Panics if the `Bytes` is larger than `BlockSize::MAX`.
///
/// # Safety
///
/// The block contents must be treated as immutable because `Bytes` instances are immutable.
unsafe fn non_empty_bytes_to_immutable_block(bytes: Bytes) -> Block {
    assert!(!bytes.is_empty());

    let len: BlockSize = bytes
        .len()
        .try_into()
        .expect("length of Bytes instance was greater than BlockSize::MAX");

    let capacity_ptr = NonNull::new(bytes.as_ptr().cast_mut())
        .expect("guarded by 'is zero sized Bytes' check upstream - non-empty Bytes must have non-null capacity pointer")
        .cast::<MaybeUninit<u8>>();

    let len = NonZero::new(len).expect("guarded by 'is zero sized Bytes' check upstream");

    let block_ptr =
        NonNull::new(Box::into_raw(Box::new(BytesBlock::new(bytes)))).expect("we just allocated it - it cannot possibly be null");

    // SAFETY: block_ptr must remain valid until the dynamic fns drop() is called. Yep, it does.
    // We only ever created shared references to the block state - it exists just to track the
    // reference count.
    let block_ref = unsafe { BlockRef::new(block_ptr, &BLOCK_REF_FNS) };

    // SAFETY: Block requires us to guarantee exclusive access. We actually cannot do that - this
    // memory block is shared and immutable, unlike many others! However, the good news is that this
    // requirement on Block exists to support mutation. As long as we never treat the block as
    // having mutable contents, we are fine with shared immutable access.
    unsafe { Block::new(capacity_ptr, len, block_ref) }
}

/// Function table passed to `BlockRef::new` together with every `BytesBlock` state pointer.
///
/// NOTE(review): presumably `from_trait()` derives the entries from the `BlockRefDynamic`
/// implementation of `BytesBlock` - confirm against `BlockRefVTable`.
const BLOCK_REF_FNS: BlockRefVTable<BytesBlock> = BlockRefVTable::from_trait();

/// Returns pieces of a `Bytes` no greater than `BlockSize::MAX` in length.
///
/// Pieces are yielded front to back; iteration ends once the input has been fully consumed.
struct BytesBlockIterator {
    // The part of the input that has not been yielded yet.
    remaining: Bytes,
}

impl BytesBlockIterator {
    /// Starts iterating over `bytes`; nothing has been yielded yet, so all of it is remaining.
    const fn new(bytes: Bytes) -> Self {
        let remaining = bytes;

        Self { remaining }
    }
}

impl Iterator for BytesBlockIterator {
    type Item = Bytes;

    /// Yields the next piece of at most `BlockSize::MAX` bytes from the front of the remaining
    /// input, or `None` once the input is exhausted. A yielded piece is never empty.
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining.is_empty() {
            return None;
        }

        let bytes_to_take = self.remaining.len().min(BlockSize::MAX as usize);

        // `split_to` hands back the first `bytes_to_take` bytes and leaves the tail in
        // `self.remaining`, so there is no need to build two separate slices and then drop the
        // original handle. It cannot panic here because `bytes_to_take` never exceeds the length.
        Some(self.remaining.split_to(bytes_to_take))
    }

    /// Reports the exact number of pieces still to be yielded: the remaining length divided by
    /// `BlockSize::MAX`, rounded up.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let blocks_remaining = self.remaining.len().div_ceil(BlockSize::MAX as usize);
        (blocks_remaining, Some(blocks_remaining))
    }
}

#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod tests {
    use bytes::{BufMut, BytesMut};

    use super::*;

    /// Produces an immutable `Bytes` consisting of `len` zero bytes.
    fn zero_filled(len: usize) -> Bytes {
        let mut buffer = BytesMut::new();
        buffer.put_bytes(0, len);
        buffer.freeze()
    }

    #[test]
    fn test_bytes_to_view() {
        let source = Bytes::from_static(b"Hello, world!");
        let source_ptr = source.as_ptr();

        let view = BytesView::from(source);

        assert_eq!(view.len(), 13);
        assert_eq!(view, b"Hello, world!");

        // The conversion must not copy, so the view has to start at the very same address.
        assert_eq!(view.first_slice().as_ptr(), source_ptr);
    }

    #[test]
    fn zero_sized_bytes() {
        let view = BytesView::from(Bytes::new());

        assert_eq!(view.len(), 0);
        assert!(view.is_empty());
    }

    #[test]
    fn test_giant_bytes_to_view() {
        // The buffer alone is 5 GB. The publishing pipeline runs on a system where that much
        // memory may not be available, so the test is skipped unless the system reports at least
        // 10 GB.
        // NOTE(review): under Miri and on other operating systems this guard is compiled out and
        // the allocation is attempted unconditionally - confirm that this is intended.
        #[cfg(all(not(miri), any(target_os = "linux", target_os = "windows")))]
        if crate::testing::system_memory() < 10_000_000_000 {
            eprintln!("Skipping giant allocation test due to insufficient memory.");
            return;
        }

        // More than fits into a single memory block, so the view must consist of two blocks.
        let giant_len: usize = 5_000_000_000;

        let view = BytesView::from(zero_filled(giant_len));

        assert_eq!(view.len(), giant_len);
        assert_eq!(view.first_slice().len(), u32::MAX as usize);
        assert_eq!(view.into_spans_reversed().len(), 2);
    }

    #[test]
    fn test_bytes_block_iterator_size_hint_single_block() {
        let iterator = BytesBlockIterator::new(Bytes::from_static(b"Hello, world!"));

        assert_eq!(iterator.size_hint(), (1, Some(1)));
    }

    #[test]
    fn test_bytes_block_iterator_size_hint_multiple_blocks() {
        // One full block plus a little extra - this needs exactly two blocks.
        let iterator = BytesBlockIterator::new(zero_filled(BlockSize::MAX as usize + 1000));

        assert_eq!(iterator.size_hint(), (2, Some(2)));
    }

    #[test]
    fn test_bytes_block_iterator_size_hint_empty() {
        let iterator = BytesBlockIterator::new(Bytes::new());

        assert_eq!(iterator.size_hint(), (0, Some(0)));
    }

    #[test]
    fn test_bytes_block_iterator_size_hint_exact_block_size() {
        // Exactly one full block - must not spill over into a second one.
        let iterator = BytesBlockIterator::new(zero_filled(BlockSize::MAX as usize));

        assert_eq!(iterator.size_hint(), (1, Some(1)));
    }
}