Skip to main content

vortex_fsst/compute/mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod cast;
5mod compare;
6mod filter;
7mod like;
8
9use vortex_array::ArrayRef;
10use vortex_array::ArrayView;
11use vortex_array::ExecutionCtx;
12use vortex_array::IntoArray;
13use vortex_array::arrays::VarBin;
14use vortex_array::arrays::dict::TakeExecute;
15use vortex_array::builtins::ArrayBuiltins;
16use vortex_array::scalar::Scalar;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_err;
20
21use crate::FSST;
22use crate::FSSTArrayExt;
23
24impl TakeExecute for FSST {
25    fn take(
26        array: ArrayView<'_, Self>,
27        indices: &ArrayRef,
28        ctx: &mut ExecutionCtx,
29    ) -> VortexResult<Option<ArrayRef>> {
30        Ok(Some(
31            FSST::try_new(
32                array
33                    .dtype()
34                    .clone()
35                    .union_nullability(indices.dtype().nullability()),
36                array.symbols().clone(),
37                array.symbol_lengths().clone(),
38                {
39                    let codes = array.codes();
40                    let codes = codes.as_view();
41                    <VarBin as TakeExecute>::take(codes, indices, ctx)?
42                        .vortex_expect("VarBin take kernel always returns Some")
43                }
44                .try_downcast::<VarBin>()
45                .map_err(|_| vortex_err!("take for codes must return varbin array"))?,
46                array
47                    .uncompressed_lengths()
48                    .take(indices.clone())?
49                    .fill_null(Scalar::zero_value(
50                        &array.uncompressed_lengths_dtype().clone(),
51                    ))?,
52                ctx,
53            )?
54            .into_array(),
55        ))
56    }
57}
58
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::ExecutionCtx;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Trains an FSST compressor on `varbin` and compresses `varbin` with it.
    ///
    /// Shared by every fixture so they all build their FSST arrays the same way.
    fn compress(varbin: &VarBinArray, ctx: &mut ExecutionCtx) -> FSSTArray {
        let compressor = fsst_train_compressor(varbin);
        fsst_compress(varbin, varbin.len(), varbin.dtype(), &compressor, ctx)
    }

    /// Taking with non-nullable indices keeps the non-nullable dtype; taking with nullable
    /// indices widens the result dtype to nullable.
    #[test]
    fn test_take_null() {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let arr = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable));
        let fsst = compress(&arr, &mut ctx);

        let idx1: PrimitiveArray = (0..1).collect();

        assert_eq!(
            fsst.take(idx1.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::NonNullable)
        );

        let idx2: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);

        assert_eq!(
            fsst.take(idx2.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::Nullable)
        );
    }

    /// Runs the shared take conformance suite against FSST-compressed inputs.
    #[rstest]
    #[case(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = compress(&varbin, &mut ctx);
        test_take_conformance(&array.into_array());
    }

    /// Builds an FSST fixture inside a caller-provided execution context.
    type FsstBuilder = fn(&mut ExecutionCtx) -> FSSTArray;

    /// Runs the shared cross-operation consistency suite against FSST fixtures.
    #[rstest]
    // Basic string arrays
    #[case::fsst_simple(|ctx: &mut ExecutionCtx| {
        let varbin = VarBinArray::from_iter(
            ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
            DType::Utf8(Nullability::NonNullable),
        );
        compress(&varbin, ctx)
    })]
    // Nullable strings
    #[case::fsst_nullable(|ctx: &mut ExecutionCtx| {
        let varbin = VarBinArray::from_iter(
            [Some("hello"), None, Some("world"), Some("test"), None],
            DType::Utf8(Nullability::Nullable),
        );
        compress(&varbin, ctx)
    })]
    // Repetitive patterns (good for FSST compression)
    #[case::fsst_repetitive(|ctx: &mut ExecutionCtx| {
        let varbin = VarBinArray::from_iter(
            ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
            DType::Utf8(Nullability::NonNullable),
        );
        compress(&varbin, ctx)
    })]
    // Edge cases
    #[case::fsst_single(|ctx: &mut ExecutionCtx| {
        let varbin = VarBinArray::from_iter(
            ["single element"].map(Some),
            DType::Utf8(Nullability::NonNullable),
        );
        compress(&varbin, ctx)
    })]
    #[case::fsst_empty_strings(|ctx: &mut ExecutionCtx| {
        let varbin = VarBinArray::from_iter(
            ["", "test", "", "hello", ""].map(Some),
            DType::Utf8(Nullability::NonNullable),
        );
        compress(&varbin, ctx)
    })]
    // Large arrays
    #[case::fsst_large(|ctx: &mut ExecutionCtx| {
        let data: Vec<Option<&str>> = (0..1500)
            .map(|i| Some(match i % 10 {
                0 => "https://www.example.com/page",
                1 => "https://www.test.org/data",
                2 => "https://www.vortex.dev/docs",
                3 => "https://www.github.com/apache/arrow",
                4 => "https://www.rust-lang.org/learn",
                5 => "SELECT * FROM table WHERE id = ",
                6 => "INSERT INTO users (name, email) VALUES",
                7 => "UPDATE records SET status = 'active'",
                8 => "DELETE FROM logs WHERE timestamp < ",
                _ => "CREATE TABLE data (id INT, value TEXT)",
            }))
            .collect();
        let varbin = VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable));
        compress(&varbin, ctx)
    })]
    fn test_fsst_consistency(#[case] build: FsstBuilder) {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = build(&mut ctx);
        test_array_consistency(&array.into_array());
    }
}