// pjson_rs/compression/mod.rs

1//! Schema-based compression for PJS protocol
2//!
3//! Implements intelligent compression strategies based on JSON schema analysis
4//! to optimize bandwidth usage while maintaining streaming capabilities.
5
6pub mod secure;
7
8use crate::domain::{DomainError, DomainResult};
9use serde_json::{Value as JsonValue, json};
10use std::collections::HashMap;
11
12/// Configuration constants for compression algorithms
/// Configuration constants for compression algorithms.
///
/// Note: the analyzer compares against these values with strict `>` / `<`,
/// so e.g. `min_array_length: 2` means arrays of 3+ elements qualify and
/// `min_frequency_count: 1` means a value must appear at least twice.
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// Minimum array length for pattern analysis
    pub min_array_length: usize,
    /// Minimum string length for dictionary inclusion
    pub min_string_length: usize,
    /// Minimum frequency for dictionary inclusion
    pub min_frequency_count: u32,
    /// Minimum compression potential for UUID patterns
    pub uuid_compression_potential: f32,
    /// Threshold score for string dictionary compression
    pub string_dict_threshold: f32,
    /// Threshold score for delta compression
    pub delta_threshold: f32,
    /// Minimum delta potential for numeric compression
    pub min_delta_potential: f32,
    /// Threshold for run-length compression
    pub run_length_threshold: f32,
    /// Minimum compression potential for pattern selection
    pub min_compression_potential: f32,
    /// Minimum array size for numeric sequence analysis
    pub min_numeric_sequence_size: usize,
}
36
37impl Default for CompressionConfig {
38    fn default() -> Self {
39        Self {
40            min_array_length: 2,
41            min_string_length: 3,
42            min_frequency_count: 1,
43            uuid_compression_potential: 0.3,
44            string_dict_threshold: 50.0,
45            delta_threshold: 30.0,
46            min_delta_potential: 0.3,
47            run_length_threshold: 20.0,
48            min_compression_potential: 0.4,
49            min_numeric_sequence_size: 3,
50        }
51    }
52}
53
/// Compression strategy based on schema analysis.
///
/// Produced by [`SchemaAnalyzer::analyze`] and consumed by
/// [`SchemaCompressor::compress`].
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionStrategy {
    /// No compression applied
    None,
    /// Dictionary-based compression for repeating string patterns.
    /// Maps each dictionary string to the u16 index that replaces it.
    Dictionary { dictionary: HashMap<String, u16> },
    /// Delta encoding for numeric sequences.
    /// Maps a JSON field path to the base value deltas are taken from.
    Delta { base_values: HashMap<String, f64> },
    /// Run-length encoding for repeated values
    RunLength,
    /// Hybrid approach combining multiple strategies
    Hybrid {
        string_dict: HashMap<String, u16>,
        numeric_deltas: HashMap<String, f64>,
    },
}
71
/// Schema analyzer for determining optimal compression strategy.
///
/// Accumulates statistics during [`SchemaAnalyzer::analyze`]; state is
/// cleared at the start of each analysis so the analyzer is reusable.
#[derive(Debug, Clone)]
pub struct SchemaAnalyzer {
    /// Pattern frequency analysis (keyed by synthetic pattern names such as
    /// "array_structure:<path>" or "uuid_pattern")
    patterns: HashMap<String, PatternInfo>,
    /// Numeric field analysis, keyed by dotted/indexed JSON path
    numeric_fields: HashMap<String, NumericStats>,
    /// String repetition analysis: occurrence count per exact string value
    string_repetitions: HashMap<String, u32>,
    /// Configuration for compression algorithms
    config: CompressionConfig,
}

/// Statistics for one detected pattern.
#[derive(Debug, Clone)]
struct PatternInfo {
    // Number of occurrences observed
    frequency: u32,
    // Approximate total byte size of all occurrences
    total_size: usize,
    // 0.0..1.0 estimate of how much of this pattern is redundant
    compression_potential: f32,
}

