vortex_buffer/
buffer_mut.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use core::mem::MaybeUninit;
5use std::any::type_name;
6use std::fmt::{Debug, Formatter};
7use std::io::Write;
8use std::ops::{Deref, DerefMut};
9
10use bytes::buf::UninitSlice;
11use bytes::{Buf, BufMut, BytesMut};
12use vortex_error::{VortexExpect, vortex_panic};
13
14use crate::debug::TruncatedDebug;
15use crate::trusted_len::TrustedLen;
16use crate::{Alignment, Buffer, ByteBufferMut};
17
/// A mutable buffer that maintains a runtime-defined alignment through resizing operations.
#[derive(PartialEq, Eq)]
pub struct BufferMut<T> {
    // Underlying byte storage; its start pointer is kept aligned to `alignment`.
    pub(crate) bytes: BytesMut,
    // Number of elements of `T` stored, i.e. `bytes.len() / size_of::<T>()`.
    pub(crate) length: usize,
    // Runtime alignment invariant; construction checks it aligns to `align_of::<T>()`.
    pub(crate) alignment: Alignment,
    // Marker tying the buffer to element type `T` without storing one.
    pub(crate) _marker: std::marker::PhantomData<T>,
}
26
27impl<T> BufferMut<T> {
28    /// Create a new `BufferMut` with the requested alignment and capacity.
29    pub fn with_capacity(capacity: usize) -> Self {
30        Self::with_capacity_aligned(capacity, Alignment::of::<T>())
31    }
32
    /// Create a new `BufferMut` with the requested alignment and capacity.
    ///
    /// ## Panics
    ///
    /// Panics if `alignment` is not itself aligned to the alignment of `T`.
    pub fn with_capacity_aligned(capacity: usize, alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!(
                "Alignment {} must align to the scalar type's alignment {}",
                alignment,
                align_of::<T>()
            );
        }

        // Over-allocate by `alignment` extra bytes so the start of the buffer can be advanced
        // to an aligned address without losing the requested element capacity.
        let mut bytes = BytesMut::with_capacity((capacity * size_of::<T>()) + *alignment);
        bytes.align_empty(alignment);

        Self {
            bytes,
            length: 0,
            alignment,
            _marker: Default::default(),
        }
    }
53
54    /// Create a new zeroed `BufferMut`.
55    pub fn zeroed(len: usize) -> Self {
56        Self::zeroed_aligned(len, Alignment::of::<T>())
57    }
58
    /// Create a new zeroed `BufferMut` with the given alignment.
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
        // Over-allocate by `alignment` bytes, then advance past the misaligned prefix so the
        // buffer's start pointer lands on an aligned address.
        let mut bytes = BytesMut::zeroed((len * size_of::<T>()) + *alignment);
        bytes.advance(bytes.as_ptr().align_offset(*alignment));
        // SAFETY: at least `len * size_of::<T>()` bytes remain after the advance, and they were
        // all zero-initialized by `BytesMut::zeroed`.
        unsafe { bytes.set_len(len * size_of::<T>()) };
        Self {
            bytes,
            length: len,
            alignment,
            _marker: Default::default(),
        }
    }
71
72    /// Create a new empty `BufferMut` with the provided alignment.
73    pub fn empty() -> Self {
74        Self::empty_aligned(Alignment::of::<T>())
75    }
76
77    /// Create a new empty `BufferMut` with the provided alignment.
78    pub fn empty_aligned(alignment: Alignment) -> Self {
79        BufferMut::with_capacity_aligned(0, alignment)
80    }
81
82    /// Create a new full `BufferMut` with the given value.
83    pub fn full(item: T, len: usize) -> Self
84    where
85        T: Copy,
86    {
87        let mut buffer = BufferMut::<T>::with_capacity(len);
88        buffer.push_n(item, len);
89        buffer
90    }
91
92    /// Create a mutable scalar buffer by copying the contents of the slice.
93    pub fn copy_from(other: impl AsRef<[T]>) -> Self {
94        Self::copy_from_aligned(other, Alignment::of::<T>())
95    }
96
97    /// Create a mutable scalar buffer with the alignment by copying the contents of the slice.
98    ///
99    /// ## Panics
100    ///
101    /// Panics when the requested alignment isn't itself aligned to type T.
102    pub fn copy_from_aligned(other: impl AsRef<[T]>, alignment: Alignment) -> Self {
103        if !alignment.is_aligned_to(Alignment::of::<T>()) {
104            vortex_panic!("Given alignment is not aligned to type T")
105        }
106        let other = other.as_ref();
107        let mut buffer = Self::with_capacity_aligned(other.len(), alignment);
108        buffer.extend_from_slice(other);
109        debug_assert_eq!(buffer.alignment(), alignment);
110        buffer
111    }
112
    /// Get the alignment of the buffer.
    ///
    /// This alignment is maintained through all resizing operations.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }
