vortex_array/arrays/varbinview/
compact.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Defines a compaction operation for VarBinViewArrays that evicts unused buffers so they can
5//! be dropped.
6
7use std::ops::Range;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12use vortex_vector::binaryview::Ref;
13
14use crate::arrays::VarBinViewArray;
15use crate::builders::ArrayBuilder;
16use crate::builders::VarBinViewBuilder;
17
18impl VarBinViewArray {
19    /// Returns a compacted copy of the input array, where all wasted space has been cleaned up. This
20    /// operation can be very expensive, in the worst case copying all existing string data into
21    /// a new allocation.
22    ///
23    /// After slicing/taking operations `VarBinViewArray`s can continue to hold references to buffers
24    /// that are no longer visible. We detect when there is wasted space in any of the buffers, and if
25    /// so, will aggressively compact all visible outlined string data into new buffers while keeping
26    /// well-utilized buffers unchanged.
27    pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
28        // If there is nothing to be gained by compaction, return the original array untouched.
29        if !self.should_compact() {
30            return Ok(self.clone());
31        }
32
33        // Use selective compaction with threshold of 1.0 (compact any buffer with any waste)
34        self.compact_with_threshold(1.0)
35    }
36
37    fn should_compact(&self) -> bool {
38        let nbuffers = self.nbuffers();
39
40        // If the array is entirely inlined strings, do not attempt to compact.
41        if nbuffers == 0 {
42            return false;
43        }
44
45        // These will fail to write, so in most cases we want to compact this.
46        if nbuffers > u16::MAX as usize {
47            return true;
48        }
49
50        let bytes_referenced: u64 = self.count_referenced_bytes();
51        let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();
52
53        // If there is any wasted space, we want to repack.
54        // This is very aggressive.
55        bytes_referenced < buffer_total_bytes || buffer_total_bytes == 0
56    }
57
58    /// Iterates over all valid, non-inlined views, calling the provided
59    /// closure for each one.
60    #[inline(always)]
61    fn iter_valid_views<F>(&self, mut f: F)
62    where
63        F: FnMut(&Ref),
64    {
65        match self.validity_mask() {
66            Mask::AllTrue(_) => {
67                for &view in self.views().iter() {
68                    if !view.is_inlined() {
69                        f(view.as_view());
70                    }
71                }
72            }
73            Mask::AllFalse(_) => {}
74            Mask::Values(v) => {
75                for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
76                    if is_valid && !view.is_inlined() {
77                        f(view.as_view());
78                    }
79                }
80            }
81        }
82    }
83
84    /// Count the number of bytes addressed by the views, not including null
85    /// values or any inlined strings.
86    fn count_referenced_bytes(&self) -> u64 {
87        let mut total = 0u64;
88        self.iter_valid_views(|view| total += view.size as u64);
89        total
90    }
91
92    pub(crate) fn buffer_utilizations(&self) -> Vec<BufferUtilization> {
93        let mut utilizations: Vec<BufferUtilization> = self
94            .buffers()
95            .iter()
96            .map(|buf| {
97                let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
98                BufferUtilization::zero(len)
99            })
100            .collect();
101
102        self.iter_valid_views(|view| {
103            utilizations[view.buffer_index as usize].add(view.offset, view.size);
104        });
105
106        utilizations
107    }
108
109    /// Returns a compacted copy of the input array using selective buffer compaction.
110    ///
111    /// This method analyzes each buffer's utilization and applies one of three strategies:
112    /// - **KeepFull** (zero-copy): Well-utilized buffers are kept unchanged
113    /// - **Slice** (zero-copy): Buffers with contiguous ranges of used data are sliced to that range
114    /// - **Rewrite**: Poorly-utilized buffers have their data copied to new compact buffers
115    ///
116    /// By preserving or slicing well-utilized buffers, compaction becomes zero-copy in many cases.
117    ///
118    /// # Arguments
119    ///
120    /// * `buffer_utilization_threshold` - Threshold in range [0, 1]. Buffers with utilization
121    ///   below this value will be compacted. Use 0.0 for no compaction, 1.0 for aggressive
122    ///   compaction of any buffer with wasted space.
123    pub fn compact_with_threshold(
124        &self,
125        buffer_utilization_threshold: f64, // [0, 1]
126    ) -> VortexResult<VarBinViewArray> {
127        let mut builder = VarBinViewBuilder::with_compaction(
128            self.dtype().clone(),
129            self.len(),
130            buffer_utilization_threshold,
131        );
132        builder.extend_from_array(self.as_ref());
133        Ok(builder.finish_into_varbinview())
134    }
135}
136
/// Per-buffer accounting of how many bytes the views of a `VarBinViewArray`
/// actually reference, used to choose between keeping, slicing, or rewriting a
/// buffer during compaction.
pub(crate) struct BufferUtilization {
    // Total length of the buffer in bytes.
    len: u32,
    // Sum of the sizes of all views recorded against this buffer. Duplicated or
    // overlapping views (e.g. after `take` with repeated indices) are counted
    // each time, so this can exceed `len`; it saturates rather than overflowing.
    used: u32,
    // Smallest view offset seen; u32::MAX until the first view is added.
    min_offset: u32,
    // One past the largest referenced byte; 0 until the first view is added.
    max_offset_end: u32,
}

