1use std::alloc::{Layout, handle_alloc_error};
19use std::mem;
20use std::ptr::NonNull;
21
22use crate::alloc::{ALIGNMENT, Deallocation};
23use crate::{
24 bytes::Bytes,
25 native::{ArrowNativeType, ToByteSlice},
26 util::bit_util,
27};
28
29#[cfg(feature = "pool")]
30use crate::pool::{MemoryPool, MemoryReservation};
31#[cfg(feature = "pool")]
32use std::sync::Mutex;
33
34use super::Buffer;
35
/// A contiguous, growable region of `u8` memory, aligned to `ALIGNMENT`,
/// used to build immutable [`Buffer`]s without copying.
#[derive(Debug)]
pub struct MutableBuffer {
    /// Pointer to the allocation; dangling (and never dereferenced) when
    /// `layout.size() == 0`.
    data: NonNull<u8>,
    /// Number of initialized bytes; invariant: `len <= layout.size()`.
    len: usize,
    /// Layout of the current allocation; `layout.size()` is the capacity in bytes.
    layout: Layout,

    #[cfg(feature = "pool")]
    /// Optional reservation accounting this buffer's memory in a `MemoryPool`.
    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}
110
111impl MutableBuffer {
    /// Creates a new [`MutableBuffer`] with at least `capacity` bytes of
    /// capacity (rounded up to a multiple of 64).
    ///
    /// Alias of [`Self::with_capacity`].
    #[inline]
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }
123
    /// Creates a new [`MutableBuffer`] whose capacity is `capacity` rounded up
    /// to the next multiple of 64 bytes, with length 0.
    ///
    /// # Panics
    ///
    /// Panics if the rounded capacity exceeds the maximum size supported by
    /// [`Layout`], and aborts via [`handle_alloc_error`] if allocation fails.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = bit_util::round_upto_multiple_of_64(capacity);
        let layout = Layout::from_size_align(capacity, ALIGNMENT)
            .expect("failed to create layout for MutableBuffer");
        let data = match layout.size() {
            // Zero-sized: don't allocate; use an aligned dangling pointer.
            0 => dangling_ptr(),
            _ => {
                // SAFETY: `layout` has non-zero size and a valid alignment.
                let raw_ptr = unsafe { std::alloc::alloc(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len: 0,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
151
    /// Creates a buffer of exactly `len` zeroed bytes; `len` is both the
    /// length and the capacity (note: no rounding up to a multiple of 64,
    /// unlike [`Self::with_capacity`]).
    ///
    /// # Panics
    ///
    /// Panics if `len` exceeds the maximum [`Layout`] size, and aborts via
    /// [`handle_alloc_error`] if allocation fails.
    pub fn from_len_zeroed(len: usize) -> Self {
        let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
        let data = match layout.size() {
            // Zero-sized: don't allocate; use an aligned dangling pointer.
            0 => dangling_ptr(),
            _ => {
                // SAFETY: `layout` has non-zero size and a valid alignment.
                let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
186
    /// Attempts to reclaim `bytes` as a [`MutableBuffer`] without copying.
    ///
    /// Only standard allocations (`Deallocation::Standard`) can be reclaimed;
    /// any other deallocation kind is handed back unchanged as `Err(bytes)`.
    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
        let layout = match bytes.deallocation() {
            Deallocation::Standard(layout) => *layout,
            _ => return Err(bytes),
        };

        let len = bytes.len();
        let data = bytes.ptr();
        #[cfg(feature = "pool")]
        let reservation = bytes.reservation.lock().unwrap().take();
        // Ownership of the allocation moves to the new MutableBuffer; forget
        // `bytes` so its Drop does not free the memory we now own.
        mem::forget(bytes);

        Ok(Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(reservation),
        })
    }
208
209 pub fn new_null(len: usize) -> Self {
216 let num_bytes = bit_util::ceil(len, 8);
217 MutableBuffer::from_len_zeroed(num_bytes)
218 }
219
    /// Fills the first `end` bytes with `0xFF` (if `val`) or `0x00`, sets the
    /// length to `end`, and returns the buffer (builder style).
    ///
    /// # Panics
    ///
    /// Panics if `end` exceeds the current capacity.
    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
        assert!(end <= self.layout.size());
        let v = if val { 255 } else { 0 };
        // SAFETY: asserted above that `end` is within the allocation.
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr(), v, end);
            self.len = end;
        }
        self
    }
239
    /// Zeroes `count` bytes starting at byte offset `start`.
    ///
    /// # Panics
    ///
    /// Panics if `start + count` (computed saturating, so overflow also trips
    /// the check) exceeds the capacity.
    pub fn set_null_bits(&mut self, start: usize, count: usize) {
        assert!(
            start.saturating_add(count) <= self.layout.size(),
            "range start index {start} and count {count} out of bounds for \
             buffer of length {}",
            self.layout.size(),
        );

        // SAFETY: the assert above guarantees `[start, start + count)` lies
        // within the allocation.
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
        }
    }
262
    /// Ensures capacity for at least `additional` bytes beyond `len`.
    ///
    /// Grows to the larger of the 64-byte-rounded required capacity and twice
    /// the current capacity, giving amortised O(1) appends.
    ///
    /// # Panics
    ///
    /// Panics if `len + additional` overflows `usize`.
    #[inline(always)]
    pub fn reserve(&mut self, additional: usize) {
        let required_cap = self
            .len
            .checked_add(additional)
            .expect("buffer length overflow");
        if required_cap > self.layout.size() {
            // Growth policy: at least double, and always a multiple of 64.
            let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
            let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
            self.reallocate(new_capacity)
        }
    }
293
    /// Appends `slice_to_repeat` to the buffer `repeat_count` times.
    ///
    /// Writes the slice once, then repeatedly copies the already-appended
    /// region, doubling it each round — O(log(repeat_count)) memcpy calls
    /// instead of `repeat_count`.
    ///
    /// # Panics
    ///
    /// Panics if the total byte count to append, or the resulting buffer
    /// length, would overflow `usize`.
    pub fn repeat_slice_n_times<T: ArrowNativeType>(
        &mut self,
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) {
        if repeat_count == 0 || slice_to_repeat.is_empty() {
            return;
        }

        let bytes_to_repeat = size_of_val(slice_to_repeat);
        let repeated_bytes = repeat_count
            .checked_mul(bytes_to_repeat)
            .expect("repeated slice byte length overflow");
        // Overflow check only; the resulting value is deliberately discarded.
        self.len
            .checked_add(repeated_bytes)
            .expect("mutable buffer length overflow");

        // Reserve everything up front so the copy loop below never
        // reallocates (the raw pointers taken there must stay valid).
        self.reserve(repeated_bytes);

        let length_before = self.len;

        // Seed: append the slice once.
        self.extend_from_slice(slice_to_repeat);

        let added_repeats_length = bytes_to_repeat;
        assert_eq!(
            self.len - length_before,
            added_repeats_length,
            "should copy exactly the same number of bytes"
        );

        let mut already_repeated_times = 1;

        // Double the repeated region until `repeat_count` copies exist; the
        // last round copies only the remainder needed.
        while already_repeated_times < repeat_count {
            let number_of_slices_to_copy =
                already_repeated_times.min(repeat_count - already_repeated_times);
            let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat;

            // SAFETY: `reserve` guaranteed room for all `repeated_bytes`, and
            // src/dst cannot overlap because dst starts at `self.len`, past
            // everything written so far.
            unsafe {
                let src = self.data.as_ptr().add(length_before) as *const u8;

                let dst = self.data.as_ptr().add(self.len);

                std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy)
            }

            self.len += number_of_bytes_to_copy;

            already_repeated_times += number_of_slices_to_copy;
        }
    }
