use std::mem::MaybeUninit;
use std::num::NonZero;
use std::ptr::NonNull;
use std::sync::atomic::{self, AtomicUsize};
use bytes::Bytes;
use smallvec::SmallVec;
use crate::mem::{Block, BlockRef, BlockRefDynamic, BlockRefVTable, BlockSize};
use crate::{BytesView, MAX_INLINE_SPANS, Span};
impl From<Bytes> for BytesView {
    /// Converts a `Bytes` into a `BytesView` that refers to the same memory.
    ///
    /// The input is cut into chunks of at most `BlockSize::MAX` bytes. Every chunk is wrapped
    /// in its own block, and the full capacity of that block is consumed as a single span.
    /// An empty input produces no spans at all.
    fn from(value: Bytes) -> Self {
        let mut spans_reversed: SmallVec<[Span; MAX_INLINE_SPANS]> = BytesBlockIterator::new(value)
            .map(|chunk| {
                // SAFETY: `BytesBlockIterator` stops before it would yield an empty chunk, and
                // non-emptiness is the only precondition visible on the callee.
                // NOTE(review): confirm the callee has no further caller obligations.
                let block = unsafe { non_empty_bytes_to_immutable_block(chunk) };
                let mut builder = block.into_span_builder();

                #[expect(clippy::cast_possible_truncation, reason = "a span can never be larger than BlockSize")]
                let span_len = NonZero::new(builder.remaining_capacity() as BlockSize)
                    .expect("splitting Bytes cannot yield zero-sized chunks");

                // SAFETY: NOTE(review) - `advance` presumably requires the advanced range to hold
                // initialized bytes. The block was created over the contents of an existing
                // `Bytes`, so that should hold; confirm against the contract of `advance`.
                unsafe {
                    builder.advance(span_len.get() as usize);
                }

                builder.consume(span_len)
            })
            .collect();

        // Spans were gathered front-to-back, while the constructor takes them back-to-front.
        spans_reversed.reverse();
        Self::from_spans_reversed(spans_reversed)
    }
}
/// Reference-counted state that backs a block created from a `Bytes` instance.
///
/// An instance is boxed in `non_empty_bytes_to_immutable_block` and released again by the
/// `BlockRefDynamic::drop` implementation once the reference count reaches zero.
struct BytesBlock {
// Never read in this file (hence the underscore). Owning the `Bytes` here means it is only
// dropped together with this state, while the block's data pointer points into its contents.
_inner: Bytes,
// Number of outstanding references to this state; `new` starts it at 1.
ref_count: AtomicUsize,
}
impl BytesBlock {
    /// Wraps `inner` in a new state object whose reference count starts at one,
    /// accounting for the reference held by whoever creates it.
    pub const fn new(inner: Bytes) -> Self {
        let ref_count = AtomicUsize::new(1);

        Self { _inner: inner, ref_count }
    }
}
// NOTE(review): the safety contract of `BlockRefDynamic` is not visible in this file; the
// comments below assume `state_ptr` always refers to a live `BytesBlock` that was allocated
// via `Box` - confirm against the trait documentation.
unsafe impl BlockRefDynamic for BytesBlock {
    type State = Self;

    /// Registers one more reference to the shared state and returns the same pointer.
    fn clone(state_ptr: NonNull<Self::State>) -> NonNull<Self::State> {
        // SAFETY: assumed to point at a live `BytesBlock` (see the note on the impl).
        unsafe { state_ptr.as_ref() }
            .ref_count
            .fetch_add(1, atomic::Ordering::Relaxed);

        state_ptr
    }

    /// Releases one reference; the state is freed when the last reference goes away.
    #[cfg_attr(test, mutants::skip)]
    fn drop(state_ptr: NonNull<Self::State>) {
        // SAFETY: assumed to point at a live `BytesBlock` (see the note on the impl).
        let previous_count = unsafe { state_ptr.as_ref() }
            .ref_count
            .fetch_sub(1, atomic::Ordering::Release);

        if previous_count == 1 {
            // Pairs with the `Release` decrements performed by the other reference holders,
            // so everything they did with the state is visible before it is freed.
            atomic::fence(atomic::Ordering::Acquire);

            // SAFETY: this was the last reference, and the state is assumed to originate from
            // `Box::into_raw` (as done in `non_empty_bytes_to_immutable_block`).
            drop(unsafe { Box::from_raw(state_ptr.as_ptr()) });
        }
    }
}
unsafe fn non_empty_bytes_to_immutable_block(bytes: Bytes) -> Block {
assert!(!bytes.is_empty());
let len: BlockSize = bytes
.len()
.try_into()
.expect("length of Bytes instance was greater than BlockSize::MAX");
let capacity_ptr = NonNull::new(bytes.as_ptr().cast_mut())
.expect("guarded by 'is zero sized Bytes' check upstream - non-empty Bytes must have non-null capacity pointer")
.cast::<MaybeUninit<u8>>();
let len = NonZero::new(len).expect("guarded by 'is zero sized Bytes' check upstream");
let block_ptr =
NonNull::new(Box::into_raw(Box::new(BytesBlock::new(bytes)))).expect("we just allocated it - it cannot possibly be null");
let block_ref = unsafe { BlockRef::new(block_ptr, &BLOCK_REF_FNS) };
unsafe { Block::new(capacity_ptr, len, block_ref) }
}
/// Function table for `BytesBlock`, passed to `BlockRef::new` together with each boxed
/// `BytesBlock` state pointer.
///
/// NOTE(review): presumably `from_trait()` derives the table from the `BlockRefDynamic`
/// implementation for `BytesBlock` - confirm against `BlockRefVTable`.
const BLOCK_REF_FNS: BlockRefVTable<BytesBlock> = BlockRefVTable::from_trait();
/// Iterator that yields the contents of a `Bytes` as consecutive chunks of at most
/// `BlockSize::MAX` bytes each. It never yields an empty chunk.
struct BytesBlockIterator {
// The part of the input that has not been yielded yet.
remaining: Bytes,
}
impl BytesBlockIterator {
    /// Creates an iterator over `bytes`; nothing has been yielded yet, so all of it remains.
    const fn new(bytes: Bytes) -> Self {
        let remaining = bytes;

        Self { remaining }
    }
}
impl Iterator for BytesBlockIterator {
    type Item = Bytes;