impl BufferUtilization {
    /// Creates accounting for an as-yet-unreferenced buffer of `len` bytes.
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0u32,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Records a view of `size` bytes starting at `offset`.
    ///
    /// Uses saturating arithmetic: duplicated views can push the running `used`
    /// total past `u32::MAX` (and a malformed view could overflow
    /// `offset + size`), which previously panicked in debug builds and wrapped
    /// in release builds — a wrapped `used` would under-report utilization and
    /// silently skip compaction.
    fn add(&mut self, offset: u32, size: u32) {
        self.used = self.used.saturating_add(size);
        self.min_offset = self.min_offset.min(offset);
        self.max_offset_end = self.max_offset_end.max(offset.saturating_add(size));
    }

    /// Fraction of the buffer's bytes referenced by views. In [0, 1] for
    /// non-overlapping views; may exceed 1.0 when views are duplicated.
    /// Returns 0.0 for an empty buffer.
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / len as f64,
        }
    }

    /// Fraction of the referenced span (`min_offset..max_offset_end`) that is
    /// actually used. Returns 0.0 when no views have been recorded.
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / span as f64,
        }
    }

    /// The byte range spanned by all recorded views. Reversed (`u32::MAX..0`,
    /// i.e. empty) when no views have been recorded.
    pub fn range(&self) -> Range<u32> {
        self.min_offset..self.max_offset_end
    }

    /// Width of `range()` in bytes; 0 when no views have been recorded
    /// (saturating_sub guards the `u32::MAX..0` initial state).
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
182
#[cfg(test)]
mod tests {
    // Unit tests for buffer compaction. Strings longer than 12 bytes are stored
    // out-of-line in buffers; shorter ones are inlined directly in the view.
    use rstest::rstest;
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::arrays::VarBinViewVTable;
    use crate::assert_arrays_eq;
    use crate::compute::take;

