vortex_buffer/
buffer_mut.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use core::mem::MaybeUninit;
5use std::any::type_name;
6use std::fmt::{Debug, Formatter};
7use std::io::Write;
8use std::ops::{Deref, DerefMut};
9
10use bytes::buf::UninitSlice;
11use bytes::{Buf, BufMut, BytesMut};
12use vortex_error::{VortexExpect, vortex_panic};
13
14use crate::debug::TruncatedDebug;
15use crate::trusted_len::TrustedLen;
16use crate::{Alignment, Buffer, ByteBufferMut};
17
/// A mutable buffer that maintains a runtime-defined alignment through resizing operations.
///
/// Equality is derived, so two buffers compare equal only when their bytes, element count and
/// alignment all match.
#[derive(PartialEq, Eq)]
pub struct BufferMut<T> {
    /// Backing byte storage holding the elements.
    pub(crate) bytes: BytesMut,
    /// Number of `T` elements currently stored (not a byte count).
    pub(crate) length: usize,
    /// Alignment applied to the start of `bytes` whenever storage is (re)allocated.
    pub(crate) alignment: Alignment,
    /// Ties the buffer to its element type without storing a `T`.
    pub(crate) _marker: std::marker::PhantomData<T>,
}
26
27impl<T> BufferMut<T> {
28    /// Create a new `BufferMut` with the requested alignment and capacity.
29    pub fn with_capacity(capacity: usize) -> Self {
30        Self::with_capacity_aligned(capacity, Alignment::of::<T>())
31    }
32
33    /// Create a new `BufferMut` with the requested alignment and capacity.
34    pub fn with_capacity_aligned(capacity: usize, alignment: Alignment) -> Self {
35        if !alignment.is_aligned_to(Alignment::of::<T>()) {
36            vortex_panic!(
37                "Alignment {} must align to the scalar type's alignment {}",
38                alignment,
39                align_of::<T>()
40            );
41        }
42
43        let mut bytes = BytesMut::with_capacity((capacity * size_of::<T>()) + *alignment);
44        bytes.align_empty(alignment);
45
46        Self {
47            bytes,
48            length: 0,
49            alignment,
50            _marker: Default::default(),
51        }
52    }
53
54    /// Create a new zeroed `BufferMut`.
55    pub fn zeroed(len: usize) -> Self {
56        Self::zeroed_aligned(len, Alignment::of::<T>())
57    }
58
59    /// Create a new zeroed `BufferMut`.
60    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
61        let mut bytes = BytesMut::zeroed((len * size_of::<T>()) + *alignment);
62        bytes.advance(bytes.as_ptr().align_offset(*alignment));
63        unsafe { bytes.set_len(len * size_of::<T>()) };
64        Self {
65            bytes,
66            length: len,
67            alignment,
68            _marker: Default::default(),
69        }
70    }
71
72    /// Create a new empty `BufferMut` with the provided alignment.
73    pub fn empty() -> Self {
74        Self::empty_aligned(Alignment::of::<T>())
75    }
76
77    /// Create a new empty `BufferMut` with the provided alignment.
78    pub fn empty_aligned(alignment: Alignment) -> Self {
79        BufferMut::with_capacity_aligned(0, alignment)
80    }
81
82    /// Create a new full `BufferMut` with the given value.
83    pub fn full(item: T, len: usize) -> Self
84    where
85        T: Copy,
86    {
87        let mut buffer = BufferMut::<T>::with_capacity(len);
88        buffer.push_n(item, len);
89        buffer
90    }
91
92    /// Create a mutable scalar buffer by copying the contents of the slice.
93    pub fn copy_from(other: impl AsRef<[T]>) -> Self {
94        Self::copy_from_aligned(other, Alignment::of::<T>())
95    }
96
97    /// Create a mutable scalar buffer with the alignment by copying the contents of the slice.
98    ///
99    /// ## Panics
100    ///
101    /// Panics when the requested alignment isn't itself aligned to type T.
102    pub fn copy_from_aligned(other: impl AsRef<[T]>, alignment: Alignment) -> Self {
103        if !alignment.is_aligned_to(Alignment::of::<T>()) {
104            vortex_panic!("Given alignment is not aligned to type T")
105        }
106        let other = other.as_ref();
107        let mut buffer = Self::with_capacity_aligned(other.len(), alignment);
108        buffer.extend_from_slice(other);
109        debug_assert_eq!(buffer.alignment(), alignment);
110        buffer
111    }
112
    /// Get the alignment of the buffer, as recorded in its `alignment` field.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }
