Skip to main content

sochdb_storage/
dict_compression.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Dictionary-Based Compression
19//!
20//! Implements dictionary compression for repetitive data patterns like JSON
21//! agent traces with common schemas.
22//!
23//! ## jj.md Task 5: Dictionary Compression
24//!
25//! Goals:
26//! - 2-4x better compression ratio for small payloads
27//! - Reduce storage cost by 50-70%
28//! - Faster decompression (dictionary pre-loaded)
29//!
30//! ## How It Works
31//!
32//! 1. Train a dictionary from representative sample payloads
33//! 2. Share the dictionary across an SSTable (stored in footer)
34//! 3. Use the dictionary for both compression and decompression
35//!
36//! For agent trace payloads (typically JSON with repetitive schemas like
37//! `{"prompt": ..., "response": ..., "model": ...}`), dictionary compression
38//! exploits the repeated structure for 5-10x compression ratios.
39//!
40//! ## Reference
41//!
//! Zstd Dictionary Compression - <https://facebook.github.io/zstd/#small-data>
43
44use std::io;
45
46#[cfg(test)]
47use std::io::Cursor;
48
/// Default target size of a trained dictionary, in bytes (32 KiB).
pub const DEFAULT_DICT_SIZE: usize = 32 << 10;

/// Smallest number of samples that dictionary training accepts.
pub const MIN_TRAINING_SAMPLES: usize = 100;

