//! Streaming compression for prioritized JSON frames.
//!
//! [`StreamingCompressor`] routes each frame to one of two schema
//! compressors based on frame priority and tracks running statistics;
//! [`StreamingDecompressor`] reverses the process using per-frame metadata.

use crate::{
    compression::{CompressedData, CompressionStrategy, SchemaCompressor},
    domain::{DomainError, DomainResult},
    stream::{Priority, StreamFrame},
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;

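/// Compresses [`StreamFrame`]s by priority: CRITICAL and HIGH frames go
/// through the skeleton compressor, all others through the content
/// compressor. Illustrative usage (doctest ignored, since the full module
/// path depends on the crate layout):
///
/// ```ignore
/// let mut compressor = StreamingCompressor::new();
/// let compressed = compressor.compress_frame(frame)?;
/// println!("ratio: {}", compressor.get_stats().overall_compression_ratio());
/// ```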
#[derive(Debug, Clone)]
pub struct StreamingCompressor {
    skeleton_compressor: SchemaCompressor,
    content_compressor: SchemaCompressor,
    stats: CompressionStats,
}

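/// Running totals gathered by a [`StreamingCompressor`] across frames.
/// `priority_ratios` maps each priority value to the most recently observed
/// compression ratio (compressed bytes / original bytes) for that priority.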
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    pub total_input_bytes: usize,
    pub total_output_bytes: usize,
    pub frames_processed: u32,
    pub priority_ratios: HashMap<u8, f32>,
}

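/// A [`StreamFrame`] paired with its compressed payload and the metadata
/// needed to decompress it.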
#[derive(Debug, Clone)]
pub struct CompressedFrame {
    pub frame: StreamFrame,
    pub compressed_data: CompressedData,
    pub decompression_metadata: DecompressionMetadata,
}

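/// Everything a [`StreamingDecompressor`] needs to reverse a frame's
/// compression: the strategy used, dictionary index-to-string mappings,
/// and per-path delta bases.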
#[derive(Debug, Clone)]
pub struct DecompressionMetadata {
    pub strategy: CompressionStrategy,
    pub dictionary_map: HashMap<u16, String>,
    pub delta_bases: HashMap<String, f64>,
    pub priority_hints: HashMap<u8, String>,
}

impl StreamingCompressor {
    pub fn new() -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::new(),
            content_compressor: SchemaCompressor::new(),
            stats: CompressionStats::default(),
        }
    }

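    /// Builds a compressor with explicit strategies for the skeleton and
    /// content paths instead of the defaults.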
    pub fn with_strategies(
        skeleton_strategy: CompressionStrategy,
        content_strategy: CompressionStrategy,
    ) -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
            content_compressor: SchemaCompressor::with_strategy(content_strategy),
            stats: CompressionStats::default(),
        }
    }

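    /// Compresses a single frame with the compressor matching its priority,
    /// updating stats and attaching the metadata needed for decompression.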
    pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
        let compressor = self.select_compressor_for_priority(frame.priority);

        let original_size = serde_json::to_string(&frame.data)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        let compressed_data = compressor.compress(&frame.data)?;

        self.update_stats(
            frame.priority,
            original_size,
            compressed_data.compressed_size,
        );

        let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;

        Ok(CompressedFrame {
            frame,
            compressed_data,
            decompression_metadata,
        })
    }

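    /// Primes both compressors: the skeleton compressor from the schema
    /// skeleton, the content compressor from a combined sample of data.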
    pub fn optimize_for_data(
        &mut self,
        skeleton: &JsonValue,
        sample_data: &[JsonValue],
    ) -> DomainResult<()> {
        self.skeleton_compressor.analyze_and_optimize(skeleton)?;

        if !sample_data.is_empty() {
            let combined_sample = JsonValue::Array(sample_data.to_vec());
            self.content_compressor
                .analyze_and_optimize(&combined_sample)?;
        }

        Ok(())
    }

    pub fn get_stats(&self) -> &CompressionStats {
        &self.stats
    }

    pub fn reset_stats(&mut self) {
        self.stats = CompressionStats::default();
    }

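    // CRITICAL and HIGH frames use the skeleton compressor; everything else
    // uses the content compressor.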
    fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
        match priority {
            Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
            _ => &mut self.content_compressor,
        }
    }

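    // Accumulates byte counts and keeps only the most recent compression
    // ratio observed for each priority level.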
    fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
        self.stats.total_input_bytes += original_size;
        self.stats.total_output_bytes += compressed_size;
        self.stats.frames_processed += 1;

        let ratio = if original_size > 0 {
            compressed_size as f32 / original_size as f32
        } else {
            1.0
        };

        self.stats.priority_ratios.insert(priority.value(), ratio);
    }

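    // Extracts dictionary entries ("dict_<index>") and delta bases
    // ("base_<path>") from the compression metadata so the decompressor
    // can reverse the encoding.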
    fn create_decompression_metadata(
        &self,
        compressed_data: &CompressedData,
    ) -> DomainResult<DecompressionMetadata> {
        let mut dictionary_map = HashMap::new();
        let mut delta_bases = HashMap::new();

        for (key, value) in &compressed_data.compression_metadata {
            if let Some(index_str) = key.strip_prefix("dict_") {
                if let Ok(index) = index_str.parse::<u16>()
                    && let Some(string_val) = value.as_str()
                {
                    dictionary_map.insert(index, string_val.to_string());
                }
            } else if let Some(path) = key.strip_prefix("base_") {
                if let Some(num) = value.as_f64() {
                    delta_bases.insert(path.to_string(), num);
                }
            }
        }

        Ok(DecompressionMetadata {
            strategy: compressed_data.strategy.clone(),
            dictionary_map,
            delta_bases,
            priority_hints: HashMap::new(),
        })
    }
}

impl CompressionStats {
    pub fn overall_compression_ratio(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 1.0;
        }
        self.total_output_bytes as f32 / self.total_input_bytes as f32
    }

    pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
        self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
    }

    pub fn bytes_saved(&self) -> isize {
        self.total_input_bytes as isize - self.total_output_bytes as isize
    }

    pub fn percentage_saved(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 0.0;
        }
        let ratio = self.overall_compression_ratio();
        (1.0 - ratio) * 100.0
    }
}

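/// Reverses [`StreamingCompressor`] output, maintaining a dictionary and
/// delta-base context that is updated from each frame's metadata.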
#[derive(Debug, Clone)]
pub struct StreamingDecompressor {
    active_dictionary: HashMap<u16, String>,
    delta_bases: HashMap<String, f64>,
    stats: DecompressionStats,
}

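/// Running totals gathered by a [`StreamingDecompressor`], including a
/// running average of per-frame decompression time in microseconds.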
#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    pub frames_decompressed: u32,
    pub total_decompressed_bytes: usize,
    pub avg_decompression_time_us: u64,
}

impl StreamingDecompressor {
    pub fn new() -> Self {
        Self {
            active_dictionary: HashMap::new(),
            delta_bases: HashMap::new(),
            stats: DecompressionStats::default(),
        }
    }

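    /// Decompresses a single frame: merges its metadata into the active
    /// context, decodes the payload per its strategy, and records timing.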
    pub fn decompress_frame(
        &mut self,
        compressed_frame: CompressedFrame,
    ) -> DomainResult<StreamFrame> {
        let start_time = std::time::Instant::now();

        self.update_context(&compressed_frame.decompression_metadata)?;

        let decompressed_data = self.decompress_data(
            &compressed_frame.compressed_data,
            &compressed_frame.decompression_metadata.strategy,
        )?;

        let decompression_time = start_time.elapsed();
        self.update_decompression_stats(&decompressed_data, decompression_time);

        Ok(StreamFrame {
            data: decompressed_data,
            priority: compressed_frame.frame.priority,
            metadata: compressed_frame.frame.metadata,
        })
    }

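    // Merges a frame's dictionary entries and delta bases into the
    // decompressor's persistent context.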
    fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
        for (&index, string) in &metadata.dictionary_map {
            self.active_dictionary.insert(index, string.clone());
        }

        for (path, &base) in &metadata.delta_bases {
            self.delta_bases.insert(path.clone(), base);
        }

        Ok(())
    }

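    // Dispatches on the compression strategy recorded in the frame metadata.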
    fn decompress_data(
        &self,
        compressed_data: &CompressedData,
        strategy: &CompressionStrategy,
    ) -> DomainResult<JsonValue> {
        match strategy {
            CompressionStrategy::None => Ok(compressed_data.data.clone()),

            CompressionStrategy::Dictionary { .. } => {
                self.decompress_dictionary(&compressed_data.data)
            }

            CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),

            CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),

            CompressionStrategy::Hybrid { .. } => {
                // Undo the delta encoding first, then resolve dictionary indices.
                let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
                self.decompress_dictionary(&delta_decompressed)
            }
        }
    }

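    // Recursively replaces bare integers with dictionary strings. Only
    // numbers that fit in u16 and resolve in the active dictionary are
    // substituted; everything else is returned unchanged.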
    fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
                }
                Ok(JsonValue::Object(decompressed))
            }
            JsonValue::Array(arr) => {
                let decompressed: Result<Vec<_>, _> = arr
                    .iter()
                    .map(|item| self.decompress_dictionary(item))
                    .collect();
                Ok(JsonValue::Array(decompressed?))
            }
            JsonValue::Number(n) => {
                // Use a checked conversion so out-of-range numbers are not
                // silently truncated into valid dictionary indices.
                if let Some(index) = n.as_u64().and_then(|i| u16::try_from(i).ok())
                    && let Some(string_val) = self.active_dictionary.get(&index)
                {
                    return Ok(JsonValue::String(string_val.clone()));
                }
                Ok(data.clone())
            }
            _ => Ok(data.clone()),
        }
    }

    fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        // Currently a passthrough: the stored delta bases are not yet applied.
        Ok(data.clone())
    }

    fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        // Currently a passthrough: run-length expansion is not yet implemented.
        Ok(data.clone())
    }

    fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
        self.stats.frames_decompressed += 1;

        if let Ok(serialized) = serde_json::to_string(data) {
            self.stats.total_decompressed_bytes += serialized.len();
        }

        let new_time_us = duration.as_micros() as u64;
        if self.stats.frames_decompressed == 1 {
            self.stats.avg_decompression_time_us = new_time_us;
        } else {
            // Incremental running average: reconstruct the previous total,
            // add the new sample, and divide by the new frame count.
            let total_frames = self.stats.frames_decompressed as u64;
            let total_time =
                self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
            self.stats.avg_decompression_time_us = total_time / total_frames;
        }
    }

    pub fn get_stats(&self) -> &DecompressionStats {
        &self.stats
    }
}

impl Default for StreamingCompressor {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for StreamingDecompressor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_streaming_compressor_basic() {
        let mut compressor = StreamingCompressor::new();

        let frame = StreamFrame {
            data: json!({
                "message": "test message",
                "count": 42
            }),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        let result = compressor.compress_frame(frame);
        assert!(result.is_ok());

        let compressed = result.unwrap();
        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    }

    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            total_input_bytes: 1000,
            total_output_bytes: 600,
            ..Default::default()
        };

        assert_eq!(stats.overall_compression_ratio(), 0.6);
        assert_eq!(stats.bytes_saved(), 400);
        let percentage = stats.percentage_saved();
        assert!((percentage - 40.0).abs() < 0.001);
    }

    #[test]
    fn test_streaming_decompressor_basic() {
        let mut decompressor = StreamingDecompressor::new();

        let compressed_frame = CompressedFrame {
            frame: StreamFrame {
                data: json!({"test": "data"}),
                priority: Priority::MEDIUM,
                metadata: HashMap::new(),
            },
            compressed_data: CompressedData {
                strategy: CompressionStrategy::None,
                compressed_size: 20,
                data: json!({"test": "data"}),
                compression_metadata: HashMap::new(),
            },
            decompression_metadata: DecompressionMetadata {
                strategy: CompressionStrategy::None,
                dictionary_map: HashMap::new(),
                delta_bases: HashMap::new(),
                priority_hints: HashMap::new(),
            },
        };

        let result = decompressor.decompress_frame(compressed_frame);
        assert!(result.is_ok());

        let decompressed = result.unwrap();
        assert_eq!(decompressed.data, json!({"test": "data"}));
    }

    #[test]
    fn test_dictionary_decompression() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor
            .active_dictionary
            .insert(0, "hello".to_string());
        decompressor
            .active_dictionary
            .insert(1, "world".to_string());

        let compressed = json!({
            "greeting": 0,
            "target": 1
        });

        let result = decompressor.decompress_dictionary(&compressed).unwrap();
        assert_eq!(
            result,
            json!({
                "greeting": "hello",
                "target": "world"
            })
        );
    }

    #[test]
    fn test_priority_based_compression() {
        let mut compressor = StreamingCompressor::new();

        let critical_frame = StreamFrame {
            data: json!({"error": "critical failure"}),
            priority: Priority::CRITICAL,
            metadata: HashMap::new(),
        };

        let low_frame = StreamFrame {
            data: json!({"debug": "verbose information"}),
            priority: Priority::LOW,
            metadata: HashMap::new(),
        };

        let _critical_result = compressor.compress_frame(critical_frame).unwrap();
        let _low_result = compressor.compress_frame(low_frame).unwrap();

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 2);
        assert!(stats.total_input_bytes > 0);
    }
}