Skip to main content

vortex_array/arrays/varbinview/
compact.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Defines a compaction operation for VarBinViewArrays that evicts unused buffers so they can
5//! be dropped.
6
7use std::ops::Range;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12
13use crate::IntoArray;
14use crate::LEGACY_SESSION;
15use crate::VortexSessionExecute;
16use crate::arrays::VarBinViewArray;
17use crate::arrays::varbinview::Ref;
18use crate::builders::ArrayBuilder;
19use crate::builders::VarBinViewBuilder;
20
21impl VarBinViewArray {
22    /// Returns a compacted copy of the input array, where all wasted space has been cleaned up. This
23    /// operation can be very expensive, in the worst case copying all existing string data into
24    /// a new allocation.
25    ///
26    /// After slicing/taking operations `VarBinViewArray`s can continue to hold references to buffers
27    /// that are no longer visible. We detect when there is wasted space in any of the buffers, and if
28    /// so, will aggressively compact all visible outlined string data into new buffers while keeping
29    /// well-utilized buffers unchanged.
30    pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
31        // If there is nothing to be gained by compaction, return the original array untouched.
32        if !self.should_compact()? {
33            return Ok(self.clone());
34        }
35
36        // Use selective compaction with threshold of 1.0 (compact any buffer with any waste)
37        self.compact_with_threshold(1.0)
38    }
39
40    fn should_compact(&self) -> VortexResult<bool> {
41        let nbuffers = self.data_buffers().len();
42
43        // If the array is entirely inlined strings, do not attempt to compact.
44        if nbuffers == 0 {
45            return Ok(false);
46        }
47
48        // These will fail to write, so in most cases we want to compact this.
49        if nbuffers > u16::MAX as usize {
50            return Ok(true);
51        }
52
53        let bytes_referenced: u64 = self.count_referenced_bytes()?;
54        let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();
55
56        // If there is any wasted space, we want to repack.
57        // This is very aggressive.
58        Ok(bytes_referenced < buffer_total_bytes || buffer_total_bytes == 0)
59    }
60
61    /// Iterates over all valid, non-inlined views, calling the provided
62    /// closure for each one.
63    #[inline(always)]
64    fn iter_valid_views<F>(&self, mut f: F) -> VortexResult<()>
65    where
66        F: FnMut(&Ref),
67    {
68        match self.as_ref().validity()?.to_mask(
69            self.as_ref().len(),
70            &mut LEGACY_SESSION.create_execution_ctx(),
71        )? {
72            Mask::AllTrue(_) => {
73                for &view in self.views().iter() {
74                    if !view.is_inlined() {
75                        f(view.as_view());
76                    }
77                }
78            }
79            Mask::AllFalse(_) => {}
80            Mask::Values(v) => {
81                for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
82                    if is_valid && !view.is_inlined() {
83                        f(view.as_view());
84                    }
85                }
86            }
87        }
88        Ok(())
89    }
90
91    /// Count the number of bytes addressed by the views, not including null
92    /// values or any inlined strings.
93    fn count_referenced_bytes(&self) -> VortexResult<u64> {
94        let mut total = 0u64;
95        self.iter_valid_views(|view| total += view.size as u64)?;
96        Ok(total)
97    }
98
99    pub(crate) fn buffer_utilizations(&self) -> VortexResult<Vec<BufferUtilization>> {
100        let mut utilizations: Vec<BufferUtilization> = self
101            .data_buffers()
102            .iter()
103            .map(|buf| {
104                let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
105                BufferUtilization::zero(len)
106            })
107            .collect();
108
109        self.iter_valid_views(|view| {
110            utilizations[view.buffer_index as usize].add(view.offset, view.size);
111        })?;
112
113        Ok(utilizations)
114    }
115
116    /// Returns a compacted copy of the input array using selective buffer compaction.
117    ///
118    /// This method analyzes each buffer's utilization and applies one of three strategies:
119    /// - **KeepFull** (zero-copy): Well-utilized buffers are kept unchanged
120    /// - **Slice** (zero-copy): Buffers with contiguous ranges of used data are sliced to that range
121    /// - **Rewrite**: Poorly-utilized buffers have their data copied to new compact buffers
122    ///
123    /// By preserving or slicing well-utilized buffers, compaction becomes zero-copy in many cases.
124    ///
125    /// # Arguments
126    ///
127    /// * `buffer_utilization_threshold` - Threshold in range [0, 1]. Buffers with utilization
128    ///   below this value will be compacted. Use 0.0 for no compaction, 1.0 for aggressive
129    ///   compaction of any buffer with wasted space.
130    pub fn compact_with_threshold(
131        &self,
132        buffer_utilization_threshold: f64, // [0, 1]
133    ) -> VortexResult<VarBinViewArray> {
134        let mut builder = VarBinViewBuilder::with_compaction(
135            self.dtype().clone(),
136            self.len(),
137            buffer_utilization_threshold,
138        );
139        builder.extend_from_array(&self.clone().into_array());
140        Ok(builder.finish_into_varbinview())
141    }
142}
143
/// Per-buffer usage statistics gathered from the views of a `VarBinViewArray`.
///
/// Tracks how many bytes of a buffer are referenced by valid, non-inlined views, along
/// with the smallest contiguous range covering every referenced byte. Because distinct
/// views may reference overlapping (or identical) byte ranges -- e.g. after a `take`
/// with repeated indices -- `used` can legitimately exceed `len`, and the utilization
/// ratios can exceed 1.0.
pub(crate) struct BufferUtilization {
    // Total length of the buffer, in bytes.
    len: u32,
    // Sum of the sizes of all views referencing this buffer. Held as u64 because
    // overlapping/duplicated references can push this past u32::MAX even though any
    // single buffer is at most u32::MAX bytes long (mirrors the u64 accumulator in
    // `count_referenced_bytes`).
    used: u64,
    // Smallest offset referenced by any view; u32::MAX until the first `add`.
    min_offset: u32,
    // One past the largest referenced byte; 0 until the first `add`.
    max_offset_end: u32,
}