374
    /// Reallocates the buffer to exactly `capacity` bytes, preserving contents
    /// up to the new capacity and keeping any pool reservation in sync.
    #[cold]
    fn reallocate(&mut self, capacity: usize) {
        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
        if new_layout.size() == 0 {
            // Shrinking to zero: free the allocation (if any). The data
            // pointer is left dangling, but with a zero-sized layout it is
            // never dereferenced and Drop will not free it again.
            if self.layout.size() != 0 {
                // SAFETY: `data` was allocated with `self.layout`.
                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
                self.layout = new_layout
            }
            return;
        }

        let data = match self.layout.size() {
            // SAFETY: `new_layout` has non-zero size and a valid alignment.
            0 => unsafe { std::alloc::alloc(new_layout) },
            // SAFETY: `data` was allocated with `self.layout`, and `capacity`
            // is non-zero on this path.
            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
        };
        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
        self.layout = new_layout;
        #[cfg(feature = "pool")]
        {
            // Keep memory-pool accounting consistent with the new capacity.
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.layout.size());
            }
        }
    }
402
    /// Shortens the buffer to `len` bytes; no-op if `len >= self.len`.
    ///
    /// Capacity is unchanged, but any pool reservation is shrunk to the new
    /// length.
    #[inline(always)]
    pub fn truncate(&mut self, len: usize) {
        if len > self.len {
            return;
        }
        self.len = len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }
