vortex_fsst/compute/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod cast;
5mod compare;
6mod filter;
7
8use vortex_array::arrays::VarBinVTable;
9use vortex_array::compute::{TakeKernel, TakeKernelAdapter, fill_null, take};
10use vortex_array::{Array, ArrayRef, IntoArray, register_kernel};
11use vortex_error::VortexResult;
12use vortex_scalar::{Scalar, ScalarValue};
13
14use crate::{FSSTArray, FSSTVTable};
15
16impl TakeKernel for FSSTVTable {
17    // Take on an FSSTArray is a simple take on the codes array.
18    fn take(&self, array: &FSSTArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
19        Ok(FSSTArray::try_new(
20            array
21                .dtype()
22                .clone()
23                .union_nullability(indices.dtype().nullability()),
24            array.symbols().clone(),
25            array.symbol_lengths().clone(),
26            take(array.codes().as_ref(), indices)?
27                .as_::<VarBinVTable>()
28                .clone(),
29            fill_null(
30                &take(array.uncompressed_lengths(), indices)?,
31                &Scalar::new(
32                    array.uncompressed_lengths_dtype().clone(),
33                    ScalarValue::from(0),
34                ),
35            )?,
36        )?
37        .into_array())
38    }
39}
40
41register_kernel!(TakeKernelAdapter(FSSTVTable).lift());
42
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::arrays::{PrimitiveArray, VarBinArray};
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::compute::take;
    use vortex_dtype::{DType, Nullability};

    use crate::{FSSTArray, fsst_compress, fsst_train_compressor};

    /// Builds a non-nullable UTF-8 `VarBinArray` holding exactly the given strings.
    fn non_nullable_utf8<const N: usize>(strings: [&str; N]) -> VarBinArray {
        VarBinArray::from_iter(strings.map(Some), DType::Utf8(Nullability::NonNullable))
    }

    /// Builds a nullable UTF-8 `VarBinArray`; `None` entries become nulls.
    fn nullable_utf8<const N: usize>(strings: [Option<&str>; N]) -> VarBinArray {
        VarBinArray::from_iter(strings, DType::Utf8(Nullability::Nullable))
    }

    /// Trains a compressor on `strings` and compresses those same strings with it.
    fn fsst_encode(strings: &VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(strings);
        fsst_compress(strings, &compressor)
    }

    /// The result of a take is nullable exactly when the indices are nullable
    /// (the source array here is non-nullable).
    #[test]
    fn test_take_null() {
        let fsst = fsst_encode(&non_nullable_utf8(["h"]));

        let plain_indices: PrimitiveArray = (0..1).collect();
        let nullable_indices: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);

        let expectations = [
            (plain_indices, Nullability::NonNullable),
            (nullable_indices, Nullability::Nullable),
        ];
        for (indices, expected) in expectations {
            let taken = take(fsst.as_ref(), indices.as_ref()).unwrap();
            assert_eq!(taken.dtype(), &DType::Utf8(expected));
        }
    }

    #[rstest]
    #[case(non_nullable_utf8([
        "hello world",
        "testing fsst",
        "compression test",
        "data array",
        "vortex encoding",
    ]))]
    #[case(nullable_utf8([Some("hello"), None, Some("world"), Some("test"), None]))]
    #[case(non_nullable_utf8(["single element"]))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        let encoded = fsst_encode(&varbin);
        test_take_conformance(encoded.as_ref());
    }

    #[rstest]
    // Plain, non-nullable strings.
    #[case::fsst_simple(fsst_encode(&non_nullable_utf8([
        "hello world",
        "testing fsst",
        "compression test",
        "data array",
        "vortex encoding",
    ])))]
    // Strings interleaved with nulls.
    #[case::fsst_nullable(fsst_encode(&nullable_utf8([
        Some("hello"),
        None,
        Some("world"),
        Some("test"),
        None,
    ])))]
    // Shared prefixes, which FSST should compress well.
    #[case::fsst_repetitive(fsst_encode(&non_nullable_utf8([
        "http://example.com",
        "http://test.com",
        "http://vortex.dev",
        "http://data.org",
    ])))]
    // Edge cases: a lone element, and empty strings mixed with non-empty ones.
    #[case::fsst_single(fsst_encode(&non_nullable_utf8(["single element"])))]
    #[case::fsst_empty_strings(fsst_encode(&non_nullable_utf8(["", "test", "", "hello", ""])))]
    // A larger array: ten patterns repeated in order for 1500 elements.
    #[case::fsst_large({
        let patterns = [
            "https://www.example.com/page",
            "https://www.test.org/data",
            "https://www.vortex.dev/docs",
            "https://www.github.com/apache/arrow",
            "https://www.rust-lang.org/learn",
            "SELECT * FROM table WHERE id = ",
            "INSERT INTO users (name, email) VALUES",
            "UPDATE records SET status = 'active'",
            "DELETE FROM logs WHERE timestamp < ",
            "CREATE TABLE data (id INT, value TEXT)",
        ];
        let data: Vec<Option<&str>> = patterns
            .iter()
            .copied()
            .cycle()
            .take(1500)
            .map(Some)
            .collect();
        let varbin = VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable));
        fsst_encode(&varbin)
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(array.as_ref());
    }
}