118
    /// Returns the length of the buffer in elements of type `T`.
    #[inline(always)]
    pub fn len(&self) -> usize {
        // Invariant: the element count always mirrors the underlying byte length.
        debug_assert_eq!(self.length, self.bytes.len() / size_of::<T>());
        self.length
    }
125
    /// Returns whether the buffer contains no elements.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }
131
    /// Returns the capacity of the buffer in elements of type `T`.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.bytes.capacity() / size_of::<T>()
    }
137
    /// Returns a slice over the buffer of elements of type T.
    #[inline]
    pub fn as_slice(&self) -> &[T] {
        let raw_slice = self.bytes.as_ref();
        // SAFETY: alignment of Buffer is checked on construction, and exactly `self.length`
        // elements are initialized per the length invariant.
        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
    }
145
    /// Returns a mutable slice over the buffer of elements of type T.
    #[inline]
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        let raw_slice = self.bytes.as_mut();
        // SAFETY: alignment of Buffer is checked on construction, and exactly `self.length`
        // elements are initialized per the length invariant.
        unsafe { std::slice::from_raw_parts_mut(raw_slice.as_mut_ptr().cast(), self.length) }
    }
153
    /// Clear the buffer, retaining any existing capacity.
    #[inline]
    pub fn clear(&mut self) {
        // SAFETY: shrinking the length to zero cannot expose uninitialized bytes.
        unsafe { self.bytes.set_len(0) }
        self.length = 0;
    }
160
    /// Shortens the buffer, keeping the first `len` elements and dropping the
    /// rest.
    ///
    /// If `len` is greater than the buffer's current length, this has no
    /// effect.
    ///
    /// Existing underlying capacity is preserved.
    #[inline]
    pub fn truncate(&mut self, len: usize) {
        if len <= self.len() {
            // SAFETY: Shrinking the buffer cannot expose uninitialized bytes.
            unsafe { self.set_len(len) };
        }
    }
175
    /// Reserves capacity for at least `additional` more elements to be inserted in the buffer.
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        let additional_bytes = additional * size_of::<T>();
        if additional_bytes <= self.bytes.capacity() - self.bytes.len() {
            // We can fit the additional bytes in the remaining capacity. Nothing to do.
            return;
        }

        // Otherwise, reserve additional + alignment bytes in case we need to realign the buffer.
        self.reserve_allocate(additional);
    }
188
    /// A separate function so we can inline the reserve call's fast path. According to `BytesMut`
    /// this has significant performance implications.
    fn reserve_allocate(&mut self, additional: usize) {
        // Include `alignment` extra bytes since the fresh allocation may need re-aligning.
        let new_capacity: usize = ((self.length + additional) * size_of::<T>()) + *self.alignment;
        // Make sure we at least double in size each time we re-allocate to amortize the cost
        let new_capacity = new_capacity.max(self.bytes.capacity() * 2);

        // Allocate fresh storage, align its start, then copy over the existing contents.
        let mut bytes = BytesMut::with_capacity(new_capacity);
        bytes.align_empty(self.alignment);
        bytes.extend_from_slice(&self.bytes);
        self.bytes = bytes;
    }