118
    /// Returns the length of the buffer, counted in elements of `T`.
    #[inline(always)]
    pub fn len(&self) -> usize {
        // Debug builds verify that the element count agrees with the byte length.
        debug_assert_eq!(self.length, self.bytes.len() / size_of::<T>());
        self.length
    }
125
    /// Returns whether the buffer holds zero elements.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }
131
    /// Returns the capacity of the buffer in elements: the byte capacity of the backing
    /// storage divided by `size_of::<T>()`, rounded down.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.bytes.capacity() / size_of::<T>()
    }
137
138    /// Returns a slice over the buffer of elements of type T.
139    #[inline]
140    pub fn as_slice(&self) -> &[T] {
141        let raw_slice = self.bytes.as_ref();
142        // SAFETY: alignment of Buffer is checked on construction
143        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
144    }
145
146    /// Returns a slice over the buffer of elements of type T.
147    #[inline]
148    pub fn as_mut_slice(&mut self) -> &mut [T] {
149        let raw_slice = self.bytes.as_mut();
150        // SAFETY: alignment of Buffer is checked on construction
151        unsafe { std::slice::from_raw_parts_mut(raw_slice.as_mut_ptr().cast(), self.length) }
152    }
153
154    /// Clear the buffer, retaining any existing capacity.
155    #[inline]
156    pub fn clear(&mut self) {
157        unsafe { self.bytes.set_len(0) }
158        self.length = 0;
159    }
160
161    /// Shortens the buffer, keeping the first `len` bytes and dropping the
162    /// rest.
163    ///
164    /// If `len` is greater than the buffer's current length, this has no
165    /// effect.
166    ///
167    /// Existing underlying capacity is preserved.
168    #[inline]
169    pub fn truncate(&mut self, len: usize) {
170        if len <= self.len() {
171            // SAFETY: Shrinking the buffer cannot expose uninitialized bytes.
172            unsafe { self.set_len(len) };
173        }
174    }
175
176    /// Reserves capacity for at least `additional` more elements to be inserted in the buffer.
177    #[inline]
178    pub fn reserve(&mut self, additional: usize) {
179        let additional_bytes = additional * size_of::<T>();
180        if additional_bytes <= self.bytes.capacity() - self.bytes.len() {
181            // We can fit the additional bytes in the remaining capacity. Nothing to do.
182            return;
183        }
184
185        // Otherwise, reserve additional + alignment bytes in case we need to realign the buffer.
186        self.reserve_allocate(additional);
187    }
188
189    /// A separate function so we can inline the reserve call's fast path. According to `BytesMut`
190    /// this has significant performance implications.
191    fn reserve_allocate(&mut self, additional: usize) {
192        let new_capacity: usize = ((self.length + additional) * size_of::<T>()) + *self.alignment;
193        // Make sure we at least double in size each time we re-allocate to amortize the cost
194        let new_capacity = new_capacity.max(self.bytes.capacity() * 2);
195
196        let mut bytes = BytesMut::with_capacity(new_capacity);
197        bytes.align_empty(self.alignment);
198        bytes.extend_from_slice(&self.bytes);
199        self.bytes = bytes;
200    }
201
202    /// Returns the spare capacity of the buffer as a slice of `MaybeUninit<T>`.
203    /// Has identical semantics to [`Vec::spare_capacity_mut`].
204    ///
205    /// The returned slice can be used to fill the buffer with data (e.g. by
206    /// reading from a file) before marking the data as initialized using the
207    /// [`set_len`] method.
208    ///
209    /// [`set_len`]: BufferMut::set_len
210    /// [`Vec::spare_capacity_mut`]: Vec::spare_capacity_mut
211    ///
212    /// # Examples
213    ///
214    /// ```
215    /// use vortex_buffer::BufferMut;
216    ///
217    /// // Allocate vector big enough for 10 elements.
218    /// let mut b = BufferMut::<u64>::with_capacity(10);
219    ///
220    /// // Fill in the first 3 elements.
221    /// let uninit = b.spare_capacity_mut();
222    /// uninit[0].write(0);
223    /// uninit[1].write(1);
224    /// uninit[2].write(2);
225    ///
226    /// // Mark the first 3 elements of the vector as being initialized.
227    /// unsafe {
228    ///     b.set_len(3);
229    /// }
230    ///
231    /// assert_eq!(b.as_slice(), &[0u64, 1, 2]);
232    /// ```
233    #[inline]
234    pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<T>] {
235        let dst = self.bytes.spare_capacity_mut().as_mut_ptr();
236        unsafe {
237            std::slice::from_raw_parts_mut(
238                dst as *mut MaybeUninit<T>,
239                self.capacity() - self.length,
240            )
241        }
242    }
243
244    /// # Safety
245    /// The caller must ensure that the buffer was properly initialized up to `len`.
246    #[inline]
247    pub unsafe fn set_len(&mut self, len: usize) {
248        unsafe { self.bytes.set_len(len * size_of::<T>()) };
249        self.length = len;
250    }
251
252    /// Appends a scalar to the buffer.
253    #[inline]
254    pub fn push(&mut self, value: T) {
255        self.reserve(1);
256        unsafe { self.push_unchecked(value) }
257    }
258
259    /// Appends a scalar to the buffer without checking for sufficient capacity.
260    ///
261    /// ## Safety
262    ///
263    /// The caller must ensure there is sufficient capacity in the array.
264    #[inline]
265    pub unsafe fn push_unchecked(&mut self, item: T) {
266        // SAFETY: the caller ensures we have sufficient capacity
267        unsafe {
268            let dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
269            dst.write(item);
270            self.bytes.set_len(self.bytes.len() + size_of::<T>())
271        }
272        self.length += 1;
273    }
274
275    /// Appends n scalars to the buffer.
276    ///
277    /// This function is slightly more optimized than `extend(iter::repeat_n(item, b))`.
278    #[inline]
279    pub fn push_n(&mut self, item: T, n: usize)
280    where
281        T: Copy,
282    {
283        self.reserve(n);
284        unsafe { self.push_n_unchecked(item, n) }
285    }
286
287    /// Appends n scalars to the buffer.
288    ///
289    /// ## Safety
290    ///
291    /// The caller must ensure there is sufficient capacity in the array.
292    #[inline]
293    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
294    where
295        T: Copy,
296    {
297        let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
298        // SAFETY: we checked the capacity in the reserve call
299        unsafe {
300            let end = dst.add(n);
301            while dst < end {
302                dst.write(item);
303                dst = dst.add(1);
304            }
305            self.bytes.set_len(self.bytes.len() + (n * size_of::<T>()));
306        }
307        self.length += n;
308    }
309
310    /// Appends a slice of type `T`, growing the internal buffer as needed.
311    ///
312    /// # Example:
313    ///
314    /// ```
315    /// # use vortex_buffer::BufferMut;
316    ///
317    /// let mut builder = BufferMut::<u16>::with_capacity(10);
318    /// builder.extend_from_slice(&[42, 44, 46]);
319    ///
320    /// assert_eq!(builder.len(), 3);
321    /// ```
322    #[inline]
323    pub fn extend_from_slice(&mut self, slice: &[T]) {
324        self.reserve(slice.len());
325        let raw_slice =
326            unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), size_of_val(slice)) };
327        self.bytes.extend_from_slice(raw_slice);
328        self.length += slice.len();
329    }
330
331    /// Freeze the `BufferMut` into a `Buffer`.
332    pub fn freeze(self) -> Buffer<T> {
333        Buffer {
334            bytes: self.bytes.freeze(),
335            length: self.length,
336            alignment: self.alignment,
337            _marker: Default::default(),
338        }
339    }
340
341    /// Map each element of the buffer with a closure.
342    pub fn map_each<R, F>(self, mut f: F) -> BufferMut<R>
343    where
344        T: Copy,
345        F: FnMut(T) -> R,
346    {
347        assert_eq!(
348            size_of::<T>(),
349            size_of::<R>(),
350            "Size of T and R do not match"
351        );
352        // SAFETY: we have checked that `size_of::<T>` == `size_of::<R>`.
353        let mut buf: BufferMut<R> = unsafe { std::mem::transmute(self) };
354        buf.iter_mut()
355            .for_each(|item| *item = f(unsafe { std::mem::transmute_copy(item) }));
356        buf
357    }
358
359    /// Return a `BufferMut<T>` with the given alignment. Where possible, this will be zero-copy.
360    pub fn aligned(self, alignment: Alignment) -> Self {
361        if self.as_ptr().align_offset(*alignment) == 0 {
362            self
363        } else {
364            Self::copy_from_aligned(self, alignment)
365        }
366    }
367}
368
369impl<T> Clone for BufferMut<T> {
370    fn clone(&self) -> Self {
371        // NOTE(ngates): we cannot derive Clone since BytesMut copies on clone and the alignment
372        //  might be messed up.
373        let mut buffer = BufferMut::<T>::with_capacity_aligned(self.capacity(), self.alignment);
374        buffer.extend_from_slice(self.as_slice());
375        buffer
376    }
377}
378
379impl<T: Debug> Debug for BufferMut<T> {
380    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
381        f.debug_struct(&format!("BufferMut<{}>", type_name::<T>()))
382            .field("length", &self.length)
383            .field("alignment", &self.alignment)
384            .field("as_slice", &TruncatedDebug(self.as_slice()))
385            .finish()
386    }
387}
388
389impl<T> Default for BufferMut<T> {
390    fn default() -> Self {
391        Self::empty()
392    }
393}
394
/// Lets a `BufferMut<T>` be used wherever a `&[T]` is expected.
impl<T> Deref for BufferMut<T> {
    type Target = [T];

