use std::any::type_name;
use std::cmp::Ordering;
use std::collections::Bound;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::hash::Hash;
use std::hash::Hasher;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ops::RangeBounds;
use bytes::Buf;
use bytes::Bytes;
use vortex_error::VortexExpect;
use vortex_error::vortex_panic;
use crate::Alignment;
use crate::BufferMut;
use crate::ByteBuffer;
use crate::debug::TruncatedDebug;
use crate::trusted_len::TrustedLen;
/// An immutable, cheaply-cloneable buffer of `T` elements backed by [`Bytes`].
///
/// Invariant (asserted in `inner`/`into_inner`): `bytes.len() == length * size_of::<T>()`,
/// and the backing pointer satisfies `alignment`.
pub struct Buffer<T> {
    // Raw backing storage, measured in bytes.
    pub(crate) bytes: Bytes,
    // Number of `T` elements (not bytes).
    pub(crate) length: usize,
    // Alignment the backing allocation is guaranteed to satisfy.
    pub(crate) alignment: Alignment,
    // Ties the buffer to its element type without owning a `T`.
    pub(crate) _marker: PhantomData<T>,
}
impl<T> Clone for Buffer<T> {
    /// Cloning is cheap: the backing [`Bytes`] is reference-counted, so only
    /// the handle (not the data) is duplicated.
    #[inline]
    fn clone(&self) -> Self {
        let Self {
            bytes,
            length,
            alignment,
            _marker: _,
        } = self;
        Self {
            bytes: bytes.clone(),
            length: *length,
            alignment: *alignment,
            _marker: PhantomData,
        }
    }
}
impl<T> Default for Buffer<T> {
fn default() -> Self {
Self {
bytes: Default::default(),
length: 0,
alignment: Alignment::of::<T>(),
_marker: PhantomData,
}
}
}
impl<T> PartialEq for Buffer<T> {
    /// Equality compares only the underlying bytes; `alignment` is not compared,
    /// so two buffers with identical contents but different alignments are equal.
    /// This is consistent with `Hash` and `Ord`, which also look only at the bytes.
    /// (`length` is implied by the byte length, so it needs no separate check.)
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.bytes == other.bytes
    }
}
impl<T: PartialEq> PartialEq<Vec<T>> for Buffer<T> {
    /// A buffer equals a `Vec` when their elements compare equal element-wise.
    fn eq(&self, other: &Vec<T>) -> bool {
        let lhs: &[T] = self.as_ref();
        lhs == other.as_slice()
    }
}
impl<T: PartialEq> PartialEq<Buffer<T>> for Vec<T> {
    /// Mirror of `Buffer == Vec` so comparisons work in either direction.
    fn eq(&self, other: &Buffer<T>) -> bool {
        let rhs: &[T] = other.as_ref();
        self.as_slice() == rhs
    }
}
// Byte-wise equality is a total equivalence even when `T` itself is not `Eq`
// (e.g. floats): identical bit patterns always compare equal.
impl<T> Eq for Buffer<T> {}
impl<T> Ord for Buffer<T> {
    /// Lexicographic comparison of the raw backing bytes.
    ///
    /// NOTE(review): for multi-byte `T` this is byte order, which need not match
    /// element-wise ordering of `T` values (e.g. integers on little-endian
    /// targets) — confirm callers only rely on it as an arbitrary total order.
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.bytes.cmp(&other.bytes)
    }
}
impl<T> PartialOrd for Buffer<T> {
    /// Delegates to [`Ord::cmp`]; the byte-wise ordering is total, so this is
    /// always `Some` (the canonical form clippy expects).
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<T> Hash for Buffer<T> {
    /// Hashes the raw backing bytes, keeping `Hash` consistent with the
    /// byte-wise `PartialEq` implementation above.
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.bytes.as_ref().hash(state)
    }
}
impl<T> Buffer<T> {
    /// Create an immutable buffer by copying `values` into a fresh allocation
    /// aligned for `T`.
    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
        BufferMut::copy_from(values).freeze()
    }

    /// Like [`Buffer::copy_from`], but the new allocation satisfies the
    /// requested `alignment`.
    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
        BufferMut::copy_from_aligned(values, alignment).freeze()
    }

    /// Create a zero-filled buffer of `len` elements, aligned for `T`.
    pub fn zeroed(len: usize) -> Self {
        Self::zeroed_aligned(len, Alignment::of::<T>())
    }

    /// Create a zero-filled buffer of `len` elements with the requested `alignment`.
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
        BufferMut::zeroed_aligned(len, alignment).freeze()
    }

    /// Create an empty buffer aligned for `T`.
    pub fn empty() -> Self {
        BufferMut::empty().freeze()
    }

    /// Create an empty buffer with the requested `alignment`.
    pub fn empty_aligned(alignment: Alignment) -> Self {
        BufferMut::empty_aligned(alignment).freeze()
    }

    /// Create a buffer containing `len` copies of `item`.
    pub fn full(item: T, len: usize) -> Self
    where
        T: Copy,
    {
        BufferMut::full(item, len).freeze()
    }

    /// Reinterpret a [`ByteBuffer`] as a buffer of `T`, requiring `T`'s natural
    /// alignment.
    ///
    /// # Panics
    ///
    /// See [`Buffer::from_bytes_aligned`].
    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
    }

    /// Reinterpret a [`ByteBuffer`] as a buffer of `T` with the requested
    /// `alignment`.
    ///
    /// # Panics
    ///
    /// See [`Buffer::from_bytes_aligned`].
    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
        Self::from_bytes_aligned(buffer.into_inner(), alignment)
    }

    /// Reinterpret raw [`Bytes`] as a buffer of `T`, validating the invariants
    /// the rest of this type relies on.
    ///
    /// # Panics
    ///
    /// Panics if `alignment` is not itself aligned to `T`'s alignment, if the
    /// byte pointer is not aligned to `alignment`, or if the byte length is not
    /// a multiple of `size_of::<T>()`.
    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!(
                "Alignment {} must be compatible with the scalar type's alignment {}",
                alignment,
                Alignment::of::<T>(),
            );
        }
        if bytes.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!(
                "Bytes alignment must align to the requested alignment {}",
                alignment,
            );
        }
        if !bytes.len().is_multiple_of(size_of::<T>()) {
            vortex_panic!(
                "Bytes length {} must be a multiple of the scalar type's size {}",
                bytes.len(),
                size_of::<T>()
            );
        }
        // Element count is derived from the byte length; the invariant
        // `length * size_of::<T>() == bytes.len()` holds by construction.
        let length = bytes.len() / size_of::<T>();
        Self {
            bytes,
            length,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Collect a [`TrustedLen`] iterator into a buffer, pre-allocating from its
    /// upper bound (which `TrustedLen` guarantees is exact when present).
    ///
    /// # Panics
    ///
    /// Panics if the iterator reports no upper bound (i.e. more than
    /// `usize::MAX` elements).
    pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
        let (_, upper_bound) = iter.size_hint();
        let mut buffer = BufferMut::with_capacity(
            upper_bound.vortex_expect("TrustedLen iterator has no upper bound"),
        );
        buffer.extend_trusted(iter);
        buffer.freeze()
    }

    /// Truncate the buffer to zero elements in place (the alignment tag is kept).
    pub fn clear(&mut self) {
        self.bytes.clear();
        self.length = 0;
    }

    /// Number of `T` elements in the buffer (not bytes).
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns `true` if the buffer holds no elements.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// The alignment the backing allocation is guaranteed to satisfy.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }

    /// View the buffer as a slice of `T`.
    #[inline(always)]
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: construction guarantees the pointer is aligned for `T` and the
        // allocation holds exactly `self.length` elements.
        unsafe { std::slice::from_raw_parts(self.bytes.as_ptr().cast(), self.length) }
    }

    /// View the buffer's raw bytes.
    #[inline(always)]
    pub fn as_bytes(&self) -> &[u8] {
        self.bytes.as_ref()
    }

    /// Iterate over references to the elements.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter {
            inner: self.as_slice().iter(),
        }
    }

    /// Zero-copy slice of an element `range`, keeping this buffer's alignment.
    ///
    /// # Panics
    ///
    /// See [`Buffer::slice_with_alignment`].
    #[inline(always)]
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, self.alignment)
    }

    /// Zero-copy slice of an element `range`, relaxing the result's alignment
    /// to 1 so any start offset is permitted.
    #[inline(always)]
    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, Alignment::of::<u8>())
    }

    /// Zero-copy slice of an element `range`, tagging the result with `alignment`.
    ///
    /// # Panics
    ///
    /// Panics if the range is inverted or out of bounds, if the start byte
    /// offset is not a multiple of `alignment`, or if `alignment` is not
    /// aligned to `T`'s alignment.
    pub fn slice_with_alignment(
        &self,
        range: impl RangeBounds<usize>,
        alignment: Alignment,
    ) -> Self {
        let len = self.len();
        // Resolve the generic bounds into concrete [begin, end) element indices.
        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };
        if begin > end {
            vortex_panic!(
                "range start must not be greater than end: {:?} <= {:?}",
                begin,
                end
            );
        }
        if end > len {
            vortex_panic!("range end out of bounds: {:?} > {:?}", end, len);
        }
        // NOTE(review): the empty-range early return skips the two alignment
        // checks below, so an empty slice with an alignment incompatible with
        // `T` succeeds where a non-empty one would panic — confirm intended.
        if end == begin {
            return Self::empty_aligned(alignment);
        }
        let begin_byte = begin * size_of::<T>();
        let end_byte = end * size_of::<T>();
        if !begin_byte.is_multiple_of(*alignment) {
            vortex_panic!(
                "range start must be aligned to {alignment:?}, byte {}",
                begin_byte
            );
        }
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("Slice alignment must at least align to type T")
        }
        Self {
            bytes: self.bytes.slice(begin_byte..end_byte),
            length: end - begin,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Zero-copy slice from a sub-slice borrowed out of this buffer, keeping
    /// `T`'s natural alignment.
    ///
    /// # Panics
    ///
    /// See [`Buffer::slice_ref_with_alignment`].
    #[inline(always)]
    pub fn slice_ref(&self, subset: &[T]) -> Self {
        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
    }

    /// Zero-copy slice from a sub-slice borrowed out of this buffer, tagging
    /// the result with `alignment`.
    ///
    /// # Panics
    ///
    /// Panics if `alignment` is incompatible with `T`, if this buffer's
    /// alignment is not aligned to `alignment`, if `subset`'s pointer is not
    /// aligned to `alignment`, or (inside `Bytes::slice_ref`) if `subset` does
    /// not lie within this buffer's memory.
    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("slice_ref alignment must at least align to type T")
        }
        if !self.alignment.is_aligned_to(alignment) {
            vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
        }
        if subset.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
        }
        // Reinterpret the element sub-slice as its underlying bytes so
        // `Bytes::slice_ref` can locate it within the backing allocation.
        let subset_u8 =
            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };
        Self {
            bytes: self.bytes.slice_ref(subset_u8),
            length: subset.len(),
            alignment,
            _marker: Default::default(),
        }
    }

    /// Borrow the underlying [`Bytes`], checking the length invariant in debug builds.
    pub fn inner(&self) -> &Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        &self.bytes
    }

    /// Consume the buffer and return the underlying [`Bytes`].
    pub fn into_inner(self) -> Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        self.bytes
    }

    /// Reinterpret this buffer as a [`ByteBuffer`] (length converted from
    /// elements to bytes; alignment preserved).
    pub fn into_byte_buffer(self) -> ByteBuffer {
        ByteBuffer {
            bytes: self.bytes,
            length: self.length * size_of::<T>(),
            alignment: self.alignment,
            _marker: Default::default(),
        }
    }

    /// Attempt a zero-copy conversion into a mutable buffer.
    ///
    /// Succeeds only when this handle is the sole owner of the backing bytes;
    /// otherwise returns `self` unchanged in the `Err` variant.
    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
        self.bytes
            .try_into_mut()
            .map(|bytes| BufferMut {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
            .map_err(|bytes| Self {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
    }

    /// Convert into a mutable buffer, copying the data only if the backing
    /// bytes are shared with other handles.
    pub fn into_mut(self) -> BufferMut<T> {
        self.try_into_mut()
            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
    }

    /// Check the *runtime* pointer alignment of the backing allocation
    /// (independent of the recorded `alignment` tag).
    pub fn is_aligned(&self, alignment: Alignment) -> bool {
        self.bytes.as_ptr().align_offset(*alignment) == 0
    }

    /// Return a buffer guaranteed to satisfy `alignment`: re-tags in place when
    /// the pointer already happens to be aligned, otherwise copies the data
    /// into a new, correctly-aligned allocation (optionally logging a warning
    /// under the `warn-copy` feature).
    pub fn aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            #[cfg(feature = "warn-copy")]
            {
                let bt = std::backtrace::Backtrace::capture();
                tracing::warn!(
                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
                )
            }
            Self::copy_from_aligned(self, alignment)
        }
    }

    /// Like [`Buffer::aligned`], but panics instead of copying when the backing
    /// pointer does not already satisfy `alignment`.
    ///
    /// # Panics
    ///
    /// Panics if the buffer's pointer is not aligned to `alignment`.
    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
        }
    }
}
impl<T> Buffer<T> {
    /// Reinterpret this buffer as holding elements of type `U` without copying.
    ///
    /// Size and alignment equality of `T` and `U` are checked at runtime by the
    /// asserts below, so length and alignment metadata carry over unchanged.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that every element's bit pattern is a valid
    /// value of `U` (this function cannot check bit-validity, only layout).
    pub unsafe fn transmute<U>(self) -> Buffer<U> {
        assert_eq!(size_of::<T>(), size_of::<U>(), "Buffer type size mismatch");
        assert_eq!(
            align_of::<T>(),
            align_of::<U>(),
            "Buffer type alignment mismatch"
        );
        Buffer {
            bytes: self.bytes,
            length: self.length,
            alignment: self.alignment,
            _marker: PhantomData,
        }
    }
}
/// Borrowing iterator over a [`Buffer`]'s elements; a thin wrapper around the
/// standard slice iterator (see [`Buffer::iter`]).
pub struct Iter<'a, T> {
    inner: std::slice::Iter<'a, T>,
}
impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    // All methods forward directly to the wrapped slice iterator so its
    // optimized implementations are preserved.
    #[inline]
    fn next(&mut self) -> Option<&'a T> {
        self.inner.next()
    }

    #[inline]
    fn nth(&mut self, n: usize) -> Option<&'a T> {
        self.inner.nth(n)
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }

    #[inline]
    fn count(self) -> usize {
        self.inner.count()
    }

    #[inline]
    fn last(self) -> Option<&'a T> {
        self.inner.last()
    }
}
// Exact length is known because the wrapped slice iterator is exact-sized.
impl<T> ExactSizeIterator for Iter<'_, T> {
    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }
}
impl<T: Debug> Debug for Buffer<T> {
    /// Renders as `Buffer<ElementType> { length, alignment, as_slice }`, with
    /// the element dump truncated via `TruncatedDebug` to keep output bounded.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = format!("Buffer<{}>", type_name::<T>());
        let mut builder = f.debug_struct(&name);
        builder.field("length", &self.length);
        builder.field("alignment", &self.alignment);
        builder.field("as_slice", &TruncatedDebug(self.as_slice()));
        builder.finish()
    }
}
// `Buffer` is a smart-pointer-like view over `[T]`, so `Deref` lets slice
// methods be called on it directly.
impl<T> Deref for Buffer<T> {
    type Target = [T];

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
// Allows `Buffer<T>` to be passed wherever `impl AsRef<[T]>` is accepted
// (e.g. `Buffer::copy_from`).
impl<T> AsRef<[T]> for Buffer<T> {
    #[inline]
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}
impl<T> FromIterator<T> for Buffer<T> {
#[inline]
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
BufferMut::from_iter(iter).freeze()
}
}
// Internal adapter that lets `Bytes::from_owner` keep a `Vec<T>` alive while
// exposing its contents as raw bytes (see `From<Vec<T>> for Buffer<T>` below).
#[repr(transparent)]
struct Wrapper<T>(Vec<T>);
impl<T> AsRef<[u8]> for Wrapper<T> {
    /// View the owned `Vec<T>`'s contents as raw bytes.
    fn as_ref(&self) -> &[u8] {
        let data = self.0.as_ptr().cast::<u8>();
        let len = self.0.len() * size_of::<T>();
        // SAFETY: the pointer and byte length describe exactly the Vec's
        // initialized elements, which stay alive as long as `self`.
        // NOTE(review): if `T` has internal padding, this exposes uninitialized
        // padding bytes as `u8` — confirm callers restrict `T` to padding-free
        // (pod-like) types.
        unsafe { std::slice::from_raw_parts(data, len) }
    }
}
// Zero-copy conversion: the `Vec`'s allocation is handed to `Bytes::from_owner`
// (via `Wrapper`) rather than copied. `Send + 'static` is required by
// `Bytes::from_owner`'s owner bound.
impl<T> From<Vec<T>> for Buffer<T>
where
    T: Send + 'static,
{
    fn from(value: Vec<T>) -> Self {
        // Capture the element count before the Vec is moved into the wrapper.
        let original_len = value.len();
        let wrapped_vec = Wrapper(value);
        let bytes = Bytes::from_owner(wrapped_vec);
        // A Vec's allocation is always aligned for `T`; this guards the
        // invariant the `alignment` tag below asserts.
        assert_eq!(bytes.as_ptr().align_offset(align_of::<T>()), 0);
        Self {
            bytes,
            length: original_len,
            alignment: Alignment::of::<T>(),
            _marker: PhantomData,
        }
    }
}
impl From<Bytes> for ByteBuffer {
fn from(bytes: Bytes) -> Self {
let length = bytes.len();
Self {
bytes,
length,
alignment: Alignment::of::<u8>(),
_marker: Default::default(),
}
}
}
impl Buf for ByteBuffer {
    #[inline]
    fn remaining(&self) -> usize {
        self.len()
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        self.as_slice()
    }

    /// Advance the read cursor by `cnt` bytes.
    ///
    /// `cnt` must be a multiple of the buffer's alignment: the start pointer is
    /// aligned to `self.alignment`, so advancing by a multiple keeps the new
    /// start aligned and preserves the type's alignment invariant. (For a
    /// `ByteBuffer` one "item" is one byte, matching the message below.)
    #[inline]
    fn advance(&mut self, cnt: usize) {
        if !cnt.is_multiple_of(*self.alignment) {
            vortex_panic!(
                "Cannot advance buffer by {} items, resulting alignment is not {}",
                cnt,
                self.alignment
            );
        }
        // `Bytes::advance` panics if `cnt > self.bytes.len()`, so the
        // subtraction below cannot underflow when that check passes.
        self.bytes.advance(cnt);
        self.length -= cnt;
    }
}
/// Owning by-value iterator over a [`Buffer`]'s elements.
pub struct BufferIterator<T: Copy> {
    // Keeps the backing allocation alive while the raw pointers below are used.
    _buffer: Buffer<T>,
    // Next element to yield; advances toward `end`.
    ptr: *const T,
    // One-past-the-end pointer of the buffer's elements.
    end: *const T,
}
impl<T: Copy> Iterator for BufferIterator<T> {
    type Item = T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.ptr == self.end {
            None
        } else {
            // SAFETY: `ptr < end`, and both point into the allocation kept
            // alive by `_buffer`, so the read and the one-step advance stay
            // within bounds (`end` itself is only ever compared, not read).
            let value = unsafe { self.ptr.read() };
            self.ptr = unsafe { self.ptr.add(1) };
            Some(value)
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        // SAFETY: `ptr` and `end` derive from the same allocation and
        // `ptr <= end`, so the offset is non-negative.
        // NOTE(review): `offset_from` panics for zero-sized `T` — confirm
        // ZST element types are rejected before a BufferIterator is built.
        let remaining = unsafe { self.end.offset_from(self.ptr) } as usize;
        (remaining, Some(remaining))
    }
}
// `size_hint` above returns the exact remaining count, so the default
// `ExactSizeIterator::len` is correct.
impl<T: Copy> ExactSizeIterator for BufferIterator<T> {}
impl<T: Copy> IntoIterator for Buffer<T> {
    type Item = T;
    type IntoIter = BufferIterator<T>;

    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        let ptr = self.as_slice().as_ptr();
        // SAFETY: `ptr + len` is the one-past-the-end pointer of the same
        // allocation, which is valid to compute (and is never dereferenced).
        let end = unsafe { ptr.add(self.len()) };
        BufferIterator {
            // Moving `self` into the iterator keeps the allocation (and thus
            // the raw pointers above) alive for the iterator's lifetime.
            _buffer: self,
            ptr,
            end,
        }
    }
}
impl<T> From<BufferMut<T>> for Buffer<T> {
    /// Freeze a mutable buffer into an immutable one (zero-copy).
    #[inline]
    fn from(buffer: BufferMut<T>) -> Self {
        buffer.freeze()
    }
}
#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::Alignment;
    use crate::Buffer;
    use crate::ByteBuffer;
    use crate::buffer;

    // `aligned` must produce a buffer reporting the requested alignment while
    // preserving the contents (re-tag or copy, whichever is needed).
    #[test]
    fn align() {
        let buf = buffer![0u8, 1, 2];
        let aligned = buf.aligned(Alignment::new(32));
        assert_eq!(aligned.alignment(), Alignment::new(32));
        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    }

    // Element-range slicing with both exclusive and inclusive end bounds.
    #[test]
    fn slice() {
        let buf = buffer![0, 1, 2, 3, 4];
        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
    }

    // `slice_unaligned` relaxes the alignment to 1, so slicing a byte view of
    // i32 data at an odd offset succeeds; byte 1 of the value 0 is 0.
    #[test]
    fn slice_unaligned() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        let sliced = buf.slice_unaligned(1..2);
        assert_eq!(sliced.len(), 1);
        assert_eq!(sliced.as_slice(), &[0]);
    }

    // Plain `slice` keeps the buffer's alignment (4, from i32), so a start
    // byte offset of 1 must panic.
    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        buf.slice(1..2);
    }

    // `Buf` impl: advancing by 5 is legal because a ByteBuffer's alignment is 1,
    // and remaining/chunk must track the consumed prefix.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    // Zero-copy `From<Vec<T>>` must keep contents equal and the pointer
    // aligned for the element type.
    #[test]
    fn from_vec() {
        let vec = vec![1, 2, 3, 4, 5];
        let buff = Buffer::from(vec.clone());
        assert!(buff.is_aligned(Alignment::of::<i32>()));
        assert_eq!(vec, buff);
    }

    // Regression test: slicing 0..1 of an over-aligned buffer must not panic —
    // the start byte offset 0 is a multiple of any alignment.
    #[test]
    fn test_slice_unaligned_end_pos() {
        let data = vec![0u8; 2];
        let aligned_buffer = Buffer::copy_from_aligned(&data, Alignment::new(8));
        aligned_buffer.slice(0..1);
    }
}