use std::ops::Range;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_mask::Mask;
use crate::IntoArray;
use crate::arrays::VarBinViewArray;
use crate::arrays::varbinview::Ref;
use crate::builders::ArrayBuilder;
use crate::builders::VarBinViewBuilder;
impl VarBinViewArray {
    /// Returns an array with the same values whose data buffers hold no unreferenced bytes.
    ///
    /// When `should_compact` finds nothing to reclaim, a clone of `self` is returned unchanged.
    /// Otherwise the array is rebuilt with
    /// [`compact_with_threshold(1.0)`](Self::compact_with_threshold).
    ///
    /// # Errors
    ///
    /// Returns an error if the validity mask cannot be computed.
    pub fn compact_buffers(&self) -> VortexResult<VarBinViewArray> {
        if self.should_compact()? {
            self.compact_with_threshold(1.0)
        } else {
            Ok(self.clone())
        }
    }

    /// Decides whether rebuilding the array could shrink its data buffers.
    ///
    /// - Returns `false` when there are no data buffers.
    /// - Returns `true` when there are more than `u16::MAX` buffers.
    /// - Returns `true` when the valid views reference fewer bytes than the buffers hold.
    /// - Returns `true` when every buffer is empty.
    ///
    /// # Errors
    ///
    /// Returns an error if the validity mask cannot be computed.
    fn should_compact(&self) -> VortexResult<bool> {
        let nbuffers = self.data_buffers().len();
        if nbuffers == 0 {
            return Ok(false);
        }
        // Very large buffer counts are always compacted, regardless of utilization.
        // NOTE(review): the rationale for the `u16::MAX` cutoff is not visible here — confirm.
        if nbuffers > u16::MAX as usize {
            return Ok(true);
        }
        let bytes_referenced: u64 = self.count_referenced_bytes()?;
        let buffer_total_bytes: u64 = self
            .data_buffers()
            .iter()
            .map(|buf| buf.len() as u64)
            .sum();
        // Buffers that exist but are all empty carry no data; rebuilding presumably drops them.
        Ok(bytes_referenced < buffer_total_bytes || buffer_total_bytes == 0)
    }

    /// Calls `f` on every view that is both valid and not inlined, in array order.
    ///
    /// Null views and inlined views never point into a data buffer, so they are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if the validity mask cannot be computed.
    #[inline(always)]
    fn iter_valid_views<F>(&self, mut f: F) -> VortexResult<()>
    where
        F: FnMut(&Ref),
    {
        match self.validity_mask()? {
            Mask::AllTrue(_) => {
                for &view in self.views().iter() {
                    if !view.is_inlined() {
                        f(view.as_view());
                    }
                }
            }
            // Every element is null, so no view references the data buffers.
            Mask::AllFalse(_) => {}
            Mask::Values(v) => {
                for (&view, is_valid) in self.views().iter().zip(v.bit_buffer().iter()) {
                    if is_valid && !view.is_inlined() {
                        f(view.as_view());
                    }
                }
            }
        }
        Ok(())
    }

    /// Sums the sizes of all valid, non-inlined views.
    ///
    /// Views covering the same bytes are each counted. The result can therefore exceed the total
    /// size of the data buffers.
    ///
    /// # Errors
    ///
    /// Returns an error if the validity mask cannot be computed.
    fn count_referenced_bytes(&self) -> VortexResult<u64> {
        let mut total = 0u64;
        self.iter_valid_views(|view| total += u64::from(view.size))?;
        Ok(total)
    }

    /// Computes one [`BufferUtilization`] per data buffer, in the same order as `data_buffers()`.
    ///
    /// # Errors
    ///
    /// Returns an error if the validity mask cannot be computed.
    ///
    /// # Panics
    ///
    /// Panics (via `vortex_expect`) if a data buffer is longer than `u32::MAX` bytes.
    /// Also panics if a valid view refers to a buffer index that does not exist.
    pub(crate) fn buffer_utilizations(&self) -> VortexResult<Vec<BufferUtilization>> {
        let mut utilizations: Vec<BufferUtilization> = self
            .data_buffers()
            .iter()
            .map(|buf| {
                let len = u32::try_from(buf.len()).vortex_expect("buffer sizes must fit in u32");
                BufferUtilization::zero(len)
            })
            .collect();
        self.iter_valid_views(|view| {
            utilizations[view.buffer_index as usize].add(view.offset, view.size);
        })?;
        Ok(utilizations)
    }

