1use std::sync::Arc;
5
6use vortex_array::arrays::{BinaryView, VarBinViewArray};
7use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
8use vortex_array::vtable::{CanonicalVTable, ValidityHelper};
9use vortex_array::{Canonical, IntoArray, ToCanonical};
10use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
11use vortex_dtype::match_each_integer_ptype;
12
13use crate::{FSSTArray, FSSTVTable};
14
15impl CanonicalVTable<FSSTVTable> for FSSTVTable {
16 fn canonicalize(array: &FSSTArray) -> Canonical {
17 let (buffer, views) = fsst_decode_views(array, 0);
18 unsafe {
21 Canonical::VarBinView(VarBinViewArray::new_unchecked(
22 views,
23 Arc::new([buffer]),
24 array.dtype().clone(),
25 array.codes().validity().clone(),
26 ))
27 }
28 }
29
30 fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) {
31 let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
32 return builder.extend_from_array(&array.to_canonical().into_array());
33 };
34
35 let (buffer, views) = fsst_decode_views(array, builder.completed_block_count());
39
40 builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask());
41 }
42}
43
44#[allow(clippy::cast_possible_truncation)]
45fn fsst_decode_views(fsst_array: &FSSTArray, buf_index: u32) -> (ByteBuffer, Buffer<BinaryView>) {
46 let bytes = fsst_array.codes().sliced_bytes();
54
55 let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive();
56
57 #[allow(clippy::cast_possible_truncation)]
59 let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
60 uncompressed_lens_array
61 .as_slice::<P>()
62 .iter()
63 .map(|x| *x as usize)
64 .sum()
65 });
66
67 let decompressor = fsst_array.decompressor();
69 let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
70 let len =
71 decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
72 unsafe { uncompressed_bytes.set_len(len) };
73
74 let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());
76
77 match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
78 let mut offset = 0;
79 for len in uncompressed_lens_array.as_slice::<P>() {
80 let len = *len as usize;
81 let view = BinaryView::make_view(
82 &uncompressed_bytes[offset..][..len],
83 buf_index,
84 offset as u32,
85 );
86 unsafe { views.push_unchecked(view) };
88 offset += len;
89 }
90 });
91
92 (uncompressed_bytes.freeze(), views.freeze())
93}
94
#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

    /// Builds a deterministic (seed 0) nullable binary array of 1000 entries and returns it
    /// together with the plain values it was built from.
    ///
    /// Each entry is null with probability 0.9; otherwise it is 5 to 15 random lowercase ASCII
    /// letters.
    fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);

        let values: Vec<Option<Vec<u8>>> = (0..STRING_COUNT)
            .map(|_| {
                if rng.random_bool(0.9) {
                    None
                } else {
                    // 10 * [50, 150] / 100 with integer division => a length in [5, 15].
                    let len = 10 * rng.random_range(50..=150) / 100;
                    let bytes = (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z'))
                        .collect::<Vec<u8>>();
                    Some(bytes)
                }
            })
            .collect();

        let boxed_values = values
            .clone()
            .into_iter()
            .map(|value| value.map(Vec::into_boxed_slice));
        let array =
            VarBinArray::from_iter(boxed_values, DType::Binary(Nullability::Nullable)).into_array();

        (array, values)
    }

    /// Builds a chunked array of ten FSST-compressed chunks, each produced from `make_data`,
    /// and returns it with the concatenation of all chunks' expected values.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        const CHUNK_COUNT: usize = 10;
        let mut chunks: Vec<ArrayRef> = Vec::with_capacity(CHUNK_COUNT);
        let mut expected: Vec<Option<Vec<u8>>> = Vec::new();

        for _ in 0..CHUNK_COUNT {
            let (plain, values) = make_data();
            // Train a compressor on this chunk, then compress the chunk with it.
            let compressor = fsst_train_compressor(&plain).unwrap();
            let compressed = fsst_compress(&plain, &compressor).unwrap().into_array();
            chunks.push(compressed);
            expected.extend(values);
        }

        (ChunkedArray::from_iter(chunks), expected)
    }

    /// Checks that both decoding paths reproduce the source data: appending into a
    /// `VarBinViewBuilder`, and converting straight to a var-bin-view array.
    #[test]
    fn test_to_canonical() {
        let (chunked, expected) = make_data_chunked();

        // Path 1: append into a builder and finish it.
        let mut builder = VarBinViewBuilder::with_capacity(chunked.dtype().clone(), chunked.len());
        chunked.append_to_builder(&mut builder);
        let built = builder.finish_into_canonical().into_varbinview();
        let from_builder = built
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| bytes.to_vec()))
                    .collect::<Vec<_>>()
            })
            .unwrap();
        assert_eq!(expected, from_builder);

        // Path 2: direct conversion of the chunked array.
        let converted = chunked.to_varbinview();
        let from_conversion = converted
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| bytes.to_vec()))
                    .collect::<Vec<_>>()
            })
            .unwrap();
        assert_eq!(expected, from_conversion);
    }
}