vortex_buffer/
buffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::type_name;
5use std::cmp::Ordering;
6use std::collections::Bound;
7use std::fmt::{Debug, Formatter};
8use std::hash::{Hash, Hasher};
9use std::ops::{Deref, RangeBounds};
10
11use bytes::{Buf, Bytes};
12use vortex_error::{VortexExpect, vortex_panic};
13
14use crate::debug::TruncatedDebug;
15use crate::trusted_len::TrustedLen;
16use crate::{Alignment, BufferMut, ByteBuffer};
17
/// An immutable buffer of items of `T`.
///
/// The items are stored as raw bytes inside a [`Bytes`] handle; `T` only appears as a marker.
pub struct Buffer<T> {
    /// Backing storage; reinterpreted as `[T]` by `as_slice`.
    pub(crate) bytes: Bytes,
    /// Number of items of `T` (not bytes) in the buffer.
    pub(crate) length: usize,
    /// Alignment recorded for this buffer.
    pub(crate) alignment: Alignment,
    /// Ties the otherwise unused type parameter `T` to the struct.
    pub(crate) _marker: std::marker::PhantomData<T>,
}
25
26impl<T> Clone for Buffer<T> {
27    fn clone(&self) -> Self {
28        Self {
29            bytes: self.bytes.clone(),
30            length: self.length,
31            alignment: self.alignment,
32            _marker: std::marker::PhantomData,
33        }
34    }
35}
36
/// Buffers compare equal when their underlying bytes are equal; the `length` and `alignment`
/// fields are not part of the comparison.
impl<T> PartialEq for Buffer<T> {
    fn eq(&self, other: &Self) -> bool {
        // Delegates to `Bytes` equality.
        self.bytes == other.bytes
    }
}

impl<T> Eq for Buffer<T> {}
44
/// Buffers are ordered by their underlying bytes, not by comparing items of `T`.
impl<T> Ord for Buffer<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Delegates to the ordering of `Bytes`.
        self.bytes.cmp(&other.bytes)
    }
}
50
/// Always returns `Some`, deferring to the total order defined by `Ord`.
impl<T> PartialOrd for Buffer<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
56
/// Hashes the underlying bytes only, matching the fields compared by `PartialEq`.
impl<T> Hash for Buffer<T> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash as a `[u8]` slice.
        self.bytes.as_ref().hash(state)
    }
}
62
63impl<T> Buffer<T> {
    /// Returns a new `Buffer<T>` copied from the provided `Vec<T>`, `&[T]`, etc.
    ///
    /// Due to our underlying usage of `bytes::Bytes`, we are unable to take zero-copy ownership
    /// of the provided `Vec<T>` while maintaining the ability to convert it back into a mutable
    /// buffer. We could fix this by forking `Bytes`, or in many other complex ways, but for now
    /// callers should prefer to construct `Buffer<T>` from a `BufferMut<T>`.
    ///
    /// The copy is made by `BufferMut::copy_from`, and the result is then frozen.
    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
        BufferMut::copy_from(values).freeze()
    }
73
    /// Returns a new `Buffer<T>` copied from the provided slice and with the requested alignment.
    ///
    /// The copy is made by `BufferMut::copy_from_aligned`, and the result is then frozen.
    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
        BufferMut::copy_from_aligned(values, alignment).freeze()
    }
78
    /// Create a new zeroed `Buffer` with the given length, using the alignment of `T`.
    pub fn zeroed(len: usize) -> Self {
        Self::zeroed_aligned(len, Alignment::of::<T>())
    }
83
    /// Create a new zeroed `Buffer` with the given length and alignment.
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
        BufferMut::zeroed_aligned(len, alignment).freeze()
    }
88
    /// Create a new empty `Buffer`.
    ///
    /// Built by freezing `BufferMut::empty()`; use `empty_aligned` to pick the alignment.
    pub fn empty() -> Self {
        BufferMut::empty().freeze()
    }
93
    /// Create a new empty `Buffer` with the provided alignment.
    pub fn empty_aligned(alignment: Alignment) -> Self {
        BufferMut::empty_aligned(alignment).freeze()
    }
98
    /// Create a new full `Buffer` of length `len` with the given value.
    ///
    /// Built by freezing `BufferMut::full(item, len)`.
    pub fn full(item: T, len: usize) -> Self
    where
        T: Copy,
    {
        BufferMut::full(item, len).freeze()
    }
