vortex_array/arrays/varbinview/
compact.rs1use std::ops::Range;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12
13use crate::IntoArray;
14use crate::LEGACY_SESSION;
15use crate::VortexSessionExecute;
16use crate::arrays::VarBinViewArray;
17use crate::arrays::varbinview::Ref;
18use crate::builders::ArrayBuilder;
19use crate::builders::VarBinViewBuilder;
20
21const DEFAULT_COMPACTION_THRESHOLD: f64 = 0.5;
22const MIN_RETAINED_BYTES_PER_ROW_TO_CHECK_COMPACTION: u64 = 128;
23
24impl VarBinViewArray {
25 pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
34 if !self.should_compact()? {
36 return Ok(self.clone());
37 }
38
39 self.compact_with_threshold(DEFAULT_COMPACTION_THRESHOLD)
40 }
41
42 fn should_compact(&self) -> VortexResult<bool> {
43 let nbuffers = self.data_buffers().len();
44
45 if nbuffers == 0 {
47 return Ok(false);
48 }
49
50 if nbuffers > u16::MAX as usize {
52 return Ok(true);
53 }
54
55 let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();
56 if buffer_total_bytes == 0 {
57 return Ok(true);
58 }
59
60 let len = u64::try_from(self.len()).unwrap_or(u64::MAX);
61 if len > 0 && buffer_total_bytes / len <= MIN_RETAINED_BYTES_PER_ROW_TO_CHECK_COMPACTION {
62 return Ok(false);
63 }
64
65 let bytes_referenced: u64 = self.count_referenced_bytes()?;
66 Ok((bytes_referenced as f64 / buffer_total_bytes as f64) < DEFAULT_COMPACTION_THRESHOLD)
67 }
68
69 #[inline(always)]
72 fn iter_valid_views<F>(&self, mut f: F) -> VortexResult<()>
73 where
74 F: FnMut(&Ref),
75 {
76 match self.as_ref().validity()?.execute_mask(
77 self.as_ref().len(),
78 &mut LEGACY_SESSION.create_execution_ctx(),
79 )? {
80 Mask::AllTrue(_) => {
81 for &view in self.views().iter() {
82 if !view.is_inlined() {
83 f(view.as_view());
84 }
85 }
86 }
87 Mask::AllFalse(_) => {}
88 Mask::Values(v) => {
89 for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
90 if is_valid && !view.is_inlined() {
91 f(view.as_view());
92 }
93 }
94 }
95 }
96 Ok(())
97 }
98
99 fn count_referenced_bytes(&self) -> VortexResult<u64> {
102 let mut total = 0u64;
103 self.iter_valid_views(|view| total += view.size as u64)?;
104 Ok(total)
105 }
106
107 pub(crate) fn buffer_utilizations(&self) -> VortexResult<Vec<BufferUtilization>> {
108 let mut utilizations: Vec<BufferUtilization> = self
109 .data_buffers()
110 .iter()
111 .map(|buf| {
112 let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
113 BufferUtilization::zero(len)
114 })
115 .collect();
116
117 self.iter_valid_views(|view| {
118 utilizations[view.buffer_index as usize].add(view.offset, view.size);
119 })?;
120
121 Ok(utilizations)
122 }
123
124 pub fn compact_with_threshold(
139 &self,
140 buffer_utilization_threshold: f64, ) -> VortexResult<VarBinViewArray> {
142 let mut builder = VarBinViewBuilder::with_compaction(
143 self.dtype().clone(),
144 self.len(),
145 buffer_utilization_threshold,
146 );
147 builder.extend_from_array(&self.clone().into_array());
148 Ok(builder.finish_into_varbinview())
149 }
150}
151
/// Running summary of how the recorded views use a single data buffer.
pub(crate) struct BufferUtilization {
    /// Length of the buffer in bytes.
    len: u32,
    /// Sum of the sizes of all recorded views.
    ///
    /// Stored as `u64` because views may repeat or overlap, so the sum is not
    /// bounded by the buffer length and can exceed `u32::MAX`.
    used: u64,
    /// Smallest recorded view offset; `u32::MAX` until a view is recorded.
    min_offset: u32,
    /// Largest recorded `offset + size`; `0` until a view is recorded.
    max_offset_end: u32,
}