    /// Rebuilds this array by feeding it through a compacting [`VarBinViewBuilder`].
    ///
    /// `buffer_utilization_threshold` is forwarded unchanged to
    /// `VarBinViewBuilder::with_compaction`. Presumably, buffers whose utilization falls below
    /// the threshold are rewritten and better-utilized buffers are kept. See that constructor
    /// for the exact policy.
    pub fn compact_with_threshold(
        &self,
        buffer_utilization_threshold: f64,
    ) -> VortexResult<VarBinViewArray> {
        let mut builder = VarBinViewBuilder::with_compaction(
            self.dtype().clone(),
            self.len(),
            buffer_utilization_threshold,
        );
        builder.extend_from_array(&self.clone().into_array());
        Ok(builder.finish_into_varbinview())
    }
}
/// Byte-usage statistics for one data buffer, accumulated from the views that point into it.
#[derive(Debug, Clone, Copy)]
pub(crate) struct BufferUtilization {
    /// Total length of the buffer in bytes.
    len: u32,
    /// Sum of the sizes of every recorded view.
    ///
    /// This is `u64` because nothing stops several views from covering the same bytes. The sum
    /// can then exceed `len`, and therefore `u32::MAX`. With `u32` that would overflow, which
    /// panics in debug builds and wraps in release.
    used: u64,
    /// Smallest recorded view offset; `u32::MAX` while no view has been recorded.
    min_offset: u32,
    /// Largest recorded `offset + size`; `0` while no view has been recorded.
    max_offset_end: u32,
}

impl BufferUtilization {
    /// Creates statistics for a buffer of `len` bytes that no view references yet.
    fn zero(len: u32) -> Self {
        Self {
            len,
            used: 0,
            min_offset: u32::MAX,
            max_offset_end: 0,
        }
    }

    /// Records a view that covers `size` bytes starting at `offset`.
    fn add(&mut self, offset: u32, size: u32) {
        self.used += u64::from(size);
        self.min_offset = self.min_offset.min(offset);
        // Assuming the view lies inside the buffer (whose length fits in `u32`), this sum
        // cannot overflow.
        self.max_offset_end = self.max_offset_end.max(offset + size);
    }

    /// Returns referenced bytes divided by buffer length, or `0.0` for an empty buffer.
    ///
    /// Can exceed `1.0` when views overlap.
    pub fn overall_utilization(&self) -> f64 {
        match self.len {
            0 => 0.0,
            len => self.used as f64 / f64::from(len),
        }
    }

    /// Returns referenced bytes divided by the width of [`range`](Self::range).
    ///
    /// Returns `0.0` when that range is empty.
    pub fn range_utilization(&self) -> f64 {
        match self.range_span() {
            0 => 0.0,
            span => self.used as f64 / f64::from(span),
        }
    }

    /// Returns the smallest byte range covering every recorded view.
    ///
    /// Returns the empty range `0..0` when no view has been recorded, rather than the
    /// inverted `u32::MAX..0` sentinel. The inverted range would panic if used to slice.
    pub fn range(&self) -> Range<u32> {
        if self.min_offset <= self.max_offset_end {
            self.min_offset..self.max_offset_end
        } else {
            0..0
        }
    }

