1use std::sync::Arc;
5
6use vortex_array::Canonical;
7use vortex_array::IntoArray;
8use vortex_array::ToCanonical;
9use vortex_array::arrays::VarBinViewArray;
10use vortex_array::builders::ArrayBuilder;
11use vortex_array::builders::VarBinViewBuilder;
12use vortex_array::vtable::CanonicalVTable;
13use vortex_array::vtable::ValidityHelper;
14use vortex_buffer::Buffer;
15use vortex_buffer::BufferMut;
16use vortex_buffer::ByteBuffer;
17use vortex_buffer::ByteBufferMut;
18use vortex_dtype::match_each_integer_ptype;
19use vortex_vector::binaryview::BinaryView;
20
21use crate::FSSTArray;
22use crate::FSSTVTable;
23
24impl CanonicalVTable<FSSTVTable> for FSSTVTable {
25 fn canonicalize(array: &FSSTArray) -> Canonical {
26 let (buffer, views) = fsst_decode_views(array, 0);
27 unsafe {
30 Canonical::VarBinView(VarBinViewArray::new_unchecked(
31 views,
32 Arc::new([buffer]),
33 array.dtype().clone(),
34 array.codes().validity().clone(),
35 ))
36 }
37 }
38
39 fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) {
40 let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
41 return builder.extend_from_array(&array.to_canonical().into_array());
42 };
43
44 let (buffer, views) = fsst_decode_views(array, builder.completed_block_count());
48
49 builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask());
50 }
51}
52
53#[expect(
54 clippy::cast_possible_truncation,
55 reason = "truncation is intentional for buffer index"
56)]
57fn fsst_decode_views(fsst_array: &FSSTArray, buf_index: u32) -> (ByteBuffer, Buffer<BinaryView>) {
58 let bytes = fsst_array.codes().sliced_bytes();
66
67 let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive();
68
69 #[allow(clippy::cast_possible_truncation)]
71 let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
72 uncompressed_lens_array
73 .as_slice::<P>()
74 .iter()
75 .map(|x| *x as usize)
76 .sum()
77 });
78
79 let decompressor = fsst_array.decompressor();
81 let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
82 let len =
83 decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
84 unsafe { uncompressed_bytes.set_len(len) };
85
86 let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());
88
89 match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
90 let mut offset = 0;
91 for len in uncompressed_lens_array.as_slice::<P>() {
92 let len = *len as usize;
93 let view = BinaryView::make_view(
94 &uncompressed_bytes[offset..][..len],
95 buf_index,
96 offset as u32,
97 );
98 unsafe { views.push_unchecked(view) };
100 offset += len;
101 }
102 });
103
104 (uncompressed_bytes.freeze(), views.freeze())
105}
106
107#[cfg(test)]
108mod tests {
109 use rand::Rng;
110 use rand::SeedableRng;
111 use rand::prelude::StdRng;
112 use vortex_array::ArrayRef;
113 use vortex_array::IntoArray;
114 use vortex_array::ToCanonical;
115 use vortex_array::accessor::ArrayAccessor;
116 use vortex_array::arrays::ChunkedArray;
117 use vortex_array::arrays::VarBinArray;
118 use vortex_array::builders::ArrayBuilder;
119 use vortex_array::builders::VarBinViewBuilder;
120 use vortex_dtype::DType;
121 use vortex_dtype::Nullability;
122
123 use crate::fsst_compress;
124 use crate::fsst_train_compressor;
125
126 fn make_data() -> (VarBinArray, Vec<Option<Vec<u8>>>) {
127 const STRING_COUNT: usize = 1000;
128 let mut rng = StdRng::seed_from_u64(0);
129 let mut strings = Vec::with_capacity(STRING_COUNT);
130
131 for _ in 0..STRING_COUNT {
132 if rng.random_bool(0.9) {
133 strings.push(None)
134 } else {
135 let len = 10 * rng.random_range(50..=150) / 100;
138 strings.push(Some(
139 (0..len)
140 .map(|_| rng.random_range(b'a'..=b'z') as char)
141 .collect::<String>()
142 .into_bytes(),
143 ));
144 }
145 }
146
147 (
148 VarBinArray::from_iter(
149 strings
150 .clone()
151 .into_iter()
152 .map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
153 DType::Binary(Nullability::Nullable),
154 ),
155 strings,
156 )
157 }
158
159 fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
160 #[allow(clippy::type_complexity)]
161 let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
162 .map(|_| {
163 let (array, data) = make_data();
164 let compressor = fsst_train_compressor(&array);
165 (fsst_compress(&array, &compressor).into_array(), data)
166 })
167 .unzip();
168
169 (
170 ChunkedArray::from_iter(arr_vec),
171 data_vec.into_iter().flatten().collect(),
172 )
173 }
174
175 #[test]
176 fn test_to_canonical() {
177 let (chunked_arr, data) = make_data_chunked();
178
179 let mut builder =
180 VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
181 chunked_arr.append_to_builder(&mut builder);
182
183 {
184 let arr = builder.finish_into_canonical().into_varbinview();
185 let res1 =
186 arr.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
187 assert_eq!(data, res1);
188 };
189
190 {
191 let arr2 = chunked_arr.to_varbinview();
192 let res2 =
193 arr2.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
194 assert_eq!(data, res2)
195 };
196 }
197}