// pjson_rs/compression/mod.rs
1//! Schema-based compression for PJS protocol
2//!
3//! Implements intelligent compression strategies based on JSON schema analysis
4//! to optimize bandwidth usage while maintaining streaming capabilities.
5
6pub mod secure;
7
8#[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
9pub mod zstd;
10
11use crate::domain::{DomainError, DomainResult};
12use serde_json::{Value as JsonValue, json};
13use std::collections::HashMap;
14
/// Configuration constants for compression algorithms.
///
/// Note: every "minimum"/"threshold" field below is applied with a strict
/// `>` comparison by the analyzer and compressor, so a value must *exceed*
/// the configured bound to qualify.
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// Minimum array length for pattern analysis (arrays must be strictly longer)
    pub min_array_length: usize,
    /// Minimum string length for dictionary inclusion (strings must be strictly longer)
    pub min_string_length: usize,
    /// Minimum frequency for dictionary inclusion (counts must strictly exceed this)
    pub min_frequency_count: u32,
    /// Minimum compression potential for UUID patterns
    pub uuid_compression_potential: f32,
    /// Threshold score for string dictionary compression
    pub string_dict_threshold: f32,
    /// Threshold score for delta compression
    pub delta_threshold: f32,
    /// Minimum delta potential for numeric compression
    pub min_delta_potential: f32,
    /// Threshold for run-length compression
    pub run_length_threshold: f32,
    /// Minimum compression potential for pattern selection
    pub min_compression_potential: f32,
    /// Minimum array size for numeric sequence analysis
    pub min_numeric_sequence_size: usize,
}
39
impl Default for CompressionConfig {
    // Default tuning values. NOTE(review): these look hand-tuned; confirm
    // against benchmark data before changing any of them.
    fn default() -> Self {
        Self {
            min_array_length: 2,
            min_string_length: 3,
            // Effective minimum occurrence count is 2 (comparisons are strict `>`)
            min_frequency_count: 1,
            uuid_compression_potential: 0.3,
            string_dict_threshold: 50.0,
            delta_threshold: 30.0,
            min_delta_potential: 0.3,
            run_length_threshold: 20.0,
            min_compression_potential: 0.4,
            min_numeric_sequence_size: 3,
        }
    }
}
56
/// Compression strategy based on schema analysis.
///
/// Selected by [`SchemaAnalyzer::analyze`] and executed by
/// [`SchemaCompressor::compress`].
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionStrategy {
    /// No compression applied
    None,
    /// Dictionary-based compression for repeating string patterns.
    /// Maps each string to a `u16` index, so at most 65 536 entries.
    Dictionary { dictionary: HashMap<String, u16> },
    /// Delta encoding for numeric sequences.
    /// Maps a JSON field path to the base value deltas are taken against.
    Delta { base_values: HashMap<String, f64> },
    /// Run-length encoding for repeated values
    RunLength,
    /// Hybrid approach combining multiple strategies
    Hybrid {
        string_dict: HashMap<String, u16>,
        numeric_deltas: HashMap<String, f64>,
    },
}
74
/// Schema analyzer for determining optimal compression strategy.
///
/// Accumulates pattern, numeric, and string-repetition statistics during a
/// single [`analyze`](SchemaAnalyzer::analyze) pass; state is cleared at the
/// start of each call.
#[derive(Debug, Clone)]
pub struct SchemaAnalyzer {
    /// Pattern frequency analysis, keyed by pattern identifier strings
    /// (e.g. `array_structure:<path>`, `url_prefix:<prefix>`, `uuid_pattern`)
    patterns: HashMap<String, PatternInfo>,
    /// Numeric field analysis, keyed by the field's JSON path
    numeric_fields: HashMap<String, NumericStats>,
    /// String repetition analysis: string value -> occurrence count
    string_repetitions: HashMap<String, u32>,
    /// Configuration for compression algorithms
    config: CompressionConfig,
}
87
/// Frequency/benefit record for one detected pattern.
#[derive(Debug, Clone)]
struct PatternInfo {
    /// Number of occurrences observed during analysis
    frequency: u32,
    /// Estimated fraction of redundancy removable for this pattern (0.0..1.0)
    compression_potential: f32,
}
93
/// Per-path statistics used to estimate delta-compression benefit.
#[derive(Debug, Clone)]
struct NumericStats {
    /// All numeric values seen at this path (sorted in place during scoring)
    values: Vec<f64>,
    /// 1 / (1 + variance of consecutive deltas); higher = more regular spacing
    delta_potential: f32,
    /// First value seen at this path; used as the delta base
    base_value: f64,
}
100
impl SchemaAnalyzer {
    /// Create a new analyzer with the default [`CompressionConfig`].
    pub fn new() -> Self {
        Self {
            patterns: HashMap::new(),
            numeric_fields: HashMap::new(),
            string_repetitions: HashMap::new(),
            config: CompressionConfig::default(),
        }
    }