201
    /// Returns the spare capacity of the buffer as a slice of `MaybeUninit<T>`.
    /// Has identical semantics to [`Vec::spare_capacity_mut`].
    ///
    /// The returned slice can be used to fill the buffer with data (e.g. by
    /// reading from a file) before marking the data as initialized using the
    /// [`set_len`] method.
    ///
    /// [`set_len`]: BufferMut::set_len
    /// [`Vec::spare_capacity_mut`]: Vec::spare_capacity_mut
    ///
    /// # Examples
    ///
    /// ```
    /// use vortex_buffer::BufferMut;
    ///
    /// // Allocate vector big enough for 10 elements.
    /// let mut b = BufferMut::<u64>::with_capacity(10);
    ///
    /// // Fill in the first 3 elements.
    /// let uninit = b.spare_capacity_mut();
    /// uninit[0].write(0);
    /// uninit[1].write(1);
    /// uninit[2].write(2);
    ///
    /// // Mark the first 3 elements of the vector as being initialized.
    /// unsafe {
    ///     b.set_len(3);
    /// }
    ///
    /// assert_eq!(b.as_slice(), &[0u64, 1, 2]);
    /// ```
    #[inline]
    pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<T>] {
        let dst = self.bytes.spare_capacity_mut().as_mut_ptr();
        // SAFETY: the spare region holds `capacity() - length` elements' worth of bytes, and
        // `MaybeUninit<T>` imposes no initialization requirement on them.
        unsafe {
            std::slice::from_raw_parts_mut(
                dst as *mut MaybeUninit<T>,
                self.capacity() - self.length,
            )
        }
    }
243
    /// Sets the length of the buffer.
    ///
    /// # Safety
    ///
    /// - `new_len` must be less than or equal to [`capacity()`].
    /// - The elements at `old_len..new_len` must be initialized.
    ///
    /// [`capacity()`]: Self::capacity
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        debug_assert!(len <= self.capacity());
        // Keep the byte length and the element count in lock-step.
        unsafe { self.bytes.set_len(len * size_of::<T>()) };
        self.length = len;
    }
258
    /// Appends a scalar to the buffer, growing the allocation if needed.
    #[inline]
    pub fn push(&mut self, value: T) {
        self.reserve(1);
        // SAFETY: the reserve call above guarantees room for one more element.
        unsafe { self.push_unchecked(value) }
    }
265
    /// Appends a scalar to the buffer without checking for sufficient capacity.
    ///
    /// ## Safety
    ///
    /// The caller must ensure there is sufficient capacity in the array.
    #[inline]
    pub unsafe fn push_unchecked(&mut self, item: T) {
        // SAFETY: the caller ensures we have sufficient capacity
        unsafe {
            // Write directly into the first slot of the spare capacity, then bump the byte
            // length to cover the newly-initialized element.
            let dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
            dst.write(item);
            self.bytes.set_len(self.bytes.len() + size_of::<T>())
        }
        self.length += 1;
    }
281
    /// Appends n scalars to the buffer.
    ///
    /// This function is slightly more optimized than `extend(iter::repeat_n(item, b))`.
    #[inline]
    pub fn push_n(&mut self, item: T, n: usize)
    where
        T: Copy,
    {
        self.reserve(n);
        // SAFETY: the reserve call above guarantees capacity for `n` more elements.
        unsafe { self.push_n_unchecked(item, n) }
    }
293
    /// Appends n scalars to the buffer without checking capacity.
    ///
    /// ## Safety
    ///
    /// The caller must ensure there is sufficient capacity in the array.
    #[inline]
    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
    where
        T: Copy,
    {
        let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
        // SAFETY: we checked the capacity in the reserve call
        unsafe {
            // Fill `n` consecutive slots of the spare capacity, then publish them by bumping
            // the byte length once.
            let end = dst.add(n);
            while dst < end {
                dst.write(item);
                dst = dst.add(1);
            }
            self.bytes.set_len(self.bytes.len() + (n * size_of::<T>()));
        }
        self.length += n;
    }
316
    /// Appends a slice of type `T`, growing the internal buffer as needed.
    ///
    /// # Example:
    ///
    /// ```
    /// # use vortex_buffer::BufferMut;
    ///
    /// let mut builder = BufferMut::<u16>::with_capacity(10);
    /// builder.extend_from_slice(&[42, 44, 46]);
    ///
    /// assert_eq!(builder.len(), 3);
    /// ```
    #[inline]
    pub fn extend_from_slice(&mut self, slice: &[T]) {
        self.reserve(slice.len());
        // Reinterpret the element slice as raw bytes (`size_of_val` gives the byte length).
        // NOTE(review): this assumes `T` has no uninitialized padding bytes — confirm for the
        // element types this buffer is used with.
        let raw_slice =
            unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), size_of_val(slice)) };
        self.bytes.extend_from_slice(raw_slice);
        self.length += slice.len();
    }
