1use std::alloc::{Layout, handle_alloc_error};
19use std::mem;
20use std::ptr::NonNull;
21
22use crate::alloc::{ALIGNMENT, Deallocation};
23use crate::{
24 bytes::Bytes,
25 native::{ArrowNativeType, ToByteSlice},
26 util::bit_util,
27};
28
29#[cfg(feature = "pool")]
30use crate::pool::{MemoryPool, MemoryReservation};
31#[cfg(feature = "pool")]
32use std::sync::Mutex;
33
34use super::Buffer;
35
/// A mutable, growable byte buffer backed by a heap allocation aligned to
/// `ALIGNMENT`, used to incrementally build an immutable `Buffer`.
#[derive(Debug)]
pub struct MutableBuffer {
    /// Pointer to the allocation, or a dangling aligned pointer when capacity is 0.
    data: NonNull<u8>,
    /// Number of initialized bytes. Invariant: `len <= layout.size()`.
    len: usize,
    /// Layout of the current allocation; `layout.size()` is the capacity in bytes.
    layout: Layout,

    /// Memory-pool reservation tracking this buffer's memory, if claimed.
    #[cfg(feature = "pool")]
    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}
110
111impl MutableBuffer {
    /// Allocate a new [`MutableBuffer`] with at least `capacity` bytes;
    /// the capacity is rounded up to a multiple of 64 (see `with_capacity`).
    #[inline]
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }
123
    /// Allocate a new [`MutableBuffer`] with capacity rounded up to a
    /// multiple of 64 bytes; length starts at 0.
    ///
    /// # Panics
    ///
    /// Panics if the rounded capacity is too large for a valid `Layout`.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = bit_util::round_upto_multiple_of_64(capacity);
        let layout = Layout::from_size_align(capacity, ALIGNMENT)
            .expect("failed to create layout for MutableBuffer");
        let data = match layout.size() {
            // Zero-sized buffers carry a dangling, aligned pointer — no allocation.
            0 => dangling_ptr(),
            _ => {
                // SAFETY: layout has non-zero size.
                let raw_ptr = unsafe { std::alloc::alloc(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len: 0,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
151
152 pub fn from_len_zeroed(len: usize) -> Self {
168 let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
169 let data = match layout.size() {
170 0 => dangling_ptr(),
171 _ => {
172 let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
174 NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
175 }
176 };
177 Self {
178 data,
179 len,
180 layout,
181 #[cfg(feature = "pool")]
182 reservation: std::sync::Mutex::new(None),
183 }
184 }
185
    /// Try to convert `bytes` into a `MutableBuffer` without copying.
    ///
    /// Only succeeds for standard allocations (`Deallocation::Standard`);
    /// other deallocation kinds are returned unchanged as `Err(bytes)`.
    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
        let layout = match bytes.deallocation() {
            Deallocation::Standard(layout) => *layout,
            _ => return Err(bytes),
        };

        let len = bytes.len();
        let data = bytes.ptr();
        // Carry over any pool reservation so accounting follows the allocation.
        #[cfg(feature = "pool")]
        let reservation = bytes.reservation.lock().unwrap().take();
        // Ownership of the allocation moves to the new MutableBuffer; prevent
        // `bytes` from deallocating it on drop.
        mem::forget(bytes);

        Ok(Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(reservation),
        })
    }
207
208 pub fn new_null(len: usize) -> Self {
215 let num_bytes = bit_util::ceil(len, 8);
216 MutableBuffer::from_len_zeroed(num_bytes)
217 }
218
    /// Set the first `end` bytes to all-ones (`val == true`) or all-zeros
    /// (`val == false`), and set the buffer length to `end`.
    ///
    /// # Panics
    ///
    /// Panics if `end` exceeds the current capacity.
    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
        assert!(end <= self.layout.size());
        let v = if val { 255 } else { 0 };
        // SAFETY: the assert above guarantees `end` bytes are within the allocation.
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr(), v, end);
            self.len = end;
        }
        self
    }
238
    /// Zero `count` bytes starting at byte offset `start`.
    ///
    /// # Panics
    ///
    /// Panics if `start + count` exceeds the capacity; the saturating add
    /// guards against arithmetic overflow of the range end.
    pub fn set_null_bits(&mut self, start: usize, count: usize) {
        assert!(
            start.saturating_add(count) <= self.layout.size(),
            "range start index {start} and count {count} out of bounds for \
            buffer of length {}",
            self.layout.size(),
        );

        // SAFETY: the assert above guarantees `start..start + count` is in bounds.
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
        }
    }
