vortex_array/arrays/varbinview/
compact.rs1use std::ops::Range;
8
9use vortex_error::{VortexExpect, VortexResult};
10
11use crate::arrays::VarBinViewArray;
12use crate::builders::{ArrayBuilder, VarBinViewBuilder};
13use crate::validity::Validity;
14use crate::vtable::ValidityHelper;
15
16impl VarBinViewArray {
17 pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
26 if !self.should_compact() {
28 return Ok(self.clone());
29 }
30
31 self.compact_with_threshold(1.0)
33 }
34
35 fn should_compact(&self) -> bool {
36 if self.nbuffers() == 0 {
38 return false;
39 }
40
41 let bytes_referenced: u64 = self.count_referenced_bytes();
42 let buffer_total_bytes: u64 = self.buffers.iter().map(|buf| buf.len() as u64).sum();
43
44 bytes_referenced < buffer_total_bytes
47 }
48
49 fn count_referenced_bytes(&self) -> u64 {
52 match self.validity() {
53 Validity::AllInvalid => 0u64,
54 Validity::NonNullable | Validity::AllValid | Validity::Array(_) => self
55 .views()
56 .iter()
57 .enumerate()
58 .map(|(idx, &view)| {
59 if !self.is_valid(idx) || view.is_inlined() {
60 0u64
61 } else {
62 view.len() as u64
63 }
64 })
65 .sum(),
66 }
67 }
68
69 pub(crate) fn buffer_utilizations(&self) -> Vec<BufferUtilization> {
70 let mut utilizations = self
71 .buffers()
72 .iter()
73 .map(|buf| {
74 let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
75 BufferUtilization::zero(len)
76 })
77 .collect();
78
79 if matches!(self.validity(), Validity::AllInvalid) {
80 return utilizations;
81 }
82
83 for (idx, &view) in self.views().iter().enumerate() {
84 if !self.is_valid(idx) || view.is_inlined() {
85 continue;
86 }
87 let view = view.as_view();
88
89 utilizations[view.buffer_index() as usize].add(view.offset(), view.size)
90 }
91
92 utilizations
93 }
94
95 pub fn compact_with_threshold(
110 &self,
111 buffer_utilization_threshold: f64, ) -> VortexResult<VarBinViewArray> {
113 let mut builder = VarBinViewBuilder::with_compaction(
114 self.dtype().clone(),
115 self.len(),
116 buffer_utilization_threshold,
117 );
118 builder.extend_from_array(self.as_ref());
119 Ok(builder.finish_into_varbinview())
120 }
121}
122
/// Byte-usage statistics for a single data buffer, accumulated from the views that point
/// into it.
#[derive(Debug, Clone)]
pub(crate) struct BufferUtilization {
    /// Total length of the buffer in bytes.
    len: u32,
    /// Sum of the sizes of every view recorded through `add`.
    ///
    /// Overlapping or duplicated views are each counted, so this can exceed `len`. It is kept
    /// as a `u64` so that many views over a large buffer cannot overflow the counter.
    used: u64,
    /// Smallest view start offset recorded so far (`u32::MAX` until the first `add`).
    min_offset: u32,
    /// Largest view end (`offset + size`) recorded so far (`0` until the first `add`).
    max_offset_end: u32,
}

impl BufferUtilization {
    /// Creates statistics for a buffer of `len` bytes with no views recorded yet.
    fn zero(len: u32) -> Self {
        BufferUtilization {
            len,
            used: 0,
            // Sentinels chosen so the first `add` always replaces them.
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Records one view covering `size` bytes starting at `offset`.
    ///
    /// Neither accumulation can overflow: the running total is a saturating `u64`, and the
    /// view end saturates at `u32::MAX` instead of wrapping.
    fn add(&mut self, offset: u32, size: u32) {
        self.used = self.used.saturating_add(u64::from(size));
        self.min_offset = self.min_offset.min(offset);
        self.max_offset_end = self.max_offset_end.max(offset.saturating_add(size));
    }

    /// Ratio of recorded view bytes to the full buffer length.
    ///
    /// Returns `0.0` for a zero-length buffer. The ratio may exceed `1.0` when views overlap.
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / f64::from(len),
        }
    }

