vortex_array/arrays/varbinview/
compact.rs1use std::ops::Range;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12
13use crate::IntoArray;
14use crate::arrays::VarBinViewArray;
15use crate::arrays::varbinview::Ref;
16use crate::builders::ArrayBuilder;
17use crate::builders::VarBinViewBuilder;
18
19impl VarBinViewArray {
20 pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
29 if !self.should_compact()? {
31 return Ok(self.clone());
32 }
33
34 self.compact_with_threshold(1.0)
36 }
37
38 fn should_compact(&self) -> VortexResult<bool> {
39 let nbuffers = self.nbuffers();
40
41 if nbuffers == 0 {
43 return Ok(false);
44 }
45
46 if nbuffers > u16::MAX as usize {
48 return Ok(true);
49 }
50
51 let bytes_referenced: u64 = self.count_referenced_bytes()?;
52 let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();
53
54 Ok(bytes_referenced < buffer_total_bytes || buffer_total_bytes == 0)
57 }
58
59 #[inline(always)]
62 fn iter_valid_views<F>(&self, mut f: F) -> VortexResult<()>
63 where
64 F: FnMut(&Ref),
65 {
66 match self.validity_mask()? {
67 Mask::AllTrue(_) => {
68 for &view in self.views().iter() {
69 if !view.is_inlined() {
70 f(view.as_view());
71 }
72 }
73 }
74 Mask::AllFalse(_) => {}
75 Mask::Values(v) => {
76 for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
77 if is_valid && !view.is_inlined() {
78 f(view.as_view());
79 }
80 }
81 }
82 }
83 Ok(())
84 }
85
86 fn count_referenced_bytes(&self) -> VortexResult<u64> {
89 let mut total = 0u64;
90 self.iter_valid_views(|view| total += view.size as u64)?;
91 Ok(total)
92 }
93
94 pub(crate) fn buffer_utilizations(&self) -> VortexResult<Vec<BufferUtilization>> {
95 let mut utilizations: Vec<BufferUtilization> = self
96 .buffers()
97 .iter()
98 .map(|buf| {
99 let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
100 BufferUtilization::zero(len)
101 })
102 .collect();
103
104 self.iter_valid_views(|view| {
105 utilizations[view.buffer_index as usize].add(view.offset, view.size);
106 })?;
107
108 Ok(utilizations)
109 }
110
111 pub fn compact_with_threshold(
126 &self,
127 buffer_utilization_threshold: f64, ) -> VortexResult<VarBinViewArray> {
129 let mut builder = VarBinViewBuilder::with_compaction(
130 self.dtype().clone(),
131 self.len(),
132 buffer_utilization_threshold,
133 );
134 builder.extend_from_array(&self.clone().into_array());
135 Ok(builder.finish_into_varbinview())
136 }
137}
138
/// Tracks how much of a single data buffer is referenced by recorded views.
#[derive(Debug)]
pub(crate) struct BufferUtilization {
    /// Length of the buffer in bytes.
    len: u32,
    /// Sum of the sizes of every view recorded with `add`. Views that share bytes are not
    /// deduplicated, so this can exceed `len`; it is a `u64` so such sums cannot overflow.
    used: u64,
    /// Smallest recorded offset; `u32::MAX` while nothing has been recorded.
    min_offset: u32,
    /// Largest recorded `offset + size`; `0` while nothing has been recorded.
    max_offset_end: u32,
}

impl BufferUtilization {
    /// Creates a record for a buffer of `len` bytes with no views recorded yet.
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Records a view covering `size` bytes starting at `offset`.
    fn add(&mut self, offset: u32, size: u32) {
        // Widen before accumulating: many views sharing the same bytes can push the total
        // past `u32::MAX` even though the buffer itself fits in a `u32`.
        self.used += u64::from(size);
        self.min_offset = self.min_offset.min(offset);
        self.max_offset_end = self.max_offset_end.max(offset + size);
    }