/// Collected values for a single numeric field path.
#[derive(Debug, Clone)]
struct NumericStats {
    // All values seen at this path (sorted in place during strategy selection)
    values: Vec<f64>,
    // Filled in by determine_strategy: higher = lower delta variance
    delta_potential: f32,
    // First value seen at this path; used as the delta base
    base_value: f64,
}
98
impl SchemaAnalyzer {
    /// Create new schema analyzer with the default configuration.
    pub fn new() -> Self {
        Self {
            patterns: HashMap::new(),
            numeric_fields: HashMap::new(),
            string_repetitions: HashMap::new(),
            config: CompressionConfig::default(),
        }
    }

    /// Create new schema analyzer with custom configuration.
    pub fn with_config(config: CompressionConfig) -> Self {
        Self {
            patterns: HashMap::new(),
            numeric_fields: HashMap::new(),
            string_repetitions: HashMap::new(),
            config,
        }
    }

    /// Analyze JSON data to determine optimal compression strategy.
    ///
    /// Resets all accumulated statistics first, so a single analyzer may be
    /// reused across documents without leaking state between calls.
    pub fn analyze(&mut self, data: &JsonValue) -> DomainResult<CompressionStrategy> {
        // Reset analysis state
        self.patterns.clear();
        self.numeric_fields.clear();
        self.string_repetitions.clear();

        // Perform deep analysis
        self.analyze_recursive(data, "")?;

        // Determine best strategy based on analysis
        self.determine_strategy()
    }

    /// Analyze data recursively, building dotted paths ("a.b") for object
    /// fields and indexed paths ("a[0]") for array elements.
    fn analyze_recursive(&mut self, value: &JsonValue, path: &str) -> DomainResult<()> {
        match value {
            JsonValue::Object(obj) => {
                for (key, val) in obj {
                    let field_path = if path.is_empty() {
                        key.clone()
                    } else {
                        format!("{path}.{key}")
                    };
                    self.analyze_recursive(val, &field_path)?;
                }
            }
            JsonValue::Array(arr) => {
                // Analyze array patterns (strictly greater than the
                // configured minimum, so default 2 means 3+ elements)
                if arr.len() > self.config.min_array_length {
                    self.analyze_array_patterns(arr, path)?;
                }
                for (idx, item) in arr.iter().enumerate() {
                    let item_path = format!("{path}[{idx}]");
                    self.analyze_recursive(item, &item_path)?;
                }
            }
            JsonValue::String(s) => {
                self.analyze_string_pattern(s, path);
            }
            JsonValue::Number(n) => {
                if let Some(f) = n.as_f64() {
                    self.analyze_numeric_pattern(f, path);
                }
            }
            // Null and Bool carry no compressible pattern information here
            _ => {}
        }
        Ok(())
    }

    /// Analyze array for repeating patterns: shared object field layouts and
    /// repeated primitive values.
    fn analyze_array_patterns(&mut self, arr: &[JsonValue], path: &str) -> DomainResult<()> {
        // Check for repeating object structures
        if let Some(JsonValue::Object(first)) = arr.first() {
            let structure_key = format!("array_structure:{path}");
            let field_names: Vec<&str> = first.keys().map(|k| k.as_str()).collect();
            let pattern = field_names.join(",");

            // Count how many objects share this structure
            // (key order matters: serde_json Map iteration order is used as-is)
            let matching_count = arr
                .iter()
                .filter_map(|v| v.as_object())
                .filter(|obj| {
                    let obj_fields: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
                    obj_fields.join(",") == pattern
                })
                .count();

            if matching_count > self.config.min_frequency_count as usize {
                let info = PatternInfo {
                    frequency: matching_count as u32,
                    total_size: pattern.len() * matching_count,
                    // Fraction of occurrences beyond the first, i.e. the
                    // redundant share: (n - 1) / n
                    compression_potential: (matching_count as f32 - 1.0) / matching_count as f32,
                };
                self.patterns.insert(structure_key, info);
            }
        }

        // Check for repeating primitive values
        // NOTE(review): this uses a hard-coded `> 2` rather than
        // config.min_array_length — equivalent under the defaults but they
        // can diverge with a custom config; confirm intent.
        if arr.len() > 2 {
            let mut value_counts = HashMap::new();
            for value in arr {
                let key = match value {
                    JsonValue::String(s) => format!("string:{s}"),
                    JsonValue::Number(n) => format!("number:{n}"),
                    JsonValue::Bool(b) => format!("bool:{b}"),
                    // Nested arrays/objects and nulls are not counted
                    _ => continue,
                };
                *value_counts.entry(key).or_insert(0) += 1;
            }

            for (value_key, count) in value_counts {
                if count > self.config.min_frequency_count {
                    let info = PatternInfo {
                        frequency: count,
                        total_size: value_key.len() * count as usize,
                        compression_potential: (count as f32 - 1.0) / count as f32,
                    };
                    self.patterns
                        .insert(format!("array_value:{path}:{value_key}"), info);
                }
            }
        }

        Ok(())
    }

