1use std::any::type_name;
5use std::cmp::Ordering;
6use std::collections::Bound;
7use std::fmt::Debug;
8use std::fmt::Formatter;
9use std::hash::Hash;
10use std::hash::Hasher;
11use std::marker::PhantomData;
12use std::ops::Deref;
13use std::ops::RangeBounds;
14
15use bytes::Buf;
16use bytes::Bytes;
17use vortex_error::VortexExpect;
18use vortex_error::vortex_panic;
19
20use crate::Alignment;
21use crate::BufferMut;
22use crate::ByteBuffer;
23use crate::debug::TruncatedDebug;
24use crate::trusted_len::TrustedLen;
25
/// An immutable, typed view over a [`Bytes`] allocation holding elements of type `T`.
#[derive(Clone)]
pub struct Buffer<T> {
    /// Raw backing storage for the elements.
    pub(crate) bytes: Bytes,
    /// Number of `T` elements (not bytes); `inner()` debug-asserts that
    /// `length * size_of::<T>()` equals `bytes.len()`.
    pub(crate) length: usize,
    /// Alignment recorded for this buffer.
    pub(crate) alignment: Alignment,
    /// Ties the otherwise untyped bytes to the element type `T`.
    pub(crate) _marker: PhantomData<T>,
}
34
35impl<T> Default for Buffer<T> {
36 fn default() -> Self {
37 Self {
38 bytes: Default::default(),
39 length: 0,
40 alignment: Alignment::of::<T>(),
41 _marker: PhantomData,
42 }
43 }
44}
45
46impl<T> PartialEq for Buffer<T> {
47 #[inline]
48 fn eq(&self, other: &Self) -> bool {
49 self.bytes == other.bytes
50 }
51}
52
53impl<T> Eq for Buffer<T> {}
54
55impl<T> Ord for Buffer<T> {
56 #[inline]
57 fn cmp(&self, other: &Self) -> Ordering {
58 self.bytes.cmp(&other.bytes)
59 }
60}
61
62impl<T> PartialOrd for Buffer<T> {
63 #[inline]
64 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
65 Some(self.cmp(other))
66 }
67}
68
69impl<T> Hash for Buffer<T> {
70 #[inline]
71 fn hash<H: Hasher>(&self, state: &mut H) {
72 self.bytes.as_ref().hash(state)
73 }
74}
75
76impl<T> Buffer<T> {
77 pub fn copy_from(values: impl AsRef<[T]>) -> Self {
84 BufferMut::copy_from(values).freeze()
85 }
86
87 pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
89 BufferMut::copy_from_aligned(values, alignment).freeze()
90 }
91
92 pub fn zeroed(len: usize) -> Self {
94 Self::zeroed_aligned(len, Alignment::of::<T>())
95 }
96
97 pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
99 BufferMut::zeroed_aligned(len, alignment).freeze()
100 }
101
102 pub fn empty() -> Self {
104 BufferMut::empty().freeze()
105 }
106
107 pub fn empty_aligned(alignment: Alignment) -> Self {
109 BufferMut::empty_aligned(alignment).freeze()
110 }
111
112 pub fn full(item: T, len: usize) -> Self
114 where
115 T: Copy,
116 {
117 BufferMut::full(item, len).freeze()
118 }
119
120 pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
127 Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
129 }
130
131 pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
138 Self::from_bytes_aligned(buffer.into_inner(), alignment)
139 }
140
141 pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
148 if !alignment.is_aligned_to(Alignment::of::<T>()) {
149 vortex_panic!(
150 "Alignment {} must be compatible with the scalar type's alignment {}",
151 alignment,
152 Alignment::of::<T>(),
153 );
154 }
155 if bytes.as_ptr().align_offset(*alignment) != 0 {
156 vortex_panic!(
157 "Bytes alignment must align to the requested alignment {}",
158 alignment,
159 );
160 }
161 if !bytes.len().is_multiple_of(size_of::<T>()) {
162 vortex_panic!(
163 "Bytes length {} must be a multiple of the scalar type's size {}",
164 bytes.len(),
165 size_of::<T>()
166 );
167 }
168 let length = bytes.len() / size_of::<T>();
169 Self {
170 bytes,
171 length,
172 alignment,
173 _marker: Default::default(),
174 }
175 }
176
177 pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
180 let (_, upper_bound) = iter.size_hint();
181 let mut buffer = BufferMut::with_capacity(
182 upper_bound.vortex_expect("TrustedLen iterator has no upper bound"),
183 );
184 buffer.extend_trusted(iter);
185 buffer.freeze()
186 }
187
188 pub fn map_each_in_place<R, F>(self, mut f: F) -> BufferMut<R>
190 where
191 T: Copy,
192 F: FnMut(T) -> R,
193 {
194 match self.try_into_mut() {
195 Ok(mut_buf) => mut_buf.map_each_in_place(f),
196 Err(buf) => {
197 let len = buf.len();
198 let mut out_buf = BufferMut::with_capacity(len);
199 out_buf
200 .spare_capacity_mut()
201 .iter_mut()
202 .zip(buf)
203 .for_each(|(out, in_)| {
204 out.write(f(in_));
205 });
206 unsafe { out_buf.set_len(len) }
208 out_buf
209 }
210 }
211 }
212
213 pub fn clear(&mut self) {
215 self.bytes.clear();
216 self.length = 0;
217 }
218
219 #[inline(always)]
221 pub fn len(&self) -> usize {
222 self.length
223 }
224
225 #[inline(always)]
227 pub fn is_empty(&self) -> bool {
228 self.length == 0
229 }
230
231 #[inline(always)]
233 pub fn alignment(&self) -> Alignment {
234 self.alignment
235 }
236
237 #[inline(always)]
239 pub fn as_slice(&self) -> &[T] {
240 unsafe { std::slice::from_raw_parts(self.bytes.as_ptr().cast(), self.length) }
242 }
243
244 #[inline(always)]
246 pub fn as_bytes(&self) -> &[u8] {
247 self.bytes.as_ref()
248 }
249
250 pub fn iter(&self) -> Iter<'_, T> {
252 Iter {
253 inner: self.as_slice().iter(),
254 }
255 }
256
257 #[inline(always)]
264 pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
265 self.slice_with_alignment(range, self.alignment)
266 }
267
268 #[inline(always)]
275 pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
276 self.slice_with_alignment(range, Alignment::of::<u8>())
277 }
278
279 pub fn slice_with_alignment(
287 &self,
288 range: impl RangeBounds<usize>,
289 alignment: Alignment,
290 ) -> Self {
291 let len = self.len();
292 let begin = match range.start_bound() {
293 Bound::Included(&n) => n,
294 Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
295 Bound::Unbounded => 0,
296 };
297 let end = match range.end_bound() {
298 Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
299 Bound::Excluded(&n) => n,
300 Bound::Unbounded => len,
301 };
302
303 if begin > end {
304 vortex_panic!(
305 "range start must not be greater than end: {:?} <= {:?}",
306 begin,
307 end
308 );
309 }
310 if end > len {
311 vortex_panic!("range end out of bounds: {:?} > {:?}", end, len);
312 }
313
314 if end == begin {
315 return Self::empty_aligned(alignment);
318 }
319
320 let begin_byte = begin * size_of::<T>();
321 let end_byte = end * size_of::<T>();
322
323 if !begin_byte.is_multiple_of(*alignment) {
324 vortex_panic!(
325 "range start must be aligned to {alignment:?}, byte {}",
326 begin_byte
327 );
328 }
329 if !alignment.is_aligned_to(Alignment::of::<T>()) {
330 vortex_panic!("Slice alignment must at least align to type T")
331 }
332
333 Self {
334 bytes: self.bytes.slice(begin_byte..end_byte),
335 length: end - begin,
336 alignment,
337 _marker: Default::default(),
338 }
339 }
340
341 #[inline(always)]
350 pub fn slice_ref(&self, subset: &[T]) -> Self {
351 self.slice_ref_with_alignment(subset, Alignment::of::<T>())
352 }
353
354 pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
364 if !alignment.is_aligned_to(Alignment::of::<T>()) {
365 vortex_panic!("slice_ref alignment must at least align to type T")
366 }
367
368 if !self.alignment.is_aligned_to(alignment) {
369 vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
370 }
371
372 if subset.as_ptr().align_offset(*alignment) != 0 {
373 vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
374 }
375
376 let subset_u8 =
377 unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };
378
379 Self {
380 bytes: self.bytes.slice_ref(subset_u8),
381 length: subset.len(),
382 alignment,
383 _marker: Default::default(),
384 }
385 }
386
387 pub fn inner(&self) -> &Bytes {
389 debug_assert_eq!(
390 self.length * size_of::<T>(),
391 self.bytes.len(),
392 "Own length has to be the same as the underlying bytes length"
393 );
394 &self.bytes
395 }
396
397 pub fn into_inner(self) -> Bytes {
399 debug_assert_eq!(
400 self.length * size_of::<T>(),
401 self.bytes.len(),
402 "Own length has to be the same as the underlying bytes length"
403 );
404 self.bytes
405 }
406
407 pub fn into_byte_buffer(self) -> ByteBuffer {
409 ByteBuffer {
410 bytes: self.bytes,
411 length: self.length * size_of::<T>(),
412 alignment: self.alignment,
413 _marker: Default::default(),
414 }
415 }
416
417 pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
419 self.bytes
420 .try_into_mut()
421 .map(|bytes| BufferMut {
422 bytes,
423 length: self.length,
424 alignment: self.alignment,
425 _marker: Default::default(),
426 })
427 .map_err(|bytes| Self {
428 bytes,
429 length: self.length,
430 alignment: self.alignment,
431 _marker: Default::default(),
432 })
433 }
434
435 pub fn into_mut(self) -> BufferMut<T> {
437 self.try_into_mut()
438 .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
439 }
440
441 pub fn is_aligned(&self, alignment: Alignment) -> bool {
443 self.bytes.as_ptr().align_offset(*alignment) == 0
444 }
445
446 pub fn aligned(mut self, alignment: Alignment) -> Self {
448 if self.as_ptr().align_offset(*alignment) == 0 {
449 self.alignment = alignment;
450 self
451 } else {
452 #[cfg(feature = "warn-copy")]
453 {
454 let bt = std::backtrace::Backtrace::capture();
455 tracing::warn!(
456 "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
457 )
458 }
459 Self::copy_from_aligned(self, alignment)
460 }
461 }
462
463 pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
465 if self.as_ptr().align_offset(*alignment) == 0 {
466 self.alignment = alignment;
467 self
468 } else {
469 vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
470 }
471 }
472}
473
474impl<T> Buffer<T> {
475 pub unsafe fn transmute<U>(self) -> Buffer<U> {
487 assert_eq!(size_of::<T>(), size_of::<U>(), "Buffer type size mismatch");
488 assert_eq!(
489 align_of::<T>(),
490 align_of::<U>(),
491 "Buffer type alignment mismatch"
492 );
493
494 Buffer {
495 bytes: self.bytes,
496 length: self.length,
497 alignment: self.alignment,
498 _marker: PhantomData,
499 }
500 }
501}
502
/// Borrowing iterator over the elements of a buffer.
///
/// Thin wrapper around [`std::slice::Iter`]; every method forwards to the inner
/// slice iterator. Because the buffer's inherent `iter()` returns this type rather
/// than the slice iterator, it also forwards the reverse-iteration and fused
/// guarantees so that adaptors such as `.rev()` keep working.
pub struct Iter<'a, T> {
    inner: std::slice::Iter<'a, T>,
}

impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }

    #[inline]
    fn count(self) -> usize {
        self.inner.count()
    }

    #[inline]
    fn last(self) -> Option<Self::Item> {
        self.inner.last()
    }

    #[inline]
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        self.inner.nth(n)
    }
}

impl<'a, T> DoubleEndedIterator for Iter<'a, T> {
    #[inline]
    fn next_back(&mut self) -> Option<Self::Item> {
        self.inner.next_back()
    }

    #[inline]
    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
        self.inner.nth_back(n)
    }
}

impl<T> ExactSizeIterator for Iter<'_, T> {
    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }
}

// The inner slice iterator is fused, and `next` forwards to it unconditionally.
impl<T> std::iter::FusedIterator for Iter<'_, T> {}
545
546impl<T: Debug> Debug for Buffer<T> {
547 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
548 f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
549 .field("length", &self.length)
550 .field("alignment", &self.alignment)
551 .field("as_slice", &TruncatedDebug(self.as_slice()))
552 .finish()
553 }
554}
555
556impl<T> Deref for Buffer<T> {
557 type Target = [T];
558
559 #[inline]
560 fn deref(&self) -> &Self::Target {
561 self.as_slice()
562 }
563}
564
565impl<T> AsRef<[T]> for Buffer<T> {
566 #[inline]
567 fn as_ref(&self) -> &[T] {
568 self.as_slice()
569 }
570}
571
572impl<T> FromIterator<T> for Buffer<T> {
573 #[inline]
574 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
575 BufferMut::from_iter(iter).freeze()
576 }
577}
578
/// Newtype around `Vec<T>` that exposes the vector's initialised elements as raw bytes.
#[repr(transparent)]
struct Wrapper<T>(Vec<T>);

