// vortex_fsst/canonical.rs

1use fsst::Decompressor;
2use vortex_array::arrays::{BinaryView, VarBinViewArray};
3use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
4use vortex_array::validity::Validity;
5use vortex_array::vtable::CanonicalVTable;
6use vortex_array::{Canonical, IntoArray, ToCanonical};
7use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
8use vortex_dtype::match_each_integer_ptype;
9use vortex_error::VortexResult;
10
11use crate::{FSSTArray, FSSTVTable};
12
13impl CanonicalVTable<FSSTVTable> for FSSTVTable {
14    fn canonicalize(array: &FSSTArray) -> VortexResult<Canonical> {
15        fsst_into_varbin_view(array.decompressor(), array, 0).map(Canonical::VarBinView)
16    }
17
18    fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
19        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
20            return builder.extend_from_array(&array.to_canonical()?.into_array());
21        };
22        let view =
23            fsst_into_varbin_view(array.decompressor(), array, builder.completed_block_count())?;
24
25        builder.push_buffer_and_adjusted_views(
26            view.buffers().iter().cloned(),
27            view.views().iter().cloned(),
28            array.validity_mask()?,
29        );
30        Ok(())
31    }
32}
33
34// Decompresses a fsst encoded array into a varbinview, a block_offset can be passed if the decoding
35// if happening as part of the larger view and is used to set the block_offset in each view.
36#[allow(clippy::cast_possible_truncation)]
37fn fsst_into_varbin_view(
38    decompressor: Decompressor,
39    fsst_array: &FSSTArray,
40    block_offset: usize,
41) -> VortexResult<VarBinViewArray> {
42    // FSSTArray has two child arrays:
43    //
44    //  1. A VarBinArray, which holds the string heap of the compressed codes.
45    //  2. An uncompressed_lengths primitive array, storing the length of each original
46    //     string element.
47    //
48    // To speed up canonicalization, we can decompress the entire string-heap in a single
49    // call. We then turn our uncompressed_lengths into an offsets buffer
50    // necessary for a VarBinViewArray and construct the canonical array.
51    let bytes = fsst_array.codes().sliced_bytes();
52
53    let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive()?;
54
55    // Decompres the full dataset.
56    #[allow(clippy::cast_possible_truncation)]
57    let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
58        uncompressed_lens_array
59            .as_slice::<P>()
60            .iter()
61            .map(|x| *x as usize)
62            .sum()
63    });
64
65    // Bulk-decompress the entire array.
66    let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
67    let len =
68        decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
69    unsafe { uncompressed_bytes.set_len(len) };
70
71    let block_offset = u32::try_from(block_offset)?;
72
73    // Directly create the binary views.
74    let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());
75
76    match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
77        let mut offset = 0;
78        for len in uncompressed_lens_array.as_slice::<P>() {
79            let len = *len as usize;
80            let view = BinaryView::make_view(
81                &uncompressed_bytes[offset..][..len],
82                block_offset,
83                offset as u32,
84            );
85            // SAFETY: we reserved the right capacity beforehand
86            unsafe { views.push_unchecked(view) };
87            offset += len;
88        }
89    });
90
91    let views = views.freeze();
92    let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes);
93
94    VarBinViewArray::try_new(
95        views,
96        vec![uncompressed_bytes_array],
97        fsst_array.dtype().clone(),
98        Validity::copy_from_array(fsst_array.as_ref())?,
99    )
100}
101
#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

    /// Builds a nullable binary array of 1000 elements (~90% nulls, the rest random
    /// lowercase ASCII strings), returning it alongside the expected decoded values.
    fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);
        let mut strings = Vec::with_capacity(STRING_COUNT);

        for _ in 0..STRING_COUNT {
            if rng.random_bool(0.9) {
                strings.push(None)
            } else {
                // Random lowercase ASCII string with a length uniformly drawn from
                // 5..=15 (10 * r / 100 with integer division, for r in 50..=150).
                let len = 10 * rng.random_range(50..=150) / 100;
                strings.push(Some(
                    (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z') as char)
                        .collect::<String>()
                        .into_bytes(),
                ));
            }
        }

        (
            VarBinArray::from_iter(
                strings
                    .clone()
                    .into_iter()
                    .map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
                DType::Binary(Nullability::Nullable),
            )
            .into_array(),
            strings,
        )
    }

    /// Trains a compressor and FSST-compresses ten independent chunks, returning the
    /// chunked array and the flattened expected values.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        #[allow(clippy::type_complexity)]
        let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
            .map(|_| {
                let (array, data) = make_data();
                let compressor = fsst_train_compressor(&array).unwrap();
                (
                    fsst_compress(&array, &compressor).unwrap().into_array(),
                    data,
                )
            })
            .unzip();

        (
            ChunkedArray::from_iter(arr_vec),
            data_vec.into_iter().flatten().collect(),
        )
    }

    /// Round-trips FSST data through both the builder fast path
    /// (`append_to_builder`) and plain canonicalization (`to_varbinview`), checking
    /// each against the expected values.
    #[test]
    fn test_to_canonical() {
        let (chunked_arr, data) = make_data_chunked();

        let mut builder =
            VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
        chunked_arr.append_to_builder(&mut builder).unwrap();

        {
            let arr = builder.finish().to_varbinview().unwrap();
            let res1 = arr
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res1);
        };

        {
            let arr2 = chunked_arr.to_varbinview().unwrap();
            let res2 = arr2
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res2)
        };
    }
}
191}