337
338    /// Splits the buffer into two at the given index.
339    ///
340    /// Afterward, self contains elements `[0, at)`, and the returned buffer contains elements
341    /// `[at, capacity)`. It’s guaranteed that the memory does not move, that is, the address of
342    /// self does not change, and the address of the returned slice is at bytes after that.
343    ///
344    /// This is an O(1) operation that just increases the reference count and sets a few indices.
345    ///
346    /// Panics if either half would have a length that is not a multiple of the alignment.
347    pub fn split_off(&mut self, at: usize) -> Self {
348        if at > self.capacity() {
349            vortex_panic!("Cannot split buffer of capacity {} at {}", self.len(), at);
350        }
351
352        let bytes_at = at * size_of::<T>();
353        if !bytes_at.is_multiple_of(*self.alignment) {
354            vortex_panic!(
355                "Cannot split buffer at {}, resulting alignment is not {}",
356                at,
357                self.alignment
358            );
359        }
360
361        let new_bytes = self.bytes.split_off(bytes_at);
362
363        // Adjust the lengths, given that length may be < at
364        let new_length = self.length.saturating_sub(at);
365        self.length = self.length.min(at);
366
367        BufferMut {
368            bytes: new_bytes,
369            length: new_length,
370            alignment: self.alignment,
371            _marker: Default::default(),
372        }
373    }
374
    /// Absorbs a mutable buffer that was previously split off.
    ///
    /// If the two buffers were previously contiguous and not mutated in a way that causes
    /// re-allocation i.e., if other was created by calling split_off on this buffer, then this is
    /// an O(1) operation that just decreases a reference count and sets a few indices.
    ///
    /// Otherwise, this method degenerates to self.extend_from_slice(other.as_ref()).
    ///
    /// ## Panics
    ///
    /// Panics if the two buffers do not share the same alignment.
    pub fn unsplit(&mut self, other: Self) {
        if self.alignment != other.alignment {
            vortex_panic!(
                "Cannot unsplit buffers with different alignments: {} and {}",
                self.alignment,
                other.alignment
            );
        }
        self.bytes.unsplit(other.bytes);
        self.length += other.length;
    }
393
    /// Freeze the `BufferMut` into an immutable `Buffer`, preserving length and alignment.
    pub fn freeze(self) -> Buffer<T> {
        Buffer {
            bytes: self.bytes.freeze(),
            length: self.length,
            alignment: self.alignment,
            _marker: Default::default(),
        }
    }
403
    /// Map each element of the buffer with a closure, reusing the existing allocation.
    ///
    /// ## Panics
    ///
    /// Panics if `size_of::<T>() != size_of::<R>()`, since the transformation happens in place
    /// over the same bytes.
    pub fn map_each_in_place<R, F>(self, mut f: F) -> BufferMut<R>
    where
        T: Copy,
        F: FnMut(T) -> R,
    {
        assert_eq!(
            size_of::<T>(),
            size_of::<R>(),
            "Size of T and R do not match"
        );
        // SAFETY: we have checked that `size_of::<T>` == `size_of::<R>`.
        let mut buf: BufferMut<R> = unsafe { std::mem::transmute(self) };
        // SAFETY (transmute_copy): each slot still holds a valid `T` until it is overwritten
        // with the `R` produced by the closure.
        buf.iter_mut()
            .for_each(|item| *item = f(unsafe { std::mem::transmute_copy(item) }));
        buf
    }
