vortex_buffer/
buffer.rs

1use std::any::type_name;
2use std::cmp::Ordering;
3use std::collections::Bound;
4use std::fmt::{Debug, Formatter};
5use std::hash::{Hash, Hasher};
6use std::ops::{Deref, RangeBounds};
7
8use bytes::{Buf, Bytes};
9use vortex_error::{VortexExpect, vortex_panic};
10
11use crate::debug::TruncatedDebug;
12use crate::{Alignment, BufferMut, ByteBuffer};
13
/// An immutable buffer of items of `T`.
///
/// The elements live in a [`Bytes`] value; the element count and the alignment of the buffer are
/// stored alongside it.
#[derive(Clone)]
pub struct Buffer<T> {
    /// Raw bytes holding the elements.
    pub(crate) bytes: Bytes,
    /// Number of elements of type `T` (not the number of bytes).
    pub(crate) length: usize,
    /// Alignment recorded for this buffer.
    pub(crate) alignment: Alignment,
    /// Ties the otherwise untyped byte storage to the element type `T`.
    pub(crate) _marker: std::marker::PhantomData<T>,
}
22
23impl<T> PartialEq for Buffer<T> {
24    fn eq(&self, other: &Self) -> bool {
25        self.bytes == other.bytes
26    }
27}
28
/// Equality is a comparison of the underlying bytes (see the `PartialEq` impl), so it holds for
/// every `T` without requiring `T: Eq`.
impl<T> Eq for Buffer<T> {}
30
31impl<T> Ord for Buffer<T> {
32    fn cmp(&self, other: &Self) -> Ordering {
33        self.bytes.cmp(&other.bytes)
34    }
35}
36
37impl<T> PartialOrd for Buffer<T> {
38    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
39        Some(self.bytes.cmp(&other.bytes))
40    }
41}
42
43impl<T> Hash for Buffer<T> {
44    fn hash<H: Hasher>(&self, state: &mut H) {
45        self.bytes.as_ref().hash(state)
46    }
47}
48
49impl<T> Buffer<T> {
50    /// Returns a new `Buffer<T>` copied from the provided `Vec<T>`, `&[T]`, etc.
51    ///
52    /// Due to our underlying usage of `bytes::Bytes`, we are unable to take zero-copy ownership
53    /// of the provided `Vec<T>` while maintaining the ability to convert it back into a mutable
54    /// buffer. We could fix this by forking `Bytes`, or in many other complex ways, but for now
55    /// callers should prefer to construct `Buffer<T>` from a `BufferMut<T>`.
56    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
57        BufferMut::copy_from(values).freeze()
58    }
59
60    /// Returns a new `Buffer<T>` copied from the provided slice and with the requested alignment.
61    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
62        BufferMut::copy_from_aligned(values, alignment).freeze()
63    }
64
65    /// Create a new zeroed `Buffer` with the given value.
66    pub fn zeroed(len: usize) -> Self {
67        Self::zeroed_aligned(len, Alignment::of::<T>())
68    }
69
70    /// Create a new zeroed `Buffer` with the given value.
71    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
72        BufferMut::zeroed_aligned(len, alignment).freeze()
73    }
74
75    /// Create a new empty `ByteBuffer` with the provided alignment.
76    pub fn empty() -> Self {
77        BufferMut::empty().freeze()
78    }
79
80    /// Create a new empty `ByteBuffer` with the provided alignment.
81    pub fn empty_aligned(alignment: Alignment) -> Self {
82        BufferMut::empty_aligned(alignment).freeze()
83    }
84
85    /// Create a new full `ByteBuffer` with the given value.
86    pub fn full(item: T, len: usize) -> Self
87    where
88        T: Copy,
89    {
90        BufferMut::full(item, len).freeze()
91    }
92
93    /// Create a `Buffer<T>` zero-copy from a `ByteBuffer`.
94    ///
95    /// ## Panics
96    ///
97    /// Panics if the buffer is not aligned to the size of `T`, or the length is not a multiple of
98    /// the size of `T`.
99    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
100        // TODO(ngates): should this preserve the current alignment of the buffer?
101        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
102    }
103
104    /// Create a `Buffer<T>` zero-copy from a `ByteBuffer`.
105    ///
106    /// ## Panics
107    ///
108    /// Panics if the buffer is not aligned to the given alignment, if the length is not a multiple
109    /// of the size of `T`, or if the given alignment is not aligned to that of `T`.
110    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
111        Self::from_bytes_aligned(buffer.into_inner(), alignment)
112    }
113
114    /// Create a `Buffer<T>` zero-copy from a `Bytes`.
115    ///
116    /// ## Panics
117    ///
118    /// Panics if the buffer is not aligned to the size of `T`, or the length is not a multiple of
119    /// the size of `T`.
120    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
121        if !alignment.is_aligned_to(Alignment::of::<T>()) {
122            vortex_panic!(
123                "Alignment {} must be compatible with the scalar type's alignment {}",
124                alignment,
125                Alignment::of::<T>(),
126            );
127        }
128        if bytes.as_ptr().align_offset(*alignment) != 0 {
129            vortex_panic!(
130                "Bytes alignment must align to the requested alignment {}",
131                alignment,
132            );
133        }
134        if bytes.len() % size_of::<T>() != 0 {
135            vortex_panic!(
136                "Bytes length {} must be a multiple of the scalar type's size {}",
137                bytes.len(),
138                size_of::<T>()
139            );
140        }
141        let length = bytes.len() / size_of::<T>();
142        Self {
143            bytes,
144            length,
145            alignment,
146            _marker: Default::default(),
147        }
148    }
149
    /// Returns the length of the buffer in elements of type T.
    ///
    /// This is the element count, not the size of the buffer in bytes.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.length
    }
