Skip to main content

vortex_array/arrays/varbinview/
build_views.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use itertools::Itertools;
5use num_traits::AsPrimitive;
6use vortex_buffer::Buffer;
7use vortex_buffer::BufferMut;
8use vortex_buffer::ByteBuffer;
9use vortex_buffer::ByteBufferMut;
10
11pub use crate::arrays::varbinview::BinaryView;
12use crate::dtype::NativePType;
13
14/// Convert an offsets buffer to a buffer of element lengths.
15#[inline]
16pub fn offsets_to_lengths<P: NativePType>(offsets: &[P]) -> Buffer<P> {
17    offsets
18        .iter()
19        .tuple_windows::<(_, _)>()
20        .map(|(&start, &end)| end - start)
21        .collect()
22}
23
/// Maximum number of buffer bytes that can be referenced by a single `BinaryView`.
///
/// Equal to `i32::MAX`. [`build_views`] asserts that its `max_buffer_len` argument does not exceed
/// this value.
pub const MAX_BUFFER_LEN: usize = i32::MAX as usize;
26
27/// Split a large buffer of input `bytes` holding string data into `VarBinView` buffers and views.
28///
29/// `max_buffer_len` must not exceed [`MAX_BUFFER_LEN`], since every view offset is stored in a
30/// `u32` and offsets are bounded by `max_buffer_len`.
31pub fn build_views<P: NativePType + AsPrimitive<usize>>(
32    start_buf_index: u32,
33    max_buffer_len: usize,
34    bytes: ByteBufferMut,
35    lens: &[P],
36) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
37    assert!(
38        max_buffer_len <= MAX_BUFFER_LEN,
39        "max_buffer_len cannot exceed MAX_BUFFER_LEN, offsets must fit in u32"
40    );
41
42    if bytes.len() <= max_buffer_len {
43        // Common case: the whole decoded heap fits within a single buffer, so no rollover can occur
44        // (`bytes.len()` is the total decoded size and therefore an upper bound on every offset).
45        build_views_single_buffer(start_buf_index, bytes, lens)
46    } else {
47        build_views_rolling(start_buf_index, max_buffer_len, bytes, lens)
48    }
49}
50
51/// Build views when the whole heap fits in a single output buffer.
52///
53/// Because no rollover can occur, the hot loop drops the per-element rollover branch and constructs
54/// reference views inline, avoiding the out-of-line `BinaryView::make_view` call for the common
55/// long-string case. Every offset is bounded by `bytes.len()`, which the caller has guaranteed is
56/// at most [`MAX_BUFFER_LEN`], so the `usize -> u32` conversions cannot truncate.
57fn build_views_single_buffer<P: NativePType + AsPrimitive<usize>>(
58    start_buf_index: u32,
59    bytes: ByteBufferMut,
60    lens: &[P],
61) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
62    let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());
63
64    let data = bytes.as_slice();
65    let mut offset = 0usize;
66    // Write directly into the reserved spare capacity rather than `push_unchecked`. The latter
67    // advances the backing buffer's length on every call, which the optimizer cannot prove is
68    // loop-invariant, so it reloads and rewrites the output cursor through the stack each
69    // iteration. Writing into the spare slice keeps the cursor in a register and the length is
70    // set once after the loop.
71    let spare = views.spare_capacity_mut();
72    for (slot, &len) in spare.iter_mut().zip(lens) {
73        let len = len.as_();
74        let value = &data[offset..offset + len];
75        let view = if len > BinaryView::MAX_INLINED_SIZE {
76            let mut prefix = [0u8; 4];
77            prefix.copy_from_slice(&value[..4]);
78            BinaryView::new_ref(len.as_(), prefix, start_buf_index, offset.as_())
79        } else {
80            BinaryView::make_view(value, start_buf_index, offset.as_())
81        };
82        slot.write(view);
83        offset += len;
84    }
85    // SAFETY: the loop initialized exactly `lens.len()` contiguous views (`spare` has at least
86    //  `lens.len()` slots, and `zip` stops at the shorter operand).
87    unsafe { views.set_len(lens.len()) };
88
89    let buffers = if bytes.is_empty() {
90        Vec::new()
91    } else {
92        vec![bytes.freeze()]
93    };
94    (buffers, views.freeze())
95}
96
97/// Build views when the heap exceeds `max_buffer_len` and must be split across multiple buffers.
98///
99/// The buffer is rolled over every `max_buffer_len` bytes so that no view offset overflows the
100/// `u32` offset field.
101fn build_views_rolling<P: NativePType + AsPrimitive<usize>>(
102    start_buf_index: u32,
103    max_buffer_len: usize,
104    mut bytes: ByteBufferMut,
105    lens: &[P],
106) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
107    let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());
108    let mut buffers = Vec::new();
109    let mut buf_index = start_buf_index;
110
111    let mut offset = 0;
112    for &len in lens {
113        let len = len.as_();
114        assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len");
115
116        if (offset + len) > max_buffer_len {
117            // Roll the buffer every 2GiB, to avoid overflowing VarBinView offset field
118            let rest = bytes.split_off(offset);
119
120            buffers.push(bytes.freeze());
121            buf_index += 1;
122            offset = 0;
123
124            bytes = rest;
125        }
126        let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_());
127        // SAFETY: we reserved the right capacity beforehand
128        unsafe { views.push_unchecked(view) };
129        offset += len;
130    }
131
132    if !bytes.is_empty() {
133        buffers.push(bytes.freeze());
134    }
135
136    (buffers, views.freeze())
137}
138
139#[cfg(test)]
140mod tests {
141    use rstest::rstest;
142    use vortex_buffer::ByteBuffer;
143    use vortex_buffer::ByteBufferMut;
144
145    use crate::arrays::varbinview::BinaryView;
146    use crate::arrays::varbinview::build_views::MAX_BUFFER_LEN;
147    use crate::arrays::varbinview::build_views::build_views;
148
149    /// Concatenate `values` into a single byte heap and return it alongside the per-element lengths,
150    /// matching the `(bytes, lens)` inputs that `build_views` consumes.
151    fn flatten(values: &[&[u8]]) -> (ByteBufferMut, Vec<u32>) {
152        let mut bytes = ByteBufferMut::empty();
153        let mut lens = Vec::with_capacity(values.len());
154        for v in values {
155            bytes.extend_from_slice(v);
156            lens.push(u32::try_from(v.len()).unwrap());
157        }
158        (bytes, lens)
159    }
160
161    /// Reconstruct the logical value behind each view by dereferencing it through the output
162    /// buffers. The first buffer corresponds to `start_buf_index`, so buffer indices are rebased by
163    /// that amount. This is the core correctness invariant: regardless of which code path built the
164    /// views, every view must point back at its original bytes.
165    fn reconstruct(
166        buffers: &[ByteBuffer],
167        views: &[BinaryView],
168        start_buf_index: u32,
169    ) -> Vec<Vec<u8>> {
170        views
171            .iter()
172            .map(|view| {
173                if view.is_inlined() {
174                    view.as_inlined().value().to_vec()
175                } else {
176                    let r = view.as_view();
177                    let buf = &buffers[(r.buffer_index - start_buf_index) as usize];
178                    buf[r.as_range()].to_vec()
179                }
180            })
181            .collect()
182    }
183
184    /// The single-buffer fast path (`bytes.len() <= max_buffer_len`) must reproduce every input
185    /// value exactly, emit a single output buffer holding the untouched heap, and reference only
186    /// `start_buf_index`. We cover a spread of value sets that mix inlined (<= 12 bytes) and
187    /// reference (> 12 bytes) lengths, including the 12/13 byte inline boundary, empty values, and a
188    /// fully-inlined set.
189    #[rstest]
190    #[case::mixed(&[b"a".as_slice(), b"this is a long reference value", b"short", b"another long value here!!"])]
191    #[case::inline_boundary(&[&[b'x'; 12] as &[u8], &[b'y'; 13], &[b'z'; 12], &[b'w'; 13]])]
192    #[case::all_inlined(&[b"".as_slice(), b"a", b"bb", b"ccc", b"dddddddddddd"])]
193    #[case::all_reference(&[&[b'a'; 100] as &[u8], &[b'b'; 50], &[b'c'; 4096]])]
194    #[case::empty_values_interleaved(&[b"".as_slice(), b"a long value that is referenced", b"", b"", b"trailing long reference value"])]
195    #[case::single_long(&[&[7u8; 1 << 16] as &[u8]])]
196    fn fast_path_roundtrip(#[case] values: &[&[u8]]) {
197        let (bytes, lens) = flatten(values);
198        let total = bytes.len();
199        let start_buf_index = 3;
200
201        // `max_buffer_len` strictly greater than the heap forces the single-buffer fast path.
202        let (buffers, views) = build_views(start_buf_index, total + 1, bytes, &lens);
203
204        assert_eq!(views.len(), values.len());
205        if total == 0 {
206            assert!(buffers.is_empty(), "empty heap must not allocate a buffer");
207        } else {
208            assert_eq!(buffers.len(), 1, "whole heap must stay in one buffer");
209            // The fast path freezes the input heap unchanged.
210            let concatenated: Vec<u8> = values.concat();
211            assert_eq!(buffers[0].as_slice(), concatenated.as_slice());
212        }
213        for view in views.iter() {
214            if !view.is_inlined() {
215                assert_eq!(view.as_view().buffer_index, start_buf_index);
216            }
217        }
218
219        let expected: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
220        assert_eq!(reconstruct(&buffers, &views, start_buf_index), expected);
221    }
222
223    /// Offsets and sizes are written into the `u32` `Ref` fields via `as_` truncation, so we must
224    /// confirm they stay correct once the running offset grows well past the 16-bit range (i.e. is
225    /// not narrowed to a smaller width). A ~9 MiB heap pushes offsets above 2^23 while remaining far
226    /// below `MAX_BUFFER_LEN`; each value encodes its index in its first bytes so a misplaced offset
227    /// would reconstruct the wrong bytes.
228    #[test]
229    fn fast_path_large_offsets() {
230        const N: usize = 9000;
231        const LEN: usize = 1000;
232        // The final offset is (N - 1) * LEN, which must exceed 2^23 to be a meaningful check.
233        const _: () = assert!((N - 1) * LEN > (1 << 23));
234
235        let values: Vec<Vec<u8>> = (0..N)
236            .map(|i| {
237                let mut v = vec![0u8; LEN];
238                v[..4].copy_from_slice(&u32::try_from(i).unwrap().to_le_bytes());
239                v
240            })
241            .collect();
242        let refs: Vec<&[u8]> = values.iter().map(|v| v.as_slice()).collect();
243
244        let (bytes, lens) = flatten(&refs);
245        let total = bytes.len();
246
247        let (buffers, views) = build_views(0, total + 1, bytes, &lens);
248
249        assert_eq!(buffers.len(), 1);
250        // The recorded offset must equal the cumulative byte position, exactly, for every view.
251        for (i, view) in views.iter().enumerate() {
252            let r = view.as_view();
253            assert_eq!(r.offset as usize, i * LEN, "wrong offset for view {i}");
254            assert_eq!(r.size as usize, LEN);
255        }
256        assert_eq!(reconstruct(&buffers, &views, 0), values);
257    }
258
259    /// The fast path is taken when `bytes.len() <= max_buffer_len`, so equality at the boundary must
260    /// still produce a single buffer (not roll over to the slow path).
261    #[test]
262    fn fast_path_taken_at_exact_boundary() {
263        let (bytes, lens) =
264            flatten(&[b"this value is definitely long", b"and so is this one here"]);
265        let total = bytes.len();
266
267        let (buffers, views) = build_views(0, total, bytes, &lens);
268
269        assert_eq!(
270            buffers.len(),
271            1,
272            "len == max_buffer_len must stay on fast path"
273        );
274        assert_eq!(views.len(), 2);
275    }
276
277    /// For the same logical data, the fast path (single buffer) and the slow rollover path must
278    /// reconstruct identical values. Driving the slow path with a small `max_buffer_len` forces
279    /// buffer splitting while leaving the recovered values unchanged.
280    #[test]
281    fn fast_and_slow_paths_agree() {
282        let values: &[&[u8]] = &[
283            b"first long reference value",
284            b"tiny",
285            b"second long reference value!!",
286            b"third looooong reference value",
287        ];
288        let expected: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
289
290        let (fast_bytes, lens) = flatten(values);
291        let total = fast_bytes.len();
292        let (fast_buffers, fast_views) = build_views(0, total + 1, fast_bytes, &lens);
293        assert_eq!(fast_buffers.len(), 1);
294        assert_eq!(reconstruct(&fast_buffers, &fast_views, 0), expected);
295
296        // Force the rollover path: a small cap (>= the longest value) that the total heap exceeds.
297        let longest = values.iter().map(|v| v.len()).max().unwrap();
298        let (slow_bytes, _) = flatten(values);
299        let (slow_buffers, slow_views) = build_views(0, longest, slow_bytes, &lens);
300        assert!(
301            slow_buffers.len() > 1,
302            "small cap should split into many buffers"
303        );
304        assert_eq!(reconstruct(&slow_buffers, &slow_views, 0), expected);
305
306        // Same logical contents regardless of how the heap was partitioned.
307        assert_eq!(
308            reconstruct(&fast_buffers, &fast_views, 0),
309            reconstruct(&slow_buffers, &slow_views, 0)
310        );
311    }
312
313    /// Empty input must yield no buffers and no views, exercising the `bytes.is_empty()` branch.
314    #[test]
315    fn fast_path_empty_input() {
316        let lens: Vec<u32> = Vec::new();
317        let (buffers, views) = build_views(0, 1024, ByteBufferMut::empty(), &lens);
318        assert!(buffers.is_empty());
319        assert!(views.is_empty());
320    }
321
322    /// The fast path must produce views byte-identical to the value-inspecting `make_view`, which is
323    /// what the slow path uses. This pins the inline/reference decision and field layout.
324    #[test]
325    fn fast_path_matches_make_view() {
326        let values: &[&[u8]] = &[b"inline", b"this is a long reference value", b""];
327        let (bytes, lens) = flatten(values);
328        let total = bytes.len();
329        let (_buffers, views) = build_views(0, total + 1, bytes, &lens);
330
331        let expected = [
332            BinaryView::make_view(b"inline", 0, 0),
333            BinaryView::make_view(b"this is a long reference value", 0, 6),
334            BinaryView::make_view(b"", 0, 36),
335        ];
336        assert_eq!(views.as_slice(), &expected);
337    }
338
339    // TODO(someone): ideally CI would run this in release mode as well, since debug builds make the
340    // ~2.25 GiB allocation and fill loop substantially slower.
341    /// Slow regression for the single-buffer fast-path guard. The fast path is only valid when the
342    /// whole heap fits in one buffer (`bytes.len() <= max_buffer_len`); once the heap exceeds
343    /// [`MAX_BUFFER_LEN`] (`i32::MAX`, ~2.0 GiB) `build_views` must roll the heap into multiple
344    /// buffers, resetting the per-buffer offset, so no view references an offset past the
345    /// `i32`-bounded buffer limit.
346    ///
347    /// We build a heap just past `i32::MAX` and assert it rolls over into more than one buffer, that
348    /// no buffer exceeds `MAX_BUFFER_LEN`, and that values straddling the rollover boundary (where
349    /// the second buffer's offsets restart from zero) reconstruct exactly. If the guard regressed and
350    /// the fast path swallowed the whole heap, it would emit a single >2 GiB buffer with offsets past
351    /// `i32::MAX`, which the buffer-count and buffer-size assertions catch.
352    ///
353    /// Allocates ~2.25 GiB, so it is gated to CI and skipped when `VORTEX_SKIP_SLOW_TESTS` is set:
354    ///
355    /// ```text
356    /// CI=1 cargo test --release -p vortex-array build_views_offsets_overflow
357    /// ```
358    ///
359    /// [`MAX_BUFFER_LEN`]: super::MAX_BUFFER_LEN
360    #[test_with::env(CI)]
361    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
362    fn build_views_offsets_overflow_i32() {
363        const STRING_LEN: usize = 64 * 1024;
364        // Comfortably past MAX_BUFFER_LEN (`i32::MAX` ~= 2.0 GiB) so the heap must roll over.
365        const TOTAL_BYTES: usize = (1usize << 31) + (256 << 20); // ~2.25 GiB
366        const N: usize = TOTAL_BYTES / STRING_LEN;
367
368        // Each value's first 8 bytes encode its row index, so a misrouted offset is detectable.
369        let nth_string = |i: usize| {
370            let mut s = vec![b'x'; STRING_LEN];
371            s[..8].copy_from_slice(&(i as u64).to_le_bytes());
372            s
373        };
374
375        let mut bytes = ByteBufferMut::with_capacity(N * STRING_LEN);
376        let mut value = vec![b'x'; STRING_LEN];
377        for i in 0..N {
378            value[..8].copy_from_slice(&(i as u64).to_le_bytes());
379            bytes.extend_from_slice(&value);
380        }
381
382        let lens = vec![u32::try_from(STRING_LEN).unwrap(); N];
383        let (buffers, views) = build_views(0, MAX_BUFFER_LEN, bytes, &lens);
384
385        assert_eq!(views.len(), N);
386        assert!(
387            buffers.len() >= 2,
388            "heap exceeding MAX_BUFFER_LEN must roll over into multiple buffers, got {}",
389            buffers.len()
390        );
391        for (i, b) in buffers.iter().enumerate() {
392            assert!(
393                b.len() <= MAX_BUFFER_LEN,
394                "buffer {i} of {} bytes exceeds MAX_BUFFER_LEN",
395                b.len()
396            );
397        }
398
399        // The boundary row is the first whose offset would cross MAX_BUFFER_LEN on the fast path.
400        let boundary = MAX_BUFFER_LEN / STRING_LEN;
401        for i in [0, boundary - 1, boundary, boundary + 1, N / 2, N - 1] {
402            let view = &views[i];
403            let r = view.as_view();
404            let got = &buffers[r.buffer_index as usize][r.as_range()];
405            assert_eq!(got, nth_string(i).as_slice(), "value mismatch at row {i}");
406            assert_eq!(r.size as usize, STRING_LEN);
407        }
408    }
409
410    #[test]
411    fn test_to_canonical_large() {
412        // We are testing generating views for raw data that should look like
413        //
414        //    aaaaaaaaaaaaa ("a"*13)
415        //    bbbbbbbbbbbbb ("b"*13)
416        //    ccccccccccccc ("c"*13)
417        //    ddddddddddddd ("d"*13)
418        //
419        // In real code, this would all fit in one buffer, but to unit test the splitting logic
420        // we split buffers at length 26, which should result in two buffers for the output array.
421        let raw_data =
422            ByteBufferMut::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbbcccccccccccccddddddddddddd");
423        let lens = vec![13u8; 4];
424
425        let (buffers, views) = build_views(0, 26, raw_data, &lens);
426
427        assert_eq!(
428            buffers,
429            vec![
430                ByteBuffer::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbb"),
431                ByteBuffer::copy_from("cccccccccccccddddddddddddd"),
432            ]
433        );
434
435        assert_eq!(
436            views.as_slice(),
437            &[
438                BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 0),
439                BinaryView::make_view(b"bbbbbbbbbbbbb", 0, 13),
440                BinaryView::make_view(b"ccccccccccccc", 1, 0),
441                BinaryView::make_view(b"ddddddddddddd", 1, 13),
442            ]
443        )
444    }
445
446    #[test]
447    #[should_panic(expected = "max_buffer_len cannot exceed MAX_BUFFER_LEN")]
448    fn test_max_buffer_len_too_large_panics() {
449        build_views(
450            0,
451            MAX_BUFFER_LEN + 1,
452            ByteBufferMut::copy_from("abc"),
453            &[3u32],
454        );
455    }
456}