vortex_array/arrays/varbinview/
compact.rs1use std::ops::Range;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12
13use crate::IntoArray;
14use crate::LEGACY_SESSION;
15use crate::VortexSessionExecute;
16use crate::arrays::VarBinViewArray;
17use crate::arrays::varbinview::Ref;
18use crate::builders::ArrayBuilder;
19use crate::builders::VarBinViewBuilder;
20
21impl VarBinViewArray {
22 pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
31 if !self.should_compact()? {
33 return Ok(self.clone());
34 }
35
36 self.compact_with_threshold(1.0)
38 }
39
40 fn should_compact(&self) -> VortexResult<bool> {
41 let nbuffers = self.data_buffers().len();
42
43 if nbuffers == 0 {
45 return Ok(false);
46 }
47
48 if nbuffers > u16::MAX as usize {
50 return Ok(true);
51 }
52
53 let bytes_referenced: u64 = self.count_referenced_bytes()?;
54 let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();
55
56 Ok(bytes_referenced < buffer_total_bytes || buffer_total_bytes == 0)
59 }
60
61 #[inline(always)]
64 fn iter_valid_views<F>(&self, mut f: F) -> VortexResult<()>
65 where
66 F: FnMut(&Ref),
67 {
68 match self.as_ref().validity()?.to_mask(
69 self.as_ref().len(),
70 &mut LEGACY_SESSION.create_execution_ctx(),
71 )? {
72 Mask::AllTrue(_) => {
73 for &view in self.views().iter() {
74 if !view.is_inlined() {
75 f(view.as_view());
76 }
77 }
78 }
79 Mask::AllFalse(_) => {}
80 Mask::Values(v) => {
81 for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
82 if is_valid && !view.is_inlined() {
83 f(view.as_view());
84 }
85 }
86 }
87 }
88 Ok(())
89 }
90
91 fn count_referenced_bytes(&self) -> VortexResult<u64> {
94 let mut total = 0u64;
95 self.iter_valid_views(|view| total += view.size as u64)?;
96 Ok(total)
97 }
98
99 pub(crate) fn buffer_utilizations(&self) -> VortexResult<Vec<BufferUtilization>> {
100 let mut utilizations: Vec<BufferUtilization> = self
101 .data_buffers()
102 .iter()
103 .map(|buf| {
104 let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
105 BufferUtilization::zero(len)
106 })
107 .collect();
108
109 self.iter_valid_views(|view| {
110 utilizations[view.buffer_index as usize].add(view.offset, view.size);
111 })?;
112
113 Ok(utilizations)
114 }
115
116 pub fn compact_with_threshold(
131 &self,
132 buffer_utilization_threshold: f64, ) -> VortexResult<VarBinViewArray> {
134 let mut builder = VarBinViewBuilder::with_compaction(
135 self.dtype().clone(),
136 self.len(),
137 buffer_utilization_threshold,
138 );
139 builder.extend_from_array(&self.clone().into_array());
140 Ok(builder.finish_into_varbinview())
141 }
142}
143
/// Running usage statistics for a single buffer, accumulated one view at a
/// time via [`BufferUtilization::add`].
pub(crate) struct BufferUtilization {
    /// Size of the tracked buffer, as passed to [`BufferUtilization::zero`].
    len: u32,
    /// Sum of the sizes of all recorded views.
    ///
    /// Stored as `u64`: views may repeat or overlap, so the sum can exceed
    /// what fits in the `u32` used for the buffer length.
    used: u64,
    /// Smallest start offset recorded so far (`u32::MAX` until the first `add`).
    min_offset: u32,
    /// Largest end offset (`offset + size`) recorded so far (`0` until the
    /// first `add`).
    max_offset_end: u32,
}

impl BufferUtilization {
    /// Creates an accumulator for a buffer of `len` bytes with nothing
    /// recorded yet.
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Records a view covering `size` bytes starting at `offset`.
    ///
    /// All arithmetic saturates instead of overflowing, so repeated or
    /// overlapping views (and malformed offsets) can never panic or wrap.
    fn add(&mut self, offset: u32, size: u32) {
        self.used = self.used.saturating_add(u64::from(size));
        self.min_offset = self.min_offset.min(offset);
        self.max_offset_end = self.max_offset_end.max(offset.saturating_add(size));
    }