261
262 #[inline(always)]
281 pub fn reserve(&mut self, additional: usize) {
282 let required_cap = self
283 .len
284 .checked_add(additional)
285 .expect("buffer length overflow");
286 if required_cap > self.layout.size() {
287 let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
288 let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
289 self.reallocate(new_capacity)
290 }
291 }
292
    /// Append `slice_to_repeat` to this buffer `repeat_count` times.
    ///
    /// After writing the slice once, previously written repeats are copied in
    /// bulk (doubling each pass), so only O(log(repeat_count)) copies occur.
    ///
    /// # Panics
    ///
    /// Panics if the total repeated byte length, or the resulting buffer
    /// length, would overflow `usize`.
    pub fn repeat_slice_n_times<T: ArrowNativeType>(
        &mut self,
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) {
        if repeat_count == 0 || slice_to_repeat.is_empty() {
            return;
        }

        let bytes_to_repeat = size_of_val(slice_to_repeat);
        let repeated_bytes = repeat_count
            .checked_mul(bytes_to_repeat)
            .expect("repeated slice byte length overflow");
        // Overflow check only; the length itself is advanced incrementally below.
        self.len
            .checked_add(repeated_bytes)
            .expect("mutable buffer length overflow");

        // Reserve everything up front so the raw pointer copies below stay in bounds.
        self.reserve(repeated_bytes);

        let length_before = self.len;

        // First copy; subsequent passes duplicate what has been written so far.
        self.extend_from_slice(slice_to_repeat);

        let added_repeats_length = bytes_to_repeat;
        assert_eq!(
            self.len - length_before,
            added_repeats_length,
            "should copy exactly the same number of bytes"
        );

        let mut already_repeated_times = 1;

        while already_repeated_times < repeat_count {
            // Double the copied region each pass, capped by the repeats remaining.
            let number_of_slices_to_copy =
                already_repeated_times.min(repeat_count - already_repeated_times);
            let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat;

            // SAFETY: capacity was reserved above, and src (already-written
            // region) cannot overlap dst (unwritten tail past `self.len`).
            unsafe {
                let src = self.data.as_ptr().add(length_before) as *const u8;

                let dst = self.data.as_ptr().add(self.len);

                std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy)
            }

            self.len += number_of_bytes_to_copy;

            already_repeated_times += number_of_slices_to_copy;
        }
    }
373
    /// Reallocate the buffer to exactly `capacity` bytes (no rounding),
    /// updating any pool reservation to the new capacity.
    #[cold]
    fn reallocate(&mut self, capacity: usize) {
        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
        if new_layout.size() == 0 {
            if self.layout.size() != 0 {
                // Shrinking to zero: free the allocation. `data` is left
                // dangling but is never dereferenced while capacity is 0.
                // SAFETY: `data` was allocated with `self.layout`.
                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
                self.layout = new_layout
            }
            return;
        }

        let data = match self.layout.size() {
            // SAFETY: `new_layout` has non-zero size (checked above).
            0 => unsafe { std::alloc::alloc(new_layout) },
            // SAFETY: `data` was allocated with `self.layout`; `capacity` is non-zero.
            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
        };
        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
        self.layout = new_layout;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.layout.size());
            }
        }
    }
401
402 #[inline(always)]
406 pub fn truncate(&mut self, len: usize) {
407 if len > self.len {
408 return;
409 }
410 self.len = len;
411 #[cfg(feature = "pool")]
412 {
413 if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
414 reservation.resize(self.len);
415 }
416 }
417 }
418
    /// Resize the buffer to `new_len` bytes, filling any newly added bytes
    /// with `value`. Shrinking only adjusts `len`; capacity never decreases.
    #[inline(always)]
    pub fn resize(&mut self, new_len: usize, value: u8) {
        if new_len > self.len {
            let diff = new_len - self.len;
            self.reserve(diff);
            // SAFETY: `reserve` guaranteed capacity for `diff` additional bytes.
            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
        }
        self.len = new_len;
        // Pool accounting tracks the logical length, not capacity.
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }
452
453 pub fn shrink_to_fit(&mut self) {
474 let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
475 if new_capacity < self.layout.size() {
476 self.reallocate(new_capacity)
477 }
478 }
479
    /// Returns `true` if the buffer contains no initialized bytes.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }
485
    /// Returns the number of initialized bytes in the buffer.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }
492
    /// Returns the buffer's total capacity in bytes (the allocation size).
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.layout.size()
    }