421
422    /// Return a `BufferMut<T>` with the given alignment. Where possible, this will be zero-copy.
423    pub fn aligned(self, alignment: Alignment) -> Self {
424        if self.as_ptr().align_offset(*alignment) == 0 {
425            self
426        } else {
427            Self::copy_from_aligned(self, alignment)
428        }
429    }
430}
431
impl<T> Clone for BufferMut<T> {
    fn clone(&self) -> Self {
        // NOTE(ngates): we cannot derive Clone since BytesMut copies on clone and the alignment
        //  might be messed up.
        // The clone preserves capacity, alignment, and contents.
        let mut buffer = BufferMut::<T>::with_capacity_aligned(self.capacity(), self.alignment);
        buffer.extend_from_slice(self.as_slice());
        buffer
    }
}
441
442impl<T: Debug> Debug for BufferMut<T> {
443    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
444        f.debug_struct(&format!("BufferMut<{}>", type_name::<T>()))
445            .field("length", &self.length)
446            .field("alignment", &self.alignment)
447            .field("as_slice", &TruncatedDebug(self.as_slice()))
448            .finish()
449    }
450}
451
452impl<T> Default for BufferMut<T> {
453    fn default() -> Self {
454        Self::empty()
455    }
456}
457
458impl<T> Deref for BufferMut<T> {
459    type Target = [T];
460
461    #[inline]
462    fn deref(&self) -> &Self::Target {
463        self.as_slice()
464    }
465}
466
467impl<T> DerefMut for BufferMut<T> {
468    #[inline]
469    fn deref_mut(&mut self) -> &mut Self::Target {
470        self.as_mut_slice()
471    }
472}
473
474impl<T> AsRef<[T]> for BufferMut<T> {
475    #[inline]
476    fn as_ref(&self) -> &[T] {
477        self.as_slice()
478    }
479}
480
481impl<T> AsMut<[T]> for BufferMut<T> {
482    #[inline]
483    fn as_mut(&mut self) -> &mut [T] {
484        self.as_mut_slice()
485    }
486}
487
488impl<T> BufferMut<T> {
489    /// A helper method for the two [`Extend`] implementations.
490    ///
491    /// We use the lower bound hint on the iterator to manually write data, and then we continue to
492    /// push items normally past the lower bound.
493    fn extend_iter(&mut self, mut iter: impl Iterator<Item = T>) {
494        // Since we do not know the length of the iterator, we can only guess how much memory we
495        // need to reserve. Note that these hints may be inaccurate.
496        let (lower_bound, _) = iter.size_hint();
497
498        // We choose not to use the optional upper bound size hint to match the standard library.
499
500        self.reserve(lower_bound);
501
502        let unwritten = self.capacity() - self.len();
503
504        // We store `begin` in the case that the lower bound hint is incorrect.
505        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
506        let mut dst: *mut T = begin.cast_mut();
507
508        // As a first step, we manually iterate the iterator up to the known capacity.
509        for _ in 0..unwritten {
510            let Some(item) = iter.next() else {
511                // The lower bound hint may be incorrect.
512                break;
513            };
514
515            // SAFETY: We have reserved enough capacity to hold this item, and `dst` is a pointer
516            // derived from a valid reference to byte data.
517            unsafe { dst.write(item) };
518
519            // Note: We used to have `dst.add(iteration).write(item)`, here. However this was much
520            // slower than just incrementing `dst`.
521            // SAFETY: The offsets fits in `isize`, and because we were able to reserve the memory
522            // we know that `add` will not overflow.
523            unsafe { dst = dst.add(1) };
524        }
525
526        // SAFETY: `dst` was derived from `begin`, which were both valid references to byte data,
527        // and since the only operation that `dst` has is `add`, we know that `dst >= begin`.
528        let items_written = unsafe { dst.offset_from_unsigned(begin) };
529        let length = self.len() + items_written;
530
531        // SAFETY: We have written valid items between the old length and the new length.
532        unsafe { self.set_len(length) };
533
534        // Finally, since the iterator will have arbitrarily more items to yield, we push the
535        // remaining items normally.
536        iter.for_each(|item| self.push(item));
537    }
538
    /// Extends the `BufferMut` with an iterator with `TrustedLen`.
    ///
    /// The caller guarantees that the iterator will have a trusted upper bound, which allows the
    /// implementation to reserve all of the memory needed up front.
    ///
    /// ## Panics
    ///
    /// Panics if the iterator reports no upper bound in its size hint.
    pub fn extend_trusted<I: TrustedLen<Item = T>>(&mut self, iter: I) {
        // Since we know the exact upper bound (from `TrustedLen`), we can reserve all of the memory
        // for this operation up front.
        let (_, upper_bound) = iter.size_hint();
        self.reserve(
            upper_bound
                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
        );

        // We store `begin` in the case that the upper bound hint is incorrect.
        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
        let mut dst: *mut T = begin.cast_mut();

        iter.for_each(|item| {
            // SAFETY: We have reserved enough capacity to hold this item, and `dst` is a pointer
            // derived from a valid reference to byte data.
            unsafe { dst.write(item) };

            // Note: We used to have `dst.add(iteration).write(item)`, here. However this was much
            // slower than just incrementing `dst`.
            // SAFETY: The offsets fits in `isize`, and because we were able to reserve the memory
            // we know that `add` will not overflow.
            unsafe { dst = dst.add(1) };
        });

        // SAFETY: `dst` was derived from `begin`, which were both valid references to byte data,
        // and since the only operation that `dst` has is `add`, we know that `dst >= begin`.
        let items_written = unsafe { dst.offset_from_unsigned(begin) };
        let length = self.len() + items_written;

        // SAFETY: We have written valid items between the old length and the new length.
        unsafe { self.set_len(length) };
    }
576
577    /// Creates a `BufferMut` from an iterator with a trusted length.
578    ///
579    /// Internally, this calls [`extend_trusted()`](Self::extend_trusted).
580    pub fn from_trusted_len_iter<I>(iter: I) -> Self
581    where
582        I: TrustedLen<Item = T>,
583    {
584        let (_, upper_bound) = iter.size_hint();
585        let mut buffer = Self::with_capacity(
586            upper_bound
587                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
588        );
589
590        buffer.extend_trusted(iter);
591        buffer
592    }
593}
594
595impl<T> Extend<T> for BufferMut<T> {
596    #[inline]
597    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
598        self.extend_iter(iter.into_iter())
599    }
600}
601
602impl<'a, T> Extend<&'a T> for BufferMut<T>
603where
604    T: Copy + 'a,
605{
606    #[inline]
607    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
608        self.extend_iter(iter.into_iter().copied())
609    }
610}
611
612impl<T> FromIterator<T> for BufferMut<T> {
613    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
614        // We don't infer the capacity here and just let the first call to `extend` do it for us.
615        let mut buffer = Self::with_capacity(0);
616        buffer.extend(iter);
617        buffer
618    }
619}
620
impl Buf for ByteBufferMut {
    fn remaining(&self) -> usize {
        self.len()
    }