impl BufferUtilization {
    /// Creates an empty summary for a buffer of `len` bytes.
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Records a view covering `size` bytes starting at `offset`.
    ///
    /// All arithmetic saturates, so repeated views or a malformed view can
    /// neither panic (debug builds) nor wrap around (release builds).
    fn add(&mut self, offset: u32, size: u32) {
        self.used = self.used.saturating_add(u64::from(size));
        self.min_offset = self.min_offset.min(offset);
        self.max_offset_end = self.max_offset_end.max(offset.saturating_add(size));
    }

    /// Recorded bytes divided by the buffer length, or `0.0` for an empty
    /// buffer.
    ///
    /// The result can exceed `1.0` when recorded views repeat or overlap.
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / f64::from(len),
        }
    }

    /// Recorded bytes divided by the width of the touched offset range, or
    /// `0.0` when that range is empty.
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / f64::from(span),
        }
    }

    /// Offset range spanned by the recorded views.
    ///
    /// Before any view is recorded this is `u32::MAX..0`, which is an empty
    /// range.
    pub fn range(&self) -> Range<u32> {
        self.min_offset..self.max_offset_end
    }

    /// Width of [`Self::range`], clamped to zero when nothing was recorded.
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
197
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::array_session;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    const LONG1: &str = "long string one!";
    const LONG2: &str = "long string two!";
    const SHORT: &str = "x";
    const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;

    /// Takes `indices` from `source` and executes the result back into a
    /// `VarBinViewArray` using a fresh execution context.
    fn take_rows(source: VarBinViewArray, indices: impl IntoArray) -> VarBinViewArray {
        source
            .take(indices.into_array())
            .unwrap()
            .execute::<VarBinViewArray>(&mut array_session().create_execution_ctx())
            .unwrap()
    }

    /// Nullable fixture with two long values, one null and one short value.
    fn mixed_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_nullable_str([Some(LONG1), None, Some(LONG2), Some(SHORT)])
    }

    /// Keeping only the short rows and compacting leaves at most one buffer.
    #[test]
    fn test_optimize_compacts_buffers() {
        let mut ctx = array_session().create_execution_ctx();
        let source = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        let buffers_before = source.data_buffers().len();
        assert!(buffers_before > 0);

        // The take result is expected to still carry all of the source buffers.
        let taken = take_rows(source, buffer![0u32, 4u32]);
        assert_eq!(taken.data_buffers().len(), buffers_before);

        let compacted = taken.compact_buffers().unwrap();
        assert!(compacted.data_buffers().len() <= 1);

        assert_arrays_eq!(
            compacted,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")]),
            &mut ctx
        );
    }

    /// Compacting a take of two long strings with threshold 1.0 yields a
    /// single buffer holding exactly those strings.
    #[test]
    fn test_optimize_with_long_strings() {
        let mut ctx = array_session().create_execution_ctx();
        let first = "this is definitely a very long string that exceeds the inline limit";
        let second = "another extremely long string that also needs external buffer storage";
        let third = "yet another long string for testing buffer compaction functionality";

        let source = VarBinViewArray::from_iter_str([first, second, third, "short1", "short2"]);
        let taken = take_rows(source, buffer![0u32, 2u32]);

        let compacted = taken.compact_with_threshold(1.0).unwrap();
        assert_eq!(compacted.data_buffers().len(), 1);

        assert_arrays_eq!(compacted, VarBinArray::from(vec![first, third]), &mut ctx);
    }

    /// An array without data buffers is returned unchanged.
    #[test]
    fn test_optimize_no_buffers() {
        let mut ctx = array_session().create_execution_ctx();
        let source = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);
        assert_eq!(source.data_buffers().len(), 0);

        let compacted = source.compact_buffers().unwrap();
        assert_eq!(compacted.data_buffers().len(), 0);

        assert_arrays_eq!(compacted, source, &mut ctx);
    }

    /// A fully referenced single buffer stays a single buffer.
    #[test]
    fn test_optimize_single_buffer() {
        let mut ctx = array_session().create_execution_ctx();
        let first = "this is a long string that goes into a buffer";
        let second = "another long string in the same buffer";
        let source = VarBinViewArray::from_iter_str([first, second]);

        assert_eq!(source.data_buffers().len(), 1);
        assert_eq!(source.buffer(0).len(), first.len() + second.len());

        let compacted = source.compact_buffers().unwrap();
        assert_eq!(compacted.data_buffers().len(), 1);

        assert_arrays_eq!(compacted, source, &mut ctx);
    }

    /// With a threshold of 0.0 the buffer count is left as it was.
    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        let mut ctx = array_session().create_execution_ctx();
        let source = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);
        assert!(source.data_buffers().len() > 0);

        let taken = take_rows(source, buffer![0u32]);
        let compacted = taken.compact_with_threshold(0.0).unwrap();

        assert_eq!(compacted.data_buffers().len(), taken.data_buffers().len());

        assert_arrays_eq!(compacted, taken, &mut ctx);
    }

    /// With a threshold of 1.0 the buffer count never grows and the values
    /// are preserved.
    #[test]
    fn test_selective_compaction_with_high_threshold() {
        let mut ctx = array_session().create_execution_ctx();
        let source = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        let taken = take_rows(source, buffer![0u32, 2u32]);
        let buffers_before = taken.data_buffers().len();

        let compacted = taken.compact_with_threshold(1.0).unwrap();
        assert!(compacted.data_buffers().len() <= buffers_before);

        assert_arrays_eq!(compacted, taken, &mut ctx);
    }

    /// A fully used single buffer survives compaction at threshold 0.8.
    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        let mut ctx = array_session().create_execution_ctx();
        let first = "first long string that needs external buffer storage";
        let second = "second long string also in buffer";
        let third = "third long string in same buffer";

        let source = VarBinViewArray::from_iter_str([first, second, third]);
        assert_eq!(source.data_buffers().len(), 1);

        let compacted = source.compact_with_threshold(0.8).unwrap();
        assert_eq!(compacted.data_buffers().len(), 1);

        assert_arrays_eq!(compacted, source, &mut ctx);
    }

    /// Taking every other row and compacting at 0.7 preserves the taken
    /// values.
    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        let mut ctx = array_session().create_execution_ctx();
        let strings: Vec<String> = (0..10)
            .map(|i| format!("this is a long string number {i} that needs buffer storage"))
            .collect();

        let source = VarBinViewArray::from_iter_str(strings.iter().map(String::as_str));
        let taken = take_rows(source, buffer![0u32, 2u32, 4u32, 6u32, 8u32]);

        let compacted = taken.compact_with_threshold(0.7).unwrap();

        let expected = VarBinViewArray::from_iter(
            [0, 2, 4, 6, 8].map(|i| Some(strings[i].as_str())),
            DType::Utf8(Nullability::NonNullable),
        );
        assert_arrays_eq!(expected, compacted, &mut ctx);
    }

    /// Taking a contiguous prefix and compacting at 0.8 keeps the values, and
    /// keeps a single buffer when the input had one with range utilization of
    /// at least 0.8.
    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        let mut ctx = array_session().create_execution_ctx();
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {i} for slice test"))
            .collect();

        let source = VarBinViewArray::from_iter_str(strings.iter().map(String::as_str));
        let taken = take_rows(source, buffer![0u32, 1u32, 2u32, 3u32, 4u32]);

        let utilizations_before = taken.buffer_utilizations().unwrap();
        let buffers_before = taken.data_buffers().len();

        let compacted = taken.compact_with_threshold(0.8).unwrap();

        assert!(
            !compacted.data_buffers().is_empty(),
            "Should have buffers after slice compaction"
        );

        assert_arrays_eq!(&compacted, taken, &mut ctx);

        // The buffer-count check short-circuits, so index 0 is only read when
        // exactly one buffer existed.
        if buffers_before == 1 && utilizations_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.data_buffers().len(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }

    /// Exercises the all-valid, all-invalid and mixed validity paths of the
    /// view iteration through both byte counting and per-buffer utilization.
    #[rstest]
    #[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
    #[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
    #[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
    #[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
    fn test_validity_code_paths(
        #[case] arr: VarBinViewArray,
        #[case] expected_bytes: u64,
        #[case] expected_utils: &[f64],
    ) {
        assert_eq!(arr.count_referenced_bytes().unwrap(), expected_bytes);

        let observed: Vec<f64> = arr
            .buffer_utilizations()
            .unwrap()
            .iter()
            .map(|utilization| utilization.overall_utilization())
            .collect();
        assert_eq!(observed, expected_utils);
    }
}