419
    /// Resizes the buffer to `new_len` bytes, filling any newly added bytes
    /// with `value`. Shrinking never reallocates; in both directions any pool
    /// reservation is resized to the new length.
    #[inline(always)]
    pub fn resize(&mut self, new_len: usize, value: u8) {
        if new_len > self.len {
            let diff = new_len - self.len;
            self.reserve(diff);
            // SAFETY: `reserve` guaranteed `diff` spare bytes past `len`.
            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
        }
        self.len = new_len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }
453
    /// Shrinks the capacity to the smallest multiple of 64 bytes that still
    /// holds `len`; no-op if already that small.
    pub fn shrink_to_fit(&mut self) {
        let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
        if new_capacity < self.layout.size() {
            self.reallocate(new_capacity)
        }
    }
480
    /// Returns `true` if the buffer holds no initialized bytes.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }
486
    /// Returns the number of initialized bytes in the buffer.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }
493
    /// Returns the total allocated capacity in bytes (the layout size).
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.layout.size()
    }
501
    /// Resets the length to zero, retaining the allocation.
    // NOTE(review): unlike `truncate(0)`, this does not shrink any pool
    // reservation — confirm that asymmetry is intended.
    pub fn clear(&mut self) {
        self.len = 0
    }
506
    /// Returns the initialized bytes as a shared slice (via `Deref`).
    pub fn as_slice(&self) -> &[u8] {
        self
    }
511
    /// Returns the initialized bytes as a mutable slice (via `DerefMut`).
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        self
    }
516
    /// Returns a raw pointer to the start of the buffer.
    ///
    /// The pointer is dangling (but aligned) when the capacity is zero.
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }
523
    /// Returns a mutable raw pointer to the start of the buffer.
    ///
    /// The pointer is dangling (but aligned) when the capacity is zero.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.data.as_ptr()
    }
530
    /// Converts into an immutable [`Buffer`] without copying, transferring the
    /// allocation and any pool reservation.
    #[inline]
    pub(super) fn into_buffer(self) -> Buffer {
        // SAFETY: `data` holds `len` initialized bytes allocated with `layout`.
        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
        #[cfg(feature = "pool")]
        {
            // Move the reservation over so the pool keeps tracking this memory.
            let reservation = self.reservation.lock().unwrap().take();
            *bytes.reservation.lock().unwrap() = reservation;
        }
        // `bytes` now owns the allocation; skip our Drop to avoid a double free.
        std::mem::forget(self);
        Buffer::from(bytes)
    }
542
    /// Views the buffer as a mutable slice of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is not aligned to `T` or its length is not a
    /// multiple of `size_of::<T>()` (`align_to_mut` yields prefix/suffix).
    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
        // SAFETY: assumes `T: ArrowNativeType` implies plain-old-data valid
        // for any bit pattern, so reinterpreting initialized bytes is sound.
        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }
557
    /// Views the buffer as a shared slice of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is not aligned to `T` or its length is not a
    /// multiple of `size_of::<T>()` (`align_to` yields prefix/suffix).
    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
        // SAFETY: assumes `T: ArrowNativeType` implies plain-old-data valid
        // for any bit pattern, so reinterpreting initialized bytes is sound.
        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }
572
    /// Appends the raw bytes of `items` to the buffer, reserving as needed.
    #[inline]
    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
        let additional = mem::size_of_val(items);
        self.reserve(additional);
        // SAFETY: `reserve` guaranteed `additional` spare bytes past `len`,
        // and `items` cannot overlap the freshly reserved destination.
        unsafe {
            let src = items.as_ptr() as *const u8;
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional)
        }
        self.len += additional;
    }
600
    /// Appends the byte representation of a single value, reserving capacity
    /// first.
    #[inline]
    pub fn push<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        self.reserve(additional);
        // SAFETY: `reserve` guaranteed `additional` spare bytes past `len`.
        unsafe {
            let src = item.to_byte_slice().as_ptr();
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional);
        }
        self.len += additional;
    }
625
    /// Appends the byte representation of `item` without checking capacity.
    ///
    /// # Safety
    ///
    /// The caller must have already ensured (e.g. via [`Self::reserve`]) that
    /// at least `size_of::<T>()` spare bytes exist past `self.len`.
    #[inline]
    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        let src = item.to_byte_slice().as_ptr();
        // SAFETY: caller guarantees spare capacity for `additional` bytes.
        let dst = unsafe { self.data.as_ptr().add(self.len) };
        unsafe { std::ptr::copy_nonoverlapping(src, dst, additional) };
        self.len += additional;
    }
637
638 #[inline]
645 pub fn extend_zeros(&mut self, additional: usize) {
646 let new_len = self
647 .len
648 .checked_add(additional)
649 .expect("buffer length overflow");
650 self.resize(new_len, 0);
651 }
652
    /// Sets the buffer's length directly.
    ///
    /// # Safety
    ///
    /// The first `len` bytes must be initialized; the capacity bound is
    /// asserted here, but initialization cannot be checked.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }
664
    /// Packs `len` booleans produced by `f(i)` into a bitmap buffer of
    /// `ceil(len / 8)` bytes, LSB-first within each byte (bit `i` set iff
    /// `f(i)` is true).
    // NOTE(review): packing via `u64` words and reinterpreting as bytes
    // assumes a little-endian byte layout — confirm against Arrow's target
    // assumptions.
    #[inline]
    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
        let mut buffer: Vec<u64> = Vec::with_capacity(bit_util::ceil(len, 64));

        let chunks = len / 64;
        let remainder = len % 64;
        // Pack 64 bits at a time into full u64 words.
        buffer.extend((0..chunks).map(|chunk| {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            packed
        }));

        // Final partial word when `len` is not a multiple of 64; the unused
        // high bits stay zero.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            buffer.push(packed)
        }

        let mut buffer: MutableBuffer = buffer.into();
        // Trim the word-granular buffer down to whole bytes.
        buffer.truncate(bit_util::ceil(len, 8));
        buffer
    }
