1use crate::{
7 compression::{SchemaCompressor, CompressionStrategy, CompressedData},
8 domain::{DomainResult, DomainError},
9 stream::{StreamFrame, Priority},
10};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
/// Priority-aware streaming compressor: routes each frame to one of two
/// internal [`SchemaCompressor`]s based on its priority and accumulates
/// running compression statistics.
#[derive(Debug, Clone)]
pub struct StreamingCompressor {
    // Compressor used for CRITICAL/HIGH priority frames.
    skeleton_compressor: SchemaCompressor,
    // Compressor used for all lower-priority frames.
    content_compressor: SchemaCompressor,
    // Running totals and per-priority ratios for frames compressed so far.
    stats: CompressionStats,
}
24
/// Aggregate statistics collected by [`StreamingCompressor`].
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Total serialized (uncompressed) bytes seen across all frames.
    pub total_input_bytes: usize,
    /// Total compressed bytes produced across all frames.
    pub total_output_bytes: usize,
    /// Number of frames compressed since creation or the last reset.
    pub frames_processed: u32,
    /// Most recent compressed/original size ratio per priority value
    /// (keyed by `Priority::value()`); each new frame overwrites the entry.
    pub priority_ratios: HashMap<u8, f32>,
}
36
37#[derive(Debug, Clone)]
39pub struct CompressedFrame {
40 pub frame: StreamFrame,
42 pub compressed_data: CompressedData,
44 pub decompression_metadata: DecompressionMetadata,
46}
47
/// Side-channel data extracted from `CompressedData::compression_metadata`
/// that the decompressor needs to restore the original JSON value.
#[derive(Debug, Clone)]
pub struct DecompressionMetadata {
    /// The strategy the payload was compressed with.
    pub strategy: CompressionStrategy,
    /// Dictionary index -> original string (parsed from `dict_<n>` keys).
    pub dictionary_map: HashMap<u16, String>,
    /// JSON path -> numeric base value (parsed from `base_<path>` keys).
    pub delta_bases: HashMap<String, f64>,
    // NOTE(review): always populated as empty by create_decompression_metadata
    // in this file — appears reserved for future per-priority hints.
    pub priority_hints: HashMap<u8, String>,
}
59
60impl StreamingCompressor {
61 pub fn new() -> Self {
63 Self {
64 skeleton_compressor: SchemaCompressor::new(),
65 content_compressor: SchemaCompressor::new(),
66 stats: CompressionStats::default(),
67 }
68 }
69
70 pub fn with_strategies(
72 skeleton_strategy: CompressionStrategy,
73 content_strategy: CompressionStrategy,
74 ) -> Self {
75 Self {
76 skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
77 content_compressor: SchemaCompressor::with_strategy(content_strategy),
78 stats: CompressionStats::default(),
79 }
80 }
81
82 pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
84 let compressor = self.select_compressor_for_priority(frame.priority);
85
86 let original_size = serde_json::to_string(&frame.data)
88 .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
89 .len();
90
91 let compressed_data = compressor.compress(&frame.data)?;
93
94 self.update_stats(frame.priority, original_size, compressed_data.compressed_size);
96
97 let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;
99
100 Ok(CompressedFrame {
101 frame,
102 compressed_data,
103 decompression_metadata,
104 })
105 }
106
107 pub fn optimize_for_data(&mut self, skeleton: &JsonValue, sample_data: &[JsonValue]) -> DomainResult<()> {
109 self.skeleton_compressor.analyze_and_optimize(skeleton)?;
111
112 if !sample_data.is_empty() {
114 let combined_sample = JsonValue::Array(sample_data.to_vec());
116 self.content_compressor.analyze_and_optimize(&combined_sample)?;
117 }
118
119 Ok(())
120 }
121
122 pub fn get_stats(&self) -> &CompressionStats {
124 &self.stats
125 }
126
127 pub fn reset_stats(&mut self) {
129 self.stats = CompressionStats::default();
130 }
131
132 fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
134 match priority {
135 Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
137 _ => &mut self.content_compressor,
139 }
140 }
141
142 fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
144 self.stats.total_input_bytes += original_size;
145 self.stats.total_output_bytes += compressed_size;
146 self.stats.frames_processed += 1;
147
148 let ratio = if original_size > 0 {
149 compressed_size as f32 / original_size as f32
150 } else {
151 1.0
152 };
153
154 self.stats.priority_ratios.insert(priority.value(), ratio);
155 }
156
157 fn create_decompression_metadata(&self, compressed_data: &CompressedData) -> DomainResult<DecompressionMetadata> {
159 let mut dictionary_map = HashMap::new();
160 let mut delta_bases = HashMap::new();
161
162 for (key, value) in &compressed_data.compression_metadata {
164 if key.starts_with("dict_") {
165 if let Ok(index) = key.strip_prefix("dict_").unwrap().parse::<u16>() {
166 if let Some(string_val) = value.as_str() {
167 dictionary_map.insert(index, string_val.to_string());
168 }
169 }
170 } else if key.starts_with("base_") {
171 let path = key.strip_prefix("base_").unwrap();
172 if let Some(num) = value.as_f64() {
173 delta_bases.insert(path.to_string(), num);
174 }
175 }
176 }
177
178 Ok(DecompressionMetadata {
179 strategy: compressed_data.strategy.clone(),
180 dictionary_map,
181 delta_bases,
182 priority_hints: HashMap::new(), })
184 }
185}
186
187impl CompressionStats {
188 pub fn overall_compression_ratio(&self) -> f32 {
190 if self.total_input_bytes == 0 {
191 return 1.0;
192 }
193 self.total_output_bytes as f32 / self.total_input_bytes as f32
194 }
195
196 pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
198 self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
199 }
200
201 pub fn bytes_saved(&self) -> isize {
203 self.total_input_bytes as isize - self.total_output_bytes as isize
204 }
205
206 pub fn percentage_saved(&self) -> f32 {
208 if self.total_input_bytes == 0 {
209 return 0.0;
210 }
211 let ratio = self.overall_compression_ratio();
212 (1.0 - ratio) * 100.0
213 }
214}
215
/// Stateful decompressor for [`CompressedFrame`]s. Dictionary entries and
/// delta bases accumulate across frames, so later frames may rely on context
/// established by earlier ones.
#[derive(Debug, Clone)]
pub struct StreamingDecompressor {
    // Dictionary index -> string, merged in from each frame's metadata.
    active_dictionary: HashMap<u16, String>,
    // JSON path -> numeric base, merged in from each frame's metadata.
    delta_bases: HashMap<String, f64>,
    // Running counts and timing for decompressed frames.
    stats: DecompressionStats,
}
226
/// Aggregate statistics collected by [`StreamingDecompressor`].
#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    /// Number of frames decompressed so far.
    pub frames_decompressed: u32,
    /// Total serialized size of all decompressed payloads, in bytes.
    pub total_decompressed_bytes: usize,
    /// Running mean decompression time per frame, in microseconds.
    pub avg_decompression_time_us: u64,
}
236
237impl StreamingDecompressor {
238 pub fn new() -> Self {
240 Self {
241 active_dictionary: HashMap::new(),
242 delta_bases: HashMap::new(),
243 stats: DecompressionStats::default(),
244 }
245 }
246
247 pub fn decompress_frame(&mut self, compressed_frame: CompressedFrame) -> DomainResult<StreamFrame> {
249 let start_time = std::time::Instant::now();
250
251 self.update_context(&compressed_frame.decompression_metadata)?;
253
254 let decompressed_data = self.decompress_data(
256 &compressed_frame.compressed_data,
257 &compressed_frame.decompression_metadata.strategy,
258 )?;
259
260 let decompression_time = start_time.elapsed();
262 self.update_decompression_stats(&decompressed_data, decompression_time);
263
264 Ok(StreamFrame {
265 data: decompressed_data,
266 priority: compressed_frame.frame.priority,
267 metadata: compressed_frame.frame.metadata,
268 })
269 }
270
271 fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
273 for (&index, string) in &metadata.dictionary_map {
275 self.active_dictionary.insert(index, string.clone());
276 }
277
278 for (path, &base) in &metadata.delta_bases {
280 self.delta_bases.insert(path.clone(), base);
281 }
282
283 Ok(())
284 }
285
286 fn decompress_data(&self, compressed_data: &CompressedData, strategy: &CompressionStrategy) -> DomainResult<JsonValue> {
288 match strategy {
289 CompressionStrategy::None => Ok(compressed_data.data.clone()),
290
291 CompressionStrategy::Dictionary { .. } => {
292 self.decompress_dictionary(&compressed_data.data)
293 }
294
295 CompressionStrategy::Delta { .. } => {
296 self.decompress_delta(&compressed_data.data)
297 }
298
299 CompressionStrategy::RunLength => {
300 self.decompress_run_length(&compressed_data.data)
301 }
302
303 CompressionStrategy::Hybrid { .. } => {
304 let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
306 self.decompress_dictionary(&delta_decompressed)
307 }
308 }
309 }
310
311 fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
313 match data {
314 JsonValue::Object(obj) => {
315 let mut decompressed = serde_json::Map::new();
316 for (key, value) in obj {
317 decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
318 }
319 Ok(JsonValue::Object(decompressed))
320 }
321 JsonValue::Array(arr) => {
322 let decompressed: Result<Vec<_>, _> = arr.iter()
323 .map(|item| self.decompress_dictionary(item))
324 .collect();
325 Ok(JsonValue::Array(decompressed?))
326 }
327 JsonValue::Number(n) => {
328 if let Some(index) = n.as_u64() {
330 if let Some(string_val) = self.active_dictionary.get(&(index as u16)) {
331 return Ok(JsonValue::String(string_val.clone()));
332 }
333 }
334 Ok(data.clone())
335 }
336 _ => Ok(data.clone()),
337 }
338 }
339
340 fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
342 Ok(data.clone())
345 }
346
347 fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
349 Ok(data.clone())
351 }
352
353 fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
355 self.stats.frames_decompressed += 1;
356
357 if let Ok(serialized) = serde_json::to_string(data) {
358 self.stats.total_decompressed_bytes += serialized.len();
359 }
360
361 let new_time_us = duration.as_micros() as u64;
362 if self.stats.frames_decompressed == 1 {
363 self.stats.avg_decompression_time_us = new_time_us;
364 } else {
365 let total_frames = self.stats.frames_decompressed as u64;
367 let total_time = self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
368 self.stats.avg_decompression_time_us = total_time / total_frames;
369 }
370 }
371
372 pub fn get_stats(&self) -> &DecompressionStats {
374 &self.stats
375 }
376}
377
378impl Default for StreamingCompressor {
379 fn default() -> Self {
380 Self::new()
381 }
382}
383
384impl Default for StreamingDecompressor {
385 fn default() -> Self {
386 Self::new()
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Smoke test: a simple frame compresses without error and the wrapped
    // frame retains the caller's priority.
    #[test]
    fn test_streaming_compressor_basic() {
        let mut compressor = StreamingCompressor::new();

        let frame = StreamFrame {
            data: json!({
                "message": "test message",
                "count": 42
            }),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        let result = compressor.compress_frame(frame);
        assert!(result.is_ok());

        let compressed = result.unwrap();
        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    }

    // Checks the derived metrics on hand-built stats: 600/1000 -> ratio 0.6,
    // 400 bytes saved, 40% saved.
    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            total_input_bytes: 1000,
            total_output_bytes: 600,
            ..Default::default()
        };

        assert_eq!(stats.overall_compression_ratio(), 0.6);
        assert_eq!(stats.bytes_saved(), 400);
        let percentage = stats.percentage_saved();
        assert!((percentage - 40.0).abs() < 0.001);
    }

    // With CompressionStrategy::None, decompression must return the payload
    // byte-for-byte unchanged.
    #[test]
    fn test_streaming_decompressor_basic() {
        let mut decompressor = StreamingDecompressor::new();

        let compressed_frame = CompressedFrame {
            frame: StreamFrame {
                data: json!({"test": "data"}),
                priority: Priority::MEDIUM,
                metadata: HashMap::new(),
            },
            compressed_data: CompressedData {
                strategy: CompressionStrategy::None,
                compressed_size: 20,
                data: json!({"test": "data"}),
                compression_metadata: HashMap::new(),
            },
            decompression_metadata: DecompressionMetadata {
                strategy: CompressionStrategy::None,
                dictionary_map: HashMap::new(),
                delta_bases: HashMap::new(),
                priority_hints: HashMap::new(),
            },
        };

        let result = decompressor.decompress_frame(compressed_frame);
        assert!(result.is_ok());

        let decompressed = result.unwrap();
        assert_eq!(decompressed.data, json!({"test": "data"}));
    }

    // Numbers that match active dictionary indices (0 -> "hello", 1 ->
    // "world") are replaced by their strings during dictionary decompression.
    #[test]
    fn test_dictionary_decompression() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor.active_dictionary.insert(0, "hello".to_string());
        decompressor.active_dictionary.insert(1, "world".to_string());

        let compressed = json!({
            "greeting": 0,
            "target": 1
        });

        let result = decompressor.decompress_dictionary(&compressed).unwrap();
        assert_eq!(result, json!({
            "greeting": "hello",
            "target": "world"
        }));
    }

    // Frames of different priorities (CRITICAL routes to the skeleton
    // compressor, LOW to the content compressor) both count toward the
    // shared statistics.
    #[test]
    fn test_priority_based_compression() {
        let mut compressor = StreamingCompressor::new();

        let critical_frame = StreamFrame {
            data: json!({"error": "critical failure"}),
            priority: Priority::CRITICAL,
            metadata: HashMap::new(),
        };

        let low_frame = StreamFrame {
            data: json!({"debug": "verbose information"}),
            priority: Priority::LOW,
            metadata: HashMap::new(),
        };

        let _critical_result = compressor.compress_frame(critical_frame).unwrap();
        let _low_result = compressor.compress_frame(low_frame).unwrap();

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 2);
        assert!(stats.total_input_bytes > 0);
    }
}