vortex_array/arrays/varbinview/compact.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Defines a compaction operation for VarBinViewArrays that evicts unused buffers so they can
5//! be dropped.
6
7use std::ops::Range;
8
9use vortex_error::{VortexExpect, VortexResult};
10
11use crate::arrays::VarBinViewArray;
12use crate::builders::{ArrayBuilder, VarBinViewBuilder};
13use crate::validity::Validity;
14use crate::vtable::ValidityHelper;
15
impl VarBinViewArray {
    /// Returns a compacted copy of the input array, where all wasted space has been cleaned up. This
    /// operation can be very expensive, in the worst case copying all existing string data into
    /// a new allocation.
    ///
    /// After slicing/taking operations `VarBinViewArray`s can continue to hold references to buffers
    /// that are no longer visible. We detect when there is wasted space in any of the buffers, and if
    /// so, will aggressively compact all visible outlined string data into new buffers while keeping
    /// well-utilized buffers unchanged.
    pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
        // If there is nothing to be gained by compaction, return the original array untouched.
        if !self.should_compact() {
            return Ok(self.clone());
        }

        // Use selective compaction with threshold of 1.0 (compact any buffer with any waste)
        self.compact_with_threshold(1.0)
    }

    /// Cheap pre-check: `true` iff the bytes referenced by valid, non-inlined views are fewer
    /// than the total bytes held across all buffers, i.e. at least one byte is wasted.
    fn should_compact(&self) -> bool {
        // If the array is entirely inlined strings, do not attempt to compact.
        if self.nbuffers() == 0 {
            return false;
        }

        let bytes_referenced: u64 = self.count_referenced_bytes();
        let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();

        // If there is any wasted space, we want to repack.
        // This is very aggressive.
        // NOTE(review): if several views alias the same bytes (possible after `take` with
        // repeated indices), `bytes_referenced` can exceed `buffer_total_bytes` and compaction
        // would be skipped despite unused bytes — confirm this trade-off is intended.
        bytes_referenced < buffer_total_bytes
    }

    // count the number of bytes addressed by the views, not including null
    // values or any inlined strings.
    fn count_referenced_bytes(&self) -> u64 {
        match self.validity() {
            // No element is valid, so no buffer bytes are reachable.
            Validity::AllInvalid => 0u64,
            Validity::NonNullable | Validity::AllValid | Validity::Array(_) => self
                .views()
                .iter()
                .enumerate()
                .map(|(idx, &view)| {
                    // Null slots and inlined strings consume no outlined buffer bytes.
                    if !self.is_valid(idx) || view.is_inlined() {
                        0u64
                    } else {
                        view.len() as u64
                    }
                })
                .sum(),
        }
    }

    /// Computes one [`BufferUtilization`] per buffer: how many bytes each buffer has referenced
    /// by valid, non-inlined views, and the offset span those references cover.
    pub(crate) fn buffer_utilizations(&self) -> Vec<BufferUtilization> {
        let mut utilizations = self
            .buffers()
            .iter()
            .map(|buf| {
                // View offsets/sizes are u32, so any addressable buffer must fit in u32.
                let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
                BufferUtilization::zero(len)
            })
            .collect();

        // All-invalid arrays reference nothing; every buffer stays at zero utilization.
        if matches!(self.validity(), Validity::AllInvalid) {
            return utilizations;
        }

        for (idx, &view) in self.views().iter().enumerate() {
            // Only valid, outlined views contribute to buffer utilization.
            if !self.is_valid(idx) || view.is_inlined() {
                continue;
            }
            let view = view.as_view();

            utilizations[view.buffer_index() as usize].add(view.offset(), view.size)
        }

        utilizations
    }

    /// Returns a compacted copy of the input array using selective buffer compaction.
    ///
    /// This method analyzes each buffer's utilization and applies one of three strategies:
    /// - **KeepFull** (zero-copy): Well-utilized buffers are kept unchanged
    /// - **Slice** (zero-copy): Buffers with contiguous ranges of used data are sliced to that range
    /// - **Rewrite**: Poorly-utilized buffers have their data copied to new compact buffers
    ///
    /// By preserving or slicing well-utilized buffers, compaction becomes zero-copy in many cases.
    ///
    /// # Arguments
    ///
    /// * `buffer_utilization_threshold` - Threshold in range [0, 1]. Buffers with utilization
    ///   below this value will be compacted. Use 0.0 for no compaction, 1.0 for aggressive
    ///   compaction of any buffer with wasted space.
    pub fn compact_with_threshold(
        &self,
        buffer_utilization_threshold: f64, // [0, 1]
    ) -> VortexResult<VarBinViewArray> {
        // The compacting builder decides per buffer (keep/slice/rewrite) as views are appended.
        let mut builder = VarBinViewBuilder::with_compaction(
            self.dtype().clone(),
            self.len(),
            buffer_utilization_threshold,
        );
        builder.extend_from_array(self.as_ref());
        Ok(builder.finish_into_varbinview())
    }
}
122
/// Tracks how much of one data buffer is actually referenced by views: the total bytes used and
/// the `[min_offset, max_offset_end)` span covered by those references.
pub(crate) struct BufferUtilization {
    len: u32,
    used: u32,
    min_offset: u32,
    max_offset_end: u32,
}