699
    /// Registers this buffer's full capacity with `pool`, replacing (and thus
    /// releasing) any previous reservation.
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }
710}
711
/// Returns an `ALIGNMENT`-aligned dangling pointer for zero-sized buffers.
///
/// The returned pointer must never be read from or written to.
#[inline]
pub(crate) fn dangling_ptr() -> NonNull<u8> {
    // Miri tracks pointer provenance, so construct the pointer explicitly
    // without provenance there.
    #[cfg(miri)]
    {
        // SAFETY: ALIGNMENT is a non-zero constant, so the pointer is non-null.
        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
    }
    #[cfg(not(miri))]
    {
        // SAFETY: ALIGNMENT is a non-zero constant, so the pointer is non-null.
        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
    }
}
730
731impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
732 #[inline]
733 fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
734 let iterator = iter.into_iter();
735 self.extend_from_iter(iterator)
736 }
737}
738
impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
    /// Takes ownership of the `Vec`'s allocation without copying.
    fn from(value: Vec<T>) -> Self {
        // SAFETY: a Vec's buffer pointer is never null (it is dangling but
        // aligned for capacity 0).
        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
        let len = value.len() * mem::size_of::<T>();
        // SAFETY: the Vec was successfully allocated with exactly this
        // layout, so reconstructing it cannot fail.
        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
        // The allocation now belongs to the MutableBuffer; skip the Vec's Drop.
        mem::forget(value);
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
}
759
760impl MutableBuffer {
    /// Appends values from `iterator`, reserving once from the lower size
    /// hint and writing through a raw pointer; falls back to `push` (which
    /// reserves per item) if the iterator yields more than its lower bound.
    #[inline]
    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let item_size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * item_size;
        self.reserve(additional);

        // SetLenOnDrop commits the written length back to `self.len` even if
        // `iterator.next()` panics mid-loop, keeping the buffer consistent.
        let mut len = SetLenOnDrop::new(&mut self.len);
        // SAFETY: `local_len` starts at the current length, within capacity.
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
        let capacity = self.layout.size();

        // Fast path: write directly while spare capacity remains.
        while len.local_len + item_size <= capacity {
            if let Some(item) = iterator.next() {
                // SAFETY: the loop condition guarantees room for one more item.
                unsafe {
                    let src = item.to_byte_slice().as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, item_size);
                    dst = dst.add(item_size);
                }
                len.local_len += item_size;
            } else {
                break;
            }
        }
        // Commit the length before touching `self` again below.
        drop(len);

        // Slow path: the size hint under-reported; push the rest one by one.
        iterator.for_each(|item| self.push(item));
    }
792
    /// Creates a buffer from an iterator with a trusted length.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must exist and exactly equal
    /// the number of items yielded; otherwise out-of-bounds memory may be
    /// written (over-reporting is caught by the assert below, after the fact).
    #[inline]
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let src = item.to_byte_slice().as_ptr();
            // SAFETY: caller guarantees at most `upper` items, which fit in
            // the `len` bytes allocated above.
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // Defense-in-depth: catch iterators that mis-reported their length.
        assert_eq!(
            unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
            len,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        buffer
    }
843
    /// Creates a packed bitmap buffer from a boolean iterator with a trusted
    /// length.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must exist and exactly equal
    /// the number of items yielded: `collect_bool` calls `next().unwrap()`
    /// exactly that many times.
    #[inline]
    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
        let (_, upper) = iterator.size_hint();
        let len = upper.expect("from_trusted_len_iter requires an upper limit");

        Self::collect_bool(len, |_| iterator.next().unwrap())
    }
874
    /// Fallible variant of [`Self::from_trusted_len_iter`]: short-circuits
    /// and returns the first `Err` yielded by the iterator.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must exist and exactly equal
    /// the number of items yielded (when it runs to completion).
    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            // Propagate the first error; `buffer` is dropped normally.
            let item = item?;
            let src = item.to_byte_slice().as_ptr();
            // SAFETY: caller guarantees at most `upper` items, which fit in
            // the `len` bytes allocated above.
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // Helper keeping the final length/assert bookkeeping in one place.
        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
            // SAFETY: `dst` was derived from `buffer.data` by in-bounds `add`s.
            unsafe {
                assert_eq!(
                    dst.offset_from(buffer.data.as_ptr()) as usize,
                    len,
                    "Trusted iterator length was not accurately reported"
                );
                buffer.len = len;
            }
        }
        unsafe { finalize_buffer(dst, &mut buffer, len) };
        Ok(buffer)
    }
927}
928
929impl Default for MutableBuffer {
930 fn default() -> Self {
931 Self::with_capacity(0)
932 }
933}
934
impl std::ops::Deref for MutableBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // SAFETY: the first `len` bytes of the allocation are initialized,
        // and the borrow of `self` keeps the allocation alive.
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}
942
impl std::ops::DerefMut for MutableBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        // SAFETY: the first `len` bytes are initialized, and the exclusive
        // borrow of `self` guarantees unique access to them.
        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}
