Skip to main content

vortex_array/arrays/varbinview/
compact.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Defines a compaction operation for VarBinViewArrays that evicts unused buffers so they can
5//! be dropped.
6
7use std::ops::Range;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12
13use crate::IntoArray;
14use crate::arrays::VarBinViewArray;
15use crate::arrays::varbinview::Ref;
16use crate::builders::ArrayBuilder;
17use crate::builders::VarBinViewBuilder;
18
19impl VarBinViewArray {
20    /// Returns a compacted copy of the input array, where all wasted space has been cleaned up. This
21    /// operation can be very expensive, in the worst case copying all existing string data into
22    /// a new allocation.
23    ///
24    /// After slicing/taking operations `VarBinViewArray`s can continue to hold references to buffers
25    /// that are no longer visible. We detect when there is wasted space in any of the buffers, and if
26    /// so, will aggressively compact all visible outlined string data into new buffers while keeping
27    /// well-utilized buffers unchanged.
28    pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
29        // If there is nothing to be gained by compaction, return the original array untouched.
30        if !self.should_compact()? {
31            return Ok(self.clone());
32        }
33
34        // Use selective compaction with threshold of 1.0 (compact any buffer with any waste)
35        self.compact_with_threshold(1.0)
36    }
37
38    fn should_compact(&self) -> VortexResult<bool> {
39        let nbuffers = self.nbuffers();
40
41        // If the array is entirely inlined strings, do not attempt to compact.
42        if nbuffers == 0 {
43            return Ok(false);
44        }
45
46        // These will fail to write, so in most cases we want to compact this.
47        if nbuffers > u16::MAX as usize {
48            return Ok(true);
49        }
50
51        let bytes_referenced: u64 = self.count_referenced_bytes()?;
52        let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();
53
54        // If there is any wasted space, we want to repack.
55        // This is very aggressive.
56        Ok(bytes_referenced < buffer_total_bytes || buffer_total_bytes == 0)
57    }
58
59    /// Iterates over all valid, non-inlined views, calling the provided
60    /// closure for each one.
61    #[inline(always)]
62    fn iter_valid_views<F>(&self, mut f: F) -> VortexResult<()>
63    where
64        F: FnMut(&Ref),
65    {
66        match self.validity_mask()? {
67            Mask::AllTrue(_) => {
68                for &view in self.views().iter() {
69                    if !view.is_inlined() {
70                        f(view.as_view());
71                    }
72                }
73            }
74            Mask::AllFalse(_) => {}
75            Mask::Values(v) => {
76                for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
77                    if is_valid && !view.is_inlined() {
78                        f(view.as_view());
79                    }
80                }
81            }
82        }
83        Ok(())
84    }
85
86    /// Count the number of bytes addressed by the views, not including null
87    /// values or any inlined strings.
88    fn count_referenced_bytes(&self) -> VortexResult<u64> {
89        let mut total = 0u64;
90        self.iter_valid_views(|view| total += view.size as u64)?;
91        Ok(total)
92    }
93
94    pub(crate) fn buffer_utilizations(&self) -> VortexResult<Vec<BufferUtilization>> {
95        let mut utilizations: Vec<BufferUtilization> = self
96            .buffers()
97            .iter()
98            .map(|buf| {
99                let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
100                BufferUtilization::zero(len)
101            })
102            .collect();
103
104        self.iter_valid_views(|view| {
105            utilizations[view.buffer_index as usize].add(view.offset, view.size);
106        })?;
107
108        Ok(utilizations)
109    }
110
111    /// Returns a compacted copy of the input array using selective buffer compaction.
112    ///
113    /// This method analyzes each buffer's utilization and applies one of three strategies:
114    /// - **KeepFull** (zero-copy): Well-utilized buffers are kept unchanged
115    /// - **Slice** (zero-copy): Buffers with contiguous ranges of used data are sliced to that range
116    /// - **Rewrite**: Poorly-utilized buffers have their data copied to new compact buffers
117    ///
118    /// By preserving or slicing well-utilized buffers, compaction becomes zero-copy in many cases.
119    ///
120    /// # Arguments
121    ///
122    /// * `buffer_utilization_threshold` - Threshold in range [0, 1]. Buffers with utilization
123    ///   below this value will be compacted. Use 0.0 for no compaction, 1.0 for aggressive
124    ///   compaction of any buffer with wasted space.
125    pub fn compact_with_threshold(
126        &self,
127        buffer_utilization_threshold: f64, // [0, 1]
128    ) -> VortexResult<VarBinViewArray> {
129        let mut builder = VarBinViewBuilder::with_compaction(
130            self.dtype().clone(),
131            self.len(),
132            buffer_utilization_threshold,
133        );
134        builder.extend_from_array(&self.clone().into_array());
135        Ok(builder.finish_into_varbinview())
136    }
137}
138
/// Tracks how much of a single data buffer is actually referenced by an array's
/// valid, non-inlined views.
pub(crate) struct BufferUtilization {
    // Total buffer length in bytes.
    len: u32,
    // Sum of the sizes of all views referencing this buffer. Overlapping or
    // duplicated views (e.g. after `take` with repeated indices) are counted
    // multiple times, so this can exceed `len`.
    used: u32,
    // Smallest referenced offset (u32::MAX until the first view is added).
    min_offset: u32,
    // One past the largest referenced byte (0 until the first view is added).
    max_offset_end: u32,
}

