pjson_rs/stream/compression_integration.rs

//! Integration of schema-based compression with PJS streaming
//!
//! Provides streaming-aware compression that maintains the ability
//! to progressively decompress data as frames arrive.
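//!
//! # Example
//!
//! A minimal round-trip sketch (mirroring the tests at the bottom of this
//! file); the frame fields shown are the ones this module itself uses:
//!
//! ```ignore
//! use std::collections::HashMap;
//!
//! let mut compressor = StreamingCompressor::new();
//! let mut decompressor = StreamingDecompressor::new();
//!
//! let frame = StreamFrame {
//!     data: serde_json::json!({"message": "hello"}),
//!     priority: Priority::MEDIUM,
//!     metadata: HashMap::new(),
//! };
//!
//! let compressed = compressor.compress_frame(frame)?;
//! let restored = decompressor.decompress_frame(compressed)?;
//! assert_eq!(restored.data, serde_json::json!({"message": "hello"}));
//! ```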

use crate::{
    compression::{SchemaCompressor, CompressionStrategy, CompressedData},
    domain::{DomainResult, DomainError},
    stream::{StreamFrame, Priority},
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;

/// Streaming compressor that maintains compression state across frames
#[derive(Debug, Clone)]
pub struct StreamingCompressor {
    /// Primary compressor for skeleton and critical data
    skeleton_compressor: SchemaCompressor,
    /// Secondary compressor for non-critical data
    content_compressor: SchemaCompressor,
    /// Compression statistics
    stats: CompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Total bytes processed
    pub total_input_bytes: usize,
    /// Total bytes after compression
    pub total_output_bytes: usize,
    /// Number of frames processed
    pub frames_processed: u32,
    /// Most recent compression ratio for each priority level
    pub priority_ratios: HashMap<u8, f32>,
}

/// Compressed stream frame with metadata
#[derive(Debug, Clone)]
pub struct CompressedFrame {
    /// Original frame whose priority and metadata are carried through decompression
    pub frame: StreamFrame,
    /// Compressed data
    pub compressed_data: CompressedData,
    /// Decompression instructions for client
    pub decompression_metadata: DecompressionMetadata,
}

#[derive(Debug, Clone)]
pub struct DecompressionMetadata {
    /// Compression strategy used
    pub strategy: CompressionStrategy,
    /// Mapping from dictionary indices to their original strings
    pub dictionary_map: HashMap<u16, String>,
    /// Delta base values for numeric decompression
    pub delta_bases: HashMap<String, f64>,
    /// Priority-specific decompression hints
    pub priority_hints: HashMap<u8, String>,
}

impl StreamingCompressor {
    /// Create new streaming compressor
    pub fn new() -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::new(),
            content_compressor: SchemaCompressor::new(),
            stats: CompressionStats::default(),
        }
    }

    /// Create with custom compression strategies
    pub fn with_strategies(
        skeleton_strategy: CompressionStrategy,
        content_strategy: CompressionStrategy,
    ) -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
            content_compressor: SchemaCompressor::with_strategy(content_strategy),
            stats: CompressionStats::default(),
        }
    }

    /// Process and compress a stream frame based on its priority
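    ///
    /// # Example
    ///
    /// A sketch of the call shape (mirrors `test_streaming_compressor_basic` below):
    ///
    /// ```ignore
    /// let mut compressor = StreamingCompressor::new();
    /// let frame = StreamFrame {
    ///     data: serde_json::json!({"count": 42}),
    ///     priority: Priority::MEDIUM,
    ///     metadata: std::collections::HashMap::new(),
    /// };
    /// let compressed = compressor.compress_frame(frame)?;
    /// assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    /// ```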
    pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
        let compressor = self.select_compressor_for_priority(frame.priority);

        // Calculate original size
        let original_size = serde_json::to_string(&frame.data)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        // Compress based on frame content and priority
        let compressed_data = compressor.compress(&frame.data)?;

        // Update statistics
        self.update_stats(frame.priority, original_size, compressed_data.compressed_size);

        // Create decompression metadata
        let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;

        Ok(CompressedFrame {
            frame,
            compressed_data,
            decompression_metadata,
        })
    }

    /// Analyze JSON data to optimize compression strategies
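    ///
    /// # Example
    ///
    /// A sketch: pass the response skeleton plus a few representative sample
    /// payloads so both compressors can be tuned before streaming begins:
    ///
    /// ```ignore
    /// let mut compressor = StreamingCompressor::new();
    /// let skeleton = serde_json::json!({"users": [], "total": 0});
    /// let samples = vec![
    ///     serde_json::json!({"name": "alice", "role": "admin"}),
    ///     serde_json::json!({"name": "bob", "role": "admin"}),
    /// ];
    /// compressor.optimize_for_data(&skeleton, &samples)?;
    /// ```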
    pub fn optimize_for_data(&mut self, skeleton: &JsonValue, sample_data: &[JsonValue]) -> DomainResult<()> {
        // Optimize skeleton compressor for critical structural data
        self.skeleton_compressor.analyze_and_optimize(skeleton)?;

        // Analyze sample content data to optimize content compressor
        if !sample_data.is_empty() {
            // Combine samples for comprehensive analysis
            let combined_sample = JsonValue::Array(sample_data.to_vec());
            self.content_compressor.analyze_and_optimize(&combined_sample)?;
        }

        Ok(())
    }

    /// Get current compression statistics
    pub fn get_stats(&self) -> &CompressionStats {
        &self.stats
    }

    /// Reset compression statistics
    pub fn reset_stats(&mut self) {
        self.stats = CompressionStats::default();
    }

    /// Select appropriate compressor based on frame priority
    fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
        match priority {
            // Critical data (skeleton, errors) - use specialized compressor
            Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
            // Regular content data - use content compressor
            _ => &mut self.content_compressor,
        }
    }

    /// Update compression statistics
    fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
        self.stats.total_input_bytes += original_size;
        self.stats.total_output_bytes += compressed_size;
        self.stats.frames_processed += 1;

        let ratio = if original_size > 0 {
            compressed_size as f32 / original_size as f32
        } else {
            1.0
        };

        self.stats.priority_ratios.insert(priority.value(), ratio);
    }

    /// Create decompression metadata for client
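    ///
    /// Keys in `compression_metadata` are interpreted by convention:
    /// `dict_<index>` entries with string values populate the dictionary map,
    /// and `base_<path>` entries with numeric values populate the delta bases.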
    fn create_decompression_metadata(&self, compressed_data: &CompressedData) -> DomainResult<DecompressionMetadata> {
        let mut dictionary_map = HashMap::new();
        let mut delta_bases = HashMap::new();

        // Extract dictionary mappings and delta bases from the compression metadata
        for (key, value) in &compressed_data.compression_metadata {
            if let Some(index_str) = key.strip_prefix("dict_") {
                if let Ok(index) = index_str.parse::<u16>() {
                    if let Some(string_val) = value.as_str() {
                        dictionary_map.insert(index, string_val.to_string());
                    }
                }
            } else if let Some(path) = key.strip_prefix("base_") {
                if let Some(num) = value.as_f64() {
                    delta_bases.insert(path.to_string(), num);
                }
            }
        }

        Ok(DecompressionMetadata {
            strategy: compressed_data.strategy.clone(),
            dictionary_map,
            delta_bases,
            priority_hints: HashMap::new(), // TODO: Add priority-specific hints
        })
    }
}

impl CompressionStats {
    /// Calculate overall compression ratio
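    ///
    /// The ratio is output bytes over input bytes, so lower is better:
    ///
    /// ```ignore
    /// let stats = CompressionStats {
    ///     total_input_bytes: 1000,
    ///     total_output_bytes: 600,
    ///     ..Default::default()
    /// };
    /// assert_eq!(stats.overall_compression_ratio(), 0.6); // 600 / 1000
    /// ```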
    pub fn overall_compression_ratio(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 1.0;
        }
        self.total_output_bytes as f32 / self.total_input_bytes as f32
    }

    /// Get compression ratio for specific priority level
    pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
        self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
    }

    /// Calculate bytes saved
    pub fn bytes_saved(&self) -> isize {
        self.total_input_bytes as isize - self.total_output_bytes as isize
    }

    /// Calculate percentage saved
    pub fn percentage_saved(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 0.0;
        }
        let ratio = self.overall_compression_ratio();
        (1.0 - ratio) * 100.0
    }
}

/// Client-side decompressor for receiving compressed frames
#[derive(Debug, Clone)]
pub struct StreamingDecompressor {
    /// Active dictionary for string decompression
    active_dictionary: HashMap<u16, String>,
    /// Delta base values for numeric decompression
    delta_bases: HashMap<String, f64>,
    /// Decompression statistics
    stats: DecompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    /// Total frames decompressed
    pub frames_decompressed: u32,
    /// Total bytes decompressed
    pub total_decompressed_bytes: usize,
    /// Average decompression time in microseconds
    pub avg_decompression_time_us: u64,
}

impl StreamingDecompressor {
    /// Create new streaming decompressor
    pub fn new() -> Self {
        Self {
            active_dictionary: HashMap::new(),
            delta_bases: HashMap::new(),
            stats: DecompressionStats::default(),
        }
    }

    /// Decompress a compressed frame
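    ///
    /// # Example
    ///
    /// A sketch assuming `compressed` came from `StreamingCompressor::compress_frame`:
    ///
    /// ```ignore
    /// let mut decompressor = StreamingDecompressor::new();
    /// let restored = decompressor.decompress_frame(compressed)?;
    /// println!("restored payload: {}", restored.data);
    /// ```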
    pub fn decompress_frame(&mut self, compressed_frame: CompressedFrame) -> DomainResult<StreamFrame> {
        let start_time = std::time::Instant::now();

        // Update decompression context with metadata
        self.update_context(&compressed_frame.decompression_metadata)?;

        // Decompress data based on strategy
        let decompressed_data = self.decompress_data(
            &compressed_frame.compressed_data,
            &compressed_frame.decompression_metadata.strategy,
        )?;

        // Update statistics
        let decompression_time = start_time.elapsed();
        self.update_decompression_stats(&decompressed_data, decompression_time);

        Ok(StreamFrame {
            data: decompressed_data,
            priority: compressed_frame.frame.priority,
            metadata: compressed_frame.frame.metadata,
        })
    }

    /// Update decompression context with new metadata
    fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
        // Update dictionary
        for (&index, string) in &metadata.dictionary_map {
            self.active_dictionary.insert(index, string.clone());
        }

        // Update delta bases
        for (path, &base) in &metadata.delta_bases {
            self.delta_bases.insert(path.clone(), base);
        }

        Ok(())
    }

    /// Decompress data according to strategy
    fn decompress_data(&self, compressed_data: &CompressedData, strategy: &CompressionStrategy) -> DomainResult<JsonValue> {
        match strategy {
            CompressionStrategy::None => Ok(compressed_data.data.clone()),

            CompressionStrategy::Dictionary { .. } => {
                self.decompress_dictionary(&compressed_data.data)
            }

            CompressionStrategy::Delta { .. } => {
                self.decompress_delta(&compressed_data.data)
            }

            CompressionStrategy::RunLength => {
                self.decompress_run_length(&compressed_data.data)
            }

            CompressionStrategy::Hybrid { .. } => {
                // Apply decompression in reverse order: delta first, then dictionary
                let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
                self.decompress_dictionary(&delta_decompressed)
            }
        }
    }

    /// Decompress dictionary-encoded strings
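    ///
    /// Numbers that match an entry in the active dictionary are replaced by the
    /// corresponding string; everything else passes through unchanged. For
    /// example, with `0 => "hello"` in the dictionary, `{"greeting": 0}`
    /// decompresses to `{"greeting": "hello"}`.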
    fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
                }
                Ok(JsonValue::Object(decompressed))
            }
            JsonValue::Array(arr) => {
                let decompressed: Result<Vec<_>, _> = arr.iter()
                    .map(|item| self.decompress_dictionary(item))
                    .collect();
                Ok(JsonValue::Array(decompressed?))
            }
            JsonValue::Number(n) => {
                // Check if this is a dictionary index; reject values that do not
                // fit in u16 so truncation cannot produce a false dictionary hit
                if let Some(index) = n.as_u64() {
                    if let Ok(index) = u16::try_from(index) {
                        if let Some(string_val) = self.active_dictionary.get(&index) {
                            return Ok(JsonValue::String(string_val.clone()));
                        }
                    }
                }
                Ok(data.clone())
            }
            _ => Ok(data.clone()),
        }
    }

    /// Decompress delta-encoded values
    fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        // TODO: Implement delta decompression for numeric sequences
        // This would reconstruct original values from deltas and base values
        Ok(data.clone())
    }

    /// Decompress run-length encoded data
    fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        // TODO: Implement run-length decompression
        Ok(data.clone())
    }

    /// Update decompression statistics
    fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
        self.stats.frames_decompressed += 1;

        if let Ok(serialized) = serde_json::to_string(data) {
            self.stats.total_decompressed_bytes += serialized.len();
        }

        let new_time_us = duration.as_micros() as u64;
        if self.stats.frames_decompressed == 1 {
            self.stats.avg_decompression_time_us = new_time_us;
        } else {
            // Calculate running average
            let total_frames = self.stats.frames_decompressed as u64;
            let total_time = self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
            self.stats.avg_decompression_time_us = total_time / total_frames;
        }
    }

    /// Get decompression statistics
    pub fn get_stats(&self) -> &DecompressionStats {
        &self.stats
    }
}

impl Default for StreamingCompressor {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for StreamingDecompressor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_streaming_compressor_basic() {
        let mut compressor = StreamingCompressor::new();

        let frame = StreamFrame {
            data: json!({
                "message": "test message",
                "count": 42
            }),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        let result = compressor.compress_frame(frame);
        assert!(result.is_ok());

        let compressed = result.unwrap();
        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    }

    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            total_input_bytes: 1000,
            total_output_bytes: 600,
            ..Default::default()
        };

        assert_eq!(stats.overall_compression_ratio(), 0.6);
        assert_eq!(stats.bytes_saved(), 400);
        // Use approximate comparison for float precision
        let percentage = stats.percentage_saved();
        assert!((percentage - 40.0).abs() < 0.001);
    }

    #[test]
    fn test_streaming_decompressor_basic() {
        let mut decompressor = StreamingDecompressor::new();

        let compressed_frame = CompressedFrame {
            frame: StreamFrame {
                data: json!({"test": "data"}),
                priority: Priority::MEDIUM,
                metadata: HashMap::new(),
            },
            compressed_data: CompressedData {
                strategy: CompressionStrategy::None,
                compressed_size: 20,
                data: json!({"test": "data"}),
                compression_metadata: HashMap::new(),
            },
            decompression_metadata: DecompressionMetadata {
                strategy: CompressionStrategy::None,
                dictionary_map: HashMap::new(),
                delta_bases: HashMap::new(),
                priority_hints: HashMap::new(),
            },
        };

        let result = decompressor.decompress_frame(compressed_frame);
        assert!(result.is_ok());

        let decompressed = result.unwrap();
        assert_eq!(decompressed.data, json!({"test": "data"}));
    }

    #[test]
    fn test_dictionary_decompression() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor.active_dictionary.insert(0, "hello".to_string());
        decompressor.active_dictionary.insert(1, "world".to_string());

        // Test with dictionary indices
        let compressed = json!({
            "greeting": 0,
            "target": 1
        });

        let result = decompressor.decompress_dictionary(&compressed).unwrap();
        assert_eq!(result, json!({
            "greeting": "hello",
            "target": "world"
        }));
    }

    #[test]
    fn test_priority_based_compression() {
        let mut compressor = StreamingCompressor::new();

        let critical_frame = StreamFrame {
            data: json!({"error": "critical failure"}),
            priority: Priority::CRITICAL,
            metadata: HashMap::new(),
        };

        let low_frame = StreamFrame {
            data: json!({"debug": "verbose information"}),
            priority: Priority::LOW,
            metadata: HashMap::new(),
        };

        let _critical_result = compressor.compress_frame(critical_frame).unwrap();
        let _low_result = compressor.compress_frame(low_frame).unwrap();

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 2);
        assert!(stats.total_input_bytes > 0);
    }
}