    /// Create new schema analyzer with custom configuration
    pub fn with_config(config: CompressionConfig) -> Self {
        Self {
            patterns: HashMap::new(),
            numeric_fields: HashMap::new(),
            string_repetitions: HashMap::new(),
            config,
        }
    }

    /// Analyze JSON data to determine optimal compression strategy.
    ///
    /// All accumulated state is cleared first, so the returned strategy
    /// reflects only the `data` passed to this call.
    ///
    /// # Errors
    /// Propagates any `DomainError` raised during the recursive traversal.
    pub fn analyze(&mut self, data: &JsonValue) -> DomainResult<CompressionStrategy> {
        // Reset analysis state
        self.patterns.clear();
        self.numeric_fields.clear();
        self.string_repetitions.clear();

        // Perform deep analysis
        self.analyze_recursive(data, "")?;

        // Determine best strategy based on analysis
        self.determine_strategy()
    }

    /// Analyze data recursively.
    ///
    /// `path` is a dotted JSON path ("a.b"); array elements are addressed
    /// as "a[0]", "a[1]", ... — note that this makes every array element's
    /// path unique (see note in `analyze_numeric_pattern`).
    fn analyze_recursive(&mut self, value: &JsonValue, path: &str) -> DomainResult<()> {
        match value {
            JsonValue::Object(obj) => {
                for (key, val) in obj {
                    let field_path = if path.is_empty() {
                        key.clone()
                    } else {
                        format!("{path}.{key}")
                    };
                    self.analyze_recursive(val, &field_path)?;
                }
            }
            JsonValue::Array(arr) => {
                // Analyze array patterns
                if arr.len() > self.config.min_array_length {
                    self.analyze_array_patterns(arr, path)?;
                }
                for (idx, item) in arr.iter().enumerate() {
                    let item_path = format!("{path}[{idx}]");
                    self.analyze_recursive(item, &item_path)?;
                }
            }
            JsonValue::String(s) => {
                self.analyze_string_pattern(s, path);
            }
            JsonValue::Number(n) => {
                if let Some(f) = n.as_f64() {
                    self.analyze_numeric_pattern(f, path);
                }
            }
            // Null and Bool carry no compressible structure
            _ => {}
        }
        Ok(())
    }

    /// Analyze array for repeating patterns.
    ///
    /// Records (a) how many objects in the array share the first object's
    /// field layout, and (b) how often primitive values repeat.
    fn analyze_array_patterns(&mut self, arr: &[JsonValue], path: &str) -> DomainResult<()> {
        // Check for repeating object structures
        if let Some(JsonValue::Object(first)) = arr.first() {
            let structure_key = format!("array_structure:{path}");
            let field_names: Vec<&str> = first.keys().map(|k| k.as_str()).collect();
            let pattern = field_names.join(",");

            // Count how many objects share this structure.
            // NOTE(review): comparison is on the joined key list, so it is
            // sensitive to key order — confirm serde_json map ordering
            // (preserve_order feature) matches this assumption.
            let matching_count = arr
                .iter()
                .filter_map(|v| v.as_object())
                .filter(|obj| {
                    let obj_fields: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
                    obj_fields.join(",") == pattern
                })
                .count();

            if matching_count > self.config.min_frequency_count as usize {
                let info = PatternInfo {
                    frequency: matching_count as u32,
                    // Fraction of structures that are "redundant" copies of the first
                    compression_potential: (matching_count as f32 - 1.0) / matching_count as f32,
                };
                self.patterns.insert(structure_key, info);
            }
        }

        // Check for repeating primitive values.
        // NOTE(review): hard-coded `2` here rather than config.min_array_length
        // — confirm whether these were meant to stay independent.
        if arr.len() > 2 {
            let mut value_counts = HashMap::new();
            for value in arr {
                // Tag keys by type so e.g. "1" (string) and 1 (number) don't collide
                let key = match value {
                    JsonValue::String(s) => format!("string:{s}"),
                    JsonValue::Number(n) => format!("number:{n}"),
                    JsonValue::Bool(b) => format!("bool:{b}"),
                    _ => continue,
                };
                *value_counts.entry(key).or_insert(0) += 1;
            }

            for (value_key, count) in value_counts {
                if count > self.config.min_frequency_count {
                    let info = PatternInfo {
                        frequency: count,
                        compression_potential: (count as f32 - 1.0) / count as f32,
                    };
                    self.patterns
                        .insert(format!("array_value:{path}:{value_key}"), info);
                }
            }
        }

        Ok(())
    }

