sochdb_storage/
dict_compression.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Dictionary-Based Compression
16//!
17//! Implements dictionary compression for repetitive data patterns like JSON
18//! agent traces with common schemas.
19//!
20//! ## jj.md Task 5: Dictionary Compression
21//!
22//! Goals:
23//! - 2-4x better compression ratio for small payloads
24//! - Reduce storage cost by 50-70%
25//! - Faster decompression (dictionary pre-loaded)
26//!
27//! ## How It Works
28//!
29//! 1. Train a dictionary from representative sample payloads
30//! 2. Share the dictionary across an SSTable (stored in footer)
31//! 3. Use the dictionary for both compression and decompression
32//!
33//! For agent trace payloads (typically JSON with repetitive schemas like
34//! `{"prompt": ..., "response": ..., "model": ...}`), dictionary compression
35//! exploits the repeated structure for 5-10x compression ratios.
36//!
37//! ## Reference
38//!
39//! Zstd Dictionary Compression - https://facebook.github.io/zstd/#small-data
40
41use std::io;
42
43#[cfg(test)]
44use std::io::Cursor;
45
/// Default target size, in bytes, of a trained dictionary (32 KiB).
///
/// Chosen as a balance between compression gain and per-dictionary memory.
pub const DEFAULT_DICT_SIZE: usize = 32 << 10;

/// Fewest samples accepted for dictionary training; smaller sets are rejected.
pub const MIN_TRAINING_SAMPLES: usize = 100;

