use itertools::Itertools;
use num_traits::AsPrimitive;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
pub use crate::arrays::varbinview::BinaryView;
use crate::dtype::NativePType;
#[inline]
pub fn offsets_to_lengths<P: NativePType>(offsets: &[P]) -> Buffer<P> {
offsets
.iter()
.tuple_windows::<(_, _)>()
.map(|(&start, &end)| end - start)
.collect()
}
pub const MAX_BUFFER_LEN: usize = i32::MAX as usize;
pub fn build_views<P: NativePType + AsPrimitive<usize>>(
start_buf_index: u32,
max_buffer_len: usize,
bytes: ByteBufferMut,
lens: &[P],
) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
assert!(
max_buffer_len <= MAX_BUFFER_LEN,
"max_buffer_len cannot exceed MAX_BUFFER_LEN, offsets must fit in u32"
);
if bytes.len() <= max_buffer_len {
build_views_single_buffer(start_buf_index, bytes, lens)
} else {
build_views_rolling(start_buf_index, max_buffer_len, bytes, lens)
}
}
fn build_views_single_buffer<P: NativePType + AsPrimitive<usize>>(
start_buf_index: u32,
bytes: ByteBufferMut,
lens: &[P],
) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());
let data = bytes.as_slice();
let mut offset = 0usize;
let spare = views.spare_capacity_mut();
for (slot, &len) in spare.iter_mut().zip(lens) {
let len = len.as_();
let value = &data[offset..offset + len];
let view = if len > BinaryView::MAX_INLINED_SIZE {
let mut prefix = [0u8; 4];
prefix.copy_from_slice(&value[..4]);
BinaryView::new_ref(len.as_(), prefix, start_buf_index, offset.as_())
} else {
BinaryView::make_view(value, start_buf_index, offset.as_())
};
slot.write(view);
offset += len;
}
unsafe { views.set_len(lens.len()) };
let buffers = if bytes.is_empty() {
Vec::new()
} else {
vec![bytes.freeze()]
};
(buffers, views.freeze())
}
fn build_views_rolling<P: NativePType + AsPrimitive<usize>>(
start_buf_index: u32,
max_buffer_len: usize,
mut bytes: ByteBufferMut,
lens: &[P],
) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());
let mut buffers = Vec::new();
let mut buf_index = start_buf_index;
let mut offset = 0;
for &len in lens {
let len = len.as_();
assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len");
if (offset + len) > max_buffer_len {
let rest = bytes.split_off(offset);
buffers.push(bytes.freeze());
buf_index += 1;
offset = 0;
bytes = rest;
}
let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_());
unsafe { views.push_unchecked(view) };
offset += len;
}
if !bytes.is_empty() {
buffers.push(bytes.freeze());
}
(buffers, views.freeze())
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use crate::arrays::varbinview::BinaryView;
use crate::arrays::varbinview::build_views::MAX_BUFFER_LEN;
use crate::arrays::varbinview::build_views::build_views;
fn flatten(values: &[&[u8]]) -> (ByteBufferMut, Vec<u32>) {
let mut bytes = ByteBufferMut::empty();
let mut lens = Vec::with_capacity(values.len());
for v in values {
bytes.extend_from_slice(v);
lens.push(u32::try_from(v.len()).unwrap());
}
(bytes, lens)
}
fn reconstruct(
buffers: &[ByteBuffer],
views: &[BinaryView],
start_buf_index: u32,
) -> Vec<Vec<u8>> {
views
.iter()
.map(|view| {
if view.is_inlined() {
view.as_inlined().value().to_vec()
} else {
let r = view.as_view();
let buf = &buffers[(r.buffer_index - start_buf_index) as usize];
buf[r.as_range()].to_vec()
}
})
.collect()
}
#[rstest]
#[case::mixed(&[b"a".as_slice(), b"this is a long reference value", b"short", b"another long value here!!"])]
#[case::inline_boundary(&[&[b'x'; 12] as &[u8], &[b'y'; 13], &[b'z'; 12], &[b'w'; 13]])]
#[case::all_inlined(&[b"".as_slice(), b"a", b"bb", b"ccc", b"dddddddddddd"])]
#[case::all_reference(&[&[b'a'; 100] as &[u8], &[b'b'; 50], &[b'c'; 4096]])]
#[case::empty_values_interleaved(&[b"".as_slice(), b"a long value that is referenced", b"", b"", b"trailing long reference value"])]
#[case::single_long(&[&[7u8; 1 << 16] as &[u8]])]
fn fast_path_roundtrip(#[case] values: &[&[u8]]) {
let (bytes, lens) = flatten(values);
let total = bytes.len();
let start_buf_index = 3;
let (buffers, views) = build_views(start_buf_index, total + 1, bytes, &lens);
assert_eq!(views.len(), values.len());
if total == 0 {
assert!(buffers.is_empty(), "empty heap must not allocate a buffer");
} else {
assert_eq!(buffers.len(), 1, "whole heap must stay in one buffer");
let concatenated: Vec<u8> = values.concat();
assert_eq!(buffers[0].as_slice(), concatenated.as_slice());
}
for view in views.iter() {
if !view.is_inlined() {
assert_eq!(view.as_view().buffer_index, start_buf_index);
}
}
let expected: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
assert_eq!(reconstruct(&buffers, &views, start_buf_index), expected);
}
#[test]
fn fast_path_large_offsets() {
const N: usize = 9000;
const LEN: usize = 1000;
const _: () = assert!((N - 1) * LEN > (1 << 23));
let values: Vec<Vec<u8>> = (0..N)
.map(|i| {
let mut v = vec![0u8; LEN];
v[..4].copy_from_slice(&u32::try_from(i).unwrap().to_le_bytes());
v
})
.collect();
let refs: Vec<&[u8]> = values.iter().map(|v| v.as_slice()).collect();
let (bytes, lens) = flatten(&refs);
let total = bytes.len();
let (buffers, views) = build_views(0, total + 1, bytes, &lens);
assert_eq!(buffers.len(), 1);
for (i, view) in views.iter().enumerate() {
let r = view.as_view();
assert_eq!(r.offset as usize, i * LEN, "wrong offset for view {i}");
assert_eq!(r.size as usize, LEN);
}
assert_eq!(reconstruct(&buffers, &views, 0), values);
}
#[test]
fn fast_path_taken_at_exact_boundary() {
let (bytes, lens) =
flatten(&[b"this value is definitely long", b"and so is this one here"]);
let total = bytes.len();
let (buffers, views) = build_views(0, total, bytes, &lens);
assert_eq!(
buffers.len(),
1,
"len == max_buffer_len must stay on fast path"
);
assert_eq!(views.len(), 2);
}
#[test]
fn fast_and_slow_paths_agree() {
let values: &[&[u8]] = &[
b"first long reference value",
b"tiny",
b"second long reference value!!",
b"third looooong reference value",
];
let expected: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
let (fast_bytes, lens) = flatten(values);
let total = fast_bytes.len();
let (fast_buffers, fast_views) = build_views(0, total + 1, fast_bytes, &lens);
assert_eq!(fast_buffers.len(), 1);
assert_eq!(reconstruct(&fast_buffers, &fast_views, 0), expected);
let longest = values.iter().map(|v| v.len()).max().unwrap();
let (slow_bytes, _) = flatten(values);
let (slow_buffers, slow_views) = build_views(0, longest, slow_bytes, &lens);
assert!(
slow_buffers.len() > 1,
"small cap should split into many buffers"
);
assert_eq!(reconstruct(&slow_buffers, &slow_views, 0), expected);
assert_eq!(
reconstruct(&fast_buffers, &fast_views, 0),
reconstruct(&slow_buffers, &slow_views, 0)
);
}
#[test]
fn fast_path_empty_input() {
let lens: Vec<u32> = Vec::new();
let (buffers, views) = build_views(0, 1024, ByteBufferMut::empty(), &lens);
assert!(buffers.is_empty());
assert!(views.is_empty());
}
#[test]
fn fast_path_matches_make_view() {
let values: &[&[u8]] = &[b"inline", b"this is a long reference value", b""];
let (bytes, lens) = flatten(values);
let total = bytes.len();
let (_buffers, views) = build_views(0, total + 1, bytes, &lens);
let expected = [
BinaryView::make_view(b"inline", 0, 0),
BinaryView::make_view(b"this is a long reference value", 0, 6),
BinaryView::make_view(b"", 0, 36),
];
assert_eq!(views.as_slice(), &expected);
}
#[test_with::env(CI)]
#[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
fn build_views_offsets_overflow_i32() {
const STRING_LEN: usize = 64 * 1024;
const TOTAL_BYTES: usize = (1usize << 31) + (256 << 20); const N: usize = TOTAL_BYTES / STRING_LEN;
let nth_string = |i: usize| {
let mut s = vec![b'x'; STRING_LEN];
s[..8].copy_from_slice(&(i as u64).to_le_bytes());
s
};
let mut bytes = ByteBufferMut::with_capacity(N * STRING_LEN);
let mut value = vec![b'x'; STRING_LEN];
for i in 0..N {
value[..8].copy_from_slice(&(i as u64).to_le_bytes());
bytes.extend_from_slice(&value);
}
let lens = vec![u32::try_from(STRING_LEN).unwrap(); N];
let (buffers, views) = build_views(0, MAX_BUFFER_LEN, bytes, &lens);
assert_eq!(views.len(), N);
assert!(
buffers.len() >= 2,
"heap exceeding MAX_BUFFER_LEN must roll over into multiple buffers, got {}",
buffers.len()
);
for (i, b) in buffers.iter().enumerate() {
assert!(
b.len() <= MAX_BUFFER_LEN,
"buffer {i} of {} bytes exceeds MAX_BUFFER_LEN",
b.len()
);
}
let boundary = MAX_BUFFER_LEN / STRING_LEN;
for i in [0, boundary - 1, boundary, boundary + 1, N / 2, N - 1] {
let view = &views[i];
let r = view.as_view();
let got = &buffers[r.buffer_index as usize][r.as_range()];
assert_eq!(got, nth_string(i).as_slice(), "value mismatch at row {i}");
assert_eq!(r.size as usize, STRING_LEN);
}
}
#[test]
fn test_to_canonical_large() {
let raw_data =
ByteBufferMut::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbbcccccccccccccddddddddddddd");
let lens = vec![13u8; 4];
let (buffers, views) = build_views(0, 26, raw_data, &lens);
assert_eq!(
buffers,
vec![
ByteBuffer::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbb"),
ByteBuffer::copy_from("cccccccccccccddddddddddddd"),
]
);
assert_eq!(
views.as_slice(),
&[
BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 0),
BinaryView::make_view(b"bbbbbbbbbbbbb", 0, 13),
BinaryView::make_view(b"ccccccccccccc", 1, 0),
BinaryView::make_view(b"ddddddddddddd", 1, 13),
]
)
}
#[test]
#[should_panic(expected = "max_buffer_len cannot exceed MAX_BUFFER_LEN")]
fn test_max_buffer_len_too_large_panics() {
build_views(
0,
MAX_BUFFER_LEN + 1,
ByteBufferMut::copy_from("abc"),
&[3u32],
);
}
}