    /// Analyze string for repetition patterns.
    ///
    /// Repetitions are counted globally (the path is intentionally unused):
    /// the same string at different paths still shares one dictionary slot.
    fn analyze_string_pattern(&mut self, s: &str, _path: &str) {
        // Track string repetitions across different paths
        *self.string_repetitions.entry(s.to_string()).or_insert(0) += 1;

        // Analyze common prefixes/suffixes for URLs, IDs, etc.
        if s.len() > 10 {
            // Check for URL patterns
            if s.starts_with("http://") || s.starts_with("https://") {
                let prefix = if s.starts_with("https://") {
                    "https://"
                } else {
                    "http://"
                };
                self.patterns
                    .entry(format!("url_prefix:{prefix}"))
                    .or_insert(PatternInfo {
                        frequency: 0,
                        compression_potential: 0.0,
                    })
                    .frequency += 1;
            }

            // Check for ID patterns (UUID-like): 36 chars with exactly four
            // dashes. Loose shape check only — hex digits are not validated.
            if s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4 {
                self.patterns
                    .entry("uuid_pattern".to_string())
                    .or_insert(PatternInfo {
                        frequency: 0,
                        compression_potential: self.config.uuid_compression_potential,
                    })
                    .frequency += 1;
            }
        }
    }

    /// Analyze numeric patterns for delta compression.
    ///
    /// Values are grouped by exact JSON path. NOTE(review): because array
    /// elements get index-unique paths ("a[0]", "a[1]", ...), each key here
    /// usually accumulates a single value, so the `values.len() > 2` gate in
    /// `determine_strategy` may rarely fire — confirm whether indices should
    /// be stripped to group per-field sequences.
    fn analyze_numeric_pattern(&mut self, value: f64, path: &str) {
        self.numeric_fields
            .entry(path.to_string())
            .or_insert_with(|| NumericStats {
                values: Vec::new(),
                delta_potential: 0.0,
                base_value: value,
            })
            .values
            .push(value);
    }

