
vortex_array/arrays/varbinview/compact.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Defines a compaction operation for VarBinViewArrays that evicts unused buffers so they can
//! be dropped.

use std::ops::Range;

use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_mask::Mask;

use crate::arrays::Ref;
use crate::arrays::VarBinViewArray;
use crate::builders::ArrayBuilder;
use crate::builders::VarBinViewBuilder;

impl VarBinViewArray {
    /// Returns a compacted copy of this array in which all wasted buffer space has been reclaimed.
    /// This operation can be very expensive: in the worst case it copies all existing string data
    /// into a new allocation.
    ///
    /// After slicing or taking, a `VarBinViewArray` can continue to hold references to buffer data
    /// that is no longer visible. If any buffer contains wasted space, this method aggressively
    /// compacts all visible out-of-line string data into new buffers while keeping well-utilized
    /// buffers unchanged.
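    ///
    /// # Example
    ///
    /// A minimal usage sketch (marked `ignore` rather than run as a doctest; it assumes an owned
    /// `taken` array left over from a prior `take`, as in the tests below):
    ///
    /// ```ignore
    /// // `taken` may still reference string bytes that are no longer visible.
    /// let compacted = taken.compact_buffers()?;
    /// // Typically the compacted copy references fewer (or equally many) buffers.
    /// assert!(compacted.nbuffers() <= taken.nbuffers());
    /// ```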
    pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
        // If there is nothing to be gained by compaction, return the original array untouched.
        if !self.should_compact()? {
            return Ok(self.clone());
        }

        // Use selective compaction with threshold of 1.0 (compact any buffer with any waste)
        self.compact_with_threshold(1.0)
    }

    fn should_compact(&self) -> VortexResult<bool> {
        let nbuffers = self.nbuffers();

        // If the array is entirely inlined strings, do not attempt to compact.
        if nbuffers == 0 {
            return Ok(false);
        }

        // Arrays with more than u16::MAX buffers will fail to write, so we almost always want to
        // compact them.
        if nbuffers > u16::MAX as usize {
            return Ok(true);
        }

        let bytes_referenced: u64 = self.count_referenced_bytes()?;
        let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();

        // If there is any wasted space at all, we want to repack. This is very aggressive.
        Ok(bytes_referenced < buffer_total_bytes || buffer_total_bytes == 0)
    }

    /// Iterates over all valid, non-inlined views, calling the provided
    /// closure for each one.
    #[inline(always)]
    fn iter_valid_views<F>(&self, mut f: F) -> VortexResult<()>
    where
        F: FnMut(&Ref),
    {
        match self.validity_mask()? {
            Mask::AllTrue(_) => {
                for &view in self.views().iter() {
                    if !view.is_inlined() {
                        f(view.as_view());
                    }
                }
            }
            Mask::AllFalse(_) => {}
            Mask::Values(v) => {
                for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
                    if is_valid && !view.is_inlined() {
                        f(view.as_view());
                    }
                }
            }
        }
        Ok(())
    }

    /// Count the number of bytes addressed by the views, not including null
    /// values or any inlined strings.
    fn count_referenced_bytes(&self) -> VortexResult<u64> {
        let mut total = 0u64;
        self.iter_valid_views(|view| total += view.size as u64)?;
        Ok(total)
    }

    /// Computes a [`BufferUtilization`] for each of this array's data buffers, describing how
    /// much of the buffer is actually referenced by valid, non-inlined views.
    pub(crate) fn buffer_utilizations(&self) -> VortexResult<Vec<BufferUtilization>> {
        let mut utilizations: Vec<BufferUtilization> = self
            .buffers()
            .iter()
            .map(|buf| {
                let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
                BufferUtilization::zero(len)
            })
            .collect();

        self.iter_valid_views(|view| {
            utilizations[view.buffer_index as usize].add(view.offset, view.size);
        })?;

        Ok(utilizations)
    }

    /// Returns a compacted copy of the input array using selective buffer compaction.
    ///
    /// This method analyzes each buffer's utilization and applies one of three strategies:
    /// - **KeepFull** (zero-copy): Well-utilized buffers are kept unchanged
    /// - **Slice** (zero-copy): Buffers with contiguous ranges of used data are sliced to that range
    /// - **Rewrite**: Poorly-utilized buffers have their data copied to new compact buffers
    ///
    /// By preserving or slicing well-utilized buffers, compaction becomes zero-copy in many cases.
    ///
    /// # Arguments
    ///
    /// * `buffer_utilization_threshold` - Threshold in range [0, 1]. Buffers with utilization
    ///   below this value will be compacted. Use 0.0 for no compaction, 1.0 for aggressive
    ///   compaction of any buffer with wasted space.
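    ///
    /// # Example
    ///
    /// A minimal sketch of how the threshold behaves (marked `ignore`; `taken` is assumed to be
    /// an owned array left over from a prior `take`, as in the tests below):
    ///
    /// ```ignore
    /// // 0.0: no buffer's utilization is ever below 0.0, so nothing is compacted.
    /// let untouched = taken.compact_with_threshold(0.0)?;
    /// // 1.0: any buffer with wasted space falls below 1.0 utilization and is compacted.
    /// let aggressive = taken.compact_with_threshold(1.0)?;
    /// // 0.7: only buffers that are less than 70% utilized are rewritten or sliced.
    /// let balanced = taken.compact_with_threshold(0.7)?;
    /// ```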
    pub fn compact_with_threshold(
        &self,
        buffer_utilization_threshold: f64, // [0, 1]
    ) -> VortexResult<VarBinViewArray> {
        let mut builder = VarBinViewBuilder::with_compaction(
            self.dtype().clone(),
            self.len(),
            buffer_utilization_threshold,
        );
        builder.extend_from_array(self.as_ref());
        Ok(builder.finish_into_varbinview())
    }
}

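/// Per-buffer statistics used to decide how to compact a data buffer: the buffer's total length,
/// the number of bytes actually referenced by views, and the smallest contiguous offset range
/// covering all referenced bytes.
///
/// As an illustrative (made-up) example: for a 100-byte buffer whose views reference 40 bytes
/// spanning offsets 10..60, `overall_utilization()` is 40 / 100 = 0.4, while
/// `range_utilization()` is 40 / 50 = 0.8, so slicing the buffer down to 10..60 recovers most of
/// the waste without copying any data.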
pub(crate) struct BufferUtilization {
    len: u32,
    used: u32,
    min_offset: u32,
    max_offset_end: u32,
}

impl BufferUtilization {
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0u32,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    fn add(&mut self, offset: u32, size: u32) {
        self.used += size;
        self.min_offset = self.min_offset.min(offset);
        self.max_offset_end = self.max_offset_end.max(offset + size);
    }

    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / len as f64,
        }
    }

    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / span as f64,
        }
    }

    pub fn range(&self) -> Range<u32> {
        self.min_offset..self.max_offset_end
    }

    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}

#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;

    #[test]
    fn test_optimize_compacts_buffers() {
        // Create a VarBinViewArray with some long strings that will create multiple buffers
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        // Verify it has buffers
        assert!(original.nbuffers() > 0);
        let original_buffers = original.nbuffers();

        // Take only the first and last elements (indices 0 and 4)
        let indices = buffer![0u32, 4u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        // The taken array should still have the same number of buffers
        assert_eq!(taken.nbuffers(), original_buffers);

        // Now optimize the taken array
        let optimized_array = taken.compact_buffers().unwrap();

        // The optimized array should have compacted buffers. Since both remaining strings are
        // short they should be inlined, so we expect 0 buffers (or 1 if any were not inlined).
        assert!(optimized_array.nbuffers() <= 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")])
        );
    }

    #[test]
    fn test_optimize_with_long_strings() {
        // Create strings that are definitely longer than 12 bytes
        let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
        let long_string_2 = "another extremely long string that also needs external buffer storage";
        let long_string_3 = "yet another long string for testing buffer compaction functionality";

        let original = VarBinViewArray::from_iter_str([
            long_string_1,
            long_string_2,
            long_string_3,
            "short1",
            "short2",
        ]);

        // Take only the first and third long strings (indices 0 and 2)
        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken_array = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        // Optimize the taken array
        let optimized_array = taken_array.compact_buffers().unwrap();

        // The optimized array should have exactly 1 buffer (consolidated)
        assert_eq!(optimized_array.nbuffers(), 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            VarBinArray::from(vec![long_string_1, long_string_3])
        );
    }

    #[test]
    fn test_optimize_no_buffers() {
        // Create an array with only short strings (all inlined)
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        // This should have no buffers
        assert_eq!(original.nbuffers(), 0);

        // Optimize should return the same array
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 0);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_optimize_single_buffer() {
        // Create an array that naturally has only one buffer
        let str1 = "this is a long string that goes into a buffer";
        let str2 = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2]);

        // Should have 1 compact buffer
        assert_eq!(original.nbuffers(), 1);
        assert_eq!(original.buffer(0).len(), str1.len() + str2.len());

        // Optimize should return the same array (no change needed)
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 1);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        // threshold=0 should keep all buffers (no compaction)
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);

        let original_buffers = original.nbuffers();
        assert!(original_buffers > 0);

        // Take only first element
        let indices = buffer![0u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        // Compact with threshold=0 (should not compact)
        let compacted = taken.compact_with_threshold(0.0).unwrap();

        // Should still have the same number of buffers as the taken array
        assert_eq!(compacted.nbuffers(), taken.nbuffers());

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        // threshold=1.0 should compact any buffer with waste
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        // Take only first and last elements
        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        let original_buffers = taken.nbuffers();

        // Compact with threshold=1.0 (aggressive compaction)
        let compacted = taken.compact_with_threshold(1.0).unwrap();

        // Should have compacted buffers
        assert!(compacted.nbuffers() <= original_buffers);

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        // Create an array with multiple strings in one buffer (well-utilized)
        let str1 = "first long string that needs external buffer storage";
        let str2 = "second long string also in buffer";
        let str3 = "third long string in same buffer";

        let original = VarBinViewArray::from_iter_str([str1, str2, str3]);

        // All strings should be in one well-utilized buffer
        assert_eq!(original.nbuffers(), 1);

        // Compact with high threshold
        let compacted = original.compact_with_threshold(0.8).unwrap();

        // Well-utilized buffer should be preserved
        assert_eq!(compacted.nbuffers(), 1);

        // Verify all data is correct
        assert_arrays_eq!(compacted, original);
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        // Create array with some long strings
        let strings: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    "this is a long string number {} that needs buffer storage",
                    i
                )
            })
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take every other element to create mixed utilization
        let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken = original.take(indices_array.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        // Compact with moderate threshold
        let compacted = taken.compact_with_threshold(0.7).unwrap();

        let expected = VarBinViewArray::from_iter(
            [0, 2, 4, 6, 8].map(|i| Some(strings[i].as_str())),
            DType::Utf8(Nullability::NonNullable),
        );
        assert_arrays_eq!(expected, compacted);
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        // Create array with strings that will be in one buffer
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {} for slice test", i))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take only the first 5 elements - they should be in a contiguous range at the start
        let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken = original.take(indices_array.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        // Get buffer stats before compaction
        let utils_before = taken.buffer_utilizations().unwrap();
        let original_buffer_count = taken.nbuffers();

        // Compact with a threshold that should trigger slicing
        // The range utilization should be high even if overall utilization is low
        let compacted = taken.compact_with_threshold(0.8).unwrap();

        // After compaction, we should still have buffers (sliced, not rewritten)
        assert!(
            compacted.nbuffers() > 0,
            "Should have buffers after slice compaction"
        );

        // Verify correctness
        assert_arrays_eq!(&compacted, taken);

        // Verify that if there was only one buffer, the compacted version also has one
        // (it was sliced, not rewritten into multiple buffers)
        if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.nbuffers(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }

    const LONG1: &str = "long string one!";
    const LONG2: &str = "long string two!";
    const SHORT: &str = "x";
    const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;

    fn mixed_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_nullable_str([Some(LONG1), None, Some(LONG2), Some(SHORT)])
    }

    #[rstest]
    #[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
    #[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
    #[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
    #[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
    fn test_validity_code_paths(
        #[case] arr: VarBinViewArray,
        #[case] expected_bytes: u64,
        #[case] expected_utils: &[f64],
    ) {
        assert_eq!(arr.count_referenced_bytes().unwrap(), expected_bytes);
        let utils: Vec<f64> = arr
            .buffer_utilizations()
            .unwrap()
            .iter()
            .map(|u| u.overall_utilization())
            .collect();
        assert_eq!(utils, expected_utils);
    }
}