use std::ops::Range;
use vortex_error::{VortexExpect, VortexResult};
use crate::arrays::VarBinViewArray;
use crate::builders::{ArrayBuilder, VarBinViewBuilder};
use crate::validity::Validity;
use crate::vtable::ValidityHelper;
impl VarBinViewArray {
    /// Compact the array's buffers, dropping bytes no longer referenced by any view.
    ///
    /// Returns a cheap clone of `self` when every buffer byte is still referenced
    /// (nothing to reclaim); otherwise rebuilds the array with a utilization
    /// threshold of `1.0`, so any buffer containing unreferenced bytes is rewritten.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while rebuilding the array.
    pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
        if !self.should_compact() {
            return Ok(self.clone());
        }
        self.compact_with_threshold(1.0)
    }

    /// Whether any buffer holds bytes that are not referenced by a valid,
    /// non-inlined view.
    fn should_compact(&self) -> bool {
        if self.nbuffers() == 0 {
            // All values are inlined in their views; there is nothing to compact.
            return false;
        }
        let bytes_referenced: u64 = self.count_referenced_bytes();
        // Use the `buffers()` accessor (not the raw field) for consistency with
        // the rest of this impl.
        let buffer_total_bytes: u64 = self.buffers().iter().map(|buf| buf.len() as u64).sum();
        bytes_referenced < buffer_total_bytes
    }

    /// Total number of buffer bytes referenced by valid, non-inlined views.
    ///
    /// NOTE: views that overlap in a buffer are each counted in full, so this can
    /// exceed the number of *distinct* bytes referenced.
    fn count_referenced_bytes(&self) -> u64 {
        match self.validity() {
            // No element is valid, so no buffer byte is referenced.
            Validity::AllInvalid => 0u64,
            Validity::NonNullable | Validity::AllValid | Validity::Array(_) => self
                .views()
                .iter()
                .enumerate()
                .map(|(idx, &view)| {
                    // Invalid elements reference nothing; inlined views carry their
                    // bytes in the view itself rather than in a buffer.
                    if !self.is_valid(idx) || view.is_inlined() {
                        0u64
                    } else {
                        view.len() as u64
                    }
                })
                .sum(),
        }
    }

    /// Compute, for each buffer, how many of its bytes are referenced by valid,
    /// non-inlined views and the offset range those references span.
    pub(crate) fn buffer_utilizations(&self) -> Vec<BufferUtilization> {
        let mut utilizations: Vec<BufferUtilization> = self
            .buffers()
            .iter()
            .map(|buf| {
                let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
                BufferUtilization::zero(len)
            })
            .collect();
        // With no valid elements, every buffer stays at zero utilization.
        if matches!(self.validity(), Validity::AllInvalid) {
            return utilizations;
        }
        for (idx, &view) in self.views().iter().enumerate() {
            if !self.is_valid(idx) || view.is_inlined() {
                continue;
            }
            let view = view.as_view();
            utilizations[view.buffer_index() as usize].add(view.offset(), view.size);
        }
        utilizations
    }

    /// Rebuild the array, rewriting any buffer whose utilization falls below
    /// `buffer_utilization_threshold`.
    ///
    /// The threshold is a ratio: `0.0` keeps every buffer as-is, while `1.0`
    /// rewrites any buffer containing unreferenced bytes.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while rebuilding the array.
    pub fn compact_with_threshold(
        &self,
        buffer_utilization_threshold: f64,
    ) -> VortexResult<VarBinViewArray> {
        let mut builder = VarBinViewBuilder::with_compaction(
            self.dtype().clone(),
            self.len(),
            buffer_utilization_threshold,
        );
        builder.extend_from_array(self.as_ref());
        Ok(builder.finish_into_varbinview())
    }
}
/// Tracks how much of a single data buffer is referenced, along with the offset
/// range those references span.
pub(crate) struct BufferUtilization {
    // Total length of the buffer in bytes.
    len: u32,
    // Referenced bytes; overlapping references are each counted in full.
    used: u32,
    // Smallest referenced offset; `u32::MAX` until the first reference is recorded.
    min_offset: u32,
    // One past the largest referenced byte; `0` until the first reference is recorded.
    max_offset_end: u32,
}

