pjson_rs/stream/compression_integration.rs

//! Integration of schema-based compression with PJS streaming
//!
//! Provides streaming-aware compression that maintains the ability
//! to progressively decompress data as frames arrive.
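//!
//! A minimal round-trip sketch (marked `ignore` so it stays illustrative
//! rather than a doc-test; it assumes a `frame: StreamFrame` built by the
//! caller):
//!
//! ```ignore
//! let mut compressor = StreamingCompressor::new();
//! let compressed = compressor.compress_frame(frame)?;
//!
//! let mut decompressor = StreamingDecompressor::new();
//! let restored = decompressor.decompress_frame(compressed)?;
//! ```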

use crate::{
    compression::{SchemaCompressor, CompressionStrategy, CompressedData},
    domain::{DomainResult, DomainError},
    stream::{StreamFrame, Priority},
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;

/// Streaming compressor that maintains compression state across frames
#[derive(Debug, Clone)]
pub struct StreamingCompressor {
    /// Primary compressor for skeleton and critical data
    skeleton_compressor: SchemaCompressor,
    /// Secondary compressor for non-critical data
    content_compressor: SchemaCompressor,
    /// Compression statistics
    stats: CompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Total bytes processed
    pub total_input_bytes: usize,
    /// Total bytes after compression
    pub total_output_bytes: usize,
    /// Number of frames processed
    pub frames_processed: u32,
    /// Most recent compression ratio observed per priority level
    pub priority_ratios: HashMap<u8, f32>,
}

/// Compressed stream frame with metadata
#[derive(Debug, Clone)]
pub struct CompressedFrame {
    /// Original frame, retained for its priority and metadata
    pub frame: StreamFrame,
    /// Compressed data
    pub compressed_data: CompressedData,
    /// Decompression instructions for client
    pub decompression_metadata: DecompressionMetadata,
}

#[derive(Debug, Clone)]
pub struct DecompressionMetadata {
    /// Compression strategy used
    pub strategy: CompressionStrategy,
    /// Mapping from dictionary indices to their original strings
    pub dictionary_map: HashMap<u16, String>,
    /// Delta base values for numeric decompression
    pub delta_bases: HashMap<String, f64>,
    /// Priority-specific decompression hints
    pub priority_hints: HashMap<u8, String>,
}

impl StreamingCompressor {
    /// Create a new streaming compressor
    pub fn new() -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::new(),
            content_compressor: SchemaCompressor::new(),
            stats: CompressionStats::default(),
        }
    }

    /// Create with custom compression strategies
    pub fn with_strategies(
        skeleton_strategy: CompressionStrategy,
        content_strategy: CompressionStrategy,
    ) -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
            content_compressor: SchemaCompressor::with_strategy(content_strategy),
            stats: CompressionStats::default(),
        }
    }

    /// Process and compress a stream frame based on its priority
    pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
        let compressor = self.select_compressor_for_priority(frame.priority);

        // Calculate original size
        let original_size = serde_json::to_string(&frame.data)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        // Compress based on frame content and priority
        let compressed_data = compressor.compress(&frame.data)?;

        // Update statistics
        self.update_stats(frame.priority, original_size, compressed_data.compressed_size);

        // Create decompression metadata
        let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;

        Ok(CompressedFrame {
            frame,
            compressed_data,
            decompression_metadata,
        })
    }

    /// Analyze JSON data to optimize compression strategies
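    ///
    /// A hedged sketch of intended usage (marked `ignore` so it stays
    /// illustrative; the skeleton and samples here are made-up values):
    ///
    /// ```ignore
    /// let mut compressor = StreamingCompressor::new();
    /// let skeleton = serde_json::json!({"users": [], "meta": {}});
    /// let samples = vec![serde_json::json!({"name": "Ada", "score": 1})];
    /// compressor.optimize_for_data(&skeleton, &samples)?;
    /// ```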
    pub fn optimize_for_data(&mut self, skeleton: &JsonValue, sample_data: &[JsonValue]) -> DomainResult<()> {
        // Optimize skeleton compressor for critical structural data
        self.skeleton_compressor.analyze_and_optimize(skeleton)?;

        // Analyze sample content data to optimize content compressor
        if !sample_data.is_empty() {
            // Combine samples for comprehensive analysis
            let combined_sample = JsonValue::Array(sample_data.to_vec());
            self.content_compressor.analyze_and_optimize(&combined_sample)?;
        }

        Ok(())
    }

    /// Get current compression statistics
    pub fn get_stats(&self) -> &CompressionStats {
        &self.stats
    }

    /// Reset compression statistics
    pub fn reset_stats(&mut self) {
        self.stats = CompressionStats::default();
    }

    /// Select appropriate compressor based on frame priority
    fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
        match priority {
            // Critical data (skeleton, errors) - use specialized compressor
            Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
            // Regular content data - use content compressor
            _ => &mut self.content_compressor,
        }
    }

    /// Update compression statistics
    fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
        self.stats.total_input_bytes += original_size;
        self.stats.total_output_bytes += compressed_size;
        self.stats.frames_processed += 1;

        let ratio = if original_size > 0 {
            compressed_size as f32 / original_size as f32
        } else {
            1.0
        };

        self.stats.priority_ratios.insert(priority.value(), ratio);
    }

    /// Create decompression metadata for client
    fn create_decompression_metadata(&self, compressed_data: &CompressedData) -> DomainResult<DecompressionMetadata> {
        let mut dictionary_map = HashMap::new();
        let mut delta_bases = HashMap::new();

        // Extract dictionary mappings and delta bases from compressor metadata
        for (key, value) in &compressed_data.compression_metadata {
            if let Some(index_str) = key.strip_prefix("dict_") {
                if let (Ok(index), Some(string_val)) = (index_str.parse::<u16>(), value.as_str()) {
                    dictionary_map.insert(index, string_val.to_string());
                }
            } else if let Some(path) = key.strip_prefix("base_") {
                if let Some(num) = value.as_f64() {
                    delta_bases.insert(path.to_string(), num);
                }
            }
        }

        Ok(DecompressionMetadata {
            strategy: compressed_data.strategy.clone(),
            dictionary_map,
            delta_bases,
            priority_hints: HashMap::new(), // TODO: Add priority-specific hints
        })
    }
}

impl CompressionStats {
    /// Calculate overall compression ratio
    pub fn overall_compression_ratio(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 1.0;
        }
        self.total_output_bytes as f32 / self.total_input_bytes as f32
    }

    /// Get compression ratio for specific priority level
    pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
        self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
    }

    /// Calculate bytes saved
    pub fn bytes_saved(&self) -> isize {
        self.total_input_bytes as isize - self.total_output_bytes as isize
    }

    /// Calculate percentage saved
    pub fn percentage_saved(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 0.0;
        }
        let ratio = self.overall_compression_ratio();
        (1.0 - ratio) * 100.0
    }
}

/// Client-side decompressor for receiving compressed frames
#[derive(Debug, Clone)]
pub struct StreamingDecompressor {
    /// Active dictionary for string decompression
    active_dictionary: HashMap<u16, String>,
    /// Delta base values for numeric decompression
    delta_bases: HashMap<String, f64>,
    /// Decompression statistics
    stats: DecompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    /// Total frames decompressed
    pub frames_decompressed: u32,
    /// Total bytes decompressed
    pub total_decompressed_bytes: usize,
    /// Average decompression time in microseconds
    pub avg_decompression_time_us: u64,
}

impl StreamingDecompressor {
    /// Create a new streaming decompressor
    pub fn new() -> Self {
        Self {
            active_dictionary: HashMap::new(),
            delta_bases: HashMap::new(),
            stats: DecompressionStats::default(),
        }
    }

    /// Decompress a compressed frame
    pub fn decompress_frame(&mut self, compressed_frame: CompressedFrame) -> DomainResult<StreamFrame> {
        let start_time = std::time::Instant::now();

        // Update decompression context with metadata
        self.update_context(&compressed_frame.decompression_metadata)?;

        // Decompress data based on strategy
        let decompressed_data = self.decompress_data(
            &compressed_frame.compressed_data,
            &compressed_frame.decompression_metadata.strategy,
        )?;

        // Update statistics
        let decompression_time = start_time.elapsed();
        self.update_decompression_stats(&decompressed_data, decompression_time);

        Ok(StreamFrame {
            data: decompressed_data,
            priority: compressed_frame.frame.priority,
            metadata: compressed_frame.frame.metadata,
        })
    }

    /// Update decompression context with new metadata
    fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
        // Update dictionary
        for (&index, string) in &metadata.dictionary_map {
            self.active_dictionary.insert(index, string.clone());
        }

        // Update delta bases
        for (path, &base) in &metadata.delta_bases {
            self.delta_bases.insert(path.clone(), base);
        }

        Ok(())
    }

    /// Decompress data according to strategy
    fn decompress_data(&self, compressed_data: &CompressedData, strategy: &CompressionStrategy) -> DomainResult<JsonValue> {
        match strategy {
            CompressionStrategy::None => Ok(compressed_data.data.clone()),

            CompressionStrategy::Dictionary { .. } => {
                self.decompress_dictionary(&compressed_data.data)
            }

            CompressionStrategy::Delta { .. } => {
                self.decompress_delta(&compressed_data.data)
            }

            CompressionStrategy::RunLength => {
                self.decompress_run_length(&compressed_data.data)
            }

            CompressionStrategy::Hybrid { .. } => {
                // Apply decompression in reverse order: delta first, then dictionary
                let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
                self.decompress_dictionary(&delta_decompressed)
            }
        }
    }

    /// Decompress dictionary-encoded strings
    fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
                }
                Ok(JsonValue::Object(decompressed))
            }
            JsonValue::Array(arr) => {
                let decompressed: Result<Vec<_>, _> = arr.iter()
                    .map(|item| self.decompress_dictionary(item))
                    .collect();
                Ok(JsonValue::Array(decompressed?))
            }
            JsonValue::Number(n) => {
                // Check if this is a dictionary index; only values that fit
                // in u16 can refer to dictionary entries, so avoid a silent
                // `as u16` truncation of larger numbers
                if let Some(index) = n.as_u64().and_then(|i| u16::try_from(i).ok()) {
                    if let Some(string_val) = self.active_dictionary.get(&index) {
                        return Ok(JsonValue::String(string_val.clone()));
                    }
                }
                Ok(data.clone())
            }
            _ => Ok(data.clone()),
        }
    }

    /// Decompress delta-encoded values
    fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        // TODO: Implement delta decompression for numeric sequences
        // This would reconstruct original values from deltas and base values
        Ok(data.clone())
    }

    /// Decompress run-length encoded data
    fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        // TODO: Implement run-length decompression
        Ok(data.clone())
    }

    /// Update decompression statistics
    fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
        self.stats.frames_decompressed += 1;

        if let Ok(serialized) = serde_json::to_string(data) {
            self.stats.total_decompressed_bytes += serialized.len();
        }

        let new_time_us = duration.as_micros() as u64;
        if self.stats.frames_decompressed == 1 {
            self.stats.avg_decompression_time_us = new_time_us;
        } else {
            // Calculate running average
            let total_frames = self.stats.frames_decompressed as u64;
            let total_time = self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
            self.stats.avg_decompression_time_us = total_time / total_frames;
        }
    }

    /// Get decompression statistics
    pub fn get_stats(&self) -> &DecompressionStats {
        &self.stats
    }
}

impl Default for StreamingCompressor {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for StreamingDecompressor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_streaming_compressor_basic() {
        let mut compressor = StreamingCompressor::new();

        let frame = StreamFrame {
            data: json!({
                "message": "test message",
                "count": 42
            }),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        let result = compressor.compress_frame(frame);
        assert!(result.is_ok());

        let compressed = result.unwrap();
        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    }
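
    // Checks the "dict_<index>" / "base_<path>" key convention that
    // create_decompression_metadata uses when translating compressor
    // metadata into client-facing decompression instructions. The metadata
    // values here are made-up fixtures, not output of a real compressor run.
    #[test]
    fn test_decompression_metadata_extraction() {
        let compressor = StreamingCompressor::new();

        let mut compression_metadata = HashMap::new();
        compression_metadata.insert("dict_0".to_string(), json!("hello"));
        compression_metadata.insert("base_price".to_string(), json!(10.5));

        let compressed_data = CompressedData {
            strategy: CompressionStrategy::None,
            compressed_size: 0,
            data: json!(null),
            compression_metadata,
        };

        let metadata = compressor.create_decompression_metadata(&compressed_data).unwrap();
        assert_eq!(metadata.dictionary_map.get(&0), Some(&"hello".to_string()));
        assert_eq!(metadata.delta_bases.get("price"), Some(&10.5));
    }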

    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            total_input_bytes: 1000,
            total_output_bytes: 600,
            ..Default::default()
        };

        assert_eq!(stats.overall_compression_ratio(), 0.6);
        assert_eq!(stats.bytes_saved(), 400);
        // Use approximate comparison for float precision
        let percentage = stats.percentage_saved();
        assert!((percentage - 40.0).abs() < 0.001);
    }
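
    // Edge cases for the stats helpers: empty stats report a neutral ratio,
    // zero savings, and unknown priority levels fall back to 1.0.
    #[test]
    fn test_compression_stats_defaults() {
        let stats = CompressionStats::default();

        assert_eq!(stats.overall_compression_ratio(), 1.0);
        assert_eq!(stats.bytes_saved(), 0);
        assert_eq!(stats.percentage_saved(), 0.0);
        assert_eq!(stats.priority_compression_ratio(99), 1.0);
    }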

    #[test]
    fn test_streaming_decompressor_basic() {
        let mut decompressor = StreamingDecompressor::new();

        let compressed_frame = CompressedFrame {
            frame: StreamFrame {
                data: json!({"test": "data"}),
                priority: Priority::MEDIUM,
                metadata: HashMap::new(),
            },
            compressed_data: CompressedData {
                strategy: CompressionStrategy::None,
                compressed_size: 20,
                data: json!({"test": "data"}),
                compression_metadata: HashMap::new(),
            },
            decompression_metadata: DecompressionMetadata {
                strategy: CompressionStrategy::None,
                dictionary_map: HashMap::new(),
                delta_bases: HashMap::new(),
                priority_hints: HashMap::new(),
            },
        };

        let result = decompressor.decompress_frame(compressed_frame);
        assert!(result.is_ok());

        let decompressed = result.unwrap();
        assert_eq!(decompressed.data, json!({"test": "data"}));
    }
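
    // Drives the private update_context path directly: dictionary entries
    // and delta bases from incoming frame metadata should accumulate in the
    // decompressor's state. The entries used here are made-up fixtures.
    #[test]
    fn test_decompressor_context_update() {
        let mut decompressor = StreamingDecompressor::new();

        let mut dictionary_map = HashMap::new();
        dictionary_map.insert(0u16, "status".to_string());
        let mut delta_bases = HashMap::new();
        delta_bases.insert("price".to_string(), 100.0);

        let metadata = DecompressionMetadata {
            strategy: CompressionStrategy::None,
            dictionary_map,
            delta_bases,
            priority_hints: HashMap::new(),
        };

        decompressor.update_context(&metadata).unwrap();
        assert_eq!(decompressor.active_dictionary.get(&0), Some(&"status".to_string()));
        assert_eq!(decompressor.delta_bases.get("price"), Some(&100.0));
    }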

    #[test]
    fn test_dictionary_decompression() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor.active_dictionary.insert(0, "hello".to_string());
        decompressor.active_dictionary.insert(1, "world".to_string());

        // Test with dictionary indices
        let compressed = json!({
            "greeting": 0,
            "target": 1
        });

        let result = decompressor.decompress_dictionary(&compressed).unwrap();
        assert_eq!(result, json!({
            "greeting": "hello",
            "target": "world"
        }));
    }
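
    // Dictionary decoding recurses through containers, so indices nested in
    // arrays and objects should be restored while non-numeric values pass
    // through untouched.
    #[test]
    fn test_nested_dictionary_decompression() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor.active_dictionary.insert(0, "alpha".to_string());

        let compressed = json!({
            "items": [0, {"label": 0}],
            "untouched": true
        });

        let result = decompressor.decompress_dictionary(&compressed).unwrap();
        assert_eq!(result, json!({
            "items": ["alpha", {"label": "alpha"}],
            "untouched": true
        }));
    }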

    #[test]
    fn test_priority_based_compression() {
        let mut compressor = StreamingCompressor::new();

        let critical_frame = StreamFrame {
            data: json!({"error": "critical failure"}),
            priority: Priority::CRITICAL,
            metadata: HashMap::new(),
        };

        let low_frame = StreamFrame {
            data: json!({"debug": "verbose information"}),
            priority: Priority::LOW,
            metadata: HashMap::new(),
        };

        let _critical_result = compressor.compress_frame(critical_frame).unwrap();
        let _low_result = compressor.compress_frame(low_frame).unwrap();

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 2);
        assert!(stats.total_input_bytes > 0);
    }
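
    // The average decompression time is maintained as an incremental running
    // average, so frames taking 100us and 300us should average to 200us.
    #[test]
    fn test_decompression_running_average() {
        let mut decompressor = StreamingDecompressor::new();
        let data = json!({"k": "v"});

        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(100));
        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(300));

        let stats = decompressor.get_stats();
        assert_eq!(stats.frames_decompressed, 2);
        assert_eq!(stats.avg_decompression_time_us, 200);
    }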
}