155
    /// Returns whether the buffer is empty, i.e. holds zero elements of type T.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }
161
    /// Returns the alignment of the buffer.
    ///
    /// This is the alignment value stored on the buffer, not one recomputed from the data
    /// pointer; use [`Self::is_aligned`] to test the pointer against an arbitrary alignment.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }
167
168    /// Returns a slice over the buffer of elements of type T.
169    #[inline(always)]
170    pub fn as_slice(&self) -> &[T] {
171        let raw_slice = self.bytes.as_ref();
172        // SAFETY: alignment of Buffer is checked on construction
173        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
174    }
175
176    /// Returns an iterator over the buffer of elements of type T.
177    pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
178        self.as_slice().iter()
179    }
180
181    /// Returns a slice of self for the provided range.
182    ///
183    /// # Panics
184    ///
185    /// Requires that `begin <= end` and `end <= self.len()`.
186    /// Also requires that both `begin` and `end` are aligned to the buffer's required alignment.
187    #[inline(always)]
188    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
189        self.slice_with_alignment(range, self.alignment)
190    }
191
192    /// Returns a slice of self for the provided range, with no guarantees about the resulting
193    /// alignment.
194    ///
195    /// # Panics
196    ///
197    /// Requires that `begin <= end` and `end <= self.len()`.
198    #[inline(always)]
199    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
200        self.slice_with_alignment(range, Alignment::of::<u8>())
201    }
202
203    /// Returns a slice of self for the provided range, ensuring the resulting slice has the
204    /// given alignment.
205    ///
206    /// # Panics
207    ///
208    /// Requires that `begin <= end` and `end <= self.len()`.
209    /// Also requires that both `begin` and `end` are aligned to the given alignment.
210    pub fn slice_with_alignment(
211        &self,
212        range: impl RangeBounds<usize>,
213        alignment: Alignment,
214    ) -> Self {
215        let len = self.len();
216        let begin = match range.start_bound() {
217            Bound::Included(&n) => n,
218            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
219            Bound::Unbounded => 0,
220        };
221        let end = match range.end_bound() {
222            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
223            Bound::Excluded(&n) => n,
224            Bound::Unbounded => len,
225        };
226
227        if begin > end {
228            vortex_panic!(
229                "range start must not be greater than end: {:?} <= {:?}",
230                begin,
231                end
232            );
233        }
234        if end > len {
235            vortex_panic!("range end out of bounds: {:?} <= {:?}", end, len);
236        }
237
238        if end == begin {
239            // We prefer to return a new empty buffer instead of sharing this one and creating a
240            // strong reference just to hold an empty slice.
241            return Self::empty_aligned(alignment);
242        }
243
244        let begin_byte = begin * size_of::<T>();
245        let end_byte = end * size_of::<T>();
246
247        if !begin_byte.is_multiple_of(*alignment) {
248            vortex_panic!("range start must be aligned to {:?}", alignment);
249        }
250        if !end_byte.is_multiple_of(*alignment) {
251            vortex_panic!("range end must be aligned to {:?}", alignment);
252        }
253        if !alignment.is_aligned_to(Alignment::of::<T>()) {
254            vortex_panic!("Slice alignment must at least align to type T")
255        }
256
257        Self {
258            bytes: self.bytes.slice(begin_byte..end_byte),
259            length: end - begin,
260            alignment,
261            _marker: Default::default(),
262        }
263    }
264
265    /// Returns a slice of self that is equivalent to the given subset.
266    ///
267    /// When processing the buffer you will often end up with &\[T\] that is a subset
268    /// of the underlying buffer. This function turns the slice into a slice of the buffer
269    /// it has been taken from.
270    ///
271    /// # Panics:
272    /// Requires that the given sub slice is in fact contained within the Bytes buffer; otherwise this function will panic.
273    #[inline(always)]
274    pub fn slice_ref(&self, subset: &[T]) -> Self {
275        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
276    }
277
278    /// Returns a slice of self that is equivalent to the given subset.
279    ///
280    /// When processing the buffer you will often end up with &\[T\] that is a subset
281    /// of the underlying buffer. This function turns the slice into a slice of the buffer
282    /// it has been taken from.
283    ///
284    /// # Panics:
285    /// Requires that the given sub slice is in fact contained within the Bytes buffer; otherwise this function will panic.
286    /// Also requires that the given alignment aligns to the type of slice and is smaller or equal to the buffers alignment
287    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
288        if !alignment.is_aligned_to(Alignment::of::<T>()) {
289            vortex_panic!("slice_ref alignment must at least align to type T")
290        }
291
292        if !self.alignment.is_aligned_to(alignment) {
293            vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
294        }
295
296        if subset.as_ptr().align_offset(*alignment) != 0 {
297            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
298        }
299
300        let subset_u8 =
301            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };
302
303        Self {
304            bytes: self.bytes.slice_ref(subset_u8),
305            length: subset.len(),
306            alignment,
307            _marker: Default::default(),
308        }
309    }
310
    /// Returns a reference to the underlying `Bytes` holding the buffer's elements.
    ///
    /// In debug builds this asserts that the byte length matches `len() * size_of::<T>()`.
    pub fn inner(&self) -> &Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        &self.bytes
    }
