Skip to main content

vortex_buffer/
buffer_mut.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use core::mem::MaybeUninit;
5use std::any::type_name;
6use std::fmt::Debug;
7use std::fmt::Formatter;
8use std::io::Write;
9use std::ops::Deref;
10use std::ops::DerefMut;
11
12use bytes::Buf;
13use bytes::BufMut;
14use bytes::BytesMut;
15use bytes::buf::UninitSlice;
16use vortex_error::VortexExpect;
17use vortex_error::vortex_panic;
18
19use crate::Alignment;
20use crate::Buffer;
21use crate::ByteBufferMut;
22use crate::debug::TruncatedDebug;
23use crate::trusted_len::TrustedLen;
24
/// A mutable buffer that maintains a runtime-defined alignment through resizing operations.
///
/// The contents are held as raw bytes in a [`BytesMut`] and exposed as a slice of `T`.
#[derive(PartialEq, Eq)]
pub struct BufferMut<T> {
    // Raw byte storage backing the buffer.
    pub(crate) bytes: BytesMut,
    // Number of `T` elements currently stored (an element count, not a byte count).
    pub(crate) length: usize,
    // Alignment that the start of `bytes` is expected to satisfy.
    pub(crate) alignment: Alignment,
    // Ties the buffer to its element type without storing a `T`.
    pub(crate) _marker: std::marker::PhantomData<T>,
}
33
34impl<T> BufferMut<T> {
35    /// Create a new `BufferMut` with the requested alignment and capacity.
36    pub fn with_capacity(capacity: usize) -> Self {
37        Self::with_capacity_aligned(capacity, Alignment::of::<T>())
38    }
39
40    /// Create a new `BufferMut` with the requested alignment and capacity.
41    pub fn with_capacity_aligned(capacity: usize, alignment: Alignment) -> Self {
42        if !alignment.is_aligned_to(Alignment::of::<T>()) {
43            vortex_panic!(
44                "Alignment {} must align to the scalar type's alignment {}",
45                alignment,
46                align_of::<T>()
47            );
48        }
49
50        let mut bytes = BytesMut::with_capacity((capacity * size_of::<T>()) + *alignment);
51        bytes.align_empty(alignment);
52
53        Self {
54            bytes,
55            length: 0,
56            alignment,
57            _marker: Default::default(),
58        }
59    }
60
61    /// Create a new zeroed `BufferMut`.
62    pub fn zeroed(len: usize) -> Self {
63        Self::zeroed_aligned(len, Alignment::of::<T>())
64    }
65
66    /// Create a new zeroed `BufferMut`.
67    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
68        let mut bytes = BytesMut::zeroed((len * size_of::<T>()) + *alignment);
69        bytes.advance(bytes.as_ptr().align_offset(*alignment));
70        unsafe { bytes.set_len(len * size_of::<T>()) };
71        let actual_len = bytes.len().checked_div(size_of::<T>()).unwrap_or(0);
72        Self {
73            bytes,
74            length: actual_len,
75            alignment,
76            _marker: Default::default(),
77        }
78    }
79
80    /// Create a new empty `BufferMut` with the provided alignment.
81    pub fn empty() -> Self {
82        Self::empty_aligned(Alignment::of::<T>())
83    }
84
85    /// Create a new empty `BufferMut` with the provided alignment.
86    pub fn empty_aligned(alignment: Alignment) -> Self {
87        BufferMut::with_capacity_aligned(0, alignment)
88    }
89
90    /// Create a new full `BufferMut` with the given value.
91    pub fn full(item: T, len: usize) -> Self
92    where
93        T: Copy,
94    {
95        let mut buffer = BufferMut::<T>::with_capacity(len);
96        buffer.push_n(item, len);
97        buffer
98    }
99
100    /// Create a mutable scalar buffer by copying the contents of the slice.
101    pub fn copy_from(other: impl AsRef<[T]>) -> Self {
102        Self::copy_from_aligned(other, Alignment::of::<T>())
103    }
104
105    /// Create a mutable scalar buffer with the alignment by copying the contents of the slice.
106    ///
107    /// ## Panics
108    ///
109    /// Panics when the requested alignment isn't itself aligned to type T.
110    pub fn copy_from_aligned(other: impl AsRef<[T]>, alignment: Alignment) -> Self {
111        if !alignment.is_aligned_to(Alignment::of::<T>()) {
112            vortex_panic!("Given alignment is not aligned to type T")
113        }
114        let other = other.as_ref();
115        let mut buffer = Self::with_capacity_aligned(other.len(), alignment);
116        buffer.extend_from_slice(other);
117        debug_assert_eq!(buffer.alignment(), alignment);
118        buffer
119    }
120
    /// Get the alignment of the buffer.
    ///
    /// This is the alignment value stored on the buffer; the `*_aligned` constructors allow it
    /// to be larger than the natural alignment of `T`.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }
126
127    /// Returns the length of the buffer.
128    #[inline(always)]
129    pub fn len(&self) -> usize {
130        debug_assert_eq!(self.length, self.bytes.len() / size_of::<T>());
131        self.length
132    }
133
    /// Returns whether the buffer is empty, i.e. holds zero elements.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        // Reads the cached element count directly.
        self.length == 0
    }
139
140    /// Returns the capacity of the buffer.
141    #[inline]
142    pub fn capacity(&self) -> usize {
143        self.bytes.capacity() / size_of::<T>()
144    }
145
146    /// Returns a slice over the buffer of elements of type T.
147    #[inline]
148    pub fn as_slice(&self) -> &[T] {
149        let raw_slice = self.bytes.as_ref();
150        // SAFETY: alignment of Buffer is checked on construction
151        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
152    }
153
154    /// Returns a slice over the buffer of elements of type T.
155    #[inline]
156    pub fn as_mut_slice(&mut self) -> &mut [T] {
157        let raw_slice = self.bytes.as_mut();
158        // SAFETY: alignment of Buffer is checked on construction
159        unsafe { std::slice::from_raw_parts_mut(raw_slice.as_mut_ptr().cast(), self.length) }
160    }
161
162    /// Clear the buffer, retaining any existing capacity.
163    #[inline]
164    pub fn clear(&mut self) {
165        unsafe { self.bytes.set_len(0) }
166        self.length = 0;
167    }
168
169    /// Shortens the buffer, keeping the first `len` bytes and dropping the
170    /// rest.
171    ///
172    /// If `len` is greater than the buffer's current length, this has no
173    /// effect.
174    ///
175    /// Existing underlying capacity is preserved.
176    #[inline]
177    pub fn truncate(&mut self, len: usize) {
178        if len <= self.len() {
179            // SAFETY: Shrinking the buffer cannot expose uninitialized bytes.
180            unsafe { self.set_len(len) };
181        }
182    }
183
184    /// Reserves capacity for at least `additional` more elements to be inserted in the buffer.
185    #[inline]
186    pub fn reserve(&mut self, additional: usize) {
187        let additional_bytes = additional * size_of::<T>();
188        if additional_bytes <= self.bytes.capacity() - self.bytes.len() {
189            // We can fit the additional bytes in the remaining capacity. Nothing to do.
190            return;
191        }
192
193        // Otherwise, reserve additional + alignment bytes in case we need to realign the buffer.
194        self.reserve_allocate(additional);
195    }
196
197    /// A separate function so we can inline the reserve call's fast path. According to `BytesMut`
198    /// this has significant performance implications.
199    fn reserve_allocate(&mut self, additional: usize) {
200        let new_capacity: usize = ((self.length + additional) * size_of::<T>()) + *self.alignment;
201        // Make sure we at least double in size each time we re-allocate to amortize the cost
202        let new_capacity = new_capacity.max(self.bytes.capacity() * 2);
203
204        let mut bytes = BytesMut::with_capacity(new_capacity);
205        bytes.align_empty(self.alignment);
206        bytes.extend_from_slice(&self.bytes);
207        self.bytes = bytes;
208    }
209
210    /// Returns the spare capacity of the buffer as a slice of `MaybeUninit<T>`.
211    /// Has identical semantics to [`Vec::spare_capacity_mut`].
212    ///
213    /// The returned slice can be used to fill the buffer with data (e.g. by
214    /// reading from a file) before marking the data as initialized using the
215    /// [`set_len`] method.
216    ///
217    /// [`set_len`]: BufferMut::set_len
218    /// [`Vec::spare_capacity_mut`]: Vec::spare_capacity_mut
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use vortex_buffer::BufferMut;
224    ///
225    /// // Allocate vector big enough for 10 elements.
226    /// let mut b = BufferMut::<u64>::with_capacity(10);
227    ///
228    /// // Fill in the first 3 elements.
229    /// let uninit = b.spare_capacity_mut();
230    /// uninit[0].write(0);
231    /// uninit[1].write(1);
232    /// uninit[2].write(2);
233    ///
234    /// // Mark the first 3 elements of the vector as being initialized.
235    /// unsafe {
236    ///     b.set_len(3);
237    /// }
238    ///
239    /// assert_eq!(b.as_slice(), &[0u64, 1, 2]);
240    /// ```
241    #[inline]
242    pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<T>] {
243        let dst = self.bytes.spare_capacity_mut().as_mut_ptr();
244        unsafe {
245            std::slice::from_raw_parts_mut(
246                dst as *mut MaybeUninit<T>,
247                self.capacity() - self.length,
248            )
249        }
250    }
251
252    /// Sets the length of the buffer.
253    ///
254    /// # Safety
255    ///
256    /// - `new_len` must be less than or equal to [`capacity()`].
257    /// - The elements at `old_len..new_len` must be initialized.
258    ///
259    /// [`capacity()`]: Self::capacity
260    #[inline]
261    pub unsafe fn set_len(&mut self, len: usize) {
262        debug_assert!(len <= self.capacity());
263        unsafe { self.bytes.set_len(len * size_of::<T>()) };
264        self.length = len;
265    }
266
267    /// Appends a scalar to the buffer.
268    #[inline]
269    pub fn push(&mut self, value: T) {
270        self.reserve(1);
271        unsafe { self.push_unchecked(value) }
272    }
273
274    /// Appends a scalar to the buffer without checking for sufficient capacity.
275    ///
276    /// ## Safety
277    ///
278    /// The caller must ensure there is sufficient capacity in the array.
279    #[inline]
280    pub unsafe fn push_unchecked(&mut self, item: T) {
281        // SAFETY: the caller ensures we have sufficient capacity
282        unsafe {
283            let dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
284            dst.write(item);
285            self.bytes.set_len(self.bytes.len() + size_of::<T>())
286        }
287        self.length += 1;
288    }
289
290    /// Appends n scalars to the buffer.
291    ///
292    /// This function is slightly more optimized than `extend(iter::repeat_n(item, b))`.
293    #[inline]
294    pub fn push_n(&mut self, item: T, n: usize)
295    where
296        T: Copy,
297    {
298        self.reserve(n);
299        unsafe { self.push_n_unchecked(item, n) }
300    }
301
    /// Appends n scalars to the buffer without checking for sufficient capacity.
    ///
    /// ## Safety
    ///
    /// The caller must ensure there is sufficient capacity in the array.
    #[inline]
    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
    where
        T: Copy,
    {
        // Start writing at the first slot past the current contents.
        let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
        // SAFETY: the caller guarantees capacity for `n` more elements (`push_n` does so by
        // calling `reserve` first).
        unsafe {
            let end = dst.add(n);
            // Write one copy of `item` per slot until the end pointer is reached.
            while dst < end {
                dst.write(item);
                dst = dst.add(1);
            }
            // Grow the byte length by the `n` elements just written.
            self.bytes.set_len(self.bytes.len() + (n * size_of::<T>()));
        }
        self.length += n;
    }
