pjson_rs/compression/
mod.rs

1//! Schema-based compression for PJS protocol
2//!
3//! Implements intelligent compression strategies based on JSON schema analysis
4//! to optimize bandwidth usage while maintaining streaming capabilities.
5
6use crate::domain::{DomainResult, DomainError};
7use serde_json::{json, Value as JsonValue};
8use std::collections::HashMap;
9
/// Tunable thresholds and limits for the compression heuristics.
///
/// See the [`Default`] impl for the baseline values used when no custom
/// configuration is supplied.
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// Minimum array length for pattern analysis
    pub min_array_length: usize,
    /// Minimum string length for dictionary inclusion
    pub min_string_length: usize,
    /// Minimum frequency for dictionary inclusion (counts strictly above this)
    pub min_frequency_count: u32,
    /// Compression potential recorded for UUID-shaped strings
    pub uuid_compression_potential: f32,
    /// Threshold score for choosing string dictionary compression
    pub string_dict_threshold: f32,
    /// Threshold score for choosing delta compression
    pub delta_threshold: f32,
    /// Minimum per-field delta potential for numeric compression
    pub min_delta_potential: f32,
    /// Threshold score for falling back to run-length compression
    pub run_length_threshold: f32,
    /// Minimum pattern potential counted toward the run-length score
    pub min_compression_potential: f32,
    /// Minimum array size for numeric sequence analysis
    pub min_numeric_sequence_size: usize,
}
34
35impl Default for CompressionConfig {
36    fn default() -> Self {
37        Self {
38            min_array_length: 2,
39            min_string_length: 3,
40            min_frequency_count: 1,
41            uuid_compression_potential: 0.3,
42            string_dict_threshold: 50.0,
43            delta_threshold: 30.0,
44            min_delta_potential: 0.3,
45            run_length_threshold: 20.0,
46            min_compression_potential: 0.4,
47            min_numeric_sequence_size: 3,
48        }
49    }
50}
51
/// Compression strategy chosen by [`SchemaAnalyzer`] based on schema analysis
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionStrategy {
    /// No compression applied
    None,
    /// Dictionary-based compression for repeating string patterns.
    /// Maps each dictionary string to the `u16` index that replaces it.
    Dictionary { dictionary: HashMap<String, u16> },
    /// Delta encoding for numeric sequences.
    /// Keys are JSON paths; values are the base value deltas are taken against.
    Delta { base_values: HashMap<String, f64> },
    /// Run-length encoding for repeated values
    RunLength,
    /// Hybrid approach combining dictionary and delta strategies
    Hybrid {
        string_dict: HashMap<String, u16>,
        numeric_deltas: HashMap<String, f64>,
    },
}
69
/// Schema analyzer for determining optimal compression strategy.
///
/// State is accumulated during [`SchemaAnalyzer::analyze`] and cleared at
/// the start of each analysis, so one analyzer can be reused.
#[derive(Debug, Clone)]
pub struct SchemaAnalyzer {
    /// Pattern frequency analysis, keyed by pattern identifier
    patterns: HashMap<String, PatternInfo>,
    /// Numeric field analysis, keyed by JSON path
    numeric_fields: HashMap<String, NumericStats>,
    /// String repetition analysis: occurrence count per distinct string
    string_repetitions: HashMap<String, u32>,
    /// Configuration for compression algorithms
    config: CompressionConfig,
}
82
/// Aggregated statistics for one detected pattern.
#[derive(Debug, Clone)]
struct PatternInfo {
    /// Number of occurrences observed.
    frequency: u32,
    /// Approximate total bytes occupied by all occurrences.
    total_size: usize,
    /// Estimated savable fraction (0.0–1.0); higher is better.
    compression_potential: f32,
}
89
/// Per-path statistics for numeric fields, used to score delta encoding.
#[derive(Debug, Clone)]
struct NumericStats {
    /// All values observed at this path.
    values: Vec<f64>,
    /// 0.0–1.0 score; higher means deltas between sorted values are more uniform.
    delta_potential: f32,
    /// First value observed at this path; used as the delta base.
    base_value: f64,
}
96
97impl SchemaAnalyzer {
98    /// Create new schema analyzer
99    pub fn new() -> Self {
100        Self {
101            patterns: HashMap::new(),
102            numeric_fields: HashMap::new(),
103            string_repetitions: HashMap::new(),
104            config: CompressionConfig::default(),
105        }
106    }
107
108    /// Create new schema analyzer with custom configuration
109    pub fn with_config(config: CompressionConfig) -> Self {
110        Self {
111            patterns: HashMap::new(),
112            numeric_fields: HashMap::new(),
113            string_repetitions: HashMap::new(),
114            config,
115        }
116    }
117
118    /// Analyze JSON data to determine optimal compression strategy
119    pub fn analyze(&mut self, data: &JsonValue) -> DomainResult<CompressionStrategy> {
120        // Reset analysis state
121        self.patterns.clear();
122        self.numeric_fields.clear();
123        self.string_repetitions.clear();
124
125        // Perform deep analysis
126        self.analyze_recursive(data, "")?;
127
128        // Determine best strategy based on analysis
129        self.determine_strategy()
130    }
131
132    /// Analyze data recursively
133    fn analyze_recursive(&mut self, value: &JsonValue, path: &str) -> DomainResult<()> {
134        match value {
135            JsonValue::Object(obj) => {
136                for (key, val) in obj {
137                    let field_path = if path.is_empty() {
138                        key.clone()
139                    } else {
140                        format!("{path}.{key}")
141                    };
142                    self.analyze_recursive(val, &field_path)?;
143                }
144            }
145            JsonValue::Array(arr) => {
146                // Analyze array patterns
147                if arr.len() > self.config.min_array_length {
148                    self.analyze_array_patterns(arr, path)?;
149                }
150                for (idx, item) in arr.iter().enumerate() {
151                    let item_path = format!("{path}[{idx}]");
152                    self.analyze_recursive(item, &item_path)?;
153                }
154            }
155            JsonValue::String(s) => {
156                self.analyze_string_pattern(s, path);
157            }
158            JsonValue::Number(n) => {
159                if let Some(f) = n.as_f64() {
160                    self.analyze_numeric_pattern(f, path);
161                }
162            }
163            _ => {}
164        }
165        Ok(())
166    }
167
168    /// Analyze array for repeating patterns
169    fn analyze_array_patterns(&mut self, arr: &[JsonValue], path: &str) -> DomainResult<()> {
170        // Check for repeating object structures
171        if let Some(JsonValue::Object(first)) = arr.first() {
172            let structure_key = format!("array_structure:{path}");
173            let field_names: Vec<&str> = first.keys().map(|k| k.as_str()).collect();
174            let pattern = field_names.join(",");
175            
176            // Count how many objects share this structure
177            let matching_count = arr.iter()
178                .filter_map(|v| v.as_object())
179                .filter(|obj| {
180                    let obj_fields: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
181                    obj_fields.join(",") == pattern
182                })
183                .count();
184
185            if matching_count > self.config.min_frequency_count as usize {
186                let info = PatternInfo {
187                    frequency: matching_count as u32,
188                    total_size: pattern.len() * matching_count,
189                    compression_potential: (matching_count as f32 - 1.0) / matching_count as f32,
190                };
191                self.patterns.insert(structure_key, info);
192            }
193        }
194
195        // Check for repeating primitive values
196        if arr.len() > 2 {
197            let mut value_counts = HashMap::new();
198            for value in arr {
199                let key = match value {
200                    JsonValue::String(s) => format!("string:{s}"),
201                    JsonValue::Number(n) => format!("number:{n}"),
202                    JsonValue::Bool(b) => format!("bool:{b}"),
203                    _ => continue,
204                };
205                *value_counts.entry(key).or_insert(0) += 1;
206            }
207
208            for (value_key, count) in value_counts {
209                if count > self.config.min_frequency_count {
210                    let info = PatternInfo {
211                        frequency: count,
212                        total_size: value_key.len() * count as usize,
213                        compression_potential: (count as f32 - 1.0) / count as f32,
214                    };
215                    self.patterns.insert(format!("array_value:{path}:{value_key}"), info);
216                }
217            }
218        }
219
220        Ok(())
221    }
222
223    /// Analyze string for repetition patterns
224    fn analyze_string_pattern(&mut self, s: &str, _path: &str) {
225        // Track string repetitions across different paths
226        *self.string_repetitions.entry(s.to_string()).or_insert(0) += 1;
227
228        // Analyze common prefixes/suffixes for URLs, IDs, etc.
229        if s.len() > 10 {
230            // Check for URL patterns
231            if s.starts_with("http://") || s.starts_with("https://") {
232                let prefix = if s.starts_with("https://") { "https://" } else { "http://" };
233                self.patterns.entry(format!("url_prefix:{prefix}")).or_insert(PatternInfo {
234                    frequency: 0,
235                    total_size: 0,
236                    compression_potential: 0.0,
237                }).frequency += 1;
238            }
239
240            // Check for ID patterns (UUID-like)
241            if s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4 {
242                self.patterns.entry("uuid_pattern".to_string()).or_insert(PatternInfo {
243                    frequency: 0,
244                    total_size: 36,
245                    compression_potential: self.config.uuid_compression_potential,
246                }).frequency += 1;
247            }
248        }
249    }
250
251    /// Analyze numeric patterns for delta compression
252    fn analyze_numeric_pattern(&mut self, value: f64, path: &str) {
253        self.numeric_fields
254            .entry(path.to_string())
255            .or_insert_with(|| NumericStats {
256                values: Vec::new(),
257                delta_potential: 0.0,
258                base_value: value,
259            })
260            .values
261            .push(value);
262    }
263
264    /// Determine optimal compression strategy based on analysis
265    fn determine_strategy(&mut self) -> DomainResult<CompressionStrategy> {
266        // Calculate compression potentials
267        let mut string_dict_score = 0.0;
268        let mut delta_score = 0.0;
269
270        // Analyze string repetition potential
271        let mut string_dict = HashMap::new();
272        let mut dict_index = 0u16;
273        
274        for (string, count) in &self.string_repetitions {
275            if *count > self.config.min_frequency_count && string.len() > self.config.min_string_length {
276                string_dict_score += (*count as f32 - 1.0) * string.len() as f32;
277                string_dict.insert(string.clone(), dict_index);
278                dict_index += 1;
279            }
280        }
281
282        // Analyze numeric delta potential
283        let mut numeric_deltas = HashMap::new();
284        
285        for (path, stats) in &mut self.numeric_fields {
286            if stats.values.len() > 2 {
287                // Calculate variance to determine delta effectiveness
288                stats.values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
289                
290                let deltas: Vec<f64> = stats.values.windows(2)
291                    .map(|window| window[1] - window[0])
292                    .collect();
293                
294                if !deltas.is_empty() {
295                    let avg_delta = deltas.iter().sum::<f64>() / deltas.len() as f64;
296                    let delta_variance = deltas.iter()
297                        .map(|d| (d - avg_delta).powi(2))
298                        .sum::<f64>() / deltas.len() as f64;
299                    
300                    // Low variance suggests good delta compression potential
301                    stats.delta_potential = 1.0 / (1.0 + delta_variance as f32);
302                    
303                    if stats.delta_potential > self.config.min_delta_potential {
304                        delta_score += stats.delta_potential * stats.values.len() as f32;
305                        numeric_deltas.insert(path.clone(), stats.base_value);
306                    }
307                }
308            }
309        }
310
311        // Choose strategy based on scores
312        match (string_dict_score > self.config.string_dict_threshold, delta_score > self.config.delta_threshold) {
313            (true, true) => Ok(CompressionStrategy::Hybrid {
314                string_dict,
315                numeric_deltas,
316            }),
317            (true, false) => Ok(CompressionStrategy::Dictionary { 
318                dictionary: string_dict 
319            }),
320            (false, true) => Ok(CompressionStrategy::Delta { 
321                base_values: numeric_deltas 
322            }),
323            (false, false) => {
324                // Check for run-length potential
325                let run_length_score = self.patterns.values()
326                    .filter(|p| p.compression_potential > self.config.min_compression_potential)
327                    .map(|p| p.frequency as f32 * p.compression_potential)
328                    .sum::<f32>();
329                
330                if run_length_score > self.config.run_length_threshold {
331                    Ok(CompressionStrategy::RunLength)
332                } else {
333                    Ok(CompressionStrategy::None)
334                }
335            }
336        }
337    }
338}
339
/// Schema-aware compressor.
///
/// Holds the active [`CompressionStrategy`] together with the analyzer and
/// configuration used to (re)derive it from sample data.
#[derive(Debug, Clone)]
pub struct SchemaCompressor {
    /// Strategy applied by `compress`.
    strategy: CompressionStrategy,
    /// Analyzer used by `analyze_and_optimize` to pick a strategy.
    analyzer: SchemaAnalyzer,
    /// Thresholds shared with the analyzer.
    config: CompressionConfig,
}
347
348impl SchemaCompressor {
349    /// Create new compressor with automatic strategy detection
350    pub fn new() -> Self {
351        let config = CompressionConfig::default();
352        Self {
353            strategy: CompressionStrategy::None,
354            analyzer: SchemaAnalyzer::with_config(config.clone()),
355            config,
356        }
357    }
358
359    /// Create compressor with specific strategy
360    pub fn with_strategy(strategy: CompressionStrategy) -> Self {
361        let config = CompressionConfig::default();
362        Self {
363            strategy,
364            analyzer: SchemaAnalyzer::with_config(config.clone()),
365            config,
366        }
367    }
368
369    /// Create compressor with custom configuration
370    pub fn with_config(config: CompressionConfig) -> Self {
371        Self {
372            strategy: CompressionStrategy::None,
373            analyzer: SchemaAnalyzer::with_config(config.clone()),
374            config,
375        }
376    }
377
378    /// Analyze data and update compression strategy
379    pub fn analyze_and_optimize(&mut self, data: &JsonValue) -> DomainResult<&CompressionStrategy> {
380        self.strategy = self.analyzer.analyze(data)?;
381        Ok(&self.strategy)
382    }
383
384    /// Compress JSON data according to current strategy
385    pub fn compress(&self, data: &JsonValue) -> DomainResult<CompressedData> {
386        match &self.strategy {
387            CompressionStrategy::None => Ok(CompressedData {
388                strategy: self.strategy.clone(),
389                compressed_size: serde_json::to_string(data)
390                    .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
391                    .len(),
392                data: data.clone(),
393                compression_metadata: HashMap::new(),
394            }),
395            
396            CompressionStrategy::Dictionary { dictionary } => {
397                self.compress_with_dictionary(data, dictionary)
398            }
399            
400            CompressionStrategy::Delta { base_values } => {
401                self.compress_with_delta(data, base_values)
402            }
403            
404            CompressionStrategy::RunLength => {
405                self.compress_with_run_length(data)
406            }
407            
408            CompressionStrategy::Hybrid { string_dict, numeric_deltas } => {
409                self.compress_hybrid(data, string_dict, numeric_deltas)
410            }
411        }
412    }
413
414    /// Dictionary-based compression
415    fn compress_with_dictionary(&self, data: &JsonValue, dictionary: &HashMap<String, u16>) -> DomainResult<CompressedData> {
416        let mut metadata = HashMap::new();
417        
418        // Store dictionary for decompression
419        for (string, index) in dictionary {
420            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
421        }
422
423        // Replace strings with dictionary indices
424        let compressed = self.replace_strings_with_indices(data, dictionary)?;
425        let compressed_size = serde_json::to_string(&compressed)
426            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
427            .len();
428
429        Ok(CompressedData {
430            strategy: self.strategy.clone(),
431            compressed_size,
432            data: compressed,
433            compression_metadata: metadata,
434        })
435    }
436
437    /// Delta compression for numeric sequences
438    fn compress_with_delta(&self, data: &JsonValue, base_values: &HashMap<String, f64>) -> DomainResult<CompressedData> {
439        let mut metadata = HashMap::new();
440        
441        // Store base values
442        for (path, base) in base_values {
443            metadata.insert(format!("base_{path}"), JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()));
444        }
445
446        // Apply delta compression
447        let compressed = self.apply_delta_compression(data, base_values)?;
448        let compressed_size = serde_json::to_string(&compressed)
449            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
450            .len();
451
452        Ok(CompressedData {
453            strategy: self.strategy.clone(),
454            compressed_size,
455            data: compressed,
456            compression_metadata: metadata,
457        })
458    }
459
460    /// Run-length encoding compression
461    fn compress_with_run_length(&self, data: &JsonValue) -> DomainResult<CompressedData> {
462        let compressed = self.apply_run_length_encoding(data)?;
463        let compressed_size = serde_json::to_string(&compressed)
464            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
465            .len();
466
467        Ok(CompressedData {
468            strategy: self.strategy.clone(),
469            compressed_size,
470            data: compressed,
471            compression_metadata: HashMap::new(),
472        })
473    }
474
475    /// Apply run-length encoding to arrays with repeated values
476    fn apply_run_length_encoding(&self, data: &JsonValue) -> DomainResult<JsonValue> {
477        match data {
478            JsonValue::Object(obj) => {
479                let mut compressed_obj = serde_json::Map::new();
480                for (key, value) in obj {
481                    compressed_obj.insert(
482                        key.clone(),
483                        self.apply_run_length_encoding(value)?
484                    );
485                }
486                Ok(JsonValue::Object(compressed_obj))
487            }
488            JsonValue::Array(arr) if arr.len() > 2 => {
489                // Apply run-length encoding to array
490                let mut compressed_runs = Vec::new();
491                let mut current_value = None;
492                let mut run_count = 0;
493
494                for item in arr {
495                    if Some(item) == current_value.as_ref() {
496                        run_count += 1;
497                    } else {
498                        // Save previous run if it exists
499                        if let Some(value) = current_value {
500                            if run_count > self.config.min_frequency_count {
501                                // Use run-length encoding: [value, count]
502                                compressed_runs.push(json!({
503                                    "rle_value": value,
504                                    "rle_count": run_count
505                                }));
506                            } else {
507                                // Single occurrence, keep as-is
508                                compressed_runs.push(value);
509                            }
510                        }
511                        
512                        // Start new run
513                        current_value = Some(item.clone());
514                        run_count = 1;
515                    }
516                }
517
518                // Handle final run
519                if let Some(value) = current_value {
520                    if run_count > self.config.min_frequency_count {
521                        compressed_runs.push(json!({
522                            "rle_value": value,
523                            "rle_count": run_count
524                        }));
525                    } else {
526                        compressed_runs.push(value);
527                    }
528                }
529
530                Ok(JsonValue::Array(compressed_runs))
531            }
532            JsonValue::Array(arr) => {
533                // Array too small for run-length encoding, process recursively
534                let compressed_arr: Result<Vec<_>, _> = arr.iter()
535                    .map(|item| self.apply_run_length_encoding(item))
536                    .collect();
537                Ok(JsonValue::Array(compressed_arr?))
538            }
539            _ => Ok(data.clone()),
540        }
541    }
542
543    /// Hybrid compression combining multiple strategies
544    fn compress_hybrid(&self, data: &JsonValue, string_dict: &HashMap<String, u16>, numeric_deltas: &HashMap<String, f64>) -> DomainResult<CompressedData> {
545        let mut metadata = HashMap::new();
546        
547        // Add dictionary metadata
548        for (string, index) in string_dict {
549            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
550        }
551        
552        // Add delta base values
553        for (path, base) in numeric_deltas {
554            metadata.insert(format!("base_{path}"), JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()));
555        }
556
557        // Apply both compression strategies
558        let dict_compressed = self.replace_strings_with_indices(data, string_dict)?;
559        let final_compressed = self.apply_delta_compression(&dict_compressed, numeric_deltas)?;
560        
561        let compressed_size = serde_json::to_string(&final_compressed)
562            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
563            .len();
564
565        Ok(CompressedData {
566            strategy: self.strategy.clone(),
567            compressed_size,
568            data: final_compressed,
569            compression_metadata: metadata,
570        })
571    }
572
573    /// Replace strings with dictionary indices
574    #[allow(clippy::only_used_in_recursion)]
575    fn replace_strings_with_indices(&self, data: &JsonValue, dictionary: &HashMap<String, u16>) -> DomainResult<JsonValue> {
576        match data {
577            JsonValue::Object(obj) => {
578                let mut compressed_obj = serde_json::Map::new();
579                for (key, value) in obj {
580                    compressed_obj.insert(
581                        key.clone(),
582                        self.replace_strings_with_indices(value, dictionary)?
583                    );
584                }
585                Ok(JsonValue::Object(compressed_obj))
586            }
587            JsonValue::Array(arr) => {
588                let compressed_arr: Result<Vec<_>, _> = arr.iter()
589                    .map(|item| self.replace_strings_with_indices(item, dictionary))
590                    .collect();
591                Ok(JsonValue::Array(compressed_arr?))
592            }
593            JsonValue::String(s) => {
594                if let Some(&index) = dictionary.get(s) {
595                    Ok(JsonValue::Number(serde_json::Number::from(index)))
596                } else {
597                    Ok(data.clone())
598                }
599            }
600            _ => Ok(data.clone()),
601        }
602    }
603
604    /// Apply delta compression to numeric sequences in arrays
605    fn apply_delta_compression(&self, data: &JsonValue, base_values: &HashMap<String, f64>) -> DomainResult<JsonValue> {
606        self.apply_delta_recursive(data, "", base_values)
607    }
608
609    /// Recursively apply delta compression to JSON structure
610    fn apply_delta_recursive(&self, data: &JsonValue, path: &str, base_values: &HashMap<String, f64>) -> DomainResult<JsonValue> {
611        match data {
612            JsonValue::Object(obj) => {
613                let mut compressed_obj = serde_json::Map::new();
614                for (key, value) in obj {
615                    let field_path = if path.is_empty() {
616                        key.clone()
617                    } else {
618                        format!("{path}.{key}")
619                    };
620                    compressed_obj.insert(
621                        key.clone(),
622                        self.apply_delta_recursive(value, &field_path, base_values)?
623                    );
624                }
625                Ok(JsonValue::Object(compressed_obj))
626            }
627            JsonValue::Array(arr) if arr.len() > 2 => {
628                // Check if this array contains numeric sequences that can be delta-compressed
629                if self.is_numeric_sequence(arr) {
630                    self.compress_numeric_array_with_delta(arr, path, base_values)
631                } else {
632                    // Process array elements recursively
633                    let compressed_arr: Result<Vec<_>, _> = arr.iter().enumerate()
634                        .map(|(idx, item)| {
635                            let item_path = format!("{path}[{idx}]");
636                            self.apply_delta_recursive(item, &item_path, base_values)
637                        })
638                        .collect();
639                    Ok(JsonValue::Array(compressed_arr?))
640                }
641            }
642            JsonValue::Array(arr) => {
643                // Array too small for delta compression, process recursively
644                let compressed_arr: Result<Vec<_>, _> = arr.iter().enumerate()
645                    .map(|(idx, item)| {
646                        let item_path = format!("{path}[{idx}]");
647                        self.apply_delta_recursive(item, &item_path, base_values)
648                    })
649                    .collect();
650                Ok(JsonValue::Array(compressed_arr?))
651            }
652            _ => Ok(data.clone()),
653        }
654    }
655
656    /// Check if array contains a numeric sequence suitable for delta compression
657    fn is_numeric_sequence(&self, arr: &[JsonValue]) -> bool {
658        if arr.len() < self.config.min_numeric_sequence_size {
659            return false;
660        }
661
662        // Check if all elements are numbers
663        arr.iter().all(|v| v.is_number())
664    }
665
666    /// Apply delta compression to numeric array
667    fn compress_numeric_array_with_delta(&self, arr: &[JsonValue], path: &str, base_values: &HashMap<String, f64>) -> DomainResult<JsonValue> {
668        let mut compressed_array = Vec::new();
669        
670        // Extract numeric values
671        let numbers: Vec<f64> = arr.iter()
672            .filter_map(|v| v.as_f64())
673            .collect();
674
675        if numbers.is_empty() {
676            return Ok(JsonValue::Array(arr.to_vec()));
677        }
678
679        // Use base value from analysis or first element as base
680        let base_value = base_values.get(path)
681            .copied()
682            .unwrap_or(numbers[0]);
683
684        // Add metadata for base value
685        compressed_array.push(json!({
686            "delta_base": base_value,
687            "delta_type": "numeric_sequence"
688        }));
689
690        // Calculate deltas from base value
691        let deltas: Vec<f64> = numbers.iter()
692            .map(|&num| num - base_value)
693            .collect();
694
695        // Check if delta compression is beneficial
696        let original_precision = numbers.iter()
697            .map(|n| format!("{n}").len())
698            .sum::<usize>();
699        
700        let delta_precision = deltas.iter()
701            .map(|d| format!("{d}").len())
702            .sum::<usize>();
703
704        if delta_precision < original_precision {
705            // Delta compression is beneficial
706            compressed_array.extend(deltas.into_iter().map(JsonValue::from));
707        } else {
708            // Keep original values
709            return Ok(JsonValue::Array(arr.to_vec()));
710        }
711
712        Ok(JsonValue::Array(compressed_array))
713    }
714}
715
/// Compressed data with metadata.
///
/// `compression_metadata` carries whatever the strategy needs for
/// decompression: dictionary entries under `dict_<index>` keys and delta
/// base values under `base_<path>` keys.
#[derive(Debug, Clone)]
pub struct CompressedData {
    /// Strategy that produced `data`.
    pub strategy: CompressionStrategy,
    /// Serialized size of `data` in bytes.
    pub compressed_size: usize,
    /// The transformed JSON payload.
    pub data: JsonValue,
    /// Strategy-specific information required to decompress `data`.
    pub compression_metadata: HashMap<String, JsonValue>,
}
724
725impl CompressedData {
726    /// Calculate compression ratio
727    pub fn compression_ratio(&self, original_size: usize) -> f32 {
728        if original_size == 0 {
729            return 1.0;
730        }
731        self.compressed_size as f32 / original_size as f32
732    }
733
734    /// Get compression savings in bytes
735    pub fn compression_savings(&self, original_size: usize) -> isize {
736        original_size as isize - self.compressed_size as isize
737    }
738}
739
impl Default for SchemaAnalyzer {
    /// Equivalent to [`SchemaAnalyzer::new`].
    fn default() -> Self {
        Self::new()
    }
}
745
impl Default for SchemaCompressor {
    /// Equivalent to [`SchemaCompressor::new`].
    fn default() -> Self {
        Self::new()
    }
}
751
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_schema_analyzer_dictionary_potential() {
        let mut analyzer = SchemaAnalyzer::new();

        // Eight records share identical role/status/department strings,
        // which should make dictionary encoding attractive.
        let names = [
            "John Doe",
            "Jane Smith",
            "Bob Wilson",
            "Alice Brown",
            "Charlie Davis",
            "Diana Evans",
            "Frank Miller",
            "Grace Wilson",
        ];
        let users: Vec<serde_json::Value> = names
            .iter()
            .map(|name| {
                json!({
                    "name": name,
                    "role": "admin",
                    "status": "active",
                    "department": "engineering"
                })
            })
            .collect();
        let data = json!({ "users": users });

        let strategy = analyzer.analyze(&data).unwrap();

        // Repeating strings like "admin"/"active" should select a
        // dictionary-based (or hybrid) strategy.
        assert!(
            matches!(
                strategy,
                CompressionStrategy::Dictionary { .. } | CompressionStrategy::Hybrid { .. }
            ),
            "Expected dictionary-based compression strategy"
        );
    }

    #[test]
    fn test_schema_compressor_basic() {
        let compressor = SchemaCompressor::new();
        let payload = json!({
            "message": "hello world",
            "count": 42
        });

        let baseline = serde_json::to_string(&payload).unwrap().len();
        let result = compressor.compress(&payload).unwrap();

        // Output must be non-empty and never report a ratio above 1.0
        // for this payload.
        assert!(result.compressed_size > 0);
        assert!(result.compression_ratio(baseline) <= 1.0);
    }

    #[test]
    fn test_dictionary_compression() {
        let dictionary = HashMap::from([
            ("active".to_string(), 0),
            ("admin".to_string(), 1),
        ]);
        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Dictionary { dictionary });

        let input = json!({
            "status": "active",
            "role": "admin",
            "description": "active admin user"
        });

        let result = compressor.compress(&input).unwrap();

        // The dictionary entries must be exposed through compression metadata.
        assert!(result.compression_metadata.contains_key("dict_0"));
        assert!(result.compression_metadata.contains_key("dict_1"));
    }

    #[test]
    fn test_compression_strategy_selection() {
        let mut analyzer = SchemaAnalyzer::new();

        // No repeated strings and no numeric sequences: nothing to compress.
        let unpatterned = json!({
            "unique_field_1": "unique_value_1",
            "unique_field_2": "unique_value_2"
        });

        assert_eq!(
            analyzer.analyze(&unpatterned).unwrap(),
            CompressionStrategy::None
        );
    }

    #[test]
    fn test_numeric_delta_analysis() {
        let mut analyzer = SchemaAnalyzer::new();

        // Monotonically increasing time/value pairs form a delta-friendly
        // numeric sequence (times step by 1, values by 0.5).
        let samples: Vec<serde_json::Value> = (0..4)
            .map(|i| json!({ "time": 100 + i, "value": 10.0 + 0.5 * i as f64 }))
            .collect();
        let data = json!({ "measurements": samples });

        let _strategy = analyzer.analyze(&data).unwrap();

        // The analyzer should have recorded the incremental numeric fields.
        assert!(!analyzer.numeric_fields.is_empty());
    }

    #[test]
    fn test_run_length_encoding() {
        let compressor = SchemaCompressor::with_strategy(CompressionStrategy::RunLength);
        let input = json!({
            "repeated_values": [1, 1, 1, 2, 2, 3, 3, 3, 3]
        });

        let result = compressor.compress(&input).unwrap();
        assert!(result.compressed_size > 0);

        // The encoded field must still be an array and contain at least one
        // run-length object.
        let encoded = result.data["repeated_values"]
            .as_array()
            .expect("RLE output should still be an array");
        assert!(encoded.iter().any(|entry| entry.get("rle_value").is_some()));
    }

    #[test]
    fn test_delta_compression() {
        let base_values = HashMap::from([("sequence".to_string(), 100.0)]);
        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Delta { base_values });

        let input = json!({
            "sequence": [100.0, 101.0, 102.0, 103.0, 104.0]
        });

        let result = compressor.compress(&input).unwrap();
        assert!(result.compressed_size > 0);

        // The encoded field must still be an array and carry delta metadata.
        let encoded = result.data["sequence"]
            .as_array()
            .expect("delta output should still be an array");
        assert!(encoded.iter().any(|entry| entry.get("delta_base").is_some()));
    }
}
907}