    /// Ratio of recorded view bytes to the span between the lowest start and highest end.
    ///
    /// Returns `0.0` when that span is empty, which includes the case where no view has been
    /// recorded. The ratio may exceed `1.0` when views overlap.
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / f64::from(span),
        }
    }

    /// The byte range from the lowest recorded start to the highest recorded end.
    ///
    /// Before any view is recorded this is `u32::MAX..0`, i.e. an empty range whose start is
    /// greater than its end.
    pub fn range(&self) -> Range<u32> {
        self.min_offset..self.max_offset_end
    }

    /// Length of [`Self::range`], clamped to `0` when no view has been recorded.
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
168
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::arrays::{VarBinViewArray, VarBinViewVTable};
    use crate::compute::take;

    /// After keeping only the first and last rows, compaction must leave at most one buffer
    /// and preserve the kept values.
    #[test]
    fn test_optimize_compacts_buffers() {
        let source = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);

        let buffers_before = source.nbuffers();
        assert!(buffers_before > 0);

        let rows = buffer![0u32, 4u32].into_array();
        let taken_dyn = take(source.as_ref(), &rows).unwrap();
        let taken_view = taken_dyn.as_::<VarBinViewVTable>();

        // `take` alone keeps every original buffer attached.
        assert_eq!(taken_view.nbuffers(), buffers_before);

        let compacted = taken_view.compact_buffers().unwrap();

        assert!(compacted.nbuffers() <= 1);

        assert_eq!(compacted.len(), 2);
        assert_eq!(compacted.scalar_at(0), "short".into());
        assert_eq!(compacted.scalar_at(1), "tiny".into());
    }

    /// Keeping two of three long strings must still compact down to exactly one buffer.
    #[test]
    fn test_optimize_with_long_strings() {
        let first = "this is definitely a very long string that exceeds the inline limit";
        let second = "another extremely long string that also needs external buffer storage";
        let third = "yet another long string for testing buffer compaction functionality";

        let source = VarBinViewArray::from_iter_str([first, second, third, "short1", "short2"]);

        let rows = buffer![0u32, 2u32].into_array();
        let taken_dyn = take(source.as_ref(), &rows).unwrap();
        let taken_view = taken_dyn.as_::<VarBinViewVTable>();

        let compacted = taken_view.compact_buffers().unwrap();

        assert_eq!(compacted.nbuffers(), 1);

        assert_eq!(compacted.len(), 2);
        assert_eq!(compacted.scalar_at(0), first.into());
        assert_eq!(compacted.scalar_at(1), third.into());
    }

    /// An array that has no buffers to begin with is returned unchanged.
    #[test]
    fn test_optimize_no_buffers() {
        let source = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);

        assert_eq!(source.nbuffers(), 0);

        let compacted = source.compact_buffers().unwrap();

        assert_eq!(compacted.nbuffers(), 0);
        assert_eq!(compacted.len(), 4);

        (0..4).for_each(|row| {
            assert_eq!(compacted.scalar_at(row), source.scalar_at(row));
        });
    }

    /// A single, fully referenced buffer stays a single buffer.
    #[test]
    fn test_optimize_single_buffer() {
        let first = "this is a long string that goes into a buffer";
        let second = "another long string in the same buffer";
        let source = VarBinViewArray::from_iter_str([first, second]);

        assert_eq!(source.nbuffers(), 1);
        assert_eq!(source.buffer(0).len(), first.len() + second.len());

        let compacted = source.compact_buffers().unwrap();

        assert_eq!(compacted.nbuffers(), 1);
        assert_eq!(compacted.len(), 2);

        (0..2).for_each(|row| {
            assert_eq!(compacted.scalar_at(row), source.scalar_at(row));
        });
    }

    /// With a threshold of `0.0` the buffer count must not change.
    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        let kept = "this is a longer string that will be stored in a buffer";
        let source = VarBinViewArray::from_iter_str([
            kept,
            "another very long string that definitely needs a buffer to store it",
        ]);

        let buffers_before = source.nbuffers();
        assert!(buffers_before > 0);

        let rows = buffer![0u32].into_array();
        let taken_dyn = take(source.as_ref(), &rows).unwrap();
        let taken_view = taken_dyn.as_::<VarBinViewVTable>();

        let compacted = taken_view.compact_with_threshold(0.0).unwrap();

        assert_eq!(compacted.nbuffers(), taken_view.nbuffers());

        assert_eq!(compacted.len(), 1);
        assert_eq!(compacted.scalar_at(0), kept.into());
    }

    /// With a threshold of `1.0` the buffer count must not grow and values are preserved.
    #[test]
    fn test_selective_compaction_with_high_threshold() {
        let first = "this is a longer string that will be stored in a buffer";
        let last = "yet another long string";
        let source = VarBinViewArray::from_iter_str([
            first,
            "another very long string that definitely needs a buffer to store it",
            last,
        ]);

        let rows = buffer![0u32, 2u32].into_array();
        let taken_dyn = take(source.as_ref(), &rows).unwrap();
        let taken_view = taken_dyn.as_::<VarBinViewVTable>();

        let buffers_before = taken_view.nbuffers();

        let compacted = taken_view.compact_with_threshold(1.0).unwrap();

        assert!(compacted.nbuffers() <= buffers_before);

        assert_eq!(compacted.len(), 2);
        assert_eq!(compacted.scalar_at(0), first.into());
        assert_eq!(compacted.scalar_at(1), last.into());
    }

    /// A fully referenced single buffer survives compaction at threshold `0.8`.
    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        let first = "first long string that needs external buffer storage";
        let second = "second long string also in buffer";
        let third = "third long string in same buffer";

        let source = VarBinViewArray::from_iter_str([first, second, third]);

        assert_eq!(source.nbuffers(), 1);

        let compacted = source.compact_with_threshold(0.8).unwrap();

        assert_eq!(compacted.nbuffers(), 1);

        assert_eq!(compacted.len(), 3);
        for (row, expected) in [first, second, third].into_iter().enumerate() {
            assert_eq!(compacted.scalar_at(row), expected.into());
        }
    }

    /// Keeping every other row and compacting at `0.7` must preserve the kept values.
    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        let strings: Vec<String> = (0..10)
            .map(|n| {
                format!(
                    "this is a long string number {} that needs buffer storage",
                    n
                )
            })
            .collect();

        let source = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        // Even positions only; the literal array below holds the same values.
        let kept_rows: Vec<u32> = (0..10u32).step_by(2).collect();
        let rows = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken_dyn = take(source.as_ref(), &rows).unwrap();
        let taken_view = taken_dyn.as_::<VarBinViewVTable>();

        let compacted = taken_view.compact_with_threshold(0.7).unwrap();

        assert_eq!(compacted.len(), 5);
        for (row, &source_row) in kept_rows.iter().enumerate() {
            let expected = strings[source_row as usize].as_str();
            assert_eq!(compacted.scalar_at(row), expected.into());
        }
    }

    /// Keeping a contiguous prefix must preserve values and, when the single buffer's range
    /// utilization already meets the threshold, keep exactly one buffer.
    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        let strings: Vec<String> = (0..20)
            .map(|n| format!("this is a long string number {} for slice test", n))
            .collect();

        let source = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));

        let rows = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken_dyn = take(source.as_ref(), &rows).unwrap();
        let taken_view = taken_dyn.as_::<VarBinViewVTable>();

        // Capture the pre-compaction state for the conditional check at the end.
        let stats_before = taken_view.buffer_utilizations();
        let buffers_before = taken_view.nbuffers();

        let compacted = taken_view.compact_with_threshold(0.8).unwrap();

        assert!(
            compacted.nbuffers() > 0,
            "Should have buffers after slice compaction"
        );

        assert_eq!(compacted.len(), 5);
        for (row, expected) in strings.iter().take(5).enumerate() {
            assert_eq!(compacted.scalar_at(row), expected.as_str().into());
        }

        if buffers_before == 1 && stats_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.nbuffers(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }
}