324
325    /// Appends a slice of type `T`, growing the internal buffer as needed.
326    ///
327    /// # Example:
328    ///
329    /// ```
330    /// # use vortex_buffer::BufferMut;
331    ///
332    /// let mut builder = BufferMut::<u16>::with_capacity(10);
333    /// builder.extend_from_slice(&[42, 44, 46]);
334    ///
335    /// assert_eq!(builder.len(), 3);
336    /// ```
337    #[inline]
338    pub fn extend_from_slice(&mut self, slice: &[T]) {
339        self.reserve(slice.len());
340        let raw_slice =
341            unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), size_of_val(slice)) };
342        self.bytes.extend_from_slice(raw_slice);
343        self.length += slice.len();
344    }
345
346    /// Splits the buffer into two at the given index.
347    ///
348    /// Afterward, self contains elements `[0, at)`, and the returned buffer contains elements
349    /// `[at, capacity)`. It’s guaranteed that the memory does not move, that is, the address of
350    /// self does not change, and the address of the returned slice is at bytes after that.
351    ///
352    /// This is an O(1) operation that just increases the reference count and sets a few indices.
353    ///
354    /// Panics if either half would have a length that is not a multiple of the alignment.
355    pub fn split_off(&mut self, at: usize) -> Self {
356        if at > self.capacity() {
357            vortex_panic!("Cannot split buffer of capacity {} at {}", self.len(), at);
358        }
359
360        let bytes_at = at * size_of::<T>();
361        if !bytes_at.is_multiple_of(*self.alignment) {
362            vortex_panic!(
363                "Cannot split buffer at {}, resulting alignment is not {}",
364                at,
365                self.alignment
366            );
367        }
368
369        let new_bytes = self.bytes.split_off(bytes_at);
370
371        // Adjust the lengths, given that length may be < at
372        let new_length = self.length.saturating_sub(at);
373        self.length = self.length.min(at);
374
375        BufferMut {
376            bytes: new_bytes,
377            length: new_length,
378            alignment: self.alignment,
379            _marker: Default::default(),
380        }
381    }
382
383    /// Absorbs a mutable buffer that was previously split off.
384    ///
385    /// If the two buffers were previously contiguous and not mutated in a way that causes
386    /// re-allocation i.e., if other was created by calling split_off on this buffer, then this is
387    /// an O(1) operation that just decreases a reference count and sets a few indices.
388    ///
389    /// Otherwise, this method degenerates to self.extend_from_slice(other.as_ref()).
390    pub fn unsplit(&mut self, other: Self) {
391        if self.alignment != other.alignment {
392            vortex_panic!(
393                "Cannot unsplit buffers with different alignments: {} and {}",
394                self.alignment,
395                other.alignment
396            );
397        }
398        self.bytes.unsplit(other.bytes);
399        self.length += other.length;
400    }
401
402    /// Return the [`ByteBufferMut`] for this [`BufferMut`].
403    pub fn into_byte_buffer(self) -> ByteBufferMut {
404        ByteBufferMut {
405            bytes: self.bytes,
406            length: self.length * size_of::<T>(),
407            alignment: self.alignment,
408            _marker: Default::default(),
409        }
410    }
411
412    /// Freeze the `BufferMut` into a `Buffer`.
413    pub fn freeze(self) -> Buffer<T> {
414        Buffer {
415            bytes: self.bytes.freeze(),
416            length: self.length,
417            alignment: self.alignment,
418            _marker: Default::default(),
419        }
420    }
421
    /// Map each element of the buffer with a closure, reusing the existing allocation.
    ///
    /// The buffer is reinterpreted as a `BufferMut<R>` and every slot is overwritten with the
    /// result of applying `f` to the `T` value previously stored there.
    ///
    /// ## Panics
    ///
    /// Panics if `T` and `R` do not have the same size.
    pub fn map_each_in_place<R, F>(self, mut f: F) -> BufferMut<R>
    where
        T: Copy,
        F: FnMut(T) -> R,
    {
        assert_eq!(
            size_of::<T>(),
            size_of::<R>(),
            "Size of T and R do not match"
        );
        // NOTE(review): only the sizes are compared here; the alignment of `R` is not checked.
        // Confirm callers only use types whose alignment the buffer already satisfies.
        // SAFETY: we have checked that `size_of::<T>` == `size_of::<R>`.
        let mut buf: BufferMut<R> = unsafe { std::mem::transmute(self) };
        // Each slot still holds the bits of a `T`: read them back as a `T`, map, and store the `R`.
        buf.iter_mut()
            .for_each(|item| *item = f(unsafe { std::mem::transmute_copy(item) }));
        buf
    }