    /// Determine optimal compression strategy based on analysis.
    ///
    /// Scores string-dictionary and numeric-delta potential, then picks
    /// Hybrid / Dictionary / Delta / RunLength / None by threshold.
    fn determine_strategy(&mut self) -> DomainResult<CompressionStrategy> {
        // Calculate compression potentials
        let mut string_dict_score = 0.0;
        let mut delta_score = 0.0;

        // Analyze string repetition potential
        let mut string_dict = HashMap::new();
        // NOTE(review): u16 index space caps the dictionary at 65 536 entries;
        // `dict_index += 1` would overflow (panic in debug builds) beyond that.
        let mut dict_index = 0u16;

        for (string, count) in &self.string_repetitions {
            if *count > self.config.min_frequency_count
                && string.len() > self.config.min_string_length
            {
                // Approximate bytes saved: every repeat beyond the first
                // replaces `len` bytes with a small index
                string_dict_score += (*count as f32 - 1.0) * string.len() as f32;
                string_dict.insert(string.clone(), dict_index);
                dict_index += 1;
            }
        }

        // Analyze numeric delta potential
        let mut numeric_deltas = HashMap::new();

        for (path, stats) in &mut self.numeric_fields {
            if stats.values.len() > 2 {
                // Calculate variance to determine delta effectiveness.
                // NOTE(review): sorting discards the original sequence order,
                // so this measures regularity of the sorted value spacing, not
                // of the stream as transmitted — confirm this is intended.
                stats
                    .values
                    .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

                let deltas: Vec<f64> = stats
                    .values
                    .windows(2)
                    .map(|window| window[1] - window[0])
                    .collect();

                if !deltas.is_empty() {
                    let avg_delta = deltas.iter().sum::<f64>() / deltas.len() as f64;
                    let delta_variance =
                        deltas.iter().map(|d| (d - avg_delta).powi(2)).sum::<f64>()
                            / deltas.len() as f64;

                    // Low variance suggests good delta compression potential
                    stats.delta_potential = 1.0 / (1.0 + delta_variance as f32);

                    if stats.delta_potential > self.config.min_delta_potential {
                        delta_score += stats.delta_potential * stats.values.len() as f32;
                        numeric_deltas.insert(path.clone(), stats.base_value);
                    }
                }
            }
        }

        // Choose strategy based on scores
        match (
            string_dict_score > self.config.string_dict_threshold,
            delta_score > self.config.delta_threshold,
        ) {
            (true, true) => Ok(CompressionStrategy::Hybrid {
                string_dict,
                numeric_deltas,
            }),
            (true, false) => Ok(CompressionStrategy::Dictionary {
                dictionary: string_dict,
            }),
            (false, true) => Ok(CompressionStrategy::Delta {
                base_values: numeric_deltas,
            }),
            (false, false) => {
                // Check for run-length potential
                let run_length_score = self
                    .patterns
                    .values()
                    .filter(|p| p.compression_potential > self.config.min_compression_potential)
                    .map(|p| p.frequency as f32 * p.compression_potential)
                    .sum::<f32>();

                if run_length_score > self.config.run_length_threshold {
                    Ok(CompressionStrategy::RunLength)
                } else {
                    Ok(CompressionStrategy::None)
                }
            }
        }
    }
}
362
/// Schema-aware compressor.
///
/// Holds the currently selected [`CompressionStrategy`] and the analyzer
/// used to (re)derive it via `analyze_and_optimize`.
#[derive(Debug, Clone)]
pub struct SchemaCompressor {
    /// Strategy applied by `compress`; starts as `None` unless set explicitly
    strategy: CompressionStrategy,
    /// Analyzer sharing this compressor's configuration
    analyzer: SchemaAnalyzer,
    /// Tuning constants (also cloned into `analyzer`)
    config: CompressionConfig,
}
370
371impl SchemaCompressor {
372    /// Create new compressor with automatic strategy detection
373    pub fn new() -> Self {
374        let config = CompressionConfig::default();
375        Self {
376            strategy: CompressionStrategy::None,
377            analyzer: SchemaAnalyzer::with_config(config.clone()),
378            config,
379        }
380    }
381
382    /// Create compressor with specific strategy
383    pub fn with_strategy(strategy: CompressionStrategy) -> Self {
384        let config = CompressionConfig::default();
385        Self {
386            strategy,
387            analyzer: SchemaAnalyzer::with_config(config.clone()),
388            config,
389        }
390    }
391
392    /// Create compressor with custom configuration
393    pub fn with_config(config: CompressionConfig) -> Self {
394        Self {
395            strategy: CompressionStrategy::None,
396            analyzer: SchemaAnalyzer::with_config(config.clone()),
397            config,
398        }
399    }
400
401    /// Analyze data and update compression strategy
402    pub fn analyze_and_optimize(&mut self, data: &JsonValue) -> DomainResult<&CompressionStrategy> {
403        self.strategy = self.analyzer.analyze(data)?;
404        Ok(&self.strategy)
405    }
406
407    /// Compress JSON data according to current strategy
408    pub fn compress(&self, data: &JsonValue) -> DomainResult<CompressedData> {
409        match &self.strategy {
410            CompressionStrategy::None => Ok(CompressedData {
411                strategy: self.strategy.clone(),
412                compressed_size: serde_json::to_string(data)
413                    .map_err(|e| {
414                        DomainError::CompressionError(format!("JSON serialization failed: {e}"))
415                    })?
416                    .len(),
417                data: data.clone(),
418                compression_metadata: HashMap::new(),
419            }),
420
421            CompressionStrategy::Dictionary { dictionary } => {
422                self.compress_with_dictionary(data, dictionary)
423            }
424
425            CompressionStrategy::Delta { base_values } => {
426                self.compress_with_delta(data, base_values)
427            }
428
429            CompressionStrategy::RunLength => self.compress_with_run_length(data),
430
431            CompressionStrategy::Hybrid {
432                string_dict,
433                numeric_deltas,
434            } => self.compress_hybrid(data, string_dict, numeric_deltas),
435        }
436    }
437
438    /// Dictionary-based compression
439    fn compress_with_dictionary(
440        &self,
441        data: &JsonValue,
442        dictionary: &HashMap<String, u16>,
443    ) -> DomainResult<CompressedData> {
444        let mut metadata = HashMap::new();
445
446        // Store dictionary for decompression
447        for (string, index) in dictionary {
448            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
449        }
450
451        // Replace strings with dictionary indices
452        let compressed = self.replace_strings_with_indices(data, dictionary)?;
453        let compressed_size = serde_json::to_string(&compressed)
454            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
455            .len();
456
457        Ok(CompressedData {
458            strategy: self.strategy.clone(),
459            compressed_size,
460            data: compressed,
461            compression_metadata: metadata,
462        })
463    }
464
465    /// Delta compression for numeric sequences
466    fn compress_with_delta(
467        &self,
468        data: &JsonValue,
469        base_values: &HashMap<String, f64>,
470    ) -> DomainResult<CompressedData> {
471        let mut metadata = HashMap::new();
472
473        // Store base values
474        for (path, base) in base_values {
475            metadata.insert(
476                format!("base_{path}"),
477                JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()),
478            );
479        }
480
481        // Apply delta compression
482        let compressed = self.apply_delta_compression(data, base_values)?;
483        let compressed_size = serde_json::to_string(&compressed)
484            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
485            .len();
486
487        Ok(CompressedData {
488            strategy: self.strategy.clone(),
489            compressed_size,
490            data: compressed,
491            compression_metadata: metadata,
492        })
493    }
494
495    /// Run-length encoding compression
496    fn compress_with_run_length(&self, data: &JsonValue) -> DomainResult<CompressedData> {
497        let compressed = self.apply_run_length_encoding(data)?;
498        let compressed_size = serde_json::to_string(&compressed)
499            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
500            .len();
501
502        Ok(CompressedData {
503            strategy: self.strategy.clone(),
504            compressed_size,
505            data: compressed,
506            compression_metadata: HashMap::new(),
507        })
508    }
509
510    /// Apply run-length encoding to arrays with repeated values
511    fn apply_run_length_encoding(&self, data: &JsonValue) -> DomainResult<JsonValue> {
512        match data {
513            JsonValue::Object(obj) => {
514                let mut compressed_obj = serde_json::Map::new();
515                for (key, value) in obj {
516                    compressed_obj.insert(key.clone(), self.apply_run_length_encoding(value)?);
517                }
518                Ok(JsonValue::Object(compressed_obj))
519            }
520            JsonValue::Array(arr) if arr.len() > 2 => {
521                // Apply run-length encoding to array
522                let mut compressed_runs = Vec::new();
523                let mut current_value = None;
524                let mut run_count = 0;
525
526                for item in arr {
527                    if Some(item) == current_value.as_ref() {
528                        run_count += 1;
529                    } else {
530                        // Save previous run if it exists
531                        if let Some(value) = current_value {
532                            if run_count > self.config.min_frequency_count {
533                                // Use run-length encoding: [value, count]
534                                compressed_runs.push(json!({
535                                    "rle_value": value,
536                                    "rle_count": run_count
537                                }));
538                            } else {
539                                // Single occurrence, keep as-is
540                                compressed_runs.push(value);
541                            }
542                        }
543
544                        // Start new run
545                        current_value = Some(item.clone());
546                        run_count = 1;
547                    }
548                }
549
550                // Handle final run
551                if let Some(value) = current_value {
552                    if run_count > self.config.min_frequency_count {
553                        compressed_runs.push(json!({
554                            "rle_value": value,
555                            "rle_count": run_count
556                        }));
557                    } else {
558                        compressed_runs.push(value);
559                    }
560                }
561
562                Ok(JsonValue::Array(compressed_runs))
563            }
564            JsonValue::Array(arr) => {
565                // Array too small for run-length encoding, process recursively
566                let compressed_arr: Result<Vec<_>, _> = arr
567                    .iter()
568                    .map(|item| self.apply_run_length_encoding(item))
569                    .collect();
570                Ok(JsonValue::Array(compressed_arr?))
571            }
572            _ => Ok(data.clone()),
573        }
574    }
575
576    /// Hybrid compression combining multiple strategies
577    fn compress_hybrid(
578        &self,
579        data: &JsonValue,
580        string_dict: &HashMap<String, u16>,
581        numeric_deltas: &HashMap<String, f64>,
582    ) -> DomainResult<CompressedData> {
583        let mut metadata = HashMap::new();
584
585        // Add dictionary metadata
586        for (string, index) in string_dict {
587            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
588        }
589
590        // Add delta base values
591        for (path, base) in numeric_deltas {
592            metadata.insert(
593                format!("base_{path}"),
594                JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()),
595            );
596        }
597
598        // Apply both compression strategies
599        let dict_compressed = self.replace_strings_with_indices(data, string_dict)?;
600        let final_compressed = self.apply_delta_compression(&dict_compressed, numeric_deltas)?;
601
602        let compressed_size = serde_json::to_string(&final_compressed)
603            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
604            .len();
605
606        Ok(CompressedData {
607            strategy: self.strategy.clone(),
608            compressed_size,
609            data: final_compressed,
610            compression_metadata: metadata,
611        })
612    }
613
614    /// Replace strings with dictionary indices
615    #[allow(clippy::only_used_in_recursion)]
616    fn replace_strings_with_indices(
617        &self,
618        data: &JsonValue,
619        dictionary: &HashMap<String, u16>,
620    ) -> DomainResult<JsonValue> {
621        match data {
622            JsonValue::Object(obj) => {
623                let mut compressed_obj = serde_json::Map::new();
624                for (key, value) in obj {
625                    compressed_obj.insert(
626                        key.clone(),
627                        self.replace_strings_with_indices(value, dictionary)?,
628                    );
629                }
630                Ok(JsonValue::Object(compressed_obj))
631            }
632            JsonValue::Array(arr) => {
633                let compressed_arr: Result<Vec<_>, _> = arr
634                    .iter()
635                    .map(|item| self.replace_strings_with_indices(item, dictionary))
636                    .collect();
637                Ok(JsonValue::Array(compressed_arr?))
638            }
639            JsonValue::String(s) => {
640                if let Some(&index) = dictionary.get(s) {
641                    Ok(JsonValue::Number(serde_json::Number::from(index)))
642                } else {
643                    Ok(data.clone())
644                }
645            }
646            _ => Ok(data.clone()),
647        }
648    }
649
650    /// Apply delta compression to numeric sequences in arrays
651    fn apply_delta_compression(
652        &self,
653        data: &JsonValue,
654        base_values: &HashMap<String, f64>,
655    ) -> DomainResult<JsonValue> {
656        self.apply_delta_recursive(data, "", base_values)
657    }
658
659    /// Recursively apply delta compression to JSON structure
660    fn apply_delta_recursive(
661        &self,
662        data: &JsonValue,
663        path: &str,
664        base_values: &HashMap<String, f64>,
665    ) -> DomainResult<JsonValue> {
666        match data {
667            JsonValue::Object(obj) => {
668                let mut compressed_obj = serde_json::Map::new();
669                for (key, value) in obj {
670                    let field_path = if path.is_empty() {
671                        key.clone()
672                    } else {
673                        format!("{path}.{key}")
674                    };
675                    compressed_obj.insert(
676                        key.clone(),
677                        self.apply_delta_recursive(value, &field_path, base_values)?,
678                    );
679                }
680                Ok(JsonValue::Object(compressed_obj))
681            }
682            JsonValue::Array(arr) if arr.len() > 2 => {
683                // Check if this array contains numeric sequences that can be delta-compressed
684                if self.is_numeric_sequence(arr) {
685                    self.compress_numeric_array_with_delta(arr, path, base_values)
686                } else {
687                    // Process array elements recursively
688                    let compressed_arr: Result<Vec<_>, _> = arr
689                        .iter()
690                        .enumerate()
691                        .map(|(idx, item)| {
692                            let item_path = format!("{path}[{idx}]");
693                            self.apply_delta_recursive(item, &item_path, base_values)
694                        })
695                        .collect();
696                    Ok(JsonValue::Array(compressed_arr?))
697                }
698            }
699            JsonValue::Array(arr) => {
700                // Array too small for delta compression, process recursively
701                let compressed_arr: Result<Vec<_>, _> = arr
702                    .iter()
703                    .enumerate()
704                    .map(|(idx, item)| {
705                        let item_path = format!("{path}[{idx}]");
706                        self.apply_delta_recursive(item, &item_path, base_values)
707                    })
708                    .collect();
709                Ok(JsonValue::Array(compressed_arr?))
710            }
711            _ => Ok(data.clone()),
712        }
713    }
714
715    /// Check if array contains a numeric sequence suitable for delta compression
716    fn is_numeric_sequence(&self, arr: &[JsonValue]) -> bool {
717        if arr.len() < self.config.min_numeric_sequence_size {
718            return false;
719        }
720
721        // Check if all elements are numbers
722        arr.iter().all(|v| v.is_number())
723    }
724
725    /// Apply delta compression to numeric array
726    fn compress_numeric_array_with_delta(
727        &self,
728        arr: &[JsonValue],
729        path: &str,
730        base_values: &HashMap<String, f64>,
731    ) -> DomainResult<JsonValue> {
732        let mut compressed_array = Vec::new();
733
734        // Extract numeric values
735        let numbers: Vec<f64> = arr.iter().filter_map(|v| v.as_f64()).collect();
736
737        if numbers.is_empty() {
738            return Ok(JsonValue::Array(arr.to_vec()));
739        }
740
741        // Use base value from analysis or first element as base
742        let base_value = base_values.get(path).copied().unwrap_or(numbers[0]);
743
744        // Add metadata for base value
745        compressed_array.push(json!({
746            "delta_base": base_value,
747            "delta_type": "numeric_sequence"
748        }));
749
750        // Calculate deltas from base value
751        let deltas: Vec<f64> = numbers.iter().map(|&num| num - base_value).collect();
752
753        // Check if delta compression is beneficial
754        let original_precision = numbers.iter().map(|n| format!("{n}").len()).sum::<usize>();
755
756        let delta_precision = deltas.iter().map(|d| format!("{d}").len()).sum::<usize>();
757
758        if delta_precision < original_precision {
759            // Delta compression is beneficial
760            compressed_array.extend(deltas.into_iter().map(JsonValue::from));
761        } else {
762            // Keep original values
763            return Ok(JsonValue::Array(arr.to_vec()));
764        }
765
766        Ok(JsonValue::Array(compressed_array))
767    }
768}
769
/// Compressed data with metadata.
#[derive(Debug, Clone)]
pub struct CompressedData {
    /// Strategy that produced this data
    pub strategy: CompressionStrategy,
    /// Length in bytes of the serialized compressed value
    pub compressed_size: usize,
    /// The compressed JSON value itself
    pub data: JsonValue,
    /// Side-channel data required for decompression
    /// (`dict_<index>` / `base_<path>` entries)
    pub compression_metadata: HashMap<String, JsonValue>,
}
778
779impl CompressedData {
780    /// Calculate compression ratio
781    pub fn compression_ratio(&self, original_size: usize) -> f32 {
782        if original_size == 0 {
783            return 1.0;
784        }
785        self.compressed_size as f32 / original_size as f32
786    }
787
788    /// Get compression savings in bytes
789    pub fn compression_savings(&self, original_size: usize) -> isize {
790        original_size as isize - self.compressed_size as isize
791    }
792}
793
// Delegates to `new()` so `SchemaAnalyzer::default()` behaves identically.
impl Default for SchemaAnalyzer {
    fn default() -> Self {
        Self::new()
    }
}
799
// Delegates to `new()` so `SchemaCompressor::default()` behaves identically.
impl Default for SchemaCompressor {
    fn default() -> Self {
        Self::new()
    }
}
805
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_schema_analyzer_dictionary_potential() {
        let mut analyzer = SchemaAnalyzer::new();

        // Many records share the same "role"/"status"/"department" values,
        // which is exactly the workload dictionary compression targets.
        let data = json!({
            "users": [
                {"name": "John Doe", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Jane Smith", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Bob Wilson", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Alice Brown", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Charlie Davis", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Diana Evans", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Frank Miller", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Grace Wilson", "role": "admin", "status": "active", "department": "engineering"}
            ]
        });

        let strategy = analyzer.analyze(&data).unwrap();

        // Repeating strings like "admin" and "active" should yield a
        // dictionary-based (possibly hybrid) strategy.
        assert!(
            matches!(
                strategy,
                CompressionStrategy::Dictionary { .. } | CompressionStrategy::Hybrid { .. }
            ),
            "Expected dictionary-based compression strategy"
        );
    }

    #[test]
    fn test_schema_compressor_basic() {
        let compressor = SchemaCompressor::new();

        let input = json!({
            "message": "hello world",
            "count": 42
        });

        let original_size = serde_json::to_string(&input).unwrap().len();
        let compressed = compressor.compress(&input).unwrap();

        // Output must be non-empty and never larger than the input.
        assert!(compressed.compressed_size > 0);
        assert!(compressed.compression_ratio(original_size) <= 1.0);
    }

    #[test]
    fn test_dictionary_compression() {
        // Fixed dictionary mapping two known strings to small indices.
        let dictionary = HashMap::from([("active".to_string(), 0), ("admin".to_string(), 1)]);

        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Dictionary { dictionary });

        let input = json!({
            "status": "active",
            "role": "admin",
            "description": "active admin user"
        });

        let output = compressor.compress(&input).unwrap();

        // The dictionary entries must travel with the compressed payload so a
        // decoder can reverse the substitution.
        assert!(output.compression_metadata.contains_key("dict_0"));
        assert!(output.compression_metadata.contains_key("dict_1"));
    }

    #[test]
    fn test_compression_strategy_selection() {
        let mut analyzer = SchemaAnalyzer::new();

        // No repeated values, no numeric sequences — nothing to exploit, so
        // the analyzer should decline to compress.
        let simple_data = json!({
            "unique_field_1": "unique_value_1",
            "unique_field_2": "unique_value_2"
        });

        let chosen = analyzer.analyze(&simple_data).unwrap();
        assert_eq!(chosen, CompressionStrategy::None);
    }

    #[test]
    fn test_numeric_delta_analysis() {
        let mut analyzer = SchemaAnalyzer::new();

        // Monotonically increasing "time"/"value" fields — an incremental
        // numeric pattern the analyzer should record.
        let data = json!({
            "measurements": [
                {"time": 100, "value": 10.0},
                {"time": 101, "value": 10.5},
                {"time": 102, "value": 11.0},
                {"time": 103, "value": 11.5}
            ]
        });

        let _strategy = analyzer.analyze(&data).unwrap();

        // Analysis side effect: numeric field tracking must be populated.
        assert!(!analyzer.numeric_fields.is_empty());
    }

    #[test]
    fn test_run_length_encoding() {
        let compressor = SchemaCompressor::with_strategy(CompressionStrategy::RunLength);

        let input = json!({
            "repeated_values": [1, 1, 1, 2, 2, 3, 3, 3, 3]
        });

        let output = compressor.compress(&input).unwrap();
        assert!(output.compressed_size > 0);

        // The encoded field stays an array, and at least one entry must be an
        // RLE object (identified by its "rle_value" key).
        let encoded_field = &output.data["repeated_values"];
        assert!(encoded_field.is_array());

        let encoded = encoded_field.as_array().unwrap();
        assert!(encoded.iter().any(|entry| entry.get("rle_value").is_some()));
    }

    #[test]
    fn test_delta_compression() {
        // Seed the delta strategy with a known base for the "sequence" field.
        let base_values = HashMap::from([("sequence".to_string(), 100.0)]);

        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Delta { base_values });

        let input = json!({
            "sequence": [100.0, 101.0, 102.0, 103.0, 104.0]
        });

        let output = compressor.compress(&input).unwrap();
        assert!(output.compressed_size > 0);

        // The encoded field stays an array and must carry delta metadata
        // (identified by a "delta_base" key) for decompression.
        let encoded_field = &output.data["sequence"];
        assert!(encoded_field.is_array());

        let encoded = encoded_field.as_array().unwrap();
        assert!(encoded.iter().any(|entry| entry.get("delta_base").is_some()));
    }
}
959}