106
    /// Create a `Buffer<T>` zero-copy from a `ByteBuffer`, using the alignment of `T`.
    ///
    /// ## Panics
    ///
    /// Panics if the buffer is not aligned to the alignment of `T`, or the length is not a
    /// multiple of the size of `T`.
    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
        // TODO(ngates): should this preserve the current alignment of the buffer?
        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
    }
117
    /// Create a `Buffer<T>` zero-copy from a `ByteBuffer`, recording the given alignment.
    ///
    /// Unwraps the `ByteBuffer` into its `Bytes` and defers to `from_bytes_aligned`.
    ///
    /// ## Panics
    ///
    /// Panics if the buffer is not aligned to the given alignment, if the length is not a multiple
    /// of the size of `T`, or if the given alignment is not aligned to that of `T`.
    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
        Self::from_bytes_aligned(buffer.into_inner(), alignment)
    }
127
128    /// Create a `Buffer<T>` zero-copy from a `Bytes`.
129    ///
130    /// ## Panics
131    ///
132    /// Panics if the buffer is not aligned to the size of `T`, or the length is not a multiple of
133    /// the size of `T`.
134    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
135        if !alignment.is_aligned_to(Alignment::of::<T>()) {
136            vortex_panic!(
137                "Alignment {} must be compatible with the scalar type's alignment {}",
138                alignment,
139                Alignment::of::<T>(),
140            );
141        }
142        if bytes.as_ptr().align_offset(*alignment) != 0 {
143            vortex_panic!(
144                "Bytes alignment must align to the requested alignment {}",
145                alignment,
146            );
147        }
148        if bytes.len() % size_of::<T>() != 0 {
149            vortex_panic!(
150                "Bytes length {} must be a multiple of the scalar type's size {}",
151                bytes.len(),
152                size_of::<T>()
153            );
154        }
155        let length = bytes.len() / size_of::<T>();
156        Self {
157            bytes,
158            length,
159            alignment,
160            _marker: Default::default(),
161        }
162    }
163
164    /// Create a buffer with values from the TrustedLen iterator.
165    /// Should be preferred over `from_iter` when the iterator is known to be `TrustedLen`.
166    pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
167        let (_, high) = iter.size_hint();
168        let mut buffer =
169            BufferMut::with_capacity(high.vortex_expect("TrustedLen iterator has no upper bound"));
170        buffer.extend_trusted(iter);
171        buffer.freeze()
172    }
173
    /// Returns the length of the buffer in elements of type T (not in bytes).
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.length
    }
179
    /// Returns whether the buffer is empty, i.e. holds zero elements of type T.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }
185
    /// Returns the alignment recorded for the buffer.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }
191
    /// Returns a slice over the buffer of elements of type T.
    ///
    /// The slice borrows the underlying bytes; nothing is copied.
    #[inline(always)]
    pub fn as_slice(&self) -> &[T] {
        let raw_slice = self.bytes.as_ref();
        // SAFETY: alignment of Buffer is checked on construction. `self.length` is assumed to
        // count whole `T` items that fit within `bytes` (the invariant `inner` debug-asserts).
        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
    }
199
    /// Returns an iterator over the buffer of elements of type T.
    ///
    /// The iterator yields `&T` and wraps the iterator of `as_slice()`.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter {
            inner: self.as_slice().iter(),
        }
    }
206
    /// Returns a slice of self for the provided range, keeping this buffer's alignment.
    ///
    /// The range is expressed in elements of type T.
    ///
    /// # Panics
    ///
    /// Requires that `begin <= end` and `end <= self.len()`.
    /// Also requires that both `begin` and `end` are aligned to the buffer's required alignment.
    #[inline(always)]
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, self.alignment)
    }
217
    /// Returns a slice of self for the provided range, with no guarantees about the resulting
    /// alignment.
    ///
    /// The result is recorded with the alignment of `u8`.
    ///
    /// # Panics
    ///
    /// Requires that `begin <= end` and `end <= self.len()`.
    // NOTE(review): `slice_with_alignment` also panics on non-empty ranges when the requested
    // alignment is not aligned to that of `T`, so this presumably only works when `T` is
    // byte-aligned (e.g. `ByteBuffer`) - confirm.
    #[inline(always)]
    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, Alignment::of::<u8>())
    }
