pjson_rs/stream/compression_integration.rs

//! Integration of schema-based compression with PJS streaming
//!
//! Provides streaming-aware compression that maintains the ability
//! to progressively decompress data as frames arrive.
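//!
//! # Example
//!
//! A minimal round-trip sketch. It mirrors the unit tests at the bottom of
//! this module, but the `use` paths assume crate-level re-exports, so the
//! block is marked `ignore`:
//!
//! ```ignore
//! use pjson_rs::stream::{Priority, StreamFrame, StreamingCompressor, StreamingDecompressor};
//! use std::collections::HashMap;
//!
//! let mut compressor = StreamingCompressor::new();
//! let frame = StreamFrame {
//!     data: serde_json::json!({ "message": "hello", "count": 42 }),
//!     priority: Priority::MEDIUM,
//!     metadata: HashMap::new(),
//! };
//! let compressed = compressor.compress_frame(frame)?;
//!
//! // On the receiving side, frames can be decompressed as they arrive.
//! let mut decompressor = StreamingDecompressor::new();
//! let restored = decompressor.decompress_frame(compressed)?;
//! ```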

use crate::{
    compression::{CompressedData, CompressionStrategy, SchemaCompressor},
    domain::{DomainError, DomainResult},
    stream::{Priority, StreamFrame},
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;

/// Streaming compressor that maintains compression state across frames
#[derive(Debug, Clone)]
pub struct StreamingCompressor {
    /// Primary compressor for skeleton and critical data
    skeleton_compressor: SchemaCompressor,
    /// Secondary compressor for non-critical data
    content_compressor: SchemaCompressor,
    /// Compression statistics
    stats: CompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Total bytes processed
    pub total_input_bytes: usize,
    /// Total bytes after compression
    pub total_output_bytes: usize,
    /// Number of frames processed
    pub frames_processed: u32,
    /// Most recent compression ratio observed per priority level
    pub priority_ratios: HashMap<u8, f32>,
}

/// Compressed stream frame with metadata
#[derive(Debug, Clone)]
pub struct CompressedFrame {
    /// Original frame, retained for its priority and metadata
    pub frame: StreamFrame,
    /// Compressed data
    pub compressed_data: CompressedData,
    /// Decompression instructions for client
    pub decompression_metadata: DecompressionMetadata,
}

#[derive(Debug, Clone)]
pub struct DecompressionMetadata {
    /// Compression strategy used
    pub strategy: CompressionStrategy,
    /// Mapping from dictionary indices to their original strings
    pub dictionary_map: HashMap<u16, String>,
    /// Delta base values for numeric decompression
    pub delta_bases: HashMap<String, f64>,
    /// Priority-specific decompression hints
    pub priority_hints: HashMap<u8, String>,
}

impl StreamingCompressor {
    /// Create new streaming compressor
    pub fn new() -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::new(),
            content_compressor: SchemaCompressor::new(),
            stats: CompressionStats::default(),
        }
    }

    /// Create with custom compression strategies
    pub fn with_strategies(
        skeleton_strategy: CompressionStrategy,
        content_strategy: CompressionStrategy,
    ) -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
            content_compressor: SchemaCompressor::with_strategy(content_strategy),
            stats: CompressionStats::default(),
        }
    }

    /// Process and compress a stream frame based on its priority
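    ///
    /// Critical and high-priority frames are routed to the skeleton
    /// compressor; all other frames go through the content compressor. A
    /// usage sketch mirroring the unit tests below (`ignore`d because it
    /// assumes crate-level re-exports):
    ///
    /// ```ignore
    /// let mut compressor = StreamingCompressor::new();
    /// let frame = StreamFrame {
    ///     data: serde_json::json!({ "message": "test message", "count": 42 }),
    ///     priority: Priority::MEDIUM,
    ///     metadata: std::collections::HashMap::new(),
    /// };
    /// let compressed = compressor.compress_frame(frame)?;
    /// assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    /// ```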
    pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
        let compressor = self.select_compressor_for_priority(frame.priority);

        // Calculate original size
        let original_size = serde_json::to_string(&frame.data)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        // Compress based on frame content and priority
        let compressed_data = compressor.compress(&frame.data)?;

        // Update statistics
        self.update_stats(
            frame.priority,
            original_size,
            compressed_data.compressed_size,
        );

        // Create decompression metadata
        let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;

        Ok(CompressedFrame {
            frame,
            compressed_data,
            decompression_metadata,
        })
    }

    /// Analyze JSON data to optimize compression strategies
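    ///
    /// Intended to be called once before streaming begins, with the response
    /// skeleton and a few representative payloads. A sketch (the sample
    /// values here are illustrative assumptions):
    ///
    /// ```ignore
    /// let skeleton = serde_json::json!({ "users": [], "total": 0 });
    /// let samples = vec![
    ///     serde_json::json!({ "name": "Alice", "score": 10 }),
    ///     serde_json::json!({ "name": "Bob", "score": 12 }),
    /// ];
    /// compressor.optimize_for_data(&skeleton, &samples)?;
    /// ```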
    pub fn optimize_for_data(
        &mut self,
        skeleton: &JsonValue,
        sample_data: &[JsonValue],
    ) -> DomainResult<()> {
        // Optimize skeleton compressor for critical structural data
        self.skeleton_compressor.analyze_and_optimize(skeleton)?;

        // Analyze sample content data to optimize content compressor
        if !sample_data.is_empty() {
            // Combine samples for comprehensive analysis
            let combined_sample = JsonValue::Array(sample_data.to_vec());
            self.content_compressor
                .analyze_and_optimize(&combined_sample)?;
        }

        Ok(())
    }

    /// Get current compression statistics
    pub fn get_stats(&self) -> &CompressionStats {
        &self.stats
    }

    /// Reset compression statistics
    pub fn reset_stats(&mut self) {
        self.stats = CompressionStats::default();
    }

    /// Select appropriate compressor based on frame priority
    fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
        match priority {
            // Critical data (skeleton, errors) - use specialized compressor
            Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
            // Regular content data - use content compressor
            _ => &mut self.content_compressor,
        }
    }

    /// Update compression statistics
    fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
        self.stats.total_input_bytes += original_size;
        self.stats.total_output_bytes += compressed_size;
        self.stats.frames_processed += 1;

        let ratio = if original_size > 0 {
            compressed_size as f32 / original_size as f32
        } else {
            1.0
        };

        self.stats.priority_ratios.insert(priority.value(), ratio);
    }

    /// Create decompression metadata for client
    fn create_decompression_metadata(
        &self,
        compressed_data: &CompressedData,
    ) -> DomainResult<DecompressionMetadata> {
        let mut dictionary_map = HashMap::new();
        let mut delta_bases = HashMap::new();

        // Extract dictionary mappings and delta bases from the compression metadata
        for (key, value) in &compressed_data.compression_metadata {
            if let Some(index_str) = key.strip_prefix("dict_") {
                if let (Ok(index), Some(string_val)) = (index_str.parse::<u16>(), value.as_str()) {
                    dictionary_map.insert(index, string_val.to_string());
                }
            } else if let Some(path) = key.strip_prefix("base_") {
                if let Some(num) = value.as_f64() {
                    delta_bases.insert(path.to_string(), num);
                }
            }
        }

        Ok(DecompressionMetadata {
            strategy: compressed_data.strategy.clone(),
            dictionary_map,
            delta_bases,
            priority_hints: HashMap::new(), // TODO: Add priority-specific hints
        })
    }
}

impl CompressionStats {
    /// Calculate overall compression ratio
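    ///
    /// Defined as `total_output_bytes / total_input_bytes`, so lower is
    /// better: 600 output bytes for 1000 input bytes gives 0.6, i.e. a 40%
    /// saving (see [`Self::percentage_saved`]).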
    pub fn overall_compression_ratio(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 1.0;
        }
        self.total_output_bytes as f32 / self.total_input_bytes as f32
    }

    /// Get compression ratio for specific priority level
    pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
        self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
    }

    /// Calculate bytes saved
    pub fn bytes_saved(&self) -> isize {
        self.total_input_bytes as isize - self.total_output_bytes as isize
    }

    /// Calculate percentage saved
    pub fn percentage_saved(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 0.0;
        }
        let ratio = self.overall_compression_ratio();
        (1.0 - ratio) * 100.0
    }
}

/// Client-side decompressor for receiving compressed frames
#[derive(Debug, Clone)]
pub struct StreamingDecompressor {
    /// Active dictionary for string decompression
    active_dictionary: HashMap<u16, String>,
    /// Delta base values for numeric decompression
    delta_bases: HashMap<String, f64>,
    /// Decompression statistics
    stats: DecompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    /// Total frames decompressed
    pub frames_decompressed: u32,
    /// Total bytes decompressed
    pub total_decompressed_bytes: usize,
    /// Average decompression time in microseconds
    pub avg_decompression_time_us: u64,
}

impl StreamingDecompressor {
    /// Create new streaming decompressor
    pub fn new() -> Self {
        Self {
            active_dictionary: HashMap::new(),
            delta_bases: HashMap::new(),
            stats: DecompressionStats::default(),
        }
    }

    /// Decompress a compressed frame
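    ///
    /// Each frame's metadata is merged into the decompressor's running
    /// context before decoding, so dictionary entries and delta bases carried
    /// by earlier frames remain available to later ones. A sketch:
    ///
    /// ```ignore
    /// let mut decompressor = StreamingDecompressor::new();
    /// let restored = decompressor.decompress_frame(compressed_frame)?;
    /// ```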
    pub fn decompress_frame(
        &mut self,
        compressed_frame: CompressedFrame,
    ) -> DomainResult<StreamFrame> {
        let start_time = std::time::Instant::now();

        // Update decompression context with metadata
        self.update_context(&compressed_frame.decompression_metadata)?;

        // Decompress data based on strategy
        let decompressed_data = self.decompress_data(
            &compressed_frame.compressed_data,
            &compressed_frame.decompression_metadata.strategy,
        )?;

        // Update statistics
        let decompression_time = start_time.elapsed();
        self.update_decompression_stats(&decompressed_data, decompression_time);

        Ok(StreamFrame {
            data: decompressed_data,
            priority: compressed_frame.frame.priority,
            metadata: compressed_frame.frame.metadata,
        })
    }

    /// Update decompression context with new metadata
    fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
        // Update dictionary
        for (&index, string) in &metadata.dictionary_map {
            self.active_dictionary.insert(index, string.clone());
        }

        // Update delta bases
        for (path, &base) in &metadata.delta_bases {
            self.delta_bases.insert(path.clone(), base);
        }

        Ok(())
    }

    /// Decompress data according to strategy
    fn decompress_data(
        &self,
        compressed_data: &CompressedData,
        strategy: &CompressionStrategy,
    ) -> DomainResult<JsonValue> {
        match strategy {
            CompressionStrategy::None => Ok(compressed_data.data.clone()),

            CompressionStrategy::Dictionary { .. } => {
                self.decompress_dictionary(&compressed_data.data)
            }

            CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),

            CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),

            CompressionStrategy::Hybrid { .. } => {
                // Apply decompression in reverse order: delta first, then dictionary
                let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
                self.decompress_dictionary(&delta_decompressed)
            }
        }
    }

    /// Decompress dictionary-encoded strings
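    ///
    /// Recursively walks the value and replaces any unsigned integer that
    /// matches an entry in `active_dictionary` with its string: with
    /// `{0 => "hello"}` active, `{"greeting": 0}` becomes
    /// `{"greeting": "hello"}`. Note the ambiguity: a genuine numeric value
    /// that collides with a dictionary index is replaced as well, so this
    /// scheme assumes the compressor only emits indices where strings
    /// originally stood.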
    fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
                }
                Ok(JsonValue::Object(decompressed))
            }
            JsonValue::Array(arr) => {
                let decompressed: Result<Vec<_>, _> = arr
                    .iter()
                    .map(|item| self.decompress_dictionary(item))
                    .collect();
                Ok(JsonValue::Array(decompressed?))
            }
            JsonValue::Number(n) => {
                // Check whether this number is a dictionary index; reject
                // values outside the u16 range instead of truncating them,
                // which could silently alias unrelated dictionary entries.
                if let Some(index) = n.as_u64().and_then(|i| u16::try_from(i).ok())
                    && let Some(string_val) = self.active_dictionary.get(&index)
                {
                    return Ok(JsonValue::String(string_val.clone()));
                }
                Ok(data.clone())
            }
            _ => Ok(data.clone()),
        }
    }

    /// Decompress delta-encoded values
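    ///
    /// Currently a pass-through (see the TODO below). The intended scheme,
    /// sketched here as an assumption, would rebuild each numeric sequence
    /// from its `delta_bases` entry plus successive deltas, e.g. base 100
    /// with deltas `[2, 3]` yielding `[100, 102, 105]`.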
    fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        // TODO: Implement delta decompression for numeric sequences
        // This would reconstruct original values from deltas and base values
        Ok(data.clone())
    }

    /// Decompress run-length encoded data
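    ///
    /// Currently a pass-through (see the TODO below). Once implemented, runs
    /// encoded as `[value, count]` pairs (an assumed wire format, not yet
    /// defined by `CompressionStrategy::RunLength`) would be expanded back
    /// into repeated elements.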
    fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        // TODO: Implement run-length decompression
        Ok(data.clone())
    }

    /// Update decompression statistics
    fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
        self.stats.frames_decompressed += 1;

        if let Ok(serialized) = serde_json::to_string(data) {
            self.stats.total_decompressed_bytes += serialized.len();
        }

        let new_time_us = duration.as_micros() as u64;
        if self.stats.frames_decompressed == 1 {
            self.stats.avg_decompression_time_us = new_time_us;
        } else {
            // Calculate running average
            let total_frames = self.stats.frames_decompressed as u64;
            let total_time =
                self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
            self.stats.avg_decompression_time_us = total_time / total_frames;
        }
    }

    /// Get decompression statistics
    pub fn get_stats(&self) -> &DecompressionStats {
        &self.stats
    }
}

impl Default for StreamingCompressor {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for StreamingDecompressor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_streaming_compressor_basic() {
        let mut compressor = StreamingCompressor::new();

        let frame = StreamFrame {
            data: json!({
                "message": "test message",
                "count": 42
            }),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        let result = compressor.compress_frame(frame);
        assert!(result.is_ok());

        let compressed = result.unwrap();
        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    }

    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            total_input_bytes: 1000,
            total_output_bytes: 600,
            ..Default::default()
        };

        assert_eq!(stats.overall_compression_ratio(), 0.6);
        assert_eq!(stats.bytes_saved(), 400);
        // Use approximate comparison for float precision
        let percentage = stats.percentage_saved();
        assert!((percentage - 40.0).abs() < 0.001);
    }

    #[test]
    fn test_streaming_decompressor_basic() {
        let mut decompressor = StreamingDecompressor::new();

        let compressed_frame = CompressedFrame {
            frame: StreamFrame {
                data: json!({"test": "data"}),
                priority: Priority::MEDIUM,
                metadata: HashMap::new(),
            },
            compressed_data: CompressedData {
                strategy: CompressionStrategy::None,
                compressed_size: 20,
                data: json!({"test": "data"}),
                compression_metadata: HashMap::new(),
            },
            decompression_metadata: DecompressionMetadata {
                strategy: CompressionStrategy::None,
                dictionary_map: HashMap::new(),
                delta_bases: HashMap::new(),
                priority_hints: HashMap::new(),
            },
        };

        let result = decompressor.decompress_frame(compressed_frame);
        assert!(result.is_ok());

        let decompressed = result.unwrap();
        assert_eq!(decompressed.data, json!({"test": "data"}));
    }

    #[test]
    fn test_dictionary_decompression() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor
            .active_dictionary
            .insert(0, "hello".to_string());
        decompressor
            .active_dictionary
            .insert(1, "world".to_string());

        // Test with dictionary indices
        let compressed = json!({
            "greeting": 0,
            "target": 1
        });

        let result = decompressor.decompress_dictionary(&compressed).unwrap();
        assert_eq!(
            result,
            json!({
                "greeting": "hello",
                "target": "world"
            })
        );
    }

    #[test]
    fn test_priority_based_compression() {
        let mut compressor = StreamingCompressor::new();

        let critical_frame = StreamFrame {
            data: json!({"error": "critical failure"}),
            priority: Priority::CRITICAL,
            metadata: HashMap::new(),
        };

        let low_frame = StreamFrame {
            data: json!({"debug": "verbose information"}),
            priority: Priority::LOW,
            metadata: HashMap::new(),
        };

        let _critical_result = compressor.compress_frame(critical_frame).unwrap();
        let _low_result = compressor.compress_frame(low_frame).unwrap();

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 2);
        assert!(stats.total_input_bytes > 0);
    }
}