// vortex_fsst/canonical.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use vortex_array::arrays::{BinaryView, VarBinViewArray};
7use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
8use vortex_array::vtable::{CanonicalVTable, ValidityHelper};
9use vortex_array::{Canonical, IntoArray, ToCanonical};
10use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
11use vortex_dtype::match_each_integer_ptype;
12use vortex_error::{VortexExpect, VortexResult};
13
14use crate::{FSSTArray, FSSTVTable};
15
16impl CanonicalVTable<FSSTVTable> for FSSTVTable {
17    fn canonicalize(array: &FSSTArray) -> VortexResult<Canonical> {
18        let (buffer, views) = fsst_decode_views(array, 0);
19        // SAFETY: FSST already validates the bytes for binary/UTF-8. We build views directly on
20        //  top of them, so the view pointers will all be valid.
21        unsafe {
22            Ok(Canonical::VarBinView(VarBinViewArray::new_unchecked(
23                views,
24                Arc::new([buffer]),
25                array.dtype().clone(),
26                array.codes().validity().clone(),
27            )))
28        }
29    }
30
31    fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
32        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
33            return builder.extend_from_array(&array.to_canonical()?.into_array());
34        };
35
36        // Decompress the whole block of data into a new buffer, and create some views
37        // from it instead.
38
39        let (buffer, views) = fsst_decode_views(array, builder.completed_block_count());
40
41        builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask()?);
42        Ok(())
43    }
44}
45
46#[allow(clippy::cast_possible_truncation)]
47fn fsst_decode_views(fsst_array: &FSSTArray, buf_index: u32) -> (ByteBuffer, Buffer<BinaryView>) {
48    // FSSTArray has two child arrays:
49    //  1. A VarBinArray, which holds the string heap of the compressed codes.
50    //  2. An uncompressed_lengths primitive array, storing the length of each original
51    //     string element.
52    // To speed up canonicalization, we can decompress the entire string-heap in a single
53    // call. We then turn our uncompressed_lengths into an offsets buffer
54    // necessary for a VarBinViewArray and construct the canonical array.
55    let bytes = fsst_array.codes().sliced_bytes();
56
57    let uncompressed_lens_array = fsst_array
58        .uncompressed_lengths()
59        .to_primitive()
60        .vortex_expect("uncompressed_lens must be primitive");
61
62    // Decompres the full dataset.
63    #[allow(clippy::cast_possible_truncation)]
64    let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
65        uncompressed_lens_array
66            .as_slice::<P>()
67            .iter()
68            .map(|x| *x as usize)
69            .sum()
70    });
71
72    // Bulk-decompress the entire array.
73    let decompressor = fsst_array.decompressor();
74    let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
75    let len =
76        decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
77    unsafe { uncompressed_bytes.set_len(len) };
78
79    // Directly create the binary views.
80    let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());
81
82    match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
83        let mut offset = 0;
84        for len in uncompressed_lens_array.as_slice::<P>() {
85            let len = *len as usize;
86            let view = BinaryView::make_view(
87                &uncompressed_bytes[offset..][..len],
88                buf_index,
89                offset as u32,
90            );
91            // SAFETY: we reserved the right capacity beforehand
92            unsafe { views.push_unchecked(view) };
93            offset += len;
94        }
95    });
96
97    (uncompressed_bytes.freeze(), views.freeze())
98}
99
#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

    /// Build a seeded, nullable binary array of 1000 elements (~90% nulls) along with the
    /// expected per-element values for later comparison.
    fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);
        let mut strings = Vec::with_capacity(STRING_COUNT);

        for _ in 0..STRING_COUNT {
            if rng.random_bool(0.9) {
                strings.push(None)
            } else {
                // Generate a random lowercase-ASCII string of roughly 5..=15 bytes
                // (10 * [50..=150] / 100).
                let len = 10 * rng.random_range(50..=150) / 100;
                strings.push(Some(
                    (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z') as char)
                        .collect::<String>()
                        .into_bytes(),
                ));
            }
        }

        (
            VarBinArray::from_iter(
                strings
                    .clone()
                    .into_iter()
                    .map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
                DType::Binary(Nullability::Nullable),
            )
            .into_array(),
            strings,
        )
    }

    /// FSST-compress ten independently trained chunks into a `ChunkedArray`, returning it
    /// with the flattened expected values.
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        #[allow(clippy::type_complexity)]
        let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
            .map(|_| {
                let (array, data) = make_data();
                let compressor = fsst_train_compressor(&array).unwrap();
                (
                    fsst_compress(&array, &compressor).unwrap().into_array(),
                    data,
                )
            })
            .unzip();

        (
            ChunkedArray::from_iter(arr_vec),
            data_vec.into_iter().flatten().collect(),
        )
    }

    /// Round-trip check: decompression must match the source data both through the
    /// builder fast path (`append_to_builder`) and full canonicalization (`to_varbinview`).
    #[test]
    fn test_to_canonical() {
        let (chunked_arr, data) = make_data_chunked();

        let mut builder =
            VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
        chunked_arr.append_to_builder(&mut builder).unwrap();

        {
            // Builder fast path.
            let arr = builder.finish().to_varbinview().unwrap();
            let res1 = arr
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res1);
        };

        {
            // Full canonicalization path.
            let arr2 = chunked_arr.to_varbinview().unwrap();
            let res2 = arr2
                .with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
                .unwrap();
            assert_eq!(data, res2)
        };
    }
}
189}