228
229    /// Returns a slice of self for the provided range, ensuring the resulting slice has the
230    /// given alignment.
231    ///
232    /// # Panics
233    ///
234    /// Requires that `begin <= end` and `end <= self.len()`.
235    /// Also requires that both `begin` and `end` are aligned to the given alignment.
236    pub fn slice_with_alignment(
237        &self,
238        range: impl RangeBounds<usize>,
239        alignment: Alignment,
240    ) -> Self {
241        let len = self.len();
242        let begin = match range.start_bound() {
243            Bound::Included(&n) => n,
244            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
245            Bound::Unbounded => 0,
246        };
247        let end = match range.end_bound() {
248            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
249            Bound::Excluded(&n) => n,
250            Bound::Unbounded => len,
251        };
252
253        if begin > end {
254            vortex_panic!(
255                "range start must not be greater than end: {:?} <= {:?}",
256                begin,
257                end
258            );
259        }
260        if end > len {
261            vortex_panic!("range end out of bounds: {:?} <= {:?}", end, len);
262        }
263
264        if end == begin {
265            // We prefer to return a new empty buffer instead of sharing this one and creating a
266            // strong reference just to hold an empty slice.
267            return Self::empty_aligned(alignment);
268        }
269
270        let begin_byte = begin * size_of::<T>();
271        let end_byte = end * size_of::<T>();
272
273        if !begin_byte.is_multiple_of(*alignment) {
274            vortex_panic!("range start must be aligned to {:?}", alignment);
275        }
276        if !end_byte.is_multiple_of(*alignment) {
277            vortex_panic!("range end must be aligned to {:?}", alignment);
278        }
279        if !alignment.is_aligned_to(Alignment::of::<T>()) {
280            vortex_panic!("Slice alignment must at least align to type T")
281        }
282
283        Self {
284            bytes: self.bytes.slice(begin_byte..end_byte),
285            length: end - begin,
286            alignment,
287            _marker: Default::default(),
288        }
289    }
290
    /// Returns a slice of self that is equivalent to the given subset.
    ///
    /// When processing the buffer you will often end up with &\[T\] that is a subset
    /// of the underlying buffer. This function turns the slice into a slice of the buffer
    /// it has been taken from.
    ///
    /// The result is recorded with the alignment of `T`; see `slice_ref_with_alignment` to
    /// request a different one.
    ///
    /// # Panics:
    /// Requires that the given sub slice is in fact contained within the Bytes buffer; otherwise this function will panic.
    #[inline(always)]
    pub fn slice_ref(&self, subset: &[T]) -> Self {
        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
    }
303
304    /// Returns a slice of self that is equivalent to the given subset.
305    ///
306    /// When processing the buffer you will often end up with &\[T\] that is a subset
307    /// of the underlying buffer. This function turns the slice into a slice of the buffer
308    /// it has been taken from.
309    ///
310    /// # Panics:
311    /// Requires that the given sub slice is in fact contained within the Bytes buffer; otherwise this function will panic.
312    /// Also requires that the given alignment aligns to the type of slice and is smaller or equal to the buffers alignment
313    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
314        if !alignment.is_aligned_to(Alignment::of::<T>()) {
315            vortex_panic!("slice_ref alignment must at least align to type T")
316        }
317
318        if !self.alignment.is_aligned_to(alignment) {
319            vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
320        }
321
322        if subset.as_ptr().align_offset(*alignment) != 0 {
323            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
324        }
325
326        let subset_u8 =
327            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };
328
329        Self {
330            bytes: self.bytes.slice_ref(subset_u8),
331            length: subset.len(),
332            alignment,
333            _marker: Default::default(),
334        }
335    }
336
    /// Returns a reference to the underlying `Bytes`.
    ///
    /// In debug builds, asserts that the byte length matches `len() * size_of::<T>()`.
    pub fn inner(&self) -> &Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        &self.bytes
    }
346
    /// Consumes the buffer and returns the underlying `Bytes`.
    ///
    /// In debug builds, asserts that the byte length matches `len() * size_of::<T>()`.
    pub fn into_inner(self) -> Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        self.bytes
    }