impl BufferUtilization {
    /// A utilization record for a buffer of `len` bytes with no views recorded yet.
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0u32,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Records a view referencing `size` bytes starting at `offset`.
    fn add(&mut self, offset: u32, size: u32) {
        // Views may overlap or be duplicated, so the running total can exceed the
        // buffer length; saturate rather than overflow (debug panic / release
        // wraparound) on pathological inputs. For any in-bounds view,
        // offset + size <= len fits in u32 and saturation never engages.
        self.used = self.used.saturating_add(size);
        self.min_offset = self.min_offset.min(offset);
        self.max_offset_end = self.max_offset_end.max(offset.saturating_add(size));
    }

    /// Fraction of the whole buffer that is referenced (may exceed 1.0 if views
    /// overlap). Returns 0.0 for an empty buffer.
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / len as f64,
        }
    }

    /// Fraction of the referenced span [min_offset, max_offset_end) that is
    /// actually used. Returns 0.0 when no views have been recorded.
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / span as f64,
        }
    }

    /// The byte range spanned by all recorded views. Note: with no views recorded
    /// this is u32::MAX..0 (an empty range).
    pub fn range(&self) -> Range<u32> {
        self.min_offset..self.max_offset_end
    }

    // Width of the referenced span; saturating_sub yields 0 when nothing was recorded.
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
184
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    #[test]
    fn test_optimize_compacts_buffers() {
        // Create a VarBinViewArray with some long strings that will create multiple buffers
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        // Verify it has buffers
        assert!(original.nbuffers() > 0);
        let original_buffers = original.nbuffers();

        // Take only the first and last elements (indices 0 and 4)
        let indices = buffer![0u32, 4u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        // The taken array should still have the same number of buffers
        // (take does not drop now-unreferenced buffers; that is compaction's job)
        assert_eq!(taken.nbuffers(), original_buffers);

        // Now optimize the taken array
        let optimized_array = taken.compact_buffers().unwrap();

        // The optimized array should have compacted buffers
        // Since both remaining strings are short, they should be inlined
        // so we might have 0 buffers, or 1 buffer if any were not inlined
        assert!(optimized_array.nbuffers() <= 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")])
        );
    }

    #[test]
    fn test_optimize_with_long_strings() {
        // Create strings that are definitely longer than 12 bytes
        let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
        let long_string_2 = "another extremely long string that also needs external buffer storage";
        let long_string_3 = "yet another long string for testing buffer compaction functionality";

        let original = VarBinViewArray::from_iter_str([
            long_string_1,
            long_string_2,
            long_string_3,
            "short1",
            "short2",
        ]);

        // Take only the first and third long strings (indices 0 and 2)
        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken_array = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        // Optimize the taken array
        let optimized_array = taken_array.compact_buffers().unwrap();

        // The optimized array should have exactly 1 buffer (consolidated)
        assert_eq!(optimized_array.nbuffers(), 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            VarBinArray::from(vec![long_string_1, long_string_3])
        );
    }

    #[test]
    fn test_optimize_no_buffers() {
        // Create an array with only short strings (all inlined)
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        // This should have no buffers
        assert_eq!(original.nbuffers(), 0);

        // Optimize should return the same array
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 0);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_optimize_single_buffer() {
        // Create an array that naturally has only one buffer
        let str1 = "this is a long string that goes into a buffer";
        let str2 = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2]);

        // Should have 1 compact buffer with zero waste (every byte referenced)
        assert_eq!(original.nbuffers(), 1);
        assert_eq!(original.buffer(0).len(), str1.len() + str2.len());

        // Optimize should return the same array (no change needed)
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 1);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        // threshold=0 should keep all buffers (no compaction)
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);

        let original_buffers = original.nbuffers();
        assert!(original_buffers > 0);

        // Take only first element
        let indices = buffer![0u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        // Compact with threshold=0 (should not compact)
        let compacted = taken.compact_with_threshold(0.0).unwrap();

        // Should still have the same number of buffers as the taken array
        assert_eq!(compacted.nbuffers(), taken.nbuffers());

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        // threshold=1.0 should compact any buffer with waste
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        // Take only first and last elements
        let indices = buffer![0u32, 2u32].into_array();
        let taken = original.take(indices.to_array()).unwrap();
        let taken = taken
            .clone()
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        let original_buffers = taken.nbuffers();

        // Compact with threshold=1.0 (aggressive compaction)
        let compacted = taken.compact_with_threshold(1.0).unwrap();

        // Should have compacted buffers
        assert!(compacted.nbuffers() <= original_buffers);

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        // Create an array with multiple strings in one buffer (well-utilized)
        let str1 = "first long string that needs external buffer storage";
        let str2 = "second long string also in buffer";
        let str3 = "third long string in same buffer";

        let original = VarBinViewArray::from_iter_str([str1, str2, str3]);

        // All strings should be in one well-utilized buffer
        assert_eq!(original.nbuffers(), 1);

        // Compact with high threshold
        let compacted = original.compact_with_threshold(0.8).unwrap();

        // Well-utilized buffer should be preserved (KeepFull strategy, zero-copy)
        assert_eq!(compacted.nbuffers(), 1);

        // Verify all data is correct
        assert_arrays_eq!(compacted, original);
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        // Create array with some long strings
        let strings: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    "this is a long string number {} that needs buffer storage",
                    i
                )
            })
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take every other element to create mixed utilization
        let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken = original.take(indices_array.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();

        // Compact with moderate threshold; only data correctness is asserted here,
        // not a specific buffer count, since the strategy chosen may vary.
        let compacted = taken.compact_with_threshold(0.7).unwrap();

        let expected = VarBinViewArray::from_iter(
            [0, 2, 4, 6, 8].map(|i| Some(strings[i].as_str())),
            DType::Utf8(Nullability::NonNullable),
        );
        assert_arrays_eq!(expected, compacted);
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        // Create array with strings that will be in one buffer
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {} for slice test", i))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take only the first 5 elements - they should be in a contiguous range at the start
        let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken = original.take(indices_array.to_array()).unwrap();
        let taken = taken
            .execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        // Get buffer stats before compaction
        let utils_before = taken.buffer_utilizations().unwrap();
        let original_buffer_count = taken.nbuffers();

        // Compact with a threshold that should trigger slicing
        // The range utilization should be high even if overall utilization is low
        let compacted = taken.compact_with_threshold(0.8).unwrap();

        // After compaction, we should still have buffers (sliced, not rewritten)
        assert!(
            compacted.nbuffers() > 0,
            "Should have buffers after slice compaction"
        );

        // Verify correctness
        assert_arrays_eq!(&compacted, taken);

        // Verify that if there was only one buffer, the compacted version also has one
        // (it was sliced, not rewritten into multiple buffers)
        if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.nbuffers(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }

    // LONG1/LONG2 are 16 bytes, too long to inline (>12 bytes), so only they
    // contribute to referenced-byte counts; SHORT is inlined and contributes nothing.
    const LONG1: &str = "long string one!";
    const LONG2: &str = "long string two!";
    const SHORT: &str = "x";
    const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;

    // Array exercising the Mask::Values (mixed validity) path of iter_valid_views.
    fn mixed_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_nullable_str([Some(LONG1), None, Some(LONG2), Some(SHORT)])
    }

    // One case per validity code path: AllTrue (non-nullable and all-valid nullable),
    // AllFalse (all-null, no buffers at all), and Values (mixed).
    #[rstest]
    #[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
    #[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
    #[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
    #[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
    fn test_validity_code_paths(
        #[case] arr: VarBinViewArray,
        #[case] expected_bytes: u64,
        #[case] expected_utils: &[f64],
    ) {
        assert_eq!(arr.count_referenced_bytes().unwrap(), expected_bytes);
        let utils: Vec<f64> = arr
            .buffer_utilizations()
            .unwrap()
            .iter()
            .map(|u| u.overall_utilization())
            .collect();
        assert_eq!(utils, expected_utils);
    }
}