    /// Borrow the elements as a shared slice.
    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
403
/// Lets a `BufferMut<T>` be used wherever a `&mut [T]` is expected.
impl<T> DerefMut for BufferMut<T> {
    /// Borrow the elements as a mutable slice.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.as_mut_slice()
    }
}
410
/// Cheap conversion to a shared slice of the elements.
impl<T> AsRef<[T]> for BufferMut<T> {
    /// Borrow the elements as a shared slice.
    #[inline]
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}
417
/// Cheap conversion to a mutable slice of the elements.
impl<T> AsMut<[T]> for BufferMut<T> {
    /// Borrow the elements as a mutable slice.
    #[inline]
    fn as_mut(&mut self) -> &mut [T] {
        self.as_mut_slice()
    }
}
424
425impl<T> BufferMut<T> {
426    fn extend_iter(&mut self, mut iter: impl Iterator<Item = T>) {
427        // Attempt to reserve enough memory up-front, although this is only a lower bound.
428        let (lower, _) = iter.size_hint();
429        self.reserve(lower);
430
431        let remaining = self.capacity() - self.len();
432
433        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
434        let mut dst: *mut T = begin.cast_mut();
435        for _ in 0..remaining {
436            if let Some(item) = iter.next() {
437                unsafe {
438                    // SAFETY: We know we have enough capacity to write the item.
439                    dst.write(item);
440                    // Note. we used to have dst.add(iteration).write(item), here.
441                    // however this was much slower than just incrementing dst.
442                    dst = dst.add(1);
443                }
444            } else {
445                break;
446            }
447        }
448
449        // TODO(joe): replace with ptr_sub when stable
450        let length = self.len() + unsafe { dst.byte_offset_from(begin) as usize / size_of::<T>() };
451        unsafe { self.set_len(length) };
452
453        // Append remaining elements
454        iter.for_each(|item| self.push(item));
455    }
456
457    /// An unsafe variant of the `Extend` trait and its `extend` method that receives what the
458    /// caller guarantees to be an iterator with a trusted upper bound.
459    pub fn extend_trusted<I: TrustedLen<Item = T>>(&mut self, iter: I) {
460        // Reserve all memory upfront since it's an exact upper bound
461        let (_, high) = iter.size_hint();
462        self.reserve(high.vortex_expect("TrustedLen iterator didn't have valid upper bound"));
463
464        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
465        let mut dst: *mut T = begin.cast_mut();
466        iter.for_each(|item| {
467            unsafe {
468                // SAFETY: We know we have enough capacity to write the item.
469                dst.write(item);
470                // Note. we used to have dst.add(iteration).write(item), here.
471                // however this was much slower than just incrementing dst.
472                dst = dst.add(1);
473            }
474        });
475        // TODO(joe): replace with ptr_sub when stable
476        let length = self.len() + unsafe { dst.byte_offset_from(begin) as usize / size_of::<T>() };
477        unsafe { self.set_len(length) };
478    }
479}
480
481impl<T> Extend<T> for BufferMut<T> {
482    #[inline]
483    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
484        self.extend_iter(iter.into_iter())
485    }
486}
487
488impl<'a, T> Extend<&'a T> for BufferMut<T>
489where
490    T: Copy + 'a,
491{
492    #[inline]
493    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
494        self.extend_iter(iter.into_iter().copied())
495    }
496}
497
498impl<T> FromIterator<T> for BufferMut<T> {
499    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
500        // We don't infer the capacity here and just let the first call to `extend` do it for us.
501        let mut buffer = Self::with_capacity(0);
502        buffer.extend(iter);
503        debug_assert_eq!(buffer.alignment(), Alignment::of::<T>());
504        buffer
505    }
506}
507
508impl Buf for ByteBufferMut {
509    fn remaining(&self) -> usize {
510        self.len()
511    }
512
513    fn chunk(&self) -> &[u8] {
514        self.as_slice()
515    }
516
517    fn advance(&mut self, cnt: usize) {
518        if !cnt.is_multiple_of(*self.alignment) {
519            vortex_panic!(
520                "Cannot advance buffer by {} items, resulting alignment is not {}",
521                cnt,
522                self.alignment
523            );
524        }
525        self.bytes.advance(cnt);
526        self.length -= cnt;
527    }
528}
529
530/// As per the BufMut implementation, we must support internal resizing when
531/// asked to extend the buffer.
532/// See: <https://github.com/tokio-rs/bytes/issues/131>
533unsafe impl BufMut for ByteBufferMut {
534    #[inline]
535    fn remaining_mut(&self) -> usize {
536        usize::MAX - self.len()
537    }
538
539    #[inline]
540    unsafe fn advance_mut(&mut self, cnt: usize) {
541        if !cnt.is_multiple_of(*self.alignment) {
542            vortex_panic!(
543                "Cannot advance buffer by {} items, resulting alignment is not {}",
544                cnt,
545                self.alignment
546            );
547        }
548        unsafe { self.bytes.advance_mut(cnt) };
549        self.length -= cnt;
550    }
551
552    #[inline]
553    fn chunk_mut(&mut self) -> &mut UninitSlice {
554        self.bytes.chunk_mut()
555    }
556
557    fn put<T: Buf>(&mut self, mut src: T)
558    where
559        Self: Sized,
560    {
561        while src.has_remaining() {
562            let chunk = src.chunk();
563            self.extend_from_slice(chunk);
564            src.advance(chunk.len());
565        }
566    }
567
568    #[inline]
569    fn put_slice(&mut self, src: &[u8]) {
570        self.extend_from_slice(src);
571    }
572
573    #[inline]
574    fn put_bytes(&mut self, val: u8, cnt: usize) {
575        self.push_n(val, cnt)
576    }
577}
578
/// Extension trait for [`BytesMut`] that provides functions for aligning the buffer.
trait AlignedBytesMut {
    /// Align an empty `BytesMut` to the specified alignment.
    ///
    /// After the call, the start of the buffer sits on an `alignment` boundary; the bytes
    /// skipped to get there are no longer part of the buffer.
    ///
    /// ## Panics
    ///
    /// Panics if the buffer is not empty, or if there is not enough capacity to align the buffer.
    fn align_empty(&mut self, alignment: Alignment);
}
588
589impl AlignedBytesMut for BytesMut {
590    fn align_empty(&mut self, alignment: Alignment) {
591        // TODO(joe): this is slow fixme
592        if !self.is_empty() {
593            vortex_panic!("ByteBufferMut must be empty");
594        }
595
596        let padding = self.as_ptr().align_offset(*alignment);
597        self.capacity()
598            .checked_sub(padding)
599            .vortex_expect("Not enough capacity to align buffer");
600
601        // SAFETY: We know the buffer is empty, and we know we have enough capacity, so we can
602        // safely set the length to the padding and advance the buffer to the aligned offset.
603        unsafe { self.set_len(padding) };
604        self.advance(padding);
605    }
606}
607
608impl Write for ByteBufferMut {
609    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
610        self.extend_from_slice(buf);
611        Ok(buf.len())
612    }
613
614    fn flush(&mut self) -> std::io::Result<()> {
615        Ok(())
616    }
617}
618
#[cfg(test)]
mod test {
    use bytes::{Buf, BufMut};