356
    /// Return the ByteBuffer for this `Buffer<T>`.
    ///
    /// The bytes and alignment are carried over unchanged; the length is converted from a count
    /// of `T` items to a count of bytes.
    pub fn into_byte_buffer(self) -> ByteBuffer {
        ByteBuffer {
            bytes: self.bytes,
            // Length in bytes rather than in items of `T`.
            length: self.length * size_of::<T>(),
            alignment: self.alignment,
            _marker: Default::default(),
        }
    }
366
367    /// Convert self into `BufferMut<T>`, copying if there are multiple strong references.
368    pub fn into_mut(self) -> BufferMut<T> {
369        self.try_into_mut()
370            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
371    }
372
373    /// Try to convert self into `BufferMut<T>` if there is only a single strong reference.
374    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
375        self.bytes
376            .try_into_mut()
377            .map(|bytes| BufferMut {
378                bytes,
379                length: self.length,
380                alignment: self.alignment,
381                _marker: Default::default(),
382            })
383            .map_err(|bytes| Self {
384                bytes,
385                length: self.length,
386                alignment: self.alignment,
387                _marker: Default::default(),
388            })
389    }
390
    /// Returns whether a `Buffer<T>` is aligned to the given alignment.
    ///
    /// This inspects the address of the underlying bytes, not the recorded `alignment` field.
    pub fn is_aligned(&self, alignment: Alignment) -> bool {
        self.bytes.as_ptr().align_offset(*alignment) == 0
    }
395
396    /// Return a `Buffer<T>` with the given alignment. Where possible, this will be zero-copy.
397    pub fn aligned(mut self, alignment: Alignment) -> Self {
398        if self.as_ptr().align_offset(*alignment) == 0 {
399            self.alignment = alignment;
400            self
401        } else {
402            #[cfg(feature = "warn-copy")]
403            {
404                let bt = std::backtrace::Backtrace::capture();
405                log::warn!(
406                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
407                )
408            }
409            Self::copy_from_aligned(self, alignment)
410        }
411    }
412
413    /// Return a `Buffer<T>` with the given alignment. Panics if the buffer is not aligned.
414    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
415        if self.as_ptr().align_offset(*alignment) == 0 {
416            self.alignment = alignment;
417            self
418        } else {
419            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
420        }
421    }
422}
423
/// An iterator over Buffer elements.
///
/// This is an analog to the `std::slice::Iter` type: it yields `&T`, knows its exact length,
/// can be driven from either end, and is fused. Every method forwards to the wrapped slice
/// iterator.
pub struct Iter<'a, T> {
    /// The slice iterator that does the actual work.
    inner: std::slice::Iter<'a, T>,
}

impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }

    // The overrides below forward to the slice iterator's own implementations instead of
    // falling back to the default `next`-based ones.
    fn count(self) -> usize {
        self.inner.count()
    }

    fn last(self) -> Option<Self::Item> {
        self.inner.last()
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        self.inner.nth(n)
    }
}

impl<T> ExactSizeIterator for Iter<'_, T> {
    fn len(&self) -> usize {
        self.inner.len()
    }
}

/// Allows `buffer.iter().rev()` and other back-to-front traversal, as with a slice iterator.
impl<'a, T> DoubleEndedIterator for Iter<'a, T> {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.inner.next_back()
    }

    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
        self.inner.nth_back(n)
    }
}