439
440    /// Return a `BufferMut<T>` with the same data as this one with the given alignment.
441    ///
442    /// If the data is already properly aligned, this is a metadata-only operation.
443    ///
444    /// If the data is not aligned, we copy it into a new allocation.
445    pub fn aligned(self, alignment: Alignment) -> Self {
446        if self.as_ptr().align_offset(*alignment) == 0 {
447            Self {
448                bytes: self.bytes,
449                length: self.length,
450                alignment,
451                _marker: std::marker::PhantomData,
452            }
453        } else {
454            Self::copy_from_aligned(self, alignment)
455        }
456    }
457
458    /// Transmute a `Buffer<T>` into a `Buffer<U>`.
459    ///
460    /// # Safety
461    ///
462    /// The caller must ensure that all possible bit representations of type `T` are valid when
463    /// interpreted as type `U`.
464    /// See [`std::mem::transmute`] for more details.
465    ///
466    /// # Panics
467    ///
468    /// Panics if the type `U` does not have the same size and alignment as `T`.
469    pub unsafe fn transmute<U>(self) -> BufferMut<U> {
470        assert_eq!(size_of::<T>(), size_of::<U>(), "Buffer type size mismatch");
471        assert_eq!(
472            align_of::<T>(),
473            align_of::<U>(),
474            "Buffer type alignment mismatch"
475        );
476
477        BufferMut {
478            bytes: self.bytes,
479            length: self.length,
480            alignment: self.alignment,
481            _marker: std::marker::PhantomData,
482        }
483    }
484}
485
486impl<T> Clone for BufferMut<T> {
487    fn clone(&self) -> Self {
488        // NOTE(ngates): we cannot derive Clone since BytesMut copies on clone and the alignment
489        //  might be messed up.
490        let mut buffer = BufferMut::<T>::with_capacity_aligned(self.capacity(), self.alignment);
491        buffer.extend_from_slice(self.as_slice());
492        buffer
493    }
494}
495
496impl<T: Debug> Debug for BufferMut<T> {
497    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
498        f.debug_struct(&format!("BufferMut<{}>", type_name::<T>()))
499            .field("length", &self.length)
500            .field("alignment", &self.alignment)
501            .field("as_slice", &TruncatedDebug(self.as_slice()))
502            .finish()
503    }
504}
505
506impl<T> Default for BufferMut<T> {
507    fn default() -> Self {
508        Self::empty()
509    }
510}
511
512impl<T> Deref for BufferMut<T> {
513    type Target = [T];
514
515    #[inline]
516    fn deref(&self) -> &Self::Target {
517        self.as_slice()
518    }
519}
520
521impl<T> DerefMut for BufferMut<T> {
522    #[inline]
523    fn deref_mut(&mut self) -> &mut Self::Target {
524        self.as_mut_slice()
525    }
526}
527
528impl<T> AsRef<[T]> for BufferMut<T> {
529    #[inline]
530    fn as_ref(&self) -> &[T] {
531        self.as_slice()
532    }
533}
534
535impl<T> AsMut<[T]> for BufferMut<T> {
536    #[inline]
537    fn as_mut(&mut self) -> &mut [T] {
538        self.as_mut_slice()
539    }
540}
541
542impl<T> BufferMut<T> {
    /// A helper method for the two [`Extend`] implementations.
    ///
    /// We use the lower bound hint on the iterator to manually write data, and then we continue to
    /// push items normally past the lower bound.
    ///
    /// The first phase writes directly into the spare capacity for as many items as currently
    /// fit; anything the iterator yields beyond that goes through [`push`](Self::push).
    fn extend_iter(&mut self, mut iter: impl Iterator<Item = T>) {
        // Since we do not know the length of the iterator, we can only guess how much memory we
        // need to reserve. Note that these hints may be inaccurate.
        let (lower_bound, _) = iter.size_hint();

        // We choose not to use the optional upper bound size hint to match the standard library.

        self.reserve(lower_bound);

        // Number of elements that fit in the spare capacity without any further allocation.
        let unwritten = self.capacity() - self.len();

        // We store `begin` in the case that the lower bound hint is incorrect.
        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
        let mut dst: *mut T = begin.cast_mut();

        // As a first step, we manually iterate the iterator up to the known capacity.
        for _ in 0..unwritten {
            let Some(item) = iter.next() else {
                // The lower bound hint may be incorrect.
                break;
            };

            // SAFETY: We have reserved enough capacity to hold this item, and `dst` is a pointer
            // derived from a valid reference to byte data.
            unsafe { dst.write(item) };

            // Note: We used to have `dst.add(iteration).write(item)`, here. However this was much
            // slower than just incrementing `dst`.
            // SAFETY: The offsets fits in `isize`, and because we were able to reserve the memory
            // we know that `add` will not overflow.
            unsafe { dst = dst.add(1) };
        }

        // SAFETY: `dst` was derived from `begin`, which were both valid references to byte data,
        // and since the only operation that `dst` has is `add`, we know that `dst >= begin`.
        let items_written = unsafe { dst.offset_from_unsigned(begin) };
        let length = self.len() + items_written;

        // SAFETY: We have written valid items between the old length and the new length.
        unsafe { self.set_len(length) };

        // Finally, since the iterator will have arbitrarily more items to yield, we push the
        // remaining items normally.
        iter.for_each(|item| self.push(item));
    }
592
    /// Extends the `BufferMut` with an iterator with `TrustedLen`.
    ///
    /// The caller guarantees that the iterator will have a trusted upper bound, which allows the
    /// implementation to reserve all of the memory needed up front.
    ///
    /// ## Panics
    ///
    /// Panics if the iterator's `size_hint` does not report an upper bound.
    pub fn extend_trusted<I: TrustedLen<Item = T>>(&mut self, iter: I) {
        // Since we know the exact upper bound (from `TrustedLen`), we can reserve all of the memory
        // for this operation up front.
        let (_, upper_bound) = iter.size_hint();
        self.reserve(
            upper_bound
                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
        );

        // We store `begin` in the case that the upper bound hint is incorrect.
        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
        let mut dst: *mut T = begin.cast_mut();

        // Unlike `extend_iter`, this loop has no capacity check: it relies entirely on the
        // reservation made above from the trusted upper bound.
        iter.for_each(|item| {
            // SAFETY: We have reserved enough capacity to hold this item, and `dst` is a pointer
            // derived from a valid reference to byte data.
            unsafe { dst.write(item) };

            // Note: We used to have `dst.add(iteration).write(item)`, here. However this was much
            // slower than just incrementing `dst`.
            // SAFETY: The offsets fits in `isize`, and because we were able to reserve the memory
            // we know that `add` will not overflow.
            unsafe { dst = dst.add(1) };
        });

        // SAFETY: `dst` was derived from `begin`, which were both valid references to byte data,
        // and since the only operation that `dst` has is `add`, we know that `dst >= begin`.
        let items_written = unsafe { dst.offset_from_unsigned(begin) };
        let length = self.len() + items_written;

        // SAFETY: We have written valid items between the old length and the new length.
        unsafe { self.set_len(length) };
    }
630
631    /// Creates a `BufferMut` from an iterator with a trusted length.
632    ///
633    /// Internally, this calls [`extend_trusted()`](Self::extend_trusted).
634    pub fn from_trusted_len_iter<I>(iter: I) -> Self
635    where
636        I: TrustedLen<Item = T>,
637    {
638        let (_, upper_bound) = iter.size_hint();
639        let mut buffer = Self::with_capacity(
640            upper_bound
641                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
642        );
643
644        buffer.extend_trusted(iter);
645        buffer
646    }
647}
648
649impl<T> Extend<T> for BufferMut<T> {
650    #[inline]
651    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
652        self.extend_iter(iter.into_iter())
653    }
654}
655
656impl<'a, T> Extend<&'a T> for BufferMut<T>
657where
658    T: Copy + 'a,
659{
660    #[inline]
661    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
662        self.extend_iter(iter.into_iter().copied())
663    }
664}
665
666impl<T> FromIterator<T> for BufferMut<T> {
667    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
668        // We don't infer the capacity here and just let the first call to `extend` do it for us.
669        let mut buffer = Self::with_capacity(0);
670        buffer.extend(iter);
671        buffer
672    }
673}
674
675impl Buf for ByteBufferMut {
676    fn remaining(&self) -> usize {
677        self.len()
678    }
679
680    fn chunk(&self) -> &[u8] {
681        self.as_slice()
682    }
683
684    fn advance(&mut self, cnt: usize) {
685        if !cnt.is_multiple_of(*self.alignment) {
686            vortex_panic!(
687                "Cannot advance buffer by {} items, resulting alignment is not {}",
688                cnt,
689                self.alignment
690            );
691        }
692        self.bytes.advance(cnt);
693        self.length -= cnt;
694    }
695}
696
697/// As per the BufMut implementation, we must support internal resizing when
698/// asked to extend the buffer.
699/// See: <https://github.com/tokio-rs/bytes/issues/131>
700unsafe impl BufMut for ByteBufferMut {
701    #[inline]
702    fn remaining_mut(&self) -> usize {
703        usize::MAX - self.len()
704    }
705
706    #[inline]
707    unsafe fn advance_mut(&mut self, cnt: usize) {
708        if !cnt.is_multiple_of(*self.alignment) {
709            vortex_panic!(
710                "Cannot advance buffer by {} items, resulting alignment is not {}",
711                cnt,
712                self.alignment
713            );
714        }
715        unsafe { self.bytes.advance_mut(cnt) };
716        self.length -= cnt;
717    }
718
719    #[inline]
720    fn chunk_mut(&mut self) -> &mut UninitSlice {
721        self.bytes.chunk_mut()
722    }
723
724    fn put<T: Buf>(&mut self, mut src: T)
725    where
726        Self: Sized,
727    {
728        while src.has_remaining() {
729            let chunk = src.chunk();
730            self.extend_from_slice(chunk);
731            src.advance(chunk.len());
732        }
733    }
734
735    #[inline]
736    fn put_slice(&mut self, src: &[u8]) {
737        self.extend_from_slice(src);
738    }
739
740    #[inline]
741    fn put_bytes(&mut self, val: u8, cnt: usize) {
742        self.push_n(val, cnt)
743    }
744}
745
/// Extension trait for [`BytesMut`] that provides functions for aligning the buffer.
///
/// Used by the constructors and the re-allocation path in this file to align a freshly
/// allocated, still-empty `BytesMut`.
trait AlignedBytesMut {
    /// Align an empty `BytesMut` to the specified alignment.
    ///
    /// ## Panics
    ///
    /// Panics if the buffer is not empty, or if there is not enough capacity to align the buffer.
    fn align_empty(&mut self, alignment: Alignment);
}
755
impl AlignedBytesMut for BytesMut {
    /// Moves the start of an empty `BytesMut` forward to the next address that satisfies
    /// `alignment`, giving up the skipped padding bytes of capacity.
    fn align_empty(&mut self, alignment: Alignment) {
        // TODO(joe): this is slow fixme
        if !self.is_empty() {
            vortex_panic!("ByteBufferMut must be empty");
        }

        // Number of bytes to skip so that the start pointer becomes aligned.
        let padding = self.as_ptr().align_offset(*alignment);
        // Only the check matters here: the subtraction result itself is discarded.
        self.capacity()
            .checked_sub(padding)
            .vortex_expect("Not enough capacity to align buffer");

        // SAFETY: We know the buffer is empty, and we know we have enough capacity, so we can
        // safely set the length to the padding and advance the buffer to the aligned offset.
        unsafe { self.set_len(padding) };
        self.advance(padding);
    }
}
774
775impl Write for ByteBufferMut {
776    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
777        self.extend_from_slice(buf);
778        Ok(buf.len())
779    }
780
781    fn flush(&mut self) -> std::io::Result<()> {
782        Ok(())
783    }
784}
785
#[cfg(test)]
mod test {
    use bytes::Buf;
    use bytes::BufMut;