impl BufferUtilization {
    /// A utilization record for a buffer of `len` bytes with no references yet.
    fn zero(len: u32) -> Self {
        Self {
            len,
            used: 0,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Record a reference to `size` bytes starting at `offset`.
    fn add(&mut self, offset: u32, size: u32) {
        self.used += size;
        if offset < self.min_offset {
            self.min_offset = offset;
        }
        let end = offset + size;
        if end > self.max_offset_end {
            self.max_offset_end = end;
        }
    }

    /// Referenced bytes as a fraction of the whole buffer; `0.0` for an empty buffer.
    pub fn overall_utilization(&self) -> f64 {
        if self.len == 0 {
            0.0
        } else {
            f64::from(self.used) / f64::from(self.len)
        }
    }

    /// Referenced bytes as a fraction of the spanned offset range; `0.0` when no
    /// reference has been recorded yet.
    pub fn range_utilization(&self) -> f64 {
        let span = self.range_span();
        if span == 0 {
            0.0
        } else {
            f64::from(self.used) / f64::from(span)
        }
    }

    /// The half-open byte range spanned by all recorded references.
    pub fn range(&self) -> Range<u32> {
        self.min_offset..self.max_offset_end
    }

    /// Width of [`Self::range`]; saturates to zero before any reference is
    /// recorded (when `min_offset` is still `u32::MAX`).
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::arrays::{VarBinViewArray, VarBinViewVTable};
    use crate::compute::take;

    /// After `take` drops references into the original buffers, compaction should
    /// shrink the buffer count while preserving the selected values.
    #[test]
    fn test_optimize_compacts_buffers() {
        let original = VarBinViewArray::from_iter_nullable_str([
            Some("short"),
            Some("this is a longer string that will be stored in a buffer"),
            Some("medium length string"),
            Some("another very long string that definitely needs a buffer to store it"),
            Some("tiny"),
        ]);
        assert!(original.nbuffers() > 0);
        let original_buffers = original.nbuffers();
        // Keep only the two short (inlined) strings; the buffers are now dead weight.
        let indices = buffer![0u32, 4u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();
        // `take` carries the buffers along unchanged.
        assert_eq!(taken_array.nbuffers(), original_buffers);
        let optimized_array = taken_array.compact_buffers().unwrap();
        assert!(optimized_array.nbuffers() <= 1);
        assert_eq!(optimized_array.len(), 2);
        assert_eq!(optimized_array.scalar_at(0), "short".into());
        assert_eq!(optimized_array.scalar_at(1), "tiny".into());
    }

    /// Compacting a take of buffer-backed (non-inlined) strings should collapse
    /// down to a single buffer holding only the surviving values.
    #[test]
    fn test_optimize_with_long_strings() {
        let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
        let long_string_2 = "another extremely long string that also needs external buffer storage";
        let long_string_3 = "yet another long string for testing buffer compaction functionality";
        let original = VarBinViewArray::from_iter_str([
            long_string_1,
            long_string_2,
            long_string_3,
            "short1",
            "short2",
        ]);
        let indices = buffer![0u32, 2u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();
        let optimized_array = taken_array.compact_buffers().unwrap();
        assert_eq!(optimized_array.nbuffers(), 1);
        assert_eq!(optimized_array.len(), 2);
        assert_eq!(optimized_array.scalar_at(0), long_string_1.into());
        assert_eq!(optimized_array.scalar_at(1), long_string_3.into());
    }

    /// An all-inlined array has no buffers, so compaction must be a no-op.
    #[test]
    fn test_optimize_no_buffers() {
        let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);
        assert_eq!(original.nbuffers(), 0);
        let optimized_array = original.compact_buffers().unwrap();
        assert_eq!(optimized_array.nbuffers(), 0);
        assert_eq!(optimized_array.len(), 4);
        for i in 0..4 {
            assert_eq!(optimized_array.scalar_at(i), original.scalar_at(i));
        }
    }

    /// A fully-utilized single buffer (every byte referenced) should survive
    /// compaction unchanged.
    #[test]
    fn test_optimize_single_buffer() {
        let str1 = "this is a long string that goes into a buffer";
        let str2 = "another long string in the same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2]);
        assert_eq!(original.nbuffers(), 1);
        // Both strings are stored back-to-back, so the buffer is 100% utilized.
        assert_eq!(original.buffer(0).len(), str1.len() + str2.len());
        let optimized_array = original.compact_buffers().unwrap();
        assert_eq!(optimized_array.nbuffers(), 1);
        assert_eq!(optimized_array.len(), 2);
        for i in 0..2 {
            assert_eq!(optimized_array.scalar_at(i), original.scalar_at(i));
        }
    }

    /// With a threshold of 0.0 every buffer passes the utilization check, so no
    /// buffer is rewritten even when some bytes are unreferenced.
    #[test]
    fn test_selective_compaction_with_threshold_zero() {
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
        ]);
        let original_buffers = original.nbuffers();
        assert!(original_buffers > 0);
        let indices = buffer![0u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();
        let compacted = taken_array.compact_with_threshold(0.0).unwrap();
        assert_eq!(compacted.nbuffers(), taken_array.nbuffers());
        assert_eq!(compacted.len(), 1);
        assert_eq!(
            compacted.scalar_at(0),
            "this is a longer string that will be stored in a buffer".into()
        );
    }

    /// With the maximum threshold (1.0), under-utilized buffers are rewritten and
    /// the buffer count must not grow.
    #[test]
    fn test_selective_compaction_with_high_threshold() {
        let original = VarBinViewArray::from_iter_str([
            "this is a longer string that will be stored in a buffer",
            "another very long string that definitely needs a buffer to store it",
            "yet another long string",
        ]);
        let indices = buffer![0u32, 2u32].into_array();
        let taken = take(original.as_ref(), &indices).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();
        let original_buffers = taken_array.nbuffers();
        let compacted = taken_array.compact_with_threshold(1.0).unwrap();
        assert!(compacted.nbuffers() <= original_buffers);
        assert_eq!(compacted.len(), 2);
        assert_eq!(
            compacted.scalar_at(0),
            "this is a longer string that will be stored in a buffer".into()
        );
        assert_eq!(compacted.scalar_at(1), "yet another long string".into());
    }

    /// A buffer that is fully referenced should be kept as-is even under a
    /// fairly strict (0.8) threshold.
    #[test]
    fn test_selective_compaction_preserves_well_utilized_buffers() {
        let str1 = "first long string that needs external buffer storage";
        let str2 = "second long string also in buffer";
        let str3 = "third long string in same buffer";
        let original = VarBinViewArray::from_iter_str([str1, str2, str3]);
        assert_eq!(original.nbuffers(), 1);
        let compacted = original.compact_with_threshold(0.8).unwrap();
        assert_eq!(compacted.nbuffers(), 1);
        assert_eq!(compacted.len(), 3);
        assert_eq!(compacted.scalar_at(0), str1.into());
        assert_eq!(compacted.scalar_at(1), str2.into());
        assert_eq!(compacted.scalar_at(2), str3.into());
    }

    /// Taking every other element leaves buffers roughly half-utilized; a 0.7
    /// threshold should still preserve the selected values exactly.
    #[test]
    fn test_selective_compaction_with_mixed_utilization() {
        let strings: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    "this is a long string number {} that needs buffer storage",
                    i
                )
            })
            .collect();
        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));
        // Take every other element (0, 2, 4, 6, 8).
        let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
        let taken = take(original.as_ref(), &indices_array).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();
        let compacted = taken_array.compact_with_threshold(0.7).unwrap();
        assert_eq!(compacted.len(), 5);
        // Element `i` of the compacted array corresponds to source string `2 * i`,
        // mirroring the taken indices above (single source of truth).
        for i in 0..5 {
            assert_eq!(compacted.scalar_at(i), strings[2 * i].as_str().into());
        }
    }

    /// Taking a contiguous prefix keeps references to a contiguous, well-utilized
    /// range; compaction should still yield a correct array with buffers intact.
    #[test]
    fn test_slice_strategy_with_contiguous_range() {
        let strings: Vec<String> = (0..20)
            .map(|i| format!("this is a long string number {} for slice test", i))
            .collect();
        let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));
        let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
        let taken = take(original.as_ref(), &indices_array).unwrap();
        let taken_array = taken.as_::<VarBinViewVTable>();
        let utils_before = taken_array.buffer_utilizations();
        let original_buffer_count = taken_array.nbuffers();
        let compacted = taken_array.compact_with_threshold(0.8).unwrap();
        assert!(
            compacted.nbuffers() > 0,
            "Should have buffers after slice compaction"
        );
        assert_eq!(compacted.len(), 5);
        for i in 0..5 {
            assert_eq!(compacted.scalar_at(i), strings[i].as_str().into());
        }
        // When the single buffer's contiguous range is well utilized, the slice
        // strategy should not split or rewrite it.
        if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
            assert_eq!(
                compacted.nbuffers(),
                1,
                "Slice strategy should maintain single buffer"
            );
        }
    }
}