    /// Analyze string for repetition patterns (exact duplicates, URL
    /// prefixes, and UUID-shaped identifiers).
    fn analyze_string_pattern(&mut self, s: &str, _path: &str) {
        // Track string repetitions across different paths
        *self.string_repetitions.entry(s.to_string()).or_insert(0) += 1;

        // Analyze common prefixes/suffixes for URLs, IDs, etc.
        if s.len() > 10 {
            // Check for URL patterns
            if s.starts_with("http://") || s.starts_with("https://") {
                let prefix = if s.starts_with("https://") {
                    "https://"
                } else {
                    "http://"
                };
                self.patterns
                    .entry(format!("url_prefix:{prefix}"))
                    .or_insert(PatternInfo {
                        frequency: 0,
                        total_size: 0,
                        compression_potential: 0.0,
                    })
                    .frequency += 1;
            }

            // Check for ID patterns (UUID-like): 36 chars with exactly 4
            // dashes, the canonical 8-4-4-4-12 textual form
            if s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4 {
                self.patterns
                    .entry("uuid_pattern".to_string())
                    .or_insert(PatternInfo {
                        frequency: 0,
                        total_size: 36,
                        compression_potential: self.config.uuid_compression_potential,
                    })
                    .frequency += 1;
            }
        }
    }

    /// Record a numeric observation for delta-compression analysis.
    /// The first value seen at a path becomes its `base_value`.
    fn analyze_numeric_pattern(&mut self, value: f64, path: &str) {
        self.numeric_fields
            .entry(path.to_string())
            .or_insert_with(|| NumericStats {
                values: Vec::new(),
                delta_potential: 0.0,
                base_value: value,
            })
            .values
            .push(value);
    }

