
arrow_buffer/buffer/mutable.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::alloc::{Layout, handle_alloc_error};
use std::mem;
use std::ptr::NonNull;

use crate::alloc::{ALIGNMENT, Deallocation};
use crate::{
    bytes::Bytes,
    native::{ArrowNativeType, ToByteSlice},
    util::bit_util,
};

#[cfg(feature = "pool")]
use crate::pool::{MemoryPool, MemoryReservation};
#[cfg(feature = "pool")]
use std::sync::Mutex;

use super::Buffer;

/// A [`MutableBuffer`] is a wrapper over memory regions, used to build
/// [`Buffer`]s out of items or slices of items.
///
/// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to be
/// aligned along cache lines and in multiples of 64 bytes.
///
/// Use [MutableBuffer::push] to insert an item, [MutableBuffer::extend_from_slice]
/// to insert many items, and `into` to convert it to [`Buffer`]. For typed data,
/// it is often more efficient to use [`Vec`] and convert it to [`Buffer`] rather
/// than using [`MutableBuffer`] (see examples below).
///
/// # See Also
/// * For a safe, strongly typed API consider using [`Vec`] and [`ScalarBuffer`](crate::ScalarBuffer)
/// * To apply bitwise operations, see [`apply_bitwise_binary_op`] and [`apply_bitwise_unary_op`]
///
/// [`apply_bitwise_binary_op`]: crate::bit_util::apply_bitwise_binary_op
/// [`apply_bitwise_unary_op`]: crate::bit_util::apply_bitwise_unary_op
///
/// # Example: Creating a [`Buffer`] from a [`MutableBuffer`]
/// ```
/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
/// let mut buffer = MutableBuffer::new(0);
/// buffer.push(256u32);
/// buffer.extend_from_slice(&[1u32]);
/// let buffer = Buffer::from(buffer);
/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0])
/// ```
///
/// The same can be achieved more efficiently by using a `Vec<u32>`
/// ```
/// # use arrow_buffer::buffer::Buffer;
/// let mut vec = Vec::new();
/// vec.push(256u32);
/// vec.extend_from_slice(&[1u32]);
/// let buffer = Buffer::from(vec);
/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0]);
/// ```
///
/// # Example: Creating a [`MutableBuffer`] from a `Vec<T>`
/// ```
/// # use arrow_buffer::buffer::MutableBuffer;
/// let vec = vec![1u32, 2, 3];
/// let mutable_buffer = MutableBuffer::from(vec); // reuses the allocation from vec
/// assert_eq!(mutable_buffer.len(), 12); // 3 * 4 bytes
/// ```
///
/// # Example: Creating a [`MutableBuffer`] from a [`Buffer`]
/// ```
/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
/// let buffer: Buffer = Buffer::from(&[1u8, 2, 3, 4][..]);
/// // Only possible to convert a Buffer into a MutableBuffer if uniquely owned
/// // (i.e., there are no other references to it).
/// let mut mutable_buffer = match buffer.into_mutable() {
///    Ok(mutable) => mutable,
///    Err(orig_buffer) => {
///      panic!("buffer was not uniquely owned");
///    }
/// };
/// mutable_buffer.push(5u8);
/// let buffer = Buffer::from(mutable_buffer);
/// assert_eq!(buffer.as_slice(), &[1u8, 2, 3, 4, 5])
/// ```
#[derive(Debug)]
pub struct MutableBuffer {
    // dangling iff capacity = 0
    data: NonNull<u8>,
    // invariant: len <= capacity
    len: usize,
    layout: Layout,

    /// Memory reservation for tracking memory usage
    #[cfg(feature = "pool")]
    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}

impl MutableBuffer {
    /// Allocate a new [MutableBuffer] with an initial capacity of at least `capacity`.
    ///
    /// See [`MutableBuffer::with_capacity`].
    ///
    /// # Panics
    ///
    /// See [`MutableBuffer::with_capacity`].
    #[inline]
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Allocate a new [MutableBuffer] with an initial capacity of at least `capacity`.
    ///
    /// # Panics
    ///
    /// If `capacity`, when rounded up to the nearest multiple of [`ALIGNMENT`], is greater
    /// than `isize::MAX`, then this function will panic.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = bit_util::round_upto_multiple_of_64(capacity);
        let layout = Layout::from_size_align(capacity, ALIGNMENT)
            .expect("failed to create layout for MutableBuffer");
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                // Safety: Verified size != 0
                let raw_ptr = unsafe { std::alloc::alloc(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len: 0,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }

    /// Allocates a new [MutableBuffer] of length `len`, with capacity at least `len`,
    /// where all bytes are guaranteed to be `0u8`.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
    /// let mut buffer = MutableBuffer::from_len_zeroed(127);
    /// assert_eq!(buffer.len(), 127);
    /// assert!(buffer.capacity() >= 127);
    /// let data = buffer.as_slice_mut();
    /// assert_eq!(data[126], 0u8);
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `len` is too large to construct a valid allocation [`Layout`]
    pub fn from_len_zeroed(len: usize) -> Self {
        let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                // Safety: Verified size != 0
                let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }

    /// Allocates a new [MutableBuffer] from given `Bytes`.
    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
        let layout = match bytes.deallocation() {
            Deallocation::Standard(layout) => *layout,
            _ => return Err(bytes),
        };

        let len = bytes.len();
        let data = bytes.ptr();
        #[cfg(feature = "pool")]
        let reservation = bytes.reservation.lock().unwrap().take();
        mem::forget(bytes);

        Ok(Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(reservation),
        })
    }

    /// Creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
    /// This is useful to create a buffer for packed bitmaps.
    ///
    /// # Panics
    ///
    /// See [`MutableBuffer::from_len_zeroed`].
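    ///
    /// # Example
    /// A minimal illustration: 10 bits are packed into 2 zeroed bytes.
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let buffer = MutableBuffer::new_null(10);
    /// assert_eq!(buffer.len(), 2);
    /// assert_eq!(buffer.as_slice(), &[0u8, 0]);
    /// ```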
    pub fn new_null(len: usize) -> Self {
        let num_bytes = bit_util::ceil(len, 8);
        MutableBuffer::from_len_zeroed(num_bytes)
    }

    /// Set the bits in the range of `[0, end)` to 0 (if `val` is false), or 1 (if `val`
    /// is true). Also extends the length of this buffer to be `end`.
    ///
    /// This is useful when one wants to clear (or set) the bits and then manipulate
    /// the buffer directly (e.g., modifying the buffer by holding a mutable reference
    /// from `data_mut()`).
    ///
    /// # Panics
    ///
    /// Panics if `end` exceeds the buffer capacity.
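    ///
    /// # Example
    /// Set the first 4 bytes to all ones and the length to 4:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let buffer = MutableBuffer::new(8).with_bitset(4, true);
    /// assert_eq!(buffer.len(), 4);
    /// assert_eq!(buffer.as_slice(), &[255u8, 255, 255, 255]);
    /// ```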
    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
        assert!(end <= self.layout.size());
        let v = if val { 255 } else { 0 };
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr(), v, end);
            self.len = end;
        }
        self
    }

    /// Ensures that `count` bytes from `start` contain zero bits
    ///
    /// This is used to initialize the bits in a buffer; however, it has no impact on the
    /// `len` of the buffer, and so it can be used to initialize the memory region from
    /// `len` to `capacity`.
    ///
    /// # Panics
    ///
    /// Panics if the byte range `start..start + count` exceeds the buffer capacity.
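    ///
    /// # Example
    /// Zero out the middle of a buffer that was initialized to all ones:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(8).with_bitset(8, true);
    /// buffer.set_null_bits(2, 4);
    /// assert_eq!(buffer.as_slice(), &[255u8, 255, 0, 0, 0, 0, 255, 255]);
    /// ```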
    pub fn set_null_bits(&mut self, start: usize, count: usize) {
        assert!(
            start.saturating_add(count) <= self.layout.size(),
            "range start index {start} and count {count} out of bounds for \
            buffer of length {}",
            self.layout.size(),
        );

        // Safety: `self.data[start..][..count]` is in-bounds and well-aligned for `u8`
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
        }
    }

    /// Ensures that this buffer has at least `self.len + additional` bytes. This re-allocates iff
    /// `self.len + additional > capacity`.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.reserve(253); // allocates for the first time
    /// (0..253u8).for_each(|i| buffer.push(i)); // no reallocation
    /// let buffer: Buffer = buffer.into();
    /// assert_eq!(buffer.len(), 253);
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `self.len + additional` overflows `usize`, or if the required capacity is too
    /// large to round up to the next 64-byte boundary and construct a valid allocation layout.
    // For performance reasons, this must be inlined so that the `if` is executed inside the caller, and not as an extra call that just
    // exits.
    #[inline(always)]
    pub fn reserve(&mut self, additional: usize) {
        let required_cap = self
            .len
            .checked_add(additional)
            .expect("buffer length overflow");
        if required_cap > self.layout.size() {
            let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
            let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
            self.reallocate(new_capacity)
        }
    }

    /// Appends `slice_to_repeat` to this mutable buffer, repeated `repeat_count` times.
    ///
    /// # Example
    ///
    /// ## Repeat the same string bytes multiple times
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// let bytes_to_repeat = b"ab";
    /// buffer.repeat_slice_n_times(bytes_to_repeat, 3);
    /// assert_eq!(buffer.as_slice(), b"ababab");
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if the repeated slice byte length overflows `usize`, if the resulting buffer
    /// length overflows `usize`, or if reserving the required capacity fails for the same
    /// reasons as [`MutableBuffer::reserve`].
    pub fn repeat_slice_n_times<T: ArrowNativeType>(
        &mut self,
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) {
        if repeat_count == 0 || slice_to_repeat.is_empty() {
            return;
        }

        let bytes_to_repeat = size_of_val(slice_to_repeat);
        let repeated_bytes = repeat_count
            .checked_mul(bytes_to_repeat)
            .expect("repeated slice byte length overflow");
        self.len
            .checked_add(repeated_bytes)
            .expect("mutable buffer length overflow");

        // Ensure capacity
        self.reserve(repeated_bytes);

        // Save the length before we do all the copies to know where to start from
        let length_before = self.len;

        // Copy the initial slice once so we can use a doubling strategy on it
        self.extend_from_slice(slice_to_repeat);

        // This tracks how many bytes we have added by repeating so far
        let added_repeats_length = bytes_to_repeat;
        assert_eq!(
            self.len - length_before,
            added_repeats_length,
            "should copy exactly the same number of bytes"
        );

        // Number of times the slice was repeated
        let mut already_repeated_times = 1;

        // We will use a doubling strategy to fill the buffer in log(repeat_count) steps
        while already_repeated_times < repeat_count {
            // How many slices can we copy in this iteration
            // (either double what we have, or just the remaining ones)
            let number_of_slices_to_copy =
                already_repeated_times.min(repeat_count - already_repeated_times);
            let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat;

            unsafe {
                // Get to the start of the data before we started copying anything
                let src = self.data.as_ptr().add(length_before) as *const u8;

                // Go to the current location to copy to (end of current data)
                let dst = self.data.as_ptr().add(self.len);

                // SAFETY: the pointers do not overlap, as there are at least `number_of_bytes_to_copy` bytes between them
                std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy)
            }

            // Advance the length by the amount of data we just copied (doubled)
            self.len += number_of_bytes_to_copy;

            already_repeated_times += number_of_slices_to_copy;
        }
    }

    #[cold]
    fn reallocate(&mut self, capacity: usize) {
        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
        if new_layout.size() == 0 {
            if self.layout.size() != 0 {
                // Safety: data was allocated with layout
                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
                self.layout = new_layout
            }
            return;
        }

        let data = match self.layout.size() {
            // Safety: new_layout is not empty
            0 => unsafe { std::alloc::alloc(new_layout) },
            // Safety: verified new layout is valid and not empty
            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
        };
        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
        self.layout = new_layout;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.layout.size());
            }
        }
    }

    /// Truncates this buffer to `len` bytes
    ///
    /// If `len` is greater than the buffer's current length, this has no effect
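    ///
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.extend_from_slice(&[1u8, 2, 3, 4]);
    /// buffer.truncate(2);
    /// assert_eq!(buffer.as_slice(), &[1u8, 2]);
    /// // Truncating to a larger length has no effect
    /// buffer.truncate(10);
    /// assert_eq!(buffer.len(), 2);
    /// ```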
    #[inline(always)]
    pub fn truncate(&mut self, len: usize) {
        if len > self.len {
            return;
        }
        self.len = len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Resizes the buffer, either truncating its contents (with no change in capacity), or
    /// growing it (potentially reallocating it) and writing `value` in the newly available bytes.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.resize(253, 2); // allocates for the first time
    /// assert_eq!(buffer.as_slice()[252], 2u8);
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if growing the buffer requires reserving a capacity that fails for the same
    /// reasons as [`MutableBuffer::reserve`].
    // For performance reasons, this must be inlined so that the `if` is executed inside the caller, and not as an extra call that just
    // exits.
    #[inline(always)]
    pub fn resize(&mut self, new_len: usize, value: u8) {
        if new_len > self.len {
            let diff = new_len - self.len;
            self.reserve(diff);
            // write the value
            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
        }
        // this truncates the buffer when new_len < self.len
        self.len = new_len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Shrinks the capacity of the buffer as much as possible.
    /// The new capacity will be rounded up to the nearest multiple of 64 bytes.
    ///
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
    /// // 2 cache lines
    /// let mut buffer = MutableBuffer::new(128);
    /// assert_eq!(buffer.capacity(), 128);
    /// buffer.push(1);
    /// buffer.push(2);
    ///
    /// buffer.shrink_to_fit();
    /// assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if the current length is too large to round up to the next 64-byte boundary and
    /// construct a valid allocation layout.
    pub fn shrink_to_fit(&mut self) {
        let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
        if new_capacity < self.layout.size() {
            self.reallocate(new_capacity)
        }
    }

    /// Returns whether this buffer is empty or not.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the length (the number of bytes written) in this buffer.
    /// The invariant `buffer.len() <= buffer.capacity()` is always upheld.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Returns the total capacity in this buffer, in bytes.
    ///
    /// The invariant `buffer.len() <= buffer.capacity()` is always upheld.
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Clear all existing data from this buffer.
    pub fn clear(&mut self) {
        self.len = 0;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Returns the data stored in this buffer as a slice.
    pub fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the data stored in this buffer as a mutable slice.
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        self
    }

    /// Returns a raw pointer to this buffer's internal memory.
    /// This pointer is guaranteed to be aligned along cache-lines.
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }

    /// Returns a mutable raw pointer to this buffer's internal memory.
    /// This pointer is guaranteed to be aligned along cache-lines.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.data.as_ptr()
    }

    #[inline]
    pub(super) fn into_buffer(self) -> Buffer {
        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
        #[cfg(feature = "pool")]
        {
            let reservation = self.reservation.lock().unwrap().take();
            *bytes.reservation.lock().unwrap() = reservation;
        }
        std::mem::forget(self);
        Buffer::from(bytes)
    }

    /// View this buffer as a mutable slice of a specific type.
    ///
    /// # Panics
    ///
    /// This function panics if the underlying buffer is not aligned correctly for type `T`, or
    /// if its length is not a multiple of `size_of::<T>()`.
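    ///
    /// # Example
    /// A minimal sketch: build a buffer from a `Vec<u32>` and mutate it as `u32`s.
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::from(vec![1u32, 2, 3]);
    /// buffer.typed_data_mut::<u32>()[0] = 42;
    /// assert_eq!(buffer.typed_data::<u32>(), &[42u32, 2, 3]);
    /// ```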
    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
        // SAFETY
        // ArrowNativeType is trivially transmutable, is sealed to prevent potentially incorrect
        // implementation outside this crate, and this method checks alignment
        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// View buffer as an immutable slice of a specific type.
    ///
    /// # Panics
    ///
    /// This function panics if the underlying buffer is not aligned correctly for type `T`, or
    /// if its length is not a multiple of `size_of::<T>()`.
    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
        // SAFETY
        // ArrowNativeType is trivially transmutable, is sealed to prevent potentially incorrect
        // implementation outside this crate, and this method checks alignment
        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// Extends this buffer from a slice of items that can be represented in bytes, increasing its capacity if needed.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.extend_from_slice(&[2u32, 0]);
    /// assert_eq!(buffer.len(), 8) // u32 has 4 bytes
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if extending the buffer requires reserving a capacity that fails for the same
    /// reasons as [`MutableBuffer::reserve`].
    #[inline]
    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
        let additional = mem::size_of_val(items);
        self.reserve(additional);
        unsafe {
            // this assumes that `[ToByteSlice]` can be copied directly
            // without calling `to_byte_slice` for each element,
            // which is correct for all ArrowNativeType implementations.
            let src = items.as_ptr() as *const u8;
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional)
        }
        self.len += additional;
    }

    /// Extends the buffer with a new item, increasing its capacity if needed.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.push(256u32);
    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if extending the buffer requires reserving a capacity that fails for the same
    /// reasons as [`MutableBuffer::reserve`].
    #[inline]
    pub fn push<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        self.reserve(additional);
        unsafe {
            let src = item.to_byte_slice().as_ptr();
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional);
        }
        self.len += additional;
    }

    /// Extends the buffer with a new item, without checking for sufficient capacity.
    ///
    /// # Safety
    /// Caller must ensure that `self.capacity() - self.len() >= size_of::<T>()`
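    ///
    /// # Example
    /// A minimal sketch of satisfying the safety contract via [`MutableBuffer::reserve`]:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.reserve(4);
    /// // Safety: 4 bytes of capacity were reserved above
    /// unsafe { buffer.push_unchecked(1u32) };
    /// assert_eq!(buffer.as_slice(), &[1u8, 0, 0, 0]);
    /// ```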
    #[inline]
    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        let src = item.to_byte_slice().as_ptr();
        let dst = unsafe { self.data.as_ptr().add(self.len) };
        unsafe { std::ptr::copy_nonoverlapping(src, dst, additional) };
        self.len += additional;
    }

    /// Extends the buffer by `additional` bytes equal to `0u8`, increasing its capacity if needed.
    ///
    /// # Panics
    ///
    /// Panics if `self.len + additional` overflows `usize`, or if growing the buffer requires
    /// reserving a capacity that fails for the same reasons as [`MutableBuffer::reserve`].
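    ///
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.push(1u8);
    /// buffer.extend_zeros(3);
    /// assert_eq!(buffer.as_slice(), &[1u8, 0, 0, 0]);
    /// ```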
    #[inline]
    pub fn extend_zeros(&mut self, additional: usize) {
        let new_len = self
            .len
            .checked_add(additional)
            .expect("buffer length overflow");
        self.resize(new_len, 0);
    }

    /// Sets the length of this buffer to `len`
    ///
    /// # Safety
    /// The caller must ensure that the buffer was properly initialized up to `len`.
    ///
    /// # Panics
    ///
    /// Panics if `len` exceeds the buffer capacity.
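    ///
    /// # Example
    /// A minimal sketch where the initialization requirement is trivially met:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::from_len_zeroed(8);
    /// // Safety: all 8 bytes were zero-initialized above
    /// unsafe { buffer.set_len(4) };
    /// assert_eq!(buffer.len(), 4);
    /// ```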
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }

    /// Invokes `f` with values `0..len` collecting the boolean results into a new `MutableBuffer`
    ///
    /// This is similar to `from_trusted_len_iter_bool`; however, it can be significantly faster
    /// as it eliminates the conditional `Iterator::next`
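    ///
    /// # Example
    /// Build a 10-bit bitmap in which every even bit is set:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let buffer = MutableBuffer::collect_bool(10, |i| i % 2 == 0);
    /// assert_eq!(buffer.as_slice(), &[0b01010101, 0b00000001]);
    /// ```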
    #[inline]
    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
        let mut buffer: Vec<u64> = Vec::with_capacity(bit_util::ceil(len, 64));

        let chunks = len / 64;
        let remainder = len % 64;
        buffer.extend((0..chunks).map(|chunk| {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            packed
        }));

        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            buffer.push(packed)
        }

        let mut buffer: MutableBuffer = buffer.into();
        buffer.truncate(bit_util::ceil(len, 8));
        buffer
    }

    /// Extends this buffer with boolean values.
    ///
    /// This requires `iter` to report an exact size via `size_hint`.
    /// `offset` indicates the starting offset in bits in this buffer to begin writing to
    /// and must be less than or equal to the current length of this buffer.
    /// All bits not written to (but readable due to byte alignment) will be zeroed out.
    ///
    /// # Panics
    ///
    /// Panics if `iter` does not report an exact size via `size_hint`, or if it yields fewer
    /// items than reported, or if extending the buffer requires reserving a capacity that fails
    /// for the same reasons as [`MutableBuffer::reserve`].
    ///
    /// # Safety
    /// Callers must ensure that `iter` reports an exact size via `size_hint`.
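    ///
    /// # Example
    /// A minimal sketch: array iterators report an exact `size_hint`, satisfying the contract.
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// // Safety: array iterators report an exact size_hint
    /// unsafe { buffer.extend_bool_trusted_len([true, false, true].into_iter(), 0) };
    /// assert_eq!(buffer.as_slice(), &[0b00000101]);
    /// ```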
    #[inline]
    pub unsafe fn extend_bool_trusted_len<I: Iterator<Item = bool>>(
        &mut self,
        mut iter: I,
        offset: usize,
    ) {
        let (lower, upper) = iter.size_hint();
        let len = upper.expect("Iterator must have exact size_hint");
        assert_eq!(lower, len, "Iterator must have exact size_hint");
        debug_assert!(
            offset <= self.len * 8,
            "offset must be <= buffer length in bits"
        );

        if len == 0 {
            return;
        }

        let start_len = offset;
        let end_bit = start_len + len;

        // SAFETY: we will initialize all newly exposed bytes before they are read
        let new_len_bytes = bit_util::ceil(end_bit, 8);
        if new_len_bytes > self.len {
            self.reserve(new_len_bytes - self.len);
            // SAFETY: caller will initialize all newly exposed bytes before they are read
            unsafe { self.set_len(new_len_bytes) };
        }

        let slice = self.as_slice_mut();

        let mut bit_idx = start_len;

        // ---- Unaligned prefix: advance to the next 64-bit boundary ----
        let misalignment = bit_idx & 63;
        let prefix_bits = if misalignment == 0 {
            0
        } else {
            (64 - misalignment).min(end_bit - bit_idx)
        };

        if prefix_bits != 0 {
            let byte_start = bit_idx / 8;
            let byte_end = bit_util::ceil(bit_idx + prefix_bits, 8);
            let bit_offset = bit_idx % 8;

            // Clear any newly-visible bits in the existing partial byte
            if bit_offset != 0 {
                let keep_mask = (1u8 << bit_offset).wrapping_sub(1);
                slice[byte_start] &= keep_mask;
            }

            // Zero any new bytes we will partially fill in this prefix
            let zero_from = if bit_offset == 0 {
                byte_start
            } else {
                byte_start + 1
            };
            if byte_end > zero_from {
                slice[zero_from..byte_end].fill(0);
            }

            for _ in 0..prefix_bits {
                let v = iter.next().unwrap();
                if v {
                    let byte_idx = bit_idx / 8;
                    let bit = bit_idx % 8;
                    slice[byte_idx] |= 1 << bit;
                }
                bit_idx += 1;
            }
        }

        if bit_idx < end_bit {
            // ---- Aligned middle: write u64 chunks ----
            debug_assert_eq!(bit_idx & 63, 0);
            let remaining_bits = end_bit - bit_idx;
            let chunks = remaining_bits / 64;

            let words_start = bit_idx / 8;
            let words_end = words_start + chunks * 8;
            for dst in slice[words_start..words_end].chunks_exact_mut(8) {
                let mut packed: u64 = 0;
                for i in 0..64 {
                    packed |= (iter.next().unwrap() as u64) << i;
                }
                dst.copy_from_slice(&packed.to_le_bytes());
                bit_idx += 64;
            }

            // ---- Unaligned suffix: remaining < 64 bits ----
            let suffix_bits = end_bit - bit_idx;
            if suffix_bits != 0 {
                debug_assert_eq!(bit_idx % 8, 0);
                let byte_start = bit_idx / 8;
                let byte_end = bit_util::ceil(end_bit, 8);
                slice[byte_start..byte_end].fill(0);

                for _ in 0..suffix_bits {
                    let v = iter.next().unwrap();
                    if v {
                        let byte_idx = bit_idx / 8;
                        let bit = bit_idx % 8;
                        slice[byte_idx] |= 1 << bit;
                    }
                    bit_idx += 1;
                }
            }
        }

        // Clear any unused bits in the last byte
        let remainder = end_bit % 8;
        if remainder != 0 {
            let mask = (1u8 << remainder).wrapping_sub(1);
            slice[bit_util::ceil(end_bit, 8) - 1] &= mask;
        }

        debug_assert_eq!(bit_idx, end_bit);
    }

    /// Register this [`MutableBuffer`] with the provided [`MemoryPool`]
    ///
    /// This claims the memory used by this buffer in the pool, allowing for
    /// accurate accounting of memory usage. Any prior reservation will be
    /// released so this works well when the buffer is being shared among
    /// multiple arrays.
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }
}

/// Creates a non-null pointer with alignment of [`ALIGNMENT`]
///
/// This is similar to [`NonNull::dangling`]
#[inline]
pub(crate) fn dangling_ptr() -> NonNull<u8> {
    // SAFETY: ALIGNMENT is a non-zero usize which is then cast
    // to a *mut u8. Therefore, `ptr` is not null and the conditions for
    // calling new_unchecked() are respected.
    #[cfg(miri)]
    {
        // Since miri implies a nightly rust version we can use the unstable strict_provenance feature
        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
    }
    #[cfg(not(miri))]
    {
        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
    }
}

impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
    #[inline]
    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
        let iterator = iter.into_iter();
        self.extend_from_iter(iterator)
    }
}

impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
    fn from(value: Vec<T>) -> Self {
        // Safety
        // Vec::as_ptr guaranteed to not be null and ArrowNativeType are trivially transmutable
        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
        let len = value.len() * mem::size_of::<T>();
        // Safety
        // Vec guaranteed to have a valid layout matching that of `Layout::array`
        // This is based on `RawVec::current_memory`
        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
        mem::forget(value);
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
}

impl MutableBuffer {
    #[inline]
    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let item_size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * item_size;
        self.reserve(additional);

        // this is necessary because of https://github.com/rust-lang/rust/issues/32155
        let mut len = SetLenOnDrop::new(&mut self.len);
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
        let capacity = self.layout.size();

        while len.local_len + item_size <= capacity {
            if let Some(item) = iterator.next() {
                unsafe {
                    let src = item.to_byte_slice().as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, item_size);
                    dst = dst.add(item_size);
                }
                len.local_len += item_size;
            } else {
                break;
            }
        }
        drop(len);

        iterator.for_each(|item| self.push(item));
    }

    /// Creates a [`MutableBuffer`] from an [`Iterator`] with a trusted (upper) length.
    /// Prefer this to `collect` whenever possible, as it is ~60% faster.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let v = vec![1u32];
    /// let iter = v.iter().map(|x| x * 2);
    /// let buffer = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if the iterator does not report an upper bound via `size_hint`, or if the
    /// reported length does not match the number of items produced, or if allocating the
    /// required buffer fails for the same reasons as [`MutableBuffer::new`].
    ///
    /// # Safety
    /// This method assumes that the iterator's size is correct, and it is undefined behavior
    /// to use it on an iterator that reports an incorrect length.
    // This implementation is required for two reasons:
    // 1. there is no trait `TrustedLen` in stable rust and therefore
    //    we can't specialize `extend` for `TrustedLen` like `Vec` does.
    // 2. `from_trusted_len_iter` is faster.
    #[inline]
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            // note how there is no reserve here (compared with `extend_from_iter`)
            let src = item.to_byte_slice().as_ptr();
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        assert_eq!(
            unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
            len,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        buffer
    }

    /// Creates a [`MutableBuffer`] from a boolean [`Iterator`] with a trusted (upper) length.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let v = vec![false, true, false];
    /// let iter = v.iter().map(|x| *x || true);
    /// let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(iter) };
    /// assert_eq!(buffer.len(), 1) // 3 booleans have 1 byte
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if the iterator does not report an upper bound via `size_hint`, or if it yields
    /// fewer items than reported.
    ///
    /// # Safety
    /// This method assumes that the iterator's size is correct, and it is undefined behavior
    /// to use it on an iterator that reports an incorrect length.
    // This implementation is required for two reasons:
    // 1. there is no trait `TrustedLen` in stable rust and therefore
    //    we can't specialize `extend` for `TrustedLen` like `Vec` does.
    // 2. `from_trusted_len_iter_bool` is faster.
    #[inline]
    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
        let (_, upper) = iterator.size_hint();
        let len = upper.expect("from_trusted_len_iter requires an upper limit");

        Self::collect_bool(len, |_| iterator.next().unwrap())
    }

    /// Creates a [`MutableBuffer`] from an [`Iterator`] with a trusted (upper) length or errors
    /// if any of the items of the iterator is an error.
    /// Prefer this to `collect` whenever possible, as it is ~60% faster.
    ///
    /// # Panics
    ///
    /// Panics if the iterator does not report an upper bound via `size_hint`, or if the
    /// reported length does not match the number of items produced before an error-free finish,
    /// or if allocating the required buffer fails for the same reasons as
    /// [`MutableBuffer::new`].
    ///
    /// # Safety
    /// This method assumes that the iterator's size is correct, and it is undefined behavior
    /// to use it on an iterator that reports an incorrect length.
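    ///
    /// # Example
    /// A minimal sketch using an infallible error type, just to illustrate the shape of the API:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let iter = vec![1u32, 2].into_iter().map(Ok::<u32, ()>);
    /// let buffer = unsafe { MutableBuffer::try_from_trusted_len_iter(iter).unwrap() };
    /// assert_eq!(buffer.as_slice(), &[1u8, 0, 0, 0, 2, 0, 0, 0]);
    /// ```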
    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let item = item?;
            // note how there is no reserve here (compared with `extend_from_iter`)
            let src = item.to_byte_slice().as_ptr();
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // try_from_trusted_len_iter is instantiated a lot, so we extract part of it into a less
        // generic method to reduce compile time
        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
            unsafe {
                assert_eq!(
                    dst.offset_from(buffer.data.as_ptr()) as usize,
                    len,
                    "Trusted iterator length was not accurately reported"
                );
                buffer.len = len;
            }
        }
        unsafe { finalize_buffer(dst, &mut buffer, len) };
        Ok(buffer)
    }
}

impl Default for MutableBuffer {
    fn default() -> Self {
        Self::with_capacity(0)
    }
}

impl std::ops::Deref for MutableBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}

impl std::ops::DerefMut for MutableBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}

impl AsRef<[u8]> for &MutableBuffer {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl Drop for MutableBuffer {
    fn drop(&mut self) {
        if self.layout.size() != 0 {
            // Safety: data was allocated with standard allocator with given layout
            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
        }
    }
}

impl PartialEq for MutableBuffer {
    fn eq(&self, other: &MutableBuffer) -> bool {
        if self.len != other.len {
            return false;
        }
        if self.layout != other.layout {
            return false;
        }
        self.as_slice() == other.as_slice()
    }
}

unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}

struct SetLenOnDrop<'a> {
    len: &'a mut usize,
    local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    #[inline]
    fn new(len: &'a mut usize) -> Self {
        SetLenOnDrop {
            local_len: *len,
            len,
        }
    }
}

impl Drop for SetLenOnDrop<'_> {
    #[inline]
    fn drop(&mut self) {
        *self.len = self.local_len;
    }
}

/// Creates a `MutableBuffer` by setting bits according to the boolean values
/// yielded by an iterator, least-significant bit first within each byte
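///
/// # Example
/// ```
/// # use arrow_buffer::buffer::MutableBuffer;
/// let buffer: MutableBuffer = [true, false, true].into_iter().collect();
/// assert_eq!(buffer.as_slice(), &[0b00000101]);
/// ```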
impl std::iter::FromIterator<bool> for MutableBuffer {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = bool>,
    {
        let mut iterator = iter.into_iter();
        let mut result = {
            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
            MutableBuffer::new(byte_capacity)
        };

        loop {
            let mut exhausted = false;
            let mut byte_accum: u8 = 0;
            let mut mask: u8 = 1;

            // collect (up to) 8 bits into a byte
            while mask != 0 {
                if let Some(value) = iterator.next() {
                    byte_accum |= match value {
                        true => mask,
                        false => 0,
                    };
                    mask <<= 1;
                } else {
                    exhausted = true;
                    break;
                }
            }

            // break if the iterator was exhausted before it provided a bool for this byte
            if exhausted && mask == 1 {
                break;
            }

            // ensure we have capacity to write the byte
            if result.len() == result.capacity() {
                // no capacity for new byte, allocate 1 byte more (plus however many more the iterator advertises)
                let additional_byte_capacity = 1usize.saturating_add(
                    iterator.size_hint().0.saturating_add(7) / 8, // convert bit count to byte count, rounding up
                );
                result.reserve(additional_byte_capacity)
            }

            // Soundness: capacity was allocated above
            unsafe { result.push_unchecked(byte_accum) };
            if exhausted {
                break;
            }
        }
        result
    }
}

impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let mut buffer = Self::default();
        buffer.extend_from_iter(iter.into_iter());
        buffer
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mutable_new() {
        let buf = MutableBuffer::new(63);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
    }

    #[test]
    fn test_mutable_default() {
        let buf = MutableBuffer::default();
        assert_eq!(0, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        let mut buf = MutableBuffer::default();
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());
    }

    #[test]
    fn test_mutable_extend_from_slice() {
        let mut buf = MutableBuffer::new(100);
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());

        buf.extend_from_slice(b" world");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello world", buf.as_slice());

        buf.clear();
        assert_eq!(0, buf.len());
        buf.extend_from_slice(b"hello arrow");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello arrow", buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_iter() {
        let mut buf = MutableBuffer::new(0);
        buf.extend(vec![1u32, 2]);
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

        buf.extend(vec![3u32, 4]);
        assert_eq!(16, buf.len());
        assert_eq!(
            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
            buf.as_slice()
        );
    }

    #[test]
    fn mutable_extend_from_iter_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.extend([1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_slice_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.extend_from_slice(&[1_u8]);
        buf.extend_from_slice(&[1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.push(1_u64);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unchecked_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        unsafe {
            buf.push_unchecked(1_u8);
            buf.push_unchecked(1_u64);
        }
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_from_trusted_len_iter() {
        let iter = vec![1u32, 2].into_iter();
        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_mutable_reserve() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());

        // Reserving a smaller capacity should have no effect.
        buf.reserve(10);
        assert_eq!(64, buf.capacity());

        buf.reserve(80);
        assert_eq!(128, buf.capacity());

        buf.reserve(129);
        assert_eq!(256, buf.capacity());
    }

    #[test]
    fn test_mutable_resize() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());

        buf.resize(20, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(20, buf.len());

        buf.resize(10, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(10, buf.len());

        buf.resize(100, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(100, buf.len());

        buf.resize(30, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(30, buf.len());

        buf.resize(0, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(0, buf.len());
    }

    #[test]
    fn test_mutable_into() {
        let mut buf = MutableBuffer::new(1);
        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
        assert_eq!(19, buf.len());
        assert_eq!(64, buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());

        let immutable_buf: Buffer = buf.into();
        assert_eq!(19, immutable_buf.len());
        assert_eq!(64, immutable_buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
    }

    #[test]
    fn test_mutable_equal() {
        let mut buf = MutableBuffer::new(1);
        let mut buf2 = MutableBuffer::new(1);

        buf.extend_from_slice(&[0xaa]);
        buf2.extend_from_slice(&[0xaa, 0xbb]);
        assert!(buf != buf2);

        buf.extend_from_slice(&[0xbb]);
        assert_eq!(buf, buf2);

        buf2.reserve(65);
        assert!(buf != buf2);
    }

    #[test]
    fn test_mutable_shrink_to_fit() {
        let mut buffer = MutableBuffer::new(128);
        assert_eq!(buffer.capacity(), 128);
        buffer.push(1);
        buffer.push(2);

        buffer.shrink_to_fit();
        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    }

    #[test]
    fn test_mutable_set_null_bits() {
        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);

        for i in 0..=buffer.capacity() {
            buffer.set_null_bits(i, 0);
            assert_eq!(buffer[..8], [255; 8][..]);
        }

        buffer.set_null_bits(1, 4);
        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob() {
        let mut buffer = MutableBuffer::new(64);
        buffer.set_null_bits(1, buffer.capacity());
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob_by_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.set_null_bits(1, usize::MAX);
    }

    #[test]
    fn from_iter() {
        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
    }

    #[test]
    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
    fn test_with_capacity_panics_above_max_capacity() {
        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
        let _ = MutableBuffer::with_capacity(max_capacity + 1);
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;
        use crate::pool::{MemoryPool, TrackingMemoryPool};

        #[test]
        fn test_reallocate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            // Initial capacity should be 128 (multiple of 64)
            assert_eq!(buffer.capacity(), 128);
            assert_eq!(pool.used(), 128);

            // Reallocate to a larger size
            buffer.reallocate(200);

            // The capacity is exactly the requested size, not rounded up
            assert_eq!(buffer.capacity(), 200);
            assert_eq!(pool.used(), 200);

            // Reallocate to a smaller size
            buffer.reallocate(50);

            // The capacity is exactly the requested size, not rounded up
            assert_eq!(buffer.capacity(), 50);
            assert_eq!(pool.used(), 50);
        }

        #[test]
        fn test_truncate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);

            // Fill buffer with some data
            buffer.resize(80, 1);
            assert_eq!(buffer.len(), 80);

            buffer.claim(&pool);
            assert_eq!(pool.used(), 128);

            // Truncate buffer
            buffer.truncate(40);
            assert_eq!(buffer.len(), 40);
            assert_eq!(pool.used(), 40);

            // Truncate to zero
            buffer.clear();
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_resize_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            // Initial state
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 128);

            // Resize to increase length
            buffer.resize(50, 1);
            assert_eq!(buffer.len(), 50);
            assert_eq!(pool.used(), 50);

            // Resize to increase length beyond capacity
            buffer.resize(150, 1);
            assert_eq!(buffer.len(), 150);
            assert_eq!(buffer.capacity(), 256);
            assert_eq!(pool.used(), 150);

            // Resize to decrease length
            buffer.resize(30, 1);
            assert_eq!(buffer.len(), 30);
            assert_eq!(pool.used(), 30);
        }

        #[test]
        fn test_buffer_lifecycle_with_pool() {
            let pool = TrackingMemoryPool::default();

            // Create a buffer with memory reservation
            let mut mutable = MutableBuffer::with_capacity(100);
            mutable.resize(80, 1);
            mutable.claim(&pool);

            // Memory reservation is based on capacity when using claim()
            assert_eq!(pool.used(), 128);

            // Convert to immutable Buffer
            let buffer = mutable.into_buffer();

            // Memory reservation should be preserved
            assert_eq!(pool.used(), 128);

            // Drop the buffer and the reservation should be released
            drop(buffer);
            assert_eq!(pool.used(), 0);
        }
    }

    fn create_expected_repeated_slice<T: ArrowNativeType>(
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) -> Buffer {
        let mut expected = MutableBuffer::new(size_of_val(slice_to_repeat) * repeat_count);
        for _ in 0..repeat_count {
            // Not using repeat_slice_n_times as this is the function under test
            expected.extend_from_slice(slice_to_repeat);
        }
        expected.into()
    }

    // Helper to test a specific repeat count with various slice sizes
    fn test_repeat_count<T: ArrowNativeType + PartialEq + std::fmt::Debug>(
        repeat_count: usize,
        test_data: &[T],
    ) {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(test_data, repeat_count);

        let expected = create_expected_repeated_slice(test_data, repeat_count);
        let result: Buffer = buffer.into();

        assert_eq!(
            result,
            expected,
            "Failed for repeat_count={}, slice_len={}",
            repeat_count,
            test_data.len()
        );
    }

    #[test]
    fn test_repeat_slice_count_edge_cases() {
        // Empty slice
        test_repeat_count(100, &[] as &[i32]);

        // Zero repeats
        test_repeat_count(0, &[1i32, 2, 3]);
    }

    #[test]
    #[should_panic(expected = "repeated slice byte length overflow")]
    fn test_repeat_slice_count_multiply_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(&[0_u64], usize::MAX / mem::size_of::<u64>() + 1);
    }

    #[test]
    #[should_panic(expected = "mutable buffer length overflow")]
    fn test_repeat_slice_count_len_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.push(0_u8);
        buffer.repeat_slice_n_times(&[0_u8], usize::MAX);
    }

    #[test]
    fn test_small_repeats_counts() {
        // test any special implementation for small repeat counts
        let data = &[1u8, 2, 3, 4, 5];

        for repeat_count in 1..=10 {
            test_repeat_count(repeat_count, data);
        }
    }

    #[test]
    fn test_different_size_of_i32_repeat_slice() {
        let data: &[i32] = &[1, 2, 3];
        let data_with_single_item: &[i32] = &[42];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u8_repeat_slice() {
        let data: &[u8] = &[1, 2, 3];
        let data_with_single_item: &[u8] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u16_repeat_slice() {
        let data: &[u16] = &[1, 2, 3];
        let data_with_single_item: &[u16] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_various_slice_lengths() {
        // Test different slice lengths with same repeat pattern
        let repeat_count = 37; // Arbitrary non-power-of-2

        // Single element
        test_repeat_count(repeat_count, &[42i32]);

        // Small slices
        test_repeat_count(repeat_count, &[1i32, 2]);
        test_repeat_count(repeat_count, &[1i32, 2, 3]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]);

        // Larger slices
        let data_10: Vec<i32> = (0..10).collect();
        test_repeat_count(repeat_count, &data_10);

        let data_100: Vec<i32> = (0..100).collect();
        test_repeat_count(repeat_count, &data_100);

        let data_1000: Vec<i32> = (0..1000).collect();
        test_repeat_count(repeat_count, &data_1000);
    }
}
1672}