948
impl AsRef<[u8]> for &MutableBuffer {
    /// Borrows the initialized bytes as a byte slice.
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}
954
impl Drop for MutableBuffer {
    fn drop(&mut self) {
        // Zero-sized buffers hold a dangling pointer that was never allocated
        // and so must not be freed.
        if self.layout.size() != 0 {
            // SAFETY: `data` was allocated with `layout` by the global allocator.
            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
        }
    }
}
963
964impl PartialEq for MutableBuffer {
965 fn eq(&self, other: &MutableBuffer) -> bool {
966 if self.len != other.len {
967 return false;
968 }
969 if self.layout != other.layout {
970 return false;
971 }
972 self.as_slice() == other.as_slice()
973 }
974}
975
// SAFETY: the buffer uniquely owns its allocation and `&MutableBuffer` only
// exposes reads of initialized memory.
// NOTE(review): with the "pool" feature this also asserts the boxed
// `dyn MemoryReservation` is safe to share/send — confirm that trait's bounds.
unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}
978
/// Tracks a buffer length locally during raw writes and commits it back on
/// drop, so the length stays consistent even if the producing iterator panics
/// (mirrors the standard library's `Vec` helper of the same name).
struct SetLenOnDrop<'a> {
    /// Destination length, updated when this guard is dropped.
    len: &'a mut usize,
    /// Working copy, advanced as bytes are written.
    local_len: usize,
}
983
984impl<'a> SetLenOnDrop<'a> {
985 #[inline]
986 fn new(len: &'a mut usize) -> Self {
987 SetLenOnDrop {
988 local_len: *len,
989 len,
990 }
991 }
992}
993
impl Drop for SetLenOnDrop<'_> {
    #[inline]
    fn drop(&mut self) {
        // Commit the working length back to the buffer, panic or not.
        *self.len = self.local_len;
    }
}
1000
impl std::iter::FromIterator<bool> for MutableBuffer {
    /// Packs booleans into a bitmap buffer, LSB-first, one bit per value.
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = bool>,
    {
        let mut iterator = iter.into_iter();
        let mut result = {
            // Lower size hint in bits, rounded up to whole bytes.
            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
            MutableBuffer::new(byte_capacity)
        };

        loop {
            let mut exhausted = false;
            let mut byte_accum: u8 = 0;
            let mut mask: u8 = 1;

            // Accumulate up to 8 bits into one byte; `mask` wraps to 0 after
            // the eighth shift, ending the inner loop.
            while mask != 0 {
                if let Some(value) = iterator.next() {
                    byte_accum |= match value {
                        true => mask,
                        false => 0,
                    };
                    mask <<= 1;
                } else {
                    exhausted = true;
                    break;
                }
            }

            // `mask == 1` means no bit was collected this round: nothing to
            // write, so stop.
            if exhausted && mask == 1 {
                break;
            }

            // Out of capacity: reserve based on the remaining size hint
            // (plus one byte for the value about to be pushed).
            if result.len() == result.capacity() {
                let additional_byte_capacity = 1usize.saturating_add(
                    iterator.size_hint().0.saturating_add(7) / 8,
                );
                result.reserve(additional_byte_capacity)
            }

            // SAFETY: the check above guarantees at least one spare byte.
            unsafe { result.push_unchecked(byte_accum) };
            if exhausted {
                break;
            }
        }
        result
    }
}
1055
1056impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
1057 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
1058 let mut buffer = Self::default();
1059 buffer.extend_from_iter(iter.into_iter());
1060 buffer
1061 }
1062}
1063
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mutable_new() {
        let buf = MutableBuffer::new(63);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
    }

    #[test]
    fn test_mutable_default() {
        let buf = MutableBuffer::default();
        assert_eq!(0, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        let mut buf = MutableBuffer::default();
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());
    }

    #[test]
    fn test_mutable_extend_from_slice() {
        let mut buf = MutableBuffer::new(100);
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());

        buf.extend_from_slice(b" world");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello world", buf.as_slice());

        buf.clear();
        assert_eq!(0, buf.len());
        buf.extend_from_slice(b"hello arrow");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello arrow", buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_iter() {
        let mut buf = MutableBuffer::new(0);
        buf.extend(vec![1u32, 2]);
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

        buf.extend(vec![3u32, 4]);
        assert_eq!(16, buf.len());
        assert_eq!(
            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
            buf.as_slice()
        );
    }

    #[test]
    fn mutable_extend_from_iter_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.extend([1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_slice_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.extend_from_slice(&[1_u8]);
        buf.extend_from_slice(&[1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.push(1_u64);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unchecked_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        unsafe {
            buf.push_unchecked(1_u8);
            buf.push_unchecked(1_u64);
        }
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_from_trusted_len_iter() {
        let iter = vec![1u32, 2].into_iter();
        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_mutable_reserve() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());

        // Reserving within the current capacity must not grow.
        buf.reserve(10);
        assert_eq!(64, buf.capacity());

        buf.reserve(80);
        assert_eq!(128, buf.capacity());

        buf.reserve(129);
        assert_eq!(256, buf.capacity());
    }

    #[test]
    fn test_mutable_resize() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());

        buf.resize(20, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(20, buf.len());

        // Shrinking keeps the capacity.
        buf.resize(10, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(10, buf.len());

        buf.resize(100, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(100, buf.len());

        buf.resize(30, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(30, buf.len());

        buf.resize(0, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(0, buf.len());
    }

    #[test]
    fn test_mutable_into() {
        let mut buf = MutableBuffer::new(1);
        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
        assert_eq!(19, buf.len());
        assert_eq!(64, buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());

        let immutable_buf: Buffer = buf.into();
        assert_eq!(19, immutable_buf.len());
        assert_eq!(64, immutable_buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
    }

    #[test]
    fn test_mutable_equal() {
        let mut buf = MutableBuffer::new(1);
        let mut buf2 = MutableBuffer::new(1);

        buf.extend_from_slice(&[0xaa]);
        buf2.extend_from_slice(&[0xaa, 0xbb]);
        assert!(buf != buf2);

        buf.extend_from_slice(&[0xbb]);
        assert_eq!(buf, buf2);

        // Equality also compares layouts, so differing capacities differ.
        buf2.reserve(65);
        assert!(buf != buf2);
    }

    #[test]
    fn test_mutable_shrink_to_fit() {
        let mut buffer = MutableBuffer::new(128);
        assert_eq!(buffer.capacity(), 128);
        buffer.push(1);
        buffer.push(2);

        buffer.shrink_to_fit();
        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    }

    #[test]
    fn test_mutable_set_null_bits() {
        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);

        // Zero-count calls are no-ops at every valid start offset.
        for i in 0..=buffer.capacity() {
            buffer.set_null_bits(i, 0);
            assert_eq!(buffer[..8], [255; 8][..]);
        }

        buffer.set_null_bits(1, 4);
        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob() {
        let mut buffer = MutableBuffer::new(64);
        buffer.set_null_bits(1, buffer.capacity());
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob_by_overflow() {
        // start + count overflows usize; the saturating add must still panic.
        let mut buffer = MutableBuffer::new(0);
        buffer.set_null_bits(1, usize::MAX);
    }

    #[test]
    fn from_iter() {
        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
    }

    #[test]
    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
    fn test_with_capacity_panics_above_max_capacity() {
        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
        let _ = MutableBuffer::with_capacity(max_capacity + 1);
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;
        use crate::pool::{MemoryPool, TrackingMemoryPool};

        #[test]
        fn test_reallocate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.capacity(), 128);
            assert_eq!(pool.used(), 128);

            buffer.reallocate(200);

            assert_eq!(buffer.capacity(), 200);
            assert_eq!(pool.used(), 200);

            buffer.reallocate(50);

            assert_eq!(buffer.capacity(), 50);
            assert_eq!(pool.used(), 50);
        }

        #[test]
        fn test_truncate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);

            buffer.resize(80, 1);
            assert_eq!(buffer.len(), 80);

            buffer.claim(&pool);
            assert_eq!(pool.used(), 128);

            buffer.truncate(40);
            assert_eq!(buffer.len(), 40);
            assert_eq!(pool.used(), 40);

            buffer.truncate(0);
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_resize_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 128);

            buffer.resize(50, 1);
            assert_eq!(buffer.len(), 50);
            assert_eq!(pool.used(), 50);

            buffer.resize(150, 1);
            assert_eq!(buffer.len(), 150);
            assert_eq!(buffer.capacity(), 256);
            assert_eq!(pool.used(), 150);

            buffer.resize(30, 1);
            assert_eq!(buffer.len(), 30);
            assert_eq!(pool.used(), 30);
        }

        #[test]
        fn test_buffer_lifecycle_with_pool() {
            let pool = TrackingMemoryPool::default();

            let mut mutable = MutableBuffer::with_capacity(100);
            mutable.resize(80, 1);
            mutable.claim(&pool);

            assert_eq!(pool.used(), 128);

            // The reservation follows the buffer through the conversion...
            let buffer = mutable.into_buffer();

            assert_eq!(pool.used(), 128);

            // ...and is released when the immutable buffer is dropped.
            drop(buffer);
            assert_eq!(pool.used(), 0);
        }
    }

    /// Builds the expected result of repeating `slice_to_repeat` the naive
    /// way, for comparison with `repeat_slice_n_times`.
    fn create_expected_repeated_slice<T: ArrowNativeType>(
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) -> Buffer {
        let mut expected = MutableBuffer::new(size_of_val(slice_to_repeat) * repeat_count);
        for _ in 0..repeat_count {
            expected.extend_from_slice(slice_to_repeat);
        }
        expected.into()
    }

    /// Asserts `repeat_slice_n_times` matches the naive expansion for the
    /// given count and data.
    fn test_repeat_count<T: ArrowNativeType + PartialEq + std::fmt::Debug>(
        repeat_count: usize,
        test_data: &[T],
    ) {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(test_data, repeat_count);

        let expected = create_expected_repeated_slice(test_data, repeat_count);
        let result: Buffer = buffer.into();

        assert_eq!(
            result,
            expected,
            "Failed for repeat_count={}, slice_len={}",
            repeat_count,
            test_data.len()
        );
    }

    #[test]
    fn test_repeat_slice_count_edge_cases() {
        // Empty slice: nothing appended regardless of count.
        test_repeat_count(100, &[] as &[i32]);

        // Zero count: nothing appended regardless of slice.
        test_repeat_count(0, &[1i32, 2, 3]);
    }

    #[test]
    #[should_panic(expected = "repeated slice byte length overflow")]
    fn test_repeat_slice_count_multiply_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(&[0_u64], usize::MAX / mem::size_of::<u64>() + 1);
    }

    #[test]
    #[should_panic(expected = "mutable buffer length overflow")]
    fn test_repeat_slice_count_len_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.push(0_u8);
        buffer.repeat_slice_n_times(&[0_u8], usize::MAX);
    }

    #[test]
    fn test_small_repeats_counts() {
        let data = &[1u8, 2, 3, 4, 5];

        // Fixed: exercise each small repeat count 1..=10, rather than
        // repeating the identical count-2 case ten times.
        for repeat_count in 1..=10 {
            test_repeat_count(repeat_count, data);
        }
    }

    #[test]
    fn test_different_size_of_i32_repeat_slice() {
        let data: &[i32] = &[1, 2, 3];
        let data_with_single_item: &[i32] = &[42];

        // Probe around powers of two, where the doubling copy changes shape.
        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u8_repeat_slice() {
        let data: &[u8] = &[1, 2, 3];
        let data_with_single_item: &[u8] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u16_repeat_slice() {
        let data: &[u16] = &[1, 2, 3];
        let data_with_single_item: &[u16] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_various_slice_lengths() {
        let repeat_count = 37;

        test_repeat_count(repeat_count, &[42i32]);
        test_repeat_count(repeat_count, &[1i32, 2]);
        test_repeat_count(repeat_count, &[1i32, 2, 3]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]);

        let data_10: Vec<i32> = (0..10).collect();
        test_repeat_count(repeat_count, &data_10);

        let data_100: Vec<i32> = (0..100).collect();
        test_repeat_count(repeat_count, &data_100);

        let data_1000: Vec<i32> = (0..1000).collect();
        test_repeat_count(repeat_count, &data_1000);
    }
}