1use std::io;
45
46#[cfg(test)]
47use std::io::Cursor;
48
/// Default target size, in bytes, of a trained dictionary (32 KiB).
///
/// [`DictionaryBuilder::new`] starts from this value.
pub const DEFAULT_DICT_SIZE: usize = 32 << 10;

/// Smallest number of samples that [`CompressionDictionary::train`] accepts.
pub const MIN_TRAINING_SAMPLES: usize = 100;

/// Size limit, in bytes, for a single training sample (128 KiB).
///
/// NOTE(review): nothing in this file enforces this limit; confirm where it is applied.
pub const MAX_SAMPLE_SIZE: usize = 128 << 10;
57
58#[derive(Clone)]
63pub struct CompressionDictionary {
64 data: Vec<u8>,
66 id: u32,
68}
69
70impl std::fmt::Debug for CompressionDictionary {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("CompressionDictionary")
73 .field("size", &self.data.len())
74 .field("id", &self.id)
75 .finish()
76 }
77}
78
79impl CompressionDictionary {
80 pub fn train(samples: &[Vec<u8>], dict_size: usize) -> io::Result<Self> {
95 if samples.len() < MIN_TRAINING_SAMPLES {
96 return Err(io::Error::new(
97 io::ErrorKind::InvalidInput,
98 format!(
99 "Need at least {} samples for dictionary training, got {}",
100 MIN_TRAINING_SAMPLES,
101 samples.len()
102 ),
103 ));
104 }
105
106 let dict_data = zstd::dict::from_samples(samples, dict_size)
108 .map_err(|e| io::Error::other(e.to_string()))?;
109
110 let id = Self::extract_dict_id(&dict_data);
112
113 Ok(Self {
114 data: dict_data,
115 id,
116 })
117 }
118
119 pub fn from_bytes(data: Vec<u8>) -> Self {
121 let id = Self::extract_dict_id(&data);
122 Self { data, id }
123 }
124
125 pub fn as_bytes(&self) -> &[u8] {
127 &self.data
128 }
129
130 pub fn size(&self) -> usize {
132 self.data.len()
133 }
134
135 pub fn id(&self) -> u32 {
137 self.id
138 }
139
140 fn extract_dict_id(data: &[u8]) -> u32 {
142 if data.len() >= 8 {
143 u32::from_le_bytes([data[4], data[5], data[6], data[7]])
145 } else {
146 0
147 }
148 }
149}
150
151pub struct DictionaryCompressor {
153 dict_bytes: Vec<u8>,
155 level: i32,
157}
158
159impl DictionaryCompressor {
160 pub fn new(dict: CompressionDictionary, level: i32) -> Self {
162 Self {
163 dict_bytes: dict.data,
164 level,
165 }
166 }
167
168 pub fn with_default_level(dict: CompressionDictionary) -> Self {
170 Self::new(dict, 3)
171 }
172
173 pub fn compress(&self, data: &[u8]) -> io::Result<Vec<u8>> {
175 let mut compressor = zstd::bulk::Compressor::with_dictionary(self.level, &self.dict_bytes)
177 .map_err(|e| io::Error::other(e.to_string()))?;
178
179 compressor
180 .compress(data)
181 .map_err(|e| io::Error::other(e.to_string()))
182 }
183
184 pub fn dictionary_bytes(&self) -> &[u8] {
186 &self.dict_bytes
187 }
188}
189
190pub struct DictionaryDecompressor {
192 dict_bytes: Vec<u8>,
194}
195
196impl DictionaryDecompressor {
197 pub fn new(dict: CompressionDictionary) -> Self {
199 Self {
200 dict_bytes: dict.data,
201 }
202 }
203
204 pub fn decompress(&self, data: &[u8]) -> io::Result<Vec<u8>> {
206 let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&self.dict_bytes)
208 .map_err(|e| io::Error::other(e.to_string()))?;
209
210 decompressor
211 .decompress(data, data.len() * 20) .map_err(|e| io::Error::other(e.to_string()))
213 }
214
215 pub fn decompress_to(&self, data: &[u8], output: &mut Vec<u8>) -> io::Result<()> {
217 let result = self.decompress(data)?;
218 output.clear();
219 output.extend_from_slice(&result);
220 Ok(())
221 }
222
223 pub fn dictionary_bytes(&self) -> &[u8] {
225 &self.dict_bytes
226 }
227}
228
229#[derive(Default)]
231pub struct DictionaryBuilder {
232 samples: Vec<Vec<u8>>,
233 max_samples: usize,
234 dict_size: usize,
235}
236
237impl DictionaryBuilder {
238 pub fn new() -> Self {
240 Self {
241 samples: Vec::new(),
242 max_samples: 10000,
243 dict_size: DEFAULT_DICT_SIZE,
244 }
245 }
246
247 pub fn max_samples(mut self, max: usize) -> Self {
249 self.max_samples = max;
250 self
251 }
252
253 pub fn dict_size(mut self, size: usize) -> Self {
255 self.dict_size = size;
256 self
257 }
258
259 pub fn add_sample(&mut self, sample: Vec<u8>) {
261 if self.samples.len() < self.max_samples {
262 self.samples.push(sample);
263 }
264 }
265
266 pub fn add_sample_slice(&mut self, sample: &[u8]) {
268 if self.samples.len() < self.max_samples {
269 self.samples.push(sample.to_vec());
270 }
271 }
272
273 pub fn sample_count(&self) -> usize {
275 self.samples.len()
276 }
277
278 pub fn can_train(&self) -> bool {
280 self.samples.len() >= MIN_TRAINING_SAMPLES
281 }
282
283 pub fn build(self) -> io::Result<CompressionDictionary> {
285 CompressionDictionary::train(&self.samples, self.dict_size)
286 }
287}
288
/// Running byte and operation totals for dictionary compression activity.
#[derive(Debug, Default, Clone)]
pub struct DictionaryCompressionStats {
    /// Total uncompressed bytes: compression inputs plus decompression outputs.
    pub bytes_in: u64,
    /// Total compressed bytes: compression outputs plus decompression inputs.
    pub bytes_out: u64,
    /// Number of recorded compression operations.
    pub compressions: u64,
    /// Number of recorded decompression operations.
    pub decompressions: u64,
}

