// vortex_buffer/buffer_mut.rs

use core::mem::MaybeUninit;
use std::any::type_name;
use std::fmt::{Debug, Formatter};
use std::io::Write;
use std::ops::{Deref, DerefMut};

use bytes::buf::UninitSlice;
use bytes::{Buf, BufMut, BytesMut};
use vortex_error::{VortexExpect, vortex_panic};

use crate::debug::TruncatedDebug;
use crate::spec_extend::SpecExtend;
use crate::{Alignment, Buffer, ByteBufferMut};

15/// A mutable buffer that maintains a runtime-defined alignment through resizing operations.
16#[derive(PartialEq, Eq)]
17pub struct BufferMut<T> {
18    pub(crate) bytes: BytesMut,
19    pub(crate) length: usize,
20    pub(crate) alignment: Alignment,
21    pub(crate) _marker: std::marker::PhantomData<T>,
22}
23
24impl<T> BufferMut<T> {
25    /// Create a new `BufferMut` with the requested alignment and capacity.
26    pub fn with_capacity(capacity: usize) -> Self {
27        Self::with_capacity_aligned(capacity, Alignment::of::<T>())
28    }
29
30    /// Create a new `BufferMut` with the requested alignment and capacity.
31    pub fn with_capacity_aligned(capacity: usize, alignment: Alignment) -> Self {
32        if !alignment.is_aligned_to(Alignment::of::<T>()) {
33            vortex_panic!(
34                "Alignment {} must align to the scalar type's alignment {}",
35                alignment,
36                align_of::<T>()
37            );
38        }
39
40        let mut bytes = BytesMut::with_capacity((capacity * size_of::<T>()) + *alignment);
41        bytes.align_empty(alignment);
42
43        Self {
44            bytes,
45            length: 0,
46            alignment,
47            _marker: Default::default(),
48        }
49    }
50
51    /// Create a new zeroed `BufferMut`.
52    pub fn zeroed(len: usize) -> Self {
53        Self::zeroed_aligned(len, Alignment::of::<T>())
54    }
55
56    /// Create a new zeroed `BufferMut`.
57    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
58        let mut bytes = BytesMut::zeroed((len * size_of::<T>()) + *alignment);
59        bytes.advance(bytes.as_ptr().align_offset(*alignment));
60        unsafe { bytes.set_len(len * size_of::<T>()) };
61        Self {
62            bytes,
63            length: len,
64            alignment,
65            _marker: Default::default(),
66        }
67    }
68
69    /// Create a new empty `BufferMut` with the provided alignment.
70    pub fn empty() -> Self {
71        Self::empty_aligned(Alignment::of::<T>())
72    }
73
74    /// Create a new empty `BufferMut` with the provided alignment.
75    pub fn empty_aligned(alignment: Alignment) -> Self {
76        BufferMut::with_capacity_aligned(0, alignment)
77    }
78
79    /// Create a new full `BufferMut` with the given value.
80    pub fn full(item: T, len: usize) -> Self
81    where
82        T: Copy,
83    {
84        let mut buffer = BufferMut::<T>::with_capacity(len);
85        buffer.push_n(item, len);
86        buffer
87    }
88
89    /// Create a mutable scalar buffer by copying the contents of the slice.
90    pub fn copy_from(other: impl AsRef<[T]>) -> Self {
91        Self::copy_from_aligned(other, Alignment::of::<T>())
92    }
93
94    /// Create a mutable scalar buffer with the alignment by copying the contents of the slice.
95    ///
96    /// ## Panics
97    ///
98    /// Panics when the requested alignment isn't itself aligned to type T.
99    pub fn copy_from_aligned(other: impl AsRef<[T]>, alignment: Alignment) -> Self {
100        if !alignment.is_aligned_to(Alignment::of::<T>()) {
101            vortex_panic!("Given alignment is not aligned to type T")
102        }
103        let other = other.as_ref();
104        let mut buffer = Self::with_capacity_aligned(other.len(), alignment);
105        buffer.extend_from_slice(other);
106        debug_assert_eq!(buffer.alignment(), alignment);
107        buffer
108    }
109
110    /// Get the alignment of the buffer.
111    #[inline(always)]
112    pub fn alignment(&self) -> Alignment {
113        self.alignment
114    }
115
116    /// Returns the length of the buffer.
117    #[inline(always)]
118    pub fn len(&self) -> usize {
119        debug_assert_eq!(self.length, self.bytes.len() / size_of::<T>());
120        self.length
121    }
122
123    /// Returns whether the buffer is empty.
124    #[inline(always)]
125    pub fn is_empty(&self) -> bool {
126        self.length == 0
127    }
128
129    /// Returns the capacity of the buffer.
130    #[inline]
131    pub fn capacity(&self) -> usize {
132        self.bytes.capacity() / size_of::<T>()
133    }
134
135    /// Returns a slice over the buffer of elements of type T.
136    #[inline]
137    pub fn as_slice(&self) -> &[T] {
138        let raw_slice = self.bytes.as_ref();
139        // SAFETY: alignment of Buffer is checked on construction
140        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
141    }
142
143    /// Returns a slice over the buffer of elements of type T.
144    #[inline]
145    pub fn as_mut_slice(&mut self) -> &mut [T] {
146        let raw_slice = self.bytes.as_mut();
147        // SAFETY: alignment of Buffer is checked on construction
148        unsafe { std::slice::from_raw_parts_mut(raw_slice.as_mut_ptr().cast(), self.length) }
149    }
150
151    /// Clear the buffer, retaining any existing capacity.
152    #[inline]
153    pub fn clear(&mut self) {
154        unsafe { self.bytes.set_len(0) }
155        self.length = 0;
156    }
157
158    /// Shortens the buffer, keeping the first `len` bytes and dropping the
159    /// rest.
160    ///
161    /// If `len` is greater than the buffer's current length, this has no
162    /// effect.
163    ///
164    /// Existing underlying capacity is preserved.
165    #[inline]
166    pub fn truncate(&mut self, len: usize) {
167        if len <= self.len() {
168            // SAFETY: Shrinking the buffer cannot expose uninitialized bytes.
169            unsafe { self.set_len(len) };
170        }
171    }
172
173    /// Reserves capacity for at least `additional` more elements to be inserted in the buffer.
174    #[inline]
175    pub fn reserve(&mut self, additional: usize) {
176        let additional_bytes = additional * size_of::<T>();
177        if additional_bytes <= self.bytes.capacity() - self.bytes.len() {
178            // We can fit the additional bytes in the remaining capacity. Nothing to do.
179            return;
180        }
181
182        // Otherwise, reserve additional + alignment bytes in case we need to realign the buffer.
183        self.reserve_allocate(additional);
184    }
185
186    /// A separate function so we can inline the reserve call's fast path. According to `BytesMut`
187    /// this has significant performance implications.
188    fn reserve_allocate(&mut self, additional: usize) {
189        let new_capacity: usize = ((self.length + additional) * size_of::<T>()) + *self.alignment;
190        // Make sure we at least double in size each time we re-allocate to amortize the cost
191        let new_capacity = new_capacity.max(self.bytes.capacity() * 2);
192
193        let mut bytes = BytesMut::with_capacity(new_capacity);
194        bytes.align_empty(self.alignment);
195        bytes.extend_from_slice(&self.bytes);
196        self.bytes = bytes;
197    }
198
199    /// Returns the spare capacity of the buffer as a slice of `MaybeUninit<T>`.
200    /// Has identical semantics to [`Vec::spare_capacity_mut`].
201    ///
202    /// The returned slice can be used to fill the buffer with data (e.g. by
203    /// reading from a file) before marking the data as initialized using the
204    /// [`set_len`] method.
205    ///
206    /// [`set_len`]: BufferMut::set_len
207    /// [`Vec::spare_capacity_mut`]: Vec::spare_capacity_mut
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// use vortex_buffer::BufferMut;
213    ///
214    /// // Allocate vector big enough for 10 elements.
215    /// let mut b = BufferMut::<u64>::with_capacity(10);
216    ///
217    /// // Fill in the first 3 elements.
218    /// let uninit = b.spare_capacity_mut();
219    /// uninit[0].write(0);
220    /// uninit[1].write(1);
221    /// uninit[2].write(2);
222    ///
223    /// // Mark the first 3 elements of the vector as being initialized.
224    /// unsafe {
225    ///     b.set_len(3);
226    /// }
227    ///
228    /// assert_eq!(b.as_slice(), &[0u64, 1, 2]);
229    /// ```
230    #[inline]
231    pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<T>] {
232        let dst = self.bytes.spare_capacity_mut().as_mut_ptr();
233        unsafe {
234            std::slice::from_raw_parts_mut(
235                dst as *mut MaybeUninit<T>,
236                self.capacity() - self.length,
237            )
238        }
239    }
240
241    /// # Safety
242    /// The caller must ensure that the buffer was properly initialized up to `len`.
243    #[inline]
244    pub unsafe fn set_len(&mut self, len: usize) {
245        unsafe { self.bytes.set_len(len * size_of::<T>()) };
246        self.length = len;
247    }
248
249    /// Appends a scalar to the buffer.
250    #[inline]
251    pub fn push(&mut self, value: T) {
252        self.reserve(1);
253        unsafe { self.push_unchecked(value) }
254    }
255
256    /// Appends a scalar to the buffer without checking for sufficient capacity.
257    ///
258    /// ## Safety
259    ///
260    /// The caller must ensure there is sufficient capacity in the array.
261    #[inline]
262    pub unsafe fn push_unchecked(&mut self, item: T) {
263        // SAFETY: the caller ensures we have sufficient capacity
264        unsafe {
265            let dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
266            dst.write(item);
267            self.bytes.set_len(self.bytes.len() + size_of::<T>())
268        }
269        self.length += 1;
270    }
271
272    /// Appends n scalars to the buffer.
273    ///
274    /// This function is slightly more optimized than `extend(iter::repeat_n(item, b))`.
275    #[inline]
276    pub fn push_n(&mut self, item: T, n: usize)
277    where
278        T: Copy,
279    {
280        self.reserve(n);
281        unsafe { self.push_n_unchecked(item, n) }
282    }
283
284    /// Appends n scalars to the buffer.
285    ///
286    /// ## Safety
287    ///
288    /// The caller must ensure there is sufficient capacity in the array.
289    #[inline]
290    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
291    where
292        T: Copy,
293    {
294        let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
295        // SAFETY: we checked the capacity in the reserve call
296        unsafe {
297            let end = dst.add(n);
298            while dst < end {
299                dst.write(item);
300                dst = dst.add(1);
301            }
302            self.bytes.set_len(self.bytes.len() + (n * size_of::<T>()));
303        }
304        self.length += n;
305    }
306
307    /// Appends a slice of type `T`, growing the internal buffer as needed.
308    ///
309    /// # Example:
310    ///
311    /// ```
312    /// # use vortex_buffer::BufferMut;
313    ///
314    /// let mut builder = BufferMut::<u16>::with_capacity(10);
315    /// builder.extend_from_slice(&[42, 44, 46]);
316    ///
317    /// assert_eq!(builder.len(), 3);
318    /// ```
319    #[inline]
320    pub fn extend_from_slice(&mut self, slice: &[T]) {
321        self.reserve(slice.len());
322        let raw_slice =
323            unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), size_of_val(slice)) };
324        self.bytes.extend_from_slice(raw_slice);
325        self.length += slice.len();
326    }
327
328    /// Freeze the `BufferMut` into a `Buffer`.
329    pub fn freeze(self) -> Buffer<T> {
330        Buffer {
331            bytes: self.bytes.freeze(),
332            length: self.length,
333            alignment: self.alignment,
334            _marker: Default::default(),
335        }
336    }
337
338    /// Map each element of the buffer with a closure.
339    pub fn map_each<R, F>(self, mut f: F) -> BufferMut<R>
340    where
341        T: Copy,
342        F: FnMut(T) -> R,
343    {
344        assert_eq!(
345            size_of::<T>(),
346            size_of::<R>(),
347            "Size of T and R do not match"
348        );
349        // SAFETY: we have checked that `size_of::<T>` == `size_of::<R>`.
350        let mut buf: BufferMut<R> = unsafe { std::mem::transmute(self) };
351        buf.iter_mut()
352            .for_each(|item| *item = f(unsafe { std::mem::transmute_copy(item) }));
353        buf
354    }
355
356    /// Return a `BufferMut<T>` with the given alignment. Where possible, this will be zero-copy.
357    pub fn aligned(self, alignment: Alignment) -> Self {
358        if self.as_ptr().align_offset(*alignment) == 0 {
359            self
360        } else {
361            Self::copy_from_aligned(self, alignment)
362        }
363    }
364}
365
366impl<T> Clone for BufferMut<T> {
367    fn clone(&self) -> Self {
368        // NOTE(ngates): we cannot derive Clone since BytesMut copies on clone and the alignment
369        //  might be messed up.
370        let mut buffer = BufferMut::<T>::with_capacity_aligned(self.capacity(), self.alignment);
371        buffer.extend_from_slice(self.as_slice());
372        buffer
373    }
374}
375
376impl<T: Debug> Debug for BufferMut<T> {
377    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
378        f.debug_struct(&format!("BufferMut<{}>", type_name::<T>()))
379            .field("length", &self.length)
380            .field("alignment", &self.alignment)
381            .field("as_slice", &TruncatedDebug(self.as_slice()))
382            .finish()
383    }
384}
385
386impl<T> Default for BufferMut<T> {
387    fn default() -> Self {
388        Self::empty()
389    }
390}
391
392impl<T> Deref for BufferMut<T> {
393    type Target = [T];
394
395    fn deref(&self) -> &Self::Target {
396        self.as_slice()
397    }
398}
399
400impl<T> DerefMut for BufferMut<T> {
401    fn deref_mut(&mut self) -> &mut Self::Target {
402        self.as_mut_slice()
403    }
404}
405
406impl<T> AsRef<[T]> for BufferMut<T> {
407    fn as_ref(&self) -> &[T] {
408        self.as_slice()
409    }
410}
411
412impl<T> AsMut<[T]> for BufferMut<T> {
413    fn as_mut(&mut self) -> &mut [T] {
414        self.as_mut_slice()
415    }
416}
417
418impl<T> Extend<T> for BufferMut<T> {
419    #[inline]
420    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
421        <Self as SpecExtend<T, I::IntoIter>>::spec_extend(self, iter.into_iter())
422    }
423}
424
425impl<'a, T> Extend<&'a T> for BufferMut<T>
426where
427    T: Copy + 'a,
428{
429    #[inline]
430    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
431        self.spec_extend(iter.into_iter())
432    }
433}
434
435impl<T> FromIterator<T> for BufferMut<T> {
436    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
437        // We don't infer the capacity here and just let the first call to `extend` do it for us.
438        let mut buffer = Self::with_capacity(0);
439        buffer.extend(iter);
440        debug_assert_eq!(buffer.alignment(), Alignment::of::<T>());
441        buffer
442    }
443}
444
445impl Buf for ByteBufferMut {
446    fn remaining(&self) -> usize {
447        self.len()
448    }
449
450    fn chunk(&self) -> &[u8] {
451        self.as_slice()
452    }
453
454    fn advance(&mut self, cnt: usize) {
455        if !cnt.is_multiple_of(*self.alignment) {
456            vortex_panic!(
457                "Cannot advance buffer by {} items, resulting alignment is not {}",
458                cnt,
459                self.alignment
460            );
461        }
462        self.bytes.advance(cnt);
463        self.length -= cnt;
464    }
465}
466
467/// As per the BufMut implementation, we must support internal resizing when
468/// asked to extend the buffer.
469/// See: <https://github.com/tokio-rs/bytes/issues/131>
470unsafe impl BufMut for ByteBufferMut {
471    #[inline]
472    fn remaining_mut(&self) -> usize {
473        usize::MAX - self.len()
474    }
475
476    #[inline]
477    unsafe fn advance_mut(&mut self, cnt: usize) {
478        if !cnt.is_multiple_of(*self.alignment) {
479            vortex_panic!(
480                "Cannot advance buffer by {} items, resulting alignment is not {}",
481                cnt,
482                self.alignment
483            );
484        }
485        unsafe { self.bytes.advance_mut(cnt) };
486        self.length -= cnt;
487    }
488
489    #[inline]
490    fn chunk_mut(&mut self) -> &mut UninitSlice {
491        self.bytes.chunk_mut()
492    }
493
494    fn put<T: Buf>(&mut self, mut src: T)
495    where
496        Self: Sized,
497    {
498        while src.has_remaining() {
499            let chunk = src.chunk();
500            self.extend_from_slice(chunk);
501            src.advance(chunk.len());
502        }
503    }
504
505    #[inline]
506    fn put_slice(&mut self, src: &[u8]) {
507        self.extend_from_slice(src);
508    }
509
510    #[inline]
511    fn put_bytes(&mut self, val: u8, cnt: usize) {
512        self.push_n(val, cnt)
513    }
514}
515
516/// Extension trait for [`BytesMut`] that provides functions for aligning the buffer.
517trait AlignedBytesMut {
518    /// Align an empty `BytesMut` to the specified alignment.
519    ///
520    /// ## Panics
521    ///
522    /// Panics if the buffer is not empty, or if there is not enough capacity to align the buffer.
523    fn align_empty(&mut self, alignment: Alignment);
524}
525
526impl AlignedBytesMut for BytesMut {
527    fn align_empty(&mut self, alignment: Alignment) {
528        if !self.is_empty() {
529            vortex_panic!("ByteBufferMut must be empty");
530        }
531
532        let padding = self.as_ptr().align_offset(*alignment);
533        self.capacity()
534            .checked_sub(padding)
535            .vortex_expect("Not enough capacity to align buffer");
536
537        // SAFETY: We know the buffer is empty, and we know we have enough capacity, so we can
538        // safely set the length to the padding and advance the buffer to the aligned offset.
539        unsafe { self.set_len(padding) };
540        self.advance(padding);
541    }
542}
543
544impl Write for ByteBufferMut {
545    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
546        self.extend_from_slice(buf);
547        Ok(buf.len())
548    }
549
550    fn flush(&mut self) -> std::io::Result<()> {
551        Ok(())
552    }
553}
554
#[cfg(test)]
mod test {
    use bytes::{Buf, BufMut};

