1use std::sync::Arc;
5
6use vortex_array::arrays::{BinaryView, VarBinViewArray};
7use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
8use vortex_array::vtable::{CanonicalVTable, ValidityHelper};
9use vortex_array::{Canonical, IntoArray, ToCanonical};
10use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
11use vortex_dtype::match_each_integer_ptype;
12use vortex_error::{VortexExpect, VortexResult};
13
14use crate::{FSSTArray, FSSTVTable};
15
16impl CanonicalVTable<FSSTVTable> for FSSTVTable {
17 fn canonicalize(array: &FSSTArray) -> VortexResult<Canonical> {
18 let (buffer, views) = fsst_decode_views(array, 0);
19 unsafe {
22 Ok(Canonical::VarBinView(VarBinViewArray::new_unchecked(
23 views,
24 Arc::new([buffer]),
25 array.dtype().clone(),
26 array.codes().validity().clone(),
27 )))
28 }
29 }
30
31 fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
32 let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
33 return builder.extend_from_array(&array.to_canonical()?.into_array());
34 };
35
36 let (buffer, views) = fsst_decode_views(array, builder.completed_block_count());
40
41 builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask()?);
42 Ok(())
43 }
44}
45
46#[allow(clippy::cast_possible_truncation)]
47fn fsst_decode_views(fsst_array: &FSSTArray, buf_index: u32) -> (ByteBuffer, Buffer<BinaryView>) {
48 let bytes = fsst_array.codes().sliced_bytes();
56
57 let uncompressed_lens_array = fsst_array
58 .uncompressed_lengths()
59 .to_primitive()
60 .vortex_expect("uncompressed_lens must be primitive");
61
62 #[allow(clippy::cast_possible_truncation)]
64 let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
65 uncompressed_lens_array
66 .as_slice::<P>()
67 .iter()
68 .map(|x| *x as usize)
69 .sum()
70 });
71
72 let decompressor = fsst_array.decompressor();
74 let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
75 let len =
76 decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
77 unsafe { uncompressed_bytes.set_len(len) };
78
79 let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());
81
82 match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
83 let mut offset = 0;
84 for len in uncompressed_lens_array.as_slice::<P>() {
85 let len = *len as usize;
86 let view = BinaryView::make_view(
87 &uncompressed_bytes[offset..][..len],
88 buf_index,
89 offset as u32,
90 );
91 unsafe { views.push_unchecked(view) };
93 offset += len;
94 }
95 });
96
97 (uncompressed_bytes.freeze(), views.freeze())
98}
99
#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

    /// Builds a nullable binary array of 1000 entries from a fixed seed, returning it along
    /// with the values it was built from.
    ///
    /// Each entry is null with probability 0.9; otherwise it is a random lowercase ASCII
    /// string of 5 to 15 bytes.
    fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);

        let strings: Vec<Option<Vec<u8>>> = (0..STRING_COUNT)
            .map(|_| {
                if rng.random_bool(0.9) {
                    None
                } else {
                    let len = 10 * rng.random_range(50..=150) / 100;
                    let text: String = (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z') as char)
                        .collect();
                    Some(text.into_bytes())
                }
            })
            .collect();

        let array = VarBinArray::from_iter(
            strings
                .iter()
                .cloned()
                .map(|value| value.map(Vec::into_boxed_slice)),
            DType::Binary(Nullability::Nullable),
        )
        .into_array();

        (array, strings)
    }

    /// Builds a chunked array of ten FSST-compressed chunks, returning it along with the
    /// concatenation of the values of all chunks.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        const CHUNK_COUNT: usize = 10;
        let mut chunks: Vec<ArrayRef> = Vec::with_capacity(CHUNK_COUNT);
        let mut expected: Vec<Option<Vec<u8>>> = Vec::new();

        for _ in 0..CHUNK_COUNT {
            let (array, data) = make_data();
            let compressor = fsst_train_compressor(&array).unwrap();
            chunks.push(fsst_compress(&array, &compressor).unwrap().into_array());
            expected.extend(data);
        }

        (ChunkedArray::from_iter(chunks), expected)
    }

    /// Decoding through a `VarBinViewBuilder` and through direct canonicalization must both
    /// reproduce the original values.
    #[test]
    fn test_to_canonical() {
        let (chunked_arr, data) = make_data_chunked();

        // Path 1: append every chunk into a view builder.
        let mut builder =
            VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
        chunked_arr.append_to_builder(&mut builder).unwrap();

        let built = builder.finish().to_varbinview().unwrap();
        let from_builder = built
            .with_iterator(|iter| {
                iter.map(|value| value.map(|bytes| bytes.to_vec()))
                    .collect::<Vec<_>>()
            })
            .unwrap();
        assert_eq!(data, from_builder);

        // Path 2: canonicalize the chunked array directly.
        let canonical = chunked_arr.to_varbinview().unwrap();
        let from_canonical = canonical
            .with_iterator(|iter| {
                iter.map(|value| value.map(|bytes| bytes.to_vec()))
                    .collect::<Vec<_>>()
            })
            .unwrap();
        assert_eq!(data, from_canonical);
    }
}