impl DictionaryCompressionStats {
    /// Records one compression that turned `input_size` bytes into
    /// `output_size` bytes.
    pub fn record_compression(&mut self, input_size: usize, output_size: usize) {
        let Self {
            bytes_in,
            bytes_out,
            compressions,
            ..
        } = self;

        *bytes_in += input_size as u64;
        *bytes_out += output_size as u64;
        *compressions += 1;
    }

    /// Records one decompression that turned `compressed_size` bytes into
    /// `decompressed_size` bytes.
    ///
    /// The compressed side is added to `bytes_out` and the decompressed side
    /// to `bytes_in`, so both counters keep the same meaning as for
    /// compression.
    pub fn record_decompression(&mut self, compressed_size: usize, decompressed_size: usize) {
        let Self {
            bytes_in,
            bytes_out,
            decompressions,
            ..
        } = self;

        *bytes_out += compressed_size as u64;
        *bytes_in += decompressed_size as u64;
        *decompressions += 1;
    }

    /// Returns `bytes_in / bytes_out`, or `1.0` while `bytes_out` is zero.
    pub fn compression_ratio(&self) -> f64 {
        match self.bytes_out {
            0 => 1.0,
            compressed => self.bytes_in as f64 / compressed as f64,
        }
    }

    /// Returns the percentage saved, `(1 - bytes_out / bytes_in) * 100`, or
    /// `0.0` while `bytes_in` is zero.
    pub fn space_savings(&self) -> f64 {
        if self.bytes_in == 0 {
            return 0.0;
        }

        let remaining_fraction = self.bytes_out as f64 / self.bytes_in as f64;
        (1.0 - remaining_fraction) * 100.0
    }
}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` synthetic JSON trace records that share one structure
    /// and differ only in `id`, agent suffix and latency.
    fn generate_json_samples(count: usize) -> Vec<Vec<u8>> {
        let mut records = Vec::with_capacity(count);
        for i in 0..count {
            let record = format!(
                r#"{{"id":{},"type":"trace","agent":"agent_{}","model":"gpt-4","prompt":"Hello, how are you?","response":"I am doing well, thank you!","tokens":{{"input":10,"output":15}},"latency_ms":{}}}"#,
                i,
                i % 10,
                100 + (i % 500)
            );
            records.push(record.into_bytes());
        }
        records
    }

    /// The builder collects samples under its cap and trains a dictionary no
    /// larger than the requested size.
    #[test]
    fn test_dictionary_builder() {
        let mut builder = DictionaryBuilder::new()
            .max_samples(200)
            .dict_size(16 * 1024);

        generate_json_samples(150)
            .into_iter()
            .for_each(|sample| builder.add_sample(sample));

        assert_eq!(builder.sample_count(), 150);
        assert!(builder.can_train());

        let trained_size = builder.build().unwrap().size();
        assert!(trained_size > 0);
        assert!(trained_size <= 16 * 1024);
    }

    /// Compressing and then decompressing with the same dictionary returns
    /// the original payload.
    #[test]
    fn test_dictionary_compression_roundtrip() {
        let training_set = generate_json_samples(200);
        let dict = CompressionDictionary::train(&training_set, 16 * 1024).unwrap();

        let decompressor = DictionaryDecompressor::new(dict.clone());
        let compressor = DictionaryCompressor::with_default_level(dict);

        let test_data = r#"{"id":9999,"type":"trace","agent":"agent_5","model":"gpt-4","prompt":"Test message","response":"Test response","tokens":{"input":5,"output":10},"latency_ms":150}"#.as_bytes();

        let compressed = compressor.compress(test_data).unwrap();
        let decompressed = decompressor.decompress(&compressed).unwrap();

        assert_eq!(decompressed, test_data);

        let original_len = test_data.len();
        let compressed_len = compressed.len();
        let ratio = original_len as f64 / compressed_len as f64;
        println!(
            "Compression ratio: {:.2}x ({} -> {} bytes)",
            ratio, original_len, compressed_len
        );

        // Only guards against the dictionary making the payload noticeably larger.
        assert!(
            ratio >= 0.9,
            "Expected reasonable compression, got {:.2}x",
            ratio
        );
    }

    /// A dictionary rebuilt from its raw bytes reports the same ID and size.
    #[test]
    fn test_dictionary_from_bytes() {
        let training_set = generate_json_samples(150);
        let original = CompressionDictionary::train(&training_set, 8 * 1024).unwrap();

        let restored = CompressionDictionary::from_bytes(original.as_bytes().to_vec());

        assert_eq!(restored.id(), original.id());
        assert_eq!(restored.size(), original.size());
    }

    /// Counters accumulate across calls and the derived ratios follow from them.
    #[test]
    fn test_compression_stats() {
        let mut stats = DictionaryCompressionStats::default();

        for (input_size, output_size) in [(1000, 200), (2000, 400)] {
            stats.record_compression(input_size, output_size);
        }

        assert_eq!(stats.compressions, 2);
        assert_eq!(stats.bytes_in, 3000);
        assert_eq!(stats.bytes_out, 600);

        let ratio_error = (stats.compression_ratio() - 5.0).abs();
        assert!(ratio_error < 0.01);
        let savings_error = (stats.space_savings() - 80.0).abs();
        assert!(savings_error < 0.01);
    }

    /// Training refuses to run with fewer samples than the minimum.
    #[test]
    fn test_insufficient_samples() {
        let lone_sample: Vec<Vec<u8>> = vec![b"too few samples".to_vec()];
        let outcome = CompressionDictionary::train(&lone_sample, DEFAULT_DICT_SIZE);
        assert!(outcome.is_err());
    }

    /// For a small payload, dictionary compression is at most 50 bytes larger
    /// than plain zstd at the same level.
    #[test]
    fn test_dictionary_improves_small_payload_compression() {
        let training_set = generate_json_samples(200);
        let dict = CompressionDictionary::train(&training_set, 32 * 1024).unwrap();
        let compressor = DictionaryCompressor::with_default_level(dict);

        let small_payload = r#"{"id":1,"type":"trace","agent":"agent_1","model":"gpt-4","prompt":"Hi","response":"Hello!","tokens":{"input":1,"output":2},"latency_ms":50}"#.as_bytes();

        let with_dict = compressor.compress(small_payload).unwrap();
        let without_dict = zstd::encode_all(Cursor::new(small_payload), 3).unwrap();

        let payload_len = small_payload.len();
        println!("Small payload: {} bytes", payload_len);
        println!(
            "With dictionary: {} bytes ({:.1}x)",
            with_dict.len(),
            payload_len as f64 / with_dict.len() as f64
        );
        println!(
            "Without dictionary: {} bytes ({:.1}x)",
            without_dict.len(),
            payload_len as f64 / without_dict.len() as f64
        );

        assert!(
            with_dict.len() <= without_dict.len() + 50,
            "Dictionary compression should be competitive"
        );
    }
}