    fn chunk(&self) -> &[u8] {
        self.as_slice()
    }

    fn advance(&mut self, cnt: usize) {
        // Advancing moves the start of the buffer forward, so `cnt` must preserve the
        // alignment invariant of the start pointer.
        if !cnt.is_multiple_of(*self.alignment) {
            vortex_panic!(
                "Cannot advance buffer by {} items, resulting alignment is not {}",
                cnt,
                self.alignment
            );
        }
        self.bytes.advance(cnt);
        // For a byte buffer (`T = u8`) the element count equals the byte count.
        self.length -= cnt;
    }
}
642
643/// As per the BufMut implementation, we must support internal resizing when
644/// asked to extend the buffer.
645/// See: <https://github.com/tokio-rs/bytes/issues/131>
646unsafe impl BufMut for ByteBufferMut {
647    #[inline]
648    fn remaining_mut(&self) -> usize {
649        usize::MAX - self.len()
650    }
651
652    #[inline]
653    unsafe fn advance_mut(&mut self, cnt: usize) {
654        if !cnt.is_multiple_of(*self.alignment) {
655            vortex_panic!(
656                "Cannot advance buffer by {} items, resulting alignment is not {}",
657                cnt,
658                self.alignment
659            );
660        }
661        unsafe { self.bytes.advance_mut(cnt) };
662        self.length -= cnt;
663    }
664
665    #[inline]
666    fn chunk_mut(&mut self) -> &mut UninitSlice {
667        self.bytes.chunk_mut()
668    }
669
670    fn put<T: Buf>(&mut self, mut src: T)
671    where
672        Self: Sized,
673    {
674        while src.has_remaining() {
675            let chunk = src.chunk();
676            self.extend_from_slice(chunk);
677            src.advance(chunk.len());
678        }
679    }
680
681    #[inline]
682    fn put_slice(&mut self, src: &[u8]) {
683        self.extend_from_slice(src);
684    }
685
686    #[inline]
687    fn put_bytes(&mut self, val: u8, cnt: usize) {
688        self.push_n(val, cnt)
689    }
690}
691
/// Extension trait for [`BytesMut`] that provides functions for aligning the buffer.
trait AlignedBytesMut {
    /// Align an empty `BytesMut` to the specified alignment by advancing its start pointer.
    ///
    /// ## Panics
    ///
    /// Panics if the buffer is not empty, or if there is not enough capacity to align the buffer.
    fn align_empty(&mut self, alignment: Alignment);
}
701
impl AlignedBytesMut for BytesMut {
    fn align_empty(&mut self, alignment: Alignment) {
        // TODO(joe): this is slow fixme
        if !self.is_empty() {
            vortex_panic!("ByteBufferMut must be empty");
        }

        // Number of padding bytes before the first address satisfying `alignment`.
        let padding = self.as_ptr().align_offset(*alignment);
        // Verify the padding fits within the allocation (result is only used as a check).
        self.capacity()
            .checked_sub(padding)
            .vortex_expect("Not enough capacity to align buffer");

        // SAFETY: We know the buffer is empty, and we know we have enough capacity, so we can
        // safely set the length to the padding and advance the buffer to the aligned offset.
        unsafe { self.set_len(padding) };
        self.advance(padding);
    }
}
720
721impl Write for ByteBufferMut {
722    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
723        self.extend_from_slice(buf);
724        Ok(buf.len())
725    }
726
727    fn flush(&mut self) -> std::io::Result<()> {
728        Ok(())
729    }
730}
731
#[cfg(test)]
mod test {
    use bytes::{Buf, BufMut};