500
501 pub fn clear(&mut self) {
503 self.len = 0;
504 #[cfg(feature = "pool")]
505 {
506 if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
507 reservation.resize(self.len);
508 }
509 }
510 }
511
    /// View the initialized bytes as an immutable slice (via `Deref`).
    pub fn as_slice(&self) -> &[u8] {
        self
    }
516
    /// View the initialized bytes as a mutable slice (via `DerefMut`).
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        self
    }
521
    /// Raw const pointer to the start of the buffer.
    ///
    /// Note: dangling (but aligned) when the capacity is 0.
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }
528
    /// Raw mutable pointer to the start of the buffer.
    ///
    /// Note: dangling (but aligned) when the capacity is 0.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.data.as_ptr()
    }
535
    /// Freeze this `MutableBuffer` into an immutable [`Buffer`], transferring
    /// ownership of the allocation (and any pool reservation) without copying.
    #[inline]
    pub(super) fn into_buffer(self) -> Buffer {
        // SAFETY: `data` holds `len` initialized bytes allocated with `layout`.
        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
        #[cfg(feature = "pool")]
        {
            // Move the reservation so pool accounting follows the allocation.
            let reservation = self.reservation.lock().unwrap().take();
            *bytes.reservation.lock().unwrap() = reservation;
        }
        // `bytes` now owns the allocation; skip Drop to avoid a double free.
        std::mem::forget(self);
        Buffer::from(bytes)
    }
547
    /// View the buffer as a mutable slice of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is not aligned to `T` or its byte length is not a
    /// multiple of `size_of::<T>()` (non-empty prefix/suffix from `align_to_mut`).
    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
        // SAFETY: relies on ArrowNativeType values being valid for any byte
        // pattern, so reinterpreting aligned bytes as T is sound.
        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }
562
    /// View the buffer as an immutable slice of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is not aligned to `T` or its byte length is not a
    /// multiple of `size_of::<T>()` (non-empty prefix/suffix from `align_to`).
    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
        // SAFETY: relies on ArrowNativeType values being valid for any byte
        // pattern, so reinterpreting aligned bytes as T is sound.
        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }
577
    /// Append the raw bytes of `items` to the buffer, growing it as needed.
    #[inline]
    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
        let additional = mem::size_of_val(items);
        self.reserve(additional);
        // SAFETY: `reserve` guaranteed room for `additional` bytes, and the
        // source slice cannot overlap the freshly reserved destination.
        unsafe {
            let src = items.as_ptr() as *const u8;
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional)
        }
        self.len += additional;
    }
605
    /// Append the byte representation of `item` to the buffer, growing it as needed.
    #[inline]
    pub fn push<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        self.reserve(additional);
        // SAFETY: `reserve` guaranteed room for `additional` more bytes.
        unsafe {
            let src = item.to_byte_slice().as_ptr();
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional);
        }
        self.len += additional;
    }
630
    /// Append `item` without checking or growing capacity.
    ///
    /// # Safety
    ///
    /// The caller must have previously ensured at least `size_of::<T>()` spare
    /// bytes of capacity (e.g. via [`MutableBuffer::reserve`]).
    #[inline]
    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        let src = item.to_byte_slice().as_ptr();
        // SAFETY: caller guarantees capacity for `additional` more bytes.
        let dst = unsafe { self.data.as_ptr().add(self.len) };
        unsafe { std::ptr::copy_nonoverlapping(src, dst, additional) };
        self.len += additional;
    }
642
643 #[inline]
650 pub fn extend_zeros(&mut self, additional: usize) {
651 let new_len = self
652 .len
653 .checked_add(additional)
654 .expect("buffer length overflow");
655 self.resize(new_len, 0);
656 }
657
    /// Set the logical length of the buffer.
    ///
    /// # Safety
    ///
    /// The first `len` bytes must be initialized; the capacity bound is
    /// asserted here, but initialization is the caller's contract.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }
669
    /// Build a packed bitmap of `len` bits where bit `i` is `f(i)`,
    /// least-significant bit first within each byte.
    #[inline]
    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
        // Pack into u64 words first; the Vec is converted to a buffer below.
        let mut buffer: Vec<u64> = Vec::with_capacity(bit_util::ceil(len, 64));

        let chunks = len / 64;
        let remainder = len % 64;
        buffer.extend((0..chunks).map(|chunk| {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            packed
        }));

        // Partial trailing word, if any; unused high bits stay zero.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            buffer.push(packed)
        }

        let mut buffer: MutableBuffer = buffer.into();
        // Trim the byte length to exactly ceil(len / 8).
        buffer.truncate(bit_util::ceil(len, 8));
        buffer
    }
