Skip to main content

vortex_onpair/
ops.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use onpair::Parts;
5use vortex_array::ArrayView;
6use vortex_array::ExecutionCtx;
7use vortex_array::arrays::varbin::varbin_scalar;
8use vortex_array::scalar::Scalar;
9use vortex_array::vtable::OperationsVTable;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexResult;
12use vortex_error::vortex_err;
13
14use crate::OnPair;
15use crate::OnPairArraySlotsExt;
16use crate::decode::code_boundary_at;
17use crate::decode::collect_widened;
18
19impl OperationsVTable<OnPair> for OnPair {
20    fn scalar_at(
21        array: ArrayView<'_, OnPair>,
22        index: usize,
23        ctx: &mut ExecutionCtx,
24    ) -> VortexResult<Scalar> {
25        // A row owns a variable-length run of the flat `codes` stream; the
26        // per-row `codes_offsets` boundaries map the row index to that run.
27        // Read just this row's two boundaries (point lookups that decode at
28        // most one chunk of `codes_offsets`) and decode only that run — never
29        // the whole column.
30        let codes_offsets = array.codes_offsets();
31        let row_start = code_boundary_at(codes_offsets, index, ctx)?;
32        let row_end = code_boundary_at(codes_offsets, index + 1, ctx)?;
33
34        let codes = collect_widened::<u16>(&array.codes().slice(row_start..row_end)?, ctx)?;
35        let dict_offsets = collect_widened::<u32>(array.dict_offsets(), ctx)?;
36        let parts = Parts {
37            dict_bytes: array.dict_bytes().as_slice(),
38            dict_offsets: dict_offsets.as_slice(),
39            bits: array.bits(),
40            codes: codes.as_slice(),
41        };
42
43        // The per-row decoded length is recorded in the `uncompressed_lengths`
44        // child, so read it directly instead of asking the decoder to compute it.
45        let len = array
46            .uncompressed_lengths()
47            .execute_scalar(index, ctx)?
48            .as_primitive()
49            .as_::<usize>()
50            .ok_or_else(|| vortex_err!("OnPair uncompressed_lengths[{index}] is null"))?;
51        let mut buf: Vec<u8> = Vec::with_capacity(len);
52        let written = onpair::decompress_into(parts, buf.spare_capacity_mut());
53        debug_assert_eq!(written, len);
54        // SAFETY: `decompress_into` initialised `written` bytes of the spare
55        // capacity reserved above.
56        unsafe { buf.set_len(written) };
57        Ok(varbin_scalar(ByteBuffer::from(buf), array.dtype()))
58    }
59}