Skip to main content

vortex_array/arrays/varbinview/
compact.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Defines a compaction operation for VarBinViewArrays that evicts unused buffers so they can
5//! be dropped.
6
7use std::ops::Range;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12
13use crate::IntoArray;
14use crate::LEGACY_SESSION;
15use crate::VortexSessionExecute;
16use crate::arrays::VarBinViewArray;
17use crate::arrays::varbinview::Ref;
18use crate::builders::ArrayBuilder;
19use crate::builders::VarBinViewBuilder;
20
21const DEFAULT_COMPACTION_THRESHOLD: f64 = 0.5;
22const MIN_RETAINED_BYTES_PER_ROW_TO_CHECK_COMPACTION: u64 = 128;
23
24impl VarBinViewArray {
25    /// Returns a compacted copy of the input array, where all wasted space has been cleaned up. This
26    /// operation can be very expensive, in the worst case copying all existing string data into
27    /// a new allocation.
28    ///
29    /// After slicing/taking operations `VarBinViewArray`s can continue to hold references to buffers
30    /// that are no longer visible. We detect when there is wasted space in any of the buffers, and if
31    /// so, will aggressively compact all visible outlined string data into new buffers while keeping
32    /// well-utilized buffers unchanged.
33    pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
34        // If there is nothing to be gained by compaction, return the original array untouched.
35        if !self.should_compact()? {
36            return Ok(self.clone());
37        }
38
39        self.compact_with_threshold(DEFAULT_COMPACTION_THRESHOLD)
40    }
41
42    fn should_compact(&self) -> VortexResult<bool> {
43        let nbuffers = self.data_buffers().len();
44
45        // If the array is entirely inlined strings, do not attempt to compact.
46        if nbuffers == 0 {
47            return Ok(false);
48        }
49
50        // These will fail to write, so in most cases we want to compact this.
51        if nbuffers > u16::MAX as usize {
52            return Ok(true);
53        }
54
55        let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();
56        if buffer_total_bytes == 0 {
57            return Ok(true);
58        }
59
60        let len = u64::try_from(self.len()).unwrap_or(u64::MAX);
61        if len > 0 && buffer_total_bytes / len <= MIN_RETAINED_BYTES_PER_ROW_TO_CHECK_COMPACTION {
62            return Ok(false);
63        }
64
65        let bytes_referenced: u64 = self.count_referenced_bytes()?;
66        Ok((bytes_referenced as f64 / buffer_total_bytes as f64) < DEFAULT_COMPACTION_THRESHOLD)
67    }
68
69    /// Iterates over all valid, non-inlined views, calling the provided
70    /// closure for each one.
71    #[inline(always)]
72    fn iter_valid_views<F>(&self, mut f: F) -> VortexResult<()>
73    where
74        F: FnMut(&Ref),
75    {
76        match self.as_ref().validity()?.execute_mask(
77            self.as_ref().len(),
78            &mut LEGACY_SESSION.create_execution_ctx(),
79        )? {
80            Mask::AllTrue(_) => {
81                for &view in self.views().iter() {
82                    if !view.is_inlined() {
83                        f(view.as_view());
84                    }
85                }
86            }
87            Mask::AllFalse(_) => {}
88            Mask::Values(v) => {
89                for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
90                    if is_valid && !view.is_inlined() {
91                        f(view.as_view());
92                    }
93                }
94            }
95        }
96        Ok(())
97    }
98
99    /// Count the number of bytes addressed by the views, not including null
100    /// values or any inlined strings.
101    fn count_referenced_bytes(&self) -> VortexResult<u64> {
102        let mut total = 0u64;
103        self.iter_valid_views(|view| total += view.size as u64)?;
104        Ok(total)
105    }
106
107    pub(crate) fn buffer_utilizations(&self) -> VortexResult<Vec<BufferUtilization>> {
108        let mut utilizations: Vec<BufferUtilization> = self
109            .data_buffers()
110            .iter()
111            .map(|buf| {
112                let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
113                BufferUtilization::zero(len)
114            })
115            .collect();
116
117        self.iter_valid_views(|view| {
118            utilizations[view.buffer_index as usize].add(view.offset, view.size);
119        })?;
120
121        Ok(utilizations)
122    }
123
124    /// Returns a compacted copy of the input array using selective buffer compaction.
125    ///
126    /// This method analyzes each buffer's utilization and applies one of three strategies:
127    /// - **KeepFull** (zero-copy): Well-utilized buffers are kept unchanged
128    /// - **Slice** (zero-copy): Buffers with contiguous ranges of used data are sliced to that range
129    /// - **Rewrite**: Poorly-utilized buffers have their data copied to new compact buffers
130    ///
131    /// By preserving or slicing well-utilized buffers, compaction becomes zero-copy in many cases.
132    ///
133    /// # Arguments
134    ///
135    /// * `buffer_utilization_threshold` - Threshold in range [0, 1]. Buffers with utilization
136    ///   below this value will be compacted. Use 0.0 for no compaction, 1.0 for aggressive
137    ///   compaction of any buffer with wasted space.
138    pub fn compact_with_threshold(
139        &self,
140        buffer_utilization_threshold: f64, // [0, 1]
141    ) -> VortexResult<VarBinViewArray> {
142        let mut builder = VarBinViewBuilder::with_compaction(
143            self.dtype().clone(),
144            self.len(),
145            buffer_utilization_threshold,
146        );
147        builder.extend_from_array(&self.clone().into_array());
148        Ok(builder.finish_into_varbinview())
149    }
150}
151
/// Statistics describing how much of one data buffer is referenced by the valid, non-inlined
/// views of a `VarBinViewArray` (built by `VarBinViewArray::buffer_utilizations`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BufferUtilization {
    /// Total length of the buffer, in bytes.
    len: u32,
    /// Sum of the sizes of all recorded views. Bytes referenced by more than one view are
    /// counted once per view, so this can exceed `len`.
    used: u32,
    /// Smallest recorded view offset, or `u32::MAX` if no view has been recorded yet.
    min_offset: u32,
    /// Largest recorded `offset + size`, or `0` if no view has been recorded yet.
    max_offset_end: u32,
}
158
159impl BufferUtilization {
160    fn zero(len: u32) -> Self {
161        BufferUtilization {
162            len,
163            used: 0u32,
164            min_offset: u32::MAX,
165            max_offset_end: 0,
166        }
167    }
168
169    fn add(&mut self, offset: u32, size: u32) {
170        self.used += size;
171        self.min_offset = self.min_offset.min(offset);
172        self.max_offset_end = self.max_offset_end.max(offset + size);
173    }
174
175    pub fn overall_utilization(&self) -> f64 {
176        match self.len {
177            0 => 0.0,
178            len => self.used as f64 / len as f64,
179        }
180    }
181
182    pub fn range_utilization(&self) -> f64 {
183        match self.range_span() {
184            0 => 0.0,
185            span => self.used as f64 / span as f64,
186        }
187    }
188
189    pub fn range(&self) -> Range<u32> {
190        self.min_offset..self.max_offset_end
191    }
192
193    fn range_span(&self) -> u32 {
194        self.max_offset_end.saturating_sub(self.min_offset)
195    }
196}
197
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::array_session;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    #[test]
    fn test_optimize_compacts_buffers() {
        let mut ctx = array_session().create_execution_ctx();
        // Create a VarBinViewArray with some long strings that will create multiple buffers
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        // Verify it has buffers
        assert!(!original.data_buffers().is_empty());
        let original_buffers = original.data_buffers().len();

        // Take only the first and last elements (indices 0 and 4)
        let indices = buffer![0u32, 4u32].into_array();
        let taken = original.take(indices).unwrap();
        let taken = taken.execute::<VarBinViewArray>(&mut ctx).unwrap();
        // The taken array should still have the same number of buffers
        assert_eq!(taken.data_buffers().len(), original_buffers);

        // Now optimize the taken array
        let optimized_array = taken.compact_buffers().unwrap();

        // The optimized array should have compacted buffers
        // Since both remaining strings are short, they should be inlined
        // so we might have 0 buffers, or 1 buffer if any were not inlined
        assert!(optimized_array.data_buffers().len() <= 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")]),
            &mut ctx
        );
    }

    #[test]
    fn test_optimize_with_long_strings() {
        let mut ctx = array_session().create_execution_ctx();
        // Create strings that are definitely longer than 12 bytes
        let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
        let long_string_2 = "another extremely long string that also needs external buffer storage";
        let long_string_3 = "yet another long string for testing buffer compaction functionality";

        let original = VarBinViewArray::from_iter_str([
            long_string_1,
            long_string_2,
            long_string_3,
            "short1",
            "short2",
        ]);

        // Take only the first and third long strings (indices 0 and 2)
        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices).unwrap();
        let taken_array = taken.execute::<VarBinViewArray>(&mut ctx).unwrap();

        let optimized_array = taken_array.compact_with_threshold(1.0).unwrap();

        // The optimized array should have exactly 1 buffer (consolidated)
        assert_eq!(optimized_array.data_buffers().len(), 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            VarBinArray::from(vec![long_string_1, long_string_3]),
            &mut ctx
        );
    }

    #[test]
    fn test_optimize_no_buffers() {
        let mut ctx = array_session().create_execution_ctx();
        // Create an array with only short strings (all inlined)
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        // This should have no buffers
        assert_eq!(original.data_buffers().len(), 0);

        // Optimize should return the same array
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.data_buffers().len(), 0);

        assert_arrays_eq!(optimized_array, original, &mut ctx);
    }

    #[test]
    fn test_optimize_single_buffer() {
        let mut ctx = array_session().create_execution_ctx();
        // Create an array that naturally has only one buffer
        let str1 = "this is a long string that goes into a buffer";
        let str2 = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2]);

        // Should have 1 compact buffer
        assert_eq!(original.data_buffers().len(), 1);
        assert_eq!(original.buffer(0).len(), str1.len() + str2.len());

        // Optimize should return the same array (no change needed)
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.data_buffers().len(), 1);

        assert_arrays_eq!(optimized_array, original, &mut ctx);
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        let mut ctx = array_session().create_execution_ctx();
        // threshold=0 should keep all buffers (no compaction)
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);

        let original_buffers = original.data_buffers().len();
        assert!(original_buffers > 0);

        // Take only first element
        let indices = buffer![0u32].into_array();
        let taken = original.take(indices).unwrap();
        let taken = taken.execute::<VarBinViewArray>(&mut ctx).unwrap();
        // Compact with threshold=0 (should not compact)
        let compacted = taken.compact_with_threshold(0.0).unwrap();

        // Should still have the same number of buffers as the taken array
        assert_eq!(compacted.data_buffers().len(), taken.data_buffers().len());

        // Verify correctness
        assert_arrays_eq!(compacted, taken, &mut ctx);
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        let mut ctx = array_session().create_execution_ctx();
        // threshold=1.0 should compact any buffer with waste
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        // Take only first and last elements
        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices).unwrap();
        let taken = taken.execute::<VarBinViewArray>(&mut ctx).unwrap();

        let original_buffers = taken.data_buffers().len();

        // Compact with threshold=1.0 (aggressive compaction)
        let compacted = taken.compact_with_threshold(1.0).unwrap();

        // Should have compacted buffers
        assert!(compacted.data_buffers().len() <= original_buffers);

        // Verify correctness
        assert_arrays_eq!(compacted, taken, &mut ctx);
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        let mut ctx = array_session().create_execution_ctx();
        // Create an array with multiple strings in one buffer (well-utilized)
        let str1 = "first long string that needs external buffer storage";
        let str2 = "second long string also in buffer";
        let str3 = "third long string in same buffer";

        let original = VarBinViewArray::from_iter_str([str1, str2, str3]);

        // All strings should be in one well-utilized buffer
        assert_eq!(original.data_buffers().len(), 1);

        // Compact with high threshold
        let compacted = original.compact_with_threshold(0.8).unwrap();

        // Well-utilized buffer should be preserved
        assert_eq!(compacted.data_buffers().len(), 1);

        // Verify all data is correct
        assert_arrays_eq!(compacted, original, &mut ctx);
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        let mut ctx = array_session().create_execution_ctx();
        // Create array with some long strings
        let strings: Vec<String> = (0..10)
            .map(|i| format!("this is a long string number {i} that needs buffer storage"))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take every other element to create mixed utilization
        let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken = original.take(indices_array).unwrap();
        let taken = taken.execute::<VarBinViewArray>(&mut ctx).unwrap();

        // Compact with moderate threshold
        let compacted = taken.compact_with_threshold(0.7).unwrap();

        let expected = VarBinViewArray::from_iter(
            [0, 2, 4, 6, 8].map(|i| Some(strings[i].as_str())),
            DType::Utf8(Nullability::NonNullable),
        );
        assert_arrays_eq!(expected, compacted, &mut ctx);
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        let mut ctx = array_session().create_execution_ctx();
        // Create array with strings that will be in one buffer
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {i} for slice test"))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take only the first 5 elements - they should be in a contiguous range at the start
        let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken = original.take(indices_array).unwrap();
        let taken = taken.execute::<VarBinViewArray>(&mut ctx).unwrap();
        // Get buffer stats before compaction
        let utils_before = taken.buffer_utilizations().unwrap();
        let original_buffer_count = taken.data_buffers().len();

        // Compact with a threshold that should trigger slicing
        // The range utilization should be high even if overall utilization is low
        let compacted = taken.compact_with_threshold(0.8).unwrap();

        // After compaction, we should still have buffers (sliced, not rewritten)
        assert!(
            !compacted.data_buffers().is_empty(),
            "Should have buffers after slice compaction"
        );

        // Verify correctness
        assert_arrays_eq!(&compacted, taken, &mut ctx);

        // Verify that if there was only one buffer, the compacted version also has one
        // (it was sliced, not rewritten into multiple buffers)
        if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.data_buffers().len(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }

    const LONG1: &str = "long string one!";
    const LONG2: &str = "long string two!";
    const SHORT: &str = "x";
    const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;

    fn mixed_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_nullable_str([Some(LONG1), None, Some(LONG2), Some(SHORT)])
    }

    #[rstest]
    #[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
    #[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
    #[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
    #[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
    fn test_validity_code_paths(
        #[case] arr: VarBinViewArray,
        #[case] expected_bytes: u64,
        #[case] expected_utils: &[f64],
    ) {
        assert_eq!(arr.count_referenced_bytes().unwrap(), expected_bytes);
        let utils: Vec<f64> = arr
            .buffer_utilizations()
            .unwrap()
            .iter()
            .map(|u| u.overall_utilization())
            .collect();
        assert_eq!(utils, expected_utils);
    }
}