use fsst::Decompressor;
use vortex_array::arrays::{BinaryView, VarBinViewArray};
use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
use vortex_array::validity::Validity;
use vortex_array::vtable::CanonicalVTable;
use vortex_array::{Canonical, IntoArray, ToCanonical};
use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;

use crate::{FSSTArray, FSSTVTable};

impl CanonicalVTable<FSSTVTable> for FSSTVTable {
    fn canonicalize(array: &FSSTArray) -> VortexResult<Canonical> {
        fsst_into_varbin_view(array.decompressor(), array, 0).map(Canonical::VarBinView)
    }

    fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
        // The fast path only applies when appending into a VarBinViewBuilder; otherwise fall
        // back to canonicalizing first and extending the builder from the canonical array.
        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
            return builder.extend_from_array(&array.to_canonical()?.into_array());
        };
        // The views must reference the buffer we are about to append, so offset them by the
        // number of blocks the builder has already completed.
        let view =
            fsst_into_varbin_view(array.decompressor(), array, builder.completed_block_count())?;

        builder.push_buffer_and_adjusted_views(
            view.buffers().iter().cloned(),
            view.views().iter().cloned(),
            array.validity_mask()?,
        );
        Ok(())
    }
}

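/// Decompress an [`FSSTArray`] into a [`VarBinViewArray`].
///
/// The whole compressed payload is decompressed into a single contiguous buffer, and one
/// [`BinaryView`] is built per element pointing into that buffer. `block_offset` is the buffer
/// index the views should reference, which lets the result be appended to a builder that
/// already holds `block_offset` completed blocks.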
#[allow(clippy::cast_possible_truncation)]
fn fsst_into_varbin_view(
    decompressor: Decompressor,
    fsst_array: &FSSTArray,
    block_offset: usize,
) -> VortexResult<VarBinViewArray> {
    // The compressed codes of this array as one contiguous byte buffer.
    let bytes = fsst_array.codes().sliced_bytes();

    let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive()?;

    // Summing the per-element uncompressed lengths gives the exact size of the decompressed
    // output, so the destination buffer can be allocated up front.
    #[allow(clippy::cast_possible_truncation)]
    let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
        uncompressed_lens_array
            .as_slice::<P>()
            .iter()
            .map(|x| *x as usize)
            .sum()
    });

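    // Reserve a little slack past `total_size`: the FSST decompressor emits whole symbols of up
    // to 8 bytes at a time, so its final write may run up to 7 bytes past the logical end.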
    let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
    let len =
        decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
    // SAFETY: `decompress_into` has initialized the first `len` bytes of the spare capacity.
    unsafe { uncompressed_bytes.set_len(len) };

    let block_offset = u32::try_from(block_offset)?;

    let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());

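    // Walk the uncompressed lengths to carve the decompressed buffer back into per-element
    // views. Every view points into buffer `block_offset` of the destination array or builder.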
    match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
        let mut offset = 0;
        for len in uncompressed_lens_array.as_slice::<P>() {
            let len = *len as usize;
            let view = BinaryView::make_view(
                &uncompressed_bytes[offset..][..len],
                block_offset,
                offset as u32,
            );
            // SAFETY: `views` was allocated with capacity for exactly one view per element.
            unsafe { views.push_unchecked(view) };
            offset += len;
        }
    });

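    // Freeze the views and the decompressed bytes, and reuse the source array's dtype and
    // validity for the canonical result.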
    let views = views.freeze();
    let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes);

    VarBinViewArray::try_new(
        views,
        vec![uncompressed_bytes_array],
        fsst_array.dtype().clone(),
        Validity::copy_from_array(fsst_array.as_ref())?,
    )
}

#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

    /// Build a nullable binary array of random lowercase strings (roughly 90% nulls), along
    /// with the expected values for later comparison.
    fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);
        let mut strings = Vec::with_capacity(STRING_COUNT);

        for _ in 0..STRING_COUNT {
            if rng.random_bool(0.9) {
                strings.push(None);
            } else {
                // Random length between 5 and 15 bytes (50%-150% of an average of 10).
                let len = 10 * rng.random_range(50..=150) / 100;
                strings.push(Some(
                    (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z') as char)
                        .collect::<String>()
                        .into_bytes(),
                ));
            }
        }

        (
            VarBinArray::from_iter(
                strings
                    .clone()
                    .into_iter()
                    .map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
                DType::Binary(Nullability::Nullable),
            )
            .into_array(),
            strings,
        )
    }

    /// Build a [`ChunkedArray`] of ten FSST-compressed chunks, each with its own trained
    /// compressor, along with the flattened expected values.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        #[allow(clippy::type_complexity)]
        let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
            .map(|_| {
                let (array, data) = make_data();
                let compressor = fsst_train_compressor(&array).unwrap();
                (
                    fsst_compress(&array, &compressor).unwrap().into_array(),
                    data,
                )
            })
            .unzip();

        (
            ChunkedArray::from_iter(arr_vec),
            data_vec.into_iter().flatten().collect(),
        )
    }

    #[test]
    fn test_to_canonical() {
        let (chunked_arr, data) = make_data_chunked();

        let mut builder =
            VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
        chunked_arr.append_to_builder(&mut builder).unwrap();

        // Appending FSST chunks into a VarBinViewBuilder round-trips the original values.
        {
            let arr = builder.finish().to_varbinview().unwrap();
            let res1 = arr
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res1);
        };

        // Full canonicalization of the chunked array produces the same values.
        {
            let arr2 = chunked_arr.to_varbinview().unwrap();
            let res2 = arr2
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res2)
        };
    }
}