1use std::io;
19
20use super::bitpack::{BitPackedInts, DeltaBitPacked};
21use super::bitvec::BitVector;
22
/// Identifies which encoding was applied to a buffer of values.
///
/// The two bit-packing variants carry a `bits` payload; every other variant
/// is a plain tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionCodec {
    /// No encoding.
    None,

    /// Delta encoding.
    Delta,

    /// Bit-packed values.
    BitPacked {
        /// Bit width recorded alongside the codec.
        bits: u8,
    },

    /// Delta encoding combined with bit-packing.
    DeltaBitPacked {
        /// Bit width recorded alongside the codec.
        bits: u8,
    },

    /// Dictionary encoding.
    Dictionary,

    /// Bit-vector encoding.
    BitVector,

    /// Run-length encoding.
    RunLength,
}

impl CompressionCodec {
    /// Returns the variant's identifier as a static string.
    ///
    /// Payload fields such as `bits` do not influence the result.
    #[must_use]
    pub fn name(&self) -> &'static str {
        match *self {
            Self::RunLength => "RunLength",
            Self::BitVector => "BitVector",
            Self::Dictionary => "Dictionary",
            Self::DeltaBitPacked { .. } => "DeltaBitPacked",
            Self::BitPacked { .. } => "BitPacked",
            Self::Delta => "Delta",
            Self::None => "None",
        }
    }

    /// Reports whether the codec is lossless.
    ///
    /// Every variant currently answers `true`. The match is written out in
    /// full so that adding a variant forces an explicit decision here.
    #[must_use]
    pub fn is_lossless(&self) -> bool {
        match *self {
            Self::None
            | Self::Delta
            | Self::BitPacked { .. }
            | Self::DeltaBitPacked { .. }
            | Self::Dictionary
            | Self::BitVector
            | Self::RunLength => true,
        }
    }
}
76
77#[derive(Debug, Clone)]
81pub struct CompressedData {
82 pub codec: CompressionCodec,
84 pub uncompressed_size: usize,
86 pub data: Vec<u8>,
88 pub metadata: CompressionMetadata,
90}
91
92#[derive(Debug, Clone)]
94pub enum CompressionMetadata {
95 None,
97 Delta {
99 base: i64,
101 },
102 BitPacked {
104 count: usize,
106 },
107 DeltaBitPacked {
109 base: i64,
111 count: usize,
113 },
114 Dictionary {
116 dict_id: u32,
118 },
119 RunLength {
121 run_count: usize,
123 },
124}
125
126impl CompressedData {
127 pub fn uncompressed(data: Vec<u8>) -> Self {
129 let size = data.len();
130 Self {
131 codec: CompressionCodec::None,
132 uncompressed_size: size,
133 data,
134 metadata: CompressionMetadata::None,
135 }
136 }
137
138 #[must_use]
140 pub fn compression_ratio(&self) -> f64 {
141 if self.data.is_empty() {
142 return 1.0;
143 }
144 self.uncompressed_size as f64 / self.data.len() as f64
145 }
146
147 #[must_use]
149 pub fn is_compressed(&self) -> bool {
150 !matches!(self.codec, CompressionCodec::None)
151 }
152}
153
154pub struct CodecSelector;
156
157impl CodecSelector {
158 #[must_use]
160 pub fn select_for_integers(values: &[u64]) -> CompressionCodec {
161 if values.is_empty() {
162 return CompressionCodec::None;
163 }
164
165 if values.len() < 8 {
166 return CompressionCodec::None;
168 }
169
170 let is_sorted = values.windows(2).all(|w| w[0] <= w[1]);
172
173 if is_sorted {
174 let deltas: Vec<u64> = values.windows(2).map(|w| w[1] - w[0]).collect();
176 let max_delta = deltas.iter().copied().max().unwrap_or(0);
177 let bits_needed = BitPackedInts::bits_needed(max_delta);
178
179 return CompressionCodec::DeltaBitPacked { bits: bits_needed };
180 }
181
182 let max_value = values.iter().copied().max().unwrap_or(0);
184 let bits_needed = BitPackedInts::bits_needed(max_value);
185
186 if bits_needed < 32 {
187 CompressionCodec::BitPacked { bits: bits_needed }
188 } else {
189 CompressionCodec::None
190 }
191 }
192
193 #[must_use]
195 pub fn select_for_strings(values: &[&str]) -> CompressionCodec {
196 if values.is_empty() || values.len() < 4 {
197 return CompressionCodec::None;
198 }
199
200 let unique: std::collections::HashSet<_> = values.iter().collect();
202 let cardinality_ratio = unique.len() as f64 / values.len() as f64;
203
204 if cardinality_ratio < 0.5 {
206 CompressionCodec::Dictionary
207 } else {
208 CompressionCodec::None
209 }
210 }
211
212 #[must_use]
214 pub fn select_for_booleans(_values: &[bool]) -> CompressionCodec {
215 CompressionCodec::BitVector
217 }
218}
219
220pub struct TypeSpecificCompressor;
222
223impl TypeSpecificCompressor {
224 pub fn compress_integers(values: &[u64]) -> CompressedData {
226 let codec = CodecSelector::select_for_integers(values);
227
228 match codec {
229 CompressionCodec::None => {
230 let mut data = Vec::with_capacity(values.len() * 8);
231 for &v in values {
232 data.extend_from_slice(&v.to_le_bytes());
233 }
234 CompressedData {
235 codec,
236 uncompressed_size: values.len() * 8,
237 data,
238 metadata: CompressionMetadata::None,
239 }
240 }
241 CompressionCodec::DeltaBitPacked { bits } => {
242 let encoded = DeltaBitPacked::encode(values);
243 CompressedData {
244 codec: CompressionCodec::DeltaBitPacked { bits },
245 uncompressed_size: values.len() * 8,
246 data: encoded.to_bytes(),
247 metadata: CompressionMetadata::DeltaBitPacked {
248 base: encoded.base() as i64,
249 count: values.len(),
250 },
251 }
252 }
253 CompressionCodec::BitPacked { bits } => {
254 let packed = BitPackedInts::pack(values);
255 CompressedData {
256 codec: CompressionCodec::BitPacked { bits },
257 uncompressed_size: values.len() * 8,
258 data: packed.to_bytes(),
259 metadata: CompressionMetadata::BitPacked {
260 count: values.len(),
261 },
262 }
263 }
264 _ => unreachable!("Unexpected codec for integers"),
265 }
266 }
267
268 pub fn compress_signed_integers(values: &[i64]) -> CompressedData {
270 let zigzag: Vec<u64> = values
272 .iter()
273 .map(|&v| super::delta::zigzag_encode(v))
274 .collect();
275 Self::compress_integers(&zigzag)
276 }
277
278 pub fn compress_booleans(values: &[bool]) -> CompressedData {
280 let bitvec = BitVector::from_bools(values);
281 CompressedData {
282 codec: CompressionCodec::BitVector,
283 uncompressed_size: values.len(),
284 data: bitvec.to_bytes(),
285 metadata: CompressionMetadata::BitPacked {
286 count: values.len(),
287 },
288 }
289 }
290
291 pub fn decompress_integers(data: &CompressedData) -> io::Result<Vec<u64>> {
293 match data.codec {
294 CompressionCodec::None => {
295 let mut values = Vec::with_capacity(data.data.len() / 8);
296 for chunk in data.data.chunks_exact(8) {
297 values.push(u64::from_le_bytes(chunk.try_into().unwrap()));
298 }
299 Ok(values)
300 }
301 CompressionCodec::DeltaBitPacked { .. } => {
302 let encoded = DeltaBitPacked::from_bytes(&data.data)?;
303 Ok(encoded.decode())
304 }
305 CompressionCodec::BitPacked { .. } => {
306 let packed = BitPackedInts::from_bytes(&data.data)?;
307 Ok(packed.unpack())
308 }
309 _ => Err(io::Error::new(
310 io::ErrorKind::InvalidData,
311 "Invalid codec for integer decompression",
312 )),
313 }
314 }
315
316 pub fn decompress_booleans(data: &CompressedData) -> io::Result<Vec<bool>> {
318 match data.codec {
319 CompressionCodec::BitVector => {
320 let bitvec = BitVector::from_bytes(&data.data)?;
321 Ok(bitvec.to_bools())
322 }
323 _ => Err(io::Error::new(
324 io::ErrorKind::InvalidData,
325 "Invalid codec for boolean decompression",
326 )),
327 }
328 }
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// An ascending run of 100 values selects delta bit-packing.
    #[test]
    fn test_codec_selection_sorted_integers() {
        let ascending = (0..100).collect::<Vec<u64>>();
        assert!(matches!(
            CodecSelector::select_for_integers(&ascending),
            CompressionCodec::DeltaBitPacked { .. }
        ));
    }

    /// Eight small unsorted values select plain bit-packing.
    #[test]
    fn test_codec_selection_small_integers() {
        let shuffled: Vec<u64> = vec![1, 5, 3, 7, 2, 4, 6, 8];
        assert!(matches!(
            CodecSelector::select_for_integers(&shuffled),
            CompressionCodec::BitPacked { .. }
        ));
    }

    /// Low-cardinality strings select the dictionary codec; all-distinct
    /// strings are left unencoded.
    #[test]
    fn test_codec_selection_strings() {
        let low_cardinality = vec!["a", "b", "a", "a", "b", "a", "c", "a"];
        assert_eq!(
            CodecSelector::select_for_strings(&low_cardinality),
            CompressionCodec::Dictionary
        );

        let all_distinct = vec!["a", "b", "c", "d", "e", "f", "g", "h"];
        assert_eq!(
            CodecSelector::select_for_strings(&all_distinct),
            CompressionCodec::None
        );
    }

    /// Booleans always select the bit-vector codec.
    #[test]
    fn test_codec_selection_booleans() {
        let flags = vec![true, false, true];
        assert_eq!(
            CodecSelector::select_for_booleans(&flags),
            CompressionCodec::BitVector
        );
    }

    /// Sorted integers shrink under delta bit-packing and round-trip intact.
    #[test]
    fn test_compress_decompress_sorted_integers() {
        let values = (100..200).collect::<Vec<u64>>();
        let packed = TypeSpecificCompressor::compress_integers(&values);

        assert!(matches!(
            packed.codec,
            CompressionCodec::DeltaBitPacked { .. }
        ));
        assert!(packed.compression_ratio() > 1.0);

        let restored = TypeSpecificCompressor::decompress_integers(&packed).unwrap();
        assert_eq!(values, restored);
    }

    /// Small unsorted integers round-trip intact.
    #[test]
    fn test_compress_decompress_small_integers() {
        let values: Vec<u64> = vec![5, 2, 7, 1, 9, 3, 8, 4, 6, 0];
        let packed = TypeSpecificCompressor::compress_integers(&values);

        let restored = TypeSpecificCompressor::decompress_integers(&packed).unwrap();
        assert_eq!(values, restored);
    }

    /// Booleans are stored with the bit-vector codec and round-trip intact.
    #[test]
    fn test_compress_decompress_booleans() {
        let values = vec![true, false, true, true, false, false, true, false];
        let packed = TypeSpecificCompressor::compress_booleans(&values);

        assert_eq!(packed.codec, CompressionCodec::BitVector);

        let restored = TypeSpecificCompressor::decompress_booleans(&packed).unwrap();
        assert_eq!(values, restored);
    }

    /// A consecutive run of 100 values compresses by more than a factor of five.
    #[test]
    fn test_compression_ratio() {
        let values = (1000..1100).collect::<Vec<u64>>();
        let ratio = TypeSpecificCompressor::compress_integers(&values).compression_ratio();
        assert!(ratio > 5.0, "Expected ratio > 5, got {}", ratio);
    }

    /// `name()` returns the variant identifier for every codec.
    #[test]
    fn test_codec_names() {
        assert_eq!(CompressionCodec::None.name(), "None");
        assert_eq!(CompressionCodec::Delta.name(), "Delta");

        let bit_packed = CompressionCodec::BitPacked { bits: 4 };
        assert_eq!(bit_packed.name(), "BitPacked");

        let delta_bit_packed = CompressionCodec::DeltaBitPacked { bits: 4 };
        assert_eq!(delta_bit_packed.name(), "DeltaBitPacked");

        assert_eq!(CompressionCodec::Dictionary.name(), "Dictionary");
        assert_eq!(CompressionCodec::BitVector.name(), "BitVector");
        assert_eq!(CompressionCodec::RunLength.name(), "RunLength");
    }
}