704
    /// Write `len` bits from `iter` into this buffer starting at bit `offset`,
    /// packing 64 bits per word once the write position is 64-bit aligned.
    /// Bits below `offset` in the first shared byte are preserved; bits past
    /// `offset + len` in the final partial byte are zeroed.
    ///
    /// # Safety
    ///
    /// `iter` must report an exact `size_hint` (lower == upper) and yield
    /// exactly that many items.
    #[inline]
    pub unsafe fn extend_bool_trusted_len<I: Iterator<Item = bool>>(
        &mut self,
        mut iter: I,
        offset: usize,
    ) {
        let (lower, upper) = iter.size_hint();
        let len = upper.expect("Iterator must have exact size_hint");
        assert_eq!(lower, len, "Iterator must have exact size_hint");
        debug_assert!(
            offset <= self.len * 8,
            "offset must be <= buffer length in bits"
        );

        if len == 0 {
            return;
        }

        let start_len = offset;
        let end_bit = start_len + len;

        // Grow the byte length to cover the last bit written.
        let new_len_bytes = bit_util::ceil(end_bit, 8);
        if new_len_bytes > self.len {
            self.reserve(new_len_bytes - self.len);
            // SAFETY: every newly exposed byte is written below before being read.
            unsafe { self.set_len(new_len_bytes) };
        }

        let slice = self.as_slice_mut();

        let mut bit_idx = start_len;

        // Bits needed to reach the next 64-bit boundary (the "prefix").
        let misalignment = bit_idx & 63;
        let prefix_bits = if misalignment == 0 {
            0
        } else {
            (64 - misalignment).min(end_bit - bit_idx)
        };

        if prefix_bits != 0 {
            let byte_start = bit_idx / 8;
            let byte_end = bit_util::ceil(bit_idx + prefix_bits, 8);
            let bit_offset = bit_idx % 8;

            // Preserve existing bits below `offset` in the first (shared) byte.
            if bit_offset != 0 {
                let keep_mask = (1u8 << bit_offset).wrapping_sub(1);
                slice[byte_start] &= keep_mask;
            }

            // Zero the remaining prefix bytes so the OR writes below start clean.
            let zero_from = if bit_offset == 0 {
                byte_start
            } else {
                byte_start + 1
            };
            if byte_end > zero_from {
                slice[zero_from..byte_end].fill(0);
            }

            // Bit-by-bit path up to the 64-bit boundary.
            for _ in 0..prefix_bits {
                let v = iter.next().unwrap();
                if v {
                    let byte_idx = bit_idx / 8;
                    let bit = bit_idx % 8;
                    slice[byte_idx] |= 1 << bit;
                }
                bit_idx += 1;
            }
        }

        if bit_idx < end_bit {
            // The write position is now 64-bit aligned.
            debug_assert_eq!(bit_idx & 63, 0);
            let remaining_bits = end_bit - bit_idx;
            let chunks = remaining_bits / 64;

            // Bulk path: pack 64 bits into a word, store as little-endian bytes.
            let words_start = bit_idx / 8;
            let words_end = words_start + chunks * 8;
            for dst in slice[words_start..words_end].chunks_exact_mut(8) {
                let mut packed: u64 = 0;
                for i in 0..64 {
                    packed |= (iter.next().unwrap() as u64) << i;
                }
                dst.copy_from_slice(&packed.to_le_bytes());
                bit_idx += 64;
            }

            // Remaining tail bits (< 64), written bit by bit into zeroed bytes.
            let suffix_bits = end_bit - bit_idx;
            if suffix_bits != 0 {
                debug_assert_eq!(bit_idx % 8, 0);
                let byte_start = bit_idx / 8;
                let byte_end = bit_util::ceil(end_bit, 8);
                slice[byte_start..byte_end].fill(0);

                for _ in 0..suffix_bits {
                    let v = iter.next().unwrap();
                    if v {
                        let byte_idx = bit_idx / 8;
                        let bit = bit_idx % 8;
                        slice[byte_idx] |= 1 << bit;
                    }
                    bit_idx += 1;
                }
            }
        }

        // Zero any bits past `end_bit` in the last (partial) byte.
        let remainder = end_bit % 8;
        if remainder != 0 {
            let mask = (1u8 << remainder).wrapping_sub(1);
            slice[bit_util::ceil(end_bit, 8) - 1] &= mask;
        }

        debug_assert_eq!(bit_idx, end_bit);
    }
