// vortex_fsst/canonical.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use vortex_array::arrays::{BinaryView, VarBinViewArray};
7use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
8use vortex_array::vtable::{CanonicalVTable, ValidityHelper};
9use vortex_array::{Canonical, IntoArray, ToCanonical};
10use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
11use vortex_dtype::match_each_integer_ptype;
12
13use crate::{FSSTArray, FSSTVTable};
14
15impl CanonicalVTable<FSSTVTable> for FSSTVTable {
16    fn canonicalize(array: &FSSTArray) -> Canonical {
17        let (buffer, views) = fsst_decode_views(array, 0);
18        // SAFETY: FSST already validates the bytes for binary/UTF-8. We build views directly on
19        //  top of them, so the view pointers will all be valid.
20        unsafe {
21            Canonical::VarBinView(VarBinViewArray::new_unchecked(
22                views,
23                Arc::new([buffer]),
24                array.dtype().clone(),
25                array.codes().validity().clone(),
26            ))
27        }
28    }
29
30    fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) {
31        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
32            return builder.extend_from_array(&array.to_canonical().into_array());
33        };
34
35        // Decompress the whole block of data into a new buffer, and create some views
36        // from it instead.
37
38        let (buffer, views) = fsst_decode_views(array, builder.completed_block_count());
39
40        builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask());
41    }
42}
43
/// Bulk-decompress `fsst_array`, returning the decompressed byte heap together with one
/// `BinaryView` per element, each pointing into that heap with buffer index `buf_index`.
#[allow(clippy::cast_possible_truncation)]
fn fsst_decode_views(fsst_array: &FSSTArray, buf_index: u32) -> (ByteBuffer, Buffer<BinaryView>) {
    // FSSTArray has two child arrays:
    //  1. A VarBinArray, which holds the string heap of the compressed codes.
    //  2. An uncompressed_lengths primitive array, storing the length of each original
    //     string element.
    // To speed up canonicalization, we can decompress the entire string-heap in a single
    // call. We then turn our uncompressed_lengths into an offsets buffer
    // necessary for a VarBinViewArray and construct the canonical array.
    let bytes = fsst_array.codes().sliced_bytes();

    let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive();

    // Decompress the full dataset.
    // The total decompressed size is the sum of the per-element uncompressed lengths,
    // dispatched over whichever integer ptype the lengths array uses.
    #[allow(clippy::cast_possible_truncation)]
    let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
        uncompressed_lens_array
            .as_slice::<P>()
            .iter()
            .map(|x| *x as usize)
            .sum()
    });

    // Bulk-decompress the entire array.
    // NOTE(review): the extra 7 bytes of capacity presumably give the decompressor slack
    // for word-sized writes past the logical end — confirm against the fsst
    // `decompress_into` contract.
    let decompressor = fsst_array.decompressor();
    let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
    let len =
        decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
    // SAFETY: `decompress_into` returns the number of bytes it wrote into the spare
    // capacity, so the first `len` bytes are initialized.
    unsafe { uncompressed_bytes.set_len(len) };

    // Directly create the binary views.
    let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());

    match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
        // Running byte offset of the current element within the decompressed heap.
        let mut offset = 0;
        for len in uncompressed_lens_array.as_slice::<P>() {
            let len = *len as usize;
            let view = BinaryView::make_view(
                &uncompressed_bytes[offset..][..len],
                buf_index,
                // Truncating cast: view offsets are u32 by design, so the heap is
                // assumed to fit in u32 range — TODO confirm upstream guarantee.
                offset as u32,
            );
            // SAFETY: we reserved the right capacity beforehand
            unsafe { views.push_unchecked(view) };
            offset += len;
        }
    });

    (uncompressed_bytes.freeze(), views.freeze())
}
94
#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

    /// Build a nullable binary array of seeded-random lowercase strings (~90% nulls),
    /// alongside the expected plain-Rust values for later comparison.
    fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);
        let mut strings = Vec::with_capacity(STRING_COUNT);

        for _ in 0..STRING_COUNT {
            if rng.random_bool(0.9) {
                strings.push(None)
            } else {
                // Generate a random lowercase ASCII string whose length varies
                // between 5 and 15 bytes (10 * 50..=150 / 100).
                let len = 10 * rng.random_range(50..=150) / 100;
                strings.push(Some(
                    (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z') as char)
                        .collect::<String>()
                        .into_bytes(),
                ));
            }
        }

        (
            VarBinArray::from_iter(
                strings
                    .clone()
                    .into_iter()
                    .map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
                DType::Binary(Nullability::Nullable),
            )
            .into_array(),
            strings,
        )
    }

    /// FSST-compress ten independent chunks (each with its own trained compressor)
    /// and return them as a ChunkedArray plus the flattened expected values.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        #[allow(clippy::type_complexity)]
        let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
            .map(|_| {
                let (array, data) = make_data();
                let compressor = fsst_train_compressor(&array).unwrap();
                (
                    fsst_compress(&array, &compressor).unwrap().into_array(),
                    data,
                )
            })
            .unzip();

        (
            ChunkedArray::from_iter(arr_vec),
            data_vec.into_iter().flatten().collect(),
        )
    }

    /// Round-trip check: both the `append_to_builder` fast path and the direct
    /// canonicalization path must reproduce the original values.
    #[test]
    fn test_to_canonical() {
        let (chunked_arr, data) = make_data_chunked();

        // Exercise the builder fast path (append_to_builder into a VarBinViewBuilder).
        let mut builder =
            VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
        chunked_arr.append_to_builder(&mut builder);

        {
            let arr = builder.finish_into_canonical().into_varbinview();
            let res1 = arr
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res1);
        };

        // Exercise the canonicalize path directly.
        {
            let arr2 = chunked_arr.to_varbinview();
            let res2 = arr2
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res2)
        };
    }
}
184}