    /// Yields the next chunk of at most `BlockSize::MAX` bytes, or `None` once the input is
    /// exhausted. Yielded chunks are never empty.
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining.is_empty() {
            return None;
        }

        let bytes_to_take = self.remaining.len().min(BlockSize::MAX as usize);

        // `split_to` hands out the first `bytes_to_take` bytes and leaves the rest in
        // `self.remaining`. It cannot panic because `bytes_to_take <= self.remaining.len()`.
        // Unlike building both halves with `slice`, it needs no extra handle when the whole
        // remainder fits into one chunk.
        Some(self.remaining.split_to(bytes_to_take))
    }

    /// Reports the exact number of chunks still to be yielded: the remaining length divided
    /// by `BlockSize::MAX`, rounded up.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let blocks_remaining = self.remaining.len().div_ceil(BlockSize::MAX as usize);

        (blocks_remaining, Some(blocks_remaining))
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod tests {
    use bytes::{BufMut, BytesMut};

    use super::*;

    /// Builds a frozen `Bytes` consisting of `len` zero bytes.
    fn zeroed_bytes(len: usize) -> Bytes {
        let mut buffer = BytesMut::new();
        buffer.put_bytes(0, len);
        buffer.freeze()
    }

    /// A small `Bytes` converts into a view with equal contents over the very same memory.
    #[test]
    fn test_bytes_to_view() {
        let bytes = Bytes::from_static(b"Hello, world!");
        let bytes_data_ptr = bytes.as_ptr();

        let view: BytesView = bytes.into();

        assert_eq!(view.len(), 13);
        assert_eq!(view, b"Hello, world!");
        assert_eq!(view.first_slice().as_ptr(), bytes_data_ptr);
    }

    /// An empty `Bytes` converts into an empty view.
    #[test]
    fn zero_sized_bytes() {
        let bytes = Bytes::new();

        let view: BytesView = bytes.into();

        assert_eq!(view.len(), 0);
        assert!(view.is_empty());
    }

    /// An input larger than one block is split into multiple spans.
    #[test]
    fn test_giant_bytes_to_view() {
        // Where available memory can be queried, skip instead of failing the allocation.
        #[cfg(all(not(miri), any(target_os = "linux", target_os = "windows")))]
        if crate::testing::system_memory() < 10_000_000_000 {
            eprintln!("Skipping giant allocation test due to insufficient memory.");
            return;
        }

        let view: BytesView = zeroed_bytes(5_000_000_000).into();

        assert_eq!(view.len(), 5_000_000_000);
        // The first span is capped at the block size limit used by the chunking iterator.
        assert_eq!(view.first_slice().len(), BlockSize::MAX as usize);
        assert_eq!(view.into_spans_reversed().len(), 2);
    }

    /// A short input is reported as exactly one chunk.
    #[test]
    fn test_bytes_block_iterator_size_hint_single_block() {
        let bytes = Bytes::from_static(b"Hello, world!");
        let iterator = BytesBlockIterator::new(bytes);

        assert_eq!(iterator.size_hint(), (1, Some(1)));
    }

    /// An input slightly larger than one block is reported as exactly two chunks.
    #[test]
    fn test_bytes_block_iterator_size_hint_multiple_blocks() {
        let size = (BlockSize::MAX as usize) + 1000;
        let iterator = BytesBlockIterator::new(zeroed_bytes(size));

        assert_eq!(iterator.size_hint(), (2, Some(2)));
    }

    /// An empty input is reported as zero chunks.
    #[test]
    fn test_bytes_block_iterator_size_hint_empty() {
        let bytes = Bytes::new();
        let iterator = BytesBlockIterator::new(bytes);

        assert_eq!(iterator.size_hint(), (0, Some(0)));
    }

    /// An input of exactly the maximum block size is reported as a single chunk.
    #[test]
    fn test_bytes_block_iterator_size_hint_exact_block_size() {
        let iterator = BytesBlockIterator::new(zeroed_bytes(BlockSize::MAX as usize));

        assert_eq!(iterator.size_hint(), (1, Some(1)));
    }
}