/// Upper bound, in bytes, on a single training sample (128 KiB); larger
/// samples are meant to be cut down to this length.
pub const MAX_SAMPLE_SIZE: usize = 128 << 10;
57
58/// A trained compression dictionary for Zstd.
59///
60/// The dictionary contains common patterns extracted from sample data,
61/// enabling much better compression for small, repetitive payloads.
62#[derive(Clone)]
63pub struct CompressionDictionary {
64    /// Raw dictionary bytes
65    data: Vec<u8>,
66    /// Dictionary ID (for validation)
67    id: u32,
68}
69
70impl std::fmt::Debug for CompressionDictionary {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("CompressionDictionary")
73            .field("size", &self.data.len())
74            .field("id", &self.id)
75            .finish()
76    }
77}
78
79impl CompressionDictionary {
80    /// Train a new dictionary from sample data.
81    ///
82    /// # Arguments
83    /// * `samples` - Representative sample payloads
84    /// * `dict_size` - Target dictionary size in bytes (default: 32KB)
85    ///
86    /// # Returns
87    /// A trained dictionary, or an error if training fails.
88    ///
89    /// # Example
90    /// ```ignore
91    /// let samples: Vec<Vec<u8>> = payloads.clone();
92    /// let dict = CompressionDictionary::train(&samples, 32 * 1024)?;
93    /// ```
94    pub fn train(samples: &[Vec<u8>], dict_size: usize) -> io::Result<Self> {
95        if samples.len() < MIN_TRAINING_SAMPLES {
96            return Err(io::Error::new(
97                io::ErrorKind::InvalidInput,
98                format!(
99                    "Need at least {} samples for dictionary training, got {}",
100                    MIN_TRAINING_SAMPLES,
101                    samples.len()
102                ),
103            ));
104        }
105
106        // Use zstd dictionary training - samples need to be Vec<u8> or similar
107        let dict_data = zstd::dict::from_samples(samples, dict_size)
108            .map_err(|e| io::Error::other(e.to_string()))?;
109
110        // Extract dictionary ID from the trained dictionary
111        let id = Self::extract_dict_id(&dict_data);
112
113        Ok(Self {
114            data: dict_data,
115            id,
116        })
117    }
118
119    /// Create a dictionary from raw bytes (for loading from storage).
120    pub fn from_bytes(data: Vec<u8>) -> Self {
121        let id = Self::extract_dict_id(&data);
122        Self { data, id }
123    }
124
125    /// Get the raw dictionary bytes.
126    pub fn as_bytes(&self) -> &[u8] {
127        &self.data
128    }
129
130    /// Get the dictionary size.
131    pub fn size(&self) -> usize {
132        self.data.len()
133    }
134
135    /// Get the dictionary ID.
136    pub fn id(&self) -> u32 {
137        self.id
138    }
139
140    /// Extract dictionary ID from raw bytes.
141    fn extract_dict_id(data: &[u8]) -> u32 {
142        if data.len() >= 8 {
143            // Zstd dictionary ID is at bytes 4-7
144            u32::from_le_bytes([data[4], data[5], data[6], data[7]])
145        } else {
146            0
147        }
148    }
149}
150
151/// Compressor using a pre-trained dictionary.
152pub struct DictionaryCompressor {
153    /// The compression dictionary bytes (owned copy)
154    dict_bytes: Vec<u8>,
155    /// Compression level (1-22, default 3)
156    level: i32,
157}
158
159impl DictionaryCompressor {
160    /// Create a new dictionary compressor.
161    pub fn new(dict: CompressionDictionary, level: i32) -> Self {
162        Self {
163            dict_bytes: dict.data,
164            level,
165        }
166    }
167
168    /// Create with default compression level.
169    pub fn with_default_level(dict: CompressionDictionary) -> Self {
170        Self::new(dict, 3)
171    }
172
173    /// Compress data using the dictionary.
174    pub fn compress(&self, data: &[u8]) -> io::Result<Vec<u8>> {
175        // Create a compressor with the dictionary
176        let mut compressor = zstd::bulk::Compressor::with_dictionary(self.level, &self.dict_bytes)
177            .map_err(|e| io::Error::other(e.to_string()))?;
178
179        compressor
180            .compress(data)
181            .map_err(|e| io::Error::other(e.to_string()))
182    }
183
184    /// Get the dictionary bytes.
185    pub fn dictionary_bytes(&self) -> &[u8] {
186        &self.dict_bytes
187    }
188}
189
190/// Decompressor using a pre-trained dictionary.
191pub struct DictionaryDecompressor {
192    /// The decompression dictionary bytes
193    dict_bytes: Vec<u8>,
194}
195
196impl DictionaryDecompressor {
197    /// Create a new dictionary decompressor.
198    pub fn new(dict: CompressionDictionary) -> Self {
199        Self {
200            dict_bytes: dict.data,
201        }
202    }
203
204    /// Decompress data using the dictionary.
205    pub fn decompress(&self, data: &[u8]) -> io::Result<Vec<u8>> {
206        // Create a decompressor with the dictionary
207        let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&self.dict_bytes)
208            .map_err(|e| io::Error::other(e.to_string()))?;
209
210        decompressor
211            .decompress(data, data.len() * 20) // estimate 20x expansion max
212            .map_err(|e| io::Error::other(e.to_string()))
213    }
214
215    /// Decompress into a pre-allocated buffer.
216    pub fn decompress_to(&self, data: &[u8], output: &mut Vec<u8>) -> io::Result<()> {
217        let result = self.decompress(data)?;
218        output.clear();
219        output.extend_from_slice(&result);
220        Ok(())
221    }
222
223    /// Get the dictionary bytes.
224    pub fn dictionary_bytes(&self) -> &[u8] {
225        &self.dict_bytes
226    }
227}
228
229/// Builder for collecting samples and training a dictionary.
230#[derive(Default)]
231pub struct DictionaryBuilder {
232    samples: Vec<Vec<u8>>,
233    max_samples: usize,
234    dict_size: usize,
235}
236
237impl DictionaryBuilder {
238    /// Create a new dictionary builder.
239    pub fn new() -> Self {
240        Self {
241            samples: Vec::new(),
242            max_samples: 10000,
243            dict_size: DEFAULT_DICT_SIZE,
244        }
245    }
246
247    /// Set the maximum number of samples to collect.
248    pub fn max_samples(mut self, max: usize) -> Self {
249        self.max_samples = max;
250        self
251    }
252
253    /// Set the target dictionary size.
254    pub fn dict_size(mut self, size: usize) -> Self {
255        self.dict_size = size;
256        self
257    }
258
259    /// Add a sample for training.
260    pub fn add_sample(&mut self, sample: Vec<u8>) {
261        if self.samples.len() < self.max_samples {
262            self.samples.push(sample);
263        }
264    }
265
266    /// Add a sample from a slice.
267    pub fn add_sample_slice(&mut self, sample: &[u8]) {
268        if self.samples.len() < self.max_samples {
269            self.samples.push(sample.to_vec());
270        }
271    }
272
273    /// Get the current number of samples.
274    pub fn sample_count(&self) -> usize {
275        self.samples.len()
276    }
277
278    /// Check if we have enough samples to train.
279    pub fn can_train(&self) -> bool {
280        self.samples.len() >= MIN_TRAINING_SAMPLES
281    }
282
283    /// Train a dictionary from collected samples.
284    pub fn build(self) -> io::Result<CompressionDictionary> {
285        CompressionDictionary::train(&self.samples, self.dict_size)
286    }
287}
288
/// Running totals describing dictionary compression activity.
#[derive(Debug, Default, Clone)]
pub struct DictionaryCompressionStats {
    /// Cumulative uncompressed byte count
    pub bytes_in: u64,
    /// Cumulative compressed byte count
    pub bytes_out: u64,
    /// How many compressions have been recorded
    pub compressions: u64,
    /// How many decompressions have been recorded
    pub decompressions: u64,
}

