1use std::any::type_name;
5use std::cmp::Ordering;
6use std::collections::Bound;
7use std::fmt::Debug;
8use std::fmt::Formatter;
9use std::hash::Hash;
10use std::hash::Hasher;
11use std::marker::PhantomData;
12use std::ops::Deref;
13use std::ops::RangeBounds;
14
15use bytes::Buf;
16use bytes::Bytes;
17use vortex_error::VortexExpect;
18use vortex_error::vortex_panic;
19
20use crate::Alignment;
21use crate::BufferMut;
22use crate::ByteBuffer;
23use crate::debug::TruncatedDebug;
24use crate::trusted_len::TrustedLen;
25
/// An immutable, reference-counted buffer of `T` elements backed by [`Bytes`].
///
/// Invariants maintained by every constructor in this file:
/// - `bytes.len() == length * size_of::<T>()`
/// - the start of `bytes` satisfies `alignment`
/// - `alignment` is at least `Alignment::of::<T>()`
pub struct Buffer<T> {
    // Raw backing storage; cheap to clone (refcount bump).
    pub(crate) bytes: Bytes,
    // Number of `T` elements, not bytes.
    pub(crate) length: usize,
    // The alignment this buffer promises to its consumers.
    pub(crate) alignment: Alignment,
    // Zero-sized marker tying the element type `T` to the buffer.
    pub(crate) _marker: PhantomData<T>,
}
33
34impl<T> Clone for Buffer<T> {
35 #[inline]
36 fn clone(&self) -> Self {
37 Self {
38 bytes: self.bytes.clone(),
39 length: self.length,
40 alignment: self.alignment,
41 _marker: PhantomData,
42 }
43 }
44}
45
46impl<T> Default for Buffer<T> {
47 fn default() -> Self {
48 Self {
49 bytes: Default::default(),
50 length: 0,
51 alignment: Alignment::of::<T>(),
52 _marker: PhantomData,
53 }
54 }
55}
56
/// Equality compares the raw bytes only; the declared `alignment` is
/// deliberately ignored so logically-identical buffers with different
/// alignments still compare equal. `length` is implied by the byte length.
impl<T> PartialEq for Buffer<T> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.bytes == other.bytes
    }
}
63
64impl<T: PartialEq> PartialEq<Vec<T>> for Buffer<T> {
65 fn eq(&self, other: &Vec<T>) -> bool {
66 self.as_ref() == other.as_slice()
67 }
68}
69
70impl<T: PartialEq> PartialEq<Buffer<T>> for Vec<T> {
71 fn eq(&self, other: &Buffer<T>) -> bool {
72 self.as_slice() == other.as_ref()
73 }
74}
75
76impl<T> Eq for Buffer<T> {}
77
/// Ordering is byte-wise lexicographic over the raw contents, not element-wise
/// over `T` — which is why no `T: Ord` bound is required. For multi-byte `T`
/// this generally differs from element-wise ordering.
impl<T> Ord for Buffer<T> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.bytes.cmp(&other.bytes)
    }
}
84
85impl<T> PartialOrd for Buffer<T> {
86 #[inline]
87 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88 Some(self.cmp(other))
89 }
90}
91
92impl<T> Hash for Buffer<T> {
93 #[inline]
94 fn hash<H: Hasher>(&self, state: &mut H) {
95 self.bytes.as_ref().hash(state)
96 }
97}
98
impl<T> Buffer<T> {
    /// Creates a buffer by copying `values` into a new allocation aligned to `T`.
    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
        BufferMut::copy_from(values).freeze()
    }

    /// Creates a buffer by copying `values` into a new allocation with the given `alignment`.
    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
        BufferMut::copy_from_aligned(values, alignment).freeze()
    }

    /// Creates a zero-filled buffer of `len` elements, aligned to `T`.
    pub fn zeroed(len: usize) -> Self {
        Self::zeroed_aligned(len, Alignment::of::<T>())
    }

    /// Creates a zero-filled buffer of `len` elements with the given `alignment`.
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
        BufferMut::zeroed_aligned(len, alignment).freeze()
    }

    /// Creates an empty buffer aligned to `T`.
    pub fn empty() -> Self {
        BufferMut::empty().freeze()
    }

    /// Creates an empty buffer with the given `alignment`.
    pub fn empty_aligned(alignment: Alignment) -> Self {
        BufferMut::empty_aligned(alignment).freeze()
    }

    /// Creates a buffer holding `len` copies of `item`.
    pub fn full(item: T, len: usize) -> Self
    where
        T: Copy,
    {
        BufferMut::full(item, len).freeze()
    }

    /// Reinterprets a [`ByteBuffer`] as a `Buffer<T>`, requiring `T`'s natural alignment.
    ///
    /// # Panics
    ///
    /// See [`Buffer::from_bytes_aligned`] for the alignment and length requirements.
    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
    }

    /// Reinterprets a [`ByteBuffer`] as a `Buffer<T>` with the given `alignment`.
    ///
    /// # Panics
    ///
    /// See [`Buffer::from_bytes_aligned`] for the alignment and length requirements.
    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
        Self::from_bytes_aligned(buffer.into_inner(), alignment)
    }

    /// Reinterprets raw [`Bytes`] as a `Buffer<T>` with the given `alignment`.
    ///
    /// # Panics
    ///
    /// Panics if `alignment` is not itself aligned to `T`'s alignment, if the
    /// start pointer of `bytes` does not satisfy `alignment`, or if the byte
    /// length is not a whole number of `T` elements.
    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!(
                "Alignment {} must be compatible with the scalar type's alignment {}",
                alignment,
                Alignment::of::<T>(),
            );
        }
        if bytes.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!(
                "Bytes alignment must align to the requested alignment {}",
                alignment,
            );
        }
        if !bytes.len().is_multiple_of(size_of::<T>()) {
            vortex_panic!(
                "Bytes length {} must be a multiple of the scalar type's size {}",
                bytes.len(),
                size_of::<T>()
            );
        }
        let length = bytes.len() / size_of::<T>();
        Self {
            bytes,
            length,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Collects a [`TrustedLen`] iterator into a buffer, pre-allocating exactly once.
    pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
        // TrustedLen guarantees the upper bound is `Some` and exact.
        let (_, upper_bound) = iter.size_hint();
        let mut buffer = BufferMut::with_capacity(
            upper_bound.vortex_expect("TrustedLen iterator has no upper bound"),
        );
        buffer.extend_trusted(iter);
        buffer.freeze()
    }

    /// Maps `f` over every element, reusing this buffer's allocation when the
    /// bytes are uniquely owned and copying into a fresh allocation otherwise.
    pub fn map_each_in_place<R, F>(self, mut f: F) -> BufferMut<R>
    where
        T: Copy,
        F: FnMut(T) -> R,
    {
        match self.try_into_mut() {
            Ok(mut_buf) => mut_buf.map_each_in_place(f),
            Err(buf) => {
                let len = buf.len();
                let mut out_buf = BufferMut::with_capacity(len);
                out_buf
                    .spare_capacity_mut()
                    .iter_mut()
                    .zip(buf)
                    .for_each(|(out, in_)| {
                        out.write(f(in_));
                    });
                // SAFETY: the zip above initialized exactly the first `len`
                // slots of the spare capacity.
                unsafe { out_buf.set_len(len) }
                out_buf
            }
        }
    }

    /// Clears the buffer, leaving it empty (alignment is preserved).
    pub fn clear(&mut self) {
        self.bytes.clear();
        self.length = 0;
    }

    /// Returns the number of `T` elements (not bytes) in the buffer.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns `true` if the buffer holds no elements.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns the buffer's declared alignment.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }

    /// Views the buffer as a slice of `T`.
    #[inline(always)]
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: construction guarantees the pointer is aligned for `T` and
        // that the allocation holds exactly `self.length` elements of `T`.
        unsafe { std::slice::from_raw_parts(self.bytes.as_ptr().cast(), self.length) }
    }

    /// Views the buffer's contents as raw bytes.
    #[inline(always)]
    pub fn as_bytes(&self) -> &[u8] {
        self.bytes.as_ref()
    }

    /// Returns a borrowing iterator over the elements.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter {
            inner: self.as_slice().iter(),
        }
    }

    /// Slices the buffer over `range` (in elements), preserving its alignment.
    #[inline(always)]
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, self.alignment)
    }

    /// Slices the buffer over `range` (in elements), declaring only single-byte
    /// alignment for the result.
    #[inline(always)]
    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, Alignment::of::<u8>())
    }

    /// Slices the buffer over `range` (in elements), declaring the result's alignment.
    ///
    /// # Panics
    ///
    /// Panics if the range is invalid or out of bounds, if the slice's start
    /// byte offset is not a multiple of `alignment`, or if `alignment` is not
    /// at least `T`'s alignment.
    pub fn slice_with_alignment(
        &self,
        range: impl RangeBounds<usize>,
        alignment: Alignment,
    ) -> Self {
        let len = self.len();
        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };

        if begin > end {
            vortex_panic!(
                "range start must not be greater than end: {:?} <= {:?}",
                begin,
                end
            );
        }
        if end > len {
            vortex_panic!("range end out of bounds: {:?} > {:?}", end, len);
        }

        if end == begin {
            // An empty slice never needs to reference the parent allocation,
            // and skips the start-offset alignment check below.
            return Self::empty_aligned(alignment);
        }

        let begin_byte = begin * size_of::<T>();
        let end_byte = end * size_of::<T>();

        if !begin_byte.is_multiple_of(*alignment) {
            vortex_panic!(
                "range start must be aligned to {alignment:?}, byte {}",
                begin_byte
            );
        }
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("Slice alignment must at least align to type T")
        }

        Self {
            bytes: self.bytes.slice(begin_byte..end_byte),
            length: end - begin,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Returns a buffer covering `subset`, which must be a sub-slice of this buffer.
    #[inline(always)]
    pub fn slice_ref(&self, subset: &[T]) -> Self {
        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
    }

    /// Returns a buffer covering `subset`, declaring the result's alignment.
    ///
    /// # Panics
    ///
    /// Panics if `alignment` is incompatible with `T` or with this buffer's
    /// alignment, if `subset`'s start pointer is misaligned, or (via
    /// `Bytes::slice_ref`) if `subset` does not lie within this buffer.
    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("slice_ref alignment must at least align to type T")
        }

        if !self.alignment.is_aligned_to(alignment) {
            vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
        }

        if subset.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
        }

        // SAFETY: any initialized `&[T]` may be viewed as its raw bytes;
        // `size_of_val` yields the subset's exact byte length.
        let subset_u8 =
            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };

        Self {
            bytes: self.bytes.slice_ref(subset_u8),
            length: subset.len(),
            alignment,
            _marker: Default::default(),
        }
    }

    /// Borrows the underlying [`Bytes`].
    pub fn inner(&self) -> &Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        &self.bytes
    }

    /// Consumes the buffer, returning the underlying [`Bytes`].
    pub fn into_inner(self) -> Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        self.bytes
    }

    /// Consumes the buffer, reinterpreting it as a [`ByteBuffer`] whose length
    /// is `self.len() * size_of::<T>()` bytes.
    pub fn into_byte_buffer(self) -> ByteBuffer {
        ByteBuffer {
            bytes: self.bytes,
            length: self.length * size_of::<T>(),
            alignment: self.alignment,
            _marker: Default::default(),
        }
    }

    /// Attempts a zero-copy conversion into a [`BufferMut`]; returns
    /// `Err(self)` if the underlying bytes are shared with other references.
    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
        self.bytes
            .try_into_mut()
            .map(|bytes| BufferMut {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
            .map_err(|bytes| Self {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
    }

    /// Converts into a [`BufferMut`], copying the contents if the bytes are shared.
    pub fn into_mut(self) -> BufferMut<T> {
        self.try_into_mut()
            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
    }

    /// Returns `true` if the buffer's start pointer satisfies `alignment`.
    pub fn is_aligned(&self, alignment: Alignment) -> bool {
        self.bytes.as_ptr().align_offset(*alignment) == 0
    }

    /// Returns a buffer with the requested alignment, copying the data only if
    /// it is not already suitably aligned.
    pub fn aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            // Optionally surface silent copies during development.
            #[cfg(feature = "warn-copy")]
            {
                let bt = std::backtrace::Backtrace::capture();
                tracing::warn!(
                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
                )
            }
            Self::copy_from_aligned(self, alignment)
        }
    }

    /// Re-declares the buffer's alignment, panicking instead of copying.
    ///
    /// # Panics
    ///
    /// Panics if the buffer's start pointer does not satisfy `alignment`.
    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
        }
    }
}
496
impl<T> Buffer<T> {
    /// Reinterprets this buffer as a buffer of another element type `U`.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that every bit pattern stored in the buffer is
    /// a valid value of `U` (as with `mem::transmute`). Size and alignment
    /// equality are checked at runtime, but value validity is not.
    ///
    /// # Panics
    ///
    /// Panics if `U` differs from `T` in size or alignment.
    pub unsafe fn transmute<U>(self) -> Buffer<U> {
        assert_eq!(size_of::<T>(), size_of::<U>(), "Buffer type size mismatch");
        assert_eq!(
            align_of::<T>(),
            align_of::<U>(),
            "Buffer type alignment mismatch"
        );

        Buffer {
            bytes: self.bytes,
            length: self.length,
            alignment: self.alignment,
            _marker: PhantomData,
        }
    }
}
525
/// A borrowing iterator over the elements of a [`Buffer<T>`].
///
/// Thin wrapper around `std::slice::Iter` so the slice iterator type is not
/// exposed in the public API.
pub struct Iter<'a, T> {
    inner: std::slice::Iter<'a, T>,
}
532
impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    // All methods delegate to the inner slice iterator; the specialized
    // overrides (`size_hint`, `count`, `last`, `nth`) are forwarded so the
    // slice iterator's optimized implementations are not lost.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }

    #[inline]
    fn count(self) -> usize {
        self.inner.count()
    }

    #[inline]
    fn last(self) -> Option<Self::Item> {
        self.inner.last()
    }

    #[inline]
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        self.inner.nth(n)
    }
}
561
// The inner slice iterator knows its exact remaining length.
impl<T> ExactSizeIterator for Iter<'_, T> {
    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }
}
568
569impl<T: Debug> Debug for Buffer<T> {
570 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
571 f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
572 .field("length", &self.length)
573 .field("alignment", &self.alignment)
574 .field("as_slice", &TruncatedDebug(self.as_slice()))
575 .finish()
576 }
577}
578
// Buffer is slice-like, so deref to `[T]` gives all slice methods for free.
impl<T> Deref for Buffer<T> {
    type Target = [T];

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
587
// Allows a `Buffer<T>` to be passed wherever `impl AsRef<[T]>` is accepted
// (e.g. `Buffer::copy_from`).
impl<T> AsRef<[T]> for Buffer<T> {
    #[inline]
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}
594
595impl<T> FromIterator<T> for Buffer<T> {
596 #[inline]
597 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
598 BufferMut::from_iter(iter).freeze()
599 }
600}
601
/// Adapter giving a `Vec<T>` an `AsRef<[u8]>` view so it can be handed to
/// `Bytes::from_owner` without copying (see `From<Vec<T>> for Buffer<T>`).
#[repr(transparent)]
struct Wrapper<T>(Vec<T>);
605
impl<T> AsRef<[u8]> for Wrapper<T> {
    fn as_ref(&self) -> &[u8] {
        let data = self.0.as_ptr().cast::<u8>();
        let len = self.0.len() * size_of::<T>();
        // SAFETY: the pointer/length describe exactly the initialized elements
        // of the owned Vec. NOTE(review): this assumes `T` has no padding bytes
        // (reading padding as `u8` would be UB) — in practice callers use
        // primitive `T`; confirm this invariant is upheld crate-wide.
        unsafe { std::slice::from_raw_parts(data, len) }
    }
}
613
impl<T> From<Vec<T>> for Buffer<T>
where
    T: Send + 'static,
{
    /// Zero-copy conversion: the `Vec`'s allocation is kept alive by
    /// `Bytes::from_owner` rather than copied.
    fn from(value: Vec<T>) -> Self {
        let original_len = value.len();
        let wrapped_vec = Wrapper(value);

        // `Bytes` borrows the Wrapper's `AsRef<[u8]>` view and owns the Vec.
        let bytes = Bytes::from_owner(wrapped_vec);

        // A Vec's allocation is always aligned for its element type; this
        // guards the Buffer invariant that bytes satisfy the declared alignment.
        assert_eq!(bytes.as_ptr().align_offset(align_of::<T>()), 0);

        Self {
            bytes,
            length: original_len,
            alignment: Alignment::of::<T>(),
            _marker: PhantomData,
        }
    }
}
634
635impl From<Bytes> for ByteBuffer {
636 fn from(bytes: Bytes) -> Self {
637 let length = bytes.len();
638 Self {
639 bytes,
640 length,
641 alignment: Alignment::of::<u8>(),
642 _marker: Default::default(),
643 }
644 }
645}
646
647impl Buf for ByteBuffer {
648 #[inline]
649 fn remaining(&self) -> usize {
650 self.len()
651 }
652
653 #[inline]
654 fn chunk(&self) -> &[u8] {
655 self.as_slice()
656 }
657
658 #[inline]
659 fn advance(&mut self, cnt: usize) {
660 if !cnt.is_multiple_of(*self.alignment) {
661 vortex_panic!(
662 "Cannot advance buffer by {} items, resulting alignment is not {}",
663 cnt,
664 self.alignment
665 );
666 }
667 self.bytes.advance(cnt);
668 self.length -= cnt;
669 }
670}
671
/// An owning, by-value iterator over a [`Buffer<T>`] of `Copy` elements.
pub struct BufferIterator<T: Copy> {
    // Keeps the backing allocation alive; `ptr`/`end` point into it.
    _buffer: Buffer<T>,
    // Next element to yield.
    ptr: *const T,
    // One past the last element; iteration stops when `ptr == end`.
    end: *const T,
}
679
impl<T: Copy> Iterator for BufferIterator<T> {
    type Item = T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.ptr == self.end {
            None
        } else {
            // SAFETY: `ptr != end` implies `ptr` is in-bounds of `_buffer`'s
            // live allocation; `T: Copy` makes the by-value read sound.
            let value = unsafe { self.ptr.read() };
            // SAFETY: advancing by one element stays within, or one past the
            // end of, the same allocation.
            self.ptr = unsafe { self.ptr.add(1) };
            Some(value)
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        // SAFETY: `ptr` and `end` derive from the same slice with `ptr <= end`,
        // so `offset_from` is well-defined and non-negative.
        let remaining = unsafe { self.end.offset_from(self.ptr) } as usize;
        (remaining, Some(remaining))
    }
}
701
702impl<T: Copy> ExactSizeIterator for BufferIterator<T> {}
703
impl<T: Copy> IntoIterator for Buffer<T> {
    type Item = T;
    type IntoIter = BufferIterator<T>;

    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        let ptr = self.as_slice().as_ptr();
        // SAFETY: `add(len)` yields the one-past-the-end pointer of the slice,
        // which is a valid provenance-preserving pointer to form.
        let end = unsafe { ptr.add(self.len()) };
        BufferIterator {
            // Moving `self` into the iterator keeps the allocation (and thus
            // `ptr`/`end`) valid for the iterator's lifetime.
            _buffer: self,
            ptr,
            end,
        }
    }
}
719
// Freezing a mutable buffer is the canonical way to obtain an immutable one.
impl<T> From<BufferMut<T>> for Buffer<T> {
    #[inline]
    fn from(value: BufferMut<T>) -> Self {
        value.freeze()
    }
}
726
#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::Alignment;
    use crate::Buffer;
    use crate::ByteBuffer;
    use crate::buffer;

    // `aligned` should re-declare (and if needed copy to) a stricter alignment
    // while preserving contents.
    #[test]
    fn align() {
        let buf = buffer![0u8, 1, 2];
        let aligned = buf.aligned(Alignment::new(32));
        assert_eq!(aligned.alignment(), Alignment::new(32));
        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    }

    // Element-range slicing with both exclusive and inclusive end bounds.
    #[test]
    fn slice() {
        let buf = buffer![0, 1, 2, 3, 4];
        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
    }

    // Byte 1 of the i32 value 0 is 0, so the unaligned 1..2 byte slice is [0].
    #[test]
    fn slice_unaligned() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        let sliced = buf.slice_unaligned(1..2);
        assert_eq!(sliced.len(), 1);
        assert_eq!(sliced.as_slice(), &[0]);
    }

    // Slicing at byte offset 1 must panic: the i32-derived byte buffer keeps
    // its 4-byte alignment, and offset 1 violates it.
    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        buf.slice(1..2);
    }

    // Exercise the `Buf` trait impl: remaining/chunk, then an aligned advance.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    // Zero-copy `From<Vec<T>>` must preserve both alignment and contents.
    #[test]
    fn from_vec() {
        let vec = vec![1, 2, 3, 4, 5];
        let buff = Buffer::from(vec.clone());
        assert!(buff.is_aligned(Alignment::of::<i32>()));
        assert_eq!(vec, buff);
    }

    // Regression test: slicing a buffer whose declared alignment exceeds its
    // element size must not panic when the end falls mid-alignment-unit.
    #[test]
    fn test_slice_unaligned_end_pos() {
        let data = vec![0u8; 2];
        let aligned_buffer = Buffer::copy_from_aligned(&data, Alignment::new(8));
        aligned_buffer.slice(0..1);
    }
}