Skip to main content

vortex_btrblocks/
canonical_compressor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! BtrBlocks-specific compressor wrapping the generic [`CascadingCompressor`].
5
6use std::ops::Deref;
7
8use vortex_array::ArrayRef;
9use vortex_array::ExecutionCtx;
10use vortex_error::VortexResult;
11
12use crate::BtrBlocksCompressorBuilder;
13use crate::CascadingCompressor;
14
15/// The BtrBlocks-style compressor with all built-in schemes pre-registered.
16///
17/// This is a thin wrapper around [`CascadingCompressor`] that provides a default set of
18/// compression schemes via [`BtrBlocksCompressorBuilder`].
19///
20/// # Examples
21///
22/// ```rust
23/// use vortex_btrblocks::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, Scheme, SchemeExt};
24/// use vortex_btrblocks::schemes::integer::IntDictScheme;
25///
26/// // Default compressor - all schemes allowed.
27/// let compressor = BtrBlocksCompressor::default();
28///
29/// // Remove specific schemes using the builder.
30/// let compressor = BtrBlocksCompressorBuilder::default()
31///     .exclude_schemes([IntDictScheme.id()])
32///     .build();
33/// ```
34#[derive(Clone)]
35pub struct BtrBlocksCompressor(
36    /// The underlying cascading compressor.
37    pub CascadingCompressor,
38);
39
40impl BtrBlocksCompressor {
41    /// Compresses an array using BtrBlocks-inspired compression.
42    pub fn compress(&self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
43        self.0.compress(array, ctx)
44    }
45}
46
47impl Deref for BtrBlocksCompressor {
48    type Target = CascadingCompressor;
49
50    fn deref(&self) -> &CascadingCompressor {
51        &self.0
52    }
53}
54
55impl Default for BtrBlocksCompressor {
56    fn default() -> Self {
57        BtrBlocksCompressorBuilder::default().build()
58    }
59}
60
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use rstest::rstest;
    use vortex_array::ArrayRef;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::List;
    use vortex_array::arrays::ListView;
    use vortex_array::arrays::ListViewArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    #[cfg(feature = "zstd")]
    use crate::BtrBlocksCompressorBuilder;

    /// Session shared by every test in this module; each test derives its own execution
    /// context from it.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Compresses `array` with a default [`BtrBlocksCompressor`] and a fresh execution context.
    fn compress_default(array: &ArrayRef) -> VortexResult<ArrayRef> {
        BtrBlocksCompressor::default().compress(array, &mut SESSION.create_execution_ctx())
    }

    /// Builds the non-nullable binary array shared by the zstd tests.
    ///
    /// It holds 1024 values, each made of a common prefix, the little-endian bytes of the
    /// value's index, and 96 trailing `x` bytes.
    #[cfg(feature = "zstd")]
    fn shared_prefix_binary_array() -> VarBinViewArray {
        let values = (0..1024u32)
            .map(|idx| {
                let mut value = Vec::from(&b"common binary payload prefix "[..]);
                value.extend_from_slice(&idx.to_le_bytes());
                value.extend_from_slice(&[b'x'; 96]);
                value
            })
            .collect::<Vec<_>>();
        VarBinViewArray::from_iter(
            values.iter().map(|value| Some(value.as_slice())),
            DType::Binary(Nullability::NonNullable),
        )
    }

    /// Checks that a list view survives compression unchanged, and that the result is a
    /// `List` or a `ListView` as the case expects.
    #[rstest]
    #[case::zctl(
        // SAFETY: the two views cover [0, 3) and [3, 5) of the five elements, so they are in
        // bounds, sorted and non-overlapping — presumably the invariants that `new_unchecked`
        // and the zero-copy-to-list flag rely on (confirm against `ListViewArray` docs).
        unsafe {
            ListViewArray::new_unchecked(
                buffer![1i32, 2, 3, 4, 5].into_array(),
                buffer![0i32, 3].into_array(),
                buffer![3i32, 2].into_array(),
                Validity::NonNullable,
            ).with_zero_copy_to_list(true)
        },
        true,
    )]
    #[case::overlapping(
        ListViewArray::new(
            buffer![1i32, 2, 3].into_array(),
            buffer![0i32, 0, 0].into_array(),
            buffer![3i32, 3, 3].into_array(),
            Validity::NonNullable,
        ),
        false,
    )]
    fn listview_compress_roundtrip(
        #[case] input: ListViewArray,
        #[case] expect_list: bool,
    ) -> VortexResult<()> {
        let result = compress_default(&input.clone().into_array())?;
        if expect_list {
            assert!(result.as_opt::<List>().is_some());
        } else {
            assert!(result.as_opt::<ListView>().is_some());
        }
        assert_arrays_eq!(result, input);
        Ok(())
    }

    /// An all-`true` non-nullable bool array compresses to a constant.
    #[test]
    fn test_constant_all_true() -> VortexResult<()> {
        let array = BoolArray::new(BitBuffer::from(vec![true; 100]), Validity::NonNullable);
        let compressed = compress_default(&array.clone().into_array())?;
        assert!(compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// An all-`false` non-nullable bool array compresses to a constant.
    #[test]
    fn test_constant_all_false() -> VortexResult<()> {
        let array = BoolArray::new(BitBuffer::from(vec![false; 100]), Validity::NonNullable);
        let compressed = compress_default(&array.clone().into_array())?;
        assert!(compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// A nullable bool array whose validity mask is entirely valid still compresses to a
    /// constant.
    #[test]
    fn test_nullable_all_valid_compressed() -> VortexResult<()> {
        let array = BoolArray::new(
            BitBuffer::from(vec![true; 100]),
            Validity::from(BitBuffer::from(vec![true; 100])),
        );
        let compressed = compress_default(&array.clone().into_array())?;
        assert!(compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// A bool array with actual nulls (every third slot) must not become a constant.
    #[test]
    fn test_nullable_with_nulls_not_compressed() -> VortexResult<()> {
        let validity = Validity::from(BitBuffer::from_iter((0..100).map(|i| i % 3 != 0)));
        let array = BoolArray::new(BitBuffer::from(vec![true; 100]), validity);
        let compressed = compress_default(&array.clone().into_array())?;
        assert!(!compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// A bool array with mixed values must not become a constant.
    #[test]
    fn test_mixed_not_constant() -> VortexResult<()> {
        let array = BoolArray::new(
            BitBuffer::from(vec![true, false, true, false, true]),
            Validity::NonNullable,
        );
        let compressed = compress_default(&array.clone().into_array())?;
        assert!(!compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// A binary array holding one repeated value compresses to a constant.
    #[test]
    fn test_binary_constant_compressed() -> VortexResult<()> {
        let values = vec![Some(b"constant-bytes".as_slice()); 100];
        let array = VarBinViewArray::from_iter(values, DType::Binary(Nullability::NonNullable));
        let compressed = compress_default(&array.clone().into_array())?;
        assert!(compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// A binary array cycling through three distinct values compresses to a dictionary.
    #[test]
    fn test_binary_dict_compressed() -> VortexResult<()> {
        let distinct_values: [&[u8]; 3] = [b"alpha", b"beta", b"gamma"];
        // Repeat the three values round-robin until there are 1000 entries.
        let values = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect::<Vec<_>>();
        let array = VarBinViewArray::from_iter(values, DType::Binary(Nullability::NonNullable));
        let compressed = compress_default(&array.clone().into_array())?;
        assert!(compressed.is::<Dict>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// With the compact configuration, the shared-prefix binary data is encoded as `Zstd`.
    #[cfg(feature = "zstd")]
    #[test]
    fn test_compact_binary_zstd_compressed() -> VortexResult<()> {
        let array = shared_prefix_binary_array();

        let compressor = BtrBlocksCompressorBuilder::default().with_compact().build();
        let compressed = compressor.compress(
            &array.clone().into_array(),
            &mut SESSION.create_execution_ctx(),
        )?;

        assert!(
            compressed.is::<vortex_zstd::Zstd>(),
            "expected Zstd, got {}",
            compressed.encoding_id()
        );
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// With only CUDA-compatible schemes, the shared-prefix binary data is encoded as
    /// `ZstdBuffers`.
    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
    #[test]
    fn test_cuda_compatible_binary_zstd_buffers_compressed() -> VortexResult<()> {
        let array = shared_prefix_binary_array();

        let compressor = BtrBlocksCompressorBuilder::default()
            .only_cuda_compatible()
            .build();
        let compressed = compressor.compress(
            &array.clone().into_array(),
            &mut SESSION.create_execution_ctx(),
        )?;

        assert!(
            compressed.is::<vortex_zstd::ZstdBuffers>(),
            "expected ZstdBuffers, got {}",
            compressed.encoding_id()
        );
        assert_arrays_eq!(compressed, array);
        Ok(())
    }
}