    #[test]
    fn test_optimize_compacts_buffers() {
        // Create a VarBinViewArray with some long strings that will create multiple buffers
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        // Verify it has buffers
        assert!(original.nbuffers() > 0);
        let original_buffers = original.nbuffers();

        // Take only the first and last elements (indices 0 and 4)
        let indices = buffer![0u32, 4u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // The taken array should still have the same number of buffers
        // (take does not compact; it only rewrites views).
        assert_eq!(taken_array.nbuffers(), original_buffers);

        // Now optimize the taken array
        let optimized_array = taken_array.compact_buffers().unwrap();

        // The optimized array should have compacted buffers
        // Since both remaining strings are short, they should be inlined
        // so we might have 0 buffers, or 1 buffer if any were not inlined
        assert!(optimized_array.nbuffers() <= 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")])
        );
    }

    #[test]
    fn test_optimize_with_long_strings() {
        // Create strings that are definitely longer than 12 bytes
        let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
        let long_string_2 = "another extremely long string that also needs external buffer storage";
        let long_string_3 = "yet another long string for testing buffer compaction functionality";

        let original = VarBinViewArray::from_iter_str([
            long_string_1,
            long_string_2,
            long_string_3,
            "short1",
            "short2",
        ]);

        // Take only the first and third long strings (indices 0 and 2)
        let indices = buffer![0u32, 2u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Optimize the taken array
        let optimized_array = taken_array.compact_buffers().unwrap();

        // The optimized array should have exactly 1 buffer (consolidated)
        assert_eq!(optimized_array.nbuffers(), 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            VarBinArray::from(vec![long_string_1, long_string_3])
        );
    }

    #[test]
    fn test_optimize_no_buffers() {
        // Create an array with only short strings (all inlined)
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        // This should have no buffers
        assert_eq!(original.nbuffers(), 0);

        // Optimize should return the same array (should_compact bails out early)
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 0);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_optimize_single_buffer() {
        // Create an array that naturally has only one buffer
        let str1 = "this is a long string that goes into a buffer";
        let str2 = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2]);

        // Should have 1 compact buffer
        assert_eq!(original.nbuffers(), 1);
        assert_eq!(original.buffer(0).len(), str1.len() + str2.len());

        // Optimize should return the same array (no change needed)
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 1);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        // threshold=0 should keep all buffers (no compaction)
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);

        let original_buffers = original.nbuffers();
        assert!(original_buffers > 0);

        // Take only first element
        let indices = buffer![0u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Compact with threshold=0 (should not compact)
        let compacted = taken_array.compact_with_threshold(0.0).unwrap();

        // Should still have the same number of buffers as the taken array
        assert_eq!(compacted.nbuffers(), taken_array.nbuffers());

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        // threshold=1.0 should compact any buffer with waste
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        // Take only first and last elements
        let indices = buffer![0u32, 2u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        let original_buffers = taken_array.nbuffers();

        // Compact with threshold=1.0 (aggressive compaction)
        let compacted = taken_array.compact_with_threshold(1.0).unwrap();

        // Should have compacted buffers
        assert!(compacted.nbuffers() <= original_buffers);

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        // Create an array with multiple strings in one buffer (well-utilized)
        let str1 = "first long string that needs external buffer storage";
        let str2 = "second long string also in buffer";
        let str3 = "third long string in same buffer";

        let original = VarBinViewArray::from_iter_str([str1, str2, str3]);

        // All strings should be in one well-utilized buffer
        assert_eq!(original.nbuffers(), 1);

        // Compact with high threshold
        let compacted = original.compact_with_threshold(0.8).unwrap();

        // Well-utilized buffer should be preserved
        assert_eq!(compacted.nbuffers(), 1);

        // Verify all data is correct
        assert_arrays_eq!(compacted, original);
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        // Create array with some long strings
        let strings: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    "this is a long string number {} that needs buffer storage",
                    i
                )
            })
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take every other element to create mixed utilization
        let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken = take(original.as_ref(), &indices_array).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Compact with moderate threshold
        let compacted = taken_array.compact_with_threshold(0.7).unwrap();

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        // Create array with strings that will be in one buffer
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {} for slice test", i))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take only the first 5 elements - they should be in a contiguous range at the start
        let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken = take(original.as_ref(), &indices_array).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Get buffer stats before compaction
        let utils_before = taken_array.buffer_utilizations();
        let original_buffer_count = taken_array.nbuffers();

        // Compact with a threshold that should trigger slicing
        // The range utilization should be high even if overall utilization is low
        let compacted = taken_array.compact_with_threshold(0.8).unwrap();

        // After compaction, we should still have buffers (sliced, not rewritten)
        assert!(
            compacted.nbuffers() > 0,
            "Should have buffers after slice compaction"
        );

        // Verify correctness
        assert_arrays_eq!(&compacted, taken);

        // Verify that if there was only one buffer, the compacted version also has one
        // (it was sliced, not rewritten into multiple buffers)
        if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.nbuffers(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }

    // LONG1/LONG2 are 16 bytes (above the 12-byte inline limit) so they are
    // outlined; SHORT is inlined and must not count toward referenced bytes.
    const LONG1: &str = "long string one!";
    const LONG2: &str = "long string two!";
    const SHORT: &str = "x";
    const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;

    // Array mixing a null, two outlined strings, and one inlined string to
    // exercise the Mask::Values path of iter_valid_views.
    fn mixed_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_nullable_str([Some(LONG1), None, Some(LONG2), Some(SHORT)])
    }

    // Covers all three validity-mask branches: AllTrue, AllFalse, and Values.
    #[rstest]
    #[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
    #[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
    #[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
    #[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
    fn test_validity_code_paths(
        #[case] arr: VarBinViewArray,
        #[case] expected_bytes: u64,
        #[case] expected_utils: &[f64],
    ) {
        assert_eq!(arr.count_referenced_bytes(), expected_bytes);
        let utils: Vec<f64> = arr
            .buffer_utilizations()
            .iter()
            .map(|u| u.overall_utilization())
            .collect();
        assert_eq!(utils, expected_utils);
    }
}