/// The wrapped slice iterator is fused, so this wrapper is too.
impl<T> std::iter::FusedIterator for Iter<'_, T> {}
460
461impl<T: Debug> Debug for Buffer<T> {
462    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
463        f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
464            .field("length", &self.length)
465            .field("alignment", &self.alignment)
466            .field("as_slice", &TruncatedDebug(self.as_slice()))
467            .finish()
468    }
469}
470
/// Lets a `Buffer<T>` be used wherever a `&[T]` is expected.
impl<T> Deref for Buffer<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
478
/// Borrows the buffer as a `&[T]`; equivalent to `as_slice`.
impl<T> AsRef<[T]> for Buffer<T> {
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}
484
/// Collects an iterator into a `BufferMut<T>` and freezes it.
impl<T> FromIterator<T> for Buffer<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        BufferMut::from_iter(iter).freeze()
    }
}
490
/// Only for `Buffer<u8>` can we zero-copy from a `Vec<u8>` since we can use a 1-byte alignment.
///
/// The vector is first converted into `Bytes`, then wrapped via `From<Bytes>`.
impl From<Vec<u8>> for ByteBuffer {
    fn from(value: Vec<u8>) -> Self {
        Self::from(Bytes::from(value))
    }
}
497
/// Only for `Buffer<u8>` can we zero-copy from a `Bytes` since we can use a 1-byte alignment.
impl From<Bytes> for ByteBuffer {
    fn from(bytes: Bytes) -> Self {
        // For `u8` the item count equals the byte count.
        let length = bytes.len();
        Self {
            bytes,
            length,
            // Recorded with the alignment of `u8`; no alignment check is performed here.
            alignment: Alignment::of::<u8>(),
            _marker: Default::default(),
        }
    }
}
510
/// `bytes::Buf` implementation for `ByteBuffer`, exposing the remaining bytes as one chunk.
impl Buf for ByteBuffer {
    /// Number of bytes left in the buffer.
    fn remaining(&self) -> usize {
        self.len()
    }

    /// All remaining bytes, returned as a single slice.
    fn chunk(&self) -> &[u8] {
        self.as_slice()
    }

    /// Drops the first `cnt` bytes from the buffer.
    ///
    /// Panics if `cnt` is not a multiple of the buffer's recorded alignment.
    fn advance(&mut self, cnt: usize) {
        // Moving the start by anything other than a multiple of the alignment would leave the
        // data pointer off the recorded alignment.
        if !cnt.is_multiple_of(*self.alignment) {
            vortex_panic!(
                "Cannot advance buffer by {} items, resulting alignment is not {}",
                cnt,
                self.alignment
            );
        }
        self.bytes.advance(cnt);
        // Keep the cached length in step with the shortened `bytes`.
        self.length -= cnt;
    }
}
532
533/// Owned iterator over a [`Buffer`].
534pub struct BufferIterator<T> {
535    buffer: Buffer<T>,
536    index: usize,
537}
538
539impl<T: Copy> Iterator for BufferIterator<T> {
540    type Item = T;
541
542    fn next(&mut self) -> Option<Self::Item> {
543        (self.index < self.buffer.len()).then(move || {
544            let value = self.buffer[self.index];
545            self.index += 1;
546            value
547        })
548    }
549
550    fn size_hint(&self) -> (usize, Option<usize>) {
551        let remaining = self.buffer.len() - self.index;
552        (remaining, Some(remaining))
553    }
554}
555
/// Consumes the buffer and iterates over its items by value (hence `T: Copy`).
impl<T: Copy> IntoIterator for Buffer<T> {
    type Item = T;
    type IntoIter = BufferIterator<T>;

    fn into_iter(self) -> Self::IntoIter {
        BufferIterator {
            buffer: self,
            // Start from the first item.
            index: 0,
        }
    }
}
567
/// Converts a `BufferMut<T>` into an immutable `Buffer<T>` by freezing it.
impl<T> From<BufferMut<T>> for Buffer<T> {
    fn from(value: BufferMut<T>) -> Self {
        value.freeze()
    }
}
573
#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::{Alignment, ByteBuffer, buffer};

    /// `aligned` records the requested alignment and keeps the contents intact.
    #[test]
    fn align() {
        let buf = buffer![0u8, 1, 2];
        let aligned = buf.aligned(Alignment::new(32));
        assert_eq!(aligned.alignment(), Alignment::new(32));
        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    }

    /// Both exclusive and inclusive end bounds are honoured by `slice`.
    #[test]
    fn slice() {
        let buf = buffer![0, 1, 2, 3, 4];
        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
    }

    /// `slice_unaligned` accepts byte offsets that are not on the buffer's alignment.
    #[test]
    fn slice_unaligned() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // With a regular slice, this would panic. See [`slice_bad_alignment`].
        buf.slice_unaligned(1..2);
    }

    /// `slice` panics when the range does not fall on the buffer's alignment.
    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // We should only be able to slice this buffer on 4-byte (i32) boundaries.
        buf.slice(1..2);
    }

    /// `ByteBuffer` behaves as a `bytes::Buf`: advancing consumes bytes from the front.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }
}