vortex_array/arrays/varbinview/
compact.rs1use std::ops::Range;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12
13use crate::arrays::Ref;
14use crate::arrays::VarBinViewArray;
15use crate::builders::ArrayBuilder;
16use crate::builders::VarBinViewBuilder;
17
18impl VarBinViewArray {
19 pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
28 if !self.should_compact()? {
30 return Ok(self.clone());
31 }
32
33 self.compact_with_threshold(1.0)
35 }
36
37 fn should_compact(&self) -> VortexResult<bool> {
38 let nbuffers = self.nbuffers();
39
40 if nbuffers == 0 {
42 return Ok(false);
43 }
44
45 if nbuffers > u16::MAX as usize {
47 return Ok(true);
48 }
49
50 let bytes_referenced: u64 = self.count_referenced_bytes()?;
51 let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();
52
53 Ok(bytes_referenced < buffer_total_bytes || buffer_total_bytes == 0)
56 }
57
58 #[inline(always)]
61 fn iter_valid_views<F>(&self, mut f: F) -> VortexResult<()>
62 where
63 F: FnMut(&Ref),
64 {
65 match self.validity_mask()? {
66 Mask::AllTrue(_) => {
67 for &view in self.views().iter() {
68 if !view.is_inlined() {
69 f(view.as_view());
70 }
71 }
72 }
73 Mask::AllFalse(_) => {}
74 Mask::Values(v) => {
75 for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
76 if is_valid && !view.is_inlined() {
77 f(view.as_view());
78 }
79 }
80 }
81 }
82 Ok(())
83 }
84
85 fn count_referenced_bytes(&self) -> VortexResult<u64> {
88 let mut total = 0u64;
89 self.iter_valid_views(|view| total += view.size as u64)?;
90 Ok(total)
91 }
92
93 pub(crate) fn buffer_utilizations(&self) -> VortexResult<Vec<BufferUtilization>> {
94 let mut utilizations: Vec<BufferUtilization> = self
95 .buffers()
96 .iter()
97 .map(|buf| {
98 let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
99 BufferUtilization::zero(len)
100 })
101 .collect();
102
103 self.iter_valid_views(|view| {
104 utilizations[view.buffer_index as usize].add(view.offset, view.size);
105 })?;
106
107 Ok(utilizations)
108 }
109
110 pub fn compact_with_threshold(
125 &self,
126 buffer_utilization_threshold: f64, ) -> VortexResult<VarBinViewArray> {
128 let mut builder = VarBinViewBuilder::with_compaction(
129 self.dtype().clone(),
130 self.len(),
131 buffer_utilization_threshold,
132 );
133 builder.extend_from_array(self.as_ref());
134 Ok(builder.finish_into_varbinview())
135 }
136}
137
/// Tracks how much of a single data buffer is referenced by an array's views.
///
/// `used` is the sum of the sizes of every view recorded with `add`, and
/// `min_offset..max_offset_end` is the smallest byte range covering all of them.
/// Views are summed independently, so when views overlap `used` can exceed `len`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct BufferUtilization {
    /// Total length of the buffer in bytes.
    len: u32,
    /// Sum of the sizes of all recorded views. This is a `u64` because overlapping views
    /// can push the total past `u32::MAX` even though the buffer itself fits in `u32`.
    used: u64,
    /// Smallest view offset seen so far (`u32::MAX` until the first `add`).
    min_offset: u32,
    /// Largest `offset + size` seen so far (`0` until the first `add`).
    max_offset_end: u32,
}

impl BufferUtilization {
    /// Creates an empty record for a buffer of `len` bytes with no views counted.
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Records a view covering `size` bytes starting at `offset`.
    ///
    /// NOTE(review): assumes the view lies within the buffer (as validated views should),
    /// so `offset + size` fits in `u32`.
    fn add(&mut self, offset: u32, size: u32) {
        // Widen before accumulating: many views over the same bytes would otherwise
        // overflow a `u32` total (panic in debug, silent wrap in release).
        self.used += u64::from(size);
        self.min_offset = self.min_offset.min(offset);
        self.max_offset_end = self.max_offset_end.max(offset + size);
    }