320
    /// Consumes the buffer and returns the underlying `Bytes`; the recorded alignment and
    /// element type are dropped.
    ///
    /// In debug builds this asserts that the byte length matches `len() * size_of::<T>()`.
    pub fn into_inner(self) -> Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        self.bytes
    }
330
331    /// Return the ByteBuffer for this `Buffer<T>`.
332    pub fn into_byte_buffer(self) -> ByteBuffer {
333        ByteBuffer {
334            bytes: self.bytes,
335            length: self.length * size_of::<T>(),
336            alignment: self.alignment,
337            _marker: Default::default(),
338        }
339    }
340
341    /// Convert self into `BufferMut<T>`, copying if there are multiple strong references.
342    pub fn into_mut(self) -> BufferMut<T> {
343        self.try_into_mut()
344            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
345    }
346
347    /// Try to convert self into `BufferMut<T>` if there is only a single strong reference.
348    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
349        self.bytes
350            .try_into_mut()
351            .map(|bytes| BufferMut {
352                bytes,
353                length: self.length,
354                alignment: self.alignment,
355                _marker: Default::default(),
356            })
357            .map_err(|bytes| Self {
358                bytes,
359                length: self.length,
360                alignment: self.alignment,
361                _marker: Default::default(),
362            })
363    }
364
365    /// Returns whether a `Buffer<T>` is aligned to the given alignment.
366    pub fn is_aligned(&self, alignment: Alignment) -> bool {
367        self.bytes.as_ptr().align_offset(*alignment) == 0
368    }
369
370    /// Return a `Buffer<T>` with the given alignment. Where possible, this will be zero-copy.
371    pub fn aligned(mut self, alignment: Alignment) -> Self {
372        if self.as_ptr().align_offset(*alignment) == 0 {
373            self.alignment = alignment;
374            self
375        } else {
376            #[cfg(feature = "warn-copy")]
377            {
378                let bt = std::backtrace::Backtrace::capture();
379                log::warn!(
380                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
381                )
382            }
383            Self::copy_from_aligned(self, alignment)
384        }
385    }
386
387    /// Return a `Buffer<T>` with the given alignment. Panics if the buffer is not aligned.
388    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
389        if self.as_ptr().align_offset(*alignment) == 0 {
390            self.alignment = alignment;
391            self
392        } else {
393            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
394        }
395    }
396}
397
398impl<T: Debug> Debug for Buffer<T> {
399    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
400        f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
401            .field("length", &self.length)
402            .field("alignment", &self.alignment)
403            .field("as_slice", &TruncatedDebug(self.as_slice()))
404            .finish()
405    }
406}
407
408impl<T> Deref for Buffer<T> {
409    type Target = [T];
410
411    fn deref(&self) -> &Self::Target {
412        self.as_slice()
413    }
414}
415
416impl<T> AsRef<[T]> for Buffer<T> {
417    fn as_ref(&self) -> &[T] {
418        self.as_slice()
419    }
420}
421
422impl<T> FromIterator<T> for Buffer<T> {
423    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
424        BufferMut::from_iter(iter).freeze()
425    }
426}
427
428/// Only for `Buffer<u8>` can we zero-copy from a `Vec<u8>` since we can use a 1-byte alignment.
429impl From<Vec<u8>> for ByteBuffer {
430    fn from(value: Vec<u8>) -> Self {
431        Self::from(Bytes::from(value))
432    }
433}
434
435/// Only for `Buffer<u8>` can we zero-copy from a `Bytes` since we can use a 1-byte alignment.
436impl From<Bytes> for ByteBuffer {
437    fn from(bytes: Bytes) -> Self {
438        let length = bytes.len();
439        Self {
440            bytes,
441            length,
442            alignment: Alignment::of::<u8>(),
443            _marker: Default::default(),
444        }
445    }
446}
447
448impl Buf for ByteBuffer {
449    fn remaining(&self) -> usize {
450        self.len()
451    }
452
453    fn chunk(&self) -> &[u8] {
454        self.as_slice()
455    }
456
457    fn advance(&mut self, cnt: usize) {
458        if !cnt.is_multiple_of(*self.alignment) {
459            vortex_panic!(
460                "Cannot advance buffer by {} items, resulting alignment is not {}",
461                cnt,
462                self.alignment
463            );
464        }
465        self.bytes.advance(cnt);
466        self.length -= cnt;
467    }
468}
469
/// Owned iterator over a [`Buffer`].
///
/// Produced by `Buffer::into_iter`; yields the elements by value, in order.
pub struct BufferIterator<T> {
    /// The buffer being iterated; held by value for the lifetime of the iterator.
    buffer: Buffer<T>,
    /// Index of the next element to yield.
    index: usize,
}
475
476impl<T: Copy> Iterator for BufferIterator<T> {
477    type Item = T;
478
479    fn next(&mut self) -> Option<Self::Item> {
480        (self.index < self.buffer.len()).then(move || {
481            let value = self.buffer.as_slice()[self.index];
482            self.index += 1;
483            value
484        })
485    }
486
487    fn size_hint(&self) -> (usize, Option<usize>) {
488        let remaining = self.buffer.len() - self.index;
489        (remaining, Some(remaining))
490    }
491}
492
493impl<T: Copy> IntoIterator for Buffer<T> {
494    type Item = T;
495    type IntoIter = BufferIterator<T>;
496
497    fn into_iter(self) -> Self::IntoIter {
498        BufferIterator {
499            buffer: self,
500            index: 0,
501        }
502    }
503}
504
505impl<T> From<BufferMut<T>> for Buffer<T> {
506    fn from(value: BufferMut<T>) -> Self {
507        value.freeze()
508    }
509}
510
// Unit tests for `Buffer`: re-alignment, slicing (aligned and unaligned) and the `bytes::Buf`
// implementation on `ByteBuffer`.
#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::{Alignment, ByteBuffer, buffer};

    // `aligned` must report the requested alignment and keep the contents intact.
    #[test]
    fn align() {
        let buf = buffer![0u8, 1, 2];
        let aligned = buf.aligned(Alignment::new(32));
        assert_eq!(aligned.alignment(), Alignment::new(32));
        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    }

    // Half-open and inclusive ranges both select the expected elements.
    #[test]
    fn slice() {
        let buf = buffer![0, 1, 2, 3, 4];
        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
    }

    // Slicing a byte buffer at an offset that is not a multiple of its alignment is allowed
    // through `slice_unaligned`.
    #[test]
    fn slice_unaligned() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // With a regular slice, this would panic. See [`slice_bad_alignment`].
        buf.slice_unaligned(1..2);
    }

    // The same slice through the alignment-preserving `slice` must panic.
    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // We should only be able to slice this buffer on 4-byte (i32) boundaries.
        buf.slice(1..2);
    }

    // `remaining`, `chunk` and `advance` behave consistently before and after advancing.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }
}