1use fsst::Decompressor;
2use vortex_array::arrays::{BinaryView, VarBinViewArray};
3use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
4use vortex_array::validity::Validity;
5use vortex_array::vtable::CanonicalVTable;
6use vortex_array::{Canonical, IntoArray, ToCanonical};
7use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
8use vortex_dtype::match_each_integer_ptype;
9use vortex_error::VortexResult;
10
11use crate::{FSSTArray, FSSTVTable};
12
13impl CanonicalVTable<FSSTVTable> for FSSTVTable {
14 fn canonicalize(array: &FSSTArray) -> VortexResult<Canonical> {
15 fsst_into_varbin_view(array.decompressor(), array, 0).map(Canonical::VarBinView)
16 }
17
18 fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
19 let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
20 return builder.extend_from_array(&array.to_canonical()?.into_array());
21 };
22 let view =
23 fsst_into_varbin_view(array.decompressor(), array, builder.completed_block_count())?;
24
25 builder.push_buffer_and_adjusted_views(
26 view.buffers().iter().cloned(),
27 view.views().iter().cloned(),
28 array.validity_mask()?,
29 );
30 Ok(())
31 }
32}
33
34fn fsst_into_varbin_view(
37 decompressor: Decompressor,
38 fsst_array: &FSSTArray,
39 block_offset: usize,
40) -> VortexResult<VarBinViewArray> {
41 let bytes = fsst_array.codes().sliced_bytes();
51
52 let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive()?;
53
54 #[allow(clippy::cast_possible_truncation)]
56 let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |$P| {
57 uncompressed_lens_array.as_slice::<$P>().iter().map(|x| *x as usize).sum()
58 });
59
60 let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
62 let len =
63 decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
64 unsafe { uncompressed_bytes.set_len(len) };
65
66 let block_offset = u32::try_from(block_offset)?;
67
68 let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());
70
71 match_each_integer_ptype!(uncompressed_lens_array.ptype(), |$P| {
72 let mut offset = 0;
73 for len in uncompressed_lens_array.as_slice::<$P>() {
74 let len = *len as usize;
75 let view = BinaryView::make_view(
76 &uncompressed_bytes[offset..][..len],
77 block_offset,
78 offset as u32,
79 );
80 unsafe { views.push_unchecked(view.into()) };
82 offset += len;
83 }
84 });
85
86 let views = views.freeze();
87 let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes);
88
89 VarBinViewArray::try_new(
90 views,
91 vec![uncompressed_bytes_array],
92 fsst_array.dtype().clone(),
93 Validity::copy_from_array(fsst_array.as_ref())?,
94 )
95}
96
#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

    /// Builds a nullable binary array of 1000 entries from a fixed-seed RNG, returned together
    /// with the raw values it was built from.
    ///
    /// Each entry is null with probability 0.9; otherwise it is a run of random lowercase ASCII
    /// letters.
    fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);

        let values: Vec<Option<Vec<u8>>> = (0..STRING_COUNT)
            .map(|_| {
                if rng.random_bool(0.9) {
                    return None;
                }
                let len = 10 * rng.random_range(50..=150) / 100;
                // Lowercase ASCII letters are single bytes, so they are collected directly.
                let letters = (0..len)
                    .map(|_| rng.random_range(b'a'..=b'z'))
                    .collect::<Vec<u8>>();
                Some(letters)
            })
            .collect();

        let array = VarBinArray::from_iter(
            values
                .iter()
                .cloned()
                .map(|value| value.map(Vec::into_boxed_slice)),
            DType::Binary(Nullability::Nullable),
        )
        .into_array();

        (array, values)
    }

    /// Builds a chunked array of ten FSST-compressed chunks, returned together with the
    /// concatenation of the values behind all chunks.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        let mut chunks: Vec<ArrayRef> = Vec::with_capacity(10);
        let mut expected: Vec<Option<Vec<u8>>> = Vec::new();

        for _ in 0..10 {
            let (array, data) = make_data();
            let compressor = fsst_train_compressor(&array).unwrap();
            chunks.push(fsst_compress(&array, &compressor).unwrap().into_array());
            expected.extend(data);
        }

        (ChunkedArray::from_iter(chunks), expected)
    }

    /// Checks that both the builder path and direct canonicalization reproduce the input.
    #[test]
    fn test_to_canonical() {
        let (chunked_arr, data) = make_data_chunked();

        let mut builder =
            VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
        chunked_arr.append_to_builder(&mut builder).unwrap();

        // Path 1: append into a view builder, then read the finished array back.
        let from_builder = {
            let built = builder.finish().to_varbinview().unwrap();
            built
                .with_iterator(|values| {
                    values
                        .map(|value| value.map(|bytes| bytes.to_vec()))
                        .collect::<Vec<_>>()
                })
                .unwrap()
        };
        assert_eq!(data, from_builder);

        // Path 2: canonicalize the chunked array directly.
        let from_canonical = {
            let canonical = chunked_arr.to_varbinview().unwrap();
            canonical
                .with_iterator(|values| {
                    values
                        .map(|value| value.map(|bytes| bytes.to_vec()))
                        .collect::<Vec<_>>()
                })
                .unwrap()
        };
        assert_eq!(data, from_canonical);
    }
}