839
    /// Register this buffer's capacity with `pool`, replacing any existing
    /// reservation (the previous reservation, if any, is dropped).
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }
850}
851
/// Returns a dangling pointer aligned to `ALIGNMENT`, used to represent
/// zero-sized allocations without touching the allocator.
#[inline]
pub(crate) fn dangling_ptr() -> NonNull<u8> {
    // Under Miri, construct the address with explicit (lack of) provenance.
    #[cfg(miri)]
    {
        // SAFETY: ALIGNMENT is non-zero, so the pointer is non-null.
        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
    }
    #[cfg(not(miri))]
    {
        // SAFETY: ALIGNMENT is non-zero, so the pointer is non-null.
        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
    }
}
870
impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
    /// Append the byte representation of each yielded native value.
    #[inline]
    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
        let iterator = iter.into_iter();
        self.extend_from_iter(iterator)
    }
}
878
/// Zero-copy conversion: takes over the `Vec`'s allocation directly.
impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
    fn from(value: Vec<T>) -> Self {
        // SAFETY: a Vec's data pointer is never null (dangling-but-aligned when empty).
        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
        let len = value.len() * mem::size_of::<T>();
        // SAFETY: the Vec was successfully allocated with this exact layout,
        // so constructing it again cannot fail.
        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
        // The MutableBuffer now owns the allocation; don't let the Vec free it.
        mem::forget(value);
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
}
899
900impl MutableBuffer {
    /// Append values from `iterator`, reserving up front from `size_hint`.
    ///
    /// Values within the reserved capacity are written through raw pointers;
    /// any values beyond the hint fall back to `push` (which reserves again).
    #[inline]
    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let item_size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * item_size;
        self.reserve(additional);

        // SetLenOnDrop commits `self.len` even if `iterator.next()` panics,
        // so already-written items are never lost or left uninitialized.
        let mut len = SetLenOnDrop::new(&mut self.len);
        // SAFETY: `len.local_len` is within the allocation.
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
        let capacity = self.layout.size();

        while len.local_len + item_size <= capacity {
            if let Some(item) = iterator.next() {
                // SAFETY: the loop condition guarantees room for one more item.
                unsafe {
                    let src = item.to_byte_slice().as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, item_size);
                    dst = dst.add(item_size);
                }
                len.local_len += item_size;
            } else {
                break;
            }
        }
        // Commit the new length before the (possibly reallocating) slow path.
        drop(len);

        // Slow path for items beyond the reserved capacity.
        iterator.for_each(|item| self.push(item));
    }
932
933 #[inline]
958 pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
959 iterator: I,
960 ) -> Self {
961 let item_size = std::mem::size_of::<T>();
962 let (_, upper) = iterator.size_hint();
963 let upper = upper.expect("from_trusted_len_iter requires an upper limit");
964 let len = upper * item_size;
965
966 let mut buffer = MutableBuffer::new(len);
967
968 let mut dst = buffer.data.as_ptr();
969 for item in iterator {
970 let src = item.to_byte_slice().as_ptr();
972 unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
973 dst = unsafe { dst.add(item_size) };
974 }
975 assert_eq!(
976 unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
977 len,
978 "Trusted iterator length was not accurately reported"
979 );
980 buffer.len = len;
981 buffer
982 }
983
    /// Creates a packed bitmap buffer from a `bool` iterator with a trusted length.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must equal the number of items
    /// it yields; a shorter iterator makes `next().unwrap()` panic.
    #[inline]
    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
        let (_, upper) = iterator.size_hint();
        let len = upper.expect("from_trusted_len_iter requires an upper limit");

        Self::collect_bool(len, |_| iterator.next().unwrap())
    }
1014
    /// Fallible version of [`Self::from_trusted_len_iter`]: builds a buffer
    /// from an iterator of `Result`s, short-circuiting on the first error.
    ///
    /// # Safety
    ///
    /// The iterator must report an exact `size_hint` upper bound and, absent
    /// an error, yield exactly that many items.
    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let item = item?;
            let src = item.to_byte_slice().as_ptr();
            // SAFETY: `dst` stays within the allocation per the trusted-length contract.
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // Local helper keeps the final pointer arithmetic in one unsafe scope.
        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
            unsafe {
                assert_eq!(
                    dst.offset_from(buffer.data.as_ptr()) as usize,
                    len,
                    "Trusted iterator length was not accurately reported"
                );
                buffer.len = len;
            }
        }
        unsafe { finalize_buffer(dst, &mut buffer, len) };
        Ok(buffer)
    }
1067}
1068
1069impl Default for MutableBuffer {
1070 fn default() -> Self {
1071 Self::with_capacity(0)
1072 }
1073}
1074
impl std::ops::Deref for MutableBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // SAFETY: the first `len` bytes of the allocation are initialized.
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}
1082
impl std::ops::DerefMut for MutableBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        // SAFETY: the first `len` bytes are initialized, and `&mut self`
        // guarantees exclusive access.
        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}