    use crate::{Alignment, BufferMut, ByteBufferMut, buffer_mut};

    #[test]
    fn capacity() {
        let alignment = Alignment::new(1024);
        let mut buf = BufferMut::<i32>::with_capacity_aligned(57, alignment);
        assert!(buf.capacity() >= 57);

        // Push one element at a time, checking capacity against a shrinking lower bound.
        for lower_bound in (1..=57).rev() {
            buf.push(0);
            assert!(buf.capacity() >= lower_bound);
        }

        // Filling the buffer must not lose the requested alignment.
        assert_eq!(buf.alignment(), alignment);
    }

    #[test]
    fn from_iter() {
        let buf: BufferMut<i32> = [0, 10, 20, 30].into_iter().collect();
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30]);
    }

    #[test]
    fn extend() {
        let mut buf = BufferMut::<i32>::empty();
        buf.extend([0, 10, 20, 30]);
        buf.extend([40, 50, 60]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30, 40, 50, 60]);
    }

    #[test]
    fn push() {
        let mut buf = BufferMut::<i32>::empty();
        for value in 1..=3 {
            buf.push(value);
        }
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
    }

    #[test]
    fn push_n() {
        let mut buf = BufferMut::<i32>::empty();
        buf.push_n(0, 100);
        assert_eq!(buf.as_slice(), &[0; 100]);
    }

    #[test]
    fn as_mut() {
        let mut buf = buffer_mut![0, 1, 2];
        // Index assignment goes through `DerefMut`.
        buf[1] = 0;
        // Explicit `AsMut` access.
        buf.as_mut()[2] = 0;
        assert_eq!(buf.as_slice(), &[0, 0, 0]);
    }

    #[test]
    fn map_each() {
        let buf = buffer_mut![0i32, 1, 2];
        // Increment and reinterpret as unsigned in a single closure.
        let mapped = buf.map_each(|value| (value + 1) as u32);
        assert_eq!(mapped.as_slice(), &[1u32, 2, 3]);
    }

    #[test]
    fn bytes_buf() {
        let mut buf = ByteBufferMut::copy_from(b"helloworld");
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    #[test]
    fn bytes_buf_mut() {
        let mut buf = ByteBufferMut::copy_from(b"hello");
        assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5);

        BufMut::put_slice(&mut buf, b"world");
        assert_eq!(buf.as_slice(), b"helloworld");
    }
}