vortex_fsst/canonical.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::builders::ArrayBuilder;
use vortex_array::builders::VarBinViewBuilder;
use vortex_array::vtable::CanonicalVTable;
use vortex_array::vtable::ValidityHelper;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_dtype::match_each_integer_ptype;
use vortex_vector::binaryview::BinaryView;

use crate::FSSTArray;
use crate::FSSTVTable;

impl CanonicalVTable<FSSTVTable> for FSSTVTable {
    fn canonicalize(array: &FSSTArray) -> Canonical {
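        // All views reference the single freshly-decoded buffer, so the buffer index is 0.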
        let (buffer, views) = fsst_decode_views(array, 0);
        // SAFETY: FSST already validates the bytes for binary/UTF-8. We build views directly on
        //  top of them, so the view pointers will all be valid.
        unsafe {
            Canonical::VarBinView(VarBinViewArray::new_unchecked(
                views,
                Arc::new([buffer]),
                array.dtype().clone(),
                array.codes().validity().clone(),
            ))
        }
    }

    fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) {
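        // If the target builder is not a VarBinViewBuilder, fall back to canonicalizing first.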
        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
            return builder.extend_from_array(&array.to_canonical().into_array());
        };

        // Decompress the whole block of data into a new buffer, and create some views
        // from it instead.

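        // The decompressed buffer lands after the builder's already-completed blocks, so the
        // views must carry that block index.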
        let (buffer, views) = fsst_decode_views(array, builder.completed_block_count());

        builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask());
    }
}

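/// Decompresses the full FSST string heap into a single buffer and builds one `BinaryView` per
/// element over it, with every out-of-line view pointing at buffer `buf_index`.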
#[expect(
    clippy::cast_possible_truncation,
    reason = "truncation is intentional for buffer index"
)]
fn fsst_decode_views(fsst_array: &FSSTArray, buf_index: u32) -> (ByteBuffer, Buffer<BinaryView>) {
    // FSSTArray has two child arrays:
    //  1. A VarBinArray, which holds the string heap of the compressed codes.
    //  2. An uncompressed_lengths primitive array, storing the length of each original
    //     string element.
    // To speed up canonicalization, we decompress the entire string heap in a single call and
    // then turn the uncompressed_lengths into the views needed for a VarBinViewArray.
    let bytes = fsst_array.codes().sliced_bytes();

    let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive();

    // Compute the total decompressed size.
    #[allow(clippy::cast_possible_truncation)]
    let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
        uncompressed_lens_array
            .as_slice::<P>()
            .iter()
            .map(|x| *x as usize)
            .sum()
    });

    // Bulk-decompress the entire array.
    let decompressor = fsst_array.decompressor();
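    // Reserve a few bytes of slack past `total_size`; the decompressor may write a full word at
    // a time near the end of the output.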
    let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
    let len =
        decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
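    // SAFETY: `decompress_into` returns the number of bytes it wrote into the spare capacity.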
    unsafe { uncompressed_bytes.set_len(len) };

    // Directly create the binary views.
    let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());

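    // Walk the uncompressed lengths, carving consecutive `[offset, offset + len)` ranges out of
    // the decompressed buffer; values short enough for the view's inline storage are inlined.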
    match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
        let mut offset = 0;
        for len in uncompressed_lens_array.as_slice::<P>() {
            let len = *len as usize;
            let view = BinaryView::make_view(
                &uncompressed_bytes[offset..][..len],
                buf_index,
                offset as u32,
            );
            // SAFETY: we reserved the right capacity beforehand
            unsafe { views.push_unchecked(view) };
            offset += len;
        }
    });

    (uncompressed_bytes.freeze(), views.freeze())
}

#[cfg(test)]
mod tests {
    use rand::Rng;
    use rand::SeedableRng;
    use rand::prelude::StdRng;
    use vortex_array::ArrayRef;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::ChunkedArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;

    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    fn make_data() -> (VarBinArray, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);
        let mut strings = Vec::with_capacity(STRING_COUNT);

        for _ in 0..STRING_COUNT {
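            // Roughly 90% of the entries are null; the rest are short random strings.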
            if rng.random_bool(0.9) {
                strings.push(None)
            } else {
                // Generate a random lowercase ASCII string whose length varies around 10 bytes
                // (anywhere from 5 to 15).
                let len = 10 * rng.random_range(50..=150) / 100;
                strings.push(Some(
                    (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z') as char)
                        .collect::<String>()
                        .into_bytes(),
                ));
            }
        }

        (
            VarBinArray::from_iter(
                strings
                    .clone()
                    .into_iter()
                    .map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
                DType::Binary(Nullability::Nullable),
            ),
            strings,
        )
    }

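    // Builds a ChunkedArray of ten independently trained and FSST-compressed chunks, along with
    // the expected decompressed values.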
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        #[allow(clippy::type_complexity)]
        let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
            .map(|_| {
                let (array, data) = make_data();
                let compressor = fsst_train_compressor(&array);
                (fsst_compress(&array, &compressor).into_array(), data)
            })
            .unzip();

        (
            ChunkedArray::from_iter(arr_vec),
            data_vec.into_iter().flatten().collect(),
        )
    }

    #[test]
    fn test_to_canonical() {
        let (chunked_arr, data) = make_data_chunked();

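        // Canonicalize once through `append_to_builder` and once through `to_varbinview`, and
        // check both against the source data.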
        let mut builder =
            VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
        chunked_arr.append_to_builder(&mut builder);

        {
            let arr = builder.finish_into_canonical().into_varbinview();
            let res1 =
                arr.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
            assert_eq!(data, res1);
        };

        {
            let arr2 = chunked_arr.to_varbinview();
            let res2 =
                arr2.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
            assert_eq!(data, res2)
        };
    }
}