1088
/// Allows `&MutableBuffer` to be used where `AsRef<[u8]>` is expected.
impl AsRef<[u8]> for &MutableBuffer {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}
1094
impl Drop for MutableBuffer {
    fn drop(&mut self) {
        // Zero-capacity buffers hold a dangling pointer and own no allocation.
        if self.layout.size() != 0 {
            // SAFETY: `data` was allocated with `self.layout`.
            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
        }
    }
}
1103
/// Note: equality also compares the allocation `layout`, so two buffers with
/// identical contents but different capacities are *not* equal (relied upon by
/// the unit tests).
impl PartialEq for MutableBuffer {
    fn eq(&self, other: &MutableBuffer) -> bool {
        if self.len != other.len {
            return false;
        }
        if self.layout != other.layout {
            return false;
        }
        self.as_slice() == other.as_slice()
    }
}
1115
// SAFETY: MutableBuffer exclusively owns its allocation (the raw pointer is
// never aliased externally), and with the "pool" feature the reservation is
// behind a Mutex, so sharing/sending across threads is sound.
unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}
1118
/// Guard that tracks a working length locally and writes it back to the
/// buffer's `len` on drop, keeping the length consistent even if a
/// user-supplied iterator panics mid-extend (mirrors `Vec`'s internal helper).
struct SetLenOnDrop<'a> {
    /// The buffer length to update when the guard is dropped.
    len: &'a mut usize,
    /// The working copy of the length, advanced as items are written.
    local_len: usize,
}
1123
1124impl<'a> SetLenOnDrop<'a> {
1125 #[inline]
1126 fn new(len: &'a mut usize) -> Self {
1127 SetLenOnDrop {
1128 local_len: *len,
1129 len,
1130 }
1131 }
1132}
1133
impl Drop for SetLenOnDrop<'_> {
    /// Commit the working length back to the underlying `len`.
    #[inline]
    fn drop(&mut self) {
        *self.len = self.local_len;
    }
}
1140
impl std::iter::FromIterator<bool> for MutableBuffer {
    /// Collect booleans into a packed bitmap, LSB-first within each byte.
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = bool>,
    {
        let mut iterator = iter.into_iter();
        let mut result = {
            // One byte per 8 bits, based on the size hint's lower bound.
            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
            MutableBuffer::new(byte_capacity)
        };

        loop {
            let mut exhausted = false;
            let mut byte_accum: u8 = 0;
            let mut mask: u8 = 1;

            // Pack up to 8 bits into one byte; `mask` wraps to 0 after 8 shifts.
            while mask != 0 {
                if let Some(value) = iterator.next() {
                    byte_accum |= match value {
                        true => mask,
                        false => 0,
                    };
                    mask <<= 1;
                } else {
                    exhausted = true;
                    break;
                }
            }

            // `mask == 1` means the iterator ended on a byte boundary:
            // `byte_accum` holds no bits, so there is nothing to flush.
            if exhausted && mask == 1 {
                break;
            }

            // Grow before the unchecked push below.
            if result.len() == result.capacity() {
                let additional_byte_capacity = 1usize.saturating_add(
                    // Convert the remaining bit count to bytes, rounding up.
                    iterator.size_hint().0.saturating_add(7) / 8,
                );
                result.reserve(additional_byte_capacity)
            }

            // SAFETY: capacity for one more byte was ensured above.
            unsafe { result.push_unchecked(byte_accum) };
            if exhausted {
                break;
            }
        }
        result
    }
}
1195
1196impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
1197 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
1198 let mut buffer = Self::default();
1199 buffer.extend_from_iter(iter.into_iter());
1200 buffer
1201 }
1202}
1203
#[cfg(test)]
mod tests {
    //! Unit tests for `MutableBuffer`: allocation/rounding, extend/push paths,
    //! trusted-length construction, pool accounting, and slice repetition.
    use super::*;

    #[test]
    fn test_mutable_new() {
        // Capacity rounds up to the next multiple of 64.
        let buf = MutableBuffer::new(63);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
    }

    #[test]
    fn test_mutable_default() {
        let buf = MutableBuffer::default();
        assert_eq!(0, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        let mut buf = MutableBuffer::default();
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());
    }

    #[test]
    fn test_mutable_extend_from_slice() {
        let mut buf = MutableBuffer::new(100);
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());

        buf.extend_from_slice(b" world");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello world", buf.as_slice());

