vortex_buffer/
buffer_mut.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use core::mem::MaybeUninit;
5use std::any::type_name;
6use std::fmt::Debug;
7use std::fmt::Formatter;
8use std::io::Write;
9use std::ops::Deref;
10use std::ops::DerefMut;
11
12use bytes::Buf;
13use bytes::BufMut;
14use bytes::BytesMut;
15use bytes::buf::UninitSlice;
16use vortex_error::VortexExpect;
17use vortex_error::vortex_panic;
18
19use crate::Alignment;
20use crate::Buffer;
21use crate::ByteBufferMut;
22use crate::debug::TruncatedDebug;
23use crate::trusted_len::TrustedLen;
24
/// A mutable buffer that maintains a runtime-defined alignment through resizing operations.
///
/// Elements of type `T` are stored as raw bytes inside a [`BytesMut`]. The element count is
/// tracked separately in `length` and is kept in sync with the byte length of `bytes`.
#[derive(PartialEq, Eq)]
pub struct BufferMut<T> {
    /// Backing byte storage; its start is positioned on an address aligned to `alignment`.
    pub(crate) bytes: BytesMut,
    /// Number of `T` elements currently stored (elements, not bytes).
    pub(crate) length: usize,
    /// Alignment of the start of the data, re-applied whenever the buffer re-allocates.
    pub(crate) alignment: Alignment,
    /// Ties the buffer to its element type without storing a `T`.
    pub(crate) _marker: std::marker::PhantomData<T>,
}
33
34impl<T> BufferMut<T> {
35    /// Create a new `BufferMut` with the requested alignment and capacity.
36    pub fn with_capacity(capacity: usize) -> Self {
37        Self::with_capacity_aligned(capacity, Alignment::of::<T>())
38    }
39
40    /// Create a new `BufferMut` with the requested alignment and capacity.
41    pub fn with_capacity_aligned(capacity: usize, alignment: Alignment) -> Self {
42        if !alignment.is_aligned_to(Alignment::of::<T>()) {
43            vortex_panic!(
44                "Alignment {} must align to the scalar type's alignment {}",
45                alignment,
46                align_of::<T>()
47            );
48        }
49
50        let mut bytes = BytesMut::with_capacity((capacity * size_of::<T>()) + *alignment);
51        bytes.align_empty(alignment);
52
53        Self {
54            bytes,
55            length: 0,
56            alignment,
57            _marker: Default::default(),
58        }
59    }
60
61    /// Create a new zeroed `BufferMut`.
62    pub fn zeroed(len: usize) -> Self {
63        Self::zeroed_aligned(len, Alignment::of::<T>())
64    }
65
66    /// Create a new zeroed `BufferMut`.
67    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
68        let mut bytes = BytesMut::zeroed((len * size_of::<T>()) + *alignment);
69        bytes.advance(bytes.as_ptr().align_offset(*alignment));
70        unsafe { bytes.set_len(len * size_of::<T>()) };
71        Self {
72            bytes,
73            length: len,
74            alignment,
75            _marker: Default::default(),
76        }
77    }
78
79    /// Create a new empty `BufferMut` with the provided alignment.
80    pub fn empty() -> Self {
81        Self::empty_aligned(Alignment::of::<T>())
82    }
83
84    /// Create a new empty `BufferMut` with the provided alignment.
85    pub fn empty_aligned(alignment: Alignment) -> Self {
86        BufferMut::with_capacity_aligned(0, alignment)
87    }
88
89    /// Create a new full `BufferMut` with the given value.
90    pub fn full(item: T, len: usize) -> Self
91    where
92        T: Copy,
93    {
94        let mut buffer = BufferMut::<T>::with_capacity(len);
95        buffer.push_n(item, len);
96        buffer
97    }
98
99    /// Create a mutable scalar buffer by copying the contents of the slice.
100    pub fn copy_from(other: impl AsRef<[T]>) -> Self {
101        Self::copy_from_aligned(other, Alignment::of::<T>())
102    }
103
104    /// Create a mutable scalar buffer with the alignment by copying the contents of the slice.
105    ///
106    /// ## Panics
107    ///
108    /// Panics when the requested alignment isn't itself aligned to type T.
109    pub fn copy_from_aligned(other: impl AsRef<[T]>, alignment: Alignment) -> Self {
110        if !alignment.is_aligned_to(Alignment::of::<T>()) {
111            vortex_panic!("Given alignment is not aligned to type T")
112        }
113        let other = other.as_ref();
114        let mut buffer = Self::with_capacity_aligned(other.len(), alignment);
115        buffer.extend_from_slice(other);
116        debug_assert_eq!(buffer.alignment(), alignment);
117        buffer
118    }
119
120    /// Get the alignment of the buffer.
121    #[inline(always)]
122    pub fn alignment(&self) -> Alignment {
123        self.alignment
124    }
125
126    /// Returns the length of the buffer.
127    #[inline(always)]
128    pub fn len(&self) -> usize {
129        debug_assert_eq!(self.length, self.bytes.len() / size_of::<T>());
130        self.length
131    }
132
133    /// Returns whether the buffer is empty.
134    #[inline(always)]
135    pub fn is_empty(&self) -> bool {
136        self.length == 0
137    }
138
139    /// Returns the capacity of the buffer.
140    #[inline]
141    pub fn capacity(&self) -> usize {
142        self.bytes.capacity() / size_of::<T>()
143    }
144
145    /// Returns a slice over the buffer of elements of type T.
146    #[inline]
147    pub fn as_slice(&self) -> &[T] {
148        let raw_slice = self.bytes.as_ref();
149        // SAFETY: alignment of Buffer is checked on construction
150        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
151    }
152
153    /// Returns a slice over the buffer of elements of type T.
154    #[inline]
155    pub fn as_mut_slice(&mut self) -> &mut [T] {
156        let raw_slice = self.bytes.as_mut();
157        // SAFETY: alignment of Buffer is checked on construction
158        unsafe { std::slice::from_raw_parts_mut(raw_slice.as_mut_ptr().cast(), self.length) }
159    }
160
161    /// Clear the buffer, retaining any existing capacity.
162    #[inline]
163    pub fn clear(&mut self) {
164        unsafe { self.bytes.set_len(0) }
165        self.length = 0;
166    }
167
168    /// Shortens the buffer, keeping the first `len` bytes and dropping the
169    /// rest.
170    ///
171    /// If `len` is greater than the buffer's current length, this has no
172    /// effect.
173    ///
174    /// Existing underlying capacity is preserved.
175    #[inline]
176    pub fn truncate(&mut self, len: usize) {
177        if len <= self.len() {
178            // SAFETY: Shrinking the buffer cannot expose uninitialized bytes.
179            unsafe { self.set_len(len) };
180        }
181    }
182
183    /// Reserves capacity for at least `additional` more elements to be inserted in the buffer.
184    #[inline]
185    pub fn reserve(&mut self, additional: usize) {
186        let additional_bytes = additional * size_of::<T>();
187        if additional_bytes <= self.bytes.capacity() - self.bytes.len() {
188            // We can fit the additional bytes in the remaining capacity. Nothing to do.
189            return;
190        }
191
192        // Otherwise, reserve additional + alignment bytes in case we need to realign the buffer.
193        self.reserve_allocate(additional);
194    }
195
196    /// A separate function so we can inline the reserve call's fast path. According to `BytesMut`
197    /// this has significant performance implications.
198    fn reserve_allocate(&mut self, additional: usize) {
199        let new_capacity: usize = ((self.length + additional) * size_of::<T>()) + *self.alignment;
200        // Make sure we at least double in size each time we re-allocate to amortize the cost
201        let new_capacity = new_capacity.max(self.bytes.capacity() * 2);
202
203        let mut bytes = BytesMut::with_capacity(new_capacity);
204        bytes.align_empty(self.alignment);
205        bytes.extend_from_slice(&self.bytes);
206        self.bytes = bytes;
207    }
208
209    /// Returns the spare capacity of the buffer as a slice of `MaybeUninit<T>`.
210    /// Has identical semantics to [`Vec::spare_capacity_mut`].
211    ///
212    /// The returned slice can be used to fill the buffer with data (e.g. by
213    /// reading from a file) before marking the data as initialized using the
214    /// [`set_len`] method.
215    ///
216    /// [`set_len`]: BufferMut::set_len
217    /// [`Vec::spare_capacity_mut`]: Vec::spare_capacity_mut
218    ///
219    /// # Examples
220    ///
221    /// ```
222    /// use vortex_buffer::BufferMut;
223    ///
224    /// // Allocate vector big enough for 10 elements.
225    /// let mut b = BufferMut::<u64>::with_capacity(10);
226    ///
227    /// // Fill in the first 3 elements.
228    /// let uninit = b.spare_capacity_mut();
229    /// uninit[0].write(0);
230    /// uninit[1].write(1);
231    /// uninit[2].write(2);
232    ///
233    /// // Mark the first 3 elements of the vector as being initialized.
234    /// unsafe {
235    ///     b.set_len(3);
236    /// }
237    ///
238    /// assert_eq!(b.as_slice(), &[0u64, 1, 2]);
239    /// ```
240    #[inline]
241    pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<T>] {
242        let dst = self.bytes.spare_capacity_mut().as_mut_ptr();
243        unsafe {
244            std::slice::from_raw_parts_mut(
245                dst as *mut MaybeUninit<T>,
246                self.capacity() - self.length,
247            )
248        }
249    }
250
251    /// Sets the length of the buffer.
252    ///
253    /// # Safety
254    ///
255    /// - `new_len` must be less than or equal to [`capacity()`].
256    /// - The elements at `old_len..new_len` must be initialized.
257    ///
258    /// [`capacity()`]: Self::capacity
259    #[inline]
260    pub unsafe fn set_len(&mut self, len: usize) {
261        debug_assert!(len <= self.capacity());
262        unsafe { self.bytes.set_len(len * size_of::<T>()) };
263        self.length = len;
264    }
265
266    /// Appends a scalar to the buffer.
267    #[inline]
268    pub fn push(&mut self, value: T) {
269        self.reserve(1);
270        unsafe { self.push_unchecked(value) }
271    }
272
273    /// Appends a scalar to the buffer without checking for sufficient capacity.
274    ///
275    /// ## Safety
276    ///
277    /// The caller must ensure there is sufficient capacity in the array.
278    #[inline]
279    pub unsafe fn push_unchecked(&mut self, item: T) {
280        // SAFETY: the caller ensures we have sufficient capacity
281        unsafe {
282            let dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
283            dst.write(item);
284            self.bytes.set_len(self.bytes.len() + size_of::<T>())
285        }
286        self.length += 1;
287    }
288
289    /// Appends n scalars to the buffer.
290    ///
291    /// This function is slightly more optimized than `extend(iter::repeat_n(item, b))`.
292    #[inline]
293    pub fn push_n(&mut self, item: T, n: usize)
294    where
295        T: Copy,
296    {
297        self.reserve(n);
298        unsafe { self.push_n_unchecked(item, n) }
299    }
300
301    /// Appends n scalars to the buffer.
302    ///
303    /// ## Safety
304    ///
305    /// The caller must ensure there is sufficient capacity in the array.
306    #[inline]
307    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
308    where
309        T: Copy,
310    {
311        let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
312        // SAFETY: we checked the capacity in the reserve call
313        unsafe {
314            let end = dst.add(n);
315            while dst < end {
316                dst.write(item);
317                dst = dst.add(1);
318            }
319            self.bytes.set_len(self.bytes.len() + (n * size_of::<T>()));
320        }
321        self.length += n;
322    }
323
324    /// Appends a slice of type `T`, growing the internal buffer as needed.
325    ///
326    /// # Example:
327    ///
328    /// ```
329    /// # use vortex_buffer::BufferMut;
330    ///
331    /// let mut builder = BufferMut::<u16>::with_capacity(10);
332    /// builder.extend_from_slice(&[42, 44, 46]);
333    ///
334    /// assert_eq!(builder.len(), 3);
335    /// ```
336    #[inline]
337    pub fn extend_from_slice(&mut self, slice: &[T]) {
338        self.reserve(slice.len());
339        let raw_slice =
340            unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), size_of_val(slice)) };
341        self.bytes.extend_from_slice(raw_slice);
342        self.length += slice.len();
343    }
344
345    /// Splits the buffer into two at the given index.
346    ///
347    /// Afterward, self contains elements `[0, at)`, and the returned buffer contains elements
348    /// `[at, capacity)`. It’s guaranteed that the memory does not move, that is, the address of
349    /// self does not change, and the address of the returned slice is at bytes after that.
350    ///
351    /// This is an O(1) operation that just increases the reference count and sets a few indices.
352    ///
353    /// Panics if either half would have a length that is not a multiple of the alignment.
354    pub fn split_off(&mut self, at: usize) -> Self {
355        if at > self.capacity() {
356            vortex_panic!("Cannot split buffer of capacity {} at {}", self.len(), at);
357        }
358
359        let bytes_at = at * size_of::<T>();
360        if !bytes_at.is_multiple_of(*self.alignment) {
361            vortex_panic!(
362                "Cannot split buffer at {}, resulting alignment is not {}",
363                at,
364                self.alignment
365            );
366        }
367
368        let new_bytes = self.bytes.split_off(bytes_at);
369
370        // Adjust the lengths, given that length may be < at
371        let new_length = self.length.saturating_sub(at);
372        self.length = self.length.min(at);
373
374        BufferMut {
375            bytes: new_bytes,
376            length: new_length,
377            alignment: self.alignment,
378            _marker: Default::default(),
379        }
380    }
381
382    /// Absorbs a mutable buffer that was previously split off.
383    ///
384    /// If the two buffers were previously contiguous and not mutated in a way that causes
385    /// re-allocation i.e., if other was created by calling split_off on this buffer, then this is
386    /// an O(1) operation that just decreases a reference count and sets a few indices.
387    ///
388    /// Otherwise, this method degenerates to self.extend_from_slice(other.as_ref()).
389    pub fn unsplit(&mut self, other: Self) {
390        if self.alignment != other.alignment {
391            vortex_panic!(
392                "Cannot unsplit buffers with different alignments: {} and {}",
393                self.alignment,
394                other.alignment
395            );
396        }
397        self.bytes.unsplit(other.bytes);
398        self.length += other.length;
399    }
400
401    /// Return the [`ByteBufferMut`] for this [`BufferMut`].
402    pub fn into_byte_buffer(self) -> ByteBufferMut {
403        ByteBufferMut {
404            bytes: self.bytes,
405            length: self.length * size_of::<T>(),
406            alignment: self.alignment,
407            _marker: Default::default(),
408        }
409    }
410
411    /// Freeze the `BufferMut` into a `Buffer`.
412    pub fn freeze(self) -> Buffer<T> {
413        Buffer {
414            bytes: self.bytes.freeze(),
415            length: self.length,
416            alignment: self.alignment,
417            _marker: Default::default(),
418        }
419    }
420
421    /// Map each element of the buffer with a closure.
422    pub fn map_each_in_place<R, F>(self, mut f: F) -> BufferMut<R>
423    where
424        T: Copy,
425        F: FnMut(T) -> R,
426    {
427        assert_eq!(
428            size_of::<T>(),
429            size_of::<R>(),
430            "Size of T and R do not match"
431        );
432        // SAFETY: we have checked that `size_of::<T>` == `size_of::<R>`.
433        let mut buf: BufferMut<R> = unsafe { std::mem::transmute(self) };
434        buf.iter_mut()
435            .for_each(|item| *item = f(unsafe { std::mem::transmute_copy(item) }));
436        buf
437    }
438
439    /// Return a `BufferMut<T>` with the given alignment. Where possible, this will be zero-copy.
440    pub fn aligned(self, alignment: Alignment) -> Self {
441        if self.as_ptr().align_offset(*alignment) == 0 {
442            self
443        } else {
444            Self::copy_from_aligned(self, alignment)
445        }
446    }
447}
448
449impl<T> Clone for BufferMut<T> {
450    fn clone(&self) -> Self {
451        // NOTE(ngates): we cannot derive Clone since BytesMut copies on clone and the alignment
452        //  might be messed up.
453        let mut buffer = BufferMut::<T>::with_capacity_aligned(self.capacity(), self.alignment);
454        buffer.extend_from_slice(self.as_slice());
455        buffer
456    }
457}
458
459impl<T: Debug> Debug for BufferMut<T> {
460    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
461        f.debug_struct(&format!("BufferMut<{}>", type_name::<T>()))
462            .field("length", &self.length)
463            .field("alignment", &self.alignment)
464            .field("as_slice", &TruncatedDebug(self.as_slice()))
465            .finish()
466    }
467}
468
469impl<T> Default for BufferMut<T> {
470    fn default() -> Self {
471        Self::empty()
472    }
473}
474
475impl<T> Deref for BufferMut<T> {
476    type Target = [T];
477
478    #[inline]
479    fn deref(&self) -> &Self::Target {
480        self.as_slice()
481    }
482}
483
484impl<T> DerefMut for BufferMut<T> {
485    #[inline]
486    fn deref_mut(&mut self) -> &mut Self::Target {
487        self.as_mut_slice()
488    }
489}
490
491impl<T> AsRef<[T]> for BufferMut<T> {
492    #[inline]
493    fn as_ref(&self) -> &[T] {
494        self.as_slice()
495    }
496}
497
498impl<T> AsMut<[T]> for BufferMut<T> {
499    #[inline]
500    fn as_mut(&mut self) -> &mut [T] {
501        self.as_mut_slice()
502    }
503}
504
505impl<T> BufferMut<T> {
506    /// A helper method for the two [`Extend`] implementations.
507    ///
508    /// We use the lower bound hint on the iterator to manually write data, and then we continue to
509    /// push items normally past the lower bound.
510    fn extend_iter(&mut self, mut iter: impl Iterator<Item = T>) {
511        // Since we do not know the length of the iterator, we can only guess how much memory we
512        // need to reserve. Note that these hints may be inaccurate.
513        let (lower_bound, _) = iter.size_hint();
514
515        // We choose not to use the optional upper bound size hint to match the standard library.
516
517        self.reserve(lower_bound);
518
519        let unwritten = self.capacity() - self.len();
520
521        // We store `begin` in the case that the lower bound hint is incorrect.
522        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
523        let mut dst: *mut T = begin.cast_mut();
524
525        // As a first step, we manually iterate the iterator up to the known capacity.
526        for _ in 0..unwritten {
527            let Some(item) = iter.next() else {
528                // The lower bound hint may be incorrect.
529                break;
530            };
531
532            // SAFETY: We have reserved enough capacity to hold this item, and `dst` is a pointer
533            // derived from a valid reference to byte data.
534            unsafe { dst.write(item) };
535
536            // Note: We used to have `dst.add(iteration).write(item)`, here. However this was much
537            // slower than just incrementing `dst`.
538            // SAFETY: The offsets fits in `isize`, and because we were able to reserve the memory
539            // we know that `add` will not overflow.
540            unsafe { dst = dst.add(1) };
541        }
542
543        // SAFETY: `dst` was derived from `begin`, which were both valid references to byte data,
544        // and since the only operation that `dst` has is `add`, we know that `dst >= begin`.
545        let items_written = unsafe { dst.offset_from_unsigned(begin) };
546        let length = self.len() + items_written;
547
548        // SAFETY: We have written valid items between the old length and the new length.
549        unsafe { self.set_len(length) };
550
551        // Finally, since the iterator will have arbitrarily more items to yield, we push the
552        // remaining items normally.
553        iter.for_each(|item| self.push(item));
554    }
555
556    /// Extends the `BufferMut` with an iterator with `TrustedLen`.
557    ///
558    /// The caller guarantees that the iterator will have a trusted upper bound, which allows the
559    /// implementation to reserve all of the memory needed up front.
560    pub fn extend_trusted<I: TrustedLen<Item = T>>(&mut self, iter: I) {
561        // Since we know the exact upper bound (from `TrustedLen`), we can reserve all of the memory
562        // for this operation up front.
563        let (_, upper_bound) = iter.size_hint();
564        self.reserve(
565            upper_bound
566                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
567        );
568
569        // We store `begin` in the case that the upper bound hint is incorrect.
570        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
571        let mut dst: *mut T = begin.cast_mut();
572
573        iter.for_each(|item| {
574            // SAFETY: We have reserved enough capacity to hold this item, and `dst` is a pointer
575            // derived from a valid reference to byte data.
576            unsafe { dst.write(item) };
577
578            // Note: We used to have `dst.add(iteration).write(item)`, here. However this was much
579            // slower than just incrementing `dst`.
580            // SAFETY: The offsets fits in `isize`, and because we were able to reserve the memory
581            // we know that `add` will not overflow.
582            unsafe { dst = dst.add(1) };
583        });
584
585        // SAFETY: `dst` was derived from `begin`, which were both valid references to byte data,
586        // and since the only operation that `dst` has is `add`, we know that `dst >= begin`.
587        let items_written = unsafe { dst.offset_from_unsigned(begin) };
588        let length = self.len() + items_written;
589
590        // SAFETY: We have written valid items between the old length and the new length.
591        unsafe { self.set_len(length) };
592    }
593
594    /// Creates a `BufferMut` from an iterator with a trusted length.
595    ///
596    /// Internally, this calls [`extend_trusted()`](Self::extend_trusted).
597    pub fn from_trusted_len_iter<I>(iter: I) -> Self
598    where
599        I: TrustedLen<Item = T>,
600    {
601        let (_, upper_bound) = iter.size_hint();
602        let mut buffer = Self::with_capacity(
603            upper_bound
604                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
605        );
606
607        buffer.extend_trusted(iter);
608        buffer
609    }
610}
611
612impl<T> Extend<T> for BufferMut<T> {
613    #[inline]
614    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
615        self.extend_iter(iter.into_iter())
616    }
617}
618
619impl<'a, T> Extend<&'a T> for BufferMut<T>
620where
621    T: Copy + 'a,
622{
623    #[inline]
624    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
625        self.extend_iter(iter.into_iter().copied())
626    }
627}
628
629impl<T> FromIterator<T> for BufferMut<T> {
630    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
631        // We don't infer the capacity here and just let the first call to `extend` do it for us.
632        let mut buffer = Self::with_capacity(0);
633        buffer.extend(iter);
634        buffer
635    }
636}
637
638impl Buf for ByteBufferMut {
639    fn remaining(&self) -> usize {
640        self.len()
641    }
642
643    fn chunk(&self) -> &[u8] {
644        self.as_slice()
645    }
646
647    fn advance(&mut self, cnt: usize) {
648        if !cnt.is_multiple_of(*self.alignment) {
649            vortex_panic!(
650                "Cannot advance buffer by {} items, resulting alignment is not {}",
651                cnt,
652                self.alignment
653            );
654        }
655        self.bytes.advance(cnt);
656        self.length -= cnt;
657    }
658}
659
660/// As per the BufMut implementation, we must support internal resizing when
661/// asked to extend the buffer.
662/// See: <https://github.com/tokio-rs/bytes/issues/131>
663unsafe impl BufMut for ByteBufferMut {
664    #[inline]
665    fn remaining_mut(&self) -> usize {
666        usize::MAX - self.len()
667    }
668
669    #[inline]
670    unsafe fn advance_mut(&mut self, cnt: usize) {
671        if !cnt.is_multiple_of(*self.alignment) {
672            vortex_panic!(
673                "Cannot advance buffer by {} items, resulting alignment is not {}",
674                cnt,
675                self.alignment
676            );
677        }
678        unsafe { self.bytes.advance_mut(cnt) };
679        self.length -= cnt;
680    }
681
682    #[inline]
683    fn chunk_mut(&mut self) -> &mut UninitSlice {
684        self.bytes.chunk_mut()
685    }
686
687    fn put<T: Buf>(&mut self, mut src: T)
688    where
689        Self: Sized,
690    {
691        while src.has_remaining() {
692            let chunk = src.chunk();
693            self.extend_from_slice(chunk);
694            src.advance(chunk.len());
695        }
696    }
697
698    #[inline]
699    fn put_slice(&mut self, src: &[u8]) {
700        self.extend_from_slice(src);
701    }
702
703    #[inline]
704    fn put_bytes(&mut self, val: u8, cnt: usize) {
705        self.push_n(val, cnt)
706    }
707}
708
709/// Extension trait for [`BytesMut`] that provides functions for aligning the buffer.
710trait AlignedBytesMut {
711    /// Align an empty `BytesMut` to the specified alignment.
712    ///
713    /// ## Panics
714    ///
715    /// Panics if the buffer is not empty, or if there is not enough capacity to align the buffer.
716    fn align_empty(&mut self, alignment: Alignment);
717}
718
719impl AlignedBytesMut for BytesMut {
720    fn align_empty(&mut self, alignment: Alignment) {
721        // TODO(joe): this is slow fixme
722        if !self.is_empty() {
723            vortex_panic!("ByteBufferMut must be empty");
724        }
725
726        let padding = self.as_ptr().align_offset(*alignment);
727        self.capacity()
728            .checked_sub(padding)
729            .vortex_expect("Not enough capacity to align buffer");
730
731        // SAFETY: We know the buffer is empty, and we know we have enough capacity, so we can
732        // safely set the length to the padding and advance the buffer to the aligned offset.
733        unsafe { self.set_len(padding) };
734        self.advance(padding);
735    }
736}
737
738impl Write for ByteBufferMut {
739    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
740        self.extend_from_slice(buf);
741        Ok(buf.len())
742    }
743
744    fn flush(&mut self) -> std::io::Result<()> {
745        Ok(())
746    }
747}
748
#[cfg(test)]
mod test {
    use bytes::Buf;
    use bytes::BufMut;

