Skip to main content

vortex_buffer/
buffer_mut.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use core::mem::MaybeUninit;
5use std::any::type_name;
6use std::cmp::max;
7use std::fmt::Debug;
8use std::fmt::Formatter;
9use std::io::Write;
10use std::ops::Deref;
11use std::ops::DerefMut;
12
13use bytes::Buf;
14use bytes::BufMut;
15use bytes::BytesMut;
16use bytes::buf::UninitSlice;
17use vortex_error::VortexExpect;
18use vortex_error::vortex_panic;
19
20use crate::Alignment;
21use crate::Buffer;
22use crate::ByteBufferMut;
23use crate::debug::TruncatedDebug;
24use crate::trusted_len::TrustedLen;
25
/// A mutable buffer that maintains a runtime-defined alignment through resizing operations.
///
/// Elements of type `T` are stored in a [`BytesMut`]; `length` counts elements while
/// `bytes.len()` counts bytes.
#[derive(PartialEq, Eq)]
pub struct BufferMut<T> {
    /// Backing byte storage holding the elements.
    pub(crate) bytes: BytesMut,
    /// Number of `T` elements currently in the buffer.
    pub(crate) length: usize,
    /// Alignment reported by [`BufferMut::alignment`].
    pub(crate) alignment: Alignment,
    /// Ties the buffer to its element type without storing a `T`.
    pub(crate) _marker: std::marker::PhantomData<T>,
}
34
35impl<T> BufferMut<T> {
36    /// Create a new `BufferMut` with the requested alignment and capacity.
37    pub fn with_capacity(capacity: usize) -> Self {
38        Self::with_capacity_aligned(capacity, Alignment::of::<T>())
39    }
40
41    /// Create a new `BufferMut` with the requested alignment and capacity.
42    ///
43    /// The allocation is over-aligned to [`Alignment::DEFAULT_ALIGNMENT`] when that is larger than
44    /// `alignment`. Use [`with_capacity_preferred_aligned`] to control the over-alignment.
45    ///
46    /// [`with_capacity_preferred_aligned`]: Self::with_capacity_preferred_aligned
47    pub fn with_capacity_aligned(capacity: usize, alignment: Alignment) -> Self {
48        Self::with_capacity_preferred_aligned(
49            capacity,
50            alignment,
51            Some(Alignment::DEFAULT_ALIGNMENT),
52        )
53    }
54
55    /// Create a new `BufferMut` with the requested alignment and capacity.
56    ///
57    /// The buffer reports `alignment`, but the underlying allocation is over-aligned to the larger
58    /// of `alignment` and `preferred_alignment`.
59    pub fn with_capacity_preferred_aligned(
60        capacity: usize,
61        alignment: Alignment,
62        preferred_alignment: Option<Alignment>,
63    ) -> Self {
64        let actual = max(
65            alignment,
66            preferred_alignment.unwrap_or(Alignment::of::<u8>()),
67        );
68
69        if !alignment.is_aligned_to(Alignment::of::<T>()) {
70            vortex_panic!(
71                "Alignment {} must align to the scalar type's alignment {}",
72                alignment,
73                align_of::<T>()
74            );
75        }
76
77        let mut bytes = BytesMut::with_capacity((capacity * size_of::<T>()) + *actual);
78        bytes.align_empty(actual);
79
80        Self {
81            bytes,
82            length: 0,
83            alignment,
84            _marker: Default::default(),
85        }
86    }
87
88    /// Create a new zeroed `BufferMut`.
89    pub fn zeroed(len: usize) -> Self {
90        Self::zeroed_aligned(len, Alignment::of::<T>())
91    }
92
93    /// Create a new zeroed `BufferMut` with the requested alignment.
94    ///
95    /// The allocation is over-aligned to [`Alignment::DEFAULT_ALIGNMENT`] when that is larger than
96    /// `alignment`. Use [`zeroed_preferred_aligned`] to control the over-alignment.
97    ///
98    /// [`zeroed_preferred_aligned`]: Self::zeroed_preferred_aligned
99    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
100        Self::zeroed_preferred_aligned(len, alignment, Some(Alignment::DEFAULT_ALIGNMENT))
101    }
102
103    /// Create a new zeroed `BufferMut` with the requested alignment.
104    ///
105    /// The buffer reports `alignment`, but the underlying allocation is over-aligned to the larger
106    /// of `alignment` and `preferred_alignment`.
107    pub fn zeroed_preferred_aligned(
108        len: usize,
109        alignment: Alignment,
110        preferred_alignment: Option<Alignment>,
111    ) -> Self {
112        let preferred_alignment = preferred_alignment.unwrap_or(Alignment::of::<u8>());
113        let actual_alignment = max(preferred_alignment, alignment);
114        let mut bytes = BytesMut::zeroed((len * size_of::<T>()) + *actual_alignment);
115        bytes.advance(bytes.as_ptr().align_offset(*actual_alignment));
116        unsafe { bytes.set_len(len * size_of::<T>()) };
117        let actual_len = bytes.len().checked_div(size_of::<T>()).unwrap_or(0);
118        Self {
119            bytes,
120            length: actual_len,
121            alignment,
122            _marker: Default::default(),
123        }
124    }
125
126    /// Create a new empty `BufferMut` with the provided alignment.
127    pub fn empty() -> Self {
128        Self::empty_aligned(Alignment::of::<T>())
129    }
130
131    /// Create a new empty `BufferMut` with the provided alignment.
132    ///
133    /// The allocation is over-aligned to [`Alignment::DEFAULT_ALIGNMENT`] when that is larger than
134    /// `alignment`. Use [`empty_preferred_aligned`] to control the over-alignment.
135    ///
136    /// [`empty_preferred_aligned`]: Self::empty_preferred_aligned
137    pub fn empty_aligned(alignment: Alignment) -> Self {
138        Self::empty_preferred_aligned(alignment, Some(Alignment::DEFAULT_ALIGNMENT))
139    }
140
141    /// Create a new empty `BufferMut` with the provided alignment.
142    ///
143    /// The buffer reports `alignment`, but the underlying allocation is over-aligned to the larger
144    /// of `alignment` and `preferred_alignment`.
145    pub fn empty_preferred_aligned(
146        alignment: Alignment,
147        preferred_alignment: Option<Alignment>,
148    ) -> Self {
149        BufferMut::with_capacity_preferred_aligned(0, alignment, preferred_alignment)
150    }
151
152    /// Create a new full `BufferMut` with the given value.
153    pub fn full(item: T, len: usize) -> Self
154    where
155        T: Copy,
156    {
157        let mut buffer = BufferMut::<T>::with_capacity(len);
158        buffer.push_n(item, len);
159        buffer
160    }
161
162    /// Create a mutable scalar buffer by copying the contents of the slice.
163    pub fn copy_from(other: impl AsRef<[T]>) -> Self {
164        Self::copy_from_aligned(other, Alignment::of::<T>())
165    }
166
167    /// Create a mutable scalar buffer with the alignment by copying the contents of the slice.
168    ///
169    /// The allocation is over-aligned to [`Alignment::DEFAULT_ALIGNMENT`] when that is larger than
170    /// `alignment`. Use [`copy_from_preferred_aligned`] to control the over-alignment.
171    ///
172    /// [`copy_from_preferred_aligned`]: Self::copy_from_preferred_aligned
173    ///
174    /// ## Panics
175    ///
176    /// Panics when the requested alignment isn't itself aligned to type T.
177    pub fn copy_from_aligned(other: impl AsRef<[T]>, alignment: Alignment) -> Self {
178        Self::copy_from_preferred_aligned(other, alignment, Some(Alignment::DEFAULT_ALIGNMENT))
179    }
180
181    /// Create a mutable scalar buffer with the alignment by copying the contents of the slice.
182    ///
183    /// The buffer reports `alignment`, but the underlying allocation is over-aligned to the larger
184    /// of `alignment` and `preferred_alignment`.
185    ///
186    /// ## Panics
187    ///
188    /// Panics when the requested alignment isn't itself aligned to type T.
189    pub fn copy_from_preferred_aligned(
190        other: impl AsRef<[T]>,
191        alignment: Alignment,
192        preferred_alignment: Option<Alignment>,
193    ) -> Self {
194        if !alignment.is_aligned_to(Alignment::of::<T>()) {
195            vortex_panic!("Given alignment is not aligned to type T")
196        }
197        let other = other.as_ref();
198        let mut buffer =
199            Self::with_capacity_preferred_aligned(other.len(), alignment, preferred_alignment);
200        buffer.extend_from_slice(other);
201        debug_assert_eq!(buffer.alignment(), alignment);
202        buffer
203    }
204
205    /// Get the alignment of the buffer.
206    #[inline(always)]
207    pub fn alignment(&self) -> Alignment {
208        self.alignment
209    }
210
211    /// Returns the length of the buffer.
212    #[inline(always)]
213    pub fn len(&self) -> usize {
214        debug_assert_eq!(self.length, self.bytes.len() / size_of::<T>());
215        self.length
216    }
217
218    /// Returns whether the buffer is empty.
219    #[inline(always)]
220    pub fn is_empty(&self) -> bool {
221        self.length == 0
222    }
223
224    /// Returns the capacity of the buffer.
225    #[inline]
226    pub fn capacity(&self) -> usize {
227        self.bytes.capacity() / size_of::<T>()
228    }
229
230    /// Returns a slice over the buffer of elements of type T.
231    #[inline]
232    pub fn as_slice(&self) -> &[T] {
233        let raw_slice = self.bytes.as_ref();
234        // SAFETY: alignment of Buffer is checked on construction
235        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
236    }
237
238    /// Returns a slice over the buffer of elements of type T.
239    #[inline]
240    pub fn as_mut_slice(&mut self) -> &mut [T] {
241        let raw_slice = self.bytes.as_mut();
242        // SAFETY: alignment of Buffer is checked on construction
243        unsafe { std::slice::from_raw_parts_mut(raw_slice.as_mut_ptr().cast(), self.length) }
244    }
245
246    /// Clear the buffer, retaining any existing capacity.
247    #[inline]
248    pub fn clear(&mut self) {
249        unsafe { self.bytes.set_len(0) }
250        self.length = 0;
251    }
252
253    /// Shortens the buffer, keeping the first `len` bytes and dropping the
254    /// rest.
255    ///
256    /// If `len` is greater than the buffer's current length, this has no
257    /// effect.
258    ///
259    /// Existing underlying capacity is preserved.
260    #[inline]
261    pub fn truncate(&mut self, len: usize) {
262        if len <= self.len() {
263            // SAFETY: Shrinking the buffer cannot expose uninitialized bytes.
264            unsafe { self.set_len(len) };
265        }
266    }
267
268    /// Reserves capacity for at least `additional` more elements to be inserted in the buffer.
269    #[inline]
270    pub fn reserve(&mut self, additional: usize) {
271        let additional_bytes = additional * size_of::<T>();
272        if additional_bytes <= self.bytes.capacity() - self.bytes.len() {
273            // We can fit the additional bytes in the remaining capacity. Nothing to do.
274            return;
275        }
276
277        // Otherwise, reserve additional + alignment bytes in case we need to realign the buffer.
278        self.reserve_allocate(additional);
279    }
280
281    /// A separate function so we can inline the reserve call's fast path. According to `BytesMut`
282    /// this has significant performance implications.
283    fn reserve_allocate(&mut self, additional: usize) {
284        let new_capacity: usize = ((self.length + additional) * size_of::<T>()) + *self.alignment;
285        // Make sure we at least double in size each time we re-allocate to amortize the cost
286        let new_capacity = new_capacity.max(self.bytes.capacity() * 2);
287
288        let mut bytes = BytesMut::with_capacity(new_capacity);
289        bytes.align_empty(self.alignment);
290        bytes.extend_from_slice(&self.bytes);
291        self.bytes = bytes;
292    }
293
294    /// Returns the spare capacity of the buffer as a slice of `MaybeUninit<T>`.
295    /// Has identical semantics to [`Vec::spare_capacity_mut`].
296    ///
297    /// The returned slice can be used to fill the buffer with data (e.g. by
298    /// reading from a file) before marking the data as initialized using the
299    /// [`set_len`] method.
300    ///
301    /// Note that the returned slice may be larger than the capacity requested at
302    /// construction, since the underlying allocation can be rounded up (e.g. to
303    /// satisfy alignment requirements).
304    ///
305    /// [`set_len`]: BufferMut::set_len
306    /// [`Vec::spare_capacity_mut`]: Vec::spare_capacity_mut
307    ///
308    /// # Examples
309    ///
310    /// ```
311    /// use vortex_buffer::BufferMut;
312    ///
313    /// // Allocate vector big enough for 10 elements.
314    /// let mut b = BufferMut::<u64>::with_capacity(10);
315    ///
316    /// // Fill in the first 3 elements.
317    /// let uninit = b.spare_capacity_mut();
318    /// uninit[0].write(0);
319    /// uninit[1].write(1);
320    /// uninit[2].write(2);
321    ///
322    /// // Mark the first 3 elements of the vector as being initialized.
323    /// unsafe {
324    ///     b.set_len(3);
325    /// }
326    ///
327    /// assert_eq!(b.as_slice(), &[0u64, 1, 2]);
328    /// ```
329    #[inline]
330    pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<T>] {
331        let dst = self.bytes.spare_capacity_mut().as_mut_ptr();
332        unsafe {
333            std::slice::from_raw_parts_mut(
334                dst as *mut MaybeUninit<T>,
335                self.capacity() - self.length,
336            )
337        }
338    }
339
340    /// Sets the length of the buffer.
341    ///
342    /// # Safety
343    ///
344    /// - `new_len` must be less than or equal to [`capacity()`].
345    /// - The elements at `old_len..new_len` must be initialized.
346    ///
347    /// [`capacity()`]: Self::capacity
348    #[inline]
349    pub unsafe fn set_len(&mut self, len: usize) {
350        debug_assert!(len <= self.capacity());
351        unsafe { self.bytes.set_len(len * size_of::<T>()) };
352        self.length = len;
353    }
354
355    /// Appends a scalar to the buffer.
356    #[inline]
357    pub fn push(&mut self, value: T) {
358        self.reserve(1);
359        unsafe { self.push_unchecked(value) }
360    }
361
362    /// Appends a scalar to the buffer without checking for sufficient capacity.
363    ///
364    /// ## Safety
365    ///
366    /// The caller must ensure there is sufficient capacity in the array.
367    #[inline]
368    pub unsafe fn push_unchecked(&mut self, item: T) {
369        // SAFETY: the caller ensures we have sufficient capacity
370        unsafe {
371            let dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
372            dst.write(item);
373            self.bytes.set_len(self.bytes.len() + size_of::<T>())
374        }
375        self.length += 1;
376    }
377
378    /// Appends n scalars to the buffer.
379    ///
380    /// This function is slightly more optimized than `extend(iter::repeat_n(item, b))`.
381    #[inline]
382    pub fn push_n(&mut self, item: T, n: usize)
383    where
384        T: Copy,
385    {
386        self.reserve(n);
387        unsafe { self.push_n_unchecked(item, n) }
388    }
389
390    /// Appends n scalars to the buffer.
391    ///
392    /// ## Safety
393    ///
394    /// The caller must ensure there is sufficient capacity in the array.
395    #[inline]
396    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
397    where
398        T: Copy,
399    {
400        let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
401        // SAFETY: we checked the capacity in the reserve call
402        unsafe {
403            let end = dst.add(n);
404            while dst < end {
405                dst.write(item);
406                dst = dst.add(1);
407            }
408            self.bytes.set_len(self.bytes.len() + (n * size_of::<T>()));
409        }
410        self.length += n;
411    }
412
413    /// Appends a slice of type `T`, growing the internal buffer as needed.
414    ///
415    /// # Example:
416    ///
417    /// ```
418    /// # use vortex_buffer::BufferMut;
419    ///
420    /// let mut builder = BufferMut::<u16>::with_capacity(10);
421    /// builder.extend_from_slice(&[42, 44, 46]);
422    ///
423    /// assert_eq!(builder.len(), 3);
424    /// ```
425    #[inline]
426    pub fn extend_from_slice(&mut self, slice: &[T]) {
427        self.reserve(slice.len());
428        let raw_slice =
429            unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), size_of_val(slice)) };
430        self.bytes.extend_from_slice(raw_slice);
431        self.length += slice.len();
432    }
433
434    /// Splits the buffer into two at the given index.
435    ///
436    /// Afterward, self contains elements `[0, at)`, and the returned buffer contains elements
437    /// `[at, capacity)`. It’s guaranteed that the memory does not move, that is, the address of
438    /// self does not change, and the address of the returned slice is at bytes after that.
439    ///
440    /// This is an O(1) operation that just increases the reference count and sets a few indices.
441    ///
442    /// Panics if either half would have a length that is not a multiple of the alignment.
443    pub fn split_off(&mut self, at: usize) -> Self {
444        if at > self.capacity() {
445            vortex_panic!("Cannot split buffer of capacity {} at {}", self.len(), at);
446        }
447
448        let bytes_at = at * size_of::<T>();
449        if !self.alignment.is_offset_aligned(bytes_at) {
450            vortex_panic!(
451                "Cannot split buffer at {}, resulting alignment is not {}",
452                at,
453                self.alignment
454            );
455        }
456
457        let new_bytes = self.bytes.split_off(bytes_at);
458
459        // Adjust the lengths, given that length may be < at
460        let new_length = self.length.saturating_sub(at);
461        self.length = self.length.min(at);
462
463        BufferMut {
464            bytes: new_bytes,
465            length: new_length,
466            alignment: self.alignment,
467            _marker: Default::default(),
468        }
469    }
470
471    /// Absorbs a mutable buffer that was previously split off.
472    ///
473    /// If the two buffers were previously contiguous and not mutated in a way that causes
474    /// re-allocation i.e., if other was created by calling split_off on this buffer, then this is
475    /// an O(1) operation that just decreases a reference count and sets a few indices.
476    ///
477    /// Otherwise, this method degenerates to self.extend_from_slice(other.as_ref()).
478    pub fn unsplit(&mut self, other: Self) {
479        if self.alignment != other.alignment {
480            vortex_panic!(
481                "Cannot unsplit buffers with different alignments: {} and {}",
482                self.alignment,
483                other.alignment
484            );
485        }
486        self.bytes.unsplit(other.bytes);
487        self.length += other.length;
488    }
489
490    /// Return the [`ByteBufferMut`] for this [`BufferMut`].
491    pub fn into_byte_buffer(self) -> ByteBufferMut {
492        ByteBufferMut {
493            bytes: self.bytes,
494            length: self.length * size_of::<T>(),
495            alignment: self.alignment,
496            _marker: Default::default(),
497        }
498    }
499
500    /// Freeze the `BufferMut` into a `Buffer`.
501    pub fn freeze(self) -> Buffer<T> {
502        Buffer {
503            bytes: self.bytes.freeze(),
504            length: self.length,
505            alignment: self.alignment,
506            _marker: Default::default(),
507        }
508    }
509
510    /// Map each element of the buffer with a closure.
511    pub fn map_each_in_place<R, F>(self, mut f: F) -> BufferMut<R>
512    where
513        T: Copy,
514        F: FnMut(T) -> R,
515    {
516        assert_eq!(
517            size_of::<T>(),
518            size_of::<R>(),
519            "Size of T and R do not match"
520        );
521        // SAFETY: we have checked that `size_of::<T>` == `size_of::<R>`.
522        let mut buf: BufferMut<R> = unsafe { std::mem::transmute(self) };
523        buf.iter_mut()
524            .for_each(|item| *item = f(unsafe { std::mem::transmute_copy(item) }));
525        buf
526    }
527
528    /// Return a `BufferMut<T>` with the same data as this one with the given alignment.
529    ///
530    /// If the data is already properly aligned, this is a metadata-only operation.
531    ///
532    /// If the data is not aligned, we copy it into a new allocation.
533    pub fn aligned(self, alignment: Alignment) -> Self {
534        if self.as_ptr().align_offset(*alignment) == 0 {
535            Self {
536                bytes: self.bytes,
537                length: self.length,
538                alignment,
539                _marker: std::marker::PhantomData,
540            }
541        } else {
542            Self::copy_from_aligned(self, alignment)
543        }
544    }
545
546    /// Transmute a `Buffer<T>` into a `Buffer<U>`.
547    ///
548    /// # Safety
549    ///
550    /// The caller must ensure that all possible bit representations of type `T` are valid when
551    /// interpreted as type `U`.
552    /// See [`std::mem::transmute`] for more details.
553    ///
554    /// # Panics
555    ///
556    /// Panics if the type `U` does not have the same size and alignment as `T`.
557    pub unsafe fn transmute<U>(self) -> BufferMut<U> {
558        assert_eq!(size_of::<T>(), size_of::<U>(), "Buffer type size mismatch");
559        assert_eq!(
560            align_of::<T>(),
561            align_of::<U>(),
562            "Buffer type alignment mismatch"
563        );
564
565        BufferMut {
566            bytes: self.bytes,
567            length: self.length,
568            alignment: self.alignment,
569            _marker: std::marker::PhantomData,
570        }
571    }
572}
573
574impl<T> Clone for BufferMut<T> {
575    fn clone(&self) -> Self {
576        // NOTE(ngates): we cannot derive Clone since BytesMut copies on clone and the alignment
577        //  might be messed up.
578        let mut buffer = BufferMut::<T>::with_capacity_aligned(self.capacity(), self.alignment);
579        buffer.extend_from_slice(self.as_slice());
580        buffer
581    }
582}
583
584impl<T: Debug> Debug for BufferMut<T> {
585    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
586        f.debug_struct(&format!("BufferMut<{}>", type_name::<T>()))
587            .field("length", &self.length)
588            .field("alignment", &self.alignment)
589            .field("as_slice", &TruncatedDebug(self.as_slice()))
590            .finish()
591    }
592}
593
594impl<T> Default for BufferMut<T> {
595    fn default() -> Self {
596        Self::empty()
597    }
598}
599
600impl<T> Deref for BufferMut<T> {
601    type Target = [T];
602
603    #[inline]
604    fn deref(&self) -> &Self::Target {
605        self.as_slice()
606    }
607}
608
609impl<T> DerefMut for BufferMut<T> {
610    #[inline]
611    fn deref_mut(&mut self) -> &mut Self::Target {
612        self.as_mut_slice()
613    }
614}
615
616impl<T> AsRef<[T]> for BufferMut<T> {
617    #[inline]
618    fn as_ref(&self) -> &[T] {
619        self.as_slice()
620    }
621}
622
623impl<T> AsMut<[T]> for BufferMut<T> {
624    #[inline]
625    fn as_mut(&mut self) -> &mut [T] {
626        self.as_mut_slice()
627    }
628}
629
630impl<T> BufferMut<T> {
631    /// A helper method for the two [`Extend`] implementations.
632    ///
633    /// We use the lower bound hint on the iterator to manually write data, and then we continue to
634    /// push items normally past the lower bound.
635    fn extend_iter(&mut self, mut iter: impl Iterator<Item = T>) {
636        // Since we do not know the length of the iterator, we can only guess how much memory we
637        // need to reserve. Note that these hints may be inaccurate.
638        let (lower_bound, _) = iter.size_hint();
639
640        // We choose not to use the optional upper bound size hint to match the standard library.
641
642        self.reserve(lower_bound);
643
644        let unwritten = self.capacity() - self.len();
645
646        // We store `begin` in the case that the lower bound hint is incorrect.
647        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
648        let mut dst: *mut T = begin.cast_mut();
649
650        // As a first step, we manually iterate the iterator up to the known capacity.
651        for _ in 0..unwritten {
652            let Some(item) = iter.next() else {
653                // The lower bound hint may be incorrect.
654                break;
655            };
656
657            // SAFETY: We have reserved enough capacity to hold this item, and `dst` is a pointer
658            // derived from a valid reference to byte data.
659            unsafe { dst.write(item) };
660
661            // Note: We used to have `dst.add(iteration).write(item)`, here. However this was much
662            // slower than just incrementing `dst`.
663            // SAFETY: The offsets fits in `isize`, and because we were able to reserve the memory
664            // we know that `add` will not overflow.
665            unsafe { dst = dst.add(1) };
666        }
667
668        // SAFETY: `dst` was derived from `begin`, which were both valid references to byte data,
669        // and since the only operation that `dst` has is `add`, we know that `dst >= begin`.
670        let items_written = unsafe { dst.offset_from_unsigned(begin) };
671        let length = self.len() + items_written;
672
673        // SAFETY: We have written valid items between the old length and the new length.
674        unsafe { self.set_len(length) };
675
676        // Finally, since the iterator will have arbitrarily more items to yield, we push the
677        // remaining items normally.
678        iter.for_each(|item| self.push(item));
679    }
680
681    /// Extends the `BufferMut` with an iterator with `TrustedLen`.
682    ///
683    /// The caller guarantees that the iterator will have a trusted upper bound, which allows the
684    /// implementation to reserve all of the memory needed up front.
685    pub fn extend_trusted<I: TrustedLen<Item = T>>(&mut self, iter: I) {
686        // Since we know the exact upper bound (from `TrustedLen`), we can reserve all of the memory
687        // for this operation up front.
688        let (_, upper_bound) = iter.size_hint();
689        self.reserve(
690            upper_bound
691                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
692        );
693
694        // We store `begin` in the case that the upper bound hint is incorrect.
695        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
696        let mut dst: *mut T = begin.cast_mut();
697
698        iter.for_each(|item| {
699            // SAFETY: We have reserved enough capacity to hold this item, and `dst` is a pointer
700            // derived from a valid reference to byte data.
701            unsafe { dst.write(item) };
702
703            // Note: We used to have `dst.add(iteration).write(item)`, here. However this was much
704            // slower than just incrementing `dst`.
705            // SAFETY: The offsets fits in `isize`, and because we were able to reserve the memory
706            // we know that `add` will not overflow.
707            unsafe { dst = dst.add(1) };
708        });
709
710        // SAFETY: `dst` was derived from `begin`, which were both valid references to byte data,
711        // and since the only operation that `dst` has is `add`, we know that `dst >= begin`.
712        let items_written = unsafe { dst.offset_from_unsigned(begin) };
713        let length = self.len() + items_written;
714
715        // SAFETY: We have written valid items between the old length and the new length.
716        unsafe { self.set_len(length) };
717    }
718
719    /// Creates a `BufferMut` from an iterator with a trusted length.
720    ///
721    /// Internally, this calls [`extend_trusted()`](Self::extend_trusted).
722    pub fn from_trusted_len_iter<I>(iter: I) -> Self
723    where
724        I: TrustedLen<Item = T>,
725    {
726        let (_, upper_bound) = iter.size_hint();
727        let mut buffer = Self::with_capacity(
728            upper_bound
729                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
730        );
731
732        buffer.extend_trusted(iter);
733        buffer
734    }
735
736    /// Like [`extend_trusted()`](Self::extend_trusted), but the iterator yields `Result<T, E>`
737    /// and the extension short-circuits on the first `Err`.
738    ///
739    /// On error, items written before the failure remain in the buffer.
740    pub fn try_extend_trusted<E, I>(&mut self, iter: I) -> Result<(), E>
741    where
742        I: TrustedLen<Item = Result<T, E>>,
743    {
744        let (_, upper_bound) = iter.size_hint();
745        self.reserve(
746            upper_bound
747                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
748        );
749
750        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
751        let mut dst: *mut T = begin.cast_mut();
752        let mut result: Result<(), E> = Ok(());
753
754        for item in iter {
755            match item {
756                Ok(value) => {
757                    // SAFETY: We reserved enough capacity to hold this item, and `dst` is a
758                    // pointer derived from a valid reference to byte data.
759                    unsafe { dst.write(value) };
760                    // SAFETY: The offset fits in `isize` because we reserved that much capacity.
761                    unsafe { dst = dst.add(1) };
762                }
763                Err(e) => {
764                    result = Err(e);
765                    break;
766                }
767            }
768        }
769
770        // SAFETY: `dst` was derived from `begin`, both valid references to byte data, and
771        // `dst >= begin` since the only operation on `dst` is `add`.
772        let items_written = unsafe { dst.offset_from_unsigned(begin) };
773        let length = self.len() + items_written;
774        // SAFETY: We have written valid items between the old length and the new length.
775        unsafe { self.set_len(length) };
776
777        result
778    }
779
780    /// Like [`from_trusted_len_iter()`](Self::from_trusted_len_iter), but the iterator yields
781    /// `Result<T, E>` and construction short-circuits on the first `Err`.
782    pub fn try_from_trusted_len_iter<E, I>(iter: I) -> Result<Self, E>
783    where
784        I: TrustedLen<Item = Result<T, E>>,
785    {
786        let (_, upper_bound) = iter.size_hint();
787        let mut buffer = Self::with_capacity(
788            upper_bound
789                .vortex_expect("`TrustedLen` iterator somehow didn't have valid upper bound"),
790        );
791
792        buffer.try_extend_trusted(iter)?;
793        Ok(buffer)
794    }
795}
796
797impl<T> Extend<T> for BufferMut<T> {
798    #[inline]
799    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
800        self.extend_iter(iter.into_iter())
801    }
802}
803
804impl<'a, T> Extend<&'a T> for BufferMut<T>
805where
806    T: Copy + 'a,
807{
808    #[inline]
809    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
810        self.extend_iter(iter.into_iter().copied())
811    }
812}
813
814impl<T> FromIterator<T> for BufferMut<T> {
815    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
816        // We don't infer the capacity here and just let the first call to `extend` do it for us.
817        let mut buffer = Self::with_capacity(0);
818        buffer.extend(iter);
819        buffer
820    }
821}
822
823impl Buf for ByteBufferMut {
824    fn remaining(&self) -> usize {
825        self.len()
826    }
827
828    fn chunk(&self) -> &[u8] {
829        self.as_slice()
830    }
831
832    fn advance(&mut self, cnt: usize) {
833        if !self.alignment.is_offset_aligned(cnt) {
834            vortex_panic!(
835                "Cannot advance buffer by {} items, resulting alignment is not {}",
836                cnt,
837                self.alignment
838            );
839        }
840        self.bytes.advance(cnt);
841        self.length -= cnt;
842    }
843}
844
845/// As per the BufMut implementation, we must support internal resizing when
846/// asked to extend the buffer.
847/// See: <https://github.com/tokio-rs/bytes/issues/131>
848unsafe impl BufMut for ByteBufferMut {
849    #[inline]
850    fn remaining_mut(&self) -> usize {
851        usize::MAX - self.len()
852    }
853
854    #[inline]
855    unsafe fn advance_mut(&mut self, cnt: usize) {
856        if !self.alignment.is_offset_aligned(cnt) {
857            vortex_panic!(
858                "Cannot advance buffer by {} items, resulting alignment is not {}",
859                cnt,
860                self.alignment
861            );
862        }
863        unsafe { self.bytes.advance_mut(cnt) };
864        self.length -= cnt;
865    }
866
867    #[inline]
868    fn chunk_mut(&mut self) -> &mut UninitSlice {
869        self.bytes.chunk_mut()
870    }
871
872    fn put<T: Buf>(&mut self, mut src: T)
873    where
874        Self: Sized,
875    {
876        while src.has_remaining() {
877            let chunk = src.chunk();
878            self.extend_from_slice(chunk);
879            src.advance(chunk.len());
880        }
881    }
882
883    #[inline]
884    fn put_slice(&mut self, src: &[u8]) {
885        self.extend_from_slice(src);
886    }
887
888    #[inline]
889    fn put_bytes(&mut self, val: u8, cnt: usize) {
890        self.push_n(val, cnt)
891    }
892}
893
894/// Extension trait for [`BytesMut`] that provides functions for aligning the buffer.
895trait AlignedBytesMut {
896    /// Align an empty `BytesMut` to the specified alignment.
897    ///
898    /// ## Panics
899    ///
900    /// Panics if the buffer is not empty, or if there is not enough capacity to align the buffer.
901    fn align_empty(&mut self, alignment: Alignment);
902}
903
904impl AlignedBytesMut for BytesMut {
905    fn align_empty(&mut self, alignment: Alignment) {
906        // TODO(joe): this is slow fixme
907        if !self.is_empty() {
908            vortex_panic!("ByteBufferMut must be empty");
909        }
910
911        let padding = self.as_ptr().align_offset(*alignment);
912        self.capacity()
913            .checked_sub(padding)
914            .vortex_expect("Not enough capacity to align buffer");
915
916        // SAFETY: We know the buffer is empty, and we know we have enough capacity, so we can
917        // safely set the length to the padding and advance the buffer to the aligned offset.
918        unsafe { self.set_len(padding) };
919        self.advance(padding);
920    }
921}
922
923impl Write for ByteBufferMut {
924    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
925        self.extend_from_slice(buf);
926        Ok(buf.len())
927    }
928
929    fn flush(&mut self) -> std::io::Result<()> {
930        Ok(())
931    }
932}
933
#[cfg(test)]
mod test {
    use bytes::Buf;
    use bytes::BufMut;