impl BufferUtilization {
    /// Statistics for a buffer of `len` bytes with no references recorded yet.
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0u64,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Records a view referencing `size` bytes starting at `offset`.
    fn add(&mut self, offset: u32, size: u32) {
        // Accumulate in u64: duplicated/overlapping views can sum past u32::MAX, which
        // would panic in debug builds and silently wrap in release builds with a u32.
        self.used += u64::from(size);
        self.min_offset = self.min_offset.min(offset);
        // In-bounds views guarantee offset + size <= len <= u32::MAX, so no overflow here.
        self.max_offset_end = self.max_offset_end.max(offset + size);
    }

    /// Fraction of the buffer's total length that is referenced. May exceed 1.0 when
    /// views overlap. Returns 0.0 for a zero-length buffer.
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / len as f64,
        }
    }

    /// Fraction of the referenced range (`min_offset..max_offset_end`) that is actually
    /// used. Returns 0.0 when no references have been recorded.
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / span as f64,
        }
    }

    /// The smallest contiguous byte range covering every recorded reference.
    pub fn range(&self) -> Range<u32> {
        self.min_offset..self.max_offset_end
    }

    /// Width of [`Self::range`]; saturates to 0 when nothing has been recorded
    /// (min_offset is still u32::MAX).
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
189
#[cfg(test)]
mod tests {
    // Unit tests covering buffer compaction: detection of wasted space after take,
    // selective compaction thresholds, and per-buffer utilization statistics.
    use rstest::rstest;
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    #[test]
    fn test_optimize_compacts_buffers() {
        // Create a VarBinViewArray with some long strings that will create multiple buffers
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        // Verify it has buffers
        assert!(!original.data_buffers().is_empty());
        let original_buffers = original.data_buffers().len();

        // Take only the first and last elements (indices 0 and 4)
        let indices = buffer![0u32, 4u32].into_array();
        let taken = original.take(indices).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        // The taken array should still have the same number of buffers
        assert_eq!(taken.data_buffers().len(), original_buffers);

        // Now optimize the taken array
        let optimized_array = taken.compact_buffers().unwrap();

        // The optimized array should have compacted buffers
        // Since both remaining strings are short, they should be inlined
        // so we might have 0 buffers, or 1 buffer if any were not inlined
        assert!(optimized_array.data_buffers().len() <= 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")])
        );
    }

    #[test]
    fn test_optimize_with_long_strings() {
        // Create strings that are definitely longer than 12 bytes
        // (strings over the 12-byte inline limit are stored out-of-line in buffers)
        let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
        let long_string_2 = "another extremely long string that also needs external buffer storage";
        let long_string_3 = "yet another long string for testing buffer compaction functionality";

        let original = VarBinViewArray::from_iter_str([
            long_string_1,
            long_string_2,
            long_string_3,
            "short1",
            "short2",
        ]);

        // Take only the first and third long strings (indices 0 and 2)
        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices).unwrap();
        let taken_array = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        // Optimize the taken array
        let optimized_array = taken_array.compact_buffers().unwrap();