    use crate::Alignment;
    use crate::BufferMut;
    use crate::ByteBufferMut;
    use crate::buffer_mut;

    /// Capacity never drops below what was requested, and the alignment survives pushes.
    #[test]
    fn capacity() {
        const REQUESTED: usize = 57;
        let alignment = Alignment::new(1024);

        let mut buf = BufferMut::<i32>::with_capacity_aligned(REQUESTED, alignment);
        assert!(buf.capacity() >= REQUESTED);

        for remaining in (1..=REQUESTED).rev() {
            buf.push(0);
            assert!(buf.capacity() >= remaining);
        }

        assert_eq!(buf.alignment(), alignment);
    }

    /// Collecting from an iterator preserves element order.
    #[test]
    fn from_iter() {
        let expected = [0, 10, 20, 30];
        let buf = BufferMut::from_iter(expected);
        assert_eq!(buf.as_slice(), &expected);
    }

    /// Consecutive `extend` calls append after the existing contents.
    #[test]
    fn extend() {
        let mut buf = BufferMut::empty();
        buf.extend([0i32, 10, 20, 30]);
        buf.extend([40, 50, 60]);

        let expected = [0, 10, 20, 30, 40, 50, 60];
        assert_eq!(buf.as_slice(), &expected);
    }

    /// Pushing onto an empty buffer grows it one element at a time.
    #[test]
    fn push() {
        let mut buf = BufferMut::empty();
        for value in 1..=3 {
            buf.push(value);
        }
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
    }

