1use crate::{
7 compression::{SchemaCompressor, CompressionStrategy, CompressedData},
8 domain::{DomainResult, DomainError},
9 stream::{StreamFrame, Priority},
10};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
/// Compresses stream frames, routing CRITICAL/HIGH priority frames to a
/// dedicated "skeleton" compressor and all other priorities to a "content"
/// compressor (see `select_compressor_for_priority`).
#[derive(Debug, Clone)]
pub struct StreamingCompressor {
    // Compressor used for CRITICAL/HIGH priority frames.
    skeleton_compressor: SchemaCompressor,
    // Compressor used for every other priority.
    content_compressor: SchemaCompressor,
    // Running byte/frame counters, updated on each compress_frame call.
    stats: CompressionStats,
}
24
/// Aggregate statistics accumulated across all compressed frames.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Total uncompressed bytes seen (measured on the serialized JSON).
    pub total_input_bytes: usize,
    /// Total compressed bytes produced.
    pub total_output_bytes: usize,
    /// Number of frames compressed so far.
    pub frames_processed: u32,
    /// Most recent compression ratio (output/input) per priority value;
    /// each new frame overwrites the previous entry for its priority.
    pub priority_ratios: HashMap<u8, f32>,
}
36
/// A stream frame paired with its compressed payload and the context a
/// `StreamingDecompressor` needs to restore it.
#[derive(Debug, Clone)]
pub struct CompressedFrame {
    /// The original frame. NOTE(review): this retains the uncompressed `data`
    /// alongside the compressed copy — confirm that is intended.
    pub frame: StreamFrame,
    /// The compressed payload plus its compression metadata.
    pub compressed_data: CompressedData,
    /// Strategy, dictionary, and delta-base context for decompression.
    pub decompression_metadata: DecompressionMetadata,
}
47
/// Side-channel information extracted from a frame's compression metadata,
/// merged into the decompressor's running context before decompressing.
#[derive(Debug, Clone)]
pub struct DecompressionMetadata {
    /// Strategy the payload was compressed with.
    pub strategy: CompressionStrategy,
    /// Dictionary index -> original string (parsed from "dict_<n>" metadata keys).
    pub dictionary_map: HashMap<u16, String>,
    /// JSON path -> numeric delta base (parsed from "base_<path>" metadata keys).
    pub delta_bases: HashMap<String, f64>,
    /// Currently always empty when produced by `create_decompression_metadata`;
    /// presumably reserved for future use — TODO confirm.
    pub priority_hints: HashMap<u8, String>,
}
59
60impl StreamingCompressor {
61 pub fn new() -> Self {
63 Self {
64 skeleton_compressor: SchemaCompressor::new(),
65 content_compressor: SchemaCompressor::new(),
66 stats: CompressionStats::default(),
67 }
68 }
69
70 pub fn with_strategies(
72 skeleton_strategy: CompressionStrategy,
73 content_strategy: CompressionStrategy,
74 ) -> Self {
75 Self {
76 skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
77 content_compressor: SchemaCompressor::with_strategy(content_strategy),
78 stats: CompressionStats::default(),
79 }
80 }
81
82 pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
84 let compressor = self.select_compressor_for_priority(frame.priority);
85
86 let original_size = serde_json::to_string(&frame.data)
88 .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
89 .len();
90
91 let compressed_data = compressor.compress(&frame.data)?;
93
94 self.update_stats(frame.priority, original_size, compressed_data.compressed_size);
96
97 let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;
99
100 Ok(CompressedFrame {
101 frame,
102 compressed_data,
103 decompression_metadata,
104 })
105 }
106
107 pub fn optimize_for_data(&mut self, skeleton: &JsonValue, sample_data: &[JsonValue]) -> DomainResult<()> {
109 self.skeleton_compressor.analyze_and_optimize(skeleton)?;
111
112 if !sample_data.is_empty() {
114 let combined_sample = JsonValue::Array(sample_data.to_vec());
116 self.content_compressor.analyze_and_optimize(&combined_sample)?;
117 }
118
119 Ok(())
120 }
121
122 pub fn get_stats(&self) -> &CompressionStats {
124 &self.stats
125 }
126
127 pub fn reset_stats(&mut self) {
129 self.stats = CompressionStats::default();
130 }
131
132 fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
134 match priority {
135 Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
137 _ => &mut self.content_compressor,
139 }
140 }
141
142 fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
144 self.stats.total_input_bytes += original_size;
145 self.stats.total_output_bytes += compressed_size;
146 self.stats.frames_processed += 1;
147
148 let ratio = if original_size > 0 {
149 compressed_size as f32 / original_size as f32
150 } else {
151 1.0
152 };
153
154 self.stats.priority_ratios.insert(priority.value(), ratio);
155 }
156
157 fn create_decompression_metadata(&self, compressed_data: &CompressedData) -> DomainResult<DecompressionMetadata> {
159 let mut dictionary_map = HashMap::new();
160 let mut delta_bases = HashMap::new();
161
162 for (key, value) in &compressed_data.compression_metadata {
164 if key.starts_with("dict_") {
165 if let Ok(index) = key.strip_prefix("dict_").unwrap().parse::<u16>()
166 && let Some(string_val) = value.as_str() {
167 dictionary_map.insert(index, string_val.to_string());
168 }
169 } else if key.starts_with("base_") {
170 let path = key.strip_prefix("base_").unwrap();
171 if let Some(num) = value.as_f64() {
172 delta_bases.insert(path.to_string(), num);
173 }
174 }
175 }
176
177 Ok(DecompressionMetadata {
178 strategy: compressed_data.strategy.clone(),
179 dictionary_map,
180 delta_bases,
181 priority_hints: HashMap::new(), })
183 }
184}
185
186impl CompressionStats {
187 pub fn overall_compression_ratio(&self) -> f32 {
189 if self.total_input_bytes == 0 {
190 return 1.0;
191 }
192 self.total_output_bytes as f32 / self.total_input_bytes as f32
193 }
194
195 pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
197 self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
198 }
199
200 pub fn bytes_saved(&self) -> isize {
202 self.total_input_bytes as isize - self.total_output_bytes as isize
203 }
204
205 pub fn percentage_saved(&self) -> f32 {
207 if self.total_input_bytes == 0 {
208 return 0.0;
209 }
210 let ratio = self.overall_compression_ratio();
211 (1.0 - ratio) * 100.0
212 }
213}
214
/// Decompresses frames produced by `StreamingCompressor`, maintaining a
/// running dictionary/delta context merged from each frame's metadata.
#[derive(Debug, Clone)]
pub struct StreamingDecompressor {
    // Accumulated dictionary entries (index -> string) from all frames seen.
    active_dictionary: HashMap<u16, String>,
    // Accumulated delta bases (JSON path -> base value). NOTE(review):
    // populated by update_context but not read by decompress_delta — confirm.
    delta_bases: HashMap<String, f64>,
    // Per-frame timing/size counters.
    stats: DecompressionStats,
}
225
/// Aggregate statistics accumulated across all decompressed frames.
#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    /// Number of frames decompressed so far.
    pub frames_decompressed: u32,
    /// Total bytes of decompressed output (measured on re-serialized JSON).
    pub total_decompressed_bytes: usize,
    /// Running average decompression time per frame, in microseconds.
    pub avg_decompression_time_us: u64,
}
235
236impl StreamingDecompressor {
237 pub fn new() -> Self {
239 Self {
240 active_dictionary: HashMap::new(),
241 delta_bases: HashMap::new(),
242 stats: DecompressionStats::default(),
243 }
244 }
245
246 pub fn decompress_frame(&mut self, compressed_frame: CompressedFrame) -> DomainResult<StreamFrame> {
248 let start_time = std::time::Instant::now();
249
250 self.update_context(&compressed_frame.decompression_metadata)?;
252
253 let decompressed_data = self.decompress_data(
255 &compressed_frame.compressed_data,
256 &compressed_frame.decompression_metadata.strategy,
257 )?;
258
259 let decompression_time = start_time.elapsed();
261 self.update_decompression_stats(&decompressed_data, decompression_time);
262
263 Ok(StreamFrame {
264 data: decompressed_data,
265 priority: compressed_frame.frame.priority,
266 metadata: compressed_frame.frame.metadata,
267 })
268 }
269
270 fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
272 for (&index, string) in &metadata.dictionary_map {
274 self.active_dictionary.insert(index, string.clone());
275 }
276
277 for (path, &base) in &metadata.delta_bases {
279 self.delta_bases.insert(path.clone(), base);
280 }
281
282 Ok(())
283 }
284
285 fn decompress_data(&self, compressed_data: &CompressedData, strategy: &CompressionStrategy) -> DomainResult<JsonValue> {
287 match strategy {
288 CompressionStrategy::None => Ok(compressed_data.data.clone()),
289
290 CompressionStrategy::Dictionary { .. } => {
291 self.decompress_dictionary(&compressed_data.data)
292 }
293
294 CompressionStrategy::Delta { .. } => {
295 self.decompress_delta(&compressed_data.data)
296 }
297
298 CompressionStrategy::RunLength => {
299 self.decompress_run_length(&compressed_data.data)
300 }
301
302 CompressionStrategy::Hybrid { .. } => {
303 let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
305 self.decompress_dictionary(&delta_decompressed)
306 }
307 }
308 }
309
310 fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
312 match data {
313 JsonValue::Object(obj) => {
314 let mut decompressed = serde_json::Map::new();
315 for (key, value) in obj {
316 decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
317 }
318 Ok(JsonValue::Object(decompressed))
319 }
320 JsonValue::Array(arr) => {
321 let decompressed: Result<Vec<_>, _> = arr.iter()
322 .map(|item| self.decompress_dictionary(item))
323 .collect();
324 Ok(JsonValue::Array(decompressed?))
325 }
326 JsonValue::Number(n) => {
327 if let Some(index) = n.as_u64()
329 && let Some(string_val) = self.active_dictionary.get(&(index as u16)) {
330 return Ok(JsonValue::String(string_val.clone()));
331 }
332 Ok(data.clone())
333 }
334 _ => Ok(data.clone()),
335 }
336 }
337
338 fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
340 Ok(data.clone())
343 }
344
345 fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
347 Ok(data.clone())
349 }
350
351 fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
353 self.stats.frames_decompressed += 1;
354
355 if let Ok(serialized) = serde_json::to_string(data) {
356 self.stats.total_decompressed_bytes += serialized.len();
357 }
358
359 let new_time_us = duration.as_micros() as u64;
360 if self.stats.frames_decompressed == 1 {
361 self.stats.avg_decompression_time_us = new_time_us;
362 } else {
363 let total_frames = self.stats.frames_decompressed as u64;
365 let total_time = self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
366 self.stats.avg_decompression_time_us = total_time / total_frames;
367 }
368 }
369
370 pub fn get_stats(&self) -> &DecompressionStats {
372 &self.stats
373 }
374}
375
376impl Default for StreamingCompressor {
377 fn default() -> Self {
378 Self::new()
379 }
380}
381
382impl Default for StreamingDecompressor {
383 fn default() -> Self {
384 Self::new()
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A simple frame compresses without error and keeps its priority.
    #[test]
    fn test_streaming_compressor_basic() {
        let mut compressor = StreamingCompressor::new();

        let input = StreamFrame {
            data: json!({
                "message": "test message",
                "count": 42
            }),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        let compressed = compressor
            .compress_frame(input)
            .expect("compressing a simple frame should succeed");
        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    }

    /// Derived statistics agree with hand-computed values.
    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            total_input_bytes: 1000,
            total_output_bytes: 600,
            ..Default::default()
        };

        assert_eq!(stats.overall_compression_ratio(), 0.6);
        assert_eq!(stats.bytes_saved(), 400);
        assert!((stats.percentage_saved() - 40.0).abs() < 0.001);
    }

    /// A `CompressionStrategy::None` frame round-trips unchanged.
    #[test]
    fn test_streaming_decompressor_basic() {
        let mut decompressor = StreamingDecompressor::new();
        let payload = json!({"test": "data"});

        let input = CompressedFrame {
            frame: StreamFrame {
                data: payload.clone(),
                priority: Priority::MEDIUM,
                metadata: HashMap::new(),
            },
            compressed_data: CompressedData {
                strategy: CompressionStrategy::None,
                compressed_size: 20,
                data: payload.clone(),
                compression_metadata: HashMap::new(),
            },
            decompression_metadata: DecompressionMetadata {
                strategy: CompressionStrategy::None,
                dictionary_map: HashMap::new(),
                delta_bases: HashMap::new(),
                priority_hints: HashMap::new(),
            },
        };

        let restored = decompressor
            .decompress_frame(input)
            .expect("decompressing an uncompressed frame should succeed");
        assert_eq!(restored.data, payload);
    }

    /// Numeric indices are replaced by their dictionary strings.
    #[test]
    fn test_dictionary_decompression() {
        let mut decompressor = StreamingDecompressor::new();
        for (index, word) in [(0u16, "hello"), (1u16, "world")] {
            decompressor.active_dictionary.insert(index, word.to_string());
        }

        let encoded = json!({
            "greeting": 0,
            "target": 1
        });

        let decoded = decompressor.decompress_dictionary(&encoded).unwrap();
        assert_eq!(decoded, json!({
            "greeting": "hello",
            "target": "world"
        }));
    }

    /// Frames of different priorities both feed into the shared stats.
    #[test]
    fn test_priority_based_compression() {
        let mut compressor = StreamingCompressor::new();

        let frames = [
            StreamFrame {
                data: json!({"error": "critical failure"}),
                priority: Priority::CRITICAL,
                metadata: HashMap::new(),
            },
            StreamFrame {
                data: json!({"debug": "verbose information"}),
                priority: Priority::LOW,
                metadata: HashMap::new(),
            },
        ];

        for frame in frames {
            compressor.compress_frame(frame).unwrap();
        }

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 2);
        assert!(stats.total_input_bytes > 0);
    }
}