        // The optimized array should have exactly 1 buffer (consolidated)
        assert_eq!(optimized_array.data_buffers().len(), 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            VarBinArray::from(vec![long_string_1, long_string_3])
        );
    }

    #[test]
    fn test_optimize_no_buffers() {
        // Create an array with only short strings (all inlined)
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        // This should have no buffers
        assert_eq!(original.data_buffers().len(), 0);

        // Optimize should return the same array
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.data_buffers().len(), 0);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_optimize_single_buffer() {
        // Create an array that naturally has only one buffer
        let str1 = "this is a long string that goes into a buffer";
        let str2 = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2]);

        // Should have 1 compact buffer
        assert_eq!(original.data_buffers().len(), 1);
        assert_eq!(original.buffer(0).len(), str1.len() + str2.len());

        // Optimize should return the same array (no change needed)
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.data_buffers().len(), 1);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        // threshold=0 should keep all buffers (no compaction)
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);

        let original_buffers = original.data_buffers().len();
        assert!(original_buffers > 0);

        // Take only first element
        let indices = buffer![0u32].into_array();
        let taken = original.take(indices).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        // Compact with threshold=0 (should not compact)
        let compacted = taken.compact_with_threshold(0.0).unwrap();

        // Should still have the same number of buffers as the taken array
        assert_eq!(compacted.data_buffers().len(), taken.data_buffers().len());

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        // threshold=1.0 should compact any buffer with waste
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        // Take only first and last elements
        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        let original_buffers = taken.data_buffers().len();

        // Compact with threshold=1.0 (aggressive compaction)
        let compacted = taken.compact_with_threshold(1.0).unwrap();

        // Should have compacted buffers
        assert!(compacted.data_buffers().len() <= original_buffers);

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        // Create an array with multiple strings in one buffer (well-utilized)
        let str1 = "first long string that needs external buffer storage";
        let str2 = "second long string also in buffer";
        let str3 = "third long string in same buffer";

        let original = VarBinViewArray::from_iter_str([str1, str2, str3]);

        // All strings should be in one well-utilized buffer
        assert_eq!(original.data_buffers().len(), 1);

        // Compact with high threshold
        let compacted = original.compact_with_threshold(0.8).unwrap();

        // Well-utilized buffer should be preserved
        assert_eq!(compacted.data_buffers().len(), 1);

        // Verify all data is correct
        assert_arrays_eq!(compacted, original);
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        // Create array with some long strings
        let strings: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    "this is a long string number {} that needs buffer storage",
                    i
                )
            })
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take every other element to create mixed utilization
        let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken = original.take(indices_array).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        // Compact with moderate threshold
        let compacted = taken.compact_with_threshold(0.7).unwrap();

        let expected = VarBinViewArray::from_iter(
            [0, 2, 4, 6, 8].map(|i| Some(strings[i].as_str())),
            DType::Utf8(Nullability::NonNullable),
        );
        assert_arrays_eq!(expected, compacted);
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        // Create array with strings that will be in one buffer
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {} for slice test", i))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take only the first 5 elements - they should be in a contiguous range at the start
        let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken = original.take(indices_array).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        // Get buffer stats before compaction
        let utils_before = taken.buffer_utilizations().unwrap();
        let original_buffer_count = taken.data_buffers().len();

        // Compact with a threshold that should trigger slicing
        // The range utilization should be high even if overall utilization is low
        let compacted = taken.compact_with_threshold(0.8).unwrap();

        // After compaction, we should still have buffers (sliced, not rewritten)
        assert!(
            !compacted.data_buffers().is_empty(),
            "Should have buffers after slice compaction"
        );

        // Verify correctness
        assert_arrays_eq!(&compacted, taken);

        // Verify that if there was only one buffer, the compacted version also has one
        // (it was sliced, not rewritten into multiple buffers)
        if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.data_buffers().len(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }

    // Fixtures for the validity-path cases below: two outlined strings (over the
    // 12-byte inline limit) and one short string that stays inlined.
    const LONG1: &str = "long string one!";
    const LONG2: &str = "long string two!";
    const SHORT: &str = "x";
    const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;

    fn mixed_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_nullable_str([Some(LONG1), None, Some(LONG2), Some(SHORT)])
    }

    // Exercises every Mask arm in iter_valid_views: AllTrue (non-nullable / all-valid),
    // AllFalse (all-invalid), and Values (mixed validity).
    #[rstest]
    #[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
    #[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
    #[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
    #[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
    fn test_validity_code_paths(
        #[case] arr: VarBinViewArray,
        #[case] expected_bytes: u64,
        #[case] expected_utils: &[f64],
    ) {
        assert_eq!(arr.count_referenced_bytes().unwrap(), expected_bytes);
        let utils: Vec<f64> = arr
            .buffer_utilizations()
            .unwrap()
            .iter()
            .map(|u| u.overall_utilization())
            .collect();
        assert_eq!(utils, expected_utils);
    }
}