    /// `push_n` appends the requested number of copies.
    #[test]
    fn push_n() {
        let mut buf = BufferMut::empty();
        buf.push_n(0, 100);
        assert_eq!(buf.as_slice(), &[0; 100]);
    }

    /// Elements can be mutated through both `DerefMut` and `AsMut`.
    #[test]
    fn as_mut() {
        let mut buf = buffer_mut![0, 1, 2];
        // Uses DerefMut
        buf[1] = 0;
        // Uses as_mut
        buf.as_mut()[2] = 0;
        assert_eq!(buf.as_slice(), &[0, 0, 0]);
    }

    /// Mapping in place can change the element type as long as the size matches.
    #[test]
    fn map_each() {
        let source = buffer_mut![0i32, 1, 2];
        // Add one, and cast to an unsigned u32 in the same closure
        let mapped = source.map_each_in_place(|i| (i + 1) as u32);
        assert_eq!(mapped.as_slice(), &[1u32, 2, 3]);
    }

    /// Reading through `Buf` consumes bytes from the front.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBufferMut::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    /// Writing through `BufMut` appends bytes at the end.
    #[test]
    fn bytes_buf_mut() {
        let mut buf = ByteBufferMut::copy_from("hello".as_bytes());
        assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5);

        BufMut::put_slice(&mut buf, b"world");
        assert_eq!(buf.as_slice(), b"helloworld");
    }
}