impl BufferUtilization {
    /// A fresh tracker for a buffer of `len` bytes with no references recorded yet.
    /// The sentinel `min_offset = u32::MAX` / `max_offset_end = 0` denotes "empty span".
    fn zero(len: u32) -> Self {
        Self {
            len,
            used: 0,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Record one referenced byte range `[offset, offset + size)`, growing the covered span.
    fn add(&mut self, offset: u32, size: u32) {
        self.used += size;
        if offset < self.min_offset {
            self.min_offset = offset;
        }
        let end = offset + size;
        if end > self.max_offset_end {
            self.max_offset_end = end;
        }
    }

    /// Fraction of the whole buffer that is referenced (0.0 for an empty buffer).
    pub fn overall_utilization(&self) -> f64 {
        if self.len == 0 {
            return 0.0;
        }
        f64::from(self.used) / f64::from(self.len)
    }

    /// Fraction of the covered span that is referenced (0.0 when no range was recorded).
    pub fn range_utilization(&self) -> f64 {
        let span = self.range_span();
        if span == 0 {
            return 0.0;
        }
        f64::from(self.used) / f64::from(span)
    }

    /// The byte range covered by all recorded references.
    pub fn range(&self) -> Range<u32> {
        Range {
            start: self.min_offset,
            end: self.max_offset_end,
        }
    }

    /// Width of the covered span; saturates to 0 when nothing was recorded
    /// (sentinel state has min_offset > max_offset_end).
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
168
// Tests cover: full compaction (`compact_buffers`), selective compaction at various thresholds,
// and the no-op paths (no buffers / single well-utilized buffer).
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::arrays::{VarBinViewArray, VarBinViewVTable};
    use crate::compute::take;

    #[test]
    fn test_optimize_compacts_buffers() {
        // Create a VarBinViewArray with some long strings that will create multiple buffers
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        // Verify it has buffers
        assert!(original.nbuffers() > 0);
        let original_buffers = original.nbuffers();

        // Take only the first and last elements (indices 0 and 4)
        let indices = buffer![0u32, 4u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // The taken array should still have the same number of buffers
        assert_eq!(taken_array.nbuffers(), original_buffers);

        // Now optimize the taken array
        let optimized_array = taken_array.compact_buffers().unwrap();

        // The optimized array should have compacted buffers
        // Since both remaining strings are short, they should be inlined
        // so we might have 0 buffers, or 1 buffer if any were not inlined
        assert!(optimized_array.nbuffers() <= 1);

        // Verify the data is still correct
        assert_eq!(optimized_array.len(), 2);
        assert_eq!(optimized_array.scalar_at(0), "short".into());
        assert_eq!(optimized_array.scalar_at(1), "tiny".into());
    }

    #[test]
    fn test_optimize_with_long_strings() {
        // Create strings that are definitely longer than 12 bytes
        let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
        let long_string_2 = "another extremely long string that also needs external buffer storage";
        let long_string_3 = "yet another long string for testing buffer compaction functionality";

        let original = VarBinViewArray::from_iter_str([
            long_string_1,
            long_string_2,
            long_string_3,
            "short1",
            "short2",
        ]);

        // Take only the first and third long strings (indices 0 and 2)
        let indices = buffer![0u32, 2u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Optimize the taken array
        let optimized_array = taken_array.compact_buffers().unwrap();

        // The optimized array should have exactly 1 buffer (consolidated)
        assert_eq!(optimized_array.nbuffers(), 1);

        // Verify the data is still correct
        assert_eq!(optimized_array.len(), 2);
        assert_eq!(optimized_array.scalar_at(0), long_string_1.into());
        assert_eq!(optimized_array.scalar_at(1), long_string_3.into());
    }

    #[test]
    fn test_optimize_no_buffers() {
        // Create an array with only short strings (all inlined)
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        // This should have no buffers
        assert_eq!(original.nbuffers(), 0);

        // Optimize should return the same array
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 0);
        assert_eq!(optimized_array.len(), 4);

        // Verify all values are preserved
        for i in 0..4 {
            assert_eq!(optimized_array.scalar_at(i), original.scalar_at(i));
        }
    }

    #[test]
    fn test_optimize_single_buffer() {
        // Create an array that naturally has only one buffer
        let str1 = "this is a long string that goes into a buffer";
        let str2 = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2]);

        // Should have 1 compact buffer
        assert_eq!(original.nbuffers(), 1);
        assert_eq!(original.buffer(0).len(), str1.len() + str2.len());

        // Optimize should return the same array (no change needed)
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 1);
        assert_eq!(optimized_array.len(), 2);

        // Verify all values are preserved
        for i in 0..2 {
            assert_eq!(optimized_array.scalar_at(i), original.scalar_at(i));
        }
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        // threshold=0 should keep all buffers (no compaction)
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);