    use crate::Alignment;
    use crate::BufferMut;
    use crate::ByteBufferMut;
    use crate::buffer_mut;

    // Capacity stays at least as large as requested and the custom alignment is retained while
    // pushing elements.
    #[test]
    fn capacity() {
        let mut n = 57;
        let mut buf = BufferMut::<i32>::with_capacity_aligned(n, Alignment::new(1024));
        assert!(buf.capacity() >= 57);

        while n > 0 {
            buf.push(0);
            assert!(buf.capacity() >= n);
            n -= 1
        }

        assert_eq!(buf.alignment(), Alignment::new(1024));
    }

    // Collecting from an iterator preserves element order.
    #[test]
    fn from_iter() {
        let buf = BufferMut::from_iter([0, 10, 20, 30]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30]);
    }

    // Successive `extend` calls append to the existing contents.
    #[test]
    fn extend() {
        let mut buf = BufferMut::empty();
        buf.extend([0i32, 10, 20, 30]);
        buf.extend([40, 50, 60]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30, 40, 50, 60]);
    }

    // Pushing onto an empty buffer grows it as needed.
    #[test]
    fn push() {
        let mut buf = BufferMut::empty();
        buf.push(1);
        buf.push(2);
        buf.push(3);
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
    }

    // `push_n` appends the requested number of copies.
    #[test]
    fn push_n() {
        let mut buf = BufferMut::empty();
        buf.push_n(0, 100);
        assert_eq!(buf.as_slice(), &[0; 100]);
    }

    // Elements can be mutated through both `DerefMut` and `AsMut`.
    #[test]
    fn as_mut() {
        let mut buf = buffer_mut![0, 1, 2];
        // Uses DerefMut
        buf[1] = 0;
        // Uses as_mut
        buf.as_mut()[2] = 0;
        assert_eq!(buf.as_slice(), &[0, 0, 0]);
    }

    // `map_each_in_place` can change the element type when the sizes match.
    #[test]
    fn map_each() {
        let buf = buffer_mut![0i32, 1, 2];
        // Add one, and cast to an unsigned u32 in the same closure
        let buf = buf.map_each_in_place(|i| (i + 1) as u32);
        assert_eq!(buf.as_slice(), &[1u32, 2, 3]);
    }

    // The `Buf` implementation reads from the front and shrinks as it advances.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBufferMut::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        buf.advance(5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    // The `BufMut` implementation appends to the end of the buffer.
    #[test]
    fn bytes_buf_mut() {
        let mut buf = ByteBufferMut::copy_from("hello".as_bytes());
        assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5);

        buf.put_slice(b"world");
        assert_eq!(buf.as_slice(), b"helloworld");
    }

    // `zeroed` yields an aligned, writable buffer of zeros.
    #[test]
    fn buffer_mut_zeroed() {
        const LEN: usize = 17;

        let mut buf = BufferMut::<u32>::zeroed(LEN);

        assert_eq!(buf.as_ptr().align_offset(*Alignment::of::<u32>()), 0);
        assert_eq!(buf.as_slice(), &[0; LEN]);

        buf[3] = 7;
        assert_eq!(buf.as_slice()[3], 7);
    }

    // `zeroed_aligned` honors an alignment larger than the element's natural alignment.
    #[test]
    fn buffer_mut_zeroed_aligned() {
        const LEN: usize = 17;
        let alignment = Alignment::new(64);

        let mut buf = BufferMut::<u32>::zeroed_aligned(LEN, alignment);

        assert_eq!(buf.as_ptr().align_offset(*alignment), 0);
        assert_eq!(buf.as_slice(), &[0; LEN]);

        buf[3] = 7;
        assert_eq!(buf.as_slice()[3], 7);
    }
}