vortex_fsst/
canonical.rs

1use fsst::Decompressor;
2use vortex_array::arrays::{BinaryView, VarBinViewArray};
3use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
4use vortex_array::validity::Validity;
5use vortex_array::vtable::CanonicalVTable;
6use vortex_array::{Canonical, IntoArray, ToCanonical};
7use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
8use vortex_dtype::match_each_integer_ptype;
9use vortex_error::VortexResult;
10
11use crate::{FSSTArray, FSSTVTable};
12
13impl CanonicalVTable<FSSTVTable> for FSSTVTable {
14    fn canonicalize(array: &FSSTArray) -> VortexResult<Canonical> {
15        fsst_into_varbin_view(array.decompressor(), array, 0).map(Canonical::VarBinView)
16    }
17
18    fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
19        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
20            return builder.extend_from_array(&array.to_canonical()?.into_array());
21        };
22        let view =
23            fsst_into_varbin_view(array.decompressor(), array, builder.completed_block_count())?;
24
25        builder.push_buffer_and_adjusted_views(
26            view.buffers().iter().cloned(),
27            view.views().iter().cloned(),
28            array.validity_mask()?,
29        );
30        Ok(())
31    }
32}
33
/// Decompresses an FSST-encoded array into a [`VarBinViewArray`].
///
/// `block_offset` is the buffer index stamped into every produced view. Pass a
/// non-zero value when the decoding is happening as part of a larger view (the
/// decoded buffer will sit after `block_offset` already-completed buffers in a
/// `VarBinViewBuilder`); pass `0` when building a standalone array.
fn fsst_into_varbin_view(
    decompressor: Decompressor,
    fsst_array: &FSSTArray,
    block_offset: usize,
) -> VortexResult<VarBinViewArray> {
    // FSSTArray has two child arrays:
    //
    //  1. A VarBinArray, which holds the string heap of the compressed codes.
    //  2. An uncompressed_lengths primitive array, storing the length of each original
    //     string element.
    //
    // To speed up canonicalization, we can decompress the entire string-heap in a single
    // call. We then turn our uncompressed_lengths into an offsets buffer
    // necessary for a VarBinViewArray and construct the canonical array.
    let bytes = fsst_array.codes().sliced_bytes();

    let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive()?;

    // Sum the per-element uncompressed lengths to learn the exact size of the
    // decompressed heap, whatever integer ptype the lengths are stored as.
    #[allow(clippy::cast_possible_truncation)]
    let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |$P| {
       uncompressed_lens_array.as_slice::<$P>().iter().map(|x| *x as usize).sum()
    });

    // Bulk-decompress the entire array in one call.
    // NOTE(review): the 7 bytes of slack presumably cover the decompressor writing
    // in word-sized chunks past the logical end — confirm against the `fsst`
    // crate's `decompress_into` contract.
    let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
    let len =
        decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
    // SAFETY: `decompress_into` returns the number of bytes of spare capacity it
    // initialized; we claim exactly that many as the buffer's length.
    unsafe { uncompressed_bytes.set_len(len) };

    let block_offset = u32::try_from(block_offset)?;

    // Directly create the binary views: one view per element, each pointing into
    // the single decompressed buffer at its running byte offset.
    let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());

    match_each_integer_ptype!(uncompressed_lens_array.ptype(), |$P| {
        let mut offset = 0;
        for len in uncompressed_lens_array.as_slice::<$P>() {
            let len = *len as usize;
            // NOTE(review): `offset as u32` silently truncates if the decompressed
            // heap exceeds u32::MAX bytes — presumably guaranteed not to happen
            // upstream since view offsets are u32; confirm.
            let view = BinaryView::make_view(
                &uncompressed_bytes[offset..][..len],
                block_offset,
                offset as u32,
            );
            // SAFETY: we reserved the right capacity beforehand
            unsafe { views.push_unchecked(view.into()) };
            offset += len;
        }
    });

    let views = views.freeze();
    let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes);

    // Validity is carried over unchanged from the FSST array.
    VarBinViewArray::try_new(
        views,
        vec![uncompressed_bytes_array],
        fsst_array.dtype().clone(),
        Validity::copy_from_array(fsst_array.as_ref())?,
    )
}
96
#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

    /// Builds a seeded, mostly-null nullable binary array together with the
    /// expected element values, for comparing against decompressed output.
    fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        // Fixed seed keeps the fixture (and thus the test) deterministic.
        let mut rng = StdRng::seed_from_u64(0);
        let mut strings = Vec::with_capacity(STRING_COUNT);

        for _ in 0..STRING_COUNT {
            // ~90% of elements are null.
            if rng.random_bool(0.9) {
                strings.push(None)
            } else {
                // Random lowercase-ASCII string of length 5..=15
                // (10 * pct / 100 with integer division, pct in 50..=150).
                let len = 10 * rng.random_range(50..=150) / 100;
                strings.push(Some(
                    (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z') as char)
                        .collect::<String>()
                        .into_bytes(),
                ));
            }
        }

        (
            VarBinArray::from_iter(
                strings
                    .clone()
                    .into_iter()
                    .map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
                DType::Binary(Nullability::Nullable),
            )
            .into_array(),
            strings,
        )
    }

    /// Builds a 10-chunk `ChunkedArray` of FSST-compressed chunks (each with its
    /// own trained compressor) plus the flattened expected values.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        #[allow(clippy::type_complexity)]
        let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
            .map(|_| {
                let (array, data) = make_data();
                let compressor = fsst_train_compressor(&array).unwrap();
                (
                    fsst_compress(&array, &compressor).unwrap().into_array(),
                    data,
                )
            })
            .unzip();

        (
            ChunkedArray::from_iter(arr_vec),
            data_vec.into_iter().flatten().collect(),
        )
    }

    /// Round-trips FSST data through both canonicalization paths and checks
    /// each against the original values.
    #[test]
    fn test_to_canonical() {
        let (chunked_arr, data) = make_data_chunked();

        // Path 1: append into a VarBinViewBuilder (exercises append_to_builder).
        let mut builder =
            VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
        chunked_arr.append_to_builder(&mut builder).unwrap();

        {
            let arr = builder.finish().to_varbinview().unwrap();
            let res1 = arr
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res1);
        };

        // Path 2: direct canonicalization (exercises canonicalize).
        {
            let arr2 = chunked_arr.to_varbinview().unwrap();
            let res2 = arr2
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res2)
        };
    }
}