        let original_buffers = original.nbuffers();
        assert!(original_buffers > 0);

        // Take only first element
        let indices = buffer![0u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Compact with threshold=0 (should not compact)
        let compacted = taken_array.compact_with_threshold(0.0).unwrap();

        // Should still have the same number of buffers as the taken array
        assert_eq!(compacted.nbuffers(), taken_array.nbuffers());

        // Verify correctness
        assert_eq!(compacted.len(), 1);
        assert_eq!(
            compacted.scalar_at(0),
            "this is a longer string that will be stored in a buffer".into()
        );
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        // threshold=1.0 should compact any buffer with waste
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        // Take only first and last elements
        let indices = buffer![0u32, 2u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        let original_buffers = taken_array.nbuffers();

        // Compact with threshold=1.0 (aggressive compaction)
        let compacted = taken_array.compact_with_threshold(1.0).unwrap();

        // Should have compacted buffers
        assert!(compacted.nbuffers() <= original_buffers);

        // Verify correctness
        assert_eq!(compacted.len(), 2);
        assert_eq!(
            compacted.scalar_at(0),
            "this is a longer string that will be stored in a buffer".into()
        );
        assert_eq!(compacted.scalar_at(1), "yet another long string".into());
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        // Create an array with multiple strings in one buffer (well-utilized)
        let str1 = "first long string that needs external buffer storage";
        let str2 = "second long string also in buffer";
        let str3 = "third long string in same buffer";

        let original = VarBinViewArray::from_iter_str([str1, str2, str3]);

        // All strings should be in one well-utilized buffer
        assert_eq!(original.nbuffers(), 1);

        // Compact with high threshold
        let compacted = original.compact_with_threshold(0.8).unwrap();

        // Well-utilized buffer should be preserved
        assert_eq!(compacted.nbuffers(), 1);

        // Verify all data is correct
        assert_eq!(compacted.len(), 3);
        assert_eq!(compacted.scalar_at(0), str1.into());
        assert_eq!(compacted.scalar_at(1), str2.into());
        assert_eq!(compacted.scalar_at(2), str3.into());
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        // Create array with some long strings
        let strings: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    "this is a long string number {} that needs buffer storage",
                    i
                )
            })
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take every other element to create mixed utilization
        let indices: Vec<u32> = (0..10).step_by(2).map(|i| i as u32).collect();
        // NOTE(review): `indices_array` hand-duplicates `indices` above — keep the two in sync
        // (or build the buffer from `indices` directly) if the stride ever changes.
        let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken = take(original.as_ref(), &indices_array).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Compact with moderate threshold
        let compacted = taken_array.compact_with_threshold(0.7).unwrap();

        // Verify correctness
        assert_eq!(compacted.len(), 5);
        for (i, idx) in indices.iter().enumerate() {
            assert_eq!(
                compacted.scalar_at(i),
                strings[*idx as usize].as_str().into()
            );
        }
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        // Create array with strings that will be in one buffer
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {} for slice test", i))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take only the first 5 elements - they should be in a contiguous range at the start
        let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken = take(original.as_ref(), &indices_array).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Get buffer stats before compaction
        let utils_before = taken_array.buffer_utilizations();
        let original_buffer_count = taken_array.nbuffers();

        // Compact with a threshold that should trigger slicing
        // The range utilization should be high even if overall utilization is low
        let compacted = taken_array.compact_with_threshold(0.8).unwrap();

        // After compaction, we should still have buffers (sliced, not rewritten)
        assert!(
            compacted.nbuffers() > 0,
            "Should have buffers after slice compaction"
        );

        // Verify correctness
        assert_eq!(compacted.len(), 5);
        for i in 0..5 {
            assert_eq!(compacted.scalar_at(i), strings[i].as_str().into());
        }

        // Verify that if there was only one buffer, the compacted version also has one
        // (it was sliced, not rewritten into multiple buffers)
        if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.nbuffers(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }
}
452}