    /// Returns the width in bytes of [`range`](Self::range).
    fn range_span(&self) -> u32 {
        self.max_offset_end.saturating_sub(self.min_offset)
    }
}
// Unit tests for buffer compaction (`compact_buffers`, `compact_with_threshold`) and for the
// referenced-byte / per-buffer utilization accounting helpers.
#[cfg(test)]
mod tests {
use rstest::rstest;
use vortex_buffer::buffer;
use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::arrays::VarBinArray;
use crate::arrays::VarBinViewArray;
use crate::assert_arrays_eq;
use crate::dtype::DType;
use crate::dtype::Nullability;
// Taking only the two short strings keeps every original data buffer (asserted below);
// `compact_buffers` must then drop the unreferenced bytes, leaving at most one buffer.
#[test]
fn test_optimize_compacts_buffers() {
let original = VarBinViewArray::from_iter_nullable_str([
Some("short"),
Some("this is a longer string that will be stored in a buffer"),
Some("medium length string"),
Some("another very long string that definitely needs a buffer to store it"),
Some("tiny"),
]);
assert!(!original.data_buffers().is_empty());
let original_buffers = original.data_buffers().len();
let indices = buffer![0u32, 4u32].into_array();
let taken = original.take(indices).unwrap();
let taken = taken
.execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
.unwrap();
// `take` by itself does not rewrite the data buffers.
assert_eq!(taken.data_buffers().len(), original_buffers);
let optimized_array = taken.compact_buffers().unwrap();
assert!(optimized_array.data_buffers().len() <= 1);
assert_arrays_eq!(
optimized_array,
<VarBinArray as FromIterator<_>>::from_iter([Some("short"), Some("tiny")])
);
}
// Keeping two of three long strings and compacting should pack the surviving bytes into
// exactly one data buffer while preserving the values.
#[test]
fn test_optimize_with_long_strings() {
let long_string_1 = "this is definitely a very long string that exceeds the inline limit";
let long_string_2 = "another extremely long string that also needs external buffer storage";
let long_string_3 = "yet another long string for testing buffer compaction functionality";
let original = VarBinViewArray::from_iter_str([
long_string_1,
long_string_2,
long_string_3,
"short1",
"short2",
]);
let indices = buffer![0u32, 2u32].into_array();
let taken = original.take(indices).unwrap();
let taken_array = taken
.execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
.unwrap();
let optimized_array = taken_array.compact_buffers().unwrap();
assert_eq!(optimized_array.data_buffers().len(), 1);
assert_arrays_eq!(
optimized_array,
VarBinArray::from(vec![long_string_1, long_string_3])
);
}
// These short strings produce no data buffers (asserted below), so compaction has nothing to do.
#[test]
fn test_optimize_no_buffers() {
let original = VarBinViewArray::from_iter_str(["a", "bb", "ccc", "dddd"]);
assert_eq!(original.data_buffers().len(), 0);
let optimized_array = original.compact_buffers().unwrap();
assert_eq!(optimized_array.data_buffers().len(), 0);
assert_arrays_eq!(optimized_array, original);
}
// A single buffer whose every byte is referenced (its length equals both strings' lengths)
// has nothing to reclaim, so compaction keeps exactly one buffer.
#[test]
fn test_optimize_single_buffer() {
let str1 = "this is a long string that goes into a buffer";
let str2 = "another long string in the same buffer";
let original = VarBinViewArray::from_iter_str([str1, str2]);
assert_eq!(original.data_buffers().len(), 1);
assert_eq!(original.buffer(0).len(), str1.len() + str2.len());
let optimized_array = original.compact_buffers().unwrap();
assert_eq!(optimized_array.data_buffers().len(), 1);
assert_arrays_eq!(optimized_array, original);
}
// With a threshold of 0.0 the buffer count is unchanged even though the taken array leaves
// part of its buffer unreferenced.
#[test]
fn test_selective_compaction_with_threshold_zero() {
let original = VarBinViewArray::from_iter_str([
"this is a longer string that will be stored in a buffer",
"another very long string that definitely needs a buffer to store it",
]);
let original_buffers = original.data_buffers().len();
assert!(original_buffers > 0);
let indices = buffer![0u32].into_array();
let taken = original.take(indices).unwrap();
let taken = taken
.execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
.unwrap();
let compacted = taken.compact_with_threshold(0.0).unwrap();
assert_eq!(compacted.data_buffers().len(), taken.data_buffers().len());
assert_arrays_eq!(compacted, taken);
}
// With a threshold of 1.0, compaction must never increase the buffer count and must keep values.
#[test]
fn test_selective_compaction_with_high_threshold() {
let original = VarBinViewArray::from_iter_str([
"this is a longer string that will be stored in a buffer",
"another very long string that definitely needs a buffer to store it",
"yet another long string",
]);
let indices = buffer![0u32, 2u32].into_array();
let taken = original.take(indices).unwrap();
let taken = taken
.execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
.unwrap();
let original_buffers = taken.data_buffers().len();
let compacted = taken.compact_with_threshold(1.0).unwrap();
assert!(compacted.data_buffers().len() <= original_buffers);
assert_arrays_eq!(compacted, taken);
}
// Every string of the original array is still referenced, so at a 0.8 threshold the single
// buffer survives and the values are unchanged.
#[test]
fn test_selective_compaction_preserves_well_utilized_buffers() {
let str1 = "first long string that needs external buffer storage";
let str2 = "second long string also in buffer";
let str3 = "third long string in same buffer";
let original = VarBinViewArray::from_iter_str([str1, str2, str3]);
assert_eq!(original.data_buffers().len(), 1);
let compacted = original.compact_with_threshold(0.8).unwrap();
assert_eq!(compacted.data_buffers().len(), 1);
assert_arrays_eq!(compacted, original);
}
// Taking every other string leaves gaps in the buffer; compaction at 0.7 must still produce
// exactly the selected values.
#[test]
fn test_selective_compaction_with_mixed_utilization() {
let strings: Vec<String> = (0..10)
.map(|i| {
format!(
"this is a long string number {} that needs buffer storage",
i
)
})
.collect();
let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));
let indices_array = buffer![0u32, 2u32, 4u32, 6u32, 8u32].into_array();
let taken = original.take(indices_array).unwrap();
let taken = taken
.execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
.unwrap();
let compacted = taken.compact_with_threshold(0.7).unwrap();
let expected = VarBinViewArray::from_iter(
[0, 2, 4, 6, 8].map(|i| Some(strings[i].as_str())),
DType::Utf8(Nullability::NonNullable),
);
assert_arrays_eq!(expected, compacted);
}
// Taking a contiguous prefix of rows references a contiguous byte range of the buffer.
// When that range is well utilized (range_utilization >= 0.8), the compacted result should
// still use a single buffer; presumably the builder slices rather than copies in that case.
// The single-buffer assertion only runs when that precondition holds.
#[test]
fn test_slice_strategy_with_contiguous_range() {
let strings: Vec<String> = (0..20)
.map(|i| format!("this is a long string number {} for slice test", i))
.collect();
let original = VarBinViewArray::from_iter_str(strings.iter().map(|s| s.as_str()));
let indices_array = buffer![0u32, 1u32, 2u32, 3u32, 4u32].into_array();
let taken = original.take(indices_array).unwrap();
let taken = taken
.execute::<VarBinViewArray>(&mut LEGACY_SESSION.create_execution_ctx())
.unwrap();
let utils_before = taken.buffer_utilizations().unwrap();
let original_buffer_count = taken.data_buffers().len();
let compacted = taken.compact_with_threshold(0.8).unwrap();
assert!(
!compacted.data_buffers().is_empty(),
"Should have buffers after slice compaction"
);
assert_arrays_eq!(&compacted, taken);
if original_buffer_count == 1 && utils_before[0].range_utilization() >= 0.8 {
assert_eq!(
compacted.data_buffers().len(),
1,
"Slice strategy should maintain single buffer"
);
}
}
// Fixtures for the validity-path tests. Per the expectations below, LONG1/LONG2 land in the
// data buffer and SHORT does not (it is excluded from EXPECTED_BYTES).
const LONG1: &str = "long string one!";
const LONG2: &str = "long string two!";
const SHORT: &str = "x";
const EXPECTED_BYTES: u64 = (LONG1.len() + LONG2.len()) as u64;
// Nullable array with a null between the long strings; presumably this yields a
// `Mask::Values` validity mask.
fn mixed_array() -> VarBinViewArray {
VarBinViewArray::from_iter_nullable_str([Some(LONG1), None, Some(LONG2), Some(SHORT)])
}
// Checks referenced-byte counting and per-buffer overall utilization across non-nullable,
// all-valid, all-invalid and mixed-validity arrays. Null rows must contribute nothing.
#[rstest]
#[case::non_nullable(VarBinViewArray::from_iter_str([LONG1, LONG2, SHORT]), EXPECTED_BYTES, &[1.0])]
#[case::all_valid(VarBinViewArray::from_iter_nullable_str([Some(LONG1), Some(LONG2), Some(SHORT)]), EXPECTED_BYTES, &[1.0])]
#[case::all_invalid(VarBinViewArray::from_iter_nullable_str([None::<&str>, None]), 0, &[])]
#[case::mixed_validity(mixed_array(), EXPECTED_BYTES, &[1.0])]
fn test_validity_code_paths(
#[case] arr: VarBinViewArray,
#[case] expected_bytes: u64,
#[case] expected_utils: &[f64],
) {
assert_eq!(arr.count_referenced_bytes().unwrap(), expected_bytes);
let utils: Vec<f64> = arr
.buffer_utilizations()
.unwrap()
.iter()
.map(|u| u.overall_utilization())
.collect();
assert_eq!(utils, expected_utils);
}
}