1use std::any::type_name;
5use std::cmp::Ordering;
6use std::collections::Bound;
7use std::fmt::Debug;
8use std::fmt::Formatter;
9use std::hash::Hash;
10use std::hash::Hasher;
11use std::marker::PhantomData;
12use std::ops::Deref;
13use std::ops::RangeBounds;
14
15use bytes::Buf;
16use bytes::Bytes;
17use vortex_error::VortexExpect;
18use vortex_error::vortex_panic;
19
20use crate::Alignment;
21use crate::BufferMut;
22use crate::ByteBuffer;
23use crate::debug::TruncatedDebug;
24use crate::trusted_len::TrustedLen;
25
/// An immutable, alignment-aware buffer of `T` elements backed by a
/// reference-counted `Bytes` allocation.
pub struct Buffer<T> {
    // Raw backing storage; invariant: exactly `length * size_of::<T>()` bytes.
    pub(crate) bytes: Bytes,
    // Number of `T` elements (not bytes) in the buffer.
    pub(crate) length: usize,
    // Alignment the backing pointer is guaranteed to satisfy.
    pub(crate) alignment: Alignment,
    // Marks the element type without storing any `T`.
    pub(crate) _marker: PhantomData<T>,
}
33
34impl<T> Clone for Buffer<T> {
35 #[inline]
36 fn clone(&self) -> Self {
37 Self {
38 bytes: self.bytes.clone(),
39 length: self.length,
40 alignment: self.alignment,
41 _marker: PhantomData,
42 }
43 }
44}
45
46impl<T> Default for Buffer<T> {
47 fn default() -> Self {
48 Self {
49 bytes: Default::default(),
50 length: 0,
51 alignment: Alignment::of::<T>(),
52 _marker: PhantomData,
53 }
54 }
55}
56
57impl<T> PartialEq for Buffer<T> {
58 #[inline]
59 fn eq(&self, other: &Self) -> bool {
60 self.bytes == other.bytes
61 }
62}
63
64impl<T: PartialEq> PartialEq<Vec<T>> for Buffer<T> {
65 fn eq(&self, other: &Vec<T>) -> bool {
66 self.as_ref() == other.as_slice()
67 }
68}
69
70impl<T: PartialEq> PartialEq<Buffer<T>> for Vec<T> {
71 fn eq(&self, other: &Buffer<T>) -> bool {
72 self.as_slice() == other.as_ref()
73 }
74}
75
// `PartialEq` compares raw bytes, which is reflexive for any `T`, so `Eq`
// holds without requiring `T: Eq`.
impl<T> Eq for Buffer<T> {}
77
78impl<T> Ord for Buffer<T> {
79 #[inline]
80 fn cmp(&self, other: &Self) -> Ordering {
81 self.bytes.cmp(&other.bytes)
82 }
83}
84
85impl<T> PartialOrd for Buffer<T> {
86 #[inline]
87 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88 Some(self.cmp(other))
89 }
90}
91
92impl<T> Hash for Buffer<T> {
93 #[inline]
94 fn hash<H: Hasher>(&self, state: &mut H) {
95 self.bytes.as_ref().hash(state)
96 }
97}
98
impl<T> Buffer<T> {
    /// Creates a buffer by copying `values` into a fresh allocation aligned
    /// to `T`'s natural alignment.
    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
        BufferMut::copy_from(values).freeze()
    }

    /// Creates a buffer by copying `values` into an allocation with the
    /// requested `alignment`.
    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
        BufferMut::copy_from_aligned(values, alignment).freeze()
    }

    /// Creates a zero-filled buffer of `len` elements, aligned to `T`.
    pub fn zeroed(len: usize) -> Self {
        Self::zeroed_aligned(len, Alignment::of::<T>())
    }

    /// Creates a zero-filled buffer of `len` elements with the given `alignment`.
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
        BufferMut::zeroed_aligned(len, alignment).freeze()
    }

    /// Creates an empty buffer aligned to `T`.
    pub fn empty() -> Self {
        BufferMut::empty().freeze()
    }

    /// Creates an empty buffer with the given `alignment`.
    pub fn empty_aligned(alignment: Alignment) -> Self {
        BufferMut::empty_aligned(alignment).freeze()
    }

    /// Creates a buffer containing `len` copies of `item`.
    pub fn full(item: T, len: usize) -> Self
    where
        T: Copy,
    {
        BufferMut::full(item, len).freeze()
    }

    /// Reinterprets a byte buffer as a typed buffer aligned to `T`.
    ///
    /// # Panics
    ///
    /// See [`Self::from_bytes_aligned`].
    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
    }

    /// Reinterprets a byte buffer as a typed buffer with the given `alignment`.
    ///
    /// # Panics
    ///
    /// See [`Self::from_bytes_aligned`].
    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
        Self::from_bytes_aligned(buffer.into_inner(), alignment)
    }

    /// Wraps raw `Bytes` as a typed buffer, validating alignment and length.
    ///
    /// # Panics
    ///
    /// Panics if `alignment` is not compatible with `T`'s alignment, if the
    /// `bytes` pointer does not satisfy `alignment`, or if the byte length is
    /// not a whole number of `T` elements.
    // NOTE(review): a zero-sized `T` would make the division below panic with
    // divide-by-zero; presumably ZST element types are rejected upstream — confirm.
    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!(
                "Alignment {} must be compatible with the scalar type's alignment {}",
                alignment,
                Alignment::of::<T>(),
            );
        }
        if bytes.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!(
                "Bytes alignment must align to the requested alignment {}",
                alignment,
            );
        }
        if !bytes.len().is_multiple_of(size_of::<T>()) {
            vortex_panic!(
                "Bytes length {} must be a multiple of the scalar type's size {}",
                bytes.len(),
                size_of::<T>()
            );
        }
        let length = bytes.len() / size_of::<T>();
        Self {
            bytes,
            length,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Collects a trusted-length iterator, pre-allocating the upper bound
    /// reported by its `size_hint`.
    ///
    /// # Panics
    ///
    /// Panics if the iterator reports no upper bound.
    pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
        let (_, upper_bound) = iter.size_hint();
        let mut buffer = BufferMut::with_capacity(
            upper_bound.vortex_expect("TrustedLen iterator has no upper bound"),
        );
        buffer.extend_trusted(iter);
        buffer.freeze()
    }

    /// Empties the buffer, keeping its alignment.
    pub fn clear(&mut self) {
        self.bytes.clear();
        self.length = 0;
    }

    /// Returns the number of `T` elements in the buffer.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns true if the buffer contains no elements.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns the alignment of the buffer.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }

    /// Views the buffer as a typed slice.
    #[inline(always)]
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: construction guarantees `bytes` is aligned for `T` and holds
        // exactly `length * size_of::<T>()` initialized bytes.
        unsafe { std::slice::from_raw_parts(self.bytes.as_ptr().cast(), self.length) }
    }

    /// Views the buffer's raw bytes.
    #[inline(always)]
    pub fn as_bytes(&self) -> &[u8] {
        self.bytes.as_ref()
    }

    /// Returns a borrowing iterator over the buffer's elements.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter {
            inner: self.as_slice().iter(),
        }
    }

    /// Slices the buffer by element range, keeping the current alignment.
    ///
    /// # Panics
    ///
    /// See [`Self::slice_with_alignment`].
    #[inline(always)]
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, self.alignment)
    }

    /// Slices the buffer by element range with 1-byte (i.e. no) alignment.
    #[inline(always)]
    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, Alignment::of::<u8>())
    }

    /// Slices the buffer by element range, asserting the given `alignment`
    /// on the result.
    ///
    /// # Panics
    ///
    /// Panics if the range is inverted or out of bounds, if the start byte
    /// offset is not a multiple of `alignment`, or if `alignment` is not
    /// compatible with `T`'s alignment.
    pub fn slice_with_alignment(
        &self,
        range: impl RangeBounds<usize>,
        alignment: Alignment,
    ) -> Self {
        let len = self.len();
        // Resolve the generic range into concrete [begin, end) element bounds.
        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };

        if begin > end {
            vortex_panic!(
                "range start must not be greater than end: {:?} <= {:?}",
                begin,
                end
            );
        }
        if end > len {
            vortex_panic!("range end out of bounds: {:?} > {:?}", end, len);
        }

        // An empty slice needs no view of the allocation; hand back a fresh
        // empty buffer carrying the requested alignment.
        if end == begin {
            return Self::empty_aligned(alignment);
        }

        let begin_byte = begin * size_of::<T>();
        let end_byte = end * size_of::<T>();

        // NOTE(review): this checks the byte *offset* against `alignment`; it
        // assumes the base pointer is itself at least that aligned — confirm
        // the intended behavior when `alignment` is stricter than
        // `self.alignment`.
        if !begin_byte.is_multiple_of(*alignment) {
            vortex_panic!(
                "range start must be aligned to {alignment:?}, byte {}",
                begin_byte
            );
        }
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("Slice alignment must at least align to type T")
        }

        Self {
            bytes: self.bytes.slice(begin_byte..end_byte),
            length: end - begin,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Returns a buffer covering `subset`, which must lie within this
    /// buffer's memory, aligned to `T`.
    #[inline(always)]
    pub fn slice_ref(&self, subset: &[T]) -> Self {
        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
    }

    /// Like [`Self::slice_ref`], but asserts the given `alignment` on the result.
    ///
    /// # Panics
    ///
    /// Panics if `alignment` is incompatible with `T`, stricter than this
    /// buffer's own alignment, or not satisfied by `subset`'s pointer; also
    /// panics (inside `Bytes::slice_ref`) if `subset` is not within this buffer.
    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("slice_ref alignment must at least align to type T")
        }

        if !self.alignment.is_aligned_to(alignment) {
            vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
        }

        if subset.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
        }

        // SAFETY: reinterpreting an initialized `&[T]` as its underlying bytes
        // is valid; the length is the slice's total size in bytes.
        let subset_u8 =
            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };

        Self {
            bytes: self.bytes.slice_ref(subset_u8),
            length: subset.len(),
            alignment,
            _marker: Default::default(),
        }
    }

    /// Borrows the underlying `Bytes`.
    pub fn inner(&self) -> &Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        &self.bytes
    }

    /// Consumes the buffer, returning the underlying `Bytes`.
    pub fn into_inner(self) -> Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        self.bytes
    }

    /// Consumes the buffer, reinterpreting it as a buffer of raw bytes while
    /// preserving its alignment.
    pub fn into_byte_buffer(self) -> ByteBuffer {
        ByteBuffer {
            bytes: self.bytes,
            length: self.length * size_of::<T>(),
            alignment: self.alignment,
            _marker: Default::default(),
        }
    }

    /// Attempts to convert into a mutable buffer without copying.
    ///
    /// Returns `Err(self)` (reconstructed unchanged) when the underlying
    /// `Bytes` is shared and cannot be made uniquely owned.
    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
        self.bytes
            .try_into_mut()
            .map(|bytes| BufferMut {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
            .map_err(|bytes| Self {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
    }

    /// Converts into a mutable buffer, copying the contents only if the
    /// underlying allocation is shared.
    pub fn into_mut(self) -> BufferMut<T> {
        self.try_into_mut()
            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
    }

    /// Returns true if the buffer's data pointer satisfies `alignment`.
    pub fn is_aligned(&self, alignment: Alignment) -> bool {
        self.bytes.as_ptr().align_offset(*alignment) == 0
    }

    /// Returns this buffer with the requested `alignment`, copying the data
    /// only when the current allocation does not already satisfy it.
    pub fn aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            // Optional diagnostics: a copy here is usually an accidental
            // performance hit, so the "warn-copy" feature logs a backtrace.
            #[cfg(feature = "warn-copy")]
            {
                let bt = std::backtrace::Backtrace::capture();
                tracing::warn!(
                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
                )
            }
            Self::copy_from_aligned(self, alignment)
        }
    }

    /// Returns this buffer with the requested `alignment`.
    ///
    /// # Panics
    ///
    /// Unlike [`Self::aligned`], panics instead of copying when the data is
    /// not already aligned.
    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
        }
    }
}
471
impl<T> Buffer<T> {
    /// Reinterprets this buffer as a buffer of a different element type `U`.
    ///
    /// The backing bytes, element count and alignment are carried over
    /// unchanged; only the phantom element type changes.
    ///
    /// # Panics
    ///
    /// Asserts that `U` has exactly the same size and alignment as `T`.
    ///
    /// # Safety
    ///
    /// The assertions below check layout only, not validity: the caller must
    /// guarantee that every bit pattern stored in this buffer is a valid `U`.
    pub unsafe fn transmute<U>(self) -> Buffer<U> {
        assert_eq!(size_of::<T>(), size_of::<U>(), "Buffer type size mismatch");
        assert_eq!(
            align_of::<T>(),
            align_of::<U>(),
            "Buffer type alignment mismatch"
        );

        Buffer {
            bytes: self.bytes,
            length: self.length,
            alignment: self.alignment,
            _marker: PhantomData,
        }
    }
}
500
/// A borrowing iterator over a [`Buffer`]'s elements, yielding `&T`.
///
/// Thin wrapper around `std::slice::Iter` that forwards every overridable
/// method to preserve the slice iterator's performance characteristics.
pub struct Iter<'a, T> {
    inner: std::slice::Iter<'a, T>,
}

impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    #[inline]
    fn next(&mut self) -> Option<&'a T> {
        self.inner.next()
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }

    #[inline]
    fn nth(&mut self, n: usize) -> Option<&'a T> {
        self.inner.nth(n)
    }

    #[inline]
    fn count(self) -> usize {
        self.inner.count()
    }

    #[inline]
    fn last(self) -> Option<&'a T> {
        self.inner.last()
    }
}

impl<T> ExactSizeIterator for Iter<'_, T> {
    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }
}
543
544impl<T: Debug> Debug for Buffer<T> {
545 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
546 f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
547 .field("length", &self.length)
548 .field("alignment", &self.alignment)
549 .field("as_slice", &TruncatedDebug(self.as_slice()))
550 .finish()
551 }
552}
553
554impl<T> Deref for Buffer<T> {
555 type Target = [T];
556
557 #[inline]
558 fn deref(&self) -> &Self::Target {
559 self.as_slice()
560 }
561}
562
563impl<T> AsRef<[T]> for Buffer<T> {
564 #[inline]
565 fn as_ref(&self) -> &[T] {
566 self.as_slice()
567 }
568}
569
570impl<T> FromIterator<T> for Buffer<T> {
571 #[inline]
572 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
573 BufferMut::from_iter(iter).freeze()
574 }
575}
576
/// A `#[repr(transparent)]` newtype that lets a `Vec<T>` be handed to
/// `Bytes::from_owner`, which requires its owner to implement `AsRef<[u8]>`.
#[repr(transparent)]
struct Wrapper<T>(Vec<T>);