        // Clearing keeps capacity; subsequent extends reuse the allocation.
        buf.clear();
        assert_eq!(0, buf.len());
        buf.extend_from_slice(b"hello arrow");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello arrow", buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_iter() {
        let mut buf = MutableBuffer::new(0);
        buf.extend(vec![1u32, 2]);
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

        buf.extend(vec![3u32, 4]);
        assert_eq!(16, buf.len());
        assert_eq!(
            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
            buf.as_slice()
        );
    }

    #[test]
    fn mutable_extend_from_iter_unaligned_u64() {
        // The byte-oriented buffer accepts writes at non-multiple-of-T offsets.
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.extend([1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_slice_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.extend_from_slice(&[1_u8]);
        buf.extend_from_slice(&[1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.push(1_u64);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unchecked_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        // SAFETY: 16 bytes were reserved above; 9 are written.
        unsafe {
            buf.push_unchecked(1_u8);
            buf.push_unchecked(1_u64);
        }
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_from_trusted_len_iter() {
        let iter = vec![1u32, 2].into_iter();
        // SAFETY: Vec's IntoIter reports an exact size_hint.
        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_mutable_reserve() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());

        // Reserving within the existing capacity is a no-op.
        buf.reserve(10);
        assert_eq!(64, buf.capacity());

        buf.reserve(80);
        assert_eq!(128, buf.capacity());

        // Growth is at least a doubling of the previous capacity.
        buf.reserve(129);
        assert_eq!(256, buf.capacity());
    }

    #[test]
    fn test_mutable_resize() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());

        buf.resize(20, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(20, buf.len());

        // Shrinking only adjusts the length; capacity is retained.
        buf.resize(10, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(10, buf.len());

        buf.resize(100, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(100, buf.len());

        buf.resize(30, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(30, buf.len());

        buf.resize(0, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(0, buf.len());
    }

    #[test]
    fn test_mutable_into() {
        let mut buf = MutableBuffer::new(1);
        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
        assert_eq!(19, buf.len());
        assert_eq!(64, buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());

        // Freezing preserves length, capacity and contents without copying.
        let immutable_buf: Buffer = buf.into();
        assert_eq!(19, immutable_buf.len());
        assert_eq!(64, immutable_buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
    }

    #[test]
    fn test_mutable_equal() {
        let mut buf = MutableBuffer::new(1);
        let mut buf2 = MutableBuffer::new(1);

        buf.extend_from_slice(&[0xaa]);
        buf2.extend_from_slice(&[0xaa, 0xbb]);
        assert!(buf != buf2);

        buf.extend_from_slice(&[0xbb]);
        assert_eq!(buf, buf2);

        // Equality is layout-sensitive: same contents, different capacity => unequal.
        buf2.reserve(65);
        assert!(buf != buf2);
    }

    #[test]
    fn test_mutable_shrink_to_fit() {
        let mut buffer = MutableBuffer::new(128);
        assert_eq!(buffer.capacity(), 128);
        buffer.push(1);
        buffer.push(2);

        buffer.shrink_to_fit();
        // Capacity shrinks to a multiple of 64 covering the 8 used bytes.
        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    }

    #[test]
    fn test_mutable_set_null_bits() {
        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);

        // Zero-count calls are no-ops at every valid start offset.
        for i in 0..=buffer.capacity() {
            buffer.set_null_bits(i, 0);
            assert_eq!(buffer[..8], [255; 8][..]);
        }

        buffer.set_null_bits(1, 4);
        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob() {
        let mut buffer = MutableBuffer::new(64);
        buffer.set_null_bits(1, buffer.capacity());
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob_by_overflow() {
        // start + count overflows usize; the saturating add must still panic.
        let mut buffer = MutableBuffer::new(0);
        buffer.set_null_bits(1, usize::MAX);
    }

    #[test]
    fn from_iter() {
        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
    }

    #[test]
    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
    fn test_with_capacity_panics_above_max_capacity() {
        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
        let _ = MutableBuffer::with_capacity(max_capacity + 1);
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;
        use crate::pool::{MemoryPool, TrackingMemoryPool};

        #[test]
        fn test_reallocate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            // Claim reserves the (rounded) capacity.
            assert_eq!(buffer.capacity(), 128);
            assert_eq!(pool.used(), 128);

            buffer.reallocate(200);

            // Reallocation resizes the reservation to the new capacity.
            assert_eq!(buffer.capacity(), 200);
            assert_eq!(pool.used(), 200);

            buffer.reallocate(50);

            assert_eq!(buffer.capacity(), 50);
            assert_eq!(pool.used(), 50);
        }

        #[test]
        fn test_truncate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);

            buffer.resize(80, 1);
            assert_eq!(buffer.len(), 80);

            buffer.claim(&pool);
            assert_eq!(pool.used(), 128);

            // Truncation resizes the reservation down to the logical length.
            buffer.truncate(40);
            assert_eq!(buffer.len(), 40);
            assert_eq!(pool.used(), 40);

            buffer.clear();
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_resize_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 128);

            // Resize tracks the logical length, not the capacity.
            buffer.resize(50, 1);
            assert_eq!(buffer.len(), 50);
            assert_eq!(pool.used(), 50);

            buffer.resize(150, 1);
            assert_eq!(buffer.len(), 150);
            assert_eq!(buffer.capacity(), 256);
            assert_eq!(pool.used(), 150);

            buffer.resize(30, 1);
            assert_eq!(buffer.len(), 30);
            assert_eq!(pool.used(), 30);
        }

        #[test]
        fn test_buffer_lifecycle_with_pool() {
            let pool = TrackingMemoryPool::default();

            let mut mutable = MutableBuffer::with_capacity(100);
            mutable.resize(80, 1);
            mutable.claim(&pool);

            assert_eq!(pool.used(), 128);

            // Freezing transfers the reservation to the immutable buffer.
            let buffer = mutable.into_buffer();

            assert_eq!(pool.used(), 128);

            // Dropping the buffer releases the reservation.
            drop(buffer);
            assert_eq!(pool.used(), 0);
        }
    }

    /// Build the expected result of repeating `slice_to_repeat` by naive,
    /// sequential extension.
    fn create_expected_repeated_slice<T: ArrowNativeType>(
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) -> Buffer {
        let mut expected = MutableBuffer::new(size_of_val(slice_to_repeat) * repeat_count);
        for _ in 0..repeat_count {
            expected.extend_from_slice(slice_to_repeat);
        }
        expected.into()
    }

    /// Compare `repeat_slice_n_times` against the naive expected result.
    fn test_repeat_count<T: ArrowNativeType + PartialEq + std::fmt::Debug>(
        repeat_count: usize,
        test_data: &[T],
    ) {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(test_data, repeat_count);

        let expected = create_expected_repeated_slice(test_data, repeat_count);
        let result: Buffer = buffer.into();

        assert_eq!(
            result,
            expected,
            "Failed for repeat_count={}, slice_len={}",
            repeat_count,
            test_data.len()
        );
    }

    #[test]
    fn test_repeat_slice_count_edge_cases() {
        // Empty slice: nothing is written regardless of the count.
        test_repeat_count(100, &[] as &[i32]);

        // Zero repeats: nothing is written regardless of the slice.
        test_repeat_count(0, &[1i32, 2, 3]);
    }

    #[test]
    #[should_panic(expected = "repeated slice byte length overflow")]
    fn test_repeat_slice_count_multiply_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(&[0_u64], usize::MAX / mem::size_of::<u64>() + 1);
    }

    #[test]
    #[should_panic(expected = "mutable buffer length overflow")]
    fn test_repeat_slice_count_len_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.push(0_u8);
        buffer.repeat_slice_n_times(&[0_u8], usize::MAX);
    }

    #[test]
    fn test_small_repeats_counts() {
        let data = &[1u8, 2, 3, 4, 5];

        for _ in 1..=10 {
            test_repeat_count(2, data);
        }
    }

    #[test]
    fn test_different_size_of_i32_repeat_slice() {
        let data: &[i32] = &[1, 2, 3];
        let data_with_single_item: &[i32] = &[42];

        // Exercise counts around powers of two to hit the doubling boundaries.
        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u8_repeat_slice() {
        let data: &[u8] = &[1, 2, 3];
        let data_with_single_item: &[u8] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u16_repeat_slice() {
        let data: &[u16] = &[1, 2, 3];
        let data_with_single_item: &[u16] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_various_slice_lengths() {
        let repeat_count = 37;
        test_repeat_count(repeat_count, &[42i32]);

        test_repeat_count(repeat_count, &[1i32, 2]);
        test_repeat_count(repeat_count, &[1i32, 2, 3]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]);

        let data_10: Vec<i32> = (0..10).collect();
        test_repeat_count(repeat_count, &data_10);

        let data_100: Vec<i32> = (0..100).collect();
        test_repeat_count(repeat_count, &data_100);

        let data_1000: Vec<i32> = (0..1000).collect();
        test_repeat_count(repeat_count, &data_1000);
    }
}