/// Largest per-sample size, in bytes, meant to be used for training (128 KiB);
/// longer samples are expected to be truncated to this length.
pub const MAX_SAMPLE_SIZE: usize = 128 << 10;
54
55/// A trained compression dictionary for Zstd.
56///
57/// The dictionary contains common patterns extracted from sample data,
58/// enabling much better compression for small, repetitive payloads.
59#[derive(Clone)]
60pub struct CompressionDictionary {
61    /// Raw dictionary bytes
62    data: Vec<u8>,
63    /// Dictionary ID (for validation)
64    id: u32,
65}
66
67impl std::fmt::Debug for CompressionDictionary {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("CompressionDictionary")
70            .field("size", &self.data.len())
71            .field("id", &self.id)
72            .finish()
73    }
74}
75
76impl CompressionDictionary {
77    /// Train a new dictionary from sample data.
78    ///
79    /// # Arguments
80    /// * `samples` - Representative sample payloads
81    /// * `dict_size` - Target dictionary size in bytes (default: 32KB)
82    ///
83    /// # Returns
84    /// A trained dictionary, or an error if training fails.
85    ///
86    /// # Example
87    /// ```ignore
88    /// let samples: Vec<Vec<u8>> = payloads.clone();
89    /// let dict = CompressionDictionary::train(&samples, 32 * 1024)?;
90    /// ```
91    pub fn train(samples: &[Vec<u8>], dict_size: usize) -> io::Result<Self> {
92        if samples.len() < MIN_TRAINING_SAMPLES {
93            return Err(io::Error::new(
94                io::ErrorKind::InvalidInput,
95                format!(
96                    "Need at least {} samples for dictionary training, got {}",
97                    MIN_TRAINING_SAMPLES,
98                    samples.len()
99                ),
100            ));
101        }
102
103        // Use zstd dictionary training - samples need to be Vec<u8> or similar
104        let dict_data = zstd::dict::from_samples(samples, dict_size)
105            .map_err(|e| io::Error::other(e.to_string()))?;
106
107        // Extract dictionary ID from the trained dictionary
108        let id = Self::extract_dict_id(&dict_data);
109
110        Ok(Self {
111            data: dict_data,
112            id,
113        })
114    }
115
116    /// Create a dictionary from raw bytes (for loading from storage).
117    pub fn from_bytes(data: Vec<u8>) -> Self {
118        let id = Self::extract_dict_id(&data);
119        Self { data, id }
120    }
121
122    /// Get the raw dictionary bytes.
123    pub fn as_bytes(&self) -> &[u8] {
124        &self.data
125    }
126
127    /// Get the dictionary size.
128    pub fn size(&self) -> usize {
129        self.data.len()
130    }
131
132    /// Get the dictionary ID.
133    pub fn id(&self) -> u32 {
134        self.id
135    }
136
137    /// Extract dictionary ID from raw bytes.
138    fn extract_dict_id(data: &[u8]) -> u32 {
139        if data.len() >= 8 {
140            // Zstd dictionary ID is at bytes 4-7
141            u32::from_le_bytes([data[4], data[5], data[6], data[7]])
142        } else {
143            0
144        }
145    }
146}
147
148/// Compressor using a pre-trained dictionary.
149pub struct DictionaryCompressor {
150    /// The compression dictionary bytes (owned copy)
151    dict_bytes: Vec<u8>,
152    /// Compression level (1-22, default 3)
153    level: i32,
154}
155
156impl DictionaryCompressor {
157    /// Create a new dictionary compressor.
158    pub fn new(dict: CompressionDictionary, level: i32) -> Self {
159        Self {
160            dict_bytes: dict.data,
161            level,
162        }
163    }
164
165    /// Create with default compression level.
166    pub fn with_default_level(dict: CompressionDictionary) -> Self {
167        Self::new(dict, 3)
168    }
169
170    /// Compress data using the dictionary.
171    pub fn compress(&self, data: &[u8]) -> io::Result<Vec<u8>> {
172        // Create a compressor with the dictionary
173        let mut compressor = zstd::bulk::Compressor::with_dictionary(self.level, &self.dict_bytes)
174            .map_err(|e| io::Error::other(e.to_string()))?;
175
176        compressor
177            .compress(data)
178            .map_err(|e| io::Error::other(e.to_string()))
179    }
180
181    /// Get the dictionary bytes.
182    pub fn dictionary_bytes(&self) -> &[u8] {
183        &self.dict_bytes
184    }
185}
186
187/// Decompressor using a pre-trained dictionary.
188pub struct DictionaryDecompressor {
189    /// The decompression dictionary bytes
190    dict_bytes: Vec<u8>,
191}
192
193impl DictionaryDecompressor {
194    /// Create a new dictionary decompressor.
195    pub fn new(dict: CompressionDictionary) -> Self {
196        Self {
197            dict_bytes: dict.data,
198        }
199    }
200
201    /// Decompress data using the dictionary.
202    pub fn decompress(&self, data: &[u8]) -> io::Result<Vec<u8>> {
203        // Create a decompressor with the dictionary
204        let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&self.dict_bytes)
205            .map_err(|e| io::Error::other(e.to_string()))?;
206
207        decompressor
208            .decompress(data, data.len() * 20) // estimate 20x expansion max
209            .map_err(|e| io::Error::other(e.to_string()))
210    }
211
212    /// Decompress into a pre-allocated buffer.
213    pub fn decompress_to(&self, data: &[u8], output: &mut Vec<u8>) -> io::Result<()> {
214        let result = self.decompress(data)?;
215        output.clear();
216        output.extend_from_slice(&result);
217        Ok(())
218    }
219
220    /// Get the dictionary bytes.
221    pub fn dictionary_bytes(&self) -> &[u8] {
222        &self.dict_bytes
223    }
224}
225
226/// Builder for collecting samples and training a dictionary.
227#[derive(Default)]
228pub struct DictionaryBuilder {
229    samples: Vec<Vec<u8>>,
230    max_samples: usize,
231    dict_size: usize,
232}
233
234impl DictionaryBuilder {
235    /// Create a new dictionary builder.
236    pub fn new() -> Self {
237        Self {
238            samples: Vec::new(),
239            max_samples: 10000,
240            dict_size: DEFAULT_DICT_SIZE,
241        }
242    }
243
244    /// Set the maximum number of samples to collect.
245    pub fn max_samples(mut self, max: usize) -> Self {
246        self.max_samples = max;
247        self
248    }
249
250    /// Set the target dictionary size.
251    pub fn dict_size(mut self, size: usize) -> Self {
252        self.dict_size = size;
253        self
254    }
255
256    /// Add a sample for training.
257    pub fn add_sample(&mut self, sample: Vec<u8>) {
258        if self.samples.len() < self.max_samples {
259            self.samples.push(sample);
260        }
261    }
262
263    /// Add a sample from a slice.
264    pub fn add_sample_slice(&mut self, sample: &[u8]) {
265        if self.samples.len() < self.max_samples {
266            self.samples.push(sample.to_vec());
267        }
268    }
269
270    /// Get the current number of samples.
271    pub fn sample_count(&self) -> usize {
272        self.samples.len()
273    }
274
275    /// Check if we have enough samples to train.
276    pub fn can_train(&self) -> bool {
277        self.samples.len() >= MIN_TRAINING_SAMPLES
278    }
279
280    /// Train a dictionary from collected samples.
281    pub fn build(self) -> io::Result<CompressionDictionary> {
282        CompressionDictionary::train(&self.samples, self.dict_size)
283    }
284}
285
/// Running counters for dictionary compression and decompression.
///
/// Both directions feed the same byte totals: `bytes_in` accumulates the
/// uncompressed side and `bytes_out` the compressed side.
#[derive(Debug, Default, Clone)]
pub struct DictionaryCompressionStats {
    /// Accumulated uncompressed byte count
    pub bytes_in: u64,
    /// Accumulated compressed byte count
    pub bytes_out: u64,
    /// How many compressions were recorded
    pub compressions: u64,
    /// How many decompressions were recorded
    pub decompressions: u64,
}