impl DictionaryCompressionStats {
    /// Account for one compression of `input_size` bytes into `output_size` bytes.
    pub fn record_compression(&mut self, input_size: usize, output_size: usize) {
        self.tally(input_size, output_size);
        self.compressions += 1;
    }

    /// Account for one decompression of `compressed_size` bytes into
    /// `decompressed_size` bytes.
    pub fn record_decompression(&mut self, compressed_size: usize, decompressed_size: usize) {
        self.tally(decompressed_size, compressed_size);
        self.decompressions += 1;
    }

    /// Ratio of uncompressed to compressed bytes; `1.0` while no compressed
    /// bytes have been recorded.
    pub fn compression_ratio(&self) -> f64 {
        match self.bytes_out {
            0 => 1.0,
            compressed => self.bytes_in as f64 / compressed as f64,
        }
    }

    /// Percentage of space saved relative to the uncompressed total; `0.0`
    /// while no uncompressed bytes have been recorded.
    pub fn space_savings(&self) -> f64 {
        if self.bytes_in == 0 {
            return 0.0;
        }
        let remaining = self.bytes_out as f64 / self.bytes_in as f64;
        (1.0 - remaining) * 100.0
    }

    /// Add one operation's sizes to the byte totals.
    fn tally(&mut self, uncompressed: usize, compressed: usize) {
        self.bytes_in += uncompressed as u64;
        self.bytes_out += compressed as u64;
    }
}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Build `count` JSON trace payloads that share one schema and differ
    /// only in `id`, agent suffix and latency.
    fn generate_json_samples(count: usize) -> Vec<Vec<u8>> {
        let mut payloads = Vec::with_capacity(count);
        for i in 0..count {
            let agent = i % 10;
            let latency = 100 + (i % 500);
            let json = format!(
                r#"{{"id":{},"type":"trace","agent":"agent_{}","model":"gpt-4","prompt":"Hello, how are you?","response":"I am doing well, thank you!","tokens":{{"input":10,"output":15}},"latency_ms":{}}}"#,
                i, agent, latency
            );
            payloads.push(json.into_bytes());
        }
        payloads
    }

    #[test]
    fn test_dictionary_builder() {
        const TARGET_SIZE: usize = 16 * 1024;

        let mut builder = DictionaryBuilder::new()
            .max_samples(200)
            .dict_size(TARGET_SIZE);

        generate_json_samples(150)
            .into_iter()
            .for_each(|payload| builder.add_sample(payload));

        assert_eq!(builder.sample_count(), 150);
        assert!(builder.can_train());

        let trained = builder.build().unwrap();
        assert!(trained.size() > 0);
        assert!(trained.size() <= TARGET_SIZE);
    }

    #[test]
    fn test_dictionary_compression_roundtrip() {
        let training_set = generate_json_samples(200);
        let dict = CompressionDictionary::train(&training_set, 16 * 1024).unwrap();

        let decompressor = DictionaryDecompressor::new(dict.clone());
        let compressor = DictionaryCompressor::with_default_level(dict);

        // A payload that was not part of the training set
        let test_data = r#"{"id":9999,"type":"trace","agent":"agent_5","model":"gpt-4","prompt":"Test message","response":"Test response","tokens":{"input":5,"output":10},"latency_ms":150}"#.as_bytes();

        let compressed = compressor.compress(test_data).unwrap();
        let decompressed = decompressor.decompress(&compressed).unwrap();
        assert_eq!(decompressed, test_data);

        // Report the achieved ratio
        let original_len = test_data.len();
        let compressed_len = compressed.len();
        let ratio = original_len as f64 / compressed_len as f64;
        println!(
            "Compression ratio: {:.2}x ({} -> {} bytes)",
            ratio, original_len, compressed_len
        );

        // Loose sanity bound: the output may be at most slightly larger than
        // the input (ratio of 0.9 or better)
        assert!(
            ratio >= 0.9,
            "Expected reasonable compression, got {:.2}x",
            ratio
        );
    }

    #[test]
    fn test_dictionary_from_bytes() {
        let training_set = generate_json_samples(150);
        let original = CompressionDictionary::train(&training_set, 8 * 1024).unwrap();

        let restored = CompressionDictionary::from_bytes(original.as_bytes().to_vec());

        assert_eq!(restored.id(), original.id());
        assert_eq!(restored.size(), original.size());
    }

    #[test]
    fn test_compression_stats() {
        let mut stats = DictionaryCompressionStats::default();
        for (input, output) in [(1000, 200), (2000, 400)] {
            stats.record_compression(input, output);
        }

        assert_eq!(stats.compressions, 2);
        assert_eq!(stats.bytes_in, 3000);
        assert_eq!(stats.bytes_out, 600);
        assert!((stats.compression_ratio() - 5.0).abs() < 0.01);
        assert!((stats.space_savings() - 80.0).abs() < 0.01);
    }

    #[test]
    fn test_insufficient_samples() {
        let lone_sample: Vec<Vec<u8>> = vec![b"too few samples".to_vec()];
        let outcome = CompressionDictionary::train(&lone_sample, DEFAULT_DICT_SIZE);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_dictionary_improves_small_payload_compression() {
        // Train on the shared-schema samples
        let training_set = generate_json_samples(200);
        let dict = CompressionDictionary::train(&training_set, 32 * 1024).unwrap();
        let compressor = DictionaryCompressor::with_default_level(dict);

        // A small payload shaped like a typical agent trace
        let small_payload = r#"{"id":1,"type":"trace","agent":"agent_1","model":"gpt-4","prompt":"Hi","response":"Hello!","tokens":{"input":1,"output":2},"latency_ms":50}"#.as_bytes();
        let payload_len = small_payload.len();

        let with_dict = compressor.compress(small_payload).unwrap();
        let without_dict = zstd::encode_all(Cursor::new(small_payload), 3).unwrap();

        println!("Small payload: {} bytes", payload_len);
        println!(
            "With dictionary: {} bytes ({:.1}x)",
            with_dict.len(),
            payload_len as f64 / with_dict.len() as f64
        );
        println!(
            "Without dictionary: {} bytes ({:.1}x)",
            without_dict.len(),
            payload_len as f64 / without_dict.len() as f64
        );

        // The dictionary result must be no more than 50 bytes larger than
        // plain zstd (plain zstd can even expand payloads this small)
        let allowed = without_dict.len() + 50;
        assert!(
            with_dict.len() <= allowed,
            "Dictionary compression should be competitive"
        );
    }
}
474}