    /// Determine optimal compression strategy based on analysis.
    ///
    /// Scores string-dictionary and numeric-delta potential, then picks
    /// Hybrid / Dictionary / Delta / RunLength / None in that precedence.
    fn determine_strategy(&mut self) -> DomainResult<CompressionStrategy> {
        // Calculate compression potentials
        let mut string_dict_score = 0.0;
        let mut delta_score = 0.0;

        // Analyze string repetition potential
        let mut string_dict = HashMap::new();
        let mut dict_index = 0u16;

        for (string, count) in &self.string_repetitions {
            if *count > self.config.min_frequency_count
                && string.len() > self.config.min_string_length
            {
                // Score: bytes saved by replacing every repeat after the first
                string_dict_score += (*count as f32 - 1.0) * string.len() as f32;
                string_dict.insert(string.clone(), dict_index);
                dict_index += 1;
            }
        }

        // Analyze numeric delta potential
        let mut numeric_deltas = HashMap::new();

        for (path, stats) in &mut self.numeric_fields {
            if stats.values.len() > 2 {
                // Calculate variance to determine delta effectiveness.
                // Values are sorted in place; base_value was captured earlier
                // so the original first value is preserved.
                stats
                    .values
                    .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

                let deltas: Vec<f64> = stats
                    .values
                    .windows(2)
                    .map(|window| window[1] - window[0])
                    .collect();

                if !deltas.is_empty() {
                    let avg_delta = deltas.iter().sum::<f64>() / deltas.len() as f64;
                    let delta_variance =
                        deltas.iter().map(|d| (d - avg_delta).powi(2)).sum::<f64>()
                            / deltas.len() as f64;

                    // Low variance suggests good delta compression potential;
                    // maps variance 0 -> potential 1.0, large variance -> ~0
                    stats.delta_potential = 1.0 / (1.0 + delta_variance as f32);

                    if stats.delta_potential > self.config.min_delta_potential {
                        delta_score += stats.delta_potential * stats.values.len() as f32;
                        numeric_deltas.insert(path.clone(), stats.base_value);
                    }
                }
            }
        }

        // Choose strategy based on scores
        match (
            string_dict_score > self.config.string_dict_threshold,
            delta_score > self.config.delta_threshold,
        ) {
            (true, true) => Ok(CompressionStrategy::Hybrid {
                string_dict,
                numeric_deltas,
            }),
            (true, false) => Ok(CompressionStrategy::Dictionary {
                dictionary: string_dict,
            }),
            (false, true) => Ok(CompressionStrategy::Delta {
                base_values: numeric_deltas,
            }),
            (false, false) => {
                // Check for run-length potential across all recorded patterns
                let run_length_score = self
                    .patterns
                    .values()
                    .filter(|p| p.compression_potential > self.config.min_compression_potential)
                    .map(|p| p.frequency as f32 * p.compression_potential)
                    .sum::<f32>();

                if run_length_score > self.config.run_length_threshold {
                    Ok(CompressionStrategy::RunLength)
                } else {
                    Ok(CompressionStrategy::None)
                }
            }
        }
    }
}
364
/// Schema-aware compressor.
///
/// Holds the currently selected strategy plus the analyzer used by
/// `analyze_and_optimize` to re-derive it from sample data.
#[derive(Debug, Clone)]
pub struct SchemaCompressor {
    // Strategy applied by compress(); defaults to None until analyzed
    strategy: CompressionStrategy,
    // Analyzer sharing the same configuration as this compressor
    analyzer: SchemaAnalyzer,
    // Thresholds used by the run-length / delta helpers below
    config: CompressionConfig,
}
372
373impl SchemaCompressor {
374    /// Create new compressor with automatic strategy detection
375    pub fn new() -> Self {
376        let config = CompressionConfig::default();
377        Self {
378            strategy: CompressionStrategy::None,
379            analyzer: SchemaAnalyzer::with_config(config.clone()),
380            config,
381        }
382    }
383
384    /// Create compressor with specific strategy
385    pub fn with_strategy(strategy: CompressionStrategy) -> Self {
386        let config = CompressionConfig::default();
387        Self {
388            strategy,
389            analyzer: SchemaAnalyzer::with_config(config.clone()),
390            config,
391        }
392    }
393
394    /// Create compressor with custom configuration
395    pub fn with_config(config: CompressionConfig) -> Self {
396        Self {
397            strategy: CompressionStrategy::None,
398            analyzer: SchemaAnalyzer::with_config(config.clone()),
399            config,
400        }
401    }
402
403    /// Analyze data and update compression strategy
404    pub fn analyze_and_optimize(&mut self, data: &JsonValue) -> DomainResult<&CompressionStrategy> {
405        self.strategy = self.analyzer.analyze(data)?;
406        Ok(&self.strategy)
407    }
408
409    /// Compress JSON data according to current strategy
410    pub fn compress(&self, data: &JsonValue) -> DomainResult<CompressedData> {
411        match &self.strategy {
412            CompressionStrategy::None => Ok(CompressedData {
413                strategy: self.strategy.clone(),
414                compressed_size: serde_json::to_string(data)
415                    .map_err(|e| {
416                        DomainError::CompressionError(format!("JSON serialization failed: {e}"))
417                    })?
418                    .len(),
419                data: data.clone(),
420                compression_metadata: HashMap::new(),
421            }),
422
423            CompressionStrategy::Dictionary { dictionary } => {
424                self.compress_with_dictionary(data, dictionary)
425            }
426
427            CompressionStrategy::Delta { base_values } => {
428                self.compress_with_delta(data, base_values)
429            }
430
431            CompressionStrategy::RunLength => self.compress_with_run_length(data),
432
433            CompressionStrategy::Hybrid {
434                string_dict,
435                numeric_deltas,
436            } => self.compress_hybrid(data, string_dict, numeric_deltas),
437        }
438    }
439
440    /// Dictionary-based compression
441    fn compress_with_dictionary(
442        &self,
443        data: &JsonValue,
444        dictionary: &HashMap<String, u16>,
445    ) -> DomainResult<CompressedData> {
446        let mut metadata = HashMap::new();
447
448        // Store dictionary for decompression
449        for (string, index) in dictionary {
450            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
451        }
452
453        // Replace strings with dictionary indices
454        let compressed = self.replace_strings_with_indices(data, dictionary)?;
455        let compressed_size = serde_json::to_string(&compressed)
456            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
457            .len();
458
459        Ok(CompressedData {
460            strategy: self.strategy.clone(),
461            compressed_size,
462            data: compressed,
463            compression_metadata: metadata,
464        })
465    }
466
467    /// Delta compression for numeric sequences
468    fn compress_with_delta(
469        &self,
470        data: &JsonValue,
471        base_values: &HashMap<String, f64>,
472    ) -> DomainResult<CompressedData> {
473        let mut metadata = HashMap::new();
474
475        // Store base values
476        for (path, base) in base_values {
477            metadata.insert(
478                format!("base_{path}"),
479                JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()),
480            );
481        }
482
483        // Apply delta compression
484        let compressed = self.apply_delta_compression(data, base_values)?;
485        let compressed_size = serde_json::to_string(&compressed)
486            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
487            .len();
488
489        Ok(CompressedData {
490            strategy: self.strategy.clone(),
491            compressed_size,
492            data: compressed,
493            compression_metadata: metadata,
494        })
495    }
496
497    /// Run-length encoding compression
498    fn compress_with_run_length(&self, data: &JsonValue) -> DomainResult<CompressedData> {
499        let compressed = self.apply_run_length_encoding(data)?;
500        let compressed_size = serde_json::to_string(&compressed)
501            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
502            .len();
503
504        Ok(CompressedData {
505            strategy: self.strategy.clone(),
506            compressed_size,
507            data: compressed,
508            compression_metadata: HashMap::new(),
509        })
510    }
511
512    /// Apply run-length encoding to arrays with repeated values
513    fn apply_run_length_encoding(&self, data: &JsonValue) -> DomainResult<JsonValue> {
514        match data {
515            JsonValue::Object(obj) => {
516                let mut compressed_obj = serde_json::Map::new();
517                for (key, value) in obj {
518                    compressed_obj.insert(key.clone(), self.apply_run_length_encoding(value)?);
519                }
520                Ok(JsonValue::Object(compressed_obj))
521            }
522            JsonValue::Array(arr) if arr.len() > 2 => {
523                // Apply run-length encoding to array
524                let mut compressed_runs = Vec::new();
525                let mut current_value = None;
526                let mut run_count = 0;
527
528                for item in arr {
529                    if Some(item) == current_value.as_ref() {
530                        run_count += 1;
531                    } else {
532                        // Save previous run if it exists
533                        if let Some(value) = current_value {
534                            if run_count > self.config.min_frequency_count {
535                                // Use run-length encoding: [value, count]
536                                compressed_runs.push(json!({
537                                    "rle_value": value,
538                                    "rle_count": run_count
539                                }));
540                            } else {
541                                // Single occurrence, keep as-is
542                                compressed_runs.push(value);
543                            }
544                        }
545
546                        // Start new run
547                        current_value = Some(item.clone());
548                        run_count = 1;
549                    }
550                }
551
552                // Handle final run
553                if let Some(value) = current_value {
554                    if run_count > self.config.min_frequency_count {
555                        compressed_runs.push(json!({
556                            "rle_value": value,
557                            "rle_count": run_count
558                        }));
559                    } else {
560                        compressed_runs.push(value);
561                    }
562                }
563
564                Ok(JsonValue::Array(compressed_runs))
565            }
566            JsonValue::Array(arr) => {
567                // Array too small for run-length encoding, process recursively
568                let compressed_arr: Result<Vec<_>, _> = arr
569                    .iter()
570                    .map(|item| self.apply_run_length_encoding(item))
571                    .collect();
572                Ok(JsonValue::Array(compressed_arr?))
573            }
574            _ => Ok(data.clone()),
575        }
576    }
577
578    /// Hybrid compression combining multiple strategies
579    fn compress_hybrid(
580        &self,
581        data: &JsonValue,
582        string_dict: &HashMap<String, u16>,
583        numeric_deltas: &HashMap<String, f64>,
584    ) -> DomainResult<CompressedData> {
585        let mut metadata = HashMap::new();
586
587        // Add dictionary metadata
588        for (string, index) in string_dict {
589            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
590        }
591
592        // Add delta base values
593        for (path, base) in numeric_deltas {
594            metadata.insert(
595                format!("base_{path}"),
596                JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()),
597            );
598        }
599
600        // Apply both compression strategies
601        let dict_compressed = self.replace_strings_with_indices(data, string_dict)?;
602        let final_compressed = self.apply_delta_compression(&dict_compressed, numeric_deltas)?;
603
604        let compressed_size = serde_json::to_string(&final_compressed)
605            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
606            .len();
607
608        Ok(CompressedData {
609            strategy: self.strategy.clone(),
610            compressed_size,
611            data: final_compressed,
612            compression_metadata: metadata,
613        })
614    }
615
616    /// Replace strings with dictionary indices
617    #[allow(clippy::only_used_in_recursion)]
618    fn replace_strings_with_indices(
619        &self,
620        data: &JsonValue,
621        dictionary: &HashMap<String, u16>,
622    ) -> DomainResult<JsonValue> {
623        match data {
624            JsonValue::Object(obj) => {
625                let mut compressed_obj = serde_json::Map::new();
626                for (key, value) in obj {
627                    compressed_obj.insert(
628                        key.clone(),
629                        self.replace_strings_with_indices(value, dictionary)?,
630                    );
631                }
632                Ok(JsonValue::Object(compressed_obj))
633            }
634            JsonValue::Array(arr) => {
635                let compressed_arr: Result<Vec<_>, _> = arr
636                    .iter()
637                    .map(|item| self.replace_strings_with_indices(item, dictionary))
638                    .collect();
639                Ok(JsonValue::Array(compressed_arr?))
640            }
641            JsonValue::String(s) => {
642                if let Some(&index) = dictionary.get(s) {
643                    Ok(JsonValue::Number(serde_json::Number::from(index)))
644                } else {
645                    Ok(data.clone())
646                }
647            }
648            _ => Ok(data.clone()),
649        }
650    }
651
652    /// Apply delta compression to numeric sequences in arrays
653    fn apply_delta_compression(
654        &self,
655        data: &JsonValue,
656        base_values: &HashMap<String, f64>,
657    ) -> DomainResult<JsonValue> {
658        self.apply_delta_recursive(data, "", base_values)
659    }
660
661    /// Recursively apply delta compression to JSON structure
662    fn apply_delta_recursive(
663        &self,
664        data: &JsonValue,
665        path: &str,
666        base_values: &HashMap<String, f64>,
667    ) -> DomainResult<JsonValue> {
668        match data {
669            JsonValue::Object(obj) => {
670                let mut compressed_obj = serde_json::Map::new();
671                for (key, value) in obj {
672                    let field_path = if path.is_empty() {
673                        key.clone()
674                    } else {
675                        format!("{path}.{key}")
676                    };
677                    compressed_obj.insert(
678                        key.clone(),
679                        self.apply_delta_recursive(value, &field_path, base_values)?,
680                    );
681                }
682                Ok(JsonValue::Object(compressed_obj))
683            }
684            JsonValue::Array(arr) if arr.len() > 2 => {
685                // Check if this array contains numeric sequences that can be delta-compressed
686                if self.is_numeric_sequence(arr) {
687                    self.compress_numeric_array_with_delta(arr, path, base_values)
688                } else {
689                    // Process array elements recursively
690                    let compressed_arr: Result<Vec<_>, _> = arr
691                        .iter()
692                        .enumerate()
693                        .map(|(idx, item)| {
694                            let item_path = format!("{path}[{idx}]");
695                            self.apply_delta_recursive(item, &item_path, base_values)
696                        })
697                        .collect();
698                    Ok(JsonValue::Array(compressed_arr?))
699                }
700            }
701            JsonValue::Array(arr) => {
702                // Array too small for delta compression, process recursively
703                let compressed_arr: Result<Vec<_>, _> = arr
704                    .iter()
705                    .enumerate()
706                    .map(|(idx, item)| {
707                        let item_path = format!("{path}[{idx}]");
708                        self.apply_delta_recursive(item, &item_path, base_values)
709                    })
710                    .collect();
711                Ok(JsonValue::Array(compressed_arr?))
712            }
713            _ => Ok(data.clone()),
714        }
715    }
716
717    /// Check if array contains a numeric sequence suitable for delta compression
718    fn is_numeric_sequence(&self, arr: &[JsonValue]) -> bool {
719        if arr.len() < self.config.min_numeric_sequence_size {
720            return false;
721        }
722
723        // Check if all elements are numbers
724        arr.iter().all(|v| v.is_number())
725    }
726
727    /// Apply delta compression to numeric array
728    fn compress_numeric_array_with_delta(
729        &self,
730        arr: &[JsonValue],
731        path: &str,
732        base_values: &HashMap<String, f64>,
733    ) -> DomainResult<JsonValue> {
734        let mut compressed_array = Vec::new();
735
736        // Extract numeric values
737        let numbers: Vec<f64> = arr.iter().filter_map(|v| v.as_f64()).collect();
738
739        if numbers.is_empty() {
740            return Ok(JsonValue::Array(arr.to_vec()));
741        }
742
743        // Use base value from analysis or first element as base
744        let base_value = base_values.get(path).copied().unwrap_or(numbers[0]);
745
746        // Add metadata for base value
747        compressed_array.push(json!({
748            "delta_base": base_value,
749            "delta_type": "numeric_sequence"
750        }));
751
752        // Calculate deltas from base value
753        let deltas: Vec<f64> = numbers.iter().map(|&num| num - base_value).collect();
754
755        // Check if delta compression is beneficial
756        let original_precision = numbers.iter().map(|n| format!("{n}").len()).sum::<usize>();
757
758        let delta_precision = deltas.iter().map(|d| format!("{d}").len()).sum::<usize>();
759
760        if delta_precision < original_precision {
761            // Delta compression is beneficial
762            compressed_array.extend(deltas.into_iter().map(JsonValue::from));
763        } else {
764            // Keep original values
765            return Ok(JsonValue::Array(arr.to_vec()));
766        }
767
768        Ok(JsonValue::Array(compressed_array))
769    }
770}
771
/// Compressed data with metadata.
///
/// `compression_metadata` carries whatever the chosen strategy needs for
/// decompression (dictionary entries under "dict_<i>", delta bases under
/// "base_<path>").
#[derive(Debug, Clone)]
pub struct CompressedData {
    /// Strategy that produced this payload
    pub strategy: CompressionStrategy,
    /// Serialized size of `data` in bytes
    pub compressed_size: usize,
    /// The transformed JSON payload
    pub data: JsonValue,
    /// Side-channel information required to reverse the compression
    pub compression_metadata: HashMap<String, JsonValue>,
}
780
781impl CompressedData {
782    /// Calculate compression ratio
783    pub fn compression_ratio(&self, original_size: usize) -> f32 {
784        if original_size == 0 {
785            return 1.0;
786        }
787        self.compressed_size as f32 / original_size as f32
788    }
789
790    /// Get compression savings in bytes
791    pub fn compression_savings(&self, original_size: usize) -> isize {
792        original_size as isize - self.compressed_size as isize
793    }
794}
795
impl Default for SchemaAnalyzer {
    /// Equivalent to [`SchemaAnalyzer::new`] (default configuration).
    fn default() -> Self {
        Self::new()
    }
}
801
impl Default for SchemaCompressor {
    /// Equivalent to [`SchemaCompressor::new`] (no strategy selected yet).
    fn default() -> Self {
        Self::new()
    }
}
807
808#[cfg(test)]
809mod tests {
810    use super::*;
811    use serde_json::json;
812
    /// Heavy string repetition ("admin"/"active"/"engineering" x8) should
    /// push the dictionary score over the default threshold, so the analyzer
    /// must pick a dictionary-based (or hybrid) strategy.
    #[test]
    fn test_schema_analyzer_dictionary_potential() {
        let mut analyzer = SchemaAnalyzer::new();

        let data = json!({
            "users": [
                {"name": "John Doe", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Jane Smith", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Bob Wilson", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Alice Brown", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Charlie Davis", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Diana Evans", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Frank Miller", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Grace Wilson", "role": "admin", "status": "active", "department": "engineering"}
            ]
        });

        let strategy = analyzer.analyze(&data).unwrap();

        // Should detect repeating strings like "admin", "active"
        match strategy {
            CompressionStrategy::Dictionary { .. } | CompressionStrategy::Hybrid { .. } => {
                // Expected outcome
            }
            _ => panic!("Expected dictionary-based compression strategy"),
        }
    }
840
841    #[test]
842    fn test_schema_compressor_basic() {
843        let compressor = SchemaCompressor::new();
844
845        let data = json!({
846            "message": "hello world",
847            "count": 42
848        });
849
850        let original_size = serde_json::to_string(&data).unwrap().len();
851        let compressed = compressor.compress(&data).unwrap();
852
853        assert!(compressed.compressed_size > 0);
854        assert!(compressed.compression_ratio(original_size) <= 1.0);
855    }
856
857    #[test]
858    fn test_dictionary_compression() {
859        let mut dictionary = HashMap::new();
860        dictionary.insert("active".to_string(), 0);
861        dictionary.insert("admin".to_string(), 1);
862
863        let compressor =
864            SchemaCompressor::with_strategy(CompressionStrategy::Dictionary { dictionary });
865
866        let data = json!({
867            "status": "active",
868            "role": "admin",
869            "description": "active admin user"
870        });
871
872        let result = compressor.compress(&data).unwrap();
873
874        // Verify compression metadata contains dictionary
875        assert!(result.compression_metadata.contains_key("dict_0"));
876        assert!(result.compression_metadata.contains_key("dict_1"));
877    }
878
879    #[test]
880    fn test_compression_strategy_selection() {
881        let mut analyzer = SchemaAnalyzer::new();
882
883        // Test data with no clear patterns
884        let simple_data = json!({
885            "unique_field_1": "unique_value_1",
886            "unique_field_2": "unique_value_2"
887        });
888
889        let strategy = analyzer.analyze(&simple_data).unwrap();
890        assert_eq!(strategy, CompressionStrategy::None);
891    }
892
893    #[test]
894    fn test_numeric_delta_analysis() {
895        let mut analyzer = SchemaAnalyzer::new();
896
897        let data = json!({
898            "measurements": [
899                {"time": 100, "value": 10.0},
900                {"time": 101, "value": 10.5},
901                {"time": 102, "value": 11.0},
902                {"time": 103, "value": 11.5}
903            ]
904        });
905
906        let _strategy = analyzer.analyze(&data).unwrap();
907
908        // Should detect incremental numeric patterns
909        assert!(!analyzer.numeric_fields.is_empty());
910    }
911
912    #[test]
913    fn test_run_length_encoding() {
914        let compressor = SchemaCompressor::with_strategy(CompressionStrategy::RunLength);
915
916        let data = json!({
917            "repeated_values": [1, 1, 1, 2, 2, 3, 3, 3, 3]
918        });
919
920        let result = compressor.compress(&data).unwrap();
921
922        // Should compress repeated sequences
923        assert!(result.compressed_size > 0);
924
925        // Verify RLE format in the compressed data
926        let compressed_array = &result.data["repeated_values"];
927        assert!(compressed_array.is_array());
928
929        // Should contain RLE objects
930        let array = compressed_array.as_array().unwrap();
931        let has_rle = array.iter().any(|v| v.get("rle_value").is_some());
932        assert!(has_rle);
933    }
934
935    #[test]
936    fn test_delta_compression() {
937        let mut base_values = HashMap::new();
938        base_values.insert("sequence".to_string(), 100.0);
939
940        let compressor =
941            SchemaCompressor::with_strategy(CompressionStrategy::Delta { base_values });
942
943        let data = json!({
944            "sequence": [100.0, 101.0, 102.0, 103.0, 104.0]
945        });
946
947        let result = compressor.compress(&data).unwrap();
948
949        // Should apply delta compression
950        assert!(result.compressed_size > 0);
951
952        // Verify delta format in the compressed data
953        let compressed_array = &result.data["sequence"];
954        assert!(compressed_array.is_array());
955
956        // Should contain delta metadata
957        let array = compressed_array.as_array().unwrap();
958        let has_delta_base = array.iter().any(|v| v.get("delta_base").is_some());
959        assert!(has_delta_base);
960    }
961}