    use crate::Alignment;
    use crate::BufferMut;
    use crate::ByteBufferMut;
    use crate::buffer_mut;

    /// Capacity never drops below the requested amount while pushing, and the alignment the
    /// buffer reports stays the requested one.
    #[test]
    fn capacity() {
        const REQUESTED: usize = 57;
        let alignment = Alignment::new(1024);

        let mut buf = BufferMut::<i32>::with_capacity_aligned(REQUESTED, alignment);
        assert!(buf.capacity() >= 57);

        for remaining in (1..=REQUESTED).rev() {
            buf.push(0);
            assert!(buf.capacity() >= remaining);
        }

        assert_eq!(buf.alignment(), alignment);
    }

    /// Collecting an iterator yields the items in order.
    #[test]
    fn from_iter() {
        let collected: BufferMut<i32> = [0, 10, 20, 30].into_iter().collect();
        assert_eq!(collected.as_slice(), &[0, 10, 20, 30]);
    }

    /// All-`Ok` input produces a buffer holding every item.
    #[test]
    fn try_from_trusted_len_iter_ok() {
        let values = [0, 10, 20, 30];
        let collected =
            BufferMut::<i32>::try_from_trusted_len_iter(values.iter().map(|v| Ok::<i32, ()>(*v)))
                .unwrap();
        assert_eq!(collected.as_slice(), &[0, 10, 20, 30]);
    }

