vortex_array/arrays/varbinview/compact.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Defines a compaction operation for `VarBinViewArray`s that evicts unused buffers so they can
//! be dropped.

use std::ops::Range;

use vortex_error::VortexExpect;
use vortex_error::VortexResult;

use crate::arrays::VarBinViewArray;
use crate::builders::ArrayBuilder;
use crate::builders::VarBinViewBuilder;
use crate::validity::Validity;
use crate::vtable::ValidityHelper;

impl VarBinViewArray {
    /// Returns a compacted copy of this array in which all wasted buffer space has been cleaned
    /// up. This operation can be very expensive: in the worst case it copies all existing string
    /// data into a new allocation.
    ///
    /// After slice/take operations, a `VarBinViewArray` can continue to hold buffers whose
    /// contents are largely no longer referenced by any view. We detect wasted space in any of
    /// the buffers and, if found, aggressively compact all visible outlined string data into new
    /// buffers while keeping well-utilized buffers unchanged.
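    ///
    /// # Example
    ///
    /// A minimal sketch of the intended usage (marked `ignore` so it is not compiled as a
    /// doctest; bindings and imports are illustrative), mirroring the tests below: `take`
    /// leaves the original buffers over-allocated, and compaction shrinks them.
    ///
    /// ```ignore
    /// let array = VarBinViewArray::from_iter_str([
    ///     "a long string that is stored out-of-line in a buffer",
    ///     "another long string that is also stored out-of-line",
    /// ]);
    /// // Keep only the first element; the second string's bytes become unreachable.
    /// let taken = take(array.as_ref(), &buffer![0u32].into_array())?;
    /// let compacted = taken.as_::<VarBinViewVTable>().compact_buffers()?;
    /// assert!(compacted.nbuffers() <= taken.as_::<VarBinViewVTable>().nbuffers());
    /// ```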
    pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
        // If there is nothing to be gained by compaction, return the original array untouched.
        if !self.should_compact() {
            return Ok(self.clone());
        }

        // Use selective compaction with a threshold of 1.0 (compact any buffer with any waste).
        self.compact_with_threshold(1.0)
    }

    fn should_compact(&self) -> bool {
        let nbuffers = self.nbuffers();

        // If the array is entirely inlined strings, do not attempt to compact.
        if nbuffers == 0 {
            return false;
        }

        // Arrays with more than u16::MAX buffers will fail to write, so in most cases we want
        // to compact them.
        if nbuffers > u16::MAX as usize {
            return true;
        }

        let bytes_referenced: u64 = self.count_referenced_bytes();
        let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();

        // If there is any wasted space, we want to repack.
        // This is very aggressive.
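        //
        // For example (figures purely illustrative): if a take([0]) left views referencing
        // only 100 bytes of a 1 KiB buffer, bytes_referenced (100) is less than
        // buffer_total_bytes (1024), so we repack.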
        bytes_referenced < buffer_total_bytes || buffer_total_bytes == 0
    }

    // Count the number of bytes addressed by the views, not including null
    // values or any inlined strings.
    fn count_referenced_bytes(&self) -> u64 {
        match self.validity() {
            Validity::AllInvalid => 0u64,
            Validity::NonNullable | Validity::AllValid | Validity::Array(_) => self
                .views()
                .iter()
                .enumerate()
                .map(|(idx, &view)| {
                    if !self.is_valid(idx) || view.is_inlined() {
                        0u64
                    } else {
                        view.len() as u64
                    }
                })
                .sum(),
        }
    }

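    /// Computes a per-buffer [`BufferUtilization`] by walking every valid, non-inlined view
    /// and crediting its byte range to the buffer it points into.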
    pub(crate) fn buffer_utilizations(&self) -> Vec<BufferUtilization> {
        let mut utilizations = self
            .buffers()
            .iter()
            .map(|buf| {
                let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
                BufferUtilization::zero(len)
            })
            .collect();

        if matches!(self.validity(), Validity::AllInvalid) {
            return utilizations;
        }

        for (idx, &view) in self.views().iter().enumerate() {
            if !self.is_valid(idx) || view.is_inlined() {
                continue;
            }
            let view = view.as_view();

            utilizations[view.buffer_index as usize].add(view.offset, view.size);
        }

        utilizations
    }

    /// Returns a compacted copy of the input array using selective buffer compaction.
    ///
    /// This method analyzes each buffer's utilization and applies one of three strategies:
    /// - **KeepFull** (zero-copy): well-utilized buffers are kept unchanged
    /// - **Slice** (zero-copy): buffers whose used data spans a contiguous range are sliced
    ///   down to that range
    /// - **Rewrite**: poorly-utilized buffers have their data copied into new compact buffers
    ///
    /// By preserving or slicing well-utilized buffers, compaction becomes zero-copy in many cases.
    ///
    /// # Arguments
    ///
    /// * `buffer_utilization_threshold` - Threshold in the range `[0, 1]`. Buffers with
    ///   utilization below this value will be compacted. Use 0.0 for no compaction, or 1.0 for
    ///   aggressive compaction of any buffer with wasted space.
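    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore` so it is not compiled as a doctest; the `array`
    /// binding is assumed to be a `VarBinViewArray`):
    ///
    /// ```ignore
    /// // Rewrite only buffers that are less than 70% utilized; well-utilized
    /// // buffers are kept or sliced without copying.
    /// let compacted = array.compact_with_threshold(0.7)?;
    /// assert_eq!(compacted.len(), array.len());
    /// ```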
    pub fn compact_with_threshold(
        &self,
        buffer_utilization_threshold: f64, // [0, 1]
    ) -> VortexResult<VarBinViewArray> {
        let mut builder = VarBinViewBuilder::with_compaction(
            self.dtype().clone(),
            self.len(),
            buffer_utilization_threshold,
        );
        builder.extend_from_array(self.as_ref());
        Ok(builder.finish_into_varbinview())
    }
}

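/// Summary of how much of a single data buffer is referenced by an array's views: the buffer
/// length, the total referenced bytes, and the tightest offset range covering every reference.
///
/// For example (figures illustrative): a 100-byte buffer whose views reference 40 bytes, all
/// within offsets 50..100, has an overall utilization of 0.4 but a range utilization of 0.8.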
pub(crate) struct BufferUtilization {
    len: u32,
    used: u32,
    min_offset: u32,
    max_offset_end: u32,
}

impl BufferUtilization {
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0u32,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    fn add(&mut self, offset: u32, size: u32) {
        self.used += size;
        self.min_offset = self.min_offset.min(offset);
        self.max_offset_end = self.max_offset_end.max(offset + size);
    }

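    /// Fraction of the buffer's bytes that are referenced by views. May exceed 1.0 when
    /// multiple views reference overlapping bytes (e.g. after `take` with repeated indices).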
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / len as f64,
        }
    }

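    /// Referenced bytes divided by the span of the tightest range covering all references.
    /// A high value means the referenced data is contiguous, so the buffer can be sliced to
    /// that range instead of being rewritten.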
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / span as f64,
        }
    }

    pub fn range(&self) -> Range<u32> {
        self.min_offset..self.max_offset_end
    }

    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}

#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::arrays::VarBinArray;
    use crate::arrays::VarBinViewArray;
    use crate::arrays::VarBinViewVTable;
    use crate::assert_arrays_eq;
    use crate::compute::take;

    #[test]
    fn test_optimize_compacts_buffers() {
        // Create a VarBinViewArray with some long strings that will create multiple buffers
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        // Verify it has buffers
        assert!(original.nbuffers() > 0);
        let original_buffers = original.nbuffers();

        // Take only the first and last elements (indices 0 and 4)
        let indices = buffer![0u32, 4u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // The taken array should still have the same number of buffers
        assert_eq!(taken_array.nbuffers(), original_buffers);

        // Now optimize the taken array
        let optimized_array = taken_array.compact_buffers().unwrap();

        // Since both remaining strings are short, they should be inlined after compaction,
        // leaving either 0 buffers, or 1 buffer if anything was not inlined
        assert!(optimized_array.nbuffers() <= 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            <VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")])
        );
    }

    #[test]
    fn test_optimize_with_long_strings() {
        // Create strings that are definitely longer than 12 bytes
        let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
        let long_string_2 = "another extremely long string that also needs external buffer storage";
        let long_string_3 = "yet another long string for testing buffer compaction functionality";

        let original = VarBinViewArray::from_iter_str([
            long_string_1,
            long_string_2,
            long_string_3,
            "short1",
            "short2",
        ]);

        // Take only the first and third long strings (indices 0 and 2)
        let indices = buffer![0u32, 2u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Optimize the taken array
        let optimized_array = taken_array.compact_buffers().unwrap();

        // The optimized array should have exactly 1 buffer (consolidated)
        assert_eq!(optimized_array.nbuffers(), 1);

        // Verify the data is still correct
        assert_arrays_eq!(
            optimized_array,
            VarBinArray::from(vec![long_string_1, long_string_3])
        );
    }

    #[test]
    fn test_optimize_no_buffers() {
        // Create an array with only short strings (all inlined)
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        // This should have no buffers
        assert_eq!(original.nbuffers(), 0);

        // Optimize should return the same array
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 0);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_optimize_single_buffer() {
        // Create an array that naturally has only one buffer
        let str1 = "this is a long string that goes into a buffer";
        let str2 = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2]);

        // Should have 1 compact buffer
        assert_eq!(original.nbuffers(), 1);
        assert_eq!(original.buffer(0).len(), str1.len() + str2.len());

        // Optimize should return the same array (no change needed)
        let optimized_array = original.compact_buffers().unwrap();

        assert_eq!(optimized_array.nbuffers(), 1);

        assert_arrays_eq!(optimized_array, original);
    }

    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        // threshold=0 should keep all buffers (no compaction)
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);

        let original_buffers = original.nbuffers();
        assert!(original_buffers > 0);

        // Take only the first element
        let indices = buffer![0u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Compact with threshold=0 (should not compact)
        let compacted = taken_array.compact_with_threshold(0.0).unwrap();

        // Should still have the same number of buffers as the taken array
        assert_eq!(compacted.nbuffers(), taken_array.nbuffers());

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_with_high_threshold() {
        // threshold=1.0 should compact any buffer with waste
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);

        // Take only the first and last elements
        let indices = buffer![0u32, 2u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        let original_buffers = taken_array.nbuffers();

        // Compact with threshold=1.0 (aggressive compaction)
        let compacted = taken_array.compact_with_threshold(1.0).unwrap();

        // Should have compacted buffers
        assert!(compacted.nbuffers() <= original_buffers);

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        // Create an array with multiple strings in one buffer (well-utilized)
        let str1 = "first long string that needs external buffer storage";
        let str2 = "second long string also in buffer";
        let str3 = "third long string in same buffer";

        let original = VarBinViewArray::from_iter_str([str1, str2, str3]);

        // All strings should be in one well-utilized buffer
        assert_eq!(original.nbuffers(), 1);

        // Compact with a high threshold
        let compacted = original.compact_with_threshold(0.8).unwrap();

        // The well-utilized buffer should be preserved
        assert_eq!(compacted.nbuffers(), 1);

        // Verify all data is correct
        assert_arrays_eq!(compacted, original);
    }

    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        // Create an array with some long strings
        let strings: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    "this is a long string number {} that needs buffer storage",
                    i
                )
            })
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take every other element to create mixed utilization
        let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken = take(original.as_ref(), &indices_array).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Compact with a moderate threshold
        let compacted = taken_array.compact_with_threshold(0.7).unwrap();

        // Verify correctness
        assert_arrays_eq!(compacted, taken);
    }

    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        // Create an array with strings that will be in one buffer
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {} for slice test", i))
            .collect();

        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Take only the first 5 elements - they should be in a contiguous range at the start
        let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken = take(original.as_ref(), &indices_array).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();

        // Get buffer stats before compaction
        let utils_before = taken_array.buffer_utilizations();
        let original_buffer_count = taken_array.nbuffers();

        // Compact with a threshold that should trigger slicing:
        // the range utilization should be high even if overall utilization is low
        let compacted = taken_array.compact_with_threshold(0.8).unwrap();

        // After compaction, we should still have buffers (sliced, not rewritten)
        assert!(
            compacted.nbuffers() > 0,
            "Should have buffers after slice compaction"
        );

        // Verify correctness
        assert_arrays_eq!(&compacted, taken);

        // Verify that if there was only one buffer, the compacted version also has one
        // (it was sliced, not rewritten into multiple buffers)
        if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.nbuffers(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }
}