    /// Ratio of recorded bytes to the full buffer length.
    ///
    /// Returns `0.0` for a zero-length buffer. The ratio can exceed `1.0`
    /// when the same bytes are recorded more than once.
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / len as f64,
        }
    }

    /// Ratio of recorded bytes to the span between the smallest start offset
    /// and the largest end offset.
    ///
    /// Returns `0.0` when that span is empty.
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / span as f64,
        }
    }

    /// The offset range spanned by all recorded views.
    ///
    /// Returns the empty range `0..0` when nothing has been recorded, rather
    /// than exposing the inverted sentinel bounds.
    pub fn range(&self) -> Range<u32> {
        if self.min_offset > self.max_offset_end {
            // Only possible before the first `add`: after any `add`,
            // `min_offset <= offset <= max_offset_end` holds.
            return 0..0;
        }
        self.min_offset..self.max_offset_end
    }

    /// Width of [`Self::range`]; `0` when nothing has been recorded.
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
189
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    /// Takes `indices` from `source` and executes the result back into a
    /// `VarBinViewArray`, unwrapping every step.
    fn take_rows(source: VarBinViewArray, indices: impl IntoArray) -> VarBinViewArray {
        source
            .take(indices.into_array())
            .unwrap()
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap()
    }

    #[test]
    fn test_optimize_compacts_buffers() {
        let source = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        let source_buffer_count = source.data_buffers().len();
        assert!(source_buffer_count > 0);

        // Keep only the two short rows; the taken array still reports the
        // same number of data buffers as the source.
        let taken = take_rows(source, buffer![0u32, 4u32]);
        assert_eq!(taken.data_buffers().len(), source_buffer_count);

        let compacted = taken.compact_buffers().unwrap();

        assert!(compacted.data_buffers().len() <= 1);

        assert_arrays_eq!(
            compacted,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")])
        );
    }

    #[test]
    fn test_optimize_with_long_strings() {
        let first = "this is definitely a very long string that exceeds the inline limit";
        let second = "another extremely long string that also needs external buffer storage";
        let third = "yet another long string for testing buffer compaction functionality";

        let source = VarBinViewArray::from_iter_str([first, second, third, "short1", "short2"]);

        let taken = take_rows(source, buffer![0u32, 2u32]);

        let compacted = taken.compact_buffers().unwrap();

        assert_eq!(compacted.data_buffers().len(), 1);

        assert_arrays_eq!(compacted, VarBinArray::from(vec![first, third]));
    }

    #[test]
    fn test_optimize_no_buffers() {
        let source = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        assert_eq!(source.data_buffers().len(), 0);

        let compacted = source.compact_buffers().unwrap();

        assert_eq!(compacted.data_buffers().len(), 0);

        assert_arrays_eq!(compacted, source);
    }

    #[test]
    fn test_optimize_single_buffer() {
        let first = "this is a long string that goes into a buffer";
        let second = "another long string in the same buffer";
        let source = VarBinViewArray::from_iter_str([first, second]);

        assert_eq!(source.data_buffers().len(), 1);
        assert_eq!(source.buffer(0).len(), first.len() + second.len());

        let compacted = source.compact_buffers().unwrap();

        assert_eq!(compacted.data_buffers().len(), 1);

        assert_arrays_eq!(compacted, source);
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        let source = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);

        let source_buffer_count = source.data_buffers().len();
        assert!(source_buffer_count > 0);

        let taken = take_rows(source, buffer![0u32]);
        let compacted = taken.compact_with_threshold(0.0).unwrap();

        // With a threshold of 0.0 the buffer count is expected to be unchanged.
        assert_eq!(compacted.data_buffers().len(), taken.data_buffers().len());

        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        let source = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        let taken = take_rows(source, buffer![0u32, 2u32]);

        let buffers_before = taken.data_buffers().len();

        let compacted = taken.compact_with_threshold(1.0).unwrap();

        assert!(compacted.data_buffers().len() <= buffers_before);

        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        let first = "first long string that needs external buffer storage";
        let second = "second long string also in buffer";
        let third = "third long string in same buffer";

        let source = VarBinViewArray::from_iter_str([first, second, third]);

        assert_eq!(source.data_buffers().len(), 1);

        let compacted = source.compact_with_threshold(0.8).unwrap();

        assert_eq!(compacted.data_buffers().len(), 1);

        assert_arrays_eq!(compacted, source);
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        let strings: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    "this is a long string number {} that needs buffer storage",
                    i
                )
            })
            .collect();

        let source = VarBinViewArray::from_iter_str(strings.iter().map(String::as_str));

        // Keep every other row.
        let taken = take_rows(source, buffer![0u32, 2u32, 4u32, 6u32, 8u32]);

        let compacted = taken.compact_with_threshold(0.7).unwrap();

        let expected = VarBinViewArray::from_iter(
            [0, 2, 4, 6, 8].map(|i| Some(strings[i].as_str())),
            DType::Utf8(Nullability::NonNullable),
        );
        assert_arrays_eq!(expected, compacted);
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {} for slice test", i))
            .collect();

        let source = VarBinViewArray::from_iter_str(strings.iter().map(String::as_str));

        // Keep a contiguous prefix of the rows.
        let taken = take_rows(source, buffer![0u32, 1u32, 2u32, 3u32, 4u32]);
        let utilizations_before = taken.buffer_utilizations().unwrap();
        let buffers_before = taken.data_buffers().len();

        let compacted = taken.compact_with_threshold(0.8).unwrap();

        assert!(
            !compacted.data_buffers().is_empty(),
            "Should have buffers after slice compaction"
        );

        assert_arrays_eq!(&compacted, taken);

        let single_well_used_buffer =
            buffers_before == 1 && utilizations_before[0].range_utilization() >= 0.8;
        if single_well_used_buffer {
            assert_eq!(
                compacted.data_buffers().len(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }

    const LONG1: &str = "long string one!";
    const LONG2: &str = "long string two!";
    const SHORT: &str = "x";
    const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;

    /// Nullable array mixing long strings, a null and a short string.
    fn mixed_array() -> VarBinViewArray {
        let rows = [Some(LONG1), None, Some(LONG2), Some(SHORT)];
        VarBinViewArray::from_iter_nullable_str(rows)
    }

    #[rstest]
    #[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
    #[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
    #[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
    #[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
    fn test_validity_code_paths(
        #[case] arr: VarBinViewArray,
        #[case] expected_bytes: u64,
        #[case] expected_utils: &[f64],
    ) {
        assert_eq!(arr.count_referenced_bytes().unwrap(), expected_bytes);

        let per_buffer = arr.buffer_utilizations().unwrap();
        let utils: Vec<f64> = per_buffer
            .iter()
            .map(|utilization| utilization.overall_utilization())
            .collect();
        assert_eq!(utils, expected_utils);
    }
}