vortex_buffer/
buffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::type_name;
5use std::cmp::Ordering;
6use std::collections::Bound;
7use std::fmt::{Debug, Formatter};
8use std::hash::{Hash, Hasher};
9use std::ops::{Deref, RangeBounds};
10
11use bytes::{Buf, Bytes};
12use vortex_error::{VortexExpect, vortex_panic};
13
14use crate::debug::TruncatedDebug;
15use crate::trusted_len::TrustedLen;
16use crate::{Alignment, BufferMut, ByteBuffer};
17
18/// An immutable buffer of items of `T`.
19#[derive(Clone)]
20pub struct Buffer<T> {
21    pub(crate) bytes: Bytes,
22    pub(crate) length: usize,
23    pub(crate) alignment: Alignment,
24    pub(crate) _marker: std::marker::PhantomData<T>,
25}
26
27impl<T> PartialEq for Buffer<T> {
28    fn eq(&self, other: &Self) -> bool {
29        self.bytes == other.bytes
30    }
31}
32
33impl<T> Eq for Buffer<T> {}
34
35impl<T> Ord for Buffer<T> {
36    fn cmp(&self, other: &Self) -> Ordering {
37        self.bytes.cmp(&other.bytes)
38    }
39}
40
41impl<T> PartialOrd for Buffer<T> {
42    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
43        Some(self.cmp(other))
44    }
45}
46
47impl<T> Hash for Buffer<T> {
48    fn hash<H: Hasher>(&self, state: &mut H) {
49        self.bytes.as_ref().hash(state)
50    }
51}
52
53impl<T> Buffer<T> {
54    /// Returns a new `Buffer<T>` copied from the provided `Vec<T>`, `&[T]`, etc.
55    ///
56    /// Due to our underlying usage of `bytes::Bytes`, we are unable to take zero-copy ownership
57    /// of the provided `Vec<T>` while maintaining the ability to convert it back into a mutable
58    /// buffer. We could fix this by forking `Bytes`, or in many other complex ways, but for now
59    /// callers should prefer to construct `Buffer<T>` from a `BufferMut<T>`.
60    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
61        BufferMut::copy_from(values).freeze()
62    }
63
64    /// Returns a new `Buffer<T>` copied from the provided slice and with the requested alignment.
65    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
66        BufferMut::copy_from_aligned(values, alignment).freeze()
67    }
68
69    /// Create a new zeroed `Buffer` with the given value.
70    pub fn zeroed(len: usize) -> Self {
71        Self::zeroed_aligned(len, Alignment::of::<T>())
72    }
73
74    /// Create a new zeroed `Buffer` with the given value.
75    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
76        BufferMut::zeroed_aligned(len, alignment).freeze()
77    }
78
79    /// Create a new empty `ByteBuffer` with the provided alignment.
80    pub fn empty() -> Self {
81        BufferMut::empty().freeze()
82    }
83
84    /// Create a new empty `ByteBuffer` with the provided alignment.
85    pub fn empty_aligned(alignment: Alignment) -> Self {
86        BufferMut::empty_aligned(alignment).freeze()
87    }
88
89    /// Create a new full `ByteBuffer` with the given value.
90    pub fn full(item: T, len: usize) -> Self
91    where
92        T: Copy,
93    {
94        BufferMut::full(item, len).freeze()
95    }
96
97    /// Create a `Buffer<T>` zero-copy from a `ByteBuffer`.
98    ///
99    /// ## Panics
100    ///
101    /// Panics if the buffer is not aligned to the size of `T`, or the length is not a multiple of
102    /// the size of `T`.
103    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
104        // TODO(ngates): should this preserve the current alignment of the buffer?
105        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
106    }
107
108    /// Create a `Buffer<T>` zero-copy from a `ByteBuffer`.
109    ///
110    /// ## Panics
111    ///
112    /// Panics if the buffer is not aligned to the given alignment, if the length is not a multiple
113    /// of the size of `T`, or if the given alignment is not aligned to that of `T`.
114    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
115        Self::from_bytes_aligned(buffer.into_inner(), alignment)
116    }
117
118    /// Create a `Buffer<T>` zero-copy from a `Bytes`.
119    ///
120    /// ## Panics
121    ///
122    /// Panics if the buffer is not aligned to the size of `T`, or the length is not a multiple of
123    /// the size of `T`.
124    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
125        if !alignment.is_aligned_to(Alignment::of::<T>()) {
126            vortex_panic!(
127                "Alignment {} must be compatible with the scalar type's alignment {}",
128                alignment,
129                Alignment::of::<T>(),
130            );
131        }
132        if bytes.as_ptr().align_offset(*alignment) != 0 {
133            vortex_panic!(
134                "Bytes alignment must align to the requested alignment {}",
135                alignment,
136            );
137        }
138        if bytes.len() % size_of::<T>() != 0 {
139            vortex_panic!(
140                "Bytes length {} must be a multiple of the scalar type's size {}",
141                bytes.len(),
142                size_of::<T>()
143            );
144        }
145        let length = bytes.len() / size_of::<T>();
146        Self {
147            bytes,
148            length,
149            alignment,
150            _marker: Default::default(),
151        }
152    }
153
154    /// Create a buffer with values from the TrustedLen iterator.
155    /// Should be preferred over `from_iter` when the iterator is known to be `TrustedLen`.
156    pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
157        let (_, high) = iter.size_hint();
158        let mut buffer =
159            BufferMut::with_capacity(high.vortex_expect("TrustedLen iterator has no upper bound"));
160        buffer.extend_trusted(iter);
161        buffer.freeze()
162    }
163
164    /// Returns the length of the buffer in elements of type T.
165    #[inline(always)]
166    pub fn len(&self) -> usize {
167        self.length
168    }
169
170    /// Returns whether the buffer is empty.
171    #[inline(always)]
172    pub fn is_empty(&self) -> bool {
173        self.length == 0
174    }
175
176    /// Returns the alignment of the buffer.
177    #[inline(always)]
178    pub fn alignment(&self) -> Alignment {
179        self.alignment
180    }
181
182    /// Returns a slice over the buffer of elements of type T.
183    #[inline(always)]
184    pub fn as_slice(&self) -> &[T] {
185        let raw_slice = self.bytes.as_ref();
186        // SAFETY: alignment of Buffer is checked on construction
187        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
188    }
189
190    /// Returns an iterator over the buffer of elements of type T.
191    pub fn iter(&self) -> Iter<'_, T> {
192        Iter {
193            inner: self.as_slice().iter(),
194        }
195    }
196
197    /// Returns a slice of self for the provided range.
198    ///
199    /// # Panics
200    ///
201    /// Requires that `begin <= end` and `end <= self.len()`.
202    /// Also requires that both `begin` and `end` are aligned to the buffer's required alignment.
203    #[inline(always)]
204    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
205        self.slice_with_alignment(range, self.alignment)
206    }
207
208    /// Returns a slice of self for the provided range, with no guarantees about the resulting
209    /// alignment.
210    ///
211    /// # Panics
212    ///
213    /// Requires that `begin <= end` and `end <= self.len()`.
214    #[inline(always)]
215    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
216        self.slice_with_alignment(range, Alignment::of::<u8>())
217    }
218
219    /// Returns a slice of self for the provided range, ensuring the resulting slice has the
220    /// given alignment.
221    ///
222    /// # Panics
223    ///
224    /// Requires that `begin <= end` and `end <= self.len()`.
225    /// Also requires that both `begin` and `end` are aligned to the given alignment.
226    pub fn slice_with_alignment(
227        &self,
228        range: impl RangeBounds<usize>,
229        alignment: Alignment,
230    ) -> Self {
231        let len = self.len();
232        let begin = match range.start_bound() {
233            Bound::Included(&n) => n,
234            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
235            Bound::Unbounded => 0,
236        };
237        let end = match range.end_bound() {
238            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
239            Bound::Excluded(&n) => n,
240            Bound::Unbounded => len,
241        };
242
243        if begin > end {
244            vortex_panic!(
245                "range start must not be greater than end: {:?} <= {:?}",
246                begin,
247                end
248            );
249        }
250        if end > len {
251            vortex_panic!("range end out of bounds: {:?} <= {:?}", end, len);
252        }
253
254        if end == begin {
255            // We prefer to return a new empty buffer instead of sharing this one and creating a
256            // strong reference just to hold an empty slice.
257            return Self::empty_aligned(alignment);
258        }
259
260        let begin_byte = begin * size_of::<T>();
261        let end_byte = end * size_of::<T>();
262
263        if !begin_byte.is_multiple_of(*alignment) {
264            vortex_panic!("range start must be aligned to {:?}", alignment);
265        }
266        if !end_byte.is_multiple_of(*alignment) {
267            vortex_panic!("range end must be aligned to {:?}", alignment);
268        }
269        if !alignment.is_aligned_to(Alignment::of::<T>()) {
270            vortex_panic!("Slice alignment must at least align to type T")
271        }
272
273        Self {
274            bytes: self.bytes.slice(begin_byte..end_byte),
275            length: end - begin,
276            alignment,
277            _marker: Default::default(),
278        }
279    }
280
281    /// Returns a slice of self that is equivalent to the given subset.
282    ///
283    /// When processing the buffer you will often end up with &\[T\] that is a subset
284    /// of the underlying buffer. This function turns the slice into a slice of the buffer
285    /// it has been taken from.
286    ///
287    /// # Panics:
288    /// Requires that the given sub slice is in fact contained within the Bytes buffer; otherwise this function will panic.
289    #[inline(always)]
290    pub fn slice_ref(&self, subset: &[T]) -> Self {
291        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
292    }
293
294    /// Returns a slice of self that is equivalent to the given subset.
295    ///
296    /// When processing the buffer you will often end up with &\[T\] that is a subset
297    /// of the underlying buffer. This function turns the slice into a slice of the buffer
298    /// it has been taken from.
299    ///
300    /// # Panics:
301    /// Requires that the given sub slice is in fact contained within the Bytes buffer; otherwise this function will panic.
302    /// Also requires that the given alignment aligns to the type of slice and is smaller or equal to the buffers alignment
303    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
304        if !alignment.is_aligned_to(Alignment::of::<T>()) {
305            vortex_panic!("slice_ref alignment must at least align to type T")
306        }
307
308        if !self.alignment.is_aligned_to(alignment) {
309            vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
310        }
311
312        if subset.as_ptr().align_offset(*alignment) != 0 {
313            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
314        }
315
316        let subset_u8 =
317            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };
318
319        Self {
320            bytes: self.bytes.slice_ref(subset_u8),
321            length: subset.len(),
322            alignment,
323            _marker: Default::default(),
324        }
325    }
326
327    /// Returns the underlying aligned buffer.
328    pub fn inner(&self) -> &Bytes {
329        debug_assert_eq!(
330            self.length * size_of::<T>(),
331            self.bytes.len(),
332            "Own length has to be the same as the underlying bytes length"
333        );
334        &self.bytes
335    }
336
337    /// Returns the underlying aligned buffer.
338    pub fn into_inner(self) -> Bytes {
339        debug_assert_eq!(
340            self.length * size_of::<T>(),
341            self.bytes.len(),
342            "Own length has to be the same as the underlying bytes length"
343        );
344        self.bytes
345    }
346
347    /// Return the ByteBuffer for this `Buffer<T>`.
348    pub fn into_byte_buffer(self) -> ByteBuffer {
349        ByteBuffer {
350            bytes: self.bytes,
351            length: self.length * size_of::<T>(),
352            alignment: self.alignment,
353            _marker: Default::default(),
354        }
355    }
356
357    /// Convert self into `BufferMut<T>`, copying if there are multiple strong references.
358    pub fn into_mut(self) -> BufferMut<T> {
359        self.try_into_mut()
360            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
361    }
362
363    /// Try to convert self into `BufferMut<T>` if there is only a single strong reference.
364    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
365        self.bytes
366            .try_into_mut()
367            .map(|bytes| BufferMut {
368                bytes,
369                length: self.length,
370                alignment: self.alignment,
371                _marker: Default::default(),
372            })
373            .map_err(|bytes| Self {
374                bytes,
375                length: self.length,
376                alignment: self.alignment,
377                _marker: Default::default(),
378            })
379    }
380
381    /// Returns whether a `Buffer<T>` is aligned to the given alignment.
382    pub fn is_aligned(&self, alignment: Alignment) -> bool {
383        self.bytes.as_ptr().align_offset(*alignment) == 0
384    }
385
386    /// Return a `Buffer<T>` with the given alignment. Where possible, this will be zero-copy.
387    pub fn aligned(mut self, alignment: Alignment) -> Self {
388        if self.as_ptr().align_offset(*alignment) == 0 {
389            self.alignment = alignment;
390            self
391        } else {
392            #[cfg(feature = "warn-copy")]
393            {
394                let bt = std::backtrace::Backtrace::capture();
395                log::warn!(
396                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
397                )
398            }
399            Self::copy_from_aligned(self, alignment)
400        }
401    }
402
403    /// Return a `Buffer<T>` with the given alignment. Panics if the buffer is not aligned.
404    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
405        if self.as_ptr().align_offset(*alignment) == 0 {
406            self.alignment = alignment;
407            self
408        } else {
409            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
410        }
411    }
412}
413
414/// An iterator over Buffer elements.
415///
416/// This is an analog to the `std::slice::Iter` type.
417pub struct Iter<'a, T> {
418    inner: std::slice::Iter<'a, T>,
419}
420
421impl<'a, T> Iterator for Iter<'a, T> {
422    type Item = &'a T;
423
424    fn next(&mut self) -> Option<Self::Item> {
425        self.inner.next()
426    }
427
428    fn size_hint(&self) -> (usize, Option<usize>) {
429        self.inner.size_hint()
430    }
431
432    fn count(self) -> usize {
433        self.inner.count()
434    }
435
436    fn last(self) -> Option<Self::Item> {
437        self.inner.last()
438    }
439
440    fn nth(&mut self, n: usize) -> Option<Self::Item> {
441        self.inner.nth(n)
442    }
443}
444
445impl<T> ExactSizeIterator for Iter<'_, T> {
446    fn len(&self) -> usize {
447        self.inner.len()
448    }
449}
450
451impl<T: Debug> Debug for Buffer<T> {
452    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
453        f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
454            .field("length", &self.length)
455            .field("alignment", &self.alignment)
456            .field("as_slice", &TruncatedDebug(self.as_slice()))
457            .finish()
458    }
459}
460
461impl<T> Deref for Buffer<T> {
462    type Target = [T];
463
464    fn deref(&self) -> &Self::Target {
465        self.as_slice()
466    }
467}
468
469impl<T> AsRef<[T]> for Buffer<T> {
470    fn as_ref(&self) -> &[T] {
471        self.as_slice()
472    }
473}
474
475impl<T> FromIterator<T> for Buffer<T> {
476    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
477        BufferMut::from_iter(iter).freeze()
478    }
479}
480
481/// Only for `Buffer<u8>` can we zero-copy from a `Vec<u8>` since we can use a 1-byte alignment.
482impl From<Vec<u8>> for ByteBuffer {
483    fn from(value: Vec<u8>) -> Self {
484        Self::from(Bytes::from(value))
485    }
486}
487
488/// Only for `Buffer<u8>` can we zero-copy from a `Bytes` since we can use a 1-byte alignment.
489impl From<Bytes> for ByteBuffer {
490    fn from(bytes: Bytes) -> Self {
491        let length = bytes.len();
492        Self {
493            bytes,
494            length,
495            alignment: Alignment::of::<u8>(),
496            _marker: Default::default(),
497        }
498    }
499}
500
501impl Buf for ByteBuffer {
502    fn remaining(&self) -> usize {
503        self.len()
504    }
505
506    fn chunk(&self) -> &[u8] {
507        self.as_slice()
508    }
509
510    fn advance(&mut self, cnt: usize) {
511        if !cnt.is_multiple_of(*self.alignment) {
512            vortex_panic!(
513                "Cannot advance buffer by {} items, resulting alignment is not {}",
514                cnt,
515                self.alignment
516            );
517        }
518        self.bytes.advance(cnt);
519        self.length -= cnt;
520    }
521}
522
523/// Owned iterator over a [`Buffer`].
524pub struct BufferIterator<T> {
525    buffer: Buffer<T>,
526    index: usize,
527}
528
529impl<T: Copy> Iterator for BufferIterator<T> {
530    type Item = T;
531
532    fn next(&mut self) -> Option<Self::Item> {
533        (self.index < self.buffer.len()).then(move || {
534            let value = self.buffer[self.index];
535            self.index += 1;
536            value
537        })
538    }
539
540    fn size_hint(&self) -> (usize, Option<usize>) {
541        let remaining = self.buffer.len() - self.index;
542        (remaining, Some(remaining))
543    }
544}
545
546impl<T: Copy> IntoIterator for Buffer<T> {
547    type Item = T;
548    type IntoIter = BufferIterator<T>;
549
550    fn into_iter(self) -> Self::IntoIter {
551        BufferIterator {
552            buffer: self,
553            index: 0,
554        }
555    }
556}
557
558impl<T> From<BufferMut<T>> for Buffer<T> {
559    fn from(value: BufferMut<T>) -> Self {
560        value.freeze()
561    }
562}
563
564#[cfg(test)]
565mod test {
566    use bytes::Buf;
567
568    use crate::{Alignment, ByteBuffer, buffer};
569
570    #[test]
571    fn align() {
572        let buf = buffer![0u8, 1, 2];
573        let aligned = buf.aligned(Alignment::new(32));
574        assert_eq!(aligned.alignment(), Alignment::new(32));
575        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
576    }
577
578    #[test]
579    fn slice() {
580        let buf = buffer![0, 1, 2, 3, 4];
581        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
582        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
583    }
584
585    #[test]
586    fn slice_unaligned() {
587        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
588        // With a regular slice, this would panic. See [`slice_bad_alignment`].
589        buf.slice_unaligned(1..2);
590    }
591
592    #[test]
593    #[should_panic]
594    fn slice_bad_alignment() {
595        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
596        // We should only be able to slice this buffer on 4-byte (i32) boundaries.
597        buf.slice(1..2);
598    }
599
600    #[test]
601    fn bytes_buf() {
602        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
603        assert_eq!(buf.remaining(), 10);
604        assert_eq!(buf.chunk(), b"helloworld");
605
606        Buf::advance(&mut buf, 5);
607        assert_eq!(buf.remaining(), 5);
608        assert_eq!(buf.as_slice(), b"world");
609        assert_eq!(buf.chunk(), b"world");
610    }
611}