1use itertools::Itertools;
5use num_traits::AsPrimitive;
6use vortex_buffer::Buffer;
7use vortex_buffer::BufferMut;
8use vortex_buffer::ByteBuffer;
9use vortex_buffer::ByteBufferMut;
10
11pub use crate::arrays::varbinview::BinaryView;
12use crate::dtype::NativePType;
13
14#[inline]
16pub fn offsets_to_lengths<P: NativePType>(offsets: &[P]) -> Buffer<P> {
17 offsets
18 .iter()
19 .tuple_windows::<(_, _)>()
20 .map(|(&start, &end)| end - start)
21 .collect()
22}
23
/// Largest value accepted for the `max_buffer_len` argument of [`build_views`].
///
/// Equal to `i32::MAX` bytes; [`build_views`] panics when asked to use a larger cap.
pub const MAX_BUFFER_LEN: usize = i32::MAX as usize;
26
27pub fn build_views<P: NativePType + AsPrimitive<usize>>(
32 start_buf_index: u32,
33 max_buffer_len: usize,
34 bytes: ByteBufferMut,
35 lens: &[P],
36) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
37 assert!(
38 max_buffer_len <= MAX_BUFFER_LEN,
39 "max_buffer_len cannot exceed MAX_BUFFER_LEN, offsets must fit in u32"
40 );
41
42 if bytes.len() <= max_buffer_len {
43 build_views_single_buffer(start_buf_index, bytes, lens)
46 } else {
47 build_views_rolling(start_buf_index, max_buffer_len, bytes, lens)
48 }
49}
50
51fn build_views_single_buffer<P: NativePType + AsPrimitive<usize>>(
58 start_buf_index: u32,
59 bytes: ByteBufferMut,
60 lens: &[P],
61) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
62 let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());
63
64 let data = bytes.as_slice();
65 let mut offset = 0usize;
66 let spare = views.spare_capacity_mut();
72 for (slot, &len) in spare.iter_mut().zip(lens) {
73 let len = len.as_();
74 let value = &data[offset..offset + len];
75 let view = if len > BinaryView::MAX_INLINED_SIZE {
76 let mut prefix = [0u8; 4];
77 prefix.copy_from_slice(&value[..4]);
78 BinaryView::new_ref(len.as_(), prefix, start_buf_index, offset.as_())
79 } else {
80 BinaryView::make_view(value, start_buf_index, offset.as_())
81 };
82 slot.write(view);
83 offset += len;
84 }
85 unsafe { views.set_len(lens.len()) };
88
89 let buffers = if bytes.is_empty() {
90 Vec::new()
91 } else {
92 vec![bytes.freeze()]
93 };
94 (buffers, views.freeze())
95}
96
97fn build_views_rolling<P: NativePType + AsPrimitive<usize>>(
102 start_buf_index: u32,
103 max_buffer_len: usize,
104 mut bytes: ByteBufferMut,
105 lens: &[P],
106) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
107 let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());
108 let mut buffers = Vec::new();
109 let mut buf_index = start_buf_index;
110
111 let mut offset = 0;
112 for &len in lens {
113 let len = len.as_();
114 assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len");
115
116 if (offset + len) > max_buffer_len {
117 let rest = bytes.split_off(offset);
119
120 buffers.push(bytes.freeze());
121 buf_index += 1;
122 offset = 0;
123
124 bytes = rest;
125 }
126 let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_());
127 unsafe { views.push_unchecked(view) };
129 offset += len;
130 }
131
132 if !bytes.is_empty() {
133 buffers.push(bytes.freeze());
134 }
135
136 (buffers, views.freeze())
137}
138
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::ByteBuffer;
    use vortex_buffer::ByteBufferMut;

    use crate::arrays::varbinview::BinaryView;
    use crate::arrays::varbinview::build_views::MAX_BUFFER_LEN;
    use crate::arrays::varbinview::build_views::build_views;

    /// Concatenates `values` into one heap and records the byte length of each value.
    fn flatten(values: &[&[u8]]) -> (ByteBufferMut, Vec<u32>) {
        let mut heap = ByteBufferMut::empty();
        let lens = values
            .iter()
            .map(|value| {
                heap.extend_from_slice(value);
                u32::try_from(value.len()).unwrap()
            })
            .collect();
        (heap, lens)
    }

    /// Reads every value back out of `views`, resolving reference views against `buffers`.
    ///
    /// `start_buf_index` is subtracted from each view's buffer index to obtain the position
    /// of its buffer inside `buffers`.
    fn reconstruct(
        buffers: &[ByteBuffer],
        views: &[BinaryView],
        start_buf_index: u32,
    ) -> Vec<Vec<u8>> {
        let mut values = Vec::with_capacity(views.len());
        for view in views {
            let bytes = if view.is_inlined() {
                view.as_inlined().value().to_vec()
            } else {
                let reference = view.as_view();
                let local_index = (reference.buffer_index - start_buf_index) as usize;
                buffers[local_index][reference.as_range()].to_vec()
            };
            values.push(bytes);
        }
        values
    }

    /// With a cap above the heap size, every input shape must round-trip through one buffer.
    #[rstest]
    #[case::mixed(&[b"a".as_slice(), b"this is a long reference value", b"short", b"another long value here!!"])]
    #[case::inline_boundary(&[&[b'x'; 12] as &[u8], &[b'y'; 13], &[b'z'; 12], &[b'w'; 13]])]
    #[case::all_inlined(&[b"".as_slice(), b"a", b"bb", b"ccc", b"dddddddddddd"])]
    #[case::all_reference(&[&[b'a'; 100] as &[u8], &[b'b'; 50], &[b'c'; 4096]])]
    #[case::empty_values_interleaved(&[b"".as_slice(), b"a long value that is referenced", b"", b"", b"trailing long reference value"])]
    #[case::single_long(&[&[7u8; 1 << 16] as &[u8]])]
    fn fast_path_roundtrip(#[case] values: &[&[u8]]) {
        const START_BUF_INDEX: u32 = 3;

        let (bytes, lens) = flatten(values);
        let heap_len = bytes.len();

        // A cap one byte above the heap size keeps the whole input in a single buffer.
        let (buffers, views) = build_views(START_BUF_INDEX, heap_len + 1, bytes, &lens);

        assert_eq!(views.len(), values.len());
        match heap_len {
            0 => assert!(buffers.is_empty(), "empty heap must not allocate a buffer"),
            _ => {
                assert_eq!(buffers.len(), 1, "whole heap must stay in one buffer");
                let concatenated: Vec<u8> = values.concat();
                assert_eq!(buffers[0].as_slice(), concatenated.as_slice());
            }
        }

        // Every reference view must point at the caller-supplied starting buffer index.
        for view in views.iter().filter(|view| !view.is_inlined()) {
            assert_eq!(view.as_view().buffer_index, START_BUF_INDEX);
        }

        let expected: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
        assert_eq!(reconstruct(&buffers, &views, START_BUF_INDEX), expected);
    }

    /// Offsets beyond `1 << 23` must still be recorded exactly in a single-buffer build.
    #[test]
    fn fast_path_large_offsets() {
        const N: usize = 9000;
        const LEN: usize = 1000;
        const _: () = assert!((N - 1) * LEN > (1 << 23));

        // Each value starts with its own index so that rows are distinguishable.
        let mut values = vec![vec![0u8; LEN]; N];
        for (i, value) in values.iter_mut().enumerate() {
            value[..4].copy_from_slice(&u32::try_from(i).unwrap().to_le_bytes());
        }
        let refs: Vec<&[u8]> = values.iter().map(Vec::as_slice).collect();

        let (bytes, lens) = flatten(&refs);
        let max_buffer_len = bytes.len() + 1;

        let (buffers, views) = build_views(0, max_buffer_len, bytes, &lens);

        assert_eq!(buffers.len(), 1);
        for (i, view) in views.iter().enumerate() {
            let reference = view.as_view();
            assert_eq!(
                reference.offset as usize,
                i * LEN,
                "wrong offset for view {i}"
            );
            assert_eq!(reference.size as usize, LEN);
        }
        assert_eq!(reconstruct(&buffers, &views, 0), values);
    }

    /// A heap whose size equals the cap exactly must not be split.
    #[test]
    fn fast_path_taken_at_exact_boundary() {
        let values: [&[u8]; 2] = [
            b"this value is definitely long",
            b"and so is this one here",
        ];
        let (bytes, lens) = flatten(&values);
        let exact_cap = bytes.len();

        let (buffers, views) = build_views(0, exact_cap, bytes, &lens);

        assert_eq!(
            buffers.len(),
            1,
            "len == max_buffer_len must stay on fast path"
        );
        assert_eq!(views.len(), 2);
    }

    /// The single-buffer and the splitting builds must yield the same logical values.
    #[test]
    fn fast_and_slow_paths_agree() {
        let values: &[&[u8]] = &[
            b"first long reference value",
            b"tiny",
            b"second long reference value!!",
            b"third looooong reference value",
        ];
        let expected: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();

        // Roomy cap: everything stays in one buffer.
        let (fast_bytes, lens) = flatten(values);
        let roomy_cap = fast_bytes.len() + 1;
        let (fast_buffers, fast_views) = build_views(0, roomy_cap, fast_bytes, &lens);
        assert_eq!(fast_buffers.len(), 1);
        let fast_values = reconstruct(&fast_buffers, &fast_views, 0);
        assert_eq!(fast_values, expected);

        // Tight cap: just large enough for the longest value, which forces splitting.
        let tight_cap = values.iter().map(|v| v.len()).max().unwrap();
        let (slow_bytes, _) = flatten(values);
        let (slow_buffers, slow_views) = build_views(0, tight_cap, slow_bytes, &lens);
        assert!(
            slow_buffers.len() > 1,
            "small cap should split into many buffers"
        );
        let slow_values = reconstruct(&slow_buffers, &slow_views, 0);
        assert_eq!(slow_values, expected);

        assert_eq!(fast_values, slow_values);
    }

    /// No lengths and no bytes must produce neither buffers nor views.
    #[test]
    fn fast_path_empty_input() {
        let lens = Vec::<u32>::new();
        let (buffers, views) = build_views(0, 1024, ByteBufferMut::empty(), &lens);
        assert!(buffers.is_empty());
        assert!(views.is_empty());
    }

    /// Views from a single-buffer build must equal those built by `BinaryView::make_view`.
    #[test]
    fn fast_path_matches_make_view() {
        let values: &[&[u8]] = &[b"inline", b"this is a long reference value", b""];
        let (bytes, lens) = flatten(values);
        let cap = bytes.len() + 1;
        let (_buffers, views) = build_views(0, cap, bytes, &lens);

        // Offsets are cumulative over the heap: 0, then 6, then 36.
        let expected = [
            BinaryView::make_view(b"inline", 0, 0),
            BinaryView::make_view(b"this is a long reference value", 0, 6),
            BinaryView::make_view(b"", 0, 36),
        ];
        assert_eq!(views.as_slice(), &expected);
    }

    /// A heap larger than `MAX_BUFFER_LEN` must be split, with every row still readable.
    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn build_views_offsets_overflow_i32() {
        const STRING_LEN: usize = 64 * 1024;
        const TOTAL_BYTES: usize = (1usize << 31) + (256 << 20);
        const N: usize = TOTAL_BYTES / STRING_LEN;

        /// Writes `row` into the first eight bytes of `buf`, little-endian.
        fn stamp(buf: &mut [u8], row: usize) {
            buf[..8].copy_from_slice(&(row as u64).to_le_bytes());
        }

        // Rebuilds the expected contents of a single row on demand.
        let nth_string = |row: usize| {
            let mut s = vec![b'x'; STRING_LEN];
            stamp(&mut s, row);
            s
        };

        // Reuse one scratch value so that only the heap itself is large.
        let mut bytes = ByteBufferMut::with_capacity(N * STRING_LEN);
        let mut scratch = vec![b'x'; STRING_LEN];
        for row in 0..N {
            stamp(&mut scratch, row);
            bytes.extend_from_slice(&scratch);
        }

        let lens = vec![u32::try_from(STRING_LEN).unwrap(); N];
        let (buffers, views) = build_views(0, MAX_BUFFER_LEN, bytes, &lens);

        assert_eq!(views.len(), N);
        assert!(
            buffers.len() >= 2,
            "heap exceeding MAX_BUFFER_LEN must roll over into multiple buffers, got {}",
            buffers.len()
        );
        for (i, b) in buffers.iter().enumerate() {
            assert!(
                b.len() <= MAX_BUFFER_LEN,
                "buffer {i} of {} bytes exceeds MAX_BUFFER_LEN",
                b.len()
            );
        }

        // Spot-check rows at the start, around the first buffer boundary, and at the end.
        let boundary = MAX_BUFFER_LEN / STRING_LEN;
        for i in [0, boundary - 1, boundary, boundary + 1, N / 2, N - 1] {
            let r = views[i].as_view();
            let got = &buffers[r.buffer_index as usize][r.as_range()];
            assert_eq!(got, nth_string(i).as_slice(), "value mismatch at row {i}");
            assert_eq!(r.size as usize, STRING_LEN);
        }
    }

    /// Four 13-byte values under a 26-byte cap must land two per buffer.
    #[test]
    fn test_to_canonical_large() {
        let raw_data =
            ByteBufferMut::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbbcccccccccccccddddddddddddd");
        let lens = vec![13u8; 4];

        let (buffers, views) = build_views(0, 26, raw_data, &lens);

        let expected_buffers = vec![
            ByteBuffer::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbb"),
            ByteBuffer::copy_from("cccccccccccccddddddddddddd"),
        ];
        assert_eq!(buffers, expected_buffers);

        let expected_views = [
            BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 0),
            BinaryView::make_view(b"bbbbbbbbbbbbb", 0, 13),
            BinaryView::make_view(b"ccccccccccccc", 1, 0),
            BinaryView::make_view(b"ddddddddddddd", 1, 13),
        ];
        assert_eq!(views.as_slice(), &expected_views);
    }

    /// A cap above `MAX_BUFFER_LEN` must be rejected up front.
    #[test]
    #[should_panic(expected = "max_buffer_len cannot exceed MAX_BUFFER_LEN")]
    fn test_max_buffer_len_too_large_panics() {
        let oversized_cap = MAX_BUFFER_LEN + 1;
        let bytes = ByteBufferMut::copy_from("abc");
        build_views(0, oversized_cap, bytes, &[3u32]);
    }
}