pjson_rs/compression/
mod.rs

1//! Schema-based compression for PJS protocol
2//!
3//! Implements intelligent compression strategies based on JSON schema analysis
4//! to optimize bandwidth usage while maintaining streaming capabilities.
5
6pub mod secure;
7
8use crate::domain::{DomainError, DomainResult};
9use serde_json::{Value as JsonValue, json};
10use std::collections::HashMap;
11
/// Tunable thresholds for the compression heuristics.
///
/// Most gates are compared with a strict `>` by the analyzer, so a value
/// must exceed (not merely reach) its threshold to qualify.
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// Arrays must be strictly longer than this to undergo pattern analysis
    pub min_array_length: usize,
    /// Strings must be strictly longer than this to enter the dictionary
    pub min_string_length: usize,
    /// Occurrence count must strictly exceed this for dictionary inclusion
    pub min_frequency_count: u32,
    /// Compression potential assigned to detected UUID-like strings
    pub uuid_compression_potential: f32,
    /// Score above which dictionary (string) compression is selected
    pub string_dict_threshold: f32,
    /// Score above which delta (numeric) compression is selected
    pub delta_threshold: f32,
    /// Minimum delta potential for a numeric field to contribute to the score
    pub min_delta_potential: f32,
    /// Score above which run-length encoding is selected as a fallback
    pub run_length_threshold: f32,
    /// Patterns at or below this potential are ignored for run-length scoring
    pub min_compression_potential: f32,
    /// Arrays with at least this many elements may count as numeric sequences
    /// (inclusive, unlike the other gates)
    pub min_numeric_sequence_size: usize,
}
36
impl Default for CompressionConfig {
    /// Conservative defaults: low size/frequency gates so patterns are
    /// detected even in modest documents, combined with moderate score
    /// thresholds for actually switching strategies.
    fn default() -> Self {
        Self {
            min_array_length: 2,
            min_string_length: 3,
            min_frequency_count: 1,
            uuid_compression_potential: 0.3,
            string_dict_threshold: 50.0,
            delta_threshold: 30.0,
            min_delta_potential: 0.3,
            run_length_threshold: 20.0,
            min_compression_potential: 0.4,
            min_numeric_sequence_size: 3,
        }
    }
}
53
/// Compression strategy selected by [`SchemaAnalyzer`] based on schema analysis.
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionStrategy {
    /// No compression applied; data passes through unchanged
    None,
    /// Dictionary-based compression: repeated strings replaced by `u16` indices
    Dictionary { dictionary: HashMap<String, u16> },
    /// Delta encoding for numeric sequences, keyed by field path -> base value
    Delta { base_values: HashMap<String, f64> },
    /// Run-length encoding for arrays containing runs of repeated values
    RunLength,
    /// Hybrid approach combining string dictionary and numeric delta encoding
    Hybrid {
        /// String dictionary (string -> index)
        string_dict: HashMap<String, u16>,
        /// Delta base values keyed by field path
        numeric_deltas: HashMap<String, f64>,
    },
}
71
/// Schema analyzer for determining the optimal compression strategy.
///
/// Accumulates statistics during [`SchemaAnalyzer::analyze`]; state is reset
/// at the start of each analysis, so one instance can be reused.
#[derive(Debug, Clone)]
pub struct SchemaAnalyzer {
    /// Pattern frequency analysis (array structures, URL prefixes, UUID-like ids)
    patterns: HashMap<String, PatternInfo>,
    /// Numeric observations per field path, for delta-potential estimation
    numeric_fields: HashMap<String, NumericStats>,
    /// Occurrence count of each string value across the whole document
    string_repetitions: HashMap<String, u32>,
    /// Configuration thresholds for the compression heuristics
    config: CompressionConfig,
}
84
/// Statistics for one detected pattern (array structure, repeated value, URL
/// prefix or UUID-like string).
#[derive(Debug, Clone)]
struct PatternInfo {
    /// How many times the pattern occurred
    frequency: u32,
    #[allow(dead_code)] // Future: used for compression ratio calculation
    /// Approximate total byte size covered by the pattern
    total_size: usize,
    /// Estimated fraction of redundant bytes, e.g. (n - 1) / n for n repeats
    compression_potential: f32,
}
92
/// Per-field numeric statistics used to estimate delta-encoding benefit.
#[derive(Debug, Clone)]
struct NumericStats {
    /// All values observed at this field path (sorted during strategy selection)
    values: Vec<f64>,
    /// 1 / (1 + delta variance): higher means more uniform steps
    delta_potential: f32,
    /// First value observed at this path; used as the delta base
    base_value: f64,
}
99
100impl SchemaAnalyzer {
101    /// Create new schema analyzer
102    pub fn new() -> Self {
103        Self {
104            patterns: HashMap::new(),
105            numeric_fields: HashMap::new(),
106            string_repetitions: HashMap::new(),
107            config: CompressionConfig::default(),
108        }
109    }
110
111    /// Create new schema analyzer with custom configuration
112    pub fn with_config(config: CompressionConfig) -> Self {
113        Self {
114            patterns: HashMap::new(),
115            numeric_fields: HashMap::new(),
116            string_repetitions: HashMap::new(),
117            config,
118        }
119    }
120
121    /// Analyze JSON data to determine optimal compression strategy
122    pub fn analyze(&mut self, data: &JsonValue) -> DomainResult<CompressionStrategy> {
123        // Reset analysis state
124        self.patterns.clear();
125        self.numeric_fields.clear();
126        self.string_repetitions.clear();
127
128        // Perform deep analysis
129        self.analyze_recursive(data, "")?;
130
131        // Determine best strategy based on analysis
132        self.determine_strategy()
133    }
134
135    /// Analyze data recursively
136    fn analyze_recursive(&mut self, value: &JsonValue, path: &str) -> DomainResult<()> {
137        match value {
138            JsonValue::Object(obj) => {
139                for (key, val) in obj {
140                    let field_path = if path.is_empty() {
141                        key.clone()
142                    } else {
143                        format!("{path}.{key}")
144                    };
145                    self.analyze_recursive(val, &field_path)?;
146                }
147            }
148            JsonValue::Array(arr) => {
149                // Analyze array patterns
150                if arr.len() > self.config.min_array_length {
151                    self.analyze_array_patterns(arr, path)?;
152                }
153                for (idx, item) in arr.iter().enumerate() {
154                    let item_path = format!("{path}[{idx}]");
155                    self.analyze_recursive(item, &item_path)?;
156                }
157            }
158            JsonValue::String(s) => {
159                self.analyze_string_pattern(s, path);
160            }
161            JsonValue::Number(n) => {
162                if let Some(f) = n.as_f64() {
163                    self.analyze_numeric_pattern(f, path);
164                }
165            }
166            _ => {}
167        }
168        Ok(())
169    }
170
171    /// Analyze array for repeating patterns
172    fn analyze_array_patterns(&mut self, arr: &[JsonValue], path: &str) -> DomainResult<()> {
173        // Check for repeating object structures
174        if let Some(JsonValue::Object(first)) = arr.first() {
175            let structure_key = format!("array_structure:{path}");
176            let field_names: Vec<&str> = first.keys().map(|k| k.as_str()).collect();
177            let pattern = field_names.join(",");
178
179            // Count how many objects share this structure
180            let matching_count = arr
181                .iter()
182                .filter_map(|v| v.as_object())
183                .filter(|obj| {
184                    let obj_fields: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
185                    obj_fields.join(",") == pattern
186                })
187                .count();
188
189            if matching_count > self.config.min_frequency_count as usize {
190                let info = PatternInfo {
191                    frequency: matching_count as u32,
192                    total_size: pattern.len() * matching_count,
193                    compression_potential: (matching_count as f32 - 1.0) / matching_count as f32,
194                };
195                self.patterns.insert(structure_key, info);
196            }
197        }
198
199        // Check for repeating primitive values
200        if arr.len() > 2 {
201            let mut value_counts = HashMap::new();
202            for value in arr {
203                let key = match value {
204                    JsonValue::String(s) => format!("string:{s}"),
205                    JsonValue::Number(n) => format!("number:{n}"),
206                    JsonValue::Bool(b) => format!("bool:{b}"),
207                    _ => continue,
208                };
209                *value_counts.entry(key).or_insert(0) += 1;
210            }
211
212            for (value_key, count) in value_counts {
213                if count > self.config.min_frequency_count {
214                    let info = PatternInfo {
215                        frequency: count,
216                        total_size: value_key.len() * count as usize,
217                        compression_potential: (count as f32 - 1.0) / count as f32,
218                    };
219                    self.patterns
220                        .insert(format!("array_value:{path}:{value_key}"), info);
221                }
222            }
223        }
224
225        Ok(())
226    }
227
228    /// Analyze string for repetition patterns
229    fn analyze_string_pattern(&mut self, s: &str, _path: &str) {
230        // Track string repetitions across different paths
231        *self.string_repetitions.entry(s.to_string()).or_insert(0) += 1;
232
233        // Analyze common prefixes/suffixes for URLs, IDs, etc.
234        if s.len() > 10 {
235            // Check for URL patterns
236            if s.starts_with("http://") || s.starts_with("https://") {
237                let prefix = if s.starts_with("https://") {
238                    "https://"
239                } else {
240                    "http://"
241                };
242                self.patterns
243                    .entry(format!("url_prefix:{prefix}"))
244                    .or_insert(PatternInfo {
245                        frequency: 0,
246                        total_size: 0,
247                        compression_potential: 0.0,
248                    })
249                    .frequency += 1;
250            }
251
252            // Check for ID patterns (UUID-like)
253            if s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4 {
254                self.patterns
255                    .entry("uuid_pattern".to_string())
256                    .or_insert(PatternInfo {
257                        frequency: 0,
258                        total_size: 36,
259                        compression_potential: self.config.uuid_compression_potential,
260                    })
261                    .frequency += 1;
262            }
263        }
264    }
265
266    /// Analyze numeric patterns for delta compression
267    fn analyze_numeric_pattern(&mut self, value: f64, path: &str) {
268        self.numeric_fields
269            .entry(path.to_string())
270            .or_insert_with(|| NumericStats {
271                values: Vec::new(),
272                delta_potential: 0.0,
273                base_value: value,
274            })
275            .values
276            .push(value);
277    }
278
279    /// Determine optimal compression strategy based on analysis
280    fn determine_strategy(&mut self) -> DomainResult<CompressionStrategy> {
281        // Calculate compression potentials
282        let mut string_dict_score = 0.0;
283        let mut delta_score = 0.0;
284
285        // Analyze string repetition potential
286        let mut string_dict = HashMap::new();
287        let mut dict_index = 0u16;
288
289        for (string, count) in &self.string_repetitions {
290            if *count > self.config.min_frequency_count
291                && string.len() > self.config.min_string_length
292            {
293                string_dict_score += (*count as f32 - 1.0) * string.len() as f32;
294                string_dict.insert(string.clone(), dict_index);
295                dict_index += 1;
296            }
297        }
298
299        // Analyze numeric delta potential
300        let mut numeric_deltas = HashMap::new();
301
302        for (path, stats) in &mut self.numeric_fields {
303            if stats.values.len() > 2 {
304                // Calculate variance to determine delta effectiveness
305                stats
306                    .values
307                    .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
308
309                let deltas: Vec<f64> = stats
310                    .values
311                    .windows(2)
312                    .map(|window| window[1] - window[0])
313                    .collect();
314
315                if !deltas.is_empty() {
316                    let avg_delta = deltas.iter().sum::<f64>() / deltas.len() as f64;
317                    let delta_variance =
318                        deltas.iter().map(|d| (d - avg_delta).powi(2)).sum::<f64>()
319                            / deltas.len() as f64;
320
321                    // Low variance suggests good delta compression potential
322                    stats.delta_potential = 1.0 / (1.0 + delta_variance as f32);
323
324                    if stats.delta_potential > self.config.min_delta_potential {
325                        delta_score += stats.delta_potential * stats.values.len() as f32;
326                        numeric_deltas.insert(path.clone(), stats.base_value);
327                    }
328                }
329            }
330        }
331
332        // Choose strategy based on scores
333        match (
334            string_dict_score > self.config.string_dict_threshold,
335            delta_score > self.config.delta_threshold,
336        ) {
337            (true, true) => Ok(CompressionStrategy::Hybrid {
338                string_dict,
339                numeric_deltas,
340            }),
341            (true, false) => Ok(CompressionStrategy::Dictionary {
342                dictionary: string_dict,
343            }),
344            (false, true) => Ok(CompressionStrategy::Delta {
345                base_values: numeric_deltas,
346            }),
347            (false, false) => {
348                // Check for run-length potential
349                let run_length_score = self
350                    .patterns
351                    .values()
352                    .filter(|p| p.compression_potential > self.config.min_compression_potential)
353                    .map(|p| p.frequency as f32 * p.compression_potential)
354                    .sum::<f32>();
355
356                if run_length_score > self.config.run_length_threshold {
357                    Ok(CompressionStrategy::RunLength)
358                } else {
359                    Ok(CompressionStrategy::None)
360                }
361            }
362        }
363    }
364}
365
/// Schema-aware compressor applying the selected [`CompressionStrategy`].
#[derive(Debug, Clone)]
pub struct SchemaCompressor {
    /// Strategy applied by `compress`; starts as `None` unless set explicitly
    strategy: CompressionStrategy,
    /// Analyzer used by `analyze_and_optimize` to pick a strategy from data
    analyzer: SchemaAnalyzer,
    /// Thresholds shared with the analyzer (e.g. minimum run lengths)
    config: CompressionConfig,
}
373
374impl SchemaCompressor {
375    /// Create new compressor with automatic strategy detection
376    pub fn new() -> Self {
377        let config = CompressionConfig::default();
378        Self {
379            strategy: CompressionStrategy::None,
380            analyzer: SchemaAnalyzer::with_config(config.clone()),
381            config,
382        }
383    }
384
385    /// Create compressor with specific strategy
386    pub fn with_strategy(strategy: CompressionStrategy) -> Self {
387        let config = CompressionConfig::default();
388        Self {
389            strategy,
390            analyzer: SchemaAnalyzer::with_config(config.clone()),
391            config,
392        }
393    }
394
395    /// Create compressor with custom configuration
396    pub fn with_config(config: CompressionConfig) -> Self {
397        Self {
398            strategy: CompressionStrategy::None,
399            analyzer: SchemaAnalyzer::with_config(config.clone()),
400            config,
401        }
402    }
403
404    /// Analyze data and update compression strategy
405    pub fn analyze_and_optimize(&mut self, data: &JsonValue) -> DomainResult<&CompressionStrategy> {
406        self.strategy = self.analyzer.analyze(data)?;
407        Ok(&self.strategy)
408    }
409
410    /// Compress JSON data according to current strategy
411    pub fn compress(&self, data: &JsonValue) -> DomainResult<CompressedData> {
412        match &self.strategy {
413            CompressionStrategy::None => Ok(CompressedData {
414                strategy: self.strategy.clone(),
415                compressed_size: serde_json::to_string(data)
416                    .map_err(|e| {
417                        DomainError::CompressionError(format!("JSON serialization failed: {e}"))
418                    })?
419                    .len(),
420                data: data.clone(),
421                compression_metadata: HashMap::new(),
422            }),
423
424            CompressionStrategy::Dictionary { dictionary } => {
425                self.compress_with_dictionary(data, dictionary)
426            }
427
428            CompressionStrategy::Delta { base_values } => {
429                self.compress_with_delta(data, base_values)
430            }
431
432            CompressionStrategy::RunLength => self.compress_with_run_length(data),
433
434            CompressionStrategy::Hybrid {
435                string_dict,
436                numeric_deltas,
437            } => self.compress_hybrid(data, string_dict, numeric_deltas),
438        }
439    }
440
441    /// Dictionary-based compression
442    fn compress_with_dictionary(
443        &self,
444        data: &JsonValue,
445        dictionary: &HashMap<String, u16>,
446    ) -> DomainResult<CompressedData> {
447        let mut metadata = HashMap::new();
448
449        // Store dictionary for decompression
450        for (string, index) in dictionary {
451            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
452        }
453
454        // Replace strings with dictionary indices
455        let compressed = self.replace_strings_with_indices(data, dictionary)?;
456        let compressed_size = serde_json::to_string(&compressed)
457            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
458            .len();
459
460        Ok(CompressedData {
461            strategy: self.strategy.clone(),
462            compressed_size,
463            data: compressed,
464            compression_metadata: metadata,
465        })
466    }
467
468    /// Delta compression for numeric sequences
469    fn compress_with_delta(
470        &self,
471        data: &JsonValue,
472        base_values: &HashMap<String, f64>,
473    ) -> DomainResult<CompressedData> {
474        let mut metadata = HashMap::new();
475
476        // Store base values
477        for (path, base) in base_values {
478            metadata.insert(
479                format!("base_{path}"),
480                JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()),
481            );
482        }
483
484        // Apply delta compression
485        let compressed = self.apply_delta_compression(data, base_values)?;
486        let compressed_size = serde_json::to_string(&compressed)
487            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
488            .len();
489
490        Ok(CompressedData {
491            strategy: self.strategy.clone(),
492            compressed_size,
493            data: compressed,
494            compression_metadata: metadata,
495        })
496    }
497
498    /// Run-length encoding compression
499    fn compress_with_run_length(&self, data: &JsonValue) -> DomainResult<CompressedData> {
500        let compressed = self.apply_run_length_encoding(data)?;
501        let compressed_size = serde_json::to_string(&compressed)
502            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
503            .len();
504
505        Ok(CompressedData {
506            strategy: self.strategy.clone(),
507            compressed_size,
508            data: compressed,
509            compression_metadata: HashMap::new(),
510        })
511    }
512
513    /// Apply run-length encoding to arrays with repeated values
514    fn apply_run_length_encoding(&self, data: &JsonValue) -> DomainResult<JsonValue> {
515        match data {
516            JsonValue::Object(obj) => {
517                let mut compressed_obj = serde_json::Map::new();
518                for (key, value) in obj {
519                    compressed_obj.insert(key.clone(), self.apply_run_length_encoding(value)?);
520                }
521                Ok(JsonValue::Object(compressed_obj))
522            }
523            JsonValue::Array(arr) if arr.len() > 2 => {
524                // Apply run-length encoding to array
525                let mut compressed_runs = Vec::new();
526                let mut current_value = None;
527                let mut run_count = 0;
528
529                for item in arr {
530                    if Some(item) == current_value.as_ref() {
531                        run_count += 1;
532                    } else {
533                        // Save previous run if it exists
534                        if let Some(value) = current_value {
535                            if run_count > self.config.min_frequency_count {
536                                // Use run-length encoding: [value, count]
537                                compressed_runs.push(json!({
538                                    "rle_value": value,
539                                    "rle_count": run_count
540                                }));
541                            } else {
542                                // Single occurrence, keep as-is
543                                compressed_runs.push(value);
544                            }
545                        }
546
547                        // Start new run
548                        current_value = Some(item.clone());
549                        run_count = 1;
550                    }
551                }
552
553                // Handle final run
554                if let Some(value) = current_value {
555                    if run_count > self.config.min_frequency_count {
556                        compressed_runs.push(json!({
557                            "rle_value": value,
558                            "rle_count": run_count
559                        }));
560                    } else {
561                        compressed_runs.push(value);
562                    }
563                }
564
565                Ok(JsonValue::Array(compressed_runs))
566            }
567            JsonValue::Array(arr) => {
568                // Array too small for run-length encoding, process recursively
569                let compressed_arr: Result<Vec<_>, _> = arr
570                    .iter()
571                    .map(|item| self.apply_run_length_encoding(item))
572                    .collect();
573                Ok(JsonValue::Array(compressed_arr?))
574            }
575            _ => Ok(data.clone()),
576        }
577    }
578
579    /// Hybrid compression combining multiple strategies
580    fn compress_hybrid(
581        &self,
582        data: &JsonValue,
583        string_dict: &HashMap<String, u16>,
584        numeric_deltas: &HashMap<String, f64>,
585    ) -> DomainResult<CompressedData> {
586        let mut metadata = HashMap::new();
587
588        // Add dictionary metadata
589        for (string, index) in string_dict {
590            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
591        }
592
593        // Add delta base values
594        for (path, base) in numeric_deltas {
595            metadata.insert(
596                format!("base_{path}"),
597                JsonValue::Number(serde_json::Number::from_f64(*base).unwrap()),
598            );
599        }
600
601        // Apply both compression strategies
602        let dict_compressed = self.replace_strings_with_indices(data, string_dict)?;
603        let final_compressed = self.apply_delta_compression(&dict_compressed, numeric_deltas)?;
604
605        let compressed_size = serde_json::to_string(&final_compressed)
606            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
607            .len();
608
609        Ok(CompressedData {
610            strategy: self.strategy.clone(),
611            compressed_size,
612            data: final_compressed,
613            compression_metadata: metadata,
614        })
615    }
616
617    /// Replace strings with dictionary indices
618    #[allow(clippy::only_used_in_recursion)]
619    fn replace_strings_with_indices(
620        &self,
621        data: &JsonValue,
622        dictionary: &HashMap<String, u16>,
623    ) -> DomainResult<JsonValue> {
624        match data {
625            JsonValue::Object(obj) => {
626                let mut compressed_obj = serde_json::Map::new();
627                for (key, value) in obj {
628                    compressed_obj.insert(
629                        key.clone(),
630                        self.replace_strings_with_indices(value, dictionary)?,
631                    );
632                }
633                Ok(JsonValue::Object(compressed_obj))
634            }
635            JsonValue::Array(arr) => {
636                let compressed_arr: Result<Vec<_>, _> = arr
637                    .iter()
638                    .map(|item| self.replace_strings_with_indices(item, dictionary))
639                    .collect();
640                Ok(JsonValue::Array(compressed_arr?))
641            }
642            JsonValue::String(s) => {
643                if let Some(&index) = dictionary.get(s) {
644                    Ok(JsonValue::Number(serde_json::Number::from(index)))
645                } else {
646                    Ok(data.clone())
647                }
648            }
649            _ => Ok(data.clone()),
650        }
651    }
652
653    /// Apply delta compression to numeric sequences in arrays
654    fn apply_delta_compression(
655        &self,
656        data: &JsonValue,
657        base_values: &HashMap<String, f64>,
658    ) -> DomainResult<JsonValue> {
659        self.apply_delta_recursive(data, "", base_values)
660    }
661
662    /// Recursively apply delta compression to JSON structure
663    fn apply_delta_recursive(
664        &self,
665        data: &JsonValue,
666        path: &str,
667        base_values: &HashMap<String, f64>,
668    ) -> DomainResult<JsonValue> {
669        match data {
670            JsonValue::Object(obj) => {
671                let mut compressed_obj = serde_json::Map::new();
672                for (key, value) in obj {
673                    let field_path = if path.is_empty() {
674                        key.clone()
675                    } else {
676                        format!("{path}.{key}")
677                    };
678                    compressed_obj.insert(
679                        key.clone(),
680                        self.apply_delta_recursive(value, &field_path, base_values)?,
681                    );
682                }
683                Ok(JsonValue::Object(compressed_obj))
684            }
685            JsonValue::Array(arr) if arr.len() > 2 => {
686                // Check if this array contains numeric sequences that can be delta-compressed
687                if self.is_numeric_sequence(arr) {
688                    self.compress_numeric_array_with_delta(arr, path, base_values)
689                } else {
690                    // Process array elements recursively
691                    let compressed_arr: Result<Vec<_>, _> = arr
692                        .iter()
693                        .enumerate()
694                        .map(|(idx, item)| {
695                            let item_path = format!("{path}[{idx}]");
696                            self.apply_delta_recursive(item, &item_path, base_values)
697                        })
698                        .collect();
699                    Ok(JsonValue::Array(compressed_arr?))
700                }
701            }
702            JsonValue::Array(arr) => {
703                // Array too small for delta compression, process recursively
704                let compressed_arr: Result<Vec<_>, _> = arr
705                    .iter()
706                    .enumerate()
707                    .map(|(idx, item)| {
708                        let item_path = format!("{path}[{idx}]");
709                        self.apply_delta_recursive(item, &item_path, base_values)
710                    })
711                    .collect();
712                Ok(JsonValue::Array(compressed_arr?))
713            }
714            _ => Ok(data.clone()),
715        }
716    }
717
718    /// Check if array contains a numeric sequence suitable for delta compression
719    fn is_numeric_sequence(&self, arr: &[JsonValue]) -> bool {
720        if arr.len() < self.config.min_numeric_sequence_size {
721            return false;
722        }
723
724        // Check if all elements are numbers
725        arr.iter().all(|v| v.is_number())
726    }
727
728    /// Apply delta compression to numeric array
729    fn compress_numeric_array_with_delta(
730        &self,
731        arr: &[JsonValue],
732        path: &str,
733        base_values: &HashMap<String, f64>,
734    ) -> DomainResult<JsonValue> {
735        let mut compressed_array = Vec::new();
736
737        // Extract numeric values
738        let numbers: Vec<f64> = arr.iter().filter_map(|v| v.as_f64()).collect();
739
740        if numbers.is_empty() {
741            return Ok(JsonValue::Array(arr.to_vec()));
742        }
743
744        // Use base value from analysis or first element as base
745        let base_value = base_values.get(path).copied().unwrap_or(numbers[0]);
746
747        // Add metadata for base value
748        compressed_array.push(json!({
749            "delta_base": base_value,
750            "delta_type": "numeric_sequence"
751        }));
752
753        // Calculate deltas from base value
754        let deltas: Vec<f64> = numbers.iter().map(|&num| num - base_value).collect();
755
756        // Check if delta compression is beneficial
757        let original_precision = numbers.iter().map(|n| format!("{n}").len()).sum::<usize>();
758
759        let delta_precision = deltas.iter().map(|d| format!("{d}").len()).sum::<usize>();
760
761        if delta_precision < original_precision {
762            // Delta compression is beneficial
763            compressed_array.extend(deltas.into_iter().map(JsonValue::from));
764        } else {
765            // Keep original values
766            return Ok(JsonValue::Array(arr.to_vec()));
767        }
768
769        Ok(JsonValue::Array(compressed_array))
770    }
771}
772
/// Result of a compression pass: transformed data plus everything a
/// decompressor needs to reverse it.
#[derive(Debug, Clone)]
pub struct CompressedData {
    /// Strategy that produced this data
    pub strategy: CompressionStrategy,
    /// Byte length of the compressed value when serialized to a JSON string
    pub compressed_size: usize,
    /// The transformed JSON value
    pub data: JsonValue,
    /// Side-channel data for decompression (dictionary entries, delta bases)
    pub compression_metadata: HashMap<String, JsonValue>,
}
781
782impl CompressedData {
783    /// Calculate compression ratio
784    pub fn compression_ratio(&self, original_size: usize) -> f32 {
785        if original_size == 0 {
786            return 1.0;
787        }
788        self.compressed_size as f32 / original_size as f32
789    }
790
791    /// Get compression savings in bytes
792    pub fn compression_savings(&self, original_size: usize) -> isize {
793        original_size as isize - self.compressed_size as isize
794    }
795}
796
impl Default for SchemaAnalyzer {
    /// Same as [`SchemaAnalyzer::new`]: default configuration, empty state.
    fn default() -> Self {
        Self::new()
    }
}
802
impl Default for SchemaCompressor {
    /// Same as [`SchemaCompressor::new`]: default config, strategy `None`.
    fn default() -> Self {
        Self::new()
    }
}
808
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Highly repetitive string fields should steer the analyzer toward
    /// a dictionary-based (or hybrid) strategy.
    #[test]
    fn test_schema_analyzer_dictionary_potential() {
        let mut analyzer = SchemaAnalyzer::new();

        let input = json!({
            "users": [
                {"name": "John Doe", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Jane Smith", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Bob Wilson", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Alice Brown", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Charlie Davis", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Diana Evans", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Frank Miller", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Grace Wilson", "role": "admin", "status": "active", "department": "engineering"}
            ]
        });

        let chosen = analyzer.analyze(&input).unwrap();

        // Repeated values like "admin" and "active" make dictionary
        // encoding worthwhile; anything else is a regression.
        assert!(
            matches!(
                chosen,
                CompressionStrategy::Dictionary { .. } | CompressionStrategy::Hybrid { .. }
            ),
            "Expected dictionary-based compression strategy"
        );
    }

    /// Smoke test: compression of a small object yields a non-empty
    /// result no larger than the original.
    #[test]
    fn test_schema_compressor_basic() {
        let compressor = SchemaCompressor::new();

        let input = json!({
            "message": "hello world",
            "count": 42
        });

        let size_before = serde_json::to_string(&input).unwrap().len();
        let output = compressor.compress(&input).unwrap();

        assert!(output.compressed_size > 0);
        assert!(output.compression_ratio(size_before) <= 1.0);
    }

    /// With an explicit dictionary strategy, every dictionary entry must
    /// be echoed into the compression metadata for later decoding.
    #[test]
    fn test_dictionary_compression() {
        let dictionary = HashMap::from([
            ("active".to_string(), 0),
            ("admin".to_string(), 1),
        ]);

        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Dictionary { dictionary });

        let input = json!({
            "status": "active",
            "role": "admin",
            "description": "active admin user"
        });

        let output = compressor.compress(&input).unwrap();

        // Both dictionary slots should be recorded as metadata entries.
        assert!(output.compression_metadata.contains_key("dict_0"));
        assert!(output.compression_metadata.contains_key("dict_1"));
    }

    /// Data without exploitable patterns should select no compression.
    #[test]
    fn test_compression_strategy_selection() {
        let mut analyzer = SchemaAnalyzer::new();

        // Every key and value is unique, so nothing is worth encoding.
        let patternless = json!({
            "unique_field_1": "unique_value_1",
            "unique_field_2": "unique_value_2"
        });

        let chosen = analyzer.analyze(&patternless).unwrap();
        assert_eq!(chosen, CompressionStrategy::None);
    }

    /// Monotonically increasing numeric fields should be tracked by the
    /// analyzer as candidates for delta encoding.
    #[test]
    fn test_numeric_delta_analysis() {
        let mut analyzer = SchemaAnalyzer::new();

        let input = json!({
            "measurements": [
                {"time": 100, "value": 10.0},
                {"time": 101, "value": 10.5},
                {"time": 102, "value": 11.0},
                {"time": 103, "value": 11.5}
            ]
        });

        let _ = analyzer.analyze(&input).unwrap();

        // Analysis side effect: numeric fields were discovered.
        assert!(!analyzer.numeric_fields.is_empty());
    }

    /// Run-length encoding should collapse repeated array values into
    /// RLE marker objects.
    #[test]
    fn test_run_length_encoding() {
        let compressor = SchemaCompressor::with_strategy(CompressionStrategy::RunLength);

        let input = json!({
            "repeated_values": [1, 1, 1, 2, 2, 3, 3, 3, 3]
        });

        let output = compressor.compress(&input).unwrap();
        assert!(output.compressed_size > 0);

        // The encoded field is still an array...
        let encoded = output.data["repeated_values"]
            .as_array()
            .expect("repeated_values should remain an array");

        // ...and at least one element carries the RLE marker key.
        assert!(encoded.iter().any(|item| item.get("rle_value").is_some()));
    }

    /// Delta compression should rewrite a numeric sequence as a base
    /// value plus differences.
    #[test]
    fn test_delta_compression() {
        let base_values = HashMap::from([("sequence".to_string(), 100.0)]);

        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Delta { base_values });

        let input = json!({
            "sequence": [100.0, 101.0, 102.0, 103.0, 104.0]
        });

        let output = compressor.compress(&input).unwrap();
        assert!(output.compressed_size > 0);

        // The encoded field is still an array...
        let encoded = output.data["sequence"]
            .as_array()
            .expect("sequence should remain an array");

        // ...and contains the delta base marker.
        assert!(encoded.iter().any(|item| item.get("delta_base").is_some()));
    }
}