    /// The first `Err` yielded by the iterator is what the constructor returns.
    #[test]
    fn try_from_trusted_len_iter_err() {
        let values = [0, 10, 20, 30];
        let outcome: Result<BufferMut<i32>, &'static str> =
            BufferMut::try_from_trusted_len_iter(values.iter().map(|&v| {
                if v == 20 {
                    Err("bad")
                } else {
                    Ok(v)
                }
            }));
        assert_eq!(outcome.err(), Some("bad"));
    }

    /// Successive `extend` calls append after the existing contents.
    #[test]
    fn extend() {
        let mut buf = BufferMut::<i32>::empty();
        buf.extend([0i32, 10, 20, 30]);
        buf.extend([40, 50, 60]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30, 40, 50, 60]);
    }

    /// Pushed items appear in insertion order.
    #[test]
    fn push() {
        let mut buf = BufferMut::<i32>::empty();
        for value in 1..=3 {
            buf.push(value);
        }
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
    }

    /// `push_n` appends the requested number of copies.
    #[test]
    fn push_n() {
        const COUNT: usize = 100;

        let mut buf = BufferMut::empty();
        buf.push_n(0, COUNT);
        assert_eq!(buf.as_slice(), &[0; COUNT]);
    }

    /// Elements can be overwritten both through indexing and through `as_mut`.
    #[test]
    fn as_mut() {
        let mut buf = buffer_mut![0, 1, 2];

        // Indexing goes through DerefMut.
        buf[1] = 0;

        // Explicit mutable slice access goes through as_mut.
        let slice = buf.as_mut();
        slice[2] = 0;

        assert_eq!(buf.as_slice(), &[0, 0, 0]);
    }

    /// `map_each_in_place` may change the element type as long as the closure produces it.
    #[test]
    fn map_each() {
        let signed = buffer_mut![0i32, 1, 2];
        // Increment and convert to u32 within a single closure.
        let unsigned = signed.map_each_in_place(|value| (value + 1) as u32);
        assert_eq!(unsigned.as_slice(), &[1u32, 2, 3]);
    }

    /// Reading through `Buf` consumes bytes from the front of the buffer.
    #[test]
    fn bytes_buf() {
        let text = "helloworld";
        let mut buf = ByteBufferMut::copy_from(text.as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        buf.advance(5);

        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    /// Writing through `BufMut` appends to the existing contents.
    #[test]
    fn bytes_buf_mut() {
        let prefix = "hello";
        let mut buf = ByteBufferMut::copy_from(prefix.as_bytes());
        assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5);

        buf.put_slice(b"world");

        assert_eq!(buf.as_slice(), b"helloworld");
    }

    /// A zeroed buffer is aligned for its element type, all-zero, and writable.
    #[test]
    fn buffer_mut_zeroed() {
        const LEN: usize = 17;

        let mut zeroed = BufferMut::<u32>::zeroed(LEN);

        assert_eq!(zeroed.as_ptr().align_offset(*Alignment::of::<u32>()), 0);
        assert_eq!(zeroed.as_slice(), &[0; LEN]);

        zeroed[3] = 7;
        assert_eq!(zeroed.as_slice()[3], 7);
    }

    /// A zeroed buffer with an explicit alignment honours that alignment.
    #[test]
    fn buffer_mut_zeroed_aligned() {
        const LEN: usize = 17;
        let alignment = Alignment::new(64);

        let mut zeroed = BufferMut::<u32>::zeroed_aligned(LEN, alignment);

        assert_eq!(zeroed.as_ptr().align_offset(*alignment), 0);
        assert_eq!(zeroed.as_slice(), &[0; LEN]);

        zeroed[3] = 7;
        assert_eq!(zeroed.as_slice()[3], 7);
    }
}