pjson_rs/compression/
mod.rs

1//! Schema-based compression for PJS protocol
2//!
3//! Implements intelligent compression strategies based on JSON schema analysis
4//! to optimize bandwidth usage while maintaining streaming capabilities.
5
6use crate::domain::{DomainResult, DomainError};
7use serde_json::Value as JsonValue;
8use std::collections::HashMap;
9
/// Compression strategy based on schema analysis
///
/// Selected by `SchemaAnalyzer::analyze` from heuristic scores computed
/// over a sample document.
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionStrategy {
    /// No compression applied
    None,
    /// Dictionary-based compression for repeating string patterns
    /// (maps each repeated string to its u16 dictionary index)
    Dictionary { dictionary: HashMap<String, u16> },
    /// Delta encoding for numeric sequences
    /// (maps a document field path to its base value)
    Delta { base_values: HashMap<String, f64> },
    /// Run-length encoding for repeated values
    RunLength,
    /// Hybrid approach combining multiple strategies
    Hybrid {
        /// Dictionary for repeated strings (as in `Dictionary`)
        string_dict: HashMap<String, u16>,
        /// Base values for numeric fields (as in `Delta`)
        numeric_deltas: HashMap<String, f64>,
    },
}
27
/// Schema analyzer for determining optimal compression strategy
///
/// Accumulates statistics while walking a JSON document; all state is
/// cleared at the start of every `analyze` call.
#[derive(Debug, Clone)]
pub struct SchemaAnalyzer {
    /// Pattern frequency analysis (array structures, repeated values, URL/UUID shapes)
    patterns: HashMap<String, PatternInfo>,
    /// Numeric field analysis, keyed by document path (e.g. "a.b[0].c")
    numeric_fields: HashMap<String, NumericStats>,
    /// String repetition analysis: occurrence count per distinct string value
    string_repetitions: HashMap<String, u32>,
}
38
/// Frequency/size statistics for one observed repetition pattern.
#[derive(Debug, Clone)]
struct PatternInfo {
    /// Number of times the pattern was observed
    frequency: u32,
    /// Approximate total bytes occupied by all occurrences of the pattern
    total_size: usize,
    /// Estimated fraction of those bytes that compression could save (0.0..=1.0)
    compression_potential: f32,
}
45
/// Per-field numeric statistics used to estimate delta-encoding effectiveness.
#[derive(Debug, Clone)]
struct NumericStats {
    /// All observed values for the field (sorted in place during strategy selection)
    values: Vec<f64>,
    /// Heuristic in (0, 1]; higher means deltas between sorted values are
    /// more uniform, suggesting good delta compression
    delta_potential: f32,
    /// First value observed for the field; used as the delta-encoding base
    base_value: f64,
}
52
53impl SchemaAnalyzer {
54    /// Create new schema analyzer
55    pub fn new() -> Self {
56        Self {
57            patterns: HashMap::new(),
58            numeric_fields: HashMap::new(),
59            string_repetitions: HashMap::new(),
60        }
61    }
62
63    /// Analyze JSON data to determine optimal compression strategy
64    pub fn analyze(&mut self, data: &JsonValue) -> DomainResult<CompressionStrategy> {
65        // Reset analysis state
66        self.patterns.clear();
67        self.numeric_fields.clear();
68        self.string_repetitions.clear();
69
70        // Perform deep analysis
71        self.analyze_recursive(data, "")?;
72
73        // Determine best strategy based on analysis
74        self.determine_strategy()
75    }
76
77    /// Analyze data recursively
78    fn analyze_recursive(&mut self, value: &JsonValue, path: &str) -> DomainResult<()> {
79        match value {
80            JsonValue::Object(obj) => {
81                for (key, val) in obj {
82                    let field_path = if path.is_empty() {
83                        key.clone()
84                    } else {
85                        format!("{path}.{key}")
86                    };
87                    self.analyze_recursive(val, &field_path)?;
88                }
89            }
90            JsonValue::Array(arr) => {
91                // Analyze array patterns
92                if arr.len() > 1 {
93                    self.analyze_array_patterns(arr, path)?;
94                }
95                for (idx, item) in arr.iter().enumerate() {
96                    let item_path = format!("{path}[{idx}]");
97                    self.analyze_recursive(item, &item_path)?;
98                }
99            }
100            JsonValue::String(s) => {
101                self.analyze_string_pattern(s, path);
102            }
103            JsonValue::Number(n) => {
104                if let Some(f) = n.as_f64() {
105                    self.analyze_numeric_pattern(f, path);
106                }
107            }
108            _ => {}
109        }
110        Ok(())
111    }
112
113    /// Analyze array for repeating patterns
114    fn analyze_array_patterns(&mut self, arr: &[JsonValue], path: &str) -> DomainResult<()> {
115        // Check for repeating object structures
116        if let Some(JsonValue::Object(first)) = arr.first() {
117            let structure_key = format!("array_structure:{path}");
118            let field_names: Vec<&str> = first.keys().map(|k| k.as_str()).collect();
119            let pattern = field_names.join(",");
120            
121            // Count how many objects share this structure
122            let matching_count = arr.iter()
123                .filter_map(|v| v.as_object())
124                .filter(|obj| {
125                    let obj_fields: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
126                    obj_fields.join(",") == pattern
127                })
128                .count();
129
130            if matching_count > 1 {
131                let info = PatternInfo {
132                    frequency: matching_count as u32,
133                    total_size: pattern.len() * matching_count,
134                    compression_potential: (matching_count as f32 - 1.0) / matching_count as f32,
135                };
136                self.patterns.insert(structure_key, info);
137            }
138        }
139
140        // Check for repeating primitive values
141        if arr.len() > 2 {
142            let mut value_counts = HashMap::new();
143            for value in arr {
144                let key = match value {
145                    JsonValue::String(s) => format!("string:{s}"),
146                    JsonValue::Number(n) => format!("number:{n}"),
147                    JsonValue::Bool(b) => format!("bool:{b}"),
148                    _ => continue,
149                };
150                *value_counts.entry(key).or_insert(0) += 1;
151            }
152
153            for (value_key, count) in value_counts {
154                if count > 1 {
155                    let info = PatternInfo {
156                        frequency: count,
157                        total_size: value_key.len() * count as usize,
158                        compression_potential: (count as f32 - 1.0) / count as f32,
159                    };
160                    self.patterns.insert(format!("array_value:{path}:{value_key}"), info);
161                }
162            }
163        }
164
165        Ok(())
166    }
167
168    /// Analyze string for repetition patterns
169    fn analyze_string_pattern(&mut self, s: &str, _path: &str) {
170        // Track string repetitions across different paths
171        *self.string_repetitions.entry(s.to_string()).or_insert(0) += 1;
172
173        // Analyze common prefixes/suffixes for URLs, IDs, etc.
174        if s.len() > 10 {
175            // Check for URL patterns
176            if s.starts_with("http://") || s.starts_with("https://") {
177                let prefix = if s.starts_with("https://") { "https://" } else { "http://" };
178                self.patterns.entry(format!("url_prefix:{prefix}")).or_insert(PatternInfo {
179                    frequency: 0,
180                    total_size: 0,
181                    compression_potential: 0.0,
182                }).frequency += 1;
183            }
184
185            // Check for ID patterns (UUID-like)
186            if s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4 {
187                self.patterns.entry("uuid_pattern".to_string()).or_insert(PatternInfo {
188                    frequency: 0,
189                    total_size: 36,
190                    compression_potential: 0.3,
191                }).frequency += 1;
192            }
193        }
194    }
195
196    /// Analyze numeric patterns for delta compression
197    fn analyze_numeric_pattern(&mut self, value: f64, path: &str) {
198        self.numeric_fields
199            .entry(path.to_string())
200            .or_insert_with(|| NumericStats {
201                values: Vec::new(),
202                delta_potential: 0.0,
203                base_value: value,
204            })
205            .values
206            .push(value);
207    }
208
209    /// Determine optimal compression strategy based on analysis
210    fn determine_strategy(&mut self) -> DomainResult<CompressionStrategy> {
211        // Calculate compression potentials
212        let mut string_dict_score = 0.0;
213        let mut delta_score = 0.0;
214
215        // Analyze string repetition potential
216        let mut string_dict = HashMap::new();
217        let mut dict_index = 0u16;
218        
219        for (string, count) in &self.string_repetitions {
220            if *count > 1 && string.len() > 3 {
221                string_dict_score += (*count as f32 - 1.0) * string.len() as f32;
222                string_dict.insert(string.clone(), dict_index);
223                dict_index += 1;
224            }
225        }
226
227        // Analyze numeric delta potential
228        let mut numeric_deltas = HashMap::new();
229        
230        for (path, stats) in &mut self.numeric_fields {
231            if stats.values.len() > 2 {
232                // Calculate variance to determine delta effectiveness
233                stats.values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
234                
235                let deltas: Vec<f64> = stats.values.windows(2)
236                    .map(|window| window[1] - window[0])
237                    .collect();
238                
239                if !deltas.is_empty() {
240                    let avg_delta = deltas.iter().sum::<f64>() / deltas.len() as f64;
241                    let delta_variance = deltas.iter()
242                        .map(|d| (d - avg_delta).powi(2))
243                        .sum::<f64>() / deltas.len() as f64;
244                    
245                    // Low variance suggests good delta compression potential
246                    stats.delta_potential = 1.0 / (1.0 + delta_variance as f32);
247                    
248                    if stats.delta_potential > 0.3 {
249                        delta_score += stats.delta_potential * stats.values.len() as f32;
250                        numeric_deltas.insert(path.clone(), stats.base_value);
251                    }
252                }
253            }
254        }
255
256        // Choose strategy based on scores
257        match (string_dict_score > 50.0, delta_score > 30.0) {
258            (true, true) => Ok(CompressionStrategy::Hybrid {
259                string_dict,
260                numeric_deltas,
261            }),
262            (true, false) => Ok(CompressionStrategy::Dictionary { 
263                dictionary: string_dict 
264            }),
265            (false, true) => Ok(CompressionStrategy::Delta { 
266                base_values: numeric_deltas 
267            }),
268            (false, false) => {
269                // Check for run-length potential
270                let run_length_score = self.patterns.values()
271                    .filter(|p| p.compression_potential > 0.4)
272                    .map(|p| p.frequency as f32 * p.compression_potential)
273                    .sum::<f32>();
274                
275                if run_length_score > 20.0 {
276                    Ok(CompressionStrategy::RunLength)
277                } else {
278                    Ok(CompressionStrategy::None)
279                }
280            }
281        }
282    }
283}
284
/// Schema-aware compressor
///
/// Holds the active [`CompressionStrategy`] plus the analyzer used to
/// re-derive a strategy from sample data.
#[derive(Debug, Clone)]
pub struct SchemaCompressor {
    // Strategy applied by `compress`; starts as `None` until optimized.
    strategy: CompressionStrategy,
    // Analyzer reused by `analyze_and_optimize`.
    analyzer: SchemaAnalyzer,
}
291
292impl SchemaCompressor {
293    /// Create new compressor with automatic strategy detection
294    pub fn new() -> Self {
295        Self {
296            strategy: CompressionStrategy::None,
297            analyzer: SchemaAnalyzer::new(),
298        }
299    }
300
301    /// Create compressor with specific strategy
302    pub fn with_strategy(strategy: CompressionStrategy) -> Self {
303        Self {
304            strategy,
305            analyzer: SchemaAnalyzer::new(),
306        }
307    }
308
309    /// Analyze data and update compression strategy
310    pub fn analyze_and_optimize(&mut self, data: &JsonValue) -> DomainResult<&CompressionStrategy> {
311        self.strategy = self.analyzer.analyze(data)?;
312        Ok(&self.strategy)
313    }
314
315    /// Compress JSON data according to current strategy
316    pub fn compress(&self, data: &JsonValue) -> DomainResult<CompressedData> {
317        match &self.strategy {
318            CompressionStrategy::None => Ok(CompressedData {
319                strategy: self.strategy.clone(),
320                compressed_size: serde_json::to_string(data)
321                    .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
322                    .len(),
323                data: data.clone(),
324                compression_metadata: HashMap::new(),
325            }),
326            
327            CompressionStrategy::Dictionary { dictionary } => {
328                self.compress_with_dictionary(data, dictionary)
329            }
330            
331            CompressionStrategy::Delta { base_values } => {
332                self.compress_with_delta(data, base_values)
333            }
334            
335            CompressionStrategy::RunLength => {
336                self.compress_with_run_length(data)
337            }
338            
339            CompressionStrategy::Hybrid { string_dict, numeric_deltas } => {
340                self.compress_hybrid(data, string_dict, numeric_deltas)
341            }
342        }
343    }
344
345    /// Dictionary-based compression
346    fn compress_with_dictionary(&self, data: &JsonValue, dictionary: &HashMap<String, u16>) -> DomainResult<CompressedData> {
347        let mut metadata = HashMap::new();
348        
349        // Store dictionary for decompression
350        for (string, index) in dictionary {
351            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
352        }
353
354        // Replace strings with dictionary indices
355        let compressed = self.replace_strings_with_indices(data, dictionary)?;
356        let compressed_size = serde_json::to_string(&compressed)
357            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
358            .len();
359
360        Ok(CompressedData {
361            strategy: self.strategy.clone(),
362            compressed_size,
363            data: compressed,
364            compression_metadata: metadata,
365        })
366    }
367
368    /// Delta compression for numeric sequences
369    fn compress_with_delta(&self, data: &JsonValue, base_values: &HashMap<String, f64>) -> DomainResult<CompressedData> {
370        let mut metadata = HashMap::new();
371        
372        // Store base values
373        for (path, base) in base_values {
374            metadata.insert(format!("base_{path}"), JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()));
375        }
376
377        // Apply delta compression
378        let compressed = self.apply_delta_compression(data, base_values)?;
379        let compressed_size = serde_json::to_string(&compressed)
380            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
381            .len();
382
383        Ok(CompressedData {
384            strategy: self.strategy.clone(),
385            compressed_size,
386            data: compressed,
387            compression_metadata: metadata,
388        })
389    }
390
391    /// Run-length encoding compression
392    fn compress_with_run_length(&self, data: &JsonValue) -> DomainResult<CompressedData> {
393        // TODO: Implement run-length encoding for arrays with repeated values
394        let compressed_size = serde_json::to_string(data)
395            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
396            .len();
397
398        Ok(CompressedData {
399            strategy: self.strategy.clone(),
400            compressed_size,
401            data: data.clone(),
402            compression_metadata: HashMap::new(),
403        })
404    }
405
406    /// Hybrid compression combining multiple strategies
407    fn compress_hybrid(&self, data: &JsonValue, string_dict: &HashMap<String, u16>, numeric_deltas: &HashMap<String, f64>) -> DomainResult<CompressedData> {
408        let mut metadata = HashMap::new();
409        
410        // Add dictionary metadata
411        for (string, index) in string_dict {
412            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
413        }
414        
415        // Add delta base values
416        for (path, base) in numeric_deltas {
417            metadata.insert(format!("base_{path}"), JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()));
418        }
419
420        // Apply both compression strategies
421        let dict_compressed = self.replace_strings_with_indices(data, string_dict)?;
422        let final_compressed = self.apply_delta_compression(&dict_compressed, numeric_deltas)?;
423        
424        let compressed_size = serde_json::to_string(&final_compressed)
425            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
426            .len();
427
428        Ok(CompressedData {
429            strategy: self.strategy.clone(),
430            compressed_size,
431            data: final_compressed,
432            compression_metadata: metadata,
433        })
434    }
435
436    /// Replace strings with dictionary indices
437    #[allow(clippy::only_used_in_recursion)]
438    fn replace_strings_with_indices(&self, data: &JsonValue, dictionary: &HashMap<String, u16>) -> DomainResult<JsonValue> {
439        match data {
440            JsonValue::Object(obj) => {
441                let mut compressed_obj = serde_json::Map::new();
442                for (key, value) in obj {
443                    compressed_obj.insert(
444                        key.clone(),
445                        self.replace_strings_with_indices(value, dictionary)?
446                    );
447                }
448                Ok(JsonValue::Object(compressed_obj))
449            }
450            JsonValue::Array(arr) => {
451                let compressed_arr: Result<Vec<_>, _> = arr.iter()
452                    .map(|item| self.replace_strings_with_indices(item, dictionary))
453                    .collect();
454                Ok(JsonValue::Array(compressed_arr?))
455            }
456            JsonValue::String(s) => {
457                if let Some(&index) = dictionary.get(s) {
458                    Ok(JsonValue::Number(serde_json::Number::from(index)))
459                } else {
460                    Ok(data.clone())
461                }
462            }
463            _ => Ok(data.clone()),
464        }
465    }
466
467    /// Apply delta compression to numeric values
468    fn apply_delta_compression(&self, data: &JsonValue, _base_values: &HashMap<String, f64>) -> DomainResult<JsonValue> {
469        // TODO: Implement delta compression for numeric sequences in arrays
470        // This is a simplified version - real implementation would track field paths
471        Ok(data.clone())
472    }
473}
474
/// Compressed data with metadata
#[derive(Debug, Clone)]
pub struct CompressedData {
    /// Strategy that produced this payload
    pub strategy: CompressionStrategy,
    /// Serialized size of `data` in bytes
    pub compressed_size: usize,
    /// The (possibly transformed) JSON payload
    pub data: JsonValue,
    /// Side-channel data needed for decompression (dictionaries, base values)
    pub compression_metadata: HashMap<String, JsonValue>,
}
483
484impl CompressedData {
485    /// Calculate compression ratio
486    pub fn compression_ratio(&self, original_size: usize) -> f32 {
487        if original_size == 0 {
488            return 1.0;
489        }
490        self.compressed_size as f32 / original_size as f32
491    }
492
493    /// Get compression savings in bytes
494    pub fn compression_savings(&self, original_size: usize) -> isize {
495        original_size as isize - self.compressed_size as isize
496    }
497}
498
499impl Default for SchemaAnalyzer {
500    fn default() -> Self {
501        Self::new()
502    }
503}
504
505impl Default for SchemaCompressor {
506    fn default() -> Self {
507        Self::new()
508    }
509}
510
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Eight objects repeating "admin"/"active"/"engineering" should push the
    // dictionary score past its 50.0 threshold in determine_strategy.
    #[test]
    fn test_schema_analyzer_dictionary_potential() {
        let mut analyzer = SchemaAnalyzer::new();
        
        let data = json!({
            "users": [
                {"name": "John Doe", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Jane Smith", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Bob Wilson", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Alice Brown", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Charlie Davis", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Diana Evans", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Frank Miller", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Grace Wilson", "role": "admin", "status": "active", "department": "engineering"}
            ]
        });

        let strategy = analyzer.analyze(&data).unwrap();
        
        // Should detect repeating strings like "admin", "active"
        match strategy {
            CompressionStrategy::Dictionary { .. } | CompressionStrategy::Hybrid { .. } => {
                // Expected outcome
            }
            _ => panic!("Expected dictionary-based compression strategy"),
        }
    }

    // Default compressor uses CompressionStrategy::None, so the "compressed"
    // size must equal the serialized original size (ratio exactly 1.0 <= 1.0).
    #[test]
    fn test_schema_compressor_basic() {
        let compressor = SchemaCompressor::new();
        
        let data = json!({
            "message": "hello world",
            "count": 42
        });

        let original_size = serde_json::to_string(&data).unwrap().len();
        let compressed = compressor.compress(&data).unwrap();
        
        assert!(compressed.compressed_size > 0);
        assert!(compressed.compression_ratio(original_size) <= 1.0);
    }

    // An explicit dictionary must be echoed back in the compression metadata
    // under "dict_<index>" keys so a decompressor can reverse the mapping.
    #[test]
    fn test_dictionary_compression() {
        let mut dictionary = HashMap::new();
        dictionary.insert("active".to_string(), 0);
        dictionary.insert("admin".to_string(), 1);
        
        let compressor = SchemaCompressor::with_strategy(
            CompressionStrategy::Dictionary { dictionary }
        );
        
        let data = json!({
            "status": "active",
            "role": "admin", 
            "description": "active admin user"
        });

        let result = compressor.compress(&data).unwrap();
        
        // Verify compression metadata contains dictionary
        assert!(result.compression_metadata.contains_key("dict_0"));
        assert!(result.compression_metadata.contains_key("dict_1"));
    }

    // With no repeated strings and no numeric sequences, every score stays
    // below its threshold and the analyzer must fall back to None.
    #[test]
    fn test_compression_strategy_selection() {
        let mut analyzer = SchemaAnalyzer::new();
        
        // Test data with no clear patterns
        let simple_data = json!({
            "unique_field_1": "unique_value_1",
            "unique_field_2": "unique_value_2"
        });
        
        let strategy = analyzer.analyze(&simple_data).unwrap();
        assert_eq!(strategy, CompressionStrategy::None);
    }

    // Evenly spaced "time"/"value" observations should populate the
    // analyzer's numeric_fields map (strategy itself is not asserted).
    #[test] 
    fn test_numeric_delta_analysis() {
        let mut analyzer = SchemaAnalyzer::new();
        
        let data = json!({
            "measurements": [
                {"time": 100, "value": 10.0},
                {"time": 101, "value": 10.5},  
                {"time": 102, "value": 11.0},
                {"time": 103, "value": 11.5}
            ]
        });

        let _strategy = analyzer.analyze(&data).unwrap();
        
        // Should detect incremental numeric patterns
        assert!(!analyzer.numeric_fields.is_empty());
    }
}