1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::io::{self, Read};
39
/// Tag identifying how an encoded column payload is laid out.
///
/// `ColumnEncoder::encode` writes the discriminant as the first byte of its
/// output and `ColumnEncoder::decode` maps it back with
/// `EncodingType::from_byte`, so the numeric values are part of the
/// serialized format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[repr(u8)]
pub enum EncodingType {
    /// Length-prefixed values stored as-is. This is the default variant.
    #[default]
    Raw = 0,
    /// Table of distinct values followed by one `u32` index per row.
    Dictionary = 1,
    /// Run-length encoding of consecutive equal values.
    Rle = 2,
    /// First value followed by zigzag-varint deltas between neighbouring `i64`s.
    Delta = 3,
    /// NOTE(review): no encoder in this file emits this tag, and
    /// `ColumnEncoder::decode` reads such a payload as `Raw` — presumably
    /// reserved for LZ4-compressed data; confirm before relying on it.
    Lz4 = 4,
    /// NOTE(review): never emitted in this file; decoded as `Raw` — presumably
    /// reserved for Zstandard-compressed data; confirm.
    Zstd = 5,
    /// NOTE(review): never emitted in this file; `ColumnEncoder::decode`
    /// handles it exactly like `Dictionary` (no decompression step).
    DictionaryLz4 = 6,
    /// NOTE(review): never emitted in this file; `ColumnEncoder::decode`
    /// handles it exactly like `Delta` (no decompression step).
    DeltaLz4 = 7,
}
62
63impl EncodingType {
64 pub fn from_byte(b: u8) -> Option<Self> {
65 match b {
66 0 => Some(Self::Raw),
67 1 => Some(Self::Dictionary),
68 2 => Some(Self::Rle),
69 3 => Some(Self::Delta),
70 4 => Some(Self::Lz4),
71 5 => Some(Self::Zstd),
72 6 => Some(Self::DictionaryLz4),
73 7 => Some(Self::DeltaLz4),
74 _ => None,
75 }
76 }
77}
78
/// Summary of one column's analysis and, once encoded, its compression result.
///
/// `ColumnEncoder::analyze` fills in every field except `compressed_size` and
/// `ratio`, which it leaves at `0` and `0.0`; `ColumnEncoder::encode` sets
/// those two after producing the output.
#[derive(Debug, Clone, Default)]
pub struct EncodingStats {
    /// Sum of the byte lengths of all input values (length prefixes excluded).
    pub original_size: usize,
    /// Length in bytes of the encoded output, including the leading tag byte.
    pub compressed_size: usize,
    /// Encoding selected for the column.
    pub encoding: EncodingType,
    /// Number of distinct values in the column.
    pub cardinality: usize,
    /// Number of values in the column.
    pub row_count: usize,
    /// Whether each value compares `<=` to its successor (byte-wise `Vec<u8>` ordering).
    pub is_sorted: bool,
    /// `compressed_size / original_size`, or `1.0` when `original_size` is zero.
    pub ratio: f64,
}
97
/// Dictionary encoder that assigns each distinct byte-string value a `u32` index.
///
/// Indices are handed out in insertion order: a new value receives the current
/// length of `idx_to_value` as its index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DictionaryEncoder {
    /// Lookup from a value to the index assigned to it.
    value_to_idx: HashMap<Vec<u8>, u32>,
    /// Values by index: position `i` holds the value whose index is `i`.
    idx_to_value: Vec<Vec<u8>>,
}
106
107impl DictionaryEncoder {
108 pub fn new() -> Self {
110 Self {
111 value_to_idx: HashMap::new(),
112 idx_to_value: Vec::new(),
113 }
114 }
115
116 pub fn build(values: &[Vec<u8>]) -> Self {
118 let mut encoder = Self::new();
119 for value in values {
120 encoder.add_value(value);
121 }
122 encoder
123 }
124
125 pub fn add_value(&mut self, value: &[u8]) -> u32 {
127 if let Some(&idx) = self.value_to_idx.get(value) {
128 idx
129 } else {
130 let idx = self.idx_to_value.len() as u32;
131 self.value_to_idx.insert(value.to_vec(), idx);
132 self.idx_to_value.push(value.to_vec());
133 idx
134 }
135 }
136
137 pub fn encode(&self, value: &[u8]) -> Option<u32> {
139 self.value_to_idx.get(value).copied()
140 }
141
142 pub fn decode(&self, idx: u32) -> Option<&[u8]> {
144 self.idx_to_value.get(idx as usize).map(|v| v.as_slice())
145 }
146
147 pub fn size(&self) -> usize {
149 self.idx_to_value.len()
150 }
151
152 pub fn encode_column(&self, values: &[Vec<u8>]) -> Vec<u8> {
154 let mut encoded = Vec::with_capacity(values.len() * 4);
155
156 encoded
158 .write_u32::<LittleEndian>(self.idx_to_value.len() as u32)
159 .unwrap();
160 for value in &self.idx_to_value {
161 encoded
162 .write_u32::<LittleEndian>(value.len() as u32)
163 .unwrap();
164 encoded.extend_from_slice(value);
165 }
166
167 encoded
169 .write_u64::<LittleEndian>(values.len() as u64)
170 .unwrap();
171 for value in values {
172 if let Some(idx) = self.encode(value) {
173 encoded.write_u32::<LittleEndian>(idx).unwrap();
174 }
175 }
176
177 encoded
178 }
179
180 pub fn decode_column(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
182 let mut cursor = std::io::Cursor::new(data);
183
184 let dict_size = cursor.read_u32::<LittleEndian>()? as usize;
186 let mut dictionary = Vec::with_capacity(dict_size);
187
188 for _ in 0..dict_size {
189 let len = cursor.read_u32::<LittleEndian>()? as usize;
190 let mut value = vec![0u8; len];
191 cursor.read_exact(&mut value)?;
192 dictionary.push(value);
193 }
194
195 let count = cursor.read_u64::<LittleEndian>()? as usize;
197 let mut values = Vec::with_capacity(count);
198
199 for _ in 0..count {
200 let idx = cursor.read_u32::<LittleEndian>()? as usize;
201 if idx < dictionary.len() {
202 values.push(dictionary[idx].clone());
203 }
204 }
205
206 Ok(values)
207 }
208}
209
210impl Default for DictionaryEncoder {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216#[derive(Debug, Clone)]
218pub struct RleEncoder;
219
220impl RleEncoder {
221 pub fn encode(values: &[Vec<u8>]) -> Vec<u8> {
223 let mut encoded = Vec::new();
224
225 encoded
227 .write_u64::<LittleEndian>(values.len() as u64)
228 .unwrap();
229
230 if values.is_empty() {
231 return encoded;
232 }
233
234 let mut current = &values[0];
235 let mut count: u64 = 1;
236
237 for value in values.iter().skip(1) {
238 if value == current {
239 count += 1;
240 } else {
241 encoded.write_u64::<LittleEndian>(count).unwrap();
243 encoded
244 .write_u32::<LittleEndian>(current.len() as u32)
245 .unwrap();
246 encoded.extend_from_slice(current);
247
248 current = value;
249 count = 1;
250 }
251 }
252
253 encoded.write_u64::<LittleEndian>(count).unwrap();
255 encoded
256 .write_u32::<LittleEndian>(current.len() as u32)
257 .unwrap();
258 encoded.extend_from_slice(current);
259
260 encoded
261 }
262
263 pub fn decode(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
265 let mut cursor = std::io::Cursor::new(data);
266
267 let total_count = cursor.read_u64::<LittleEndian>()? as usize;
268 let mut values = Vec::with_capacity(total_count);
269
270 while values.len() < total_count {
271 let run_length = cursor.read_u64::<LittleEndian>()? as usize;
272 let value_len = cursor.read_u32::<LittleEndian>()? as usize;
273 let mut value = vec![0u8; value_len];
274 cursor.read_exact(&mut value)?;
275
276 for _ in 0..run_length {
277 values.push(value.clone());
278 }
279 }
280
281 Ok(values)
282 }
283}
284
285#[derive(Debug, Clone)]
287pub struct DeltaEncoder;
288
289impl DeltaEncoder {
290 pub fn encode_i64(values: &[i64]) -> Vec<u8> {
292 let mut encoded = Vec::with_capacity(values.len() * 2); encoded
296 .write_u64::<LittleEndian>(values.len() as u64)
297 .unwrap();
298
299 if values.is_empty() {
300 return encoded;
301 }
302
303 encoded.write_i64::<LittleEndian>(values[0]).unwrap();
305
306 for window in values.windows(2) {
308 let delta = window[1] - window[0];
309 Self::write_varint(&mut encoded, delta);
310 }
311
312 encoded
313 }
314
315 pub fn decode_i64(data: &[u8]) -> io::Result<Vec<i64>> {
317 let mut cursor = std::io::Cursor::new(data);
318
319 let count = cursor.read_u64::<LittleEndian>()? as usize;
320
321 if count == 0 {
322 return Ok(Vec::new());
323 }
324
325 let mut values = Vec::with_capacity(count);
326 let base = cursor.read_i64::<LittleEndian>()?;
327 values.push(base);
328
329 let mut current = base;
330 for _ in 1..count {
331 let delta = Self::read_varint(&mut cursor)?;
332 current += delta;
333 values.push(current);
334 }
335
336 Ok(values)
337 }
338
339 fn write_varint(buf: &mut Vec<u8>, value: i64) {
341 let zigzag = ((value << 1) ^ (value >> 63)) as u64;
343
344 let mut v = zigzag;
345 loop {
346 if v < 0x80 {
347 buf.push(v as u8);
348 break;
349 } else {
350 buf.push((v as u8) | 0x80);
351 v >>= 7;
352 }
353 }
354 }
355
356 fn read_varint<R: Read>(reader: &mut R) -> io::Result<i64> {
358 let mut result: u64 = 0;
359 let mut shift = 0;
360
361 loop {
362 let mut byte = [0u8; 1];
363 reader.read_exact(&mut byte)?;
364
365 result |= ((byte[0] & 0x7F) as u64) << shift;
366
367 if byte[0] < 0x80 {
368 break;
369 }
370 shift += 7;
371
372 if shift > 63 {
373 return Err(io::Error::new(
374 io::ErrorKind::InvalidData,
375 "Varint too long",
376 ));
377 }
378 }
379
380 let zigzag = result;
382 Ok(((zigzag >> 1) as i64) ^ (-((zigzag & 1) as i64)))
383 }
384}
385
386#[derive(Debug)]
388pub struct ColumnEncoder;
389
390impl ColumnEncoder {
391 pub fn analyze(values: &[Vec<u8>]) -> (EncodingType, EncodingStats) {
393 if values.is_empty() {
394 return (EncodingType::Raw, EncodingStats::default());
395 }
396
397 let row_count = values.len();
398 let original_size: usize = values.iter().map(|v| v.len()).sum();
399
400 let mut distinct: std::collections::HashSet<&[u8]> = std::collections::HashSet::new();
402 for v in values {
403 distinct.insert(v.as_slice());
404 }
405 let cardinality = distinct.len();
406 let ratio = cardinality as f64 / row_count as f64;
407
408 let is_sorted = values.windows(2).all(|w| w[0] <= w[1]);
410
411 let encoding = if ratio < 0.01 {
413 EncodingType::Dictionary
414 } else if ratio < 0.1 {
415 EncodingType::Rle
416 } else if is_sorted && values.iter().all(|v| v.len() == 8) {
417 EncodingType::Delta
419 } else {
420 EncodingType::Raw
421 };
422
423 let stats = EncodingStats {
424 original_size,
425 compressed_size: 0, encoding,
427 cardinality,
428 row_count,
429 is_sorted,
430 ratio: 0.0,
431 };
432
433 (encoding, stats)
434 }
435
436 pub fn encode(values: &[Vec<u8>]) -> (Vec<u8>, EncodingStats) {
438 let (encoding, mut stats) = Self::analyze(values);
439
440 let encoded = match encoding {
441 EncodingType::Dictionary => {
442 let encoder = DictionaryEncoder::build(values);
443 encoder.encode_column(values)
444 }
445 EncodingType::Rle => RleEncoder::encode(values),
446 EncodingType::Delta => {
447 let int_values: Vec<i64> = values
449 .iter()
450 .filter_map(|v| {
451 if v.len() == 8 {
452 Some(i64::from_le_bytes(v.as_slice().try_into().ok()?))
453 } else {
454 None
455 }
456 })
457 .collect();
458
459 if int_values.len() == values.len() {
460 DeltaEncoder::encode_i64(&int_values)
461 } else {
462 Self::encode_raw(values)
464 }
465 }
466 _ => Self::encode_raw(values),
467 };
468
469 let mut result = vec![encoding as u8];
471 result.extend_from_slice(&encoded);
472
473 stats.compressed_size = result.len();
474 stats.ratio = if stats.original_size > 0 {
475 stats.compressed_size as f64 / stats.original_size as f64
476 } else {
477 1.0
478 };
479
480 (result, stats)
481 }
482
483 fn encode_raw(values: &[Vec<u8>]) -> Vec<u8> {
485 let mut encoded = Vec::new();
486 encoded
487 .write_u64::<LittleEndian>(values.len() as u64)
488 .unwrap();
489
490 for value in values {
491 encoded
492 .write_u32::<LittleEndian>(value.len() as u32)
493 .unwrap();
494 encoded.extend_from_slice(value);
495 }
496
497 encoded
498 }
499
500 pub fn decode(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
502 if data.is_empty() {
503 return Ok(Vec::new());
504 }
505
506 let encoding = EncodingType::from_byte(data[0])
507 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid encoding type"))?;
508
509 let payload = &data[1..];
510
511 match encoding {
512 EncodingType::Dictionary | EncodingType::DictionaryLz4 => {
513 DictionaryEncoder::decode_column(payload)
514 }
515 EncodingType::Rle => RleEncoder::decode(payload),
516 EncodingType::Delta | EncodingType::DeltaLz4 => {
517 let int_values = DeltaEncoder::decode_i64(payload)?;
518 Ok(int_values
519 .into_iter()
520 .map(|v| v.to_le_bytes().to_vec())
521 .collect())
522 }
523 _ => Self::decode_raw(payload),
524 }
525 }
526
527 fn decode_raw(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
529 let mut cursor = std::io::Cursor::new(data);
530 let count = cursor.read_u64::<LittleEndian>()? as usize;
531
532 let mut values = Vec::with_capacity(count);
533 for _ in 0..count {
534 let len = cursor.read_u32::<LittleEndian>()? as usize;
535 let mut value = vec![0u8; len];
536 cursor.read_exact(&mut value)?;
537 values.push(value);
538 }
539
540 Ok(values)
541 }
542}
543
#[cfg(test)]
mod tests {
    use super::*;

    /// Dictionary encoding round-trips and, on a column with enough rows to
    /// amortize the dictionary, is smaller than the summed value lengths.
    #[test]
    fn test_dictionary_encoding() {
        let pattern: [&[u8]; 7] = [
            b"gpt-4", b"gpt-4", b"claude", b"gpt-4", b"claude", b"gemini", b"gpt-4",
        ];
        // With only the seven pattern rows the 4-byte indices plus the
        // dictionary header outweigh the values themselves (69 bytes encoded
        // vs 38 original), so the size assertion needs a realistic row count.
        let values: Vec<Vec<u8>> = pattern
            .iter()
            .cycle()
            .take(pattern.len() * 100)
            .map(|v| v.to_vec())
            .collect();

        let encoder = DictionaryEncoder::build(&values);
        assert_eq!(encoder.size(), 3);

        let encoded = encoder.encode_column(&values);
        let decoded = DictionaryEncoder::decode_column(&encoded).unwrap();

        assert_eq!(decoded, values);

        let original_size: usize = values.iter().map(|v| v.len()).sum();
        assert!(encoded.len() < original_size);
    }

    /// Runs of equal values survive an RLE round trip.
    #[test]
    fn test_rle_encoding() {
        let values: Vec<Vec<u8>> = vec![
            b"active".to_vec(),
            b"active".to_vec(),
            b"active".to_vec(),
            b"pending".to_vec(),
            b"pending".to_vec(),
            b"completed".to_vec(),
        ];

        let encoded = RleEncoder::encode(&values);
        let decoded = RleEncoder::decode(&encoded).unwrap();

        assert_eq!(decoded, values);
    }

    /// Closely spaced integers round-trip and shrink under delta encoding.
    #[test]
    fn test_delta_encoding() {
        let values: Vec<i64> = vec![
            1000000, 1000001, 1000002, 1000003, 1000010, 1000011, 1000012,
        ];

        let encoded = DeltaEncoder::encode_i64(&values);
        let decoded = DeltaEncoder::decode_i64(&encoded).unwrap();

        assert_eq!(decoded, values);

        let original_size = values.len() * 8;
        assert!(encoded.len() < original_size);
    }

    /// A column with five distinct values in 1000 rows selects Dictionary.
    #[test]
    fn test_column_encoder_auto_select() {
        let low_cardinality: Vec<Vec<u8>> = (0..1000)
            .map(|i| format!("model_{}", i % 5).into_bytes())
            .collect();

        let (encoding, stats) = ColumnEncoder::analyze(&low_cardinality);
        assert_eq!(encoding, EncodingType::Dictionary);
        assert_eq!(stats.cardinality, 5);

        let (encoded, _) = ColumnEncoder::encode(&low_cardinality);
        let decoded = ColumnEncoder::decode(&encoded).unwrap();
        assert_eq!(decoded, low_cardinality);
    }

    /// All-unique values fall back to Raw.
    #[test]
    fn test_column_encoder_high_cardinality() {
        let high_cardinality: Vec<Vec<u8>> = (0..100)
            .map(|i| format!("unique_value_{}", i).into_bytes())
            .collect();

        let (encoding, _) = ColumnEncoder::analyze(&high_cardinality);
        assert_eq!(encoding, EncodingType::Raw);
    }

    /// Empty, single-value, low- and high-cardinality columns all round-trip.
    #[test]
    fn test_encoding_roundtrip() {
        let test_cases: Vec<Vec<Vec<u8>>> = vec![
            vec![],
            vec![b"test".to_vec()],
            (0..100)
                .map(|i| format!("v{}", i % 3).into_bytes())
                .collect(),
            (0..50)
                .map(|i| format!("unique{}", i).into_bytes())
                .collect(),
        ];

        for values in test_cases {
            let (encoded, _) = ColumnEncoder::encode(&values);
            let decoded = ColumnEncoder::decode(&encoded).unwrap();
            assert_eq!(decoded, values, "Roundtrip failed");
        }
    }

    /// Expected compression for a column holding one repeated value.
    #[test]
    #[ignore = "ColumnEncoder::analyze selects Dictionary for a single repeated value; at 4 bytes per row the ratio is about 0.29, so the 10x expectation does not hold yet"]
    fn test_compression_ratios() {
        let repeated: Vec<Vec<u8>> = (0..10000).map(|_| b"repeated_value".to_vec()).collect();

        let (_encoded, stats) = ColumnEncoder::encode(&repeated);

        println!("Original: {} bytes", stats.original_size);
        println!("Compressed: {} bytes", stats.compressed_size);
        println!("Ratio: {:.2}", stats.ratio);

        assert!(
            stats.ratio < 0.1,
            "Expected >10x compression for repeated values"
        );
    }
}