vortex_fsst/canonical.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use vortex_array::arrays::VarBinViewArray;
use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
use vortex_array::vtable::{CanonicalVTable, ValidityHelper};
use vortex_array::{Canonical, IntoArray, ToCanonical};
use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
use vortex_dtype::match_each_integer_ptype;
use vortex_vector::binaryview::BinaryView;

use crate::{FSSTArray, FSSTVTable};

impl CanonicalVTable<FSSTVTable> for FSSTVTable {
    fn canonicalize(array: &FSSTArray) -> Canonical {
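        // We decompress into a single buffer, which becomes the only data buffer of the
        // resulting array, so every view is created with buffer index 0.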
        let (buffer, views) = fsst_decode_views(array, 0);
        // SAFETY: FSST already validates the bytes for binary/UTF-8. We build views directly on
        //  top of them, so the view pointers will all be valid.
        unsafe {
            Canonical::VarBinView(VarBinViewArray::new_unchecked(
                views,
                Arc::new([buffer]),
                array.dtype().clone(),
                array.codes().validity().clone(),
            ))
        }
    }

    fn append_to_builder(array: &FSSTArray, builder: &mut dyn ArrayBuilder) {
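        // The fast path below only applies when appending into a VarBinViewBuilder; for any
        // other builder, fall back to canonicalizing and extending generically.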
        let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
            return builder.extend_from_array(&array.to_canonical().into_array());
        };

        // Decompress the whole block of data into a new buffer and build views over it,
        // rather than appending element by element. The buffer index accounts for blocks
        // already completed in the builder.

        let (buffer, views) = fsst_decode_views(array, builder.completed_block_count());

        builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask());
    }
}

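/// Decompresses the entire `fsst_array` in one shot and builds the `BinaryView`s for a
/// canonical `VarBinViewArray`. Every view is created with buffer index `buf_index`, which
/// lets `append_to_builder` account for buffers already present in the builder.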
#[allow(clippy::cast_possible_truncation)]
fn fsst_decode_views(fsst_array: &FSSTArray, buf_index: u32) -> (ByteBuffer, Buffer<BinaryView>) {
    // FSSTArray has two child arrays:
    //  1. A VarBinArray, which holds the string heap of the compressed codes.
    //  2. An uncompressed_lengths primitive array, storing the length of each original
    //     string element.
    // To speed up canonicalization, we decompress the entire string heap in a single call,
    // then use uncompressed_lengths to build the views needed for a VarBinViewArray and
    // construct the canonical array.
    let bytes = fsst_array.codes().sliced_bytes();

    let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive();

    // Compute the total decompressed size.
    let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
        uncompressed_lens_array
            .as_slice::<P>()
            .iter()
            .map(|x| *x as usize)
            .sum()
    });

    // Bulk-decompress the entire array.
    let decompressor = fsst_array.decompressor();
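    // Reserve a few bytes of slack beyond `total_size`: FSST decompression emits whole
    // symbols of up to 8 bytes at a time, so writes may overshoot the logical end by up to
    // 7 bytes.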
    let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
    let len =
        decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
    unsafe { uncompressed_bytes.set_len(len) };

    // Directly create the binary views.
    let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());

    match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
        let mut offset = 0;
        for len in uncompressed_lens_array.as_slice::<P>() {
            let len = *len as usize;
            let view = BinaryView::make_view(
                &uncompressed_bytes[offset..][..len],
                buf_index,
                offset as u32,
            );
            // SAFETY: we reserved the right capacity beforehand
            unsafe { views.push_unchecked(view) };
            offset += len;
        }
    });

    (uncompressed_bytes.freeze(), views.freeze())
}

#[cfg(test)]
mod tests {
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::{ChunkedArray, VarBinArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::{ArrayRef, IntoArray, ToCanonical};
    use vortex_dtype::{DType, Nullability};

    use crate::{fsst_compress, fsst_train_compressor};

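    /// Builds 1000 optional random lowercase ASCII strings (roughly 90% of them null) and
    /// returns both the `VarBinArray` and the raw values for later comparison.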
    fn make_data() -> (VarBinArray, Vec<Option<Vec<u8>>>) {
        const STRING_COUNT: usize = 1000;
        let mut rng = StdRng::seed_from_u64(0);
        let mut strings = Vec::with_capacity(STRING_COUNT);

        for _ in 0..STRING_COUNT {
            if rng.random_bool(0.9) {
                strings.push(None)
            } else {
                // Generate a random lowercase string with a length between 5 and 15 bytes.
                let len = 10 * rng.random_range(50..=150) / 100;
                strings.push(Some(
                    (0..len)
                        .map(|_| rng.random_range(b'a'..=b'z') as char)
                        .collect::<String>()
                        .into_bytes(),
                ));
            }
        }

        (
            VarBinArray::from_iter(
                strings
                    .clone()
                    .into_iter()
                    .map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
                DType::Binary(Nullability::Nullable),
            ),
            strings,
        )
    }

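    /// Builds a `ChunkedArray` of ten FSST-compressed chunks, each compressed with its own
    /// trained compressor, along with the flattened expected values.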
    fn make_data_chunked() -> (ChunkedArray, Vec<Option<Vec<u8>>>) {
        #[allow(clippy::type_complexity)]
        let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
            .map(|_| {
                let (array, data) = make_data();
                let compressor = fsst_train_compressor(&array);
                (fsst_compress(&array, &compressor).into_array(), data)
            })
            .unzip();

        (
            ChunkedArray::from_iter(arr_vec),
            data_vec.into_iter().flatten().collect(),
        )
    }

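    /// Exercises both canonicalization paths: appending every chunk into a
    /// `VarBinViewBuilder` (the `append_to_builder` path) and converting the chunked array
    /// directly with `to_varbinview` (the `canonicalize` path).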
    #[test]
    fn test_to_canonical() {
        let (chunked_arr, data) = make_data_chunked();

        let mut builder =
            VarBinViewBuilder::with_capacity(chunked_arr.dtype().clone(), chunked_arr.len());
        chunked_arr.append_to_builder(&mut builder);

        {
            let arr = builder.finish_into_canonical().into_varbinview();
            let res1 =
                arr.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
            assert_eq!(data, res1);
        };

        {
            let arr2 = chunked_arr.to_varbinview();
            let res2 =
                arr2.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
            assert_eq!(data, res2);
        };
    }
}