    /// Fraction of the whole buffer covered by recorded views (`used / len`).
    ///
    /// Returns `0.0` for an empty buffer; can exceed `1.0` when recorded views overlap.
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / f64::from(len),
        }
    }

    /// Fraction of [`Self::range`] covered by recorded views (`used / span`).
    ///
    /// Returns `0.0` when the span is empty (nothing recorded, or only zero-sized views);
    /// can exceed `1.0` when recorded views overlap.
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / f64::from(span),
        }
    }

    /// Byte range from the smallest recorded offset to the largest recorded end.
    ///
    /// When nothing has been recorded this is `u32::MAX..0`, an empty range.
    pub fn range(&self) -> Range<u32> {
        self.min_offset..self.max_offset_end
    }

    /// Length of [`Self::range`], saturating to `0` when nothing has been recorded.
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
184
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    #[test]
    fn test_optimize_compacts_buffers() {
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        assert!(original.nbuffers() > 0);
        let original_buffers = original.nbuffers();

        // Keep only the two short strings; the taken array still carries every original buffer.
        let indices = buffer![0u32, 4u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        assert_eq!(taken.nbuffers(), original_buffers);

        let optimized_array = taken.compact_buffers().unwrap();

        assert!(optimized_array.nbuffers() <= 1);

        assert_arrays_eq!(
            optimized_array,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")])
        );
    }

    #[test]
    fn test_optimize_with_long_strings() {
        let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
        let long_string_2 = "another extremely long string that also needs external buffer storage";
        let long_string_3 = "yet another long string for testing buffer compaction functionality";

        let original = VarBinViewArray::from_iter_str([
            long_string_1,
            long_string_2,
            long_string_3,
            "short1",
            "short2",
        ]);

        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken_array = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        let optimized_array = taken_array.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 1);

        assert_arrays_eq!(
            optimized_array,
            VarBinArray::from(vec![long_string_1, long_string_3])
        );
    }

    #[test]
    fn test_compact_with_repeated_views() {
        let long_string = "a long string that is referenced by several views after a take";
        let original = VarBinViewArray::from_iter_str([
            long_string,
            "another long string that is never referenced again after the take",
        ]);

        // Repeated indices may leave several views pointing at the same bytes of one buffer;
        // compaction must still return exactly the selected values.
        let indices = buffer![0u32, 0u32, 0u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        let compacted = taken.compact_buffers().unwrap();

        assert_arrays_eq!(
            compacted,
            VarBinArray::from(vec![long_string, long_string, long_string])
        );
    }

    #[test]
    fn test_optimize_no_buffers() {
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        assert_eq!(original.nbuffers(), 0);

        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 0);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_optimize_single_buffer() {
        let str1 = "this is a long string that goes into a buffer";
        let str2 = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2]);

        // Both strings share one fully referenced buffer, so there is nothing to reclaim.
        assert_eq!(original.nbuffers(), 1);
        assert_eq!(original.buffer(0).len(), str1.len() + str2.len());

        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 1);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);

        let original_buffers = original.nbuffers();
        assert!(original_buffers > 0);

        let indices = buffer![0u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        let compacted = taken.compact_with_threshold(0.0).unwrap();

        // A threshold of 0.0 is expected to leave every buffer in place.
        assert_eq!(compacted.nbuffers(), taken.nbuffers());

        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        let original_buffers = taken.nbuffers();

        let compacted = taken.compact_with_threshold(1.0).unwrap();

        assert!(compacted.nbuffers() <= original_buffers);

        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        let str1 = "first long string that needs external buffer storage";
        let str2 = "second long string also in buffer";
        let str3 = "third long string in same buffer";

        let original = VarBinViewArray::from_iter_str([str1, str2, str3]);

        assert_eq!(original.nbuffers(), 1);

        let compacted = original.compact_with_threshold(0.8).unwrap();

        assert_eq!(compacted.nbuffers(), 1);

        assert_arrays_eq!(compacted, original);
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        let strings: Vec<String> = (0..10)
            .map(|i| format!("this is a long string number {i} that needs buffer storage"))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Every other string is dropped, leaving gaps inside the shared buffer.
        let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken = original.take(indices_array.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        let compacted = taken.compact_with_threshold(0.7).unwrap();

        let expected = VarBinViewArray::from_iter(
            [0, 2, 4, 6, 8].map(|i| Some(strings[i].as_str())),
            DType::Utf8(Nullability::NonNullable),
        );
        assert_arrays_eq!(expected, compacted);
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {i} for slice test"))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // A contiguous prefix of the strings references one contiguous region of the buffer.
        let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken = original.take(indices_array.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        let utils_before = taken.buffer_utilizations().unwrap();
        let original_buffer_count = taken.nbuffers();

        let compacted = taken.compact_with_threshold(0.8).unwrap();

        assert!(
            compacted.nbuffers() > 0,
            "Should have buffers after slice compaction"
        );

        assert_arrays_eq!(&compacted, taken);

        // Only pin the buffer count when the referenced range is dense enough to be kept.
        if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.nbuffers(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }

    const LONG1: &str = "long string one!";
    const LONG2: &str = "long string two!";
    const SHORT: &str = "x";
    const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;

    fn mixed_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_nullable_str([Some(LONG1), None, Some(LONG2), Some(SHORT)])
    }

    // Exercises each validity-mask branch of `iter_valid_views` through its two consumers.
    #[rstest]
    #[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
    #[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
    #[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
    #[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
    fn test_validity_code_paths(
        #[case] arr: VarBinViewArray,
        #[case] expected_bytes: u64,
        #[case] expected_utils: &[f64],
    ) {
        assert_eq!(arr.count_referenced_bytes().unwrap(), expected_bytes);
        let utils: Vec<f64> = arr
            .buffer_utilizations()
            .unwrap()
            .iter()
            .map(|u| u.overall_utilization())
            .collect();
        assert_eq!(utils, expected_utils);
    }
}