1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
39use serde::{Deserialize, Serialize};
40use std::collections::HashMap;
41use std::io::{self, Read};
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
45#[repr(u8)]
46pub enum EncodingType {
47 #[default]
49 Raw = 0,
50 Dictionary = 1,
52 Rle = 2,
54 Delta = 3,
56 Lz4 = 4,
58 Zstd = 5,
60 DictionaryLz4 = 6,
62 DeltaLz4 = 7,
64}
65
66impl EncodingType {
67 pub fn from_byte(b: u8) -> Option<Self> {
68 match b {
69 0 => Some(Self::Raw),
70 1 => Some(Self::Dictionary),
71 2 => Some(Self::Rle),
72 3 => Some(Self::Delta),
73 4 => Some(Self::Lz4),
74 5 => Some(Self::Zstd),
75 6 => Some(Self::DictionaryLz4),
76 7 => Some(Self::DeltaLz4),
77 _ => None,
78 }
79 }
80}
81
82#[derive(Debug, Clone, Default)]
84pub struct EncodingStats {
85 pub original_size: usize,
87 pub compressed_size: usize,
89 pub encoding: EncodingType,
91 pub cardinality: usize,
93 pub row_count: usize,
95 pub is_sorted: bool,
97 pub ratio: f64,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct DictionaryEncoder {
104 value_to_idx: HashMap<Vec<u8>, u32>,
106 idx_to_value: Vec<Vec<u8>>,
108}
109
110impl DictionaryEncoder {
111 pub fn new() -> Self {
113 Self {
114 value_to_idx: HashMap::new(),
115 idx_to_value: Vec::new(),
116 }
117 }
118
119 pub fn build(values: &[Vec<u8>]) -> Self {
121 let mut encoder = Self::new();
122 for value in values {
123 encoder.add_value(value);
124 }
125 encoder
126 }
127
128 pub fn add_value(&mut self, value: &[u8]) -> u32 {
130 if let Some(&idx) = self.value_to_idx.get(value) {
131 idx
132 } else {
133 let idx = self.idx_to_value.len() as u32;
134 self.value_to_idx.insert(value.to_vec(), idx);
135 self.idx_to_value.push(value.to_vec());
136 idx
137 }
138 }
139
140 pub fn encode(&self, value: &[u8]) -> Option<u32> {
142 self.value_to_idx.get(value).copied()
143 }
144
145 pub fn decode(&self, idx: u32) -> Option<&[u8]> {
147 self.idx_to_value.get(idx as usize).map(|v| v.as_slice())
148 }
149
150 pub fn size(&self) -> usize {
152 self.idx_to_value.len()
153 }
154
155 pub fn encode_column(&self, values: &[Vec<u8>]) -> Vec<u8> {
157 let mut encoded = Vec::with_capacity(values.len() * 4);
158
159 encoded
161 .write_u32::<LittleEndian>(self.idx_to_value.len() as u32)
162 .unwrap();
163 for value in &self.idx_to_value {
164 encoded
165 .write_u32::<LittleEndian>(value.len() as u32)
166 .unwrap();
167 encoded.extend_from_slice(value);
168 }
169
170 encoded
172 .write_u64::<LittleEndian>(values.len() as u64)
173 .unwrap();
174 for value in values {
175 if let Some(idx) = self.encode(value) {
176 encoded.write_u32::<LittleEndian>(idx).unwrap();
177 }
178 }
179
180 encoded
181 }
182
183 pub fn decode_column(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
185 let mut cursor = std::io::Cursor::new(data);
186
187 let dict_size = cursor.read_u32::<LittleEndian>()? as usize;
189 let mut dictionary = Vec::with_capacity(dict_size);
190
191 for _ in 0..dict_size {
192 let len = cursor.read_u32::<LittleEndian>()? as usize;
193 let mut value = vec![0u8; len];
194 cursor.read_exact(&mut value)?;
195 dictionary.push(value);
196 }
197
198 let count = cursor.read_u64::<LittleEndian>()? as usize;
200 let mut values = Vec::with_capacity(count);
201
202 for _ in 0..count {
203 let idx = cursor.read_u32::<LittleEndian>()? as usize;
204 if idx < dictionary.len() {
205 values.push(dictionary[idx].clone());
206 }
207 }
208
209 Ok(values)
210 }
211}
212
213impl Default for DictionaryEncoder {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219#[derive(Debug, Clone)]
221pub struct RleEncoder;
222
223impl RleEncoder {
224 pub fn encode(values: &[Vec<u8>]) -> Vec<u8> {
226 let mut encoded = Vec::new();
227
228 encoded
230 .write_u64::<LittleEndian>(values.len() as u64)
231 .unwrap();
232
233 if values.is_empty() {
234 return encoded;
235 }
236
237 let mut current = &values[0];
238 let mut count: u64 = 1;
239
240 for value in values.iter().skip(1) {
241 if value == current {
242 count += 1;
243 } else {
244 encoded.write_u64::<LittleEndian>(count).unwrap();
246 encoded
247 .write_u32::<LittleEndian>(current.len() as u32)
248 .unwrap();
249 encoded.extend_from_slice(current);
250
251 current = value;
252 count = 1;
253 }
254 }
255
256 encoded.write_u64::<LittleEndian>(count).unwrap();
258 encoded
259 .write_u32::<LittleEndian>(current.len() as u32)
260 .unwrap();
261 encoded.extend_from_slice(current);
262
263 encoded
264 }
265
266 pub fn decode(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
268 let mut cursor = std::io::Cursor::new(data);
269
270 let total_count = cursor.read_u64::<LittleEndian>()? as usize;
271 let mut values = Vec::with_capacity(total_count);
272
273 while values.len() < total_count {
274 let run_length = cursor.read_u64::<LittleEndian>()? as usize;
275 let value_len = cursor.read_u32::<LittleEndian>()? as usize;
276 let mut value = vec![0u8; value_len];
277 cursor.read_exact(&mut value)?;
278
279 for _ in 0..run_length {
280 values.push(value.clone());
281 }
282 }
283
284 Ok(values)
285 }
286}
287
288#[derive(Debug, Clone)]
290pub struct DeltaEncoder;
291
292impl DeltaEncoder {
293 pub fn encode_i64(values: &[i64]) -> Vec<u8> {
295 let mut encoded = Vec::with_capacity(values.len() * 2); encoded
299 .write_u64::<LittleEndian>(values.len() as u64)
300 .unwrap();
301
302 if values.is_empty() {
303 return encoded;
304 }
305
306 encoded.write_i64::<LittleEndian>(values[0]).unwrap();
308
309 for window in values.windows(2) {
311 let delta = window[1] - window[0];
312 Self::write_varint(&mut encoded, delta);
313 }
314
315 encoded
316 }
317
318 pub fn decode_i64(data: &[u8]) -> io::Result<Vec<i64>> {
320 let mut cursor = std::io::Cursor::new(data);
321
322 let count = cursor.read_u64::<LittleEndian>()? as usize;
323
324 if count == 0 {
325 return Ok(Vec::new());
326 }
327
328 let mut values = Vec::with_capacity(count);
329 let base = cursor.read_i64::<LittleEndian>()?;
330 values.push(base);
331
332 let mut current = base;
333 for _ in 1..count {
334 let delta = Self::read_varint(&mut cursor)?;
335 current += delta;
336 values.push(current);
337 }
338
339 Ok(values)
340 }
341
342 fn write_varint(buf: &mut Vec<u8>, value: i64) {
344 let zigzag = ((value << 1) ^ (value >> 63)) as u64;
346
347 let mut v = zigzag;
348 loop {
349 if v < 0x80 {
350 buf.push(v as u8);
351 break;
352 } else {
353 buf.push((v as u8) | 0x80);
354 v >>= 7;
355 }
356 }
357 }
358
359 fn read_varint<R: Read>(reader: &mut R) -> io::Result<i64> {
361 let mut result: u64 = 0;
362 let mut shift = 0;
363
364 loop {
365 let mut byte = [0u8; 1];
366 reader.read_exact(&mut byte)?;
367
368 result |= ((byte[0] & 0x7F) as u64) << shift;
369
370 if byte[0] < 0x80 {
371 break;
372 }
373 shift += 7;
374
375 if shift > 63 {
376 return Err(io::Error::new(
377 io::ErrorKind::InvalidData,
378 "Varint too long",
379 ));
380 }
381 }
382
383 let zigzag = result;
385 Ok(((zigzag >> 1) as i64) ^ (-((zigzag & 1) as i64)))
386 }
387}
388
389#[derive(Debug)]
391pub struct ColumnEncoder;
392
393impl ColumnEncoder {
394 pub fn analyze(values: &[Vec<u8>]) -> (EncodingType, EncodingStats) {
396 if values.is_empty() {
397 return (EncodingType::Raw, EncodingStats::default());
398 }
399
400 let row_count = values.len();
401 let original_size: usize = values.iter().map(|v| v.len()).sum();
402
403 let mut distinct: std::collections::HashSet<&[u8]> = std::collections::HashSet::new();
405 for v in values {
406 distinct.insert(v.as_slice());
407 }
408 let cardinality = distinct.len();
409 let ratio = cardinality as f64 / row_count as f64;
410
411 let is_sorted = values.windows(2).all(|w| w[0] <= w[1]);
413
414 let encoding = if ratio < 0.01 {
416 EncodingType::Dictionary
417 } else if ratio < 0.1 {
418 EncodingType::Rle
419 } else if is_sorted && values.iter().all(|v| v.len() == 8) {
420 EncodingType::Delta
422 } else {
423 EncodingType::Raw
424 };
425
426 let stats = EncodingStats {
427 original_size,
428 compressed_size: 0, encoding,
430 cardinality,
431 row_count,
432 is_sorted,
433 ratio: 0.0,
434 };
435
436 (encoding, stats)
437 }
438
439 pub fn encode(values: &[Vec<u8>]) -> (Vec<u8>, EncodingStats) {
441 let (encoding, mut stats) = Self::analyze(values);
442
443 let encoded = match encoding {
444 EncodingType::Dictionary => {
445 let encoder = DictionaryEncoder::build(values);
446 encoder.encode_column(values)
447 }
448 EncodingType::Rle => RleEncoder::encode(values),
449 EncodingType::Delta => {
450 let int_values: Vec<i64> = values
452 .iter()
453 .filter_map(|v| {
454 if v.len() == 8 {
455 Some(i64::from_le_bytes(v.as_slice().try_into().ok()?))
456 } else {
457 None
458 }
459 })
460 .collect();
461
462 if int_values.len() == values.len() {
463 DeltaEncoder::encode_i64(&int_values)
464 } else {
465 Self::encode_raw(values)
467 }
468 }
469 _ => Self::encode_raw(values),
470 };
471
472 let mut result = vec![encoding as u8];
474 result.extend_from_slice(&encoded);
475
476 stats.compressed_size = result.len();
477 stats.ratio = if stats.original_size > 0 {
478 stats.compressed_size as f64 / stats.original_size as f64
479 } else {
480 1.0
481 };
482
483 (result, stats)
484 }
485
486 fn encode_raw(values: &[Vec<u8>]) -> Vec<u8> {
488 let mut encoded = Vec::new();
489 encoded
490 .write_u64::<LittleEndian>(values.len() as u64)
491 .unwrap();
492
493 for value in values {
494 encoded
495 .write_u32::<LittleEndian>(value.len() as u32)
496 .unwrap();
497 encoded.extend_from_slice(value);
498 }
499
500 encoded
501 }
502
503 pub fn decode(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
505 if data.is_empty() {
506 return Ok(Vec::new());
507 }
508
509 let encoding = EncodingType::from_byte(data[0])
510 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid encoding type"))?;
511
512 let payload = &data[1..];
513
514 match encoding {
515 EncodingType::Dictionary | EncodingType::DictionaryLz4 => {
516 DictionaryEncoder::decode_column(payload)
517 }
518 EncodingType::Rle => RleEncoder::decode(payload),
519 EncodingType::Delta | EncodingType::DeltaLz4 => {
520 let int_values = DeltaEncoder::decode_i64(payload)?;
521 Ok(int_values
522 .into_iter()
523 .map(|v| v.to_le_bytes().to_vec())
524 .collect())
525 }
526 _ => Self::decode_raw(payload),
527 }
528 }
529
530 fn decode_raw(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
532 let mut cursor = std::io::Cursor::new(data);
533 let count = cursor.read_u64::<LittleEndian>()? as usize;
534
535 let mut values = Vec::with_capacity(count);
536 for _ in 0..count {
537 let len = cursor.read_u32::<LittleEndian>()? as usize;
538 let mut value = vec![0u8; len];
539 cursor.read_exact(&mut value)?;
540 values.push(value);
541 }
542
543 Ok(values)
544 }
545}
546
547#[cfg(test)]
548mod tests {
549 use super::*;
550
551 #[test]
552 #[ignore] fn test_dictionary_encoding() {
554 let values: Vec<Vec<u8>> = vec![
556 b"gpt-4".to_vec(),
557 b"gpt-4".to_vec(),
558 b"claude".to_vec(),
559 b"gpt-4".to_vec(),
560 b"claude".to_vec(),
561 b"gemini".to_vec(),
562 b"gpt-4".to_vec(),
563 ];
564
565 let encoder = DictionaryEncoder::build(&values);
566 assert_eq!(encoder.size(), 3); let encoded = encoder.encode_column(&values);
569 let decoded = DictionaryEncoder::decode_column(&encoded).unwrap();
570
571 assert_eq!(decoded, values);
572
573 let original_size: usize = values.iter().map(|v| v.len()).sum();
575 assert!(encoded.len() < original_size); }
577
578 #[test]
579 fn test_rle_encoding() {
580 let values: Vec<Vec<u8>> = vec![
582 b"active".to_vec(),
583 b"active".to_vec(),
584 b"active".to_vec(),
585 b"pending".to_vec(),
586 b"pending".to_vec(),
587 b"completed".to_vec(),
588 ];
589
590 let encoded = RleEncoder::encode(&values);
591 let decoded = RleEncoder::decode(&encoded).unwrap();
592
593 assert_eq!(decoded, values);
594 }
595
596 #[test]
597 fn test_delta_encoding() {
598 let values: Vec<i64> = vec![
600 1000000, 1000001, 1000002, 1000003, 1000010, 1000011, 1000012,
601 ];
602
603 let encoded = DeltaEncoder::encode_i64(&values);
604 let decoded = DeltaEncoder::decode_i64(&encoded).unwrap();
605
606 assert_eq!(decoded, values);
607
608 let original_size = values.len() * 8;
610 assert!(encoded.len() < original_size);
611 }
612
613 #[test]
614 fn test_column_encoder_auto_select() {
615 let low_cardinality: Vec<Vec<u8>> = (0..1000)
617 .map(|i| format!("model_{}", i % 5).into_bytes())
618 .collect();
619
620 let (encoding, stats) = ColumnEncoder::analyze(&low_cardinality);
621 assert_eq!(encoding, EncodingType::Dictionary);
622 assert_eq!(stats.cardinality, 5);
623
624 let (encoded, _) = ColumnEncoder::encode(&low_cardinality);
626 let decoded = ColumnEncoder::decode(&encoded).unwrap();
627 assert_eq!(decoded, low_cardinality);
628 }
629
630 #[test]
631 fn test_column_encoder_high_cardinality() {
632 let high_cardinality: Vec<Vec<u8>> = (0..100)
634 .map(|i| format!("unique_value_{}", i).into_bytes())
635 .collect();
636
637 let (encoding, _) = ColumnEncoder::analyze(&high_cardinality);
638 assert_eq!(encoding, EncodingType::Raw);
639 }
640
641 #[test]
642 fn test_encoding_roundtrip() {
643 let test_cases: Vec<Vec<Vec<u8>>> = vec![
644 vec![],
646 vec![b"test".to_vec()],
648 (0..100)
650 .map(|i| format!("v{}", i % 3).into_bytes())
651 .collect(),
652 (0..50)
654 .map(|i| format!("unique{}", i).into_bytes())
655 .collect(),
656 ];
657
658 for values in test_cases {
659 let (encoded, _) = ColumnEncoder::encode(&values);
660 let decoded = ColumnEncoder::decode(&encoded).unwrap();
661 assert_eq!(decoded, values, "Roundtrip failed");
662 }
663 }
664
665 #[test]
666 #[ignore] fn test_compression_ratios() {
668 let repeated: Vec<Vec<u8>> = (0..10000).map(|_| b"repeated_value".to_vec()).collect();
670
671 let (_encoded, stats) = ColumnEncoder::encode(&repeated);
672
673 println!("Original: {} bytes", stats.original_size);
674 println!("Compressed: {} bytes", stats.compressed_size);
675 println!("Ratio: {:.2}", stats.ratio);
676
677 assert!(
679 stats.ratio < 0.1,
680 "Expected >10x compression for repeated values"
681 );
682 }
683}