vortex_buffer/
buffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::type_name;
5use std::cmp::Ordering;
6use std::collections::Bound;
7use std::fmt::{Debug, Formatter};
8use std::hash::{Hash, Hasher};
9use std::ops::{Deref, RangeBounds};
10
11use bytes::{Buf, Bytes};
12use vortex_error::{VortexExpect, vortex_panic};
13
14use crate::debug::TruncatedDebug;
15use crate::trusted_len::TrustedLen;
16use crate::{Alignment, BufferMut, ByteBuffer};
17
18/// An immutable buffer of items of `T`.
19pub struct Buffer<T> {
20    pub(crate) bytes: Bytes,
21    pub(crate) length: usize,
22    pub(crate) alignment: Alignment,
23    pub(crate) _marker: std::marker::PhantomData<T>,
24}
25
26impl<T> Clone for Buffer<T> {
27    #[inline]
28    fn clone(&self) -> Self {
29        Self {
30            bytes: self.bytes.clone(),
31            length: self.length,
32            alignment: self.alignment,
33            _marker: std::marker::PhantomData,
34        }
35    }
36}
37
38impl<T> PartialEq for Buffer<T> {
39    #[inline]
40    fn eq(&self, other: &Self) -> bool {
41        self.bytes == other.bytes
42    }
43}
44
45impl<T> Eq for Buffer<T> {}
46
47impl<T> Ord for Buffer<T> {
48    #[inline]
49    fn cmp(&self, other: &Self) -> Ordering {
50        self.bytes.cmp(&other.bytes)
51    }
52}
53
54impl<T> PartialOrd for Buffer<T> {
55    #[inline]
56    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
57        Some(self.cmp(other))
58    }
59}
60
61impl<T> Hash for Buffer<T> {
62    #[inline]
63    fn hash<H: Hasher>(&self, state: &mut H) {
64        self.bytes.as_ref().hash(state)
65    }
66}
67
68impl<T> Buffer<T> {
69    /// Returns a new `Buffer<T>` copied from the provided `Vec<T>`, `&[T]`, etc.
70    ///
71    /// Due to our underlying usage of `bytes::Bytes`, we are unable to take zero-copy ownership
72    /// of the provided `Vec<T>` while maintaining the ability to convert it back into a mutable
73    /// buffer. We could fix this by forking `Bytes`, or in many other complex ways, but for now
74    /// callers should prefer to construct `Buffer<T>` from a `BufferMut<T>`.
75    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
76        BufferMut::copy_from(values).freeze()
77    }
78
79    /// Returns a new `Buffer<T>` copied from the provided slice and with the requested alignment.
80    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
81        BufferMut::copy_from_aligned(values, alignment).freeze()
82    }
83
84    /// Create a new zeroed `Buffer` with the given value.
85    pub fn zeroed(len: usize) -> Self {
86        Self::zeroed_aligned(len, Alignment::of::<T>())
87    }
88
89    /// Create a new zeroed `Buffer` with the given value.
90    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
91        BufferMut::zeroed_aligned(len, alignment).freeze()
92    }
93
94    /// Create a new empty `ByteBuffer` with the provided alignment.
95    pub fn empty() -> Self {
96        BufferMut::empty().freeze()
97    }
98
99    /// Create a new empty `ByteBuffer` with the provided alignment.
100    pub fn empty_aligned(alignment: Alignment) -> Self {
101        BufferMut::empty_aligned(alignment).freeze()
102    }
103
104    /// Create a new full `ByteBuffer` with the given value.
105    pub fn full(item: T, len: usize) -> Self
106    where
107        T: Copy,
108    {
109        BufferMut::full(item, len).freeze()
110    }
111
112    /// Create a `Buffer<T>` zero-copy from a `ByteBuffer`.
113    ///
114    /// ## Panics
115    ///
116    /// Panics if the buffer is not aligned to the size of `T`, or the length is not a multiple of
117    /// the size of `T`.
118    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
119        // TODO(ngates): should this preserve the current alignment of the buffer?
120        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
121    }
122
123    /// Create a `Buffer<T>` zero-copy from a `ByteBuffer`.
124    ///
125    /// ## Panics
126    ///
127    /// Panics if the buffer is not aligned to the given alignment, if the length is not a multiple
128    /// of the size of `T`, or if the given alignment is not aligned to that of `T`.
129    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
130        Self::from_bytes_aligned(buffer.into_inner(), alignment)
131    }
132
133    /// Create a `Buffer<T>` zero-copy from a `Bytes`.
134    ///
135    /// ## Panics
136    ///
137    /// Panics if the buffer is not aligned to the size of `T`, or the length is not a multiple of
138    /// the size of `T`.
139    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
140        if !alignment.is_aligned_to(Alignment::of::<T>()) {
141            vortex_panic!(
142                "Alignment {} must be compatible with the scalar type's alignment {}",
143                alignment,
144                Alignment::of::<T>(),
145            );
146        }
147        if bytes.as_ptr().align_offset(*alignment) != 0 {
148            vortex_panic!(
149                "Bytes alignment must align to the requested alignment {}",
150                alignment,
151            );
152        }
153        if bytes.len() % size_of::<T>() != 0 {
154            vortex_panic!(
155                "Bytes length {} must be a multiple of the scalar type's size {}",
156                bytes.len(),
157                size_of::<T>()
158            );
159        }
160        let length = bytes.len() / size_of::<T>();
161        Self {
162            bytes,
163            length,
164            alignment,
165            _marker: Default::default(),
166        }
167    }
168
169    /// Create a buffer with values from the TrustedLen iterator.
170    /// Should be preferred over `from_iter` when the iterator is known to be `TrustedLen`.
171    pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
172        let (_, high) = iter.size_hint();
173        let mut buffer =
174            BufferMut::with_capacity(high.vortex_expect("TrustedLen iterator has no upper bound"));
175        buffer.extend_trusted(iter);
176        buffer.freeze()
177    }
178
179    /// Returns the length of the buffer in elements of type T.
180    #[inline(always)]
181    pub fn len(&self) -> usize {
182        self.length
183    }
184
185    /// Returns whether the buffer is empty.
186    #[inline(always)]
187    pub fn is_empty(&self) -> bool {
188        self.length == 0
189    }
190
191    /// Returns the alignment of the buffer.
192    #[inline(always)]
193    pub fn alignment(&self) -> Alignment {
194        self.alignment
195    }
196
197    /// Returns a slice over the buffer of elements of type T.
198    #[inline(always)]
199    pub fn as_slice(&self) -> &[T] {
200        let raw_slice = self.bytes.as_ref();
201        // SAFETY: alignment of Buffer is checked on construction
202        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
203    }
204
205    /// Returns an iterator over the buffer of elements of type T.
206    pub fn iter(&self) -> Iter<'_, T> {
207        Iter {
208            inner: self.as_slice().iter(),
209        }
210    }
211
212    /// Returns a slice of self for the provided range.
213    ///
214    /// # Panics
215    ///
216    /// Requires that `begin <= end` and `end <= self.len()`.
217    /// Also requires that both `begin` and `end` are aligned to the buffer's required alignment.
218    #[inline(always)]
219    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
220        self.slice_with_alignment(range, self.alignment)
221    }
222
223    /// Returns a slice of self for the provided range, with no guarantees about the resulting
224    /// alignment.
225    ///
226    /// # Panics
227    ///
228    /// Requires that `begin <= end` and `end <= self.len()`.
229    #[inline(always)]
230    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
231        self.slice_with_alignment(range, Alignment::of::<u8>())
232    }
233
234    /// Returns a slice of self for the provided range, ensuring the resulting slice has the
235    /// given alignment.
236    ///
237    /// # Panics
238    ///
239    /// Requires that `begin <= end` and `end <= self.len()`.
240    /// Also requires that both `begin` and `end` are aligned to the given alignment.
241    pub fn slice_with_alignment(
242        &self,
243        range: impl RangeBounds<usize>,
244        alignment: Alignment,
245    ) -> Self {
246        let len = self.len();
247        let begin = match range.start_bound() {
248            Bound::Included(&n) => n,
249            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
250            Bound::Unbounded => 0,
251        };
252        let end = match range.end_bound() {
253            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
254            Bound::Excluded(&n) => n,
255            Bound::Unbounded => len,
256        };
257
258        if begin > end {
259            vortex_panic!(
260                "range start must not be greater than end: {:?} <= {:?}",
261                begin,
262                end
263            );
264        }
265        if end > len {
266            vortex_panic!("range end out of bounds: {:?} <= {:?}", end, len);
267        }
268
269        if end == begin {
270            // We prefer to return a new empty buffer instead of sharing this one and creating a
271            // strong reference just to hold an empty slice.
272            return Self::empty_aligned(alignment);
273        }
274
275        let begin_byte = begin * size_of::<T>();
276        let end_byte = end * size_of::<T>();
277
278        if !begin_byte.is_multiple_of(*alignment) {
279            vortex_panic!("range start must be aligned to {:?}", alignment);
280        }
281        if !end_byte.is_multiple_of(*alignment) {
282            vortex_panic!("range end must be aligned to {:?}", alignment);
283        }
284        if !alignment.is_aligned_to(Alignment::of::<T>()) {
285            vortex_panic!("Slice alignment must at least align to type T")
286        }
287
288        Self {
289            bytes: self.bytes.slice(begin_byte..end_byte),
290            length: end - begin,
291            alignment,
292            _marker: Default::default(),
293        }
294    }
295
296    /// Returns a slice of self that is equivalent to the given subset.
297    ///
298    /// When processing the buffer you will often end up with &\[T\] that is a subset
299    /// of the underlying buffer. This function turns the slice into a slice of the buffer
300    /// it has been taken from.
301    ///
302    /// # Panics:
303    /// Requires that the given sub slice is in fact contained within the Bytes buffer; otherwise this function will panic.
304    #[inline(always)]
305    pub fn slice_ref(&self, subset: &[T]) -> Self {
306        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
307    }
308
309    /// Returns a slice of self that is equivalent to the given subset.
310    ///
311    /// When processing the buffer you will often end up with &\[T\] that is a subset
312    /// of the underlying buffer. This function turns the slice into a slice of the buffer
313    /// it has been taken from.
314    ///
315    /// # Panics:
316    /// Requires that the given sub slice is in fact contained within the Bytes buffer; otherwise this function will panic.
317    /// Also requires that the given alignment aligns to the type of slice and is smaller or equal to the buffers alignment
318    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
319        if !alignment.is_aligned_to(Alignment::of::<T>()) {
320            vortex_panic!("slice_ref alignment must at least align to type T")
321        }
322
323        if !self.alignment.is_aligned_to(alignment) {
324            vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
325        }
326
327        if subset.as_ptr().align_offset(*alignment) != 0 {
328            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
329        }
330
331        let subset_u8 =
332            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };
333
334        Self {
335            bytes: self.bytes.slice_ref(subset_u8),
336            length: subset.len(),
337            alignment,
338            _marker: Default::default(),
339        }
340    }
341
342    /// Returns the underlying aligned buffer.
343    pub fn inner(&self) -> &Bytes {
344        debug_assert_eq!(
345            self.length * size_of::<T>(),
346            self.bytes.len(),
347            "Own length has to be the same as the underlying bytes length"
348        );
349        &self.bytes
350    }
351
352    /// Returns the underlying aligned buffer.
353    pub fn into_inner(self) -> Bytes {
354        debug_assert_eq!(
355            self.length * size_of::<T>(),
356            self.bytes.len(),
357            "Own length has to be the same as the underlying bytes length"
358        );
359        self.bytes
360    }
361
362    /// Return the ByteBuffer for this `Buffer<T>`.
363    pub fn into_byte_buffer(self) -> ByteBuffer {
364        ByteBuffer {
365            bytes: self.bytes,
366            length: self.length * size_of::<T>(),
367            alignment: self.alignment,
368            _marker: Default::default(),
369        }
370    }
371
372    /// Convert self into `BufferMut<T>`, copying if there are multiple strong references.
373    pub fn into_mut(self) -> BufferMut<T> {
374        self.try_into_mut()
375            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
376    }
377
378    /// Try to convert self into `BufferMut<T>` if there is only a single strong reference.
379    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
380        self.bytes
381            .try_into_mut()
382            .map(|bytes| BufferMut {
383                bytes,
384                length: self.length,
385                alignment: self.alignment,
386                _marker: Default::default(),
387            })
388            .map_err(|bytes| Self {
389                bytes,
390                length: self.length,
391                alignment: self.alignment,
392                _marker: Default::default(),
393            })
394    }
395
396    /// Returns whether a `Buffer<T>` is aligned to the given alignment.
397    pub fn is_aligned(&self, alignment: Alignment) -> bool {
398        self.bytes.as_ptr().align_offset(*alignment) == 0
399    }
400
401    /// Return a `Buffer<T>` with the given alignment. Where possible, this will be zero-copy.
402    pub fn aligned(mut self, alignment: Alignment) -> Self {
403        if self.as_ptr().align_offset(*alignment) == 0 {
404            self.alignment = alignment;
405            self
406        } else {
407            #[cfg(feature = "warn-copy")]
408            {
409                let bt = std::backtrace::Backtrace::capture();
410                log::warn!(
411                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
412                )
413            }
414            Self::copy_from_aligned(self, alignment)
415        }
416    }
417
418    /// Return a `Buffer<T>` with the given alignment. Panics if the buffer is not aligned.
419    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
420        if self.as_ptr().align_offset(*alignment) == 0 {
421            self.alignment = alignment;
422            self
423        } else {
424            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
425        }
426    }
427}
428
429/// An iterator over Buffer elements.
430///
431/// This is an analog to the `std::slice::Iter` type.
432pub struct Iter<'a, T> {
433    inner: std::slice::Iter<'a, T>,
434}
435
436impl<'a, T> Iterator for Iter<'a, T> {
437    type Item = &'a T;
438
439    #[inline]
440    fn next(&mut self) -> Option<Self::Item> {
441        self.inner.next()
442    }
443
444    #[inline]
445    fn size_hint(&self) -> (usize, Option<usize>) {
446        self.inner.size_hint()
447    }
448
449    #[inline]
450    fn count(self) -> usize {
451        self.inner.count()
452    }
453
454    #[inline]
455    fn last(self) -> Option<Self::Item> {
456        self.inner.last()
457    }
458
459    #[inline]
460    fn nth(&mut self, n: usize) -> Option<Self::Item> {
461        self.inner.nth(n)
462    }
463}
464
465impl<T> ExactSizeIterator for Iter<'_, T> {
466    #[inline]
467    fn len(&self) -> usize {
468        self.inner.len()
469    }
470}
471
472impl<T: Debug> Debug for Buffer<T> {
473    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
474        f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
475            .field("length", &self.length)
476            .field("alignment", &self.alignment)
477            .field("as_slice", &TruncatedDebug(self.as_slice()))
478            .finish()
479    }
480}
481
482impl<T> Deref for Buffer<T> {
483    type Target = [T];
484
485    #[inline]
486    fn deref(&self) -> &Self::Target {
487        self.as_slice()
488    }
489}
490
491impl<T> AsRef<[T]> for Buffer<T> {
492    #[inline]
493    fn as_ref(&self) -> &[T] {
494        self.as_slice()
495    }
496}
497
498impl<T> FromIterator<T> for Buffer<T> {
499    #[inline]
500    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
501        BufferMut::from_iter(iter).freeze()
502    }
503}
504
505/// Only for `Buffer<u8>` can we zero-copy from a `Vec<u8>` since we can use a 1-byte alignment.
506impl From<Vec<u8>> for ByteBuffer {
507    fn from(value: Vec<u8>) -> Self {
508        Self::from(Bytes::from(value))
509    }
510}
511
512/// Only for `Buffer<u8>` can we zero-copy from a `Bytes` since we can use a 1-byte alignment.
513impl From<Bytes> for ByteBuffer {
514    fn from(bytes: Bytes) -> Self {
515        let length = bytes.len();
516        Self {
517            bytes,
518            length,
519            alignment: Alignment::of::<u8>(),
520            _marker: Default::default(),
521        }
522    }
523}
524
525impl Buf for ByteBuffer {
526    #[inline]
527    fn remaining(&self) -> usize {
528        self.len()
529    }
530
531    #[inline]
532    fn chunk(&self) -> &[u8] {
533        self.as_slice()
534    }
535
536    #[inline]
537    fn advance(&mut self, cnt: usize) {
538        if !cnt.is_multiple_of(*self.alignment) {
539            vortex_panic!(
540                "Cannot advance buffer by {} items, resulting alignment is not {}",
541                cnt,
542                self.alignment
543            );
544        }
545        self.bytes.advance(cnt);
546        self.length -= cnt;
547    }
548}
549
550/// Owned iterator over a [`Buffer`].
551pub struct BufferIterator<T> {
552    buffer: Buffer<T>,
553    index: usize,
554}
555
556impl<T: Copy> Iterator for BufferIterator<T> {
557    type Item = T;
558
559    #[inline]
560    fn next(&mut self) -> Option<Self::Item> {
561        (self.index < self.buffer.len()).then(move || {
562            let value = self.buffer[self.index];
563            self.index += 1;
564            value
565        })
566    }
567
568    #[inline]
569    fn size_hint(&self) -> (usize, Option<usize>) {
570        let remaining = self.buffer.len() - self.index;
571        (remaining, Some(remaining))
572    }
573}
574
575impl<T: Copy> IntoIterator for Buffer<T> {
576    type Item = T;
577    type IntoIter = BufferIterator<T>;
578
579    #[inline]
580    fn into_iter(self) -> Self::IntoIter {
581        BufferIterator {
582            buffer: self,
583            index: 0,
584        }
585    }
586}
587
588impl<T> From<BufferMut<T>> for Buffer<T> {
589    #[inline]
590    fn from(value: BufferMut<T>) -> Self {
591        value.freeze()
592    }
593}
594
595#[cfg(test)]
596mod test {
597    use bytes::Buf;
598
599    use crate::{Alignment, ByteBuffer, buffer};
600
601    #[test]
602    fn align() {
603        let buf = buffer![0u8, 1, 2];
604        let aligned = buf.aligned(Alignment::new(32));
605        assert_eq!(aligned.alignment(), Alignment::new(32));
606        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
607    }
608
609    #[test]
610    fn slice() {
611        let buf = buffer![0, 1, 2, 3, 4];
612        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
613        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
614    }
615
616    #[test]
617    fn slice_unaligned() {
618        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
619        // With a regular slice, this would panic. See [`slice_bad_alignment`].
620        buf.slice_unaligned(1..2);
621    }
622
623    #[test]
624    #[should_panic]
625    fn slice_bad_alignment() {
626        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
627        // We should only be able to slice this buffer on 4-byte (i32) boundaries.
628        buf.slice(1..2);
629    }
630
631    #[test]
632    fn bytes_buf() {
633        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
634        assert_eq!(buf.remaining(), 10);
635        assert_eq!(buf.chunk(), b"helloworld");
636
637        Buf::advance(&mut buf, 5);
638        assert_eq!(buf.remaining(), 5);
639        assert_eq!(buf.as_slice(), b"world");
640        assert_eq!(buf.chunk(), b"world");
641    }
642}