Skip to main content

vortex_fsst/compute/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod cast;
5mod compare;
6mod filter;
7
8use vortex_array::Array;
9use vortex_array::ArrayRef;
10use vortex_array::ExecutionCtx;
11use vortex_array::IntoArray;
12use vortex_array::arrays::TakeExecute;
13use vortex_array::arrays::VarBinVTable;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::scalar::Scalar;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_err;
19
20use crate::FSSTArray;
21use crate::FSSTVTable;
22
23impl TakeExecute for FSSTVTable {
24    fn take(
25        array: &FSSTArray,
26        indices: &dyn Array,
27        _ctx: &mut ExecutionCtx,
28    ) -> VortexResult<Option<ArrayRef>> {
29        Ok(Some(
30            FSSTArray::try_new(
31                array
32                    .dtype()
33                    .clone()
34                    .union_nullability(indices.dtype().nullability()),
35                array.symbols().clone(),
36                array.symbol_lengths().clone(),
37                VarBinVTable::take(array.codes(), indices, _ctx)?
38                    .vortex_expect("cannot fail")
39                    .try_into::<VarBinVTable>()
40                    .map_err(|_| vortex_err!("take for codes must return varbin array"))?,
41                array
42                    .uncompressed_lengths()
43                    .take(indices.to_array())?
44                    .fill_null(Scalar::zero_value(
45                        &array.uncompressed_lengths_dtype().clone(),
46                    ))?,
47            )?
48            .into_array(),
49        ))
50    }
51}
52
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::Array;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Strings that are cycled through, in order, to build the large consistency-test input.
    const LARGE_CASE_PATTERNS: [&str; 10] = [
        "https://www.example.com/page",
        "https://www.test.org/data",
        "https://www.vortex.dev/docs",
        "https://www.github.com/apache/arrow",
        "https://www.rust-lang.org/learn",
        "SELECT * FROM table WHERE id = ",
        "INSERT INTO users (name, email) VALUES",
        "UPDATE records SET status = 'active'",
        "DELETE FROM logs WHERE timestamp < ",
        "CREATE TABLE data (id INT, value TEXT)",
    ];

    /// Trains a compressor on `varbin` and compresses that same array with it.
    fn fsst_encode(varbin: &VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(varbin);
        fsst_compress(varbin, &compressor)
    }

    /// Builds a non-nullable UTF-8 `VarBinArray` in which every entry of `values` is present.
    fn utf8_non_nullable<const N: usize>(values: [&str; N]) -> VarBinArray {
        VarBinArray::from_iter(values.map(Some), DType::Utf8(Nullability::NonNullable))
    }

    /// Taking with non-nullable indices keeps a non-nullable dtype, while taking with nullable
    /// indices yields a nullable dtype.
    #[test]
    fn test_take_null() {
        let source = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable));
        let fsst = fsst_encode(&source);

        let plain_indices: PrimitiveArray = (0..1).collect();
        let taken = fsst.take(plain_indices.to_array()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::NonNullable));

        let nullable_indices: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);
        let taken = fsst.take(nullable_indices.to_array()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::Nullable));
    }

    /// Runs the shared take conformance suite against FSST-compressed inputs.
    #[rstest]
    #[case(utf8_non_nullable([
        "hello world",
        "testing fsst",
        "compression test",
        "data array",
        "vortex encoding",
    ]))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(utf8_non_nullable(["single element"]))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        let array = fsst_encode(&varbin);
        test_take_conformance(array.as_ref());
    }

    /// Runs the shared array consistency suite against a range of FSST-compressed inputs.
    #[rstest]
    // Plain, fully-populated strings.
    #[case::fsst_simple(fsst_encode(&utf8_non_nullable([
        "hello world",
        "testing fsst",
        "compression test",
        "data array",
        "vortex encoding",
    ])))]
    // Input containing null entries.
    #[case::fsst_nullable(fsst_encode(&VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    )))]
    // Strings sharing a common prefix.
    #[case::fsst_repetitive(fsst_encode(&utf8_non_nullable([
        "http://example.com",
        "http://test.com",
        "http://vortex.dev",
        "http://data.org",
    ])))]
    // A single-element array.
    #[case::fsst_single(fsst_encode(&utf8_non_nullable(["single element"])))]
    // Empty strings interleaved with non-empty ones.
    #[case::fsst_empty_strings(fsst_encode(&utf8_non_nullable(["", "test", "", "hello", ""])))]
    // 1500 entries cycling through the ten fixed patterns.
    #[case::fsst_large({
        let data: Vec<Option<&str>> = LARGE_CASE_PATTERNS
            .iter()
            .copied()
            .cycle()
            .take(1500)
            .map(Some)
            .collect();
        fsst_encode(&VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable)))
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(array.as_ref());
    }
}