use std::any::type_name;
use std::cmp::Ordering;
use std::collections::Bound;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use std::ops::{Deref, RangeBounds};

use bytes::{Buf, Bytes};
use vortex_error::{VortexExpect, vortex_panic};

use crate::debug::TruncatedDebug;
use crate::trusted_len::TrustedLen;
use crate::{Alignment, BufferMut, ByteBuffer};

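/// An immutable, reference-counted buffer of elements of type `T`, backed by [`Bytes`] and
/// carrying an explicit [`Alignment`].
///
/// The buffer tracks its logical element count separately from the underlying byte length, and
/// the constructors and slicing operations below uphold the invariant that the data is aligned
/// to at least `Alignment::of::<T>()`.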
pub struct Buffer<T> {
    pub(crate) bytes: Bytes,
    pub(crate) length: usize,
    pub(crate) alignment: Alignment,
    pub(crate) _marker: std::marker::PhantomData<T>,
}

impl<T> Clone for Buffer<T> {
    #[inline]
    fn clone(&self) -> Self {
        Self {
            bytes: self.bytes.clone(),
            length: self.length,
            alignment: self.alignment,
            _marker: std::marker::PhantomData,
        }
    }
}

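// Equality, ordering, and hashing are all defined on the underlying bytes: two buffers compare
// equal if their raw bytes match, regardless of alignment. Note that `Ord` is therefore a
// lexicographic byte order, not an element-wise order over `T`.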
impl<T> PartialEq for Buffer<T> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.bytes == other.bytes
    }
}

impl<T> Eq for Buffer<T> {}

impl<T> Ord for Buffer<T> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.bytes.cmp(&other.bytes)
    }
}

impl<T> PartialOrd for Buffer<T> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Hash for Buffer<T> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.bytes.as_ref().hash(state)
    }
}

impl<T> Buffer<T> {
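    /// Creates a new buffer by copying `values` into a freshly allocated buffer aligned to
    /// `Alignment::of::<T>()`.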
    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
        BufferMut::copy_from(values).freeze()
    }

    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
        BufferMut::copy_from_aligned(values, alignment).freeze()
    }

    pub fn zeroed(len: usize) -> Self {
        Self::zeroed_aligned(len, Alignment::of::<T>())
    }

    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
        BufferMut::zeroed_aligned(len, alignment).freeze()
    }

    pub fn empty() -> Self {
        BufferMut::empty().freeze()
    }

    pub fn empty_aligned(alignment: Alignment) -> Self {
        BufferMut::empty_aligned(alignment).freeze()
    }

    pub fn full(item: T, len: usize) -> Self
    where
        T: Copy,
    {
        BufferMut::full(item, len).freeze()
    }

    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
    }

    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
        Self::from_bytes_aligned(buffer.into_inner(), alignment)
    }

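    /// Creates a buffer directly from `bytes`, reinterpreting them as elements of `T`.
    ///
    /// Panics if `alignment` is not at least `Alignment::of::<T>()`, if `bytes` is not aligned
    /// to `alignment`, or if the byte length is not a multiple of `size_of::<T>()`.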
    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!(
                "Alignment {} must be compatible with the scalar type's alignment {}",
                alignment,
                Alignment::of::<T>(),
            );
        }
        if bytes.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!(
                "Bytes alignment must align to the requested alignment {}",
                alignment,
            );
        }
        if bytes.len() % size_of::<T>() != 0 {
            vortex_panic!(
                "Bytes length {} must be a multiple of the scalar type's size {}",
                bytes.len(),
                size_of::<T>()
            );
        }
        let length = bytes.len() / size_of::<T>();
        Self {
            bytes,
            length,
            alignment,
            _marker: Default::default(),
        }
    }

    pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
        let (_, high) = iter.size_hint();
        let mut buffer =
            BufferMut::with_capacity(high.vortex_expect("TrustedLen iterator has no upper bound"));
        buffer.extend_trusted(iter);
        buffer.freeze()
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.length
    }

    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }

    #[inline(always)]
    pub fn as_slice(&self) -> &[T] {
        let raw_slice = self.bytes.as_ref();
        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
    }

    pub fn iter(&self) -> Iter<'_, T> {
        Iter {
            inner: self.as_slice().iter(),
        }
    }

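    /// Returns a zero-copy slice of this buffer over the given element `range`, preserving the
    /// buffer's current alignment. See [`Self::slice_with_alignment`] for the panic conditions.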
    #[inline(always)]
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, self.alignment)
    }

    #[inline(always)]
    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, Alignment::of::<u8>())
    }

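    /// Returns a zero-copy slice of this buffer over the given element `range`, with the result
    /// carrying the requested `alignment`.
    ///
    /// Panics if the range is out of bounds, if the byte offsets of the range are not multiples
    /// of `alignment`, or if `alignment` is smaller than `Alignment::of::<T>()`.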
    pub fn slice_with_alignment(
        &self,
        range: impl RangeBounds<usize>,
        alignment: Alignment,
    ) -> Self {
        let len = self.len();
        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };

        if begin > end {
            vortex_panic!(
                "range start must not be greater than end: {:?} <= {:?}",
                begin,
                end
            );
        }
        if end > len {
            vortex_panic!("range end out of bounds: {:?} <= {:?}", end, len);
        }

        if end == begin {
            return Self::empty_aligned(alignment);
        }

        let begin_byte = begin * size_of::<T>();
        let end_byte = end * size_of::<T>();

        if !begin_byte.is_multiple_of(*alignment) {
            vortex_panic!("range start must be aligned to {:?}", alignment);
        }
        if !end_byte.is_multiple_of(*alignment) {
            vortex_panic!("range end must be aligned to {:?}", alignment);
        }
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("Slice alignment must at least align to type T")
        }

        Self {
            bytes: self.bytes.slice(begin_byte..end_byte),
            length: end - begin,
            alignment,
            _marker: Default::default(),
        }
    }

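    /// Returns a zero-copy slice of this buffer from a sub-slice reference that must point into
    /// this buffer's memory, aligned to `Alignment::of::<T>()`.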
    #[inline(always)]
    pub fn slice_ref(&self, subset: &[T]) -> Self {
        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
    }

    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("slice_ref alignment must at least align to type T")
        }

        if !self.alignment.is_aligned_to(alignment) {
            vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
        }

        if subset.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
        }

        let subset_u8 =
            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };

        Self {
            bytes: self.bytes.slice_ref(subset_u8),
            length: subset.len(),
            alignment,
            _marker: Default::default(),
        }
    }

    pub fn inner(&self) -> &Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        &self.bytes
    }

    pub fn into_inner(self) -> Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        self.bytes
    }

    pub fn into_byte_buffer(self) -> ByteBuffer {
        ByteBuffer {
            bytes: self.bytes,
            length: self.length * size_of::<T>(),
            alignment: self.alignment,
            _marker: Default::default(),
        }
    }

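    /// Converts this buffer into a mutable [`BufferMut<T>`], copying the data if the underlying
    /// allocation is shared with other references.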
    pub fn into_mut(self) -> BufferMut<T> {
        self.try_into_mut()
            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
    }

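    /// Attempts a zero-copy conversion into a mutable [`BufferMut<T>`]. Returns the buffer
    /// unchanged as `Err` if the underlying allocation is shared with other references.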
    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
        self.bytes
            .try_into_mut()
            .map(|bytes| BufferMut {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
            .map_err(|bytes| Self {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
    }

    pub fn is_aligned(&self, alignment: Alignment) -> bool {
        self.bytes.as_ptr().align_offset(*alignment) == 0
    }

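    /// Returns this buffer with the requested `alignment`, copying the data into a newly aligned
    /// allocation if it is not already aligned (optionally logging a warning when the
    /// `warn-copy` feature is enabled).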
    pub fn aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            #[cfg(feature = "warn-copy")]
            {
                let bt = std::backtrace::Backtrace::capture();
                log::warn!(
                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
                )
            }
            Self::copy_from_aligned(self, alignment)
        }
    }

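    /// Returns this buffer with the requested `alignment`, panicking instead of copying if the
    /// data is not already aligned.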
    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
        }
    }
}

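/// A borrowing iterator over the elements of a [`Buffer<T>`], wrapping [`std::slice::Iter`].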
pub struct Iter<'a, T> {
    inner: std::slice::Iter<'a, T>,
}

impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }

    #[inline]
    fn count(self) -> usize {
        self.inner.count()
    }

    #[inline]
    fn last(self) -> Option<Self::Item> {
        self.inner.last()
    }

    #[inline]
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        self.inner.nth(n)
    }
}

impl<T> ExactSizeIterator for Iter<'_, T> {
    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }
}

impl<T: Debug> Debug for Buffer<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
            .field("length", &self.length)
            .field("alignment", &self.alignment)
            .field("as_slice", &TruncatedDebug(self.as_slice()))
            .finish()
    }
}

impl<T> Deref for Buffer<T> {
    type Target = [T];

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}

impl<T> AsRef<[T]> for Buffer<T> {
    #[inline]
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}

impl<T> FromIterator<T> for Buffer<T> {
    #[inline]
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        BufferMut::from_iter(iter).freeze()
    }
}

impl From<Vec<u8>> for ByteBuffer {
    fn from(value: Vec<u8>) -> Self {
        Self::from(Bytes::from(value))
    }
}

impl From<Bytes> for ByteBuffer {
    fn from(bytes: Bytes) -> Self {
        let length = bytes.len();
        Self {
            bytes,
            length,
            alignment: Alignment::of::<u8>(),
            _marker: Default::default(),
        }
    }
}

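// `Buf` is implemented only for `ByteBuffer`: advancing is restricted to multiples of the
// buffer's alignment so an aligned read position stays aligned after each advance.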
impl Buf for ByteBuffer {
    #[inline]
    fn remaining(&self) -> usize {
        self.len()
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        self.as_slice()
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        if !cnt.is_multiple_of(*self.alignment) {
            vortex_panic!(
                "Cannot advance buffer by {} items, resulting alignment is not {}",
                cnt,
                self.alignment
            );
        }
        self.bytes.advance(cnt);
        self.length -= cnt;
    }
}

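/// An owning iterator over the elements of a [`Buffer<T>`] where `T: Copy`.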
pub struct BufferIterator<T> {
    buffer: Buffer<T>,
    index: usize,
}

impl<T: Copy> Iterator for BufferIterator<T> {
    type Item = T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        (self.index < self.buffer.len()).then(move || {
            let value = self.buffer[self.index];
            self.index += 1;
            value
        })
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.buffer.len() - self.index;
        (remaining, Some(remaining))
    }
}

impl<T: Copy> IntoIterator for Buffer<T> {
    type Item = T;
    type IntoIter = BufferIterator<T>;

    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        BufferIterator {
            buffer: self,
            index: 0,
        }
    }
}

impl<T> From<BufferMut<T>> for Buffer<T> {
    #[inline]
    fn from(value: BufferMut<T>) -> Self {
        value.freeze()
    }
}

#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::{Alignment, ByteBuffer, buffer};

    #[test]
    fn align() {
        let buf = buffer![0u8, 1, 2];
        let aligned = buf.aligned(Alignment::new(32));
        assert_eq!(aligned.alignment(), Alignment::new(32));
        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    }

    #[test]
    fn slice() {
        let buf = buffer![0, 1, 2, 3, 4];
        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
    }

    #[test]
    fn slice_unaligned() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        buf.slice_unaligned(1..2);
    }

    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        buf.slice(1..2);
    }

    #[test]
    fn bytes_buf() {
        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }
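
    // The tests below are added illustrative sketches of the zero-copy APIs defined above; they
    // rely only on items declared in this file (`slice_ref`, `from_byte_buffer`, `try_into_mut`)
    // plus the `buffer!` macro already used by the tests above.
    #[test]
    fn slice_ref_shares_allocation() {
        let buf = buffer![0i32, 1, 2, 3, 4];
        // Take a sub-slice by reference; `slice_ref` resolves it back into the buffer without
        // copying, so the result shares the same underlying allocation.
        let subset = &buf.as_slice()[1..3];
        let sliced = buf.slice_ref(subset);
        assert_eq!(sliced.as_slice(), &[1, 2]);
        assert_eq!(sliced.alignment(), Alignment::of::<i32>());
    }

    #[test]
    fn from_byte_buffer_reinterprets_bytes() {
        // Copy raw bytes into an allocation aligned for `u32`, then reinterpret them as a typed
        // buffer; the element count is the byte length divided by `size_of::<u32>()`.
        let bytes = ByteBuffer::copy_from_aligned(42u32.to_ne_bytes(), Alignment::of::<u32>());
        let buf = crate::Buffer::<u32>::from_byte_buffer(bytes);
        assert_eq!(buf.len(), 1);
        assert_eq!(buf.as_slice(), &[42u32]);
    }

    #[test]
    fn try_into_mut_requires_unique_ownership() {
        let buf = buffer![0u8, 1, 2];
        let shared = buf.clone();
        // While another handle to the same bytes exists, the zero-copy conversion fails and the
        // buffer is handed back unchanged.
        let buf = buf.try_into_mut().unwrap_err();
        drop(shared);
        // Once the buffer is the sole owner again, the conversion succeeds without copying.
        assert!(buf.try_into_mut().is_ok());
    }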
}