    use crate::{Alignment, BufferMut, ByteBufferMut, buffer_mut};

    // Capacity is measured in elements and must be preserved through pushes, even with a
    // large non-natural alignment.
    #[test]
    fn capacity() {
        let mut n = 57;
        let mut buf = BufferMut::<i32>::with_capacity_aligned(n, Alignment::new(1024));
        assert!(buf.capacity() >= 57);

        while n > 0 {
            buf.push(0);
            assert!(buf.capacity() >= n);
            n -= 1
        }

        assert_eq!(buf.alignment(), Alignment::new(1024));
    }

    #[test]
    fn from_iter() {
        let buf = BufferMut::from_iter([0, 10, 20, 30]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30]);
    }

    #[test]
    fn extend() {
        let mut buf = BufferMut::empty();
        buf.extend([0i32, 10, 20, 30]);
        buf.extend([40, 50, 60]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30, 40, 50, 60]);
    }

    #[test]
    fn push() {
        let mut buf = BufferMut::empty();
        buf.push(1);
        buf.push(2);
        buf.push(3);
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
    }

    #[test]
    fn push_n() {
        let mut buf = BufferMut::empty();
        buf.push_n(0, 100);
        assert_eq!(buf.as_slice(), &[0; 100]);
    }

    #[test]
    fn as_mut() {
        let mut buf = buffer_mut![0, 1, 2];
        // Uses DerefMut
        buf[1] = 0;
        // Uses as_mut
        buf.as_mut()[2] = 0;
        assert_eq!(buf.as_slice(), &[0, 0, 0]);
    }

    #[test]
    fn map_each() {
        let buf = buffer_mut![0i32, 1, 2];
        // Add one, and cast to an unsigned u32 in the same closure
        let buf = buf.map_each_in_place(|i| (i + 1) as u32);
        assert_eq!(buf.as_slice(), &[1u32, 2, 3]);
    }

    // Exercises the `Buf` impl: `advance` consumes from the front.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBufferMut::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    // Exercises the `BufMut` impl: writes append to the back.
    #[test]
    fn bytes_buf_mut() {
        let mut buf = ByteBufferMut::copy_from("hello".as_bytes());
        assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5);

        BufMut::put_slice(&mut buf, b"world");
        assert_eq!(buf.as_slice(), b"helloworld");
    }
}