1use std::sync::Arc;
5
6use vortex_array::arrays::VarBinViewArray;
7use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
8use vortex_array::vtable::{CanonicalVTable, ValidityHelper};
9use vortex_array::{Canonical, IntoArray, ToCanonical};
10use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
11use vortex_dtype::match_each_integer_ptype;
12use vortex_vector::binaryview::BinaryView;
13
14use crate::{FSSTArray, FSSTVTable};
15
16impl CanonicalVTable<FSSTVTable> for FSSTVTable {
17 fn canonicalize(array: &FSSTArray) -> Canonical {
18 let (buffer, views) = fsst_decode_views(array, 0);
19 unsafe {
22 Canonical::VarBinView(VarBinViewArray::new_unchecked(
23 views,
24 Arc::new([buffer]),
25 array.dtype().clone(),
26 array.codes().validity().clone(),
27 ))
28 }
29 }
30
31 fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) {
32 let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
33 return builder.extend_from_array(&array.to_canonical().into_array());
34 };
35
36 let (buffer, views) = fsst_decode_views(array, builder.completed_block_count());
40
41 builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask());
42 }
43}
44
45#[allow(clippy::cast_possible_truncation)]
46fn fsst_decode_views(fsst_array: &FSSTArray, buf_index: u32) -> (ByteBuffer, Buffer<BinaryView>) {
47 let bytes = fsst_array.codes().sliced_bytes();
55
56 let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive();
57
58 #[allow(clippy::cast_possible_truncation)]
60 let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
61 uncompressed_lens_array
62 .as_slice::<P>()
63 .iter()
64 .map(|x| *x as usize)
65 .sum()
66 });
67
68 let decompressor = fsst_array.decompressor();
70 let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
71 let len =
72 decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
73 unsafe { uncompressed_bytes.set_len(len) };
74
75 let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());
77
78 match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
79 let mut offset = 0;
80 for len in uncompressed_lens_array.as_slice::<P>() {
81 let len = *len as usize;
82 let view = BinaryView::make_view(
83 &uncompressed_bytes[offset..][..len],
84 buf_index,
85 offset as u32,
86 );
87 unsafe { views.push_unchecked(view) };
89 offset += len;
90 }
91 });
92
93 (uncompressed_bytes.freeze(), views.freeze())
94}
95
#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

    /// Builds a deterministic (fixed seed) nullable binary array along with the values it
    /// was built from. Roughly 90% of the entries are null; the rest are short runs of
    /// lowercase ASCII letters.
    fn make_data() -> (VarBinArray, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);

        let expected: Vec<Option<Vec<u8>>> = (0..STRING_COUNT)
            .map(|_| {
                if rng.random_bool(0.9) {
                    return None;
                }
                // Integer arithmetic: yields a length between 5 and 15.
                let len = 10 * rng.random_range(50..=150) / 100;
                Some(
                    (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z'))
                        .collect::<Vec<u8>>(),
                )
            })
            .collect();

        let array = VarBinArray::from_iter(
            expected
                .iter()
                .cloned()
                .map(|entry| entry.map(Vec::into_boxed_slice)),
            DType::Binary(Nullability::Nullable),
        );

        (array, expected)
    }

    /// Builds a chunked array of ten FSST-compressed chunks, each compressed with a
    /// compressor trained on that chunk, plus the concatenated expected values.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        const CHUNK_COUNT: usize = 10;

        let mut chunks: Vec<ArrayRef> = Vec::with_capacity(CHUNK_COUNT);
        let mut expected: Vec<Option<Vec<u8>>> = Vec::new();

        for _ in 0..CHUNK_COUNT {
            let (array, data) = make_data();
            let compressor = fsst_train_compressor(&array);
            chunks.push(fsst_compress(&array, &compressor).into_array());
            expected.extend(data);
        }

        (ChunkedArray::from_iter(chunks), expected)
    }

    /// Both decoding routes -- appending into a `VarBinViewBuilder` and converting with
    /// `to_varbinview` -- must reproduce the original values.
    #[test]
    fn test_to_canonical() {
        let (chunked, expected) = make_data_chunked();

        // Route 1: append into a view builder, then finish it.
        let mut builder = VarBinViewBuilder::with_capacity(chunked.dtype().clone(), chunked.len());
        chunked.append_to_builder(&mut builder);
        let via_builder = builder.finish_into_canonical().into_varbinview();
        let built_values = via_builder.with_iterator(|values| {
            values
                .map(|entry| entry.map(|bytes| bytes.to_vec()))
                .collect::<Vec<_>>()
        });
        assert_eq!(expected, built_values);

        // Route 2: convert the chunked array directly.
        let via_canonical = chunked.to_varbinview();
        let canonical_values = via_canonical.with_iterator(|values| {
            values
                .map(|entry| entry.map(|bytes| bytes.to_vec()))
                .collect::<Vec<_>>()
        });
        assert_eq!(expected, canonical_values);
    }
}