vortex_btrblocks/
canonical_compressor.rs1use std::ops::Deref;
7
8use vortex_array::ArrayRef;
9use vortex_array::ExecutionCtx;
10use vortex_error::VortexResult;
11
12use crate::BtrBlocksCompressorBuilder;
13use crate::CascadingCompressor;
14
15#[derive(Clone)]
35pub struct BtrBlocksCompressor(
36 pub CascadingCompressor,
38);
39
40impl BtrBlocksCompressor {
41 pub fn compress(&self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
43 self.0.compress(array, ctx)
44 }
45}
46
47impl Deref for BtrBlocksCompressor {
48 type Target = CascadingCompressor;
49
50 fn deref(&self) -> &CascadingCompressor {
51 &self.0
52 }
53}
54
55impl Default for BtrBlocksCompressor {
56 fn default() -> Self {
57 BtrBlocksCompressorBuilder::default().build()
58 }
59}
60
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use rstest::rstest;
    use vortex_array::ArrayRef;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::List;
    use vortex_array::arrays::ListView;
    use vortex_array::arrays::ListViewArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    #[cfg(feature = "zstd")]
    use crate::BtrBlocksCompressorBuilder;

    /// Session shared by every test in this module, created on first use with an
    /// [`ArraySession`] attached.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Runs `compressor` over `array` using a fresh execution context created from [`SESSION`].
    fn run_compressor(
        compressor: &BtrBlocksCompressor,
        array: &ArrayRef,
    ) -> VortexResult<ArrayRef> {
        compressor.compress(array, &mut SESSION.create_execution_ctx())
    }

    /// Produces 1024 binary payloads that share a common prefix, differ only in a
    /// little-endian `u32` counter, and end with 96 `b'x'` bytes.
    #[cfg(feature = "zstd")]
    fn prefixed_binary_payloads() -> Vec<Vec<u8>> {
        (0u32..1024)
            .map(|counter| {
                let mut payload = b"common binary payload prefix ".to_vec();
                payload.extend_from_slice(&counter.to_le_bytes());
                payload.extend_from_slice(&[b'x'; 96]);
                payload
            })
            .collect()
    }

    // SAFETY (zctl case): offsets `[0, 3]` with sizes `[3, 2]` describe the views `[0, 3)` and
    // `[3, 5)`, which stay inside the five elements, are in ascending order and do not overlap.
    // NOTE(review): presumably this is what `new_unchecked` and `with_zero_copy_to_list(true)`
    // require; confirm against their safety documentation.
    /// Round-trips a list view through the default compressor and checks which list encoding
    /// comes back: `List` when `expect_list` is set, `ListView` otherwise.
    #[rstest]
    #[case::zctl(
        unsafe {
            ListViewArray::new_unchecked(
                buffer![1i32, 2, 3, 4, 5].into_array(),
                buffer![0i32, 3].into_array(),
                buffer![3i32, 2].into_array(),
                Validity::NonNullable,
            ).with_zero_copy_to_list(true)
        },
        true,
    )]
    #[case::overlapping(
        ListViewArray::new(
            buffer![1i32, 2, 3].into_array(),
            buffer![0i32, 0, 0].into_array(),
            buffer![3i32, 3, 3].into_array(),
            Validity::NonNullable,
        ),
        false,
    )]
    fn listview_compress_roundtrip(
        #[case] input: ListViewArray,
        #[case] expect_list: bool,
    ) -> VortexResult<()> {
        let compressor = BtrBlocksCompressor::default();
        let result = run_compressor(&compressor, &input.clone().into_array())?;
        if expect_list {
            assert!(result.as_opt::<List>().is_some());
        } else {
            assert!(result.as_opt::<ListView>().is_some());
        }
        assert_arrays_eq!(result, input);
        Ok(())
    }

    /// A non-nullable bool array holding only `true` must come back as `Constant`.
    #[test]
    fn test_constant_all_true() -> VortexResult<()> {
        let bits = BitBuffer::from(vec![true; 100]);
        let array = BoolArray::new(bits, Validity::NonNullable);
        let compressed =
            run_compressor(&BtrBlocksCompressor::default(), &array.clone().into_array())?;
        assert!(compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// A non-nullable bool array holding only `false` must come back as `Constant`.
    #[test]
    fn test_constant_all_false() -> VortexResult<()> {
        let bits = BitBuffer::from(vec![false; 100]);
        let array = BoolArray::new(bits, Validity::NonNullable);
        let compressed =
            run_compressor(&BtrBlocksCompressor::default(), &array.clone().into_array())?;
        assert!(compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// A nullable bool array whose validity mask is entirely set must still come back as
    /// `Constant`.
    #[test]
    fn test_nullable_all_valid_compressed() -> VortexResult<()> {
        let bits = BitBuffer::from(vec![true; 100]);
        let all_valid = Validity::from(BitBuffer::from(vec![true; 100]));
        let array = BoolArray::new(bits, all_valid);
        let compressed =
            run_compressor(&BtrBlocksCompressor::default(), &array.clone().into_array())?;
        assert!(compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// With every third slot null, an otherwise all-`true` array must not come back as
    /// `Constant`.
    #[test]
    fn test_nullable_with_nulls_not_compressed() -> VortexResult<()> {
        let valid_mask = BitBuffer::from_iter((0..100).map(|i| i % 3 != 0));
        let bits = BitBuffer::from(vec![true; 100]);
        let array = BoolArray::new(bits, Validity::from(valid_mask));
        let compressed =
            run_compressor(&BtrBlocksCompressor::default(), &array.clone().into_array())?;
        assert!(!compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// An array mixing `true` and `false` must not come back as `Constant`.
    #[test]
    fn test_mixed_not_constant() -> VortexResult<()> {
        let bits = BitBuffer::from(vec![true, false, true, false, true]);
        let array = BoolArray::new(bits, Validity::NonNullable);
        let compressed =
            run_compressor(&BtrBlocksCompressor::default(), &array.clone().into_array())?;
        assert!(!compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// One hundred copies of the same binary value must come back as `Constant`.
    #[test]
    fn test_binary_constant_compressed() -> VortexResult<()> {
        let payload: &[u8] = b"constant-bytes";
        let values = vec![Some(payload); 100];
        let array = VarBinViewArray::from_iter(values, DType::Binary(Nullability::NonNullable));
        let compressed =
            run_compressor(&BtrBlocksCompressor::default(), &array.clone().into_array())?;
        assert!(compressed.is::<Constant>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// One thousand values cycling through three distinct byte strings must come back as
    /// `Dict`.
    #[test]
    fn test_binary_dict_compressed() -> VortexResult<()> {
        let distinct_values: [&[u8]; 3] = [b"alpha", b"beta", b"gamma"];
        let values = distinct_values
            .iter()
            .copied()
            .cycle()
            .take(1000)
            .map(Some)
            .collect::<Vec<_>>();
        let array = VarBinViewArray::from_iter(values, DType::Binary(Nullability::NonNullable));
        let compressed =
            run_compressor(&BtrBlocksCompressor::default(), &array.clone().into_array())?;
        assert!(compressed.is::<Dict>());
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// A compressor built `with_compact()` must encode the prefixed payloads as `Zstd`.
    #[cfg(feature = "zstd")]
    #[test]
    fn test_compact_binary_zstd_compressed() -> VortexResult<()> {
        let payloads = prefixed_binary_payloads();
        let array = VarBinViewArray::from_iter(
            payloads.iter().map(|payload| Some(payload.as_slice())),
            DType::Binary(Nullability::NonNullable),
        );

        let compressor = BtrBlocksCompressorBuilder::default().with_compact().build();
        let compressed = run_compressor(&compressor, &array.clone().into_array())?;

        assert!(
            compressed.is::<vortex_zstd::Zstd>(),
            "expected Zstd, got {}",
            compressed.encoding_id()
        );
        assert_arrays_eq!(compressed, array);
        Ok(())
    }

    /// A compressor built with `only_cuda_compatible()` must encode the prefixed payloads as
    /// `ZstdBuffers`.
    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
    #[test]
    fn test_cuda_compatible_binary_zstd_buffers_compressed() -> VortexResult<()> {
        let payloads = prefixed_binary_payloads();
        let array = VarBinViewArray::from_iter(
            payloads.iter().map(|payload| Some(payload.as_slice())),
            DType::Binary(Nullability::NonNullable),
        );

        let compressor = BtrBlocksCompressorBuilder::default()
            .only_cuda_compatible()
            .build();
        let compressed = run_compressor(&compressor, &array.clone().into_array())?;

        assert!(
            compressed.is::<vortex_zstd::ZstdBuffers>(),
            "expected ZstdBuffers, got {}",
            compressed.encoding_id()
        );
        assert_arrays_eq!(compressed, array);
        Ok(())
    }
}