    use crate::{Alignment, BufferMut, ByteBufferMut, buffer_mut};

    /// Capacity never drops below the requested element count while pushing, and the
    /// requested alignment is retained.
    #[test]
    fn capacity() {
        let mut n = 57;
        let mut buf = BufferMut::<i32>::with_capacity_aligned(n, Alignment::new(1024));
        assert!(buf.capacity() >= 57);

        while n > 0 {
            buf.push(0);
            assert!(buf.capacity() >= n);
            n -= 1
        }

        assert_eq!(buf.alignment(), Alignment::new(1024));
    }

    /// Collecting an iterator preserves element order.
    #[test]
    fn from_iter() {
        let buf = BufferMut::from_iter([0, 10, 20, 30]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30]);
    }

    /// Successive `extend` calls append to the existing contents.
    #[test]
    fn extend() {
        let mut buf = BufferMut::empty();
        buf.extend([0i32, 10, 20, 30]);
        buf.extend([40, 50, 60]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30, 40, 50, 60]);
    }

    /// Pushing onto an empty buffer grows it as needed.
    #[test]
    fn push() {
        let mut buf = BufferMut::empty();
        buf.push(1);
        buf.push(2);
        buf.push(3);
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
    }

    /// `push_n` appends the requested number of copies.
    #[test]
    fn push_n() {
        let mut buf = BufferMut::empty();
        buf.push_n(0, 100);
        assert_eq!(buf.as_slice(), &[0; 100]);
    }

    /// Elements can be mutated both through `DerefMut` and through `AsMut`.
    #[test]
    fn as_mut() {
        let mut buf = buffer_mut![0, 1, 2];
        // Uses DerefMut
        buf[1] = 0;
        // Uses as_mut
        buf.as_mut()[2] = 0;
        assert_eq!(buf.as_slice(), &[0, 0, 0]);
    }

    /// `map_each` can change the element type when the sizes match.
    #[test]
    fn map_each() {
        let buf = buffer_mut![0i32, 1, 2];
        // Add one, and cast to an unsigned u32 in the same closure
        let buf = buf.map_each(|i| (i + 1) as u32);
        assert_eq!(buf.as_slice(), &[1u32, 2, 3]);
    }

    /// The `Buf` implementation reads from the front and shrinks on `advance`.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBufferMut::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    /// The `BufMut` implementation appends to the end of the buffer.
    #[test]
    fn bytes_buf_mut() {
        let mut buf = ByteBufferMut::copy_from("hello".as_bytes());
        assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5);

        BufMut::put_slice(&mut buf, b"world");
        assert_eq!(buf.as_slice(), b"helloworld");
    }
}