vortex_onpair/
compress.rs1use onpair::Config;
7use onpair::Offset;
8use vortex_array::ArrayRef;
9use vortex_array::ExecutionCtx;
10use vortex_array::IntoArray;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::VortexSessionExecute;
13use vortex_array::accessor::ArrayAccessor;
14use vortex_array::arrays::VarBinViewArray;
15use vortex_array::buffer::BufferHandle;
16use vortex_array::dtype::DType;
17use vortex_array::dtype::Nullability;
18use vortex_array::validity::Validity;
19use vortex_buffer::Buffer;
20use vortex_buffer::BufferMut;
21use vortex_buffer::ByteBuffer;
22use vortex_error::VortexExpect;
23use vortex_error::VortexResult;
24use vortex_error::vortex_err;
25
26use crate::OnPair;
27use crate::OnPairArray;
28
29pub const DEFAULT_DICT12_CONFIG: Config = onpair::DEFAULT_CONFIG;
31
32pub fn onpair_compress_iter<'a, I>(
34 iter: I,
35 len: usize,
36 dtype: DType,
37 config: Config,
38) -> VortexResult<OnPairArray>
39where
40 I: Iterator<Item = Option<&'a [u8]>>,
41{
42 onpair_compress_iter_with_offsets::<u64, _>(iter, len, dtype, config)
43}
44
45fn onpair_compress_iter_with_offsets<'a, O, I>(
46 iter: I,
47 len: usize,
48 dtype: DType,
49 config: Config,
50) -> VortexResult<OnPairArray>
51where
52 O: Offset,
53 I: Iterator<Item = Option<&'a [u8]>>,
54{
55 let mut flat: Vec<u8> = Vec::with_capacity(len * 16);
56 let mut offsets: Vec<O> = Vec::with_capacity(len + 1);
57 let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
58 let mut validity_bits: Vec<bool> = Vec::with_capacity(len);
59 offsets.push(<O as Offset>::from_usize(0));
60
61 for item in iter {
62 match item {
63 Some(bytes) => {
64 flat.extend_from_slice(bytes);
65 offsets.push(<O as Offset>::from_usize(flat.len()));
66 uncompressed_lengths.push(
67 i32::try_from(bytes.len()).vortex_expect("string length must fit in i32"),
68 );
69 validity_bits.push(true);
70 }
71 None => {
72 offsets.push(<O as Offset>::from_usize(flat.len()));
73 uncompressed_lengths.push(0);
74 validity_bits.push(false);
75 }
76 }
77 }
78
79 let column = onpair::compress(&flat, &offsets, config)
80 .map_err(|e| vortex_err!("OnPair compress failed: {e}"))?;
81 let bits = column.bits;
82 let dict_bytes = dict_bytes_to_buffer(column.dict_bytes);
83 let codes_offsets = build_codes_offsets(&column.codes, &column.dict_offsets, &offsets)?;
84 let codes = Buffer::from(column.codes).into_array();
85 let dict_offsets = Buffer::from(column.dict_offsets).into_array();
86 let codes_offsets = Buffer::from(codes_offsets).into_array();
87
88 let uncompressed_lengths = uncompressed_lengths.into_array();
89 let validity = match dtype.nullability() {
90 Nullability::NonNullable => Validity::NonNullable,
91 Nullability::Nullable => Validity::from_iter(validity_bits),
92 };
93
94 OnPair::try_new(
95 dtype,
96 dict_bytes,
97 dict_offsets,
98 codes,
99 codes_offsets,
100 uncompressed_lengths,
101 validity,
102 bits,
103 )
104}
105
106fn dict_bytes_to_buffer(dict_bytes: Vec<u8>) -> BufferHandle {
108 let mut padded = Vec::with_capacity(dict_bytes.len() + onpair::MAX_TOKEN_SIZE);
112 padded.extend_from_slice(&dict_bytes);
113 padded.resize(dict_bytes.len() + onpair::MAX_TOKEN_SIZE, 0);
114 BufferHandle::new_host(ByteBuffer::from(padded).aligned(vortex_buffer::Alignment::new(8)))
121}
122
123fn build_codes_offsets<O: Offset>(
128 codes: &[u16],
129 dict_offsets: &[u32],
130 row_byte_offsets: &[O],
131) -> VortexResult<Vec<u32>> {
132 let nrows = row_byte_offsets.len() - 1;
133 let mut codes_offsets = Vec::with_capacity(nrows + 1);
134 codes_offsets.push(0u32);
135 let mut decoded_bytes: u64 = 0;
136 let mut code_idx: usize = 0;
137 for r in 0..nrows {
138 let target = row_byte_offsets[r + 1]
139 .to_usize()
140 .ok_or_else(|| vortex_err!("OnPair row byte offset does not fit usize"))?
141 as u64;
142 while decoded_bytes < target {
143 let code = codes[code_idx] as usize;
144 decoded_bytes += u64::from(dict_offsets[code + 1] - dict_offsets[code]);
145 code_idx += 1;
146 }
147 codes_offsets.push(
148 u32::try_from(code_idx)
149 .map_err(|_| vortex_err!("OnPair: code boundary {code_idx} does not fit u32"))?,
150 );
151 }
152 Ok(codes_offsets)
153}
154
155pub fn onpair_compress<A: ArrayAccessor<[u8]>>(
158 array: A,
159 len: usize,
160 dtype: &DType,
161 config: Config,
162) -> VortexResult<OnPairArray> {
163 array.with_iterator(|iter| onpair_compress_iter(iter, len, dtype.clone(), config))
164}
165
166pub fn onpair_compress_array(
169 array: &ArrayRef,
170 config: Config,
171 ctx: &mut ExecutionCtx,
172) -> VortexResult<OnPairArray> {
173 let view = array.clone().execute::<VarBinViewArray>(ctx)?;
174 let len = view.len();
175 let dtype = view.dtype().clone();
176 onpair_compress(&view, len, &dtype, config)
177}
178
179pub fn onpair_compress_array_default(
181 array: &ArrayRef,
182 config: Config,
183) -> VortexResult<OnPairArray> {
184 let mut ctx = LEGACY_SESSION.create_execution_ctx();
185 onpair_compress_array(array, config, &mut ctx)
186}