Skip to main content

pjson_rs/compression/
mod.rs

1//! Schema-based compression for PJS protocol
2//!
3//! Implements intelligent compression strategies based on JSON schema analysis
4//! to optimize bandwidth usage while maintaining streaming capabilities.
5
6pub mod secure;
7
8#[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
9pub mod zstd;
10
11use crate::domain::{DomainError, DomainResult};
12use serde_json::{Value as JsonValue, json};
13use std::collections::HashMap;
14
/// Tunable thresholds used by [`SchemaAnalyzer`] and [`SchemaCompressor`].
///
/// Unless a field says otherwise, it is an *exclusive* lower bound: the measured value must be
/// strictly greater than the configured one.
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// An array must have more than this many elements before its repeated object shapes and
    /// repeated primitive values are analyzed.
    pub min_array_length: usize,
    /// A string must be longer than this many bytes to be added to the string dictionary.
    pub min_string_length: usize,
    /// Occurrence count that a string, an array object shape or a repeated array value must
    /// exceed to be recorded. The compressor also uses it as the length a run of equal array
    /// elements must exceed to be run-length encoded.
    pub min_frequency_count: u32,
    /// Compression potential assigned to UUID-shaped strings (36 bytes with four hyphens).
    pub uuid_compression_potential: f32,
    /// Estimated dictionary savings (bytes of repeated string text) that must be exceeded to
    /// choose dictionary compression.
    pub string_dict_threshold: f32,
    /// Delta score that must be exceeded to choose delta compression.
    pub delta_threshold: f32,
    /// Per-field delta potential (`1 / (1 + variance)`) that must be exceeded for a numeric
    /// field to take part in delta compression.
    pub min_delta_potential: f32,
    /// Run-length score that must be exceeded to choose run-length encoding when neither
    /// dictionary nor delta compression qualifies.
    pub run_length_threshold: f32,
    /// Compression potential a recorded pattern must exceed to count toward the run-length
    /// score.
    pub min_compression_potential: f32,
    /// Minimum number of elements (inclusive) an all-numeric array needs before the compressor
    /// considers delta-encoding it.
    pub min_numeric_sequence_size: usize,
}
39
40impl Default for CompressionConfig {
41    fn default() -> Self {
42        Self {
43            min_array_length: 2,
44            min_string_length: 3,
45            min_frequency_count: 1,
46            uuid_compression_potential: 0.3,
47            string_dict_threshold: 50.0,
48            delta_threshold: 30.0,
49            min_delta_potential: 0.3,
50            run_length_threshold: 20.0,
51            min_compression_potential: 0.4,
52            min_numeric_sequence_size: 3,
53        }
54    }
55}
56
/// Compression strategy chosen by [`SchemaAnalyzer`], or supplied directly through
/// [`SchemaCompressor::with_strategy`].
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionStrategy {
    /// Leave the payload unchanged.
    None,
    /// Replace frequent strings with numeric dictionary indices.
    Dictionary {
        /// Maps each frequent string to the index that replaces it in the payload.
        dictionary: HashMap<String, u16>,
    },
    /// Delta-encode all-numeric arrays relative to a base value.
    Delta {
        /// Base value per JSON path (`parent.key` / `parent[idx]` form, `""` for the root).
        /// Arrays whose path has no entry use their first element as the base.
        base_values: HashMap<String, f64>,
    },
    /// Collapse runs of equal consecutive array elements.
    RunLength,
    /// Dictionary replacement followed by delta encoding.
    Hybrid {
        /// Dictionary used for the string-replacement pass.
        string_dict: HashMap<String, u16>,
        /// Per-path base values used for the delta-encoding pass.
        numeric_deltas: HashMap<String, f64>,
    },
}
82
/// Walks a JSON document, gathers repetition statistics and picks a [`CompressionStrategy`].
///
/// All statistics are reset at the start of every `analyze` call.
#[derive(Debug, Clone)]
pub struct SchemaAnalyzer {
    /// Recorded patterns (array object shapes, repeated array values, URL prefixes, UUIDs),
    /// keyed by a descriptive string such as `array_structure:{path}`.
    patterns: HashMap<String, PatternInfo>,
    /// Numeric samples keyed by JSON path.
    numeric_fields: HashMap<String, NumericStats>,
    /// Occurrence count of every string value seen, regardless of where it appeared.
    string_repetitions: HashMap<String, u32>,
    /// Thresholds used when recording patterns and scoring strategies.
    config: CompressionConfig,
}
95
/// Statistics for one recorded pattern.
#[derive(Debug, Clone)]
struct PatternInfo {
    /// Number of occurrences observed.
    frequency: u32,
    /// Heuristic compressibility weight; for repeated array shapes/values it is `(n - 1) / n`.
    compression_potential: f32,
}
101
/// Samples collected for one numeric JSON path.
#[derive(Debug, Clone)]
struct NumericStats {
    /// Every sample recorded for the path (sorted in place during strategy selection).
    values: Vec<f64>,
    /// `1 / (1 + variance)` of the gaps between sorted samples; `0.0` until computed.
    delta_potential: f32,
    /// First sample seen for the path; reported as the delta base value.
    base_value: f64,
}
108
109impl SchemaAnalyzer {
110    /// Create new schema analyzer
111    pub fn new() -> Self {
112        Self {
113            patterns: HashMap::new(),
114            numeric_fields: HashMap::new(),
115            string_repetitions: HashMap::new(),
116            config: CompressionConfig::default(),
117        }
118    }
119
120    /// Create new schema analyzer with custom configuration
121    pub fn with_config(config: CompressionConfig) -> Self {
122        Self {
123            patterns: HashMap::new(),
124            numeric_fields: HashMap::new(),
125            string_repetitions: HashMap::new(),
126            config,
127        }
128    }
129
130    /// Analyze JSON data to determine optimal compression strategy
131    pub fn analyze(&mut self, data: &JsonValue) -> DomainResult<CompressionStrategy> {
132        // Reset analysis state
133        self.patterns.clear();
134        self.numeric_fields.clear();
135        self.string_repetitions.clear();
136
137        // Perform deep analysis
138        self.analyze_recursive(data, "")?;
139
140        // Determine best strategy based on analysis
141        self.determine_strategy()
142    }
143
144    /// Analyze data recursively
145    fn analyze_recursive(&mut self, value: &JsonValue, path: &str) -> DomainResult<()> {
146        match value {
147            JsonValue::Object(obj) => {
148                for (key, val) in obj {
149                    let field_path = if path.is_empty() {
150                        key.clone()
151                    } else {
152                        format!("{path}.{key}")
153                    };
154                    self.analyze_recursive(val, &field_path)?;
155                }
156            }
157            JsonValue::Array(arr) => {
158                // Analyze array patterns
159                if arr.len() > self.config.min_array_length {
160                    self.analyze_array_patterns(arr, path)?;
161                }
162                for (idx, item) in arr.iter().enumerate() {
163                    let item_path = format!("{path}[{idx}]");
164                    self.analyze_recursive(item, &item_path)?;
165                }
166            }
167            JsonValue::String(s) => {
168                self.analyze_string_pattern(s, path);
169            }
170            JsonValue::Number(n) => {
171                if let Some(f) = n.as_f64() {
172                    self.analyze_numeric_pattern(f, path);
173                }
174            }
175            _ => {}
176        }
177        Ok(())
178    }
179
180    /// Analyze array for repeating patterns
181    fn analyze_array_patterns(&mut self, arr: &[JsonValue], path: &str) -> DomainResult<()> {
182        // Check for repeating object structures
183        if let Some(JsonValue::Object(first)) = arr.first() {
184            let structure_key = format!("array_structure:{path}");
185            let field_names: Vec<&str> = first.keys().map(|k| k.as_str()).collect();
186            let pattern = field_names.join(",");
187
188            // Count how many objects share this structure
189            let matching_count = arr
190                .iter()
191                .filter_map(|v| v.as_object())
192                .filter(|obj| {
193                    let obj_fields: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
194                    obj_fields.join(",") == pattern
195                })
196                .count();
197
198            if matching_count > self.config.min_frequency_count as usize {
199                let info = PatternInfo {
200                    frequency: matching_count as u32,
201                    compression_potential: (matching_count as f32 - 1.0) / matching_count as f32,
202                };
203                self.patterns.insert(structure_key, info);
204            }
205        }
206
207        // Check for repeating primitive values
208        if arr.len() > 2 {
209            let mut value_counts = HashMap::new();
210            for value in arr {
211                let key = match value {
212                    JsonValue::String(s) => format!("string:{s}"),
213                    JsonValue::Number(n) => format!("number:{n}"),
214                    JsonValue::Bool(b) => format!("bool:{b}"),
215                    _ => continue,
216                };
217                *value_counts.entry(key).or_insert(0) += 1;
218            }
219
220            for (value_key, count) in value_counts {
221                if count > self.config.min_frequency_count {
222                    let info = PatternInfo {
223                        frequency: count,
224                        compression_potential: (count as f32 - 1.0) / count as f32,
225                    };
226                    self.patterns
227                        .insert(format!("array_value:{path}:{value_key}"), info);
228                }
229            }
230        }
231
232        Ok(())
233    }
234
235    /// Analyze string for repetition patterns
236    fn analyze_string_pattern(&mut self, s: &str, _path: &str) {
237        // Track string repetitions across different paths
238        *self.string_repetitions.entry(s.to_string()).or_insert(0) += 1;
239
240        // Analyze common prefixes/suffixes for URLs, IDs, etc.
241        if s.len() > 10 {
242            // Check for URL patterns
243            if s.starts_with("http://") || s.starts_with("https://") {
244                let prefix = if s.starts_with("https://") {
245                    "https://"
246                } else {
247                    "http://"
248                };
249                self.patterns
250                    .entry(format!("url_prefix:{prefix}"))
251                    .or_insert(PatternInfo {
252                        frequency: 0,
253                        compression_potential: 0.0,
254                    })
255                    .frequency += 1;
256            }
257
258            // Check for ID patterns (UUID-like)
259            if s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4 {
260                self.patterns
261                    .entry("uuid_pattern".to_string())
262                    .or_insert(PatternInfo {
263                        frequency: 0,
264                        compression_potential: self.config.uuid_compression_potential,
265                    })
266                    .frequency += 1;
267            }
268        }
269    }
270
271    /// Analyze numeric patterns for delta compression
272    fn analyze_numeric_pattern(&mut self, value: f64, path: &str) {
273        self.numeric_fields
274            .entry(path.to_string())
275            .or_insert_with(|| NumericStats {
276                values: Vec::new(),
277                delta_potential: 0.0,
278                base_value: value,
279            })
280            .values
281            .push(value);
282    }
283
284    /// Determine optimal compression strategy based on analysis
285    fn determine_strategy(&mut self) -> DomainResult<CompressionStrategy> {
286        // Calculate compression potentials
287        let mut string_dict_score = 0.0;
288        let mut delta_score = 0.0;
289
290        // Analyze string repetition potential
291        let mut string_dict = HashMap::new();
292        let mut dict_index = 0u16;
293
294        for (string, count) in &self.string_repetitions {
295            if *count > self.config.min_frequency_count
296                && string.len() > self.config.min_string_length
297            {
298                string_dict_score += (*count as f32 - 1.0) * string.len() as f32;
299                string_dict.insert(string.clone(), dict_index);
300                dict_index += 1;
301            }
302        }
303
304        // Analyze numeric delta potential
305        let mut numeric_deltas = HashMap::new();
306
307        for (path, stats) in &mut self.numeric_fields {
308            if stats.values.len() > 2 {
309                // Calculate variance to determine delta effectiveness
310                stats
311                    .values
312                    .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
313
314                let deltas: Vec<f64> = stats
315                    .values
316                    .windows(2)
317                    .map(|window| window[1] - window[0])
318                    .collect();
319
320                if !deltas.is_empty() {
321                    let avg_delta = deltas.iter().sum::<f64>() / deltas.len() as f64;
322                    let delta_variance =
323                        deltas.iter().map(|d| (d - avg_delta).powi(2)).sum::<f64>()
324                            / deltas.len() as f64;
325
326                    // Low variance suggests good delta compression potential
327                    stats.delta_potential = 1.0 / (1.0 + delta_variance as f32);
328
329                    if stats.delta_potential > self.config.min_delta_potential {
330                        delta_score += stats.delta_potential * stats.values.len() as f32;
331                        numeric_deltas.insert(path.clone(), stats.base_value);
332                    }
333                }
334            }
335        }
336
337        // Choose strategy based on scores
338        match (
339            string_dict_score > self.config.string_dict_threshold,
340            delta_score > self.config.delta_threshold,
341        ) {
342            (true, true) => Ok(CompressionStrategy::Hybrid {
343                string_dict,
344                numeric_deltas,
345            }),
346            (true, false) => Ok(CompressionStrategy::Dictionary {
347                dictionary: string_dict,
348            }),
349            (false, true) => Ok(CompressionStrategy::Delta {
350                base_values: numeric_deltas,
351            }),
352            (false, false) => {
353                // Check for run-length potential
354                let run_length_score = self
355                    .patterns
356                    .values()
357                    .filter(|p| p.compression_potential > self.config.min_compression_potential)
358                    .map(|p| p.frequency as f32 * p.compression_potential)
359                    .sum::<f32>();
360
361                if run_length_score > self.config.run_length_threshold {
362                    Ok(CompressionStrategy::RunLength)
363                } else {
364                    Ok(CompressionStrategy::None)
365                }
366            }
367        }
368    }
369}
370
/// Applies a [`CompressionStrategy`] to JSON payloads, optionally choosing it with an owned
/// [`SchemaAnalyzer`].
#[derive(Debug, Clone)]
pub struct SchemaCompressor {
    /// Strategy applied by `compress`.
    strategy: CompressionStrategy,
    /// Analyzer used by `analyze_and_optimize` to pick `strategy`.
    analyzer: SchemaAnalyzer,
    /// Thresholds used while encoding (run-length threshold, numeric sequence size).
    config: CompressionConfig,
}
378
379impl SchemaCompressor {
380    /// Create new compressor with automatic strategy detection
381    pub fn new() -> Self {
382        let config = CompressionConfig::default();
383        Self {
384            strategy: CompressionStrategy::None,
385            analyzer: SchemaAnalyzer::with_config(config.clone()),
386            config,
387        }
388    }
389
390    /// Create compressor with specific strategy
391    pub fn with_strategy(strategy: CompressionStrategy) -> Self {
392        let config = CompressionConfig::default();
393        Self {
394            strategy,
395            analyzer: SchemaAnalyzer::with_config(config.clone()),
396            config,
397        }
398    }
399
400    /// Create compressor with custom configuration
401    pub fn with_config(config: CompressionConfig) -> Self {
402        Self {
403            strategy: CompressionStrategy::None,
404            analyzer: SchemaAnalyzer::with_config(config.clone()),
405            config,
406        }
407    }
408
409    /// Analyze data and update compression strategy
410    pub fn analyze_and_optimize(&mut self, data: &JsonValue) -> DomainResult<&CompressionStrategy> {
411        self.strategy = self.analyzer.analyze(data)?;
412        Ok(&self.strategy)
413    }
414
415    /// Compress JSON data according to current strategy
416    pub fn compress(&self, data: &JsonValue) -> DomainResult<CompressedData> {
417        match &self.strategy {
418            CompressionStrategy::None => Ok(CompressedData {
419                strategy: self.strategy.clone(),
420                compressed_size: serde_json::to_string(data)
421                    .map_err(|e| {
422                        DomainError::CompressionError(format!("JSON serialization failed: {e}"))
423                    })?
424                    .len(),
425                data: data.clone(),
426                compression_metadata: HashMap::new(),
427            }),
428
429            CompressionStrategy::Dictionary { dictionary } => {
430                self.compress_with_dictionary(data, dictionary)
431            }
432
433            CompressionStrategy::Delta { base_values } => {
434                self.compress_with_delta(data, base_values)
435            }
436
437            CompressionStrategy::RunLength => self.compress_with_run_length(data),
438
439            CompressionStrategy::Hybrid {
440                string_dict,
441                numeric_deltas,
442            } => self.compress_hybrid(data, string_dict, numeric_deltas),
443        }
444    }
445
446    /// Dictionary-based compression
447    fn compress_with_dictionary(
448        &self,
449        data: &JsonValue,
450        dictionary: &HashMap<String, u16>,
451    ) -> DomainResult<CompressedData> {
452        let mut metadata = HashMap::new();
453
454        // Store dictionary for decompression
455        for (string, index) in dictionary {
456            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
457        }
458
459        // Replace strings with dictionary indices
460        let compressed = self.replace_strings_with_indices(data, dictionary)?;
461        let compressed_size = serde_json::to_string(&compressed)
462            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
463            .len();
464
465        Ok(CompressedData {
466            strategy: self.strategy.clone(),
467            compressed_size,
468            data: compressed,
469            compression_metadata: metadata,
470        })
471    }
472
473    /// Delta compression for numeric sequences
474    fn compress_with_delta(
475        &self,
476        data: &JsonValue,
477        base_values: &HashMap<String, f64>,
478    ) -> DomainResult<CompressedData> {
479        let mut metadata = HashMap::new();
480
481        // Store base values
482        for (path, base) in base_values {
483            let number = serde_json::Number::from_f64(*base).ok_or_else(|| {
484                DomainError::CompressionError(format!(
485                    "delta base value for path '{path}' is non-finite (NaN or Infinity); cannot compress"
486                ))
487            })?;
488            metadata.insert(format!("base_{path}"), JsonValue::Number(number));
489        }
490
491        // Apply delta compression
492        let compressed = self.apply_delta_compression(data, base_values)?;
493        let compressed_size = serde_json::to_string(&compressed)
494            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
495            .len();
496
497        Ok(CompressedData {
498            strategy: self.strategy.clone(),
499            compressed_size,
500            data: compressed,
501            compression_metadata: metadata,
502        })
503    }
504
505    /// Run-length encoding compression
506    fn compress_with_run_length(&self, data: &JsonValue) -> DomainResult<CompressedData> {
507        let compressed = self.apply_run_length_encoding(data)?;
508        let compressed_size = serde_json::to_string(&compressed)
509            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
510            .len();
511
512        Ok(CompressedData {
513            strategy: self.strategy.clone(),
514            compressed_size,
515            data: compressed,
516            compression_metadata: HashMap::new(),
517        })
518    }
519
520    /// Apply run-length encoding to arrays with repeated values
521    fn apply_run_length_encoding(&self, data: &JsonValue) -> DomainResult<JsonValue> {
522        match data {
523            JsonValue::Object(obj) => {
524                let mut compressed_obj = serde_json::Map::new();
525                for (key, value) in obj {
526                    compressed_obj.insert(key.clone(), self.apply_run_length_encoding(value)?);
527                }
528                Ok(JsonValue::Object(compressed_obj))
529            }
530            JsonValue::Array(arr) if arr.len() > 2 => {
531                // Apply run-length encoding to array
532                let mut compressed_runs = Vec::new();
533                let mut current_value = None;
534                let mut run_count = 0;
535
536                for item in arr {
537                    if Some(item) == current_value.as_ref() {
538                        run_count += 1;
539                    } else {
540                        // Save previous run if it exists
541                        if let Some(value) = current_value {
542                            if run_count > self.config.min_frequency_count {
543                                // Use run-length encoding: [value, count]
544                                compressed_runs.push(json!({
545                                    "rle_value": value,
546                                    "rle_count": run_count
547                                }));
548                            } else {
549                                // Single occurrence, keep as-is
550                                compressed_runs.push(value);
551                            }
552                        }
553
554                        // Start new run
555                        current_value = Some(item.clone());
556                        run_count = 1;
557                    }
558                }
559
560                // Handle final run
561                if let Some(value) = current_value {
562                    if run_count > self.config.min_frequency_count {
563                        compressed_runs.push(json!({
564                            "rle_value": value,
565                            "rle_count": run_count
566                        }));
567                    } else {
568                        compressed_runs.push(value);
569                    }
570                }
571
572                Ok(JsonValue::Array(compressed_runs))
573            }
574            JsonValue::Array(arr) => {
575                // Array too small for run-length encoding, process recursively
576                let compressed_arr: Result<Vec<_>, _> = arr
577                    .iter()
578                    .map(|item| self.apply_run_length_encoding(item))
579                    .collect();
580                Ok(JsonValue::Array(compressed_arr?))
581            }
582            _ => Ok(data.clone()),
583        }
584    }
585
586    /// Hybrid compression combining multiple strategies
587    fn compress_hybrid(
588        &self,
589        data: &JsonValue,
590        string_dict: &HashMap<String, u16>,
591        numeric_deltas: &HashMap<String, f64>,
592    ) -> DomainResult<CompressedData> {
593        let mut metadata = HashMap::new();
594
595        // Add dictionary metadata
596        for (string, index) in string_dict {
597            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
598        }
599
600        // Add delta base values
601        for (path, base) in numeric_deltas {
602            let number = serde_json::Number::from_f64(*base).ok_or_else(|| {
603                DomainError::CompressionError(format!(
604                    "delta base value for path '{path}' is non-finite (NaN or Infinity); cannot compress"
605                ))
606            })?;
607            metadata.insert(format!("base_{path}"), JsonValue::Number(number));
608        }
609
610        // Apply both compression strategies
611        let dict_compressed = self.replace_strings_with_indices(data, string_dict)?;
612        let final_compressed = self.apply_delta_compression(&dict_compressed, numeric_deltas)?;
613
614        let compressed_size = serde_json::to_string(&final_compressed)
615            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
616            .len();
617
618        Ok(CompressedData {
619            strategy: self.strategy.clone(),
620            compressed_size,
621            data: final_compressed,
622            compression_metadata: metadata,
623        })
624    }
625
626    /// Replace strings with dictionary indices
627    #[allow(clippy::only_used_in_recursion)]
628    fn replace_strings_with_indices(
629        &self,
630        data: &JsonValue,
631        dictionary: &HashMap<String, u16>,
632    ) -> DomainResult<JsonValue> {
633        match data {
634            JsonValue::Object(obj) => {
635                let mut compressed_obj = serde_json::Map::new();
636                for (key, value) in obj {
637                    compressed_obj.insert(
638                        key.clone(),
639                        self.replace_strings_with_indices(value, dictionary)?,
640                    );
641                }
642                Ok(JsonValue::Object(compressed_obj))
643            }
644            JsonValue::Array(arr) => {
645                let compressed_arr: Result<Vec<_>, _> = arr
646                    .iter()
647                    .map(|item| self.replace_strings_with_indices(item, dictionary))
648                    .collect();
649                Ok(JsonValue::Array(compressed_arr?))
650            }
651            JsonValue::String(s) => {
652                if let Some(&index) = dictionary.get(s) {
653                    Ok(JsonValue::Number(serde_json::Number::from(index)))
654                } else {
655                    Ok(data.clone())
656                }
657            }
658            _ => Ok(data.clone()),
659        }
660    }
661
662    /// Apply delta compression to numeric sequences in arrays
663    fn apply_delta_compression(
664        &self,
665        data: &JsonValue,
666        base_values: &HashMap<String, f64>,
667    ) -> DomainResult<JsonValue> {
668        self.apply_delta_recursive(data, "", base_values)
669    }
670
671    /// Recursively apply delta compression to JSON structure
672    fn apply_delta_recursive(
673        &self,
674        data: &JsonValue,
675        path: &str,
676        base_values: &HashMap<String, f64>,
677    ) -> DomainResult<JsonValue> {
678        match data {
679            JsonValue::Object(obj) => {
680                let mut compressed_obj = serde_json::Map::new();
681                for (key, value) in obj {
682                    let field_path = if path.is_empty() {
683                        key.clone()
684                    } else {
685                        format!("{path}.{key}")
686                    };
687                    compressed_obj.insert(
688                        key.clone(),
689                        self.apply_delta_recursive(value, &field_path, base_values)?,
690                    );
691                }
692                Ok(JsonValue::Object(compressed_obj))
693            }
694            JsonValue::Array(arr) if arr.len() > 2 => {
695                // Check if this array contains numeric sequences that can be delta-compressed
696                if self.is_numeric_sequence(arr) {
697                    self.compress_numeric_array_with_delta(arr, path, base_values)
698                } else {
699                    // Process array elements recursively
700                    let compressed_arr: Result<Vec<_>, _> = arr
701                        .iter()
702                        .enumerate()
703                        .map(|(idx, item)| {
704                            let item_path = format!("{path}[{idx}]");
705                            self.apply_delta_recursive(item, &item_path, base_values)
706                        })
707                        .collect();
708                    Ok(JsonValue::Array(compressed_arr?))
709                }
710            }
711            JsonValue::Array(arr) => {
712                // Array too small for delta compression, process recursively
713                let compressed_arr: Result<Vec<_>, _> = arr
714                    .iter()
715                    .enumerate()
716                    .map(|(idx, item)| {
717                        let item_path = format!("{path}[{idx}]");
718                        self.apply_delta_recursive(item, &item_path, base_values)
719                    })
720                    .collect();
721                Ok(JsonValue::Array(compressed_arr?))
722            }
723            _ => Ok(data.clone()),
724        }
725    }
726
727    /// Check if array contains a numeric sequence suitable for delta compression
728    fn is_numeric_sequence(&self, arr: &[JsonValue]) -> bool {
729        if arr.len() < self.config.min_numeric_sequence_size {
730            return false;
731        }
732
733        // Check if all elements are numbers
734        arr.iter().all(|v| v.is_number())
735    }
736
737    /// Apply delta compression to numeric array
738    fn compress_numeric_array_with_delta(
739        &self,
740        arr: &[JsonValue],
741        path: &str,
742        base_values: &HashMap<String, f64>,
743    ) -> DomainResult<JsonValue> {
744        let mut compressed_array = Vec::new();
745
746        // Extract numeric values
747        let numbers: Vec<f64> = arr.iter().filter_map(|v| v.as_f64()).collect();
748
749        if numbers.is_empty() {
750            return Ok(JsonValue::Array(arr.to_vec()));
751        }
752
753        // Use base value from analysis or first element as base
754        let base_value = base_values.get(path).copied().unwrap_or(numbers[0]);
755
756        // Add metadata for base value
757        compressed_array.push(json!({
758            "delta_base": base_value,
759            "delta_type": "numeric_sequence"
760        }));
761
762        // Calculate deltas from base value
763        let deltas: Vec<f64> = numbers.iter().map(|&num| num - base_value).collect();
764
765        // Check if delta compression is beneficial
766        let original_precision = numbers.iter().map(|n| format!("{n}").len()).sum::<usize>();
767
768        let delta_precision = deltas.iter().map(|d| format!("{d}").len()).sum::<usize>();
769
770        if delta_precision < original_precision {
771            // Delta compression is beneficial
772            compressed_array.extend(deltas.into_iter().map(JsonValue::from));
773        } else {
774            // Keep original values
775            return Ok(JsonValue::Array(arr.to_vec()));
776        }
777
778        Ok(JsonValue::Array(compressed_array))
779    }
780}
781
/// Result of `SchemaCompressor::compress`: the compressed payload plus decoding metadata.
#[derive(Debug, Clone)]
pub struct CompressedData {
    /// Strategy that produced the compressed payload.
    pub strategy: CompressionStrategy,
    /// Byte length of `data` serialized with `serde_json::to_string`.
    pub compressed_size: usize,
    /// JSON payload after compression has been applied.
    pub data: JsonValue,
    /// Side-channel data needed to decode `data`: `dict_{index}` entries hold dictionary
    /// strings and `base_{path}` entries hold delta base values.
    pub compression_metadata: HashMap<String, JsonValue>,
}
794
795impl CompressedData {
796    /// Calculate compression ratio
797    pub fn compression_ratio(&self, original_size: usize) -> f32 {
798        if original_size == 0 {
799            return 1.0;
800        }
801        self.compressed_size as f32 / original_size as f32
802    }
803
804    /// Get compression savings in bytes
805    pub fn compression_savings(&self, original_size: usize) -> isize {
806        original_size as isize - self.compressed_size as isize
807    }
808}
809
810impl Default for SchemaAnalyzer {
811    fn default() -> Self {
812        Self::new()
813    }
814}
815
816impl Default for SchemaCompressor {
817    fn default() -> Self {
818        Self::new()
819    }
820}
821
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_schema_analyzer_dictionary_potential() {
        let mut analyzer = SchemaAnalyzer::new();

        let data = json!({
            "users": [
                {"name": "John Doe", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Jane Smith", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Bob Wilson", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Alice Brown", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Charlie Davis", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Diana Evans", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Frank Miller", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Grace Wilson", "role": "admin", "status": "active", "department": "engineering"}
            ]
        });

        let strategy = analyzer.analyze(&data).unwrap();

        // Should detect repeating strings like "admin", "active"
        match strategy {
            CompressionStrategy::Dictionary { .. } | CompressionStrategy::Hybrid { .. } => {
                // Expected outcome
            }
            _ => panic!("Expected dictionary-based compression strategy"),
        }
    }

    #[test]
    fn test_schema_compressor_basic() {
        let compressor = SchemaCompressor::new();

        let data = json!({
            "message": "hello world",
            "count": 42
        });

        let original_size = serde_json::to_string(&data).unwrap().len();
        let compressed = compressor.compress(&data).unwrap();

        assert!(compressed.compressed_size > 0);
        assert!(compressed.compression_ratio(original_size) <= 1.0);
    }

    #[test]
    fn test_dictionary_compression() {
        let mut dictionary = HashMap::new();
        dictionary.insert("active".to_string(), 0);
        dictionary.insert("admin".to_string(), 1);

        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Dictionary { dictionary });

        let data = json!({
            "status": "active",
            "role": "admin",
            "description": "active admin user"
        });

        let result = compressor.compress(&data).unwrap();

        // Verify compression metadata contains dictionary
        assert!(result.compression_metadata.contains_key("dict_0"));
        assert!(result.compression_metadata.contains_key("dict_1"));
    }

    #[test]
    fn test_compression_strategy_selection() {
        let mut analyzer = SchemaAnalyzer::new();

        // Test data with no clear patterns
        let simple_data = json!({
            "unique_field_1": "unique_value_1",
            "unique_field_2": "unique_value_2"
        });

        let strategy = analyzer.analyze(&simple_data).unwrap();
        assert_eq!(strategy, CompressionStrategy::None);
    }

    #[test]
    fn test_numeric_delta_analysis() {
        let mut analyzer = SchemaAnalyzer::new();

        let data = json!({
            "measurements": [
                {"time": 100, "value": 10.0},
                {"time": 101, "value": 10.5},
                {"time": 102, "value": 11.0},
                {"time": 103, "value": 11.5}
            ]
        });

        let _strategy = analyzer.analyze(&data).unwrap();

        // Should detect incremental numeric patterns
        assert!(!analyzer.numeric_fields.is_empty());
    }

    #[test]
    fn test_run_length_encoding() {
        let compressor = SchemaCompressor::with_strategy(CompressionStrategy::RunLength);

        let data = json!({
            "repeated_values": [1, 1, 1, 2, 2, 3, 3, 3, 3]
        });

        let result = compressor.compress(&data).unwrap();

        // Should compress repeated sequences
        assert!(result.compressed_size > 0);

        // Verify RLE format in the compressed data
        let compressed_array = &result.data["repeated_values"];
        assert!(compressed_array.is_array());

        // Should contain RLE objects
        let array = compressed_array.as_array().unwrap();
        let has_rle = array.iter().any(|v| v.get("rle_value").is_some());
        assert!(has_rle);
    }

    #[test]
    fn test_delta_compression() {
        let mut base_values = HashMap::new();
        base_values.insert("sequence".to_string(), 100.0);

        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Delta { base_values });

        let data = json!({
            "sequence": [100.0, 101.0, 102.0, 103.0, 104.0]
        });

        let result = compressor.compress(&data).unwrap();

        // Should apply delta compression
        assert!(result.compressed_size > 0);

        // Verify delta format in the compressed data
        let compressed_array = &result.data["sequence"];
        assert!(compressed_array.is_array());

        // Should contain delta metadata
        let array = compressed_array.as_array().unwrap();
        let has_delta_base = array.iter().any(|v| v.get("delta_base").is_some());
        assert!(has_delta_base);
    }

    #[test]
    fn test_delta_compression_rejects_nan_base() {
        let mut base_values = HashMap::new();
        base_values.insert("sequence".to_string(), f64::NAN);

        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Delta { base_values });

        let data = json!({ "sequence": [1.0, 2.0, 3.0] });

        let err = compressor
            .compress(&data)
            .expect_err("expected error for NaN base");
        match err {
            DomainError::CompressionError(msg) => {
                assert!(msg.contains("non-finite"), "unexpected message: {msg}");
                assert!(msg.contains("sequence"), "expected path in message: {msg}");
            }
            other => panic!("expected CompressionError, got {other:?}"),
        }
    }

    #[test]
    fn test_delta_compression_rejects_infinity_base() {
        let mut base_values = HashMap::new();
        base_values.insert("sequence".to_string(), f64::INFINITY);

        let compressor =
            SchemaCompressor::with_strategy(CompressionStrategy::Delta { base_values });

        let data = json!({ "sequence": [1.0, 2.0, 3.0] });

        let err = compressor
            .compress(&data)
            .expect_err("expected error for Infinity base");
        assert!(matches!(err, DomainError::CompressionError(_)));
    }

    #[test]
    fn test_hybrid_compression_rejects_nan_base() {
        // Previously this test used NEG_INFINITY despite its name, leaving the
        // Hybrid NaN path untested; it now really exercises NaN.
        let string_dict = HashMap::new();
        let mut numeric_deltas = HashMap::new();
        numeric_deltas.insert("sequence".to_string(), f64::NAN);

        let compressor = SchemaCompressor::with_strategy(CompressionStrategy::Hybrid {
            string_dict,
            numeric_deltas,
        });

        let data = json!({ "sequence": [1.0, 2.0, 3.0] });

        let err = compressor
            .compress(&data)
            .expect_err("expected error for NaN base");
        match err {
            DomainError::CompressionError(msg) => {
                assert!(msg.contains("non-finite"), "unexpected message: {msg}");
            }
            other => panic!("expected CompressionError, got {other:?}"),
        }
    }

    #[test]
    fn test_hybrid_compression_rejects_infinite_base() {
        // Keeps the negative-infinity coverage that the NaN test used to provide.
        let string_dict = HashMap::new();
        let mut numeric_deltas = HashMap::new();
        numeric_deltas.insert("sequence".to_string(), f64::NEG_INFINITY);

        let compressor = SchemaCompressor::with_strategy(CompressionStrategy::Hybrid {
            string_dict,
            numeric_deltas,
        });

        let data = json!({ "sequence": [1.0, 2.0, 3.0] });

        let err = compressor
            .compress(&data)
            .expect_err("expected error for non-finite base");
        match err {
            DomainError::CompressionError(msg) => {
                assert!(msg.contains("non-finite"), "unexpected message: {msg}");
            }
            other => panic!("expected CompressionError, got {other:?}"),
        }
    }
}