Skip to main content

vortex_onpair/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Train + compress entry points for the OnPair encoding.
5
6use onpair::Config;
7use onpair::Offset;
8use vortex_array::ArrayRef;
9use vortex_array::ExecutionCtx;
10use vortex_array::IntoArray;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::VortexSessionExecute;
13use vortex_array::accessor::ArrayAccessor;
14use vortex_array::arrays::VarBinViewArray;
15use vortex_array::buffer::BufferHandle;
16use vortex_array::dtype::DType;
17use vortex_array::dtype::Nullability;
18use vortex_array::validity::Validity;
19use vortex_buffer::Buffer;
20use vortex_buffer::BufferMut;
21use vortex_buffer::ByteBuffer;
22use vortex_error::VortexExpect;
23use vortex_error::VortexResult;
24use vortex_error::vortex_err;
25
26use crate::OnPair;
27use crate::OnPairArray;
28
/// Default OnPair training configuration: 12-bit codes ("dict-12").
///
/// This is a plain alias for `onpair::DEFAULT_CONFIG`, so it follows whatever
/// the `onpair` crate defines as its default.
// NOTE(review): the "12-bit" description is not checked anywhere in this file;
// confirm it against `onpair::DEFAULT_CONFIG`.
pub const DEFAULT_DICT12_CONFIG: Config = onpair::DEFAULT_CONFIG;
31
/// Compress an iterable of optional byte strings via the OnPair encoder.
///
/// `iter` yields one item per row: `Some(bytes)` for a value and `None` for a
/// null. `len` is only used to pre-size internal buffers; the number of rows
/// in the result is determined by how many items `iter` yields. `dtype` is
/// the dtype of the resulting array and `config` is forwarded to
/// `onpair::compress`.
///
/// # Errors
///
/// Propagates any error from the underlying compression and array
/// construction steps.
pub fn onpair_compress_iter<'a, I>(
    iter: I,
    len: usize,
    dtype: DType,
    config: Config,
) -> VortexResult<OnPairArray>
where
    I: Iterator<Item = Option<&'a [u8]>>,
{
    // Row byte offsets handed to the encoder are always `u64` on this path.
    onpair_compress_iter_with_offsets::<u64, _>(iter, len, dtype, config)
}
44
45fn onpair_compress_iter_with_offsets<'a, O, I>(
46    iter: I,
47    len: usize,
48    dtype: DType,
49    config: Config,
50) -> VortexResult<OnPairArray>
51where
52    O: Offset,
53    I: Iterator<Item = Option<&'a [u8]>>,
54{
55    let mut flat: Vec<u8> = Vec::with_capacity(len * 16);
56    let mut offsets: Vec<O> = Vec::with_capacity(len + 1);
57    let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
58    let mut validity_bits: Vec<bool> = Vec::with_capacity(len);
59    offsets.push(<O as Offset>::from_usize(0));
60
61    for item in iter {
62        match item {
63            Some(bytes) => {
64                flat.extend_from_slice(bytes);
65                offsets.push(<O as Offset>::from_usize(flat.len()));
66                uncompressed_lengths.push(
67                    i32::try_from(bytes.len()).vortex_expect("string length must fit in i32"),
68                );
69                validity_bits.push(true);
70            }
71            None => {
72                offsets.push(<O as Offset>::from_usize(flat.len()));
73                uncompressed_lengths.push(0);
74                validity_bits.push(false);
75            }
76        }
77    }
78
79    let column = onpair::compress(&flat, &offsets, config)
80        .map_err(|e| vortex_err!("OnPair compress failed: {e}"))?;
81    let bits = column.bits;
82    let dict_bytes = dict_bytes_to_buffer(column.dict_bytes);
83    let codes_offsets = build_codes_offsets(&column.codes, &column.dict_offsets, &offsets)?;
84    let codes = Buffer::from(column.codes).into_array();
85    let dict_offsets = Buffer::from(column.dict_offsets).into_array();
86    let codes_offsets = Buffer::from(codes_offsets).into_array();
87
88    let uncompressed_lengths = uncompressed_lengths.into_array();
89    let validity = match dtype.nullability() {
90        Nullability::NonNullable => Validity::NonNullable,
91        Nullability::Nullable => Validity::from_iter(validity_bits),
92    };
93
94    OnPair::try_new(
95        dtype,
96        dict_bytes,
97        dict_offsets,
98        codes,
99        codes_offsets,
100        uncompressed_lengths,
101        validity,
102        bits,
103    )
104}
105
106/// Lift compressed dictionary bytes into the Vortex buffer slot.
107fn dict_bytes_to_buffer(dict_bytes: Vec<u8>) -> BufferHandle {
108    // Pad the dictionary blob with MAX_TOKEN_SIZE zero bytes so the
109    // over-copy decoder can issue a fixed 16-byte load for every token
110    // without risking an OOB read on the last entry.
111    let mut padded = Vec::with_capacity(dict_bytes.len() + onpair::MAX_TOKEN_SIZE);
112    padded.extend_from_slice(&dict_bytes);
113    padded.resize(dict_bytes.len() + onpair::MAX_TOKEN_SIZE, 0);
114    // Align dict_bytes to 8 bytes so the segment that ultimately holds the
115    // OnPair tree starts at an 8-aligned in-memory address. Without this
116    // anchor, the per-buffer padding the serializer inserts is only
117    // *relative* to the segment start; if the segment lands at a u8-aligned
118    // heap address, downstream `PrimitiveArray<u32>::deserialize` panics
119    // with `Misaligned buffer cannot be used to build PrimitiveArray of u32`.
120    BufferHandle::new_host(ByteBuffer::from(padded).aligned(vortex_buffer::Alignment::new(8)))
121}
122
123/// Reconstruct the per-row `codes_offsets` from the flat `codes`, the
124/// dictionary `dict_offsets` (token byte lengths) and the per-row decoded byte
125/// boundaries. Returns `nrows + 1` cumulative code counts (`u32`).
126// TODO(joe): can we compute this while compressing the array, yes but a worse API.
127fn build_codes_offsets<O: Offset>(
128    codes: &[u16],
129    dict_offsets: &[u32],
130    row_byte_offsets: &[O],
131) -> VortexResult<Vec<u32>> {
132    let nrows = row_byte_offsets.len() - 1;
133    let mut codes_offsets = Vec::with_capacity(nrows + 1);
134    codes_offsets.push(0u32);
135    let mut decoded_bytes: u64 = 0;
136    let mut code_idx: usize = 0;
137    for r in 0..nrows {
138        let target = row_byte_offsets[r + 1]
139            .to_usize()
140            .ok_or_else(|| vortex_err!("OnPair row byte offset does not fit usize"))?
141            as u64;
142        while decoded_bytes < target {
143            let code = codes[code_idx] as usize;
144            decoded_bytes += u64::from(dict_offsets[code + 1] - dict_offsets[code]);
145            code_idx += 1;
146        }
147        codes_offsets.push(
148            u32::try_from(code_idx)
149                .map_err(|_| vortex_err!("OnPair: code boundary {code_idx} does not fit u32"))?,
150        );
151    }
152    Ok(codes_offsets)
153}
154
/// Compress a byte-string accessor (typically a `VarBinArray` or
/// `VarBinViewArray`).
///
/// Obtains the accessor's iterator through `with_iterator` and forwards it to
/// [`onpair_compress_iter`] together with `len`, a clone of `dtype`, and
/// `config`. The result of that call is returned unchanged.
pub fn onpair_compress<A: ArrayAccessor<[u8]>>(
    array: A,
    len: usize,
    dtype: &DType,
    config: Config,
) -> VortexResult<OnPairArray> {
    // `onpair_compress_iter` takes the dtype by value, hence the clone.
    array.with_iterator(|iter| onpair_compress_iter(iter, len, dtype.clone(), config))
}
165
166/// Compress any [`ArrayRef`] whose canonical form is a string array, by first
167/// canonicalising to `VarBinViewArray`.
168pub fn onpair_compress_array(
169    array: &ArrayRef,
170    config: Config,
171    ctx: &mut ExecutionCtx,
172) -> VortexResult<OnPairArray> {
173    let view = array.clone().execute::<VarBinViewArray>(ctx)?;
174    let len = view.len();
175    let dtype = view.dtype().clone();
176    onpair_compress(&view, len, &dtype, config)
177}
178
179/// Convenience: build a default `ExecutionCtx` from `LEGACY_SESSION`.
180pub fn onpair_compress_array_default(
181    array: &ArrayRef,
182    config: Config,
183) -> VortexResult<OnPairArray> {
184    let mut ctx = LEGACY_SESSION.create_execution_ctx();
185    onpair_compress_array(array, config, &mut ctx)
186}