impl DictionaryCompressionStats {
    /// Count one compression of `input_size` bytes down to `output_size` bytes.
    pub fn record_compression(&mut self, input_size: usize, output_size: usize) {
        let (uncompressed, compressed) = (input_size as u64, output_size as u64);
        self.bytes_in += uncompressed;
        self.bytes_out += compressed;
        self.compressions += 1;
    }

    /// Count one decompression of `compressed_size` bytes back to `decompressed_size` bytes.
    pub fn record_decompression(&mut self, compressed_size: usize, decompressed_size: usize) {
        let (compressed, uncompressed) = (compressed_size as u64, decompressed_size as u64);
        self.bytes_out += compressed;
        self.bytes_in += uncompressed;
        self.decompressions += 1;
    }

    /// Uncompressed-to-compressed ratio; 1.0 while nothing compressed has been recorded.
    pub fn compression_ratio(&self) -> f64 {
        match self.bytes_out {
            0 => 1.0,
            out => self.bytes_in as f64 / out as f64,
        }
    }

    /// Percentage of uncompressed bytes saved; 0.0 while nothing has been recorded.
    pub fn space_savings(&self) -> f64 {
        if self.bytes_in == 0 {
            return 0.0;
        }
        let kept_fraction = self.bytes_out as f64 / self.bytes_in as f64;
        (1.0 - kept_fraction) * 100.0
    }
}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` JSON agent-trace payloads sharing one schema; only the
    /// id, the agent suffix, and the latency vary between samples.
    fn generate_json_samples(count: usize) -> Vec<Vec<u8>> {
        let mut payloads = Vec::with_capacity(count);
        for i in 0..count {
            let agent = i % 10;
            let latency = 100 + (i % 500);
            let json = format!(
                r#"{{"id":{},"type":"trace","agent":"agent_{}","model":"gpt-4","prompt":"Hello, how are you?","response":"I am doing well, thank you!","tokens":{{"input":10,"output":15}},"latency_ms":{}}}"#,
                i, agent, latency
            );
            payloads.push(json.into_bytes());
        }
        payloads
    }

    /// Uncompressed-to-compressed size ratio.
    fn ratio(original: &[u8], compressed: &[u8]) -> f64 {
        original.len() as f64 / compressed.len() as f64
    }

    #[test]
    fn test_dictionary_builder() {
        let mut builder = DictionaryBuilder::new()
            .max_samples(200)
            .dict_size(16 * 1024);

        generate_json_samples(150)
            .into_iter()
            .for_each(|sample| builder.add_sample(sample));

        assert_eq!(builder.sample_count(), 150);
        assert!(builder.can_train());

        let dict = builder
            .build()
            .expect("training on 150 samples should succeed");
        assert!(dict.size() > 0);
        assert!(dict.size() <= 16 * 1024);
    }

    #[test]
    fn test_dictionary_compression_roundtrip() {
        let dict = CompressionDictionary::train(&generate_json_samples(200), 16 * 1024).unwrap();

        let compressor = DictionaryCompressor::with_default_level(dict.clone());
        let decompressor = DictionaryDecompressor::new(dict);

        // A payload that is not part of the training set.
        let test_data = r#"{"id":9999,"type":"trace","agent":"agent_5","model":"gpt-4","prompt":"Test message","response":"Test response","tokens":{"input":5,"output":10},"latency_ms":150}"#.as_bytes();

        let compressed = compressor.compress(test_data).unwrap();
        assert_eq!(decompressor.decompress(&compressed).unwrap(), test_data);

        let achieved = ratio(test_data, &compressed);
        println!(
            "Compression ratio: {:.2}x ({} -> {} bytes)",
            achieved,
            test_data.len(),
            compressed.len()
        );

        // Roughly break even at worst: up to ~10% expansion is tolerated for
        // a payload this small.
        assert!(
            achieved >= 0.9,
            "Expected reasonable compression, got {:.2}x",
            achieved
        );
    }

    #[test]
    fn test_dictionary_from_bytes() {
        let original = CompressionDictionary::train(&generate_json_samples(150), 8 * 1024).unwrap();

        let restored = CompressionDictionary::from_bytes(original.as_bytes().to_vec());

        assert_eq!(restored.id(), original.id());
        assert_eq!(restored.size(), original.size());
    }

    #[test]
    fn test_compression_stats() {
        let mut stats = DictionaryCompressionStats::default();

        for (input, output) in [(1000, 200), (2000, 400)] {
            stats.record_compression(input, output);
        }

        assert_eq!(stats.compressions, 2);
        assert_eq!(stats.bytes_in, 3000);
        assert_eq!(stats.bytes_out, 600);
        assert!((stats.compression_ratio() - 5.0).abs() < 0.01);
        assert!((stats.space_savings() - 80.0).abs() < 0.01);
    }

    #[test]
    fn test_insufficient_samples() {
        let samples = vec![b"too few samples".to_vec()];
        assert!(CompressionDictionary::train(&samples, DEFAULT_DICT_SIZE).is_err());
    }

    #[test]
    fn test_dictionary_improves_small_payload_compression() {
        let dict = CompressionDictionary::train(&generate_json_samples(200), 32 * 1024).unwrap();
        let compressor = DictionaryCompressor::with_default_level(dict);

        // A small payload shaped like a typical agent trace.
        let small_payload = r#"{"id":1,"type":"trace","agent":"agent_1","model":"gpt-4","prompt":"Hi","response":"Hello!","tokens":{"input":1,"output":2},"latency_ms":50}"#.as_bytes();

        let with_dict = compressor.compress(small_payload).unwrap();
        let without_dict = zstd::encode_all(Cursor::new(small_payload), 3).unwrap();

        println!("Small payload: {} bytes", small_payload.len());
        println!(
            "With dictionary: {} bytes ({:.1}x)",
            with_dict.len(),
            ratio(small_payload, &with_dict)
        );
        println!(
            "Without dictionary: {} bytes ({:.1}x)",
            without_dict.len(),
            ratio(small_payload, &without_dict)
        );

        // The dictionary should help, or at least not hurt much: tiny payloads
        // can even expand under plain zstd, so allow a fixed margin.
        assert!(
            with_dict.len() <= without_dict.len() + 50,
            "Dictionary compression should be competitive"
        );
    }
}