use std::any::type_name;
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::ops::{Bound, Deref, RangeBounds};

use bytes::{Buf, Bytes};
use vortex_error::{VortexExpect, vortex_panic};

use crate::debug::TruncatedDebug;
use crate::trusted_len::TrustedLen;
use crate::{Alignment, BufferMut, ByteBuffer};

/// An immutable buffer of typed values, backed by reference-counted [`Bytes`]
/// and carrying an explicit [`Alignment`].
pub struct Buffer<T> {
    /// The raw byte storage backing this buffer.
    pub(crate) bytes: Bytes,
    /// The number of `T` elements (not bytes) in the buffer.
    pub(crate) length: usize,
    /// The alignment of the buffer's data.
    pub(crate) alignment: Alignment,
    pub(crate) _marker: PhantomData<T>,
}

// Implemented manually so that `Buffer<T>` is cloneable regardless of whether
// `T` itself is `Clone`.
impl<T> Clone for Buffer<T> {
    #[inline]
    fn clone(&self) -> Self {
        Self {
            bytes: self.bytes.clone(),
            length: self.length,
            alignment: self.alignment,
            _marker: PhantomData,
        }
    }
}

impl<T> Default for Buffer<T> {
    fn default() -> Self {
        Self {
            bytes: Default::default(),
            length: 0,
            alignment: Alignment::of::<T>(),
            _marker: PhantomData,
        }
    }
}

// Equality, ordering, and hashing are all defined over the raw bytes and
// deliberately ignore the buffer's alignment.
impl<T> PartialEq for Buffer<T> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.bytes == other.bytes
    }
}

impl<T: PartialEq> PartialEq<Vec<T>> for Buffer<T> {
    fn eq(&self, other: &Vec<T>) -> bool {
        self.as_ref() == other.as_slice()
    }
}

impl<T: PartialEq> PartialEq<Buffer<T>> for Vec<T> {
    fn eq(&self, other: &Buffer<T>) -> bool {
        self.as_slice() == other.as_ref()
    }
}

impl<T> Eq for Buffer<T> {}

impl<T> Ord for Buffer<T> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.bytes.cmp(&other.bytes)
    }
}

impl<T> PartialOrd for Buffer<T> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Hash for Buffer<T> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.bytes.as_ref().hash(state)
    }
}

impl<T> Buffer<T> {
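    /// Returns a new `Buffer<T>` containing a copy of the given values, aligned
    /// to `T`.
    ///
    /// A minimal doc-test sketch (assuming this crate is published as
    /// `vortex_buffer` and re-exports `Buffer` at the root, as the unit tests
    /// below suggest):
    ///
    /// ```
    /// use vortex_buffer::Buffer;
    ///
    /// let buf = Buffer::copy_from([1u16, 2, 3]);
    /// assert_eq!(buf.as_slice(), &[1, 2, 3]);
    /// ```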
    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
        BufferMut::copy_from(values).freeze()
    }

    /// Returns a new `Buffer<T>` containing a copy of the given values, with
    /// the requested alignment.
    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
        BufferMut::copy_from_aligned(values, alignment).freeze()
    }

    /// Returns a new zero-initialized `Buffer<T>` of the given element length.
    pub fn zeroed(len: usize) -> Self {
        Self::zeroed_aligned(len, Alignment::of::<T>())
    }

    /// Returns a new zero-initialized `Buffer<T>` with the requested alignment.
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
        BufferMut::zeroed_aligned(len, alignment).freeze()
    }

    /// Returns an empty `Buffer<T>`.
    pub fn empty() -> Self {
        BufferMut::empty().freeze()
    }

    /// Returns an empty `Buffer<T>` with the requested alignment.
    pub fn empty_aligned(alignment: Alignment) -> Self {
        BufferMut::empty_aligned(alignment).freeze()
    }

    /// Returns a new `Buffer<T>` of length `len` with every element set to `item`.
    pub fn full(item: T, len: usize) -> Self
    where
        T: Copy,
    {
        BufferMut::full(item, len).freeze()
    }

    /// Reinterprets a [`ByteBuffer`] as a typed `Buffer<T>`, requiring the
    /// alignment of `T`.
    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
    }

    /// Reinterprets a [`ByteBuffer`] as a typed `Buffer<T>` with the requested
    /// alignment.
    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
        Self::from_bytes_aligned(buffer.into_inner(), alignment)
    }

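    /// Wraps `bytes` as a typed `Buffer<T>` without copying, panicking if the
    /// requested `alignment` is incompatible with `T`, if `bytes` is not
    /// actually aligned to it, or if the byte length is not a multiple of
    /// `size_of::<T>()`.
    ///
    /// A sketch of typical usage (assuming the crate is `vortex_buffer`; a
    /// `u8` buffer is used so the alignment requirement is trivially met):
    ///
    /// ```
    /// use bytes::Bytes;
    /// use vortex_buffer::{Alignment, Buffer};
    ///
    /// let bytes = Bytes::from(vec![0u8; 8]);
    /// let buf: Buffer<u8> = Buffer::from_bytes_aligned(bytes, Alignment::of::<u8>());
    /// assert_eq!(buf.len(), 8);
    /// ```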
    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!(
                "Alignment {} must be compatible with the scalar type's alignment {}",
                alignment,
                Alignment::of::<T>(),
            );
        }
        if bytes.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!(
                "Bytes must be aligned to the requested alignment {}",
                alignment,
            );
        }
        if bytes.len() % size_of::<T>() != 0 {
            vortex_panic!(
                "Bytes length {} must be a multiple of the scalar type's size {}",
                bytes.len(),
                size_of::<T>()
            );
        }
        let length = bytes.len() / size_of::<T>();
        Self {
            bytes,
            length,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Collects a [`TrustedLen`] iterator into a new buffer, pre-allocating
    /// from the iterator's upper bound.
    pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
        let (_, high) = iter.size_hint();
        let mut buffer =
            BufferMut::with_capacity(high.vortex_expect("TrustedLen iterator has no upper bound"));
        buffer.extend_trusted(iter);
        buffer.freeze()
    }

    /// Returns the number of elements in the buffer.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns `true` if the buffer has length zero.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns the alignment of the buffer.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }

    /// Returns the contents of the buffer as a slice of `T`.
    #[inline(always)]
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: the constructors guarantee that `bytes` is aligned to `T`
        // and holds exactly `length * size_of::<T>()` bytes.
        unsafe { std::slice::from_raw_parts(self.bytes.as_ptr().cast(), self.length) }
    }

    /// Returns an iterator over references to the buffer's elements.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter {
            inner: self.as_slice().iter(),
        }
    }

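    /// Returns a zero-copy slice of this buffer over the given element range,
    /// preserving the buffer's current alignment.
    ///
    /// A doc-test sketch (assuming the crate is `vortex_buffer` and the
    /// `buffer!` macro is re-exported at the root, as the unit tests below
    /// suggest):
    ///
    /// ```
    /// use vortex_buffer::buffer;
    ///
    /// let buf = buffer![0i32, 1, 2, 3, 4];
    /// assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
    /// assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
    /// ```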
    #[inline(always)]
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, self.alignment)
    }

    /// Slices the buffer with the result aligned only to `u8`. Note that a
    /// slice's alignment must still align to `T`, so in practice this is only
    /// useful when `align_of::<T>() == 1`, e.g. for a [`ByteBuffer`].
    #[inline(always)]
    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, Alignment::of::<u8>())
    }

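    /// Returns a zero-copy slice of this buffer with an explicitly chosen
    /// alignment, which must divide both byte endpoints of the range and be at
    /// least the alignment of `T`.
    ///
    /// A sketch (assuming the crate is `vortex_buffer`; the buffer is
    /// over-aligned so that the slice can keep a 4-byte alignment):
    ///
    /// ```
    /// use vortex_buffer::{Alignment, Buffer};
    ///
    /// let buf = Buffer::<u8>::zeroed_aligned(8, Alignment::new(4));
    /// let sliced = buf.slice_with_alignment(4..8, Alignment::new(4));
    /// assert_eq!(sliced.len(), 4);
    /// assert_eq!(sliced.alignment(), Alignment::new(4));
    /// ```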
    pub fn slice_with_alignment(
        &self,
        range: impl RangeBounds<usize>,
        alignment: Alignment,
    ) -> Self {
        let len = self.len();
        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };

        if begin > end {
            vortex_panic!(
                "range start must not be greater than end: {} > {}",
                begin,
                end
            );
        }
        if end > len {
            vortex_panic!("range end {} out of bounds for buffer of length {}", end, len);
        }

        if end == begin {
            return Self::empty_aligned(alignment);
        }

        let begin_byte = begin * size_of::<T>();
        let end_byte = end * size_of::<T>();

        if !begin_byte.is_multiple_of(*alignment) {
            vortex_panic!("range start must be aligned to {:?}", alignment);
        }
        if !end_byte.is_multiple_of(*alignment) {
            vortex_panic!("range end must be aligned to {:?}", alignment);
        }
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("Slice alignment must at least align to type T")
        }

        Self {
            bytes: self.bytes.slice(begin_byte..end_byte),
            length: end - begin,
            alignment,
            _marker: Default::default(),
        }
    }

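    /// Returns a zero-copy `Buffer<T>` for `subset`, which must be a slice
    /// into this buffer's own memory.
    ///
    /// A sketch (assuming the crate is `vortex_buffer`):
    ///
    /// ```
    /// use vortex_buffer::buffer;
    ///
    /// let buf = buffer![1u8, 2, 3, 4];
    /// let subset = &buf.as_slice()[1..3];
    /// assert_eq!(buf.slice_ref(subset).as_slice(), &[2, 3]);
    /// ```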
    #[inline(always)]
    pub fn slice_ref(&self, subset: &[T]) -> Self {
        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
    }

    /// Like [`Buffer::slice_ref`], but verifies and records the given
    /// alignment for the returned buffer.
    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("slice_ref alignment must at least align to type T")
        }

        if !self.alignment.is_aligned_to(alignment) {
            vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
        }

        if subset.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
        }

        let subset_u8 =
            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };

        Self {
            bytes: self.bytes.slice_ref(subset_u8),
            length: subset.len(),
            alignment,
            _marker: Default::default(),
        }
    }

    /// Returns a reference to the underlying [`Bytes`].
    pub fn inner(&self) -> &Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        &self.bytes
    }

    /// Consumes the buffer, returning the underlying [`Bytes`].
    pub fn into_inner(self) -> Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        self.bytes
    }

    /// Consumes the buffer, reinterpreting it as a [`ByteBuffer`] of
    /// `length * size_of::<T>()` bytes.
    pub fn into_byte_buffer(self) -> ByteBuffer {
        ByteBuffer {
            bytes: self.bytes,
            length: self.length * size_of::<T>(),
            alignment: self.alignment,
            _marker: Default::default(),
        }
    }

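    /// Converts this buffer into a mutable [`BufferMut<T>`], copying the data
    /// only if other references to the underlying bytes exist.
    ///
    /// A sketch of the zero-copy vs. fallible paths (assuming the crate is
    /// `vortex_buffer`):
    ///
    /// ```
    /// use vortex_buffer::buffer;
    ///
    /// let buf = buffer![1u8, 2, 3];
    /// let other = buf.clone();
    /// // Two handles share the bytes, so an in-place conversion fails...
    /// assert!(other.try_into_mut().is_err());
    /// // ...but once `buf` is the only handle again, it succeeds.
    /// assert!(buf.try_into_mut().is_ok());
    /// ```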
    pub fn into_mut(self) -> BufferMut<T> {
        self.try_into_mut()
            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
    }

    /// Attempts a zero-copy conversion into a [`BufferMut<T>`], returning the
    /// buffer unchanged if the underlying bytes are shared.
    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
        self.bytes
            .try_into_mut()
            .map(|bytes| BufferMut {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
            .map_err(|bytes| Self {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
    }

    /// Returns `true` if the buffer's data is aligned to the given alignment.
    pub fn is_aligned(&self, alignment: Alignment) -> bool {
        self.bytes.as_ptr().align_offset(*alignment) == 0
    }

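    /// Returns this buffer with the requested alignment, copying the data only
    /// if it is not already sufficiently aligned.
    ///
    /// A doc-test sketch, mirroring the `align` unit test below (assuming the
    /// crate is `vortex_buffer`):
    ///
    /// ```
    /// use vortex_buffer::{buffer, Alignment};
    ///
    /// let aligned = buffer![0u8, 1, 2].aligned(Alignment::new(32));
    /// assert_eq!(aligned.alignment(), Alignment::new(32));
    /// assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    /// ```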
    pub fn aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            #[cfg(feature = "warn-copy")]
            {
                let bt = std::backtrace::Backtrace::capture();
                log::warn!(
                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
                )
            }
            Self::copy_from_aligned(self, alignment)
        }
    }

    /// Returns this buffer with the requested alignment, panicking instead of
    /// copying if the data is not already aligned.
    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
        }
    }

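    /// Splits this buffer into a prefix, a maximal run of `U`-typed values,
    /// and a suffix, analogous to [`slice::align_to`] but without copying.
    ///
    /// A sketch (assuming the crate is `vortex_buffer`; the source buffer is
    /// pre-aligned to `u32` so the prefix and suffix come out empty):
    ///
    /// ```
    /// use vortex_buffer::{Alignment, Buffer};
    ///
    /// let buf = Buffer::<u8>::zeroed_aligned(8, Alignment::of::<u32>());
    /// let (prefix, middle, suffix) = buf.align_to::<u32>();
    /// assert!(prefix.is_empty());
    /// assert_eq!(middle.len(), 2);
    /// assert!(suffix.is_empty());
    /// ```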
    pub fn align_to<U>(mut self) -> (Buffer<T>, Buffer<U>, Buffer<T>) {
        // `align_offset` on a `*const T` returns the offset in *elements* of `T`.
        let offset = self.as_ptr().align_offset(align_of::<U>());
        if offset > self.len() {
            (
                self,
                Buffer::empty_aligned(Alignment::of::<U>()),
                Buffer::empty_aligned(Alignment::of::<T>()),
            )
        } else {
            // Split off the unaligned prefix, converting the element offset
            // into a byte offset.
            let left = self.bytes.split_to(offset * size_of::<T>());
            self.length -= offset;
            let (us_len, _) = self.align_to_offsets::<U>();
            let trailer = self.bytes.split_off(us_len * size_of::<U>());
            (
                Buffer::from_bytes_aligned(left, Alignment::of::<T>()),
                Buffer::from_bytes_aligned(self.bytes, Alignment::of::<U>()),
                Buffer::from_bytes_aligned(trailer, Alignment::of::<T>()),
            )
        }
    }

    /// Computes how many `U` values fit into the buffer, and how many `T`
    /// values are left over, mirroring the arithmetic of [`slice::align_to`].
    fn align_to_offsets<U>(&self) -> (usize, usize) {
        const fn gcd(a: usize, b: usize) -> usize {
            if b == 0 { a } else { gcd(b, a % b) }
        }

        // Every `lcm(size_of::<T>(), size_of::<U>())`-byte chunk holds exactly
        // `ts` values of `T` and `us` values of `U`.
        let gcd: usize = const { gcd(size_of::<T>(), size_of::<U>()) };
        let ts: usize = size_of::<U>() / gcd;
        let us: usize = size_of::<T>() / gcd;

        let us_len = self.len() / ts * us;
        let ts_len = self.len() % ts;
        (us_len, ts_len)
    }
}

/// An iterator over references to the items of a [`Buffer<T>`].
pub struct Iter<'a, T> {
    inner: std::slice::Iter<'a, T>,
}

impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }

    #[inline]
    fn count(self) -> usize {
        self.inner.count()
    }

    #[inline]
    fn last(self) -> Option<Self::Item> {
        self.inner.last()
    }

    #[inline]
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        self.inner.nth(n)
    }
}

impl<T> ExactSizeIterator for Iter<'_, T> {
    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }
}

impl<T: Debug> Debug for Buffer<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
            .field("length", &self.length)
            .field("alignment", &self.alignment)
            .field("as_slice", &TruncatedDebug(self.as_slice()))
            .finish()
    }
}

impl<T> Deref for Buffer<T> {
    type Target = [T];

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}

impl<T> AsRef<[T]> for Buffer<T> {
    #[inline]
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}

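/// Collecting from an iterator first builds a [`BufferMut`] and then freezes
/// it. A doc-test sketch (assuming the crate is `vortex_buffer`):
///
/// ```
/// use vortex_buffer::Buffer;
///
/// let buf: Buffer<u32> = (0..4).collect();
/// assert_eq!(buf.as_slice(), &[0, 1, 2, 3]);
/// ```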
impl<T> FromIterator<T> for Buffer<T> {
    #[inline]
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        BufferMut::from_iter(iter).freeze()
    }
}

/// A `#[repr(transparent)]` wrapper that lets a `Vec<T>` be handed to
/// [`Bytes::from_owner`] by exposing its contents as raw bytes.
#[repr(transparent)]
struct Wrapper<T>(Vec<T>);

impl<T> AsRef<[u8]> for Wrapper<T> {
    fn as_ref(&self) -> &[u8] {
        let data = self.0.as_ptr().cast::<u8>();
        let len = self.0.len() * size_of::<T>();
        unsafe { std::slice::from_raw_parts(data, len) }
    }
}

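/// Converts a `Vec<T>` into a `Buffer<T>` without copying, by handing the
/// vector to [`Bytes::from_owner`] and keeping its allocation alive.
///
/// A doc-test sketch (assuming the crate is `vortex_buffer`):
///
/// ```
/// use vortex_buffer::{Alignment, Buffer};
///
/// let buf = Buffer::from(vec![1i32, 2, 3]);
/// assert!(buf.is_aligned(Alignment::of::<i32>()));
/// assert_eq!(buf.as_slice(), &[1, 2, 3]);
/// ```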
impl<T> From<Vec<T>> for Buffer<T>
where
    T: Send + 'static,
{
    fn from(value: Vec<T>) -> Self {
        let original_len = value.len();
        let wrapped_vec = Wrapper(value);

        let bytes = Bytes::from_owner(wrapped_vec);

        // `Bytes::from_owner` keeps the Vec's allocation alive, so the data
        // remains aligned for `T`.
        assert_eq!(bytes.as_ptr().align_offset(align_of::<T>()), 0);

        Self {
            bytes,
            length: original_len,
            alignment: Alignment::of::<T>(),
            _marker: PhantomData,
        }
    }
}

impl From<Bytes> for ByteBuffer {
    fn from(bytes: Bytes) -> Self {
        let length = bytes.len();
        Self {
            bytes,
            length,
            alignment: Alignment::of::<u8>(),
            _marker: Default::default(),
        }
    }
}

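/// [`Buf`] is implemented for [`ByteBuffer`] so it can be consumed by `bytes`
/// APIs; note that `advance` panics if the step would break the buffer's
/// declared alignment.
///
/// A doc-test sketch, mirroring the `bytes_buf` unit test below (assuming the
/// crate is `vortex_buffer`):
///
/// ```
/// use bytes::Buf;
/// use vortex_buffer::ByteBuffer;
///
/// let mut buf = ByteBuffer::copy_from(b"helloworld");
/// buf.advance(5);
/// assert_eq!(buf.chunk(), b"world");
/// ```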
impl Buf for ByteBuffer {
    #[inline]
    fn remaining(&self) -> usize {
        self.len()
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        self.as_slice()
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        if !cnt.is_multiple_of(*self.alignment) {
            vortex_panic!(
                "Cannot advance buffer by {} bytes, the result would no longer be aligned to {}",
                cnt,
                self.alignment
            );
        }
        self.bytes.advance(cnt);
        self.length -= cnt;
    }
}

/// An owning iterator over the elements of a [`Buffer<T>`].
pub struct BufferIterator<T> {
    buffer: Buffer<T>,
    index: usize,
}

impl<T: Copy> Iterator for BufferIterator<T> {
    type Item = T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        (self.index < self.buffer.len()).then(move || {
            let value = self.buffer[self.index];
            self.index += 1;
            value
        })
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.buffer.len() - self.index;
        (remaining, Some(remaining))
    }
}

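/// Owned iteration copies elements out of the buffer one at a time (hence the
/// `T: Copy` bound). A doc-test sketch (assuming the crate is
/// `vortex_buffer`):
///
/// ```
/// use vortex_buffer::buffer;
///
/// let doubled: Vec<i32> = buffer![1, 2, 3].into_iter().map(|x| x * 2).collect();
/// assert_eq!(doubled, vec![2, 4, 6]);
/// ```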
impl<T: Copy> IntoIterator for Buffer<T> {
    type Item = T;
    type IntoIter = BufferIterator<T>;

    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        BufferIterator {
            buffer: self,
            index: 0,
        }
    }
}

impl<T> From<BufferMut<T>> for Buffer<T> {
    #[inline]
    fn from(value: BufferMut<T>) -> Self {
        value.freeze()
    }
}

#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::{Alignment, Buffer, ByteBuffer, buffer};

    #[test]
    fn align() {
        let buf = buffer![0u8, 1, 2];
        let aligned = buf.aligned(Alignment::new(32));
        assert_eq!(aligned.alignment(), Alignment::new(32));
        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    }

    #[test]
    fn slice() {
        let buf = buffer![0, 1, 2, 3, 4];
        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
    }

    #[test]
    fn slice_unaligned() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // `buf` inherits 4-byte alignment from the i32 buffer, so a plain
        // `slice(1..2)` would panic; `slice_unaligned` drops to u8 alignment.
        let sliced = buf.slice_unaligned(1..2);
        assert_eq!(sliced.len(), 1);
        // Byte 1 of the first i32 (value 0) is 0 regardless of endianness.
        assert_eq!(sliced.as_slice(), &[0]);
    }

    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // A 4-byte-aligned buffer cannot be sliced at byte offset 1 while
        // keeping its alignment.
        buf.slice(1..2);
    }

    #[test]
    fn bytes_buf() {
        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    #[test]
    fn from_vec() {
        let vec = vec![1, 2, 3, 4, 5];
        let buff = Buffer::from(vec.clone());
        assert!(buff.is_aligned(Alignment::of::<i32>()));
        assert_eq!(vec, buff);
    }
}