    /// Fraction of the whole buffer referenced by views (`used / len`), or `0.0` for a
    /// zero-length buffer. Can exceed `1.0` when views overlap.
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / f64::from(len),
        }
    }

    /// Fraction of the covered range (see [`Self::range`]) referenced by views, or `0.0`
    /// when that range is empty. Can exceed `1.0` when views overlap.
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / f64::from(span),
        }
    }

    /// Smallest byte range covering every recorded view.
    ///
    /// Returns `0..0` when no view has been recorded, instead of exposing the inverted
    /// sentinel pair `u32::MAX..0`.
    pub fn range(&self) -> Range<u32> {
        // `add` always leaves `max_offset_end >= min_offset`, so an inverted pair can only
        // mean nothing was recorded.
        if self.min_offset > self.max_offset_end {
            0..0
        } else {
            self.min_offset..self.max_offset_end
        }
    }

    /// Length of the covered range; `0` when nothing has been recorded.
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
183
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;

    /// Takes `indices` from `array` and executes the result back into a
    /// `VarBinViewArray`, panicking on any error.
    fn take_canonical(array: VarBinViewArray, indices: impl IntoArray) -> VarBinViewArray {
        let indices = indices.into_array();
        array
            .take(indices.to_array())
            .unwrap()
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap()
    }

    #[test]
    fn test_optimize_compacts_buffers() {
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);
        let buffers_before = original.nbuffers();
        assert!(buffers_before > 0);

        // Keep only the two short strings; the take keeps every data buffer attached.
        let taken = take_canonical(original, buffer![0u32, 4u32]);
        assert_eq!(taken.nbuffers(), buffers_before);

        let compacted = taken.compact_buffers().unwrap();
        assert!(compacted.nbuffers() <= 1);
        assert_arrays_eq!(
            compacted,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")])
        );
    }

    #[test]
    fn test_optimize_with_long_strings() {
        let first_long = "this is definitely a very long string that exceeds the inline limit";
        let second_long = "another extremely long string that also needs external buffer storage";
        let third_long = "yet another long string for testing buffer compaction functionality";

        let original = VarBinViewArray::from_iter_str([
            first_long,
            second_long,
            third_long,
            "short1",
            "short2",
        ]);
        let taken = take_canonical(original, buffer![0u32, 2u32]);
        let compacted = taken.compact_buffers().unwrap();

        assert_eq!(compacted.nbuffers(), 1);
        assert_arrays_eq!(compacted, VarBinArray::from(vec![first_long, third_long]));
    }

    #[test]
    fn test_optimize_no_buffers() {
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);
        assert_eq!(original.nbuffers(), 0);

        let compacted = original.compact_buffers().unwrap();
        assert_eq!(compacted.nbuffers(), 0);
        assert_arrays_eq!(compacted, original);
    }

    #[test]
    fn test_optimize_single_buffer() {
        let first = "this is a long string that goes into a buffer";
        let second = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([first, second]);

        assert_eq!(original.nbuffers(), 1);
        assert_eq!(original.buffer(0).len(), first.len() + second.len());

        let compacted = original.compact_buffers().unwrap();
        assert_eq!(compacted.nbuffers(), 1);
        assert_arrays_eq!(compacted, original);
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);
        assert!(original.nbuffers() > 0);

        let taken = take_canonical(original, buffer![0u32]);
        let compacted = taken.compact_with_threshold(0.0).unwrap();

        assert_eq!(compacted.nbuffers(), taken.nbuffers());
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);
        let taken = take_canonical(original, buffer![0u32, 2u32]);
        let buffers_before = taken.nbuffers();

        let compacted = taken.compact_with_threshold(1.0).unwrap();
        assert!(compacted.nbuffers() <= buffers_before);
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        let first = "first long string that needs external buffer storage";
        let second = "second long string also in buffer";
        let third = "third long string in same buffer";

        let original = VarBinViewArray::from_iter_str([first, second, third]);
        assert_eq!(original.nbuffers(), 1);

        let compacted = original.compact_with_threshold(0.8).unwrap();
        assert_eq!(compacted.nbuffers(), 1);
        assert_arrays_eq!(compacted, original);
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        let strings: Vec<String> = (0..10)
            .map(|i| format!("this is a long string number {i} that needs buffer storage"))
            .collect();
        let original = VarBinViewArray::from_iter_str(strings.iter().map(String::as_str));

        let taken = take_canonical(original, buffer![0u32, 2u32, 4u32, 6u32, 8u32]);
        let compacted = taken.compact_with_threshold(0.7).unwrap();

        let expected = VarBinViewArray::from_iter(
            [0, 2, 4, 6, 8].map(|i| Some(strings[i].as_str())),
            DType::Utf8(Nullability::NonNullable),
        );
        assert_arrays_eq!(expected, compacted);
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {i} for slice test"))
            .collect();
        let original = VarBinViewArray::from_iter_str(strings.iter().map(String::as_str));

        let taken = take_canonical(original, buffer![0u32, 1u32, 2u32, 3u32, 4u32]);
        let utilizations = taken.buffer_utilizations().unwrap();
        let buffers_before = taken.nbuffers();

        let compacted = taken.compact_with_threshold(0.8).unwrap();
        assert!(
            compacted.nbuffers() > 0,
            "Should have buffers after slice compaction"
        );
        assert_arrays_eq!(&compacted, taken);

        // Only a single buffer whose covered range is dense enough is expected to survive
        // as exactly one buffer.
        let single_dense_buffer =
            buffers_before == 1 && utilizations[0].range_utilization() >= 0.8;
        if single_dense_buffer {
            assert_eq!(
                compacted.nbuffers(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }

    const LONG1: &str = "long string one!";
    const LONG2: &str = "long string two!";
    const SHORT: &str = "x";
    const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;

    /// Two long values around a null, followed by a short value.
    fn mixed_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_nullable_str([Some(LONG1), None, Some(LONG2), Some(SHORT)])
    }

    #[rstest]
    #[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
    #[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
    #[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
    #[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
    fn test_validity_code_paths(
        #[case] arr: VarBinViewArray,
        #[case] expected_bytes: u64,
        #[case] expected_utils: &[f64],
    ) {
        assert_eq!(arr.count_referenced_bytes().unwrap(), expected_bytes);

        let utils: Vec<f64> = arr
            .buffer_utilizations()
            .unwrap()
            .into_iter()
            .map(|utilization| utilization.overall_utilization())
            .collect();
        assert_eq!(utils, expected_utils);
    }
}