use std::alloc::{Layout, handle_alloc_error};
use std::mem;
use std::ptr::NonNull;

use crate::alloc::{ALIGNMENT, Deallocation};
use crate::{
    bytes::Bytes,
    native::{ArrowNativeType, ToByteSlice},
    util::bit_util,
};

#[cfg(feature = "pool")]
use crate::pool::{MemoryPool, MemoryReservation};
#[cfg(feature = "pool")]
use std::sync::Mutex;

use super::Buffer;

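/// A contiguous, growable region of memory, aligned to `ALIGNMENT`, that can be
/// converted into an immutable [`Buffer`] via `into`.
///
/// # Example (sketch; assumes the crate-root re-exports `arrow_buffer::{Buffer, MutableBuffer}`)
///
/// ```
/// # use arrow_buffer::{Buffer, MutableBuffer};
/// let mut buffer = MutableBuffer::new(0);
/// buffer.push(256u32);
/// buffer.extend_from_slice(&[1u32]);
/// let buffer: Buffer = buffer.into();
/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0]);
/// ```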
#[derive(Debug)]
pub struct MutableBuffer {
    // Dangling iff capacity == 0
    data: NonNull<u8>,
    // Invariant: len <= layout.size()
    len: usize,
    layout: Layout,

    /// Optional memory-pool reservation tracking this buffer's allocation
    #[cfg(feature = "pool")]
    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}

impl MutableBuffer {
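    /// Allocates a new [`MutableBuffer`] with an initial capacity of `capacity`
    /// bytes, rounded up to a multiple of 64. Equivalent to
    /// [`MutableBuffer::with_capacity`].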
    #[inline]
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

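    /// Allocates a new [`MutableBuffer`] with at least `capacity` bytes of
    /// capacity, rounded up to a multiple of 64 bytes and aligned to `ALIGNMENT`.
    /// Panics if the rounded capacity produces an invalid [`Layout`].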
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = bit_util::round_upto_multiple_of_64(capacity);
        let layout = Layout::from_size_align(capacity, ALIGNMENT)
            .expect("failed to create layout for MutableBuffer");
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                let raw_ptr = unsafe { std::alloc::alloc(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len: 0,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }

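    /// Allocates a new [`MutableBuffer`] of `len` zeroed bytes, with both length
    /// and capacity equal to `len` (no rounding up to a multiple of 64).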
    pub fn from_len_zeroed(len: usize) -> Self {
        let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }

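    /// Creates a [`MutableBuffer`] from the given [`Bytes`] if it was allocated by
    /// this crate's standard allocator, returning `Err(bytes)` otherwise (e.g. for
    /// externally owned allocations).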
    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
        let layout = match bytes.deallocation() {
            Deallocation::Standard(layout) => *layout,
            _ => return Err(bytes),
        };

        let len = bytes.len();
        let data = bytes.ptr();
        #[cfg(feature = "pool")]
        let reservation = bytes.reservation.lock().unwrap().take();
        mem::forget(bytes);

        Ok(Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(reservation),
        })
    }

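    /// Creates a [`MutableBuffer`] sized to hold a bitmap of `len` bits, with all
    /// bits set to 0 (null).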
    pub fn new_null(len: usize) -> Self {
        let num_bytes = bit_util::ceil(len, 8);
        MutableBuffer::from_len_zeroed(num_bytes)
    }

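    /// Sets the first `end` bytes of this buffer to `0xFF` if `val` is true, or `0`
    /// otherwise, and sets the length to `end`. Panics if `end` exceeds the capacity.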
    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
        assert!(end <= self.layout.size());
        let v = if val { 255 } else { 0 };
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr(), v, end);
            self.len = end;
        }
        self
    }

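    /// Zeroes `count` bytes starting at byte offset `start`. Panics if the range is
    /// out of bounds for this buffer's capacity.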
    pub fn set_null_bits(&mut self, start: usize, count: usize) {
        assert!(
            start.saturating_add(count) <= self.layout.size(),
            "range start index {start} and count {count} out of bounds for \
            buffer of length {}",
            self.layout.size(),
        );

        unsafe {
            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
        }
    }

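    /// Ensures there is capacity for at least `additional` more bytes beyond the
    /// current length, reallocating if necessary. When a reallocation happens the
    /// new capacity is at least double the old one, rounded up to a multiple of 64 bytes.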
    #[inline(always)]
    pub fn reserve(&mut self, additional: usize) {
        let required_cap = self.len + additional;
        if required_cap > self.layout.size() {
            let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
            let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
            self.reallocate(new_capacity)
        }
    }

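    /// Appends `slice_to_repeat` to the buffer `repeat_count` times.
    ///
    /// Rather than copying the slice once per repetition, this copies the already
    /// written repetitions back onto the end of the buffer, doubling the amount
    /// copied per call, so only O(log repeat_count) copies are performed.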
    pub fn repeat_slice_n_times<T: ArrowNativeType>(
        &mut self,
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) {
        if repeat_count == 0 || slice_to_repeat.is_empty() {
            return;
        }

        let bytes_to_repeat = size_of_val(slice_to_repeat);

        self.reserve(repeat_count * bytes_to_repeat);

        let length_before = self.len;

        // Write the first repetition; subsequent repetitions are copied from it.
        self.extend_from_slice(slice_to_repeat);

        let added_repeats_length = bytes_to_repeat;
        assert_eq!(
            self.len - length_before,
            added_repeats_length,
            "should copy exactly the same number of bytes"
        );

        let mut already_repeated_times = 1;

        while already_repeated_times < repeat_count {
            // Copy as many repetitions as already exist, capped by how many remain.
            let number_of_slices_to_copy =
                already_repeated_times.min(repeat_count - already_repeated_times);
            let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat;

            unsafe {
                // Source: the start of the repetitions written so far.
                let src = self.data.as_ptr().add(length_before) as *const u8;
                // Destination: the current end of the buffer.
                let dst = self.data.as_ptr().add(self.len);

                // The regions cannot overlap as capacity was reserved up front.
                std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy)
            }

            self.len += number_of_bytes_to_copy;

            already_repeated_times += number_of_slices_to_copy;
        }
    }

    #[cold]
    fn reallocate(&mut self, capacity: usize) {
        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
        if new_layout.size() == 0 {
            if self.layout.size() != 0 {
                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
                self.layout = new_layout
            }
            return;
        }

        let data = match self.layout.size() {
            0 => unsafe { std::alloc::alloc(new_layout) },
            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
        };
        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
        self.layout = new_layout;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.layout.size());
            }
        }
    }

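    /// Shortens the buffer to `len` bytes, keeping the existing capacity. Has no
    /// effect if `len` is greater than the current length.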
    #[inline(always)]
    pub fn truncate(&mut self, len: usize) {
        if len > self.len {
            return;
        }
        self.len = len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

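    /// Resizes the buffer to `new_len` bytes. If `new_len` is greater than the
    /// current length, the new bytes are initialized to `value`; otherwise the
    /// buffer is truncated.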
    #[inline(always)]
    pub fn resize(&mut self, new_len: usize, value: u8) {
        if new_len > self.len {
            let diff = new_len - self.len;
            self.reserve(diff);
            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
        }
        self.len = new_len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

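    /// Shrinks the capacity towards the current length, rounded up to the nearest
    /// multiple of 64 bytes. Does nothing if that would not reduce the capacity.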
    pub fn shrink_to_fit(&mut self) {
        let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
        if new_capacity < self.layout.size() {
            self.reallocate(new_capacity)
        }
    }

    /// Returns whether this buffer is empty.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the length (number of bytes written) of this buffer.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Returns the total capacity of this buffer in bytes.
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Clears all existing data from this buffer.
    pub fn clear(&mut self) {
        self.len = 0
    }

    /// Returns the data stored in this buffer as a slice.
    pub fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the data stored in this buffer as a mutable slice.
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        self
    }

    /// Returns a raw pointer to this buffer's internal memory,
    /// aligned to `ALIGNMENT`.
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }

    /// Returns a mutable raw pointer to this buffer's internal memory,
    /// aligned to `ALIGNMENT`.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.data.as_ptr()
    }

    #[inline]
    pub(super) fn into_buffer(self) -> Buffer {
        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
        #[cfg(feature = "pool")]
        {
            let reservation = self.reservation.lock().unwrap().take();
            *bytes.reservation.lock().unwrap() = reservation;
        }
        std::mem::forget(self);
        Buffer::from(bytes)
    }

    /// View this buffer as a mutable slice of a specific type.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is not aligned to `T`, or its length is not a
    /// multiple of `size_of::<T>()`.
    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
        // SAFETY: ArrowNativeType is a plain-old-data type, so any bit pattern is valid.
        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// View this buffer as a slice of a specific type.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is not aligned to `T`, or its length is not a
    /// multiple of `size_of::<T>()`.
    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
        // SAFETY: ArrowNativeType is a plain-old-data type, so any bit pattern is valid.
        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

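    /// Extends the buffer with the byte representation of `items`, growing the
    /// buffer as needed.
    ///
    /// # Example (sketch; assumes the crate-root re-export `arrow_buffer::MutableBuffer`)
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.extend_from_slice(&[2u32, 0]);
    /// assert_eq!(buffer.len(), 8); // u32 is 4 bytes, 2 values
    /// assert_eq!(buffer.as_slice(), &[2u8, 0, 0, 0, 0, 0, 0, 0]);
    /// ```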
    #[inline]
    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
        let additional = mem::size_of_val(items);
        self.reserve(additional);
        unsafe {
            let src = items.as_ptr() as *const u8;
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional)
        }
        self.len += additional;
    }

    /// Extends the buffer with the byte representation of `item`, growing the
    /// buffer as needed.
    #[inline]
    pub fn push<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        self.reserve(additional);
        unsafe {
            let src = item.to_byte_slice().as_ptr();
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional);
        }
        self.len += additional;
    }

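    /// Extends the buffer with the byte representation of `item`, without
    /// checking capacity.
    ///
    /// # Safety
    ///
    /// The caller must ensure the buffer has enough remaining capacity for
    /// `size_of::<T>()` additional bytes (e.g. by calling [`Self::reserve`] first).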
    #[inline]
    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        let src = item.to_byte_slice().as_ptr();
        let dst = unsafe { self.data.as_ptr().add(self.len) };
        unsafe { std::ptr::copy_nonoverlapping(src, dst, additional) };
        self.len += additional;
    }

    /// Extends the buffer by `additional` zero bytes.
    #[inline]
    pub fn extend_zeros(&mut self, additional: usize) {
        self.resize(self.len + additional, 0);
    }

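    /// Sets the length of the buffer.
    ///
    /// # Safety
    ///
    /// The caller must ensure the first `len` bytes of the buffer are initialized;
    /// `len` must not exceed the capacity (enforced by an assertion).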
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }

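    /// Packs the results of `f(0)..f(len)` into a bitmap, least-significant bit
    /// first within each byte, invoking `f` once per bit index in order.
    ///
    /// # Example (sketch; assumes the crate-root re-export `arrow_buffer::MutableBuffer`)
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// // Set every even bit of a 10-bit bitmap -> bytes [0b0101_0101, 0b0000_0001]
    /// let buffer = MutableBuffer::collect_bool(10, |i| i % 2 == 0);
    /// assert_eq!(buffer.as_slice(), &[0b0101_0101, 0b0000_0001]);
    /// ```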
    #[inline]
    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
        let mut buffer: Vec<u64> = Vec::with_capacity(bit_util::ceil(len, 64));

        let chunks = len / 64;
        let remainder = len % 64;
        buffer.extend((0..chunks).map(|chunk| {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            packed
        }));

        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            buffer.push(packed)
        }

        let mut buffer: MutableBuffer = buffer.into();
        buffer.truncate(bit_util::ceil(len, 8));
        buffer
    }

    /// Registers this buffer with the given memory pool, replacing any existing
    /// reservation with one sized to the current capacity.
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }
}

/// Returns a dangling, well-aligned pointer suitable for zero-sized allocations.
#[inline]
pub(crate) fn dangling_ptr() -> NonNull<u8> {
    #[cfg(miri)]
    {
        // Under Miri, construct the dangling pointer without provenance explicitly.
        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
    }
    #[cfg(not(miri))]
    {
        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
    }
}

impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
    #[inline]
    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
        let iterator = iter.into_iter();
        self.extend_from_iter(iterator)
    }
}

impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
    fn from(value: Vec<T>) -> Self {
        // SAFETY: a Vec's pointer is never null.
        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
        let len = value.len() * mem::size_of::<T>();
        // SAFETY: the Vec was successfully allocated, so this layout is valid.
        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
        mem::forget(value);
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
}

impl MutableBuffer {
    #[inline]
    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let item_size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * item_size;
        self.reserve(additional);

        // Track the new length locally and write it back on drop (including on
        // panic), so `self.len` always reflects the bytes actually written.
        let mut len = SetLenOnDrop::new(&mut self.len);
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
        let capacity = self.layout.size();

        while len.local_len + item_size <= capacity {
            if let Some(item) = iterator.next() {
                unsafe {
                    let src = item.to_byte_slice().as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, item_size);
                    dst = dst.add(item_size);
                }
                len.local_len += item_size;
            } else {
                break;
            }
        }
        drop(len);

        // The size hint was only a lower bound; fall back to `push` for any
        // remaining items, which reserves additional capacity as needed.
        iterator.for_each(|item| self.push(item));
    }

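    /// Creates a [`MutableBuffer`] from an iterator whose `size_hint` upper bound
    /// is trusted to be exact.
    ///
    /// # Safety
    ///
    /// The iterator must report an upper bound via `size_hint` and must yield
    /// exactly that many items; otherwise writes may go out of bounds of the
    /// allocation sized from that upper bound.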
    #[inline]
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let src = item.to_byte_slice().as_ptr();
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        assert_eq!(
            unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
            len,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        buffer
    }

    #[inline]
    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
        let (_, upper) = iterator.size_hint();
        let len = upper.expect("from_trusted_len_iter requires an upper limit");

        Self::collect_bool(len, |_| iterator.next().unwrap())
    }

    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let item = item?;
            let src = item.to_byte_slice().as_ptr();
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
            unsafe {
                assert_eq!(
                    dst.offset_from(buffer.data.as_ptr()) as usize,
                    len,
                    "Trusted iterator length was not accurately reported"
                );
                buffer.len = len;
            }
        }
        unsafe { finalize_buffer(dst, &mut buffer, len) };
        Ok(buffer)
    }
}

impl Default for MutableBuffer {
    fn default() -> Self {
        Self::with_capacity(0)
    }
}

impl std::ops::Deref for MutableBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}

impl std::ops::DerefMut for MutableBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}

impl AsRef<[u8]> for &MutableBuffer {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl Drop for MutableBuffer {
    fn drop(&mut self) {
        if self.layout.size() != 0 {
            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
        }
    }
}

impl PartialEq for MutableBuffer {
    fn eq(&self, other: &MutableBuffer) -> bool {
        if self.len != other.len {
            return false;
        }
        if self.layout != other.layout {
            return false;
        }
        self.as_slice() == other.as_slice()
    }
}

unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}

/// Writes a locally tracked length back to the buffer's `len` field when dropped,
/// similar to the pattern used internally by `Vec`.
struct SetLenOnDrop<'a> {
    len: &'a mut usize,
    local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    #[inline]
    fn new(len: &'a mut usize) -> Self {
        SetLenOnDrop {
            local_len: *len,
            len,
        }
    }
}

impl Drop for SetLenOnDrop<'_> {
    #[inline]
    fn drop(&mut self) {
        *self.len = self.local_len;
    }
}

impl std::iter::FromIterator<bool> for MutableBuffer {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = bool>,
    {
        let mut iterator = iter.into_iter();
        let mut result = {
            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
            MutableBuffer::new(byte_capacity)
        };

        loop {
            let mut exhausted = false;
            let mut byte_accum: u8 = 0;
            let mut mask: u8 = 1;

            // Pack up to 8 booleans into one byte, least-significant bit first.
            while mask != 0 {
                if let Some(value) = iterator.next() {
                    byte_accum |= match value {
                        true => mask,
                        false => 0,
                    };
                    mask <<= 1;
                } else {
                    exhausted = true;
                    break;
                }
            }

            // Nothing was packed into this byte; the iterator ended on a byte boundary.
            if exhausted && mask == 1 {
                break;
            }

            // Grow before writing if the buffer is full.
            if result.len() == result.capacity() {
                let additional_byte_capacity = 1usize.saturating_add(
                    iterator.size_hint().0.saturating_add(7) / 8, // convert bit count to byte count, rounding up
                );
                result.reserve(additional_byte_capacity)
            }

            // SAFETY: capacity was ensured above.
            unsafe { result.push_unchecked(byte_accum) };
            if exhausted {
                break;
            }
        }
        result
    }
}

impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let mut buffer = Self::default();
        buffer.extend_from_iter(iter.into_iter());
        buffer
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mutable_new() {
        let buf = MutableBuffer::new(63);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
    }

    #[test]
    fn test_mutable_default() {
        let buf = MutableBuffer::default();
        assert_eq!(0, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        let mut buf = MutableBuffer::default();
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());
    }

    #[test]
    fn test_mutable_extend_from_slice() {
        let mut buf = MutableBuffer::new(100);
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());

        buf.extend_from_slice(b" world");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello world", buf.as_slice());

        buf.clear();
        assert_eq!(0, buf.len());
        buf.extend_from_slice(b"hello arrow");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello arrow", buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_iter() {
        let mut buf = MutableBuffer::new(0);
        buf.extend(vec![1u32, 2]);
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

        buf.extend(vec![3u32, 4]);
        assert_eq!(16, buf.len());
        assert_eq!(
            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
            buf.as_slice()
        );
    }

    #[test]
    fn mutable_extend_from_iter_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.extend([1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_slice_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.extend_from_slice(&[1_u8]);
        buf.extend_from_slice(&[1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.push(1_u64);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unchecked_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        unsafe {
            buf.push_unchecked(1_u8);
            buf.push_unchecked(1_u64);
        }
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_from_trusted_len_iter() {
        let iter = vec![1u32, 2].into_iter();
        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_mutable_reserve() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());

        buf.reserve(10);
        assert_eq!(64, buf.capacity());

        buf.reserve(80);
        assert_eq!(128, buf.capacity());

        buf.reserve(129);
        assert_eq!(256, buf.capacity());
    }

    #[test]
    fn test_mutable_resize() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());

        buf.resize(20, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(20, buf.len());

        buf.resize(10, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(10, buf.len());

        buf.resize(100, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(100, buf.len());

        buf.resize(30, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(30, buf.len());

        buf.resize(0, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(0, buf.len());
    }

    #[test]
    fn test_mutable_into() {
        let mut buf = MutableBuffer::new(1);
        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
        assert_eq!(19, buf.len());
        assert_eq!(64, buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());

        let immutable_buf: Buffer = buf.into();
        assert_eq!(19, immutable_buf.len());
        assert_eq!(64, immutable_buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
    }

    #[test]
    fn test_mutable_equal() {
        let mut buf = MutableBuffer::new(1);
        let mut buf2 = MutableBuffer::new(1);

        buf.extend_from_slice(&[0xaa]);
        buf2.extend_from_slice(&[0xaa, 0xbb]);
        assert!(buf != buf2);

        buf.extend_from_slice(&[0xbb]);
        assert_eq!(buf, buf2);

        buf2.reserve(65);
        assert!(buf != buf2);
    }

    #[test]
    fn test_mutable_shrink_to_fit() {
        let mut buffer = MutableBuffer::new(128);
        assert_eq!(buffer.capacity(), 128);
        buffer.push(1);
        buffer.push(2);

        buffer.shrink_to_fit();
        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    }

    #[test]
    fn test_mutable_set_null_bits() {
        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);

        for i in 0..=buffer.capacity() {
            buffer.set_null_bits(i, 0);
            assert_eq!(buffer[..8], [255; 8][..]);
        }

        buffer.set_null_bits(1, 4);
        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob() {
        let mut buffer = MutableBuffer::new(64);
        buffer.set_null_bits(1, buffer.capacity());
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob_by_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.set_null_bits(1, usize::MAX);
    }

    #[test]
    fn from_iter() {
        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
    }

    #[test]
    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
    fn test_with_capacity_panics_above_max_capacity() {
        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
        let _ = MutableBuffer::with_capacity(max_capacity + 1);
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;
        use crate::pool::{MemoryPool, TrackingMemoryPool};

        #[test]
        fn test_reallocate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.capacity(), 128);
            assert_eq!(pool.used(), 128);

            buffer.reallocate(200);

            assert_eq!(buffer.capacity(), 200);
            assert_eq!(pool.used(), 200);

            buffer.reallocate(50);

            assert_eq!(buffer.capacity(), 50);
            assert_eq!(pool.used(), 50);
        }

        #[test]
        fn test_truncate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);

            buffer.resize(80, 1);
            assert_eq!(buffer.len(), 80);

            buffer.claim(&pool);
            assert_eq!(pool.used(), 128);

            buffer.truncate(40);
            assert_eq!(buffer.len(), 40);
            assert_eq!(pool.used(), 40);

            buffer.truncate(0);
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_resize_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 128);

            buffer.resize(50, 1);
            assert_eq!(buffer.len(), 50);
            assert_eq!(pool.used(), 50);

            buffer.resize(150, 1);
            assert_eq!(buffer.len(), 150);
            assert_eq!(buffer.capacity(), 256);
            assert_eq!(pool.used(), 150);

            buffer.resize(30, 1);
            assert_eq!(buffer.len(), 30);
            assert_eq!(pool.used(), 30);
        }

        #[test]
        fn test_buffer_lifecycle_with_pool() {
            let pool = TrackingMemoryPool::default();

            let mut mutable = MutableBuffer::with_capacity(100);
            mutable.resize(80, 1);
            mutable.claim(&pool);

            assert_eq!(pool.used(), 128);

            let buffer = mutable.into_buffer();

            assert_eq!(pool.used(), 128);

            drop(buffer);
            assert_eq!(pool.used(), 0);
        }
    }

    fn create_expected_repeated_slice<T: ArrowNativeType>(
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) -> Buffer {
        let mut expected = MutableBuffer::new(size_of_val(slice_to_repeat) * repeat_count);
        for _ in 0..repeat_count {
            expected.extend_from_slice(slice_to_repeat);
        }
        expected.into()
    }

    fn test_repeat_count<T: ArrowNativeType + PartialEq + std::fmt::Debug>(
        repeat_count: usize,
        test_data: &[T],
    ) {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(test_data, repeat_count);

        let expected = create_expected_repeated_slice(test_data, repeat_count);
        let result: Buffer = buffer.into();

        assert_eq!(
            result,
            expected,
            "Failed for repeat_count={}, slice_len={}",
            repeat_count,
            test_data.len()
        );
    }

    #[test]
    fn test_repeat_slice_count_edge_cases() {
        // Repeating an empty slice is a no-op
        test_repeat_count(100, &[] as &[i32]);

        // A repeat count of zero is a no-op
        test_repeat_count(0, &[1i32, 2, 3]);
    }

    #[test]
    fn test_small_repeats_counts() {
        let data = &[1u8, 2, 3, 4, 5];

        for repeat_count in 1..=10 {
            test_repeat_count(repeat_count, data);
        }
    }

    #[test]
    fn test_different_size_of_i32_repeat_slice() {
        let data: &[i32] = &[1, 2, 3];
        let data_with_single_item: &[i32] = &[42];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u8_repeat_slice() {
        let data: &[u8] = &[1, 2, 3];
        let data_with_single_item: &[u8] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u16_repeat_slice() {
        let data: &[u16] = &[1, 2, 3];
        let data_with_single_item: &[u16] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_various_slice_lengths() {
        let repeat_count = 37;

        test_repeat_count(repeat_count, &[42i32]);
        test_repeat_count(repeat_count, &[1i32, 2]);
        test_repeat_count(repeat_count, &[1i32, 2, 3]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]);

        let data_10: Vec<i32> = (0..10).collect();
        test_repeat_count(repeat_count, &data_10);

        let data_100: Vec<i32> = (0..100).collect();
        test_repeat_count(repeat_count, &data_100);

        let data_1000: Vec<i32> = (0..1000).collect();
        test_repeat_count(repeat_count, &data_1000);
    }
}