impl<T> AsRef<[u8]> for Wrapper<T> {
    /// Reinterprets the vector's initialized elements as raw bytes.
    fn as_ref(&self) -> &[u8] {
        let elements = self.0.as_slice();
        // SAFETY: `elements` points at `len * size_of::<T>()` contiguous,
        // initialized bytes owned by the vector.
        unsafe { std::slice::from_raw_parts(elements.as_ptr().cast::<u8>(), size_of_val(elements)) }
    }
}
588
589impl<T> From<Vec<T>> for Buffer<T>
590where
591 T: Send + 'static,
592{
593 fn from(value: Vec<T>) -> Self {
594 let original_len = value.len();
595 let wrapped_vec = Wrapper(value);
596
597 let bytes = Bytes::from_owner(wrapped_vec);
598
599 assert_eq!(bytes.as_ptr().align_offset(align_of::<T>()), 0);
600
601 Self {
602 bytes,
603 length: original_len,
604 alignment: Alignment::of::<T>(),
605 _marker: PhantomData,
606 }
607 }
608}
609
610impl From<Bytes> for ByteBuffer {
611 fn from(bytes: Bytes) -> Self {
612 let length = bytes.len();
613 Self {
614 bytes,
615 length,
616 alignment: Alignment::of::<u8>(),
617 _marker: Default::default(),
618 }
619 }
620}
621
622impl Buf for ByteBuffer {
623 #[inline]
624 fn remaining(&self) -> usize {
625 self.len()
626 }
627
628 #[inline]
629 fn chunk(&self) -> &[u8] {
630 self.as_slice()
631 }
632
633 #[inline]
634 fn advance(&mut self, cnt: usize) {
635 if !cnt.is_multiple_of(*self.alignment) {
636 vortex_panic!(
637 "Cannot advance buffer by {} items, resulting alignment is not {}",
638 cnt,
639 self.alignment
640 );
641 }
642 self.bytes.advance(cnt);
643 self.length -= cnt;
644 }
645}
646
/// An owning iterator over a [`Buffer`]'s elements, yielding `T` by value.
pub struct BufferIterator<T> {
    // The buffer being iterated.
    buffer: Buffer<T>,
    // Index of the next element to yield.
    index: usize,
}
652
653impl<T: Copy> Iterator for BufferIterator<T> {
654 type Item = T;
655
656 #[inline]
657 fn next(&mut self) -> Option<Self::Item> {
658 (self.index < self.buffer.len()).then(move || {
659 let value = self.buffer[self.index];
660 self.index += 1;
661 value
662 })
663 }
664
665 #[inline]
666 fn size_hint(&self) -> (usize, Option<usize>) {
667 let remaining = self.buffer.len() - self.index;
668 (remaining, Some(remaining))
669 }
670}
671
672impl<T: Copy> IntoIterator for Buffer<T> {
673 type Item = T;
674 type IntoIter = BufferIterator<T>;
675
676 #[inline]
677 fn into_iter(self) -> Self::IntoIter {
678 BufferIterator {
679 buffer: self,
680 index: 0,
681 }
682 }
683}
684
impl<T> From<BufferMut<T>> for Buffer<T> {
    /// Freezes a mutable buffer into an immutable one.
    #[inline]
    fn from(value: BufferMut<T>) -> Self {
        value.freeze()
    }
}
691
#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::Alignment;
    use crate::Buffer;
    use crate::ByteBuffer;
    use crate::buffer;

    // Re-aligning updates the recorded alignment and preserves the contents
    // (copying only if the existing allocation was misaligned).
    #[test]
    fn align() {
        let buf = buffer![0u8, 1, 2];
        let aligned = buf.aligned(Alignment::new(32));
        assert_eq!(aligned.alignment(), Alignment::new(32));
        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    }

    // Element slicing supports both exclusive and inclusive range syntax.
    #[test]
    fn slice() {
        let buf = buffer![0, 1, 2, 3, 4];
        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
    }

    // slice_unaligned drops the alignment requirement: byte index 1 falls
    // inside the first i32 (value 0), so the single sliced byte is 0.
    #[test]
    fn slice_unaligned() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        let sliced = buf.slice_unaligned(1..2);
        assert_eq!(sliced.len(), 1);
        assert_eq!(sliced.as_slice(), &[0]);
    }

    // Plain slice() keeps the buffer's i32 (4-byte) alignment, so starting
    // at byte offset 1 must panic.
    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        buf.slice(1..2);
    }

    // The `Buf` impl advances through the bytes, with remaining/chunk/as_slice
    // all tracking the new position.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    // From<Vec<T>> preserves contents and satisfies T's natural alignment.
    #[test]
    fn from_vec() {
        let vec = vec![1, 2, 3, 4, 5];
        let buff = Buffer::from(vec.clone());
        assert!(buff.is_aligned(Alignment::of::<i32>()));
        assert_eq!(vec, buff);
    }

    // Regression-style check: slicing an over-aligned (8-byte) buffer within
    // bounds must not panic. No assertions — success is not panicking.
    #[test]
    fn test_slice_unaligned_end_pos() {
        let data = vec![0u8; 2];
        let aligned_buffer = Buffer::copy_from_aligned(&data, Alignment::new(8));
        aligned_buffer.slice(0..1);
    }
}
767}