impl<T> AsRef<[u8]> for Wrapper<T> {
    /// Returns the bytes of the vector's `len()` elements (spare capacity is excluded).
    fn as_ref(&self) -> &[u8] {
        let elements: &[T] = self.0.as_slice();
        let byte_len = std::mem::size_of_val(elements);
        let start = elements.as_ptr().cast::<u8>();
        // SAFETY: `start` and `byte_len` describe exactly the memory borrowed through
        // `elements`, which stays borrowed for the lifetime of the returned slice.
        unsafe { std::slice::from_raw_parts(start, byte_len) }
    }
}
590
591impl<T> From<Vec<T>> for Buffer<T>
592where
593 T: Send + 'static,
594{
595 fn from(value: Vec<T>) -> Self {
596 let original_len = value.len();
597 let wrapped_vec = Wrapper(value);
598
599 let bytes = Bytes::from_owner(wrapped_vec);
600
601 assert_eq!(bytes.as_ptr().align_offset(align_of::<T>()), 0);
602
603 Self {
604 bytes,
605 length: original_len,
606 alignment: Alignment::of::<T>(),
607 _marker: PhantomData,
608 }
609 }
610}
611
612impl From<Bytes> for ByteBuffer {
613 fn from(bytes: Bytes) -> Self {
614 let length = bytes.len();
615 Self {
616 bytes,
617 length,
618 alignment: Alignment::of::<u8>(),
619 _marker: Default::default(),
620 }
621 }
622}
623
624impl Buf for ByteBuffer {
625 #[inline]
626 fn remaining(&self) -> usize {
627 self.len()
628 }
629
630 #[inline]
631 fn chunk(&self) -> &[u8] {
632 self.as_slice()
633 }
634
635 #[inline]
636 fn advance(&mut self, cnt: usize) {
637 if !cnt.is_multiple_of(*self.alignment) {
638 vortex_panic!(
639 "Cannot advance buffer by {} items, resulting alignment is not {}",
640 cnt,
641 self.alignment
642 );
643 }
644 self.bytes.advance(cnt);
645 self.length -= cnt;
646 }
647}
648
649pub struct BufferIterator<T: Copy> {
651 _buffer: Buffer<T>,
653 ptr: *const T,
654 end: *const T,
655}
656
657impl<T: Copy> Iterator for BufferIterator<T> {
658 type Item = T;
659
660 #[inline]
661 fn next(&mut self) -> Option<Self::Item> {
662 if self.ptr == self.end {
663 None
664 } else {
665 let value = unsafe { self.ptr.read() };
667 self.ptr = unsafe { self.ptr.add(1) };
668 Some(value)
669 }
670 }
671
672 #[inline]
673 fn size_hint(&self) -> (usize, Option<usize>) {
674 let remaining = unsafe { self.end.offset_from(self.ptr) } as usize;
675 (remaining, Some(remaining))
676 }
677}
678
679impl<T: Copy> ExactSizeIterator for BufferIterator<T> {}
680
681impl<T: Copy> IntoIterator for Buffer<T> {
682 type Item = T;
683 type IntoIter = BufferIterator<T>;
684
685 #[inline]
686 fn into_iter(self) -> Self::IntoIter {
687 let ptr = self.as_slice().as_ptr();
688 let end = unsafe { ptr.add(self.len()) };
689 BufferIterator {
690 _buffer: self,
691 ptr,
692 end,
693 }
694 }
695}
696
697impl<T> From<BufferMut<T>> for Buffer<T> {
698 #[inline]
699 fn from(value: BufferMut<T>) -> Self {
700 value.freeze()
701 }
702}
703
#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::Alignment;
    use crate::Buffer;
    use crate::ByteBuffer;
    use crate::buffer;

    /// Requesting a larger alignment records it and keeps the contents intact.
    #[test]
    fn align() {
        let target = Alignment::new(32);
        let aligned = buffer![0u8, 1, 2].aligned(target);
        assert_eq!(aligned.alignment(), target);
        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    }

    /// Half-open and inclusive ranges both select the expected elements.
    #[test]
    fn slice() {
        let buf = buffer![0, 1, 2, 3, 4];
        let half_open = buf.slice(1..3);
        let inclusive = buf.slice(1..=3);
        assert_eq!(half_open.as_slice(), &[1, 2]);
        assert_eq!(inclusive.as_slice(), &[1, 2, 3]);
    }

    /// `slice_unaligned` accepts a start offset that `slice` rejects
    /// (see `slice_bad_alignment`).
    #[test]
    fn slice_unaligned() {
        let bytes = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        let sliced = bytes.slice_unaligned(1..2);
        // One byte is selected: index 1 of the first element, the `i32` value 0.
        assert_eq!(sliced.len(), 1);
        assert_eq!(sliced.as_slice(), &[0]);
    }

    /// Slicing at byte offset 1 violates the alignment carried over from `i32`.
    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let bytes = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        let _ = bytes.slice(1..2);
    }

    /// The `Buf` implementation reports, exposes and advances over the raw bytes.
    #[test]
    fn bytes_buf() {
        let text: &[u8] = "helloworld".as_bytes();
        let mut buf = ByteBuffer::copy_from(text);
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        buf.advance(5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    /// `zeroed` yields zeros aligned to the element type.
    #[test]
    fn buffer_zeroed() {
        const LEN: usize = 17;

        let zeros = Buffer::<u32>::zeroed(LEN);

        assert!(zeros.is_aligned(Alignment::of::<u32>()));
        assert_eq!(zeros.as_slice(), &[0; LEN]);
    }

    /// `zeroed_aligned` yields zeros aligned to the requested alignment.
    #[test]
    fn buffer_zeroed_aligned() {
        const LEN: usize = 17;
        let alignment = Alignment::new(64);

        let zeros = Buffer::<u32>::zeroed_aligned(LEN, alignment);

        assert!(zeros.is_aligned(alignment));
        assert_eq!(zeros.as_slice(), &[0; LEN]);
    }

    /// Converting from a `Vec` preserves the elements and their natural alignment.
    #[test]
    fn from_vec() {
        let vec = vec![1, 2, 3, 4, 5];
        let buff = Buffer::from(vec.clone());
        assert!(buff.is_aligned(Alignment::of::<i32>()));
        assert_eq!(vec, buff.as_ref());
    }

    /// A slice whose end is not on an alignment boundary must not panic.
    #[test]
    fn test_slice_unaligned_end_pos() {
        let data = vec![0u8; 2];
        let aligned_buffer = Buffer::copy_from_aligned(&data, Alignment::new(8));
        let _ = aligned_buffer.slice(0..1);
    }
}