// pjson_rs/stream/compression_integration.rs

//! Integration of schema-based compression with PJS streaming
//!
//! Provides streaming-aware compression that maintains the ability
//! to progressively decompress data as frames arrive.
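//!
//! # Example
//!
//! A minimal round-trip sketch based on this module's own tests (the exact
//! import paths are an assumption and may differ in the real crate layout):
//!
//! ```ignore
//! let mut compressor = StreamingCompressor::new();
//! let frame = StreamFrame {
//!     data: serde_json::json!({"message": "hello"}),
//!     priority: Priority::MEDIUM,
//!     metadata: std::collections::HashMap::new(),
//! };
//!
//! // Server side: compress according to the frame's priority.
//! let compressed = compressor.compress_frame(frame)?;
//!
//! // Client side: decompress as frames arrive.
//! let mut decompressor = StreamingDecompressor::new();
//! let restored = decompressor.decompress_frame(compressed)?;
//! assert_eq!(restored.priority, Priority::MEDIUM);
//! ```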

use crate::{
    compression::{CompressedData, CompressionStrategy, SchemaCompressor},
    domain::{DomainError, DomainResult},
    stream::{Priority, StreamFrame},
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;

// Security limits to prevent decompression bomb attacks
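// Without these caps, a malicious peer could claim an enormous `rle_count`
// or ship a huge delta array, forcing the decompressor to allocate
// unbounded memory from a few bytes of input.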
const MAX_RLE_COUNT: u64 = 100_000;
const MAX_DELTA_ARRAY_SIZE: usize = 1_000_000;
const MAX_DECOMPRESSED_SIZE: usize = 10_485_760; // 10MB

/// Streaming compressor that maintains compression state across frames
#[derive(Debug, Clone)]
pub struct StreamingCompressor {
    /// Primary compressor for skeleton and critical data
    skeleton_compressor: SchemaCompressor,
    /// Secondary compressor for non-critical data
    content_compressor: SchemaCompressor,
    /// Compression statistics
    stats: CompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Total bytes processed
    pub total_input_bytes: usize,
    /// Total bytes after compression
    pub total_output_bytes: usize,
    /// Number of frames processed
    pub frames_processed: u32,
    /// Compression ratio by priority level
    pub priority_ratios: HashMap<u8, f32>,
}

/// Compressed stream frame with metadata
#[derive(Debug, Clone)]
pub struct CompressedFrame {
    /// Original frame metadata
    pub frame: StreamFrame,
    /// Compressed data
    pub compressed_data: CompressedData,
    /// Decompression instructions for client
    pub decompression_metadata: DecompressionMetadata,
}

#[derive(Debug, Clone)]
pub struct DecompressionMetadata {
    /// Compression strategy used
    pub strategy: CompressionStrategy,
    /// Dictionary indices mapping
    pub dictionary_map: HashMap<u16, String>,
    /// Delta base values for numeric decompression
    pub delta_bases: HashMap<String, f64>,
    /// Priority-specific decompression hints
    pub priority_hints: HashMap<u8, String>,
}

impl StreamingCompressor {
    /// Create new streaming compressor
    pub fn new() -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::new(),
            content_compressor: SchemaCompressor::new(),
            stats: CompressionStats::default(),
        }
    }

    /// Create with custom compression strategies
    pub fn with_strategies(
        skeleton_strategy: CompressionStrategy,
        content_strategy: CompressionStrategy,
    ) -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
            content_compressor: SchemaCompressor::with_strategy(content_strategy),
            stats: CompressionStats::default(),
        }
    }

    /// Process and compress a stream frame based on its priority
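    ///
    /// Selects the skeleton compressor for `CRITICAL`/`HIGH` frames and the
    /// content compressor otherwise, and returns a
    /// `DomainError::CompressionError` if the frame's payload cannot be
    /// serialized for size accounting.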
    pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
        let compressor = self.select_compressor_for_priority(frame.priority);

        // Calculate original size
        let original_size = serde_json::to_string(&frame.data)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        // Compress based on frame content and priority
        let compressed_data = compressor.compress(&frame.data)?;

        // Update statistics
        self.update_stats(
            frame.priority,
            original_size,
            compressed_data.compressed_size,
        );

        // Create decompression metadata
        let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;

        Ok(CompressedFrame {
            frame,
            compressed_data,
            decompression_metadata,
        })
    }

    /// Analyze JSON data to optimize compression strategies
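    ///
    /// Intended to be called once up front: the skeleton tunes the
    /// critical-path compressor, while the samples (if any) are combined
    /// into a single array and used to tune the content compressor.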
    pub fn optimize_for_data(
        &mut self,
        skeleton: &JsonValue,
        sample_data: &[JsonValue],
    ) -> DomainResult<()> {
        // Optimize skeleton compressor for critical structural data
        self.skeleton_compressor.analyze_and_optimize(skeleton)?;

        // Analyze sample content data to optimize content compressor
        if !sample_data.is_empty() {
            // Combine samples for comprehensive analysis
            let combined_sample = JsonValue::Array(sample_data.to_vec());
            self.content_compressor
                .analyze_and_optimize(&combined_sample)?;
        }

        Ok(())
    }

    /// Get current compression statistics
    pub fn get_stats(&self) -> &CompressionStats {
        &self.stats
    }

    /// Reset compression statistics
    pub fn reset_stats(&mut self) {
        self.stats = CompressionStats::default();
    }

    /// Select appropriate compressor based on frame priority
    fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
        match priority {
            // Critical data (skeleton, errors) - use specialized compressor
            Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
            // Regular content data - use content compressor
            _ => &mut self.content_compressor,
        }
    }

    /// Update compression statistics
    fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
        self.stats.total_input_bytes += original_size;
        self.stats.total_output_bytes += compressed_size;
        self.stats.frames_processed += 1;

        let ratio = if original_size > 0 {
            compressed_size as f32 / original_size as f32
        } else {
            1.0
        };

        self.stats.priority_ratios.insert(priority.value(), ratio);
    }

    /// Create decompression metadata for client
    fn create_decompression_metadata(
        &self,
        compressed_data: &CompressedData,
    ) -> DomainResult<DecompressionMetadata> {
        let mut dictionary_map = HashMap::new();
        let mut delta_bases = HashMap::new();

        // Extract dictionary mappings and delta base values
        for (key, value) in &compressed_data.compression_metadata {
            if key.starts_with("dict_") {
                if let Ok(index) = key.strip_prefix("dict_").unwrap().parse::<u16>()
                    && let Some(string_val) = value.as_str()
                {
                    dictionary_map.insert(index, string_val.to_string());
                }
            } else if key.starts_with("base_") {
                let path = key.strip_prefix("base_").unwrap();
                if let Some(num) = value.as_f64() {
                    delta_bases.insert(path.to_string(), num);
                }
            }
        }

        Ok(DecompressionMetadata {
            strategy: compressed_data.strategy.clone(),
            dictionary_map,
            delta_bases,
            priority_hints: HashMap::new(), // TODO: Add priority-specific hints
        })
    }
}

impl CompressionStats {
    /// Calculate overall compression ratio
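    ///
    /// Defined as `total_output_bytes / total_input_bytes`, so values below
    /// 1.0 indicate savings: 600 output bytes from 1000 input bytes gives
    /// 0.6. Returns 1.0 when no input has been processed yet.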
    pub fn overall_compression_ratio(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 1.0;
        }
        self.total_output_bytes as f32 / self.total_input_bytes as f32
    }

    /// Get compression ratio for specific priority level
    pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
        self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
    }

    /// Calculate bytes saved
    pub fn bytes_saved(&self) -> isize {
        self.total_input_bytes as isize - self.total_output_bytes as isize
    }

    /// Calculate percentage saved
    pub fn percentage_saved(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 0.0;
        }
        let ratio = self.overall_compression_ratio();
        (1.0 - ratio) * 100.0
    }
}

/// Client-side decompressor for receiving compressed frames
#[derive(Debug, Clone)]
pub struct StreamingDecompressor {
    /// Active dictionary for string decompression
    active_dictionary: HashMap<u16, String>,
    /// Delta base values for numeric decompression
    delta_bases: HashMap<String, f64>,
    /// Decompression statistics
    stats: DecompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    /// Total frames decompressed
    pub frames_decompressed: u32,
    /// Total bytes decompressed
    pub total_decompressed_bytes: usize,
    /// Average decompression time in microseconds
    pub avg_decompression_time_us: u64,
}

impl StreamingDecompressor {
    /// Create new streaming decompressor
    pub fn new() -> Self {
        Self {
            active_dictionary: HashMap::new(),
            delta_bases: HashMap::new(),
            stats: DecompressionStats::default(),
        }
    }

    /// Decompress a compressed frame
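    ///
    /// Merges the frame's dictionary entries and delta bases into the
    /// decompressor's running context before decoding, so frames should
    /// generally be processed in the order they arrive on the stream.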
    pub fn decompress_frame(
        &mut self,
        compressed_frame: CompressedFrame,
    ) -> DomainResult<StreamFrame> {
        let start_time = std::time::Instant::now();

        // Update decompression context with metadata
        self.update_context(&compressed_frame.decompression_metadata)?;

        // Decompress data based on strategy
        let decompressed_data = self.decompress_data(
            &compressed_frame.compressed_data,
            &compressed_frame.decompression_metadata.strategy,
        )?;

        // Update statistics
        let decompression_time = start_time.elapsed();
        self.update_decompression_stats(&decompressed_data, decompression_time);

        Ok(StreamFrame {
            data: decompressed_data,
            priority: compressed_frame.frame.priority,
            metadata: compressed_frame.frame.metadata,
        })
    }

    /// Update decompression context with new metadata
    fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
        // Update dictionary
        for (&index, string) in &metadata.dictionary_map {
            self.active_dictionary.insert(index, string.clone());
        }

        // Update delta bases
        for (path, &base) in &metadata.delta_bases {
            self.delta_bases.insert(path.clone(), base);
        }

        Ok(())
    }

    /// Decompress data according to strategy
    fn decompress_data(
        &self,
        compressed_data: &CompressedData,
        strategy: &CompressionStrategy,
    ) -> DomainResult<JsonValue> {
        match strategy {
            CompressionStrategy::None => Ok(compressed_data.data.clone()),

            CompressionStrategy::Dictionary { .. } => {
                self.decompress_dictionary(&compressed_data.data)
            }

            CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),

            CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),

            CompressionStrategy::Hybrid { .. } => {
                // Apply decompression in reverse order: delta first, then dictionary
                let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
                self.decompress_dictionary(&delta_decompressed)
            }
        }
    }

    /// Decompress dictionary-encoded strings
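    ///
    /// Numbers are treated as candidate dictionary indices: a value that
    /// fits in `u16` and maps to an entry in the active dictionary decodes
    /// to that string, while any other value passes through unchanged. For
    /// example, with `0 => "hello"` active, `{"greeting": 0, "count": 999}`
    /// decodes to `{"greeting": "hello", "count": 999}`.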
    fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
                }
                Ok(JsonValue::Object(decompressed))
            }
            JsonValue::Array(arr) => {
                let decompressed: Result<Vec<_>, _> = arr
                    .iter()
                    .map(|item| self.decompress_dictionary(item))
                    .collect();
                Ok(JsonValue::Array(decompressed?))
            }
            JsonValue::Number(n) => {
                // Check if this is a dictionary index; use a checked conversion
                // so out-of-range u64 values cannot wrap into a valid index
                if let Some(index) = n.as_u64()
                    && let Ok(idx) = u16::try_from(index)
                    && let Some(string_val) = self.active_dictionary.get(&idx)
                {
                    return Ok(JsonValue::String(string_val.clone()));
                }
                Ok(data.clone())
            }
            _ => Ok(data.clone()),
        }
    }

    /// Decompress delta-encoded values
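    ///
    /// A delta-compressed sequence is an array whose first element is a
    /// metadata object carrying `delta_base` and `delta_type`, followed by
    /// numeric deltas, e.g.
    /// `[{"delta_base": 100.0, "delta_type": "numeric_sequence"}, 0.0, 1.0, 2.0]`
    /// decodes to `[100.0, 101.0, 102.0]`. Arrays without that leading
    /// metadata object are recursed into and otherwise left unchanged.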
    pub fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed_obj = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed_obj.insert(key.clone(), self.decompress_delta(value)?);
                }
                Ok(JsonValue::Object(decompressed_obj))
            }
            JsonValue::Array(arr) => {
                if arr.is_empty() {
                    return Ok(JsonValue::Array(arr.clone()));
                }

                // Check if this is a delta-compressed array
                if let Some(first) = arr.first()
                    && let Some(obj) = first.as_object()
                    && obj.contains_key("delta_base")
                    && obj.contains_key("delta_type")
                {
                    // This is a delta-compressed numeric sequence
                    return self.decompress_delta_array(arr);
                }

                // Not a delta-compressed array, process elements recursively
                let decompressed_arr: Result<Vec<_>, _> =
                    arr.iter().map(|item| self.decompress_delta(item)).collect();
                Ok(JsonValue::Array(decompressed_arr?))
            }
            _ => Ok(data.clone()),
        }
    }

    /// Decompress delta-encoded array back to original values
    fn decompress_delta_array(&self, arr: &[JsonValue]) -> DomainResult<JsonValue> {
        if arr.is_empty() {
            return Ok(JsonValue::Array(Vec::new()));
        }

        // VULN-002 FIX: Validate array size to prevent memory exhaustion
        if arr.len() > MAX_DELTA_ARRAY_SIZE {
            return Err(DomainError::CompressionError(format!(
                "Delta array size {} exceeds maximum {}",
                arr.len(),
                MAX_DELTA_ARRAY_SIZE
            )));
        }

        // Extract base value from metadata
        let base_value = arr[0]
            .get("delta_base")
            .and_then(|v| v.as_f64())
            .ok_or_else(|| {
                DomainError::CompressionError(
                    "Missing or invalid delta_base in metadata".to_string(),
                )
            })?;

        // Reconstruct original values from deltas
        let mut original_values = Vec::new();
        for delta_value in arr.iter().skip(1) {
            let delta = delta_value.as_f64().ok_or_else(|| {
                DomainError::CompressionError("Invalid delta value: expected number".to_string())
            })?;

            let original = base_value + delta;
            original_values.push(JsonValue::from(original));
        }

        Ok(JsonValue::Array(original_values))
    }

    /// Decompress run-length encoded data
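    ///
    /// A run is encoded as an object with both `rle_value` and `rle_count`,
    /// e.g. `[{"rle_value": "x", "rle_count": 3}]` decodes to
    /// `["x", "x", "x"]`. Objects with only one of the two keys are rejected
    /// as malformed; individual counts are capped by `MAX_RLE_COUNT` and the
    /// cumulative expansion by `MAX_DECOMPRESSED_SIZE`.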
    pub fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed_obj = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed_obj.insert(key.clone(), self.decompress_run_length(value)?);
                }
                Ok(JsonValue::Object(decompressed_obj))
            }
            JsonValue::Array(arr) => {
                let mut decompressed_values = Vec::new();
                let mut total_size = 0usize;

                for item in arr {
                    if let Some(obj) = item.as_object() {
                        // Validate RLE object integrity: both keys must be present or both absent
                        let has_rle_value = obj.contains_key("rle_value");
                        let has_rle_count = obj.contains_key("rle_count");

                        if has_rle_value && !has_rle_count {
                            return Err(DomainError::CompressionError(
                                "Malformed RLE object: rle_value without rle_count".to_string(),
                            ));
                        }
                        if has_rle_count && !has_rle_value {
                            return Err(DomainError::CompressionError(
                                "Malformed RLE object: rle_count without rle_value".to_string(),
                            ));
                        }

                        // Check if this is an RLE-encoded run
                        if has_rle_value && has_rle_count {
                            let value = obj
                                .get("rle_value")
                                .ok_or_else(|| {
                                    DomainError::CompressionError("Missing rle_value".to_string())
                                })?
                                .clone();

                            let count =
                                obj.get("rle_count")
                                    .and_then(|v| v.as_u64())
                                    .ok_or_else(|| {
                                        DomainError::CompressionError(
                                            "Invalid rle_count: expected positive integer"
                                                .to_string(),
                                        )
                                    })?;

                            // VULN-001 FIX: Validate RLE count to prevent decompression bomb
                            if count > MAX_RLE_COUNT {
                                return Err(DomainError::CompressionError(format!(
                                    "RLE count {} exceeds maximum {}",
                                    count, MAX_RLE_COUNT
                                )));
                            }

                            // VULN-003 FIX: Convert u64 to usize safely to prevent overflow
                            let count_usize = usize::try_from(count).map_err(|_| {
                                DomainError::CompressionError(format!(
                                    "RLE count {} exceeds platform maximum",
                                    count
                                ))
                            })?;

                            // Track total decompressed size across all RLE runs
                            total_size = total_size.checked_add(count_usize).ok_or_else(|| {
                                DomainError::CompressionError(
                                    "Total decompressed size overflow".to_string(),
                                )
                            })?;

                            if total_size > MAX_DECOMPRESSED_SIZE {
                                return Err(DomainError::CompressionError(format!(
                                    "Decompressed size {} exceeds maximum {}",
                                    total_size, MAX_DECOMPRESSED_SIZE
                                )));
                            }

                            // Expand the run
                            for _ in 0..count {
                                decompressed_values.push(value.clone());
                            }
                        } else {
                            // Not an RLE object, process recursively
                            decompressed_values.push(self.decompress_run_length(item)?);
                        }
                    } else {
                        // Not an object, process recursively
                        decompressed_values.push(self.decompress_run_length(item)?);
                    }
                }

                Ok(JsonValue::Array(decompressed_values))
            }
            _ => Ok(data.clone()),
        }
    }

    /// Update decompression statistics
    fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
        self.stats.frames_decompressed += 1;

        if let Ok(serialized) = serde_json::to_string(data) {
            self.stats.total_decompressed_bytes += serialized.len();
        }

        let new_time_us = duration.as_micros() as u64;
        if self.stats.frames_decompressed == 1 {
            self.stats.avg_decompression_time_us = new_time_us;
        } else {
            // Calculate running average
            let total_frames = self.stats.frames_decompressed as u64;
            let total_time =
                self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
            self.stats.avg_decompression_time_us = total_time / total_frames;
        }
    }

    /// Get decompression statistics
    pub fn get_stats(&self) -> &DecompressionStats {
        &self.stats
    }
}

impl Default for StreamingCompressor {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for StreamingDecompressor {
    fn default() -> Self {
        Self::new()
    }
}

572#[cfg(test)]
573mod tests {
574    use super::*;
575    use serde_json::json;
576
577    #[test]
578    fn test_streaming_compressor_basic() {
579        let mut compressor = StreamingCompressor::new();
580
581        let frame = StreamFrame {
582            data: json!({
583                "message": "test message",
584                "count": 42
585            }),
586            priority: Priority::MEDIUM,
587            metadata: HashMap::new(),
588        };
589
590        let result = compressor.compress_frame(frame);
591        assert!(result.is_ok());
592
593        let compressed = result.unwrap();
594        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
595    }
596
597    #[test]
598    fn test_compression_stats() {
599        let stats = CompressionStats {
600            total_input_bytes: 1000,
601            total_output_bytes: 600,
602            ..Default::default()
603        };
604
605        assert_eq!(stats.overall_compression_ratio(), 0.6);
606        assert_eq!(stats.bytes_saved(), 400);
607        // Use approximate comparison for float precision
608        let percentage = stats.percentage_saved();
609        assert!((percentage - 40.0).abs() < 0.001);
610    }
611
612    #[test]
613    fn test_streaming_decompressor_basic() {
614        let mut decompressor = StreamingDecompressor::new();
615
616        let compressed_frame = CompressedFrame {
617            frame: StreamFrame {
618                data: json!({"test": "data"}),
619                priority: Priority::MEDIUM,
620                metadata: HashMap::new(),
621            },
622            compressed_data: CompressedData {
623                strategy: CompressionStrategy::None,
624                compressed_size: 20,
625                data: json!({"test": "data"}),
626                compression_metadata: HashMap::new(),
627            },
628            decompression_metadata: DecompressionMetadata {
629                strategy: CompressionStrategy::None,
630                dictionary_map: HashMap::new(),
631                delta_bases: HashMap::new(),
632                priority_hints: HashMap::new(),
633            },
634        };
635
636        let result = decompressor.decompress_frame(compressed_frame);
637        assert!(result.is_ok());
638
639        let decompressed = result.unwrap();
640        assert_eq!(decompressed.data, json!({"test": "data"}));
641    }
642
643    #[test]
644    fn test_dictionary_decompression() {
645        let mut decompressor = StreamingDecompressor::new();
646        decompressor
647            .active_dictionary
648            .insert(0, "hello".to_string());
649        decompressor
650            .active_dictionary
651            .insert(1, "world".to_string());
652
653        // Test with dictionary indices
654        let compressed = json!({
655            "greeting": 0,
656            "target": 1
657        });
658
659        let result = decompressor.decompress_dictionary(&compressed).unwrap();
660        assert_eq!(
661            result,
662            json!({
663                "greeting": "hello",
664                "target": "world"
665            })
666        );
667    }
668
669    #[test]
670    fn test_priority_based_compression() {
671        let mut compressor = StreamingCompressor::new();
672
673        let critical_frame = StreamFrame {
674            data: json!({"error": "critical failure"}),
675            priority: Priority::CRITICAL,
676            metadata: HashMap::new(),
677        };
678
679        let low_frame = StreamFrame {
680            data: json!({"debug": "verbose information"}),
681            priority: Priority::LOW,
682            metadata: HashMap::new(),
683        };
684
685        let _critical_result = compressor.compress_frame(critical_frame).unwrap();
686        let _low_result = compressor.compress_frame(low_frame).unwrap();
687
688        let stats = compressor.get_stats();
689        assert_eq!(stats.frames_processed, 2);
690        assert!(stats.total_input_bytes > 0);
691    }
692
693    #[test]
694    fn test_delta_decompression_basic() {
695        let decompressor = StreamingDecompressor::new();
696
697        let compressed_data = json!([
698            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
699            0.0,
700            1.0,
701            2.0,
702            3.0,
703            4.0
704        ]);
705
706        let result = decompressor.decompress_delta(&compressed_data).unwrap();
707        assert_eq!(result, json!([100.0, 101.0, 102.0, 103.0, 104.0]));
708    }
709
710    #[test]
711    fn test_delta_decompression_negative_deltas() {
712        let decompressor = StreamingDecompressor::new();
713
714        let compressed_data = json!([
715            {"delta_base": 50.0, "delta_type": "numeric_sequence"},
716            -10.0,
717            0.0,
718            10.0,
719            20.0
720        ]);
721
722        let result = decompressor.decompress_delta(&compressed_data).unwrap();
723        assert_eq!(result, json!([40.0, 50.0, 60.0, 70.0]));
724    }
725
726    #[test]
727    fn test_delta_decompression_fractional_deltas() {
728        let decompressor = StreamingDecompressor::new();
729
730        let compressed_data = json!([
731            {"delta_base": 10.0, "delta_type": "numeric_sequence"},
732            0.5,
733            1.0,
734            1.5,
735            2.0
736        ]);
737
738        let result = decompressor.decompress_delta(&compressed_data).unwrap();
739        assert_eq!(result, json!([10.5, 11.0, 11.5, 12.0]));
740    }
741
742    #[test]
743    fn test_delta_decompression_empty_array() {
744        let decompressor = StreamingDecompressor::new();
745
746        let compressed_data = json!([]);
747
748        let result = decompressor.decompress_delta(&compressed_data).unwrap();
749        assert_eq!(result, json!([]));
750    }
751
752    #[test]
753    fn test_delta_decompression_single_element() {
754        let decompressor = StreamingDecompressor::new();
755
756        let compressed_data = json!([
757            {"delta_base": 100.0, "delta_type": "numeric_sequence"}
758        ]);
759
760        let result = decompressor.decompress_delta(&compressed_data).unwrap();
761        assert_eq!(result, json!([]));
762    }
763
764    #[test]
765    fn test_delta_decompression_nested_structure() {
766        let decompressor = StreamingDecompressor::new();
767
768        let compressed_data = json!({
769            "sequence": [
770                {"delta_base": 100.0, "delta_type": "numeric_sequence"},
771                0.0,
772                1.0,
773                2.0
774            ],
775            "other": "data"
776        });
777
778        let result = decompressor.decompress_delta(&compressed_data).unwrap();
779        assert_eq!(
780            result,
781            json!({
782                "sequence": [100.0, 101.0, 102.0],
783                "other": "data"
784            })
785        );
786    }
787
788    #[test]
789    fn test_delta_decompression_invalid_metadata() {
790        let decompressor = StreamingDecompressor::new();
791
792        let compressed_data = json!([
793            {"wrong_key": 100.0},
794            0.0,
795            1.0
796        ]);
797
798        let result = decompressor.decompress_delta(&compressed_data);
799        assert!(result.is_ok());
800        // Should return as-is if not valid delta format
801    }
802
803    #[test]
804    fn test_delta_decompression_invalid_delta_value() {
805        let decompressor = StreamingDecompressor::new();
806
807        let compressed_data = json!([
808            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
809            "not_a_number"
810        ]);
811
812        let result = decompressor.decompress_delta(&compressed_data);
813        assert!(result.is_err());
814    }
815
816    #[test]
817    fn test_rle_decompression_basic() {
818        let decompressor = StreamingDecompressor::new();
819
820        let compressed_data = json!([
821            {"rle_value": 1, "rle_count": 3},
822            {"rle_value": 2, "rle_count": 2},
823            {"rle_value": 3, "rle_count": 4}
824        ]);
825
826        let result = decompressor
827            .decompress_run_length(&compressed_data)
828            .unwrap();
829        assert_eq!(result, json!([1, 1, 1, 2, 2, 3, 3, 3, 3]));
830    }
831
832    #[test]
833    fn test_rle_decompression_mixed_runs() {
834        let decompressor = StreamingDecompressor::new();
835
836        let compressed_data = json!([
837            {"rle_value": "a", "rle_count": 2},
838            "b",
839            {"rle_value": "c", "rle_count": 3}
840        ]);
841
842        let result = decompressor
843            .decompress_run_length(&compressed_data)
844            .unwrap();
845        assert_eq!(result, json!(["a", "a", "b", "c", "c", "c"]));
846    }
847
848    #[test]
849    fn test_rle_decompression_single_count() {
850        let decompressor = StreamingDecompressor::new();
851
852        let compressed_data = json!([
853            {"rle_value": "x", "rle_count": 1}
854        ]);
855
856        let result = decompressor
857            .decompress_run_length(&compressed_data)
858            .unwrap();
859        assert_eq!(result, json!(["x"]));
860    }
861
862    #[test]
863    fn test_rle_decompression_zero_count() {
864        let decompressor = StreamingDecompressor::new();
865
866        let compressed_data = json!([
867            {"rle_value": "x", "rle_count": 0}
868        ]);
869
870        let result = decompressor
871            .decompress_run_length(&compressed_data)
872            .unwrap();
873        assert_eq!(result, json!([]));
874    }
875
876    #[test]
877    fn test_rle_decompression_nested_values() {
878        let decompressor = StreamingDecompressor::new();
879
880        let compressed_data = json!([
881            {"rle_value": {"name": "test"}, "rle_count": 3}
882        ]);
883
884        let result = decompressor
885            .decompress_run_length(&compressed_data)
886            .unwrap();
887        assert_eq!(
888            result,
889            json!([{"name": "test"}, {"name": "test"}, {"name": "test"}])
890        );
891    }
892
893    #[test]
894    fn test_rle_decompression_nested_structure() {
895        let decompressor = StreamingDecompressor::new();
896
897        let compressed_data = json!({
898            "data": [
899                {"rle_value": 1, "rle_count": 3},
900                {"rle_value": 2, "rle_count": 2}
901            ],
902            "other": "field"
903        });
904
905        let result = decompressor
906            .decompress_run_length(&compressed_data)
907            .unwrap();
908        assert_eq!(
909            result,
910            json!({
911                "data": [1, 1, 1, 2, 2],
912                "other": "field"
913            })
914        );
915    }
916
917    #[test]
918    fn test_rle_decompression_empty_array() {
919        let decompressor = StreamingDecompressor::new();
920
921        let compressed_data = json!([]);
922
923        let result = decompressor
924            .decompress_run_length(&compressed_data)
925            .unwrap();
926        assert_eq!(result, json!([]));
927    }
928
929    #[test]
930    fn test_rle_decompression_invalid_count() {
931        let decompressor = StreamingDecompressor::new();
932
933        let compressed_data = json!([
934            {"rle_value": "x", "rle_count": "not_a_number"}
935        ]);
936
937        let result = decompressor.decompress_run_length(&compressed_data);
938        assert!(result.is_err());
939    }
940
941    #[test]
942    fn test_rle_decompression_missing_value() {
943        let decompressor = StreamingDecompressor::new();
944
945        let compressed_data = json!([
946            {"rle_count": 3}
947        ]);
948
949        let result = decompressor.decompress_run_length(&compressed_data);
950        assert!(result.is_err());
951    }
952
953    #[test]
954    fn test_rle_decompression_missing_count() {
955        let decompressor = StreamingDecompressor::new();
956
957        let compressed_data = json!([
958            {"rle_value": "x"}
959        ]);
960
961        let result = decompressor.decompress_run_length(&compressed_data);
962        assert!(result.is_err());
963    }
964
965    #[test]
966    fn test_rle_decompression_non_rle_objects() {
967        let decompressor = StreamingDecompressor::new();
968
969        let compressed_data = json!([
970            {"regular": "object"},
971            {"another": "one"}
972        ]);
973
974        let result = decompressor
975            .decompress_run_length(&compressed_data)
976            .unwrap();
977        // Should return as-is if objects don't have RLE format
978        assert_eq!(
979            result,
980            json!([
981                {"regular": "object"},
982                {"another": "one"}
983            ])
984        );
985    }
986
987    // NEW TESTS FOR COVERAGE IMPROVEMENT (Task P2-TEST-002)
988
989    #[test]
990    fn test_compress_frame_with_custom_strategies() {
991        let mut dict = HashMap::new();
992        dict.insert("test".to_string(), 0);
993
994        let mut bases = HashMap::new();
995        bases.insert("value".to_string(), 100.0);
996
997        let mut compressor = StreamingCompressor::with_strategies(
998            CompressionStrategy::Dictionary { dictionary: dict },
999            CompressionStrategy::Delta { base_values: bases },
1000        );
1001
1002        let frame = StreamFrame {
1003            data: json!({"value": 123, "other": 456}),
1004            priority: Priority::HIGH,
1005            metadata: HashMap::new(),
1006        };
1007
1008        let result = compressor.compress_frame(frame);
1009        assert!(result.is_ok());
1010        assert_eq!(compressor.stats.frames_processed, 1);
1011    }
1012
1013    #[test]
1014    fn test_optimize_for_data_with_samples() {
1015        let mut compressor = StreamingCompressor::new();
1016
1017        let skeleton = json!({
1018            "id": null,
1019            "name": null
1020        });
1021
1022        let samples = vec![
1023            json!({"id": 1, "name": "test1"}),
1024            json!({"id": 2, "name": "test2"}),
1025            json!({"id": 3, "name": "test3"}),
1026        ];
1027
1028        let result = compressor.optimize_for_data(&skeleton, &samples);
1029        assert!(result.is_ok());
1030    }
1031
1032    #[test]
1033    fn test_optimize_for_data_empty_samples() {
1034        let mut compressor = StreamingCompressor::new();
1035
1036        let skeleton = json!({"key": "value"});
1037        let result = compressor.optimize_for_data(&skeleton, &[]);
1038        assert!(result.is_ok());
1039    }
1040
1041    #[test]
1042    fn test_reset_stats() {
1043        let mut compressor = StreamingCompressor::new();
1044
1045        compressor.stats.total_input_bytes = 1000;
1046        compressor.stats.total_output_bytes = 500;
1047        compressor.stats.frames_processed = 10;
1048
1049        compressor.reset_stats();
1050
1051        assert_eq!(compressor.stats.total_input_bytes, 0);
1052        assert_eq!(compressor.stats.total_output_bytes, 0);
1053        assert_eq!(compressor.stats.frames_processed, 0);
1054    }
1055
1056    #[test]
1057    fn test_compressor_critical_vs_low_priority() {
1058        let mut compressor = StreamingCompressor::new();
1059
1060        let critical_frame = StreamFrame {
1061            data: json!({"critical": "data"}),
1062            priority: Priority::CRITICAL,
1063            metadata: HashMap::new(),
1064        };
1065
1066        let low_frame = StreamFrame {
1067            data: json!({"low": "data"}),
1068            priority: Priority::LOW,
1069            metadata: HashMap::new(),
1070        };
1071
1072        compressor.compress_frame(critical_frame).unwrap();
1073        compressor.compress_frame(low_frame).unwrap();
1074
1075        assert_eq!(compressor.stats.frames_processed, 2);
1076    }
1077
1078    #[test]
1079    fn test_decompressor_hybrid_strategy() {
1080        let mut decompressor = StreamingDecompressor::new();
1081
1082        // Setup delta bases and dictionary
1083        decompressor.delta_bases.insert("value".to_string(), 100.0);
1084        decompressor.active_dictionary.insert(0, "test".to_string());
1085
1086        let mut string_dict = HashMap::new();
1087        string_dict.insert("test".to_string(), 0);
1088
1089        let mut numeric_deltas = HashMap::new();
1090        numeric_deltas.insert("value".to_string(), 100.0);
1091
1092        let compressed_frame = CompressedFrame {
1093            frame: StreamFrame {
1094                data: json!({"test": "data"}),
1095                priority: Priority::MEDIUM,
1096                metadata: HashMap::new(),
1097            },
1098            compressed_data: CompressedData {
1099                strategy: CompressionStrategy::Hybrid {
1100                    string_dict: string_dict.clone(),
1101                    numeric_deltas: numeric_deltas.clone(),
1102                },
1103                compressed_size: 20,
1104                data: json!({"value": 5.0}), // Delta from base 100
1105                compression_metadata: HashMap::new(),
1106            },
1107            decompression_metadata: DecompressionMetadata {
1108                strategy: CompressionStrategy::Hybrid {
1109                    string_dict,
1110                    numeric_deltas,
1111                },
1112                dictionary_map: HashMap::new(),
1113                delta_bases: HashMap::new(),
1114                priority_hints: HashMap::new(),
1115            },
1116        };
1117
1118        let result = decompressor.decompress_frame(compressed_frame);
1119        assert!(result.is_ok());
1120    }
1121
1122    #[test]
1123    fn test_decompress_dictionary_nested_arrays() {
1124        let mut decompressor = StreamingDecompressor::new();
1125        decompressor
1126            .active_dictionary
1127            .insert(0, "item1".to_string());
1128        decompressor
1129            .active_dictionary
1130            .insert(1, "item2".to_string());
1131
1132        let data = json!([[0, 1], [1, 0]]);
1133        let result = decompressor.decompress_dictionary(&data).unwrap();
1134
1135        assert_eq!(result, json!([["item1", "item2"], ["item2", "item1"]]));
1136    }
1137
1138    #[test]
1139    fn test_decompress_dictionary_non_index_numbers() {
1140        let mut decompressor = StreamingDecompressor::new();
1141        decompressor.active_dictionary.insert(0, "test".to_string());
1142
1143        // Number that doesn't map to dictionary
1144        let data = json!({"value": 999});
1145        let result = decompressor.decompress_dictionary(&data).unwrap();
1146
1147        // Should remain as number since not in dictionary
1148        assert_eq!(result, json!({"value": 999}));
1149    }
1150
1151    #[test]
1152    fn test_decompress_delta_non_array() {
1153        let decompressor = StreamingDecompressor::new();
1154
1155        // Non-array delta encoding (passthrough)
1156        let data = json!({"key": "value"});
1157        let result = decompressor.decompress_delta(&data).unwrap();
1158
1159        assert_eq!(result, json!({"key": "value"}));
1160    }
1161
1162    #[test]
1163    fn test_decompress_delta_array_without_metadata() {
1164        let decompressor = StreamingDecompressor::new();
1165
1166        // Array without delta metadata
1167        let data = json!([1, 2, 3, 4]);
1168        let result = decompressor.decompress_delta(&data).unwrap();
1169
1170        assert_eq!(result, json!([1, 2, 3, 4]));
1171    }
1172
1173    #[test]
1174    fn test_decompress_run_length_nested_objects() {
1175        let decompressor = StreamingDecompressor::new();
1176
1177        let data = json!({
1178            "outer": {
1179                "inner": [
1180                    {"rle_value": {"nested": "obj"}, "rle_count": 2}
1181                ]
1182            }
1183        });
1184
1185        let result = decompressor.decompress_run_length(&data).unwrap();
1186        assert_eq!(
1187            result,
1188            json!({
1189                "outer": {
1190                    "inner": [{"nested": "obj"}, {"nested": "obj"}]
1191                }
1192            })
1193        );
1194    }
1195
1196    #[test]
1197    fn test_decompression_stats_tracking() {
1198        let mut decompressor = StreamingDecompressor::new();
1199
1200        assert_eq!(decompressor.stats.frames_decompressed, 0);
1201
1202        let frame = CompressedFrame {
1203            frame: StreamFrame {
1204                data: json!({"test": "data"}),
1205                priority: Priority::MEDIUM,
1206                metadata: HashMap::new(),
1207            },
1208            compressed_data: CompressedData {
1209                strategy: CompressionStrategy::None,
1210                compressed_size: 15,
1211                data: json!({"test": "data"}),
1212                compression_metadata: HashMap::new(),
1213            },
1214            decompression_metadata: DecompressionMetadata {
1215                strategy: CompressionStrategy::None,
1216                dictionary_map: HashMap::new(),
1217                delta_bases: HashMap::new(),
1218                priority_hints: HashMap::new(),
1219            },
1220        };
1221
1222        decompressor.decompress_frame(frame).unwrap();
1223
1224        assert_eq!(decompressor.stats.frames_decompressed, 1);
1225        assert!(decompressor.stats.total_decompressed_bytes > 0);
1226        assert!(decompressor.stats.avg_decompression_time_us > 0);
1227    }
1228
1229    #[test]
1230    fn test_decompress_delta_array_malformed_metadata() {
1231        let decompressor = StreamingDecompressor::new();
1232
1233        // Array with object but missing delta_base - passes through as-is
1234        let data = json!([
1235            {"delta_type": "numeric_sequence"},
1236            1.0,
1237            2.0
1238        ]);
1239
1240        let result = decompressor.decompress_delta(&data);
1241        // Without both delta_base and delta_type matching, it's treated as regular array
1242        assert!(result.is_ok());
1243        // Should return as-is since it doesn't match delta format
1244        assert_eq!(result.unwrap(), data);
1245    }
1246
1247    #[test]
1248    fn test_decompress_run_length_large_count() {
1249        let decompressor = StreamingDecompressor::new();
1250
1251        // Within MAX_RLE_COUNT
1252        let data = json!([
1253            {"rle_value": "x", "rle_count": 1000}
1254        ]);
1255
1256        let result = decompressor.decompress_run_length(&data);
1257        assert!(result.is_ok());
1258        let decompressed = result.unwrap();
1259        if let Some(arr) = decompressed.as_array() {
1260            assert_eq!(arr.len(), 1000);
1261        }
1262    }
1263
1264    #[test]
1265    fn test_decompress_run_length_exceeds_max_count() {
1266        let decompressor = StreamingDecompressor::new();
1267
1268        // Exceeds MAX_RLE_COUNT (100_000)
1269        let data = json!([
1270            {"rle_value": "x", "rle_count": 200_000}
1271        ]);
1272
1273        let result = decompressor.decompress_run_length(&data);
1274        assert!(result.is_err()); // Should error
1275    }
1276
1277    #[test]
1278    fn test_decompress_run_length_cumulative_overflow() {
1279        let decompressor = StreamingDecompressor::new();
1280
1281        // Multiple runs that sum to exceed MAX_DECOMPRESSED_SIZE
1282        let data = json!([
1283            {"rle_value": "a", "rle_count": 5_000_000},
1284            {"rle_value": "b", "rle_count": 6_000_000}
1285        ]);
1286
1287        let result = decompressor.decompress_run_length(&data);
1288        // Should error when cumulative size exceeds limit
1289        assert!(result.is_err());
1290    }
1291
1292    #[test]
1293    fn test_decompress_delta_array_size_limit() {
1294        let decompressor = StreamingDecompressor::new();
1295
1296        // Create very large array (exceeds MAX_DELTA_ARRAY_SIZE)
1297        let mut large_array = vec![json!({"delta_base": 0.0, "delta_type": "numeric_sequence"})];
1298        for _i in 0..1_000_001 {
1299            large_array.push(json!(0.0));
1300        }
1301
1302        let result = decompressor.decompress_delta(&JsonValue::Array(large_array));
1303        assert!(result.is_err()); // Should error on size limit
1304    }
1305
1306    #[test]
1307    fn test_compression_stats_default() {
1308        let stats = CompressionStats::default();
1309        assert_eq!(stats.total_input_bytes, 0);
1310        assert_eq!(stats.total_output_bytes, 0);
1311        assert_eq!(stats.frames_processed, 0);
1312        assert_eq!(stats.overall_compression_ratio(), 1.0);
1313    }
1314
1315    #[test]
1316    fn test_decompression_stats_default() {
1317        let stats = DecompressionStats::default();
1318        assert_eq!(stats.frames_decompressed, 0);
1319        assert_eq!(stats.total_decompressed_bytes, 0);
1320        assert_eq!(stats.avg_decompression_time_us, 0);
1321    }
1322
1323    // ============================================================================
1324    // Additional coverage tests for P2-TEST-002 (70%+ coverage target)
1325    // ============================================================================
1326
1327    #[test]
1328    fn test_decompress_dictionary_with_strings() {
1329        let decompressor = StreamingDecompressor::new();
1330        let data = json!({"key": "value", "nested": {"inner": "string"}});
1331        let result = decompressor.decompress_dictionary(&data);
1332        assert!(result.is_ok());
1333        assert_eq!(result.unwrap(), data);
1334    }
1335
1336    #[test]
1337    fn test_decompress_dictionary_with_null() {
1338        let decompressor = StreamingDecompressor::new();
1339        let data = json!(null);
1340        let result = decompressor.decompress_dictionary(&data);
1341        assert!(result.is_ok());
1342        assert_eq!(result.unwrap(), json!(null));
1343    }
1344
1345    #[test]
1346    fn test_decompress_dictionary_with_boolean() {
1347        let decompressor = StreamingDecompressor::new();
1348        let data = json!(true);
1349        let result = decompressor.decompress_dictionary(&data);
1350        assert!(result.is_ok());
1351        assert_eq!(result.unwrap(), json!(true));
1352    }
1353
1354    #[test]
1355    fn test_decompress_dictionary_with_string() {
1356        let decompressor = StreamingDecompressor::new();
1357        let data = json!("plain string");
1358        let result = decompressor.decompress_dictionary(&data);
1359        assert!(result.is_ok());
1360        assert_eq!(result.unwrap(), json!("plain string"));
1361    }
1362
1363    #[test]
1364    fn test_decompress_delta_with_object_no_array() {
1365        let decompressor = StreamingDecompressor::new();
1366        let data = json!({"key": "value"});
1367        let result = decompressor.decompress_delta(&data);
1368        assert!(result.is_ok());
1369        assert_eq!(result.unwrap(), json!({"key": "value"}));
1370    }
1371
1372    #[test]
1373    fn test_decompress_delta_with_primitive_values() {
1374        let decompressor = StreamingDecompressor::new();
1375
1376        assert_eq!(
1377            decompressor.decompress_delta(&json!("string")).unwrap(),
1378            json!("string")
1379        );
1380        assert_eq!(
1381            decompressor.decompress_delta(&json!(42)).unwrap(),
1382            json!(42)
1383        );
1384        assert_eq!(
1385            decompressor.decompress_delta(&json!(true)).unwrap(),
1386            json!(true)
1387        );
1388        assert_eq!(
1389            decompressor.decompress_delta(&json!(null)).unwrap(),
1390            json!(null)
1391        );
1392    }
1393
1394    #[test]
1395    fn test_decompress_run_length_with_primitive_values() {
1396        let decompressor = StreamingDecompressor::new();
1397
1398        assert_eq!(
1399            decompressor
1400                .decompress_run_length(&json!("string"))
1401                .unwrap(),
1402            json!("string")
1403        );
1404        assert_eq!(
1405            decompressor.decompress_run_length(&json!(123)).unwrap(),
1406            json!(123)
1407        );
1408        assert_eq!(
1409            decompressor.decompress_run_length(&json!(false)).unwrap(),
1410            json!(false)
1411        );
1412        assert_eq!(
1413            decompressor.decompress_run_length(&json!(null)).unwrap(),
1414            json!(null)
1415        );
1416    }
1417
1418    #[test]
1419    fn test_decompress_data_strategy_dictionary() {
1420        let mut decompressor = StreamingDecompressor::new();
1421        decompressor.active_dictionary.insert(0, "test".to_string());
1422
1423        let mut dict = HashMap::new();
1424        dict.insert("test".to_string(), 0);
1425
1426        let compressed_data = CompressedData {
1427            strategy: CompressionStrategy::Dictionary { dictionary: dict },
1428            compressed_size: 10,
1429            data: json!({"field": 0}),
1430            compression_metadata: HashMap::new(),
1431        };
1432
1433        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1434        assert!(result.is_ok());
1435    }
1436
1437    #[test]
1438    fn test_decompress_data_strategy_delta() {
1439        let decompressor = StreamingDecompressor::new();
1440
1441        let mut bases = HashMap::new();
1442        bases.insert("value".to_string(), 100.0);
1443
1444        let compressed_data = CompressedData {
1445            strategy: CompressionStrategy::Delta {
1446                base_values: bases.clone(),
1447            },
1448            compressed_size: 10,
1449            data: json!({
1450                "sequence": [
1451                    {"delta_base": 100.0, "delta_type": "numeric_sequence"},
1452                    5.0,
1453                    10.0
1454                ]
1455            }),
1456            compression_metadata: HashMap::new(),
1457        };
1458
1459        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1460        assert!(result.is_ok());
1461    }
1462
1463    #[test]
1464    fn test_decompress_data_strategy_run_length() {
1465        let decompressor = StreamingDecompressor::new();
1466
1467        let compressed_data = CompressedData {
1468            strategy: CompressionStrategy::RunLength,
1469            compressed_size: 10,
1470            data: json!([
1471                {"rle_value": "x", "rle_count": 3}
1472            ]),
1473            compression_metadata: HashMap::new(),
1474        };
1475
1476        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1477        assert!(result.is_ok());
1478        assert_eq!(result.unwrap(), json!(["x", "x", "x"]));
1479    }
1480
1481    #[test]
1482    fn test_decompress_data_strategy_hybrid_applies_delta_then_dict() {
1483        let mut decompressor = StreamingDecompressor::new();
1484        decompressor.active_dictionary.insert(0, "test".to_string());
1485
1486        let mut string_dict = HashMap::new();
1487        string_dict.insert("test".to_string(), 0);
1488
1489        let mut numeric_deltas = HashMap::new();
1490        numeric_deltas.insert("value".to_string(), 100.0);
1491
1492        let compressed_data = CompressedData {
1493            strategy: CompressionStrategy::Hybrid {
1494                string_dict,
1495                numeric_deltas,
1496            },
1497            compressed_size: 10,
1498            data: json!({
1499                "field": 0
1500            }),
1501            compression_metadata: HashMap::new(),
1502        };
1503
1504        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1505        assert!(result.is_ok());
1506    }
1507
1508    #[test]
1509    fn test_select_compressor_critical_priority() {
1510        let mut compressor = StreamingCompressor::new();
1511        let _skeleton_comp = compressor.select_compressor_for_priority(Priority::CRITICAL);
1512    }
1513
1514    #[test]
1515    fn test_select_compressor_high_priority() {
1516        let mut compressor = StreamingCompressor::new();
1517        let _skeleton_comp = compressor.select_compressor_for_priority(Priority::HIGH);
1518    }
1519
1520    #[test]
1521    fn test_select_compressor_medium_priority() {
1522        let mut compressor = StreamingCompressor::new();
1523        let _content_comp = compressor.select_compressor_for_priority(Priority::MEDIUM);
1524    }
1525
1526    #[test]
1527    fn test_select_compressor_low_priority() {
1528        let mut compressor = StreamingCompressor::new();
1529        let _content_comp = compressor.select_compressor_for_priority(Priority::LOW);
1530    }
1531
1532    #[test]
1533    fn test_select_compressor_background_priority() {
1534        let mut compressor = StreamingCompressor::new();
1535        let _content_comp = compressor.select_compressor_for_priority(Priority::BACKGROUND);
1536    }
1537
1538    #[test]
1539    fn test_update_stats_with_zero_original_size() {
1540        let mut compressor = StreamingCompressor::new();
1541        compressor.update_stats(Priority::MEDIUM, 0, 10);
1542
1543        let stats = compressor.get_stats();
1544        assert_eq!(stats.frames_processed, 1);
1545        assert_eq!(stats.total_input_bytes, 0);
1546        assert_eq!(stats.total_output_bytes, 10);
1547        assert_eq!(
1548            stats.priority_compression_ratio(Priority::MEDIUM.value()),
1549            1.0
1550        );
1551    }
1552
1553    #[test]
1554    fn test_update_stats_with_normal_compression() {
1555        let mut compressor = StreamingCompressor::new();
1556        compressor.update_stats(Priority::HIGH, 1000, 500);
1557
1558        let stats = compressor.get_stats();
1559        assert_eq!(stats.frames_processed, 1);
1560        assert_eq!(stats.total_input_bytes, 1000);
1561        assert_eq!(stats.total_output_bytes, 500);
1562        assert_eq!(
1563            stats.priority_compression_ratio(Priority::HIGH.value()),
1564            0.5
1565        );
1566    }
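
    // Hedged sketch: assuming update_stats accumulates rather than
    // overwrites, two calls should sum both byte counters and the frame
    // count.
    #[test]
    fn test_update_stats_accumulates_across_calls() {
        let mut compressor = StreamingCompressor::new();
        compressor.update_stats(Priority::HIGH, 1000, 500);
        compressor.update_stats(Priority::HIGH, 1000, 300);

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 2);
        assert_eq!(stats.total_input_bytes, 2000);
        assert_eq!(stats.total_output_bytes, 800);
    }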
1567
1568    #[test]
1569    fn test_update_decompression_stats_first_frame() {
1570        let mut decompressor = StreamingDecompressor::new();
1571        let data = json!({"test": "data"});
1572        let duration = std::time::Duration::from_micros(100);
1573
1574        decompressor.update_decompression_stats(&data, duration);
1575
1576        let stats = decompressor.get_stats();
1577        assert_eq!(stats.frames_decompressed, 1);
1578        assert_eq!(stats.avg_decompression_time_us, 100);
1579    }
1580
1581    #[test]
1582    fn test_update_decompression_stats_multiple_frames() {
1583        let mut decompressor = StreamingDecompressor::new();
1584        let data = json!({"test": "data"});
1585
1586        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(100));
1587        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(200));
1588        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(300));
1589
1590        let stats = decompressor.get_stats();
1591        assert_eq!(stats.frames_decompressed, 3);
1592        assert_eq!(stats.avg_decompression_time_us, 200); // (100 + 200 + 300) / 3
1593    }
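
    // Hedged companion with an exactly divisible mean, so the assertion does
    // not depend on how a fractional average would be rounded.
    #[test]
    fn test_update_decompression_stats_exact_average() {
        let mut decompressor = StreamingDecompressor::new();
        let data = json!({"test": "data"});

        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(50));
        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(150));

        let stats = decompressor.get_stats();
        assert_eq!(stats.frames_decompressed, 2);
        assert_eq!(stats.avg_decompression_time_us, 100); // (50 + 150) / 2
    }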
1594
1595    #[test]
1596    fn test_create_decompression_metadata_with_dict() {
1597        let compressor = StreamingCompressor::new();
1598        let mut metadata = HashMap::new();
1599        metadata.insert("dict_0".to_string(), json!("hello"));
1600        metadata.insert("dict_1".to_string(), json!("world"));
1601
1602        let compressed_data = CompressedData {
1603            strategy: CompressionStrategy::None,
1604            compressed_size: 10,
1605            data: json!({}),
1606            compression_metadata: metadata,
1607        };
1608
1609        let result = compressor.create_decompression_metadata(&compressed_data);
1610        assert!(result.is_ok());
1611        let meta = result.unwrap();
1612        assert_eq!(meta.dictionary_map.len(), 2);
1613        assert_eq!(meta.dictionary_map.get(&0), Some(&"hello".to_string()));
1614        assert_eq!(meta.dictionary_map.get(&1), Some(&"world".to_string()));
1615    }
1616
1617    #[test]
1618    fn test_create_decompression_metadata_with_delta_bases() {
1619        let compressor = StreamingCompressor::new();
1620        let mut metadata = HashMap::new();
1621        metadata.insert("base_value1".to_string(), json!(100.0));
1622        metadata.insert("base_value2".to_string(), json!(200.0));
1623
1624        let compressed_data = CompressedData {
1625            strategy: CompressionStrategy::None,
1626            compressed_size: 10,
1627            data: json!({}),
1628            compression_metadata: metadata,
1629        };
1630
1631        let result = compressor.create_decompression_metadata(&compressed_data);
1632        assert!(result.is_ok());
1633        let meta = result.unwrap();
1634        assert_eq!(meta.delta_bases.len(), 2);
1635        assert_eq!(meta.delta_bases.get("value1"), Some(&100.0));
1636        assert_eq!(meta.delta_bases.get("value2"), Some(&200.0));
1637    }
1638
1639    #[test]
1640    fn test_create_decompression_metadata_with_invalid_dict_index() {
1641        let compressor = StreamingCompressor::new();
1642        let mut metadata = HashMap::new();
1643        metadata.insert("dict_invalid".to_string(), json!("value"));
1644        metadata.insert("dict_0".to_string(), json!("valid"));
1645
1646        let compressed_data = CompressedData {
1647            strategy: CompressionStrategy::None,
1648            compressed_size: 10,
1649            data: json!({}),
1650            compression_metadata: metadata,
1651        };
1652
1653        let result = compressor.create_decompression_metadata(&compressed_data);
1654        assert!(result.is_ok());
1655        let meta = result.unwrap();
1656        // Only the valid index is parsed; the malformed "dict_invalid" key is skipped
1657        assert_eq!(meta.dictionary_map.len(), 1);
1658        assert_eq!(meta.dictionary_map.get(&0), Some(&"valid".to_string()));
1659    }
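
    // Hedged sketch combining both metadata key prefixes in one call,
    // assuming "dict_N" and "base_X" entries are parsed independently, as
    // the three tests above show for each prefix in isolation.
    #[test]
    fn test_create_decompression_metadata_with_dict_and_delta_bases() {
        let compressor = StreamingCompressor::new();
        let mut metadata = HashMap::new();
        metadata.insert("dict_0".to_string(), json!("hello"));
        metadata.insert("base_value1".to_string(), json!(100.0));

        let compressed_data = CompressedData {
            strategy: CompressionStrategy::None,
            compressed_size: 10,
            data: json!({}),
            compression_metadata: metadata,
        };

        let meta = compressor
            .create_decompression_metadata(&compressed_data)
            .unwrap();
        assert_eq!(meta.dictionary_map.get(&0), Some(&"hello".to_string()));
        assert_eq!(meta.delta_bases.get("value1"), Some(&100.0));
    }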
1660
1661    #[test]
1662    fn test_update_context_updates_dictionary() {
1663        let mut decompressor = StreamingDecompressor::new();
1664
1665        let mut metadata = DecompressionMetadata {
1666            strategy: CompressionStrategy::None,
1667            dictionary_map: HashMap::new(),
1668            delta_bases: HashMap::new(),
1669            priority_hints: HashMap::new(),
1670        };
1671        metadata.dictionary_map.insert(0, "hello".to_string());
1672        metadata.dictionary_map.insert(1, "world".to_string());
1673
1674        let result = decompressor.update_context(&metadata);
1675        assert!(result.is_ok());
1676        assert_eq!(decompressor.active_dictionary.len(), 2);
1677        assert_eq!(
1678            decompressor.active_dictionary.get(&0),
1679            Some(&"hello".to_string())
1680        );
1681    }
1682
1683    #[test]
1684    fn test_update_context_updates_delta_bases() {
1685        let mut decompressor = StreamingDecompressor::new();
1686
1687        let mut metadata = DecompressionMetadata {
1688            strategy: CompressionStrategy::None,
1689            dictionary_map: HashMap::new(),
1690            delta_bases: HashMap::new(),
1691            priority_hints: HashMap::new(),
1692        };
1693        metadata.delta_bases.insert("value1".to_string(), 100.0);
1694        metadata.delta_bases.insert("value2".to_string(), 200.0);
1695
1696        let result = decompressor.update_context(&metadata);
1697        assert!(result.is_ok());
1698        assert_eq!(decompressor.delta_bases.len(), 2);
1699        assert_eq!(decompressor.delta_bases.get("value1"), Some(&100.0));
1700    }
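
    // Hedged sketch applying both context updates from a single metadata
    // payload, assuming dictionary and delta-base entries are handled
    // independently, as the two tests above show in isolation.
    #[test]
    fn test_update_context_updates_dictionary_and_delta_bases_together() {
        let mut decompressor = StreamingDecompressor::new();

        let mut metadata = DecompressionMetadata {
            strategy: CompressionStrategy::None,
            dictionary_map: HashMap::new(),
            delta_bases: HashMap::new(),
            priority_hints: HashMap::new(),
        };
        metadata.dictionary_map.insert(0, "hello".to_string());
        metadata.delta_bases.insert("value1".to_string(), 100.0);

        assert!(decompressor.update_context(&metadata).is_ok());
        assert_eq!(
            decompressor.active_dictionary.get(&0),
            Some(&"hello".to_string())
        );
        assert_eq!(decompressor.delta_bases.get("value1"), Some(&100.0));
    }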
1701
1702    #[test]
1703    fn test_decompress_dictionary_with_float_that_is_not_u64() {
1704        let mut decompressor = StreamingDecompressor::new();
1705        decompressor.active_dictionary.insert(0, "test".to_string());
1706
1707        // Float that can't be represented as u64
1708        let data = json!({"value": 1.5});
1709        let result = decompressor.decompress_dictionary(&data);
1710        assert!(result.is_ok());
1711        // Should remain a float, since 1.5 cannot be a dictionary index
1712        assert_eq!(result.unwrap(), json!({"value": 1.5}));
1713    }
1714
1715    #[test]
1716    fn test_decompress_dictionary_with_negative_number() {
1717        let mut decompressor = StreamingDecompressor::new();
1718        decompressor.active_dictionary.insert(0, "test".to_string());
1719
1720        let data = json!({"value": -1});
1721        let result = decompressor.decompress_dictionary(&data);
1722        assert!(result.is_ok());
1723        // Negative numbers can't be indices
1724        assert_eq!(result.unwrap(), json!({"value": -1}));
1725    }
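
    // Hedged positive-path sketch: the two rejection tests above imply that
    // a valid non-negative integer is substituted; assumes index 0 resolves
    // through active_dictionary.
    #[test]
    fn test_decompress_dictionary_with_valid_index() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor.active_dictionary.insert(0, "test".to_string());

        let data = json!({"value": 0});
        let result = decompressor.decompress_dictionary(&data);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), json!({"value": "test"}));
    }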
1726
1727    #[test]
1728    fn test_decompress_delta_array_checks_first_element_structure() {
1729        let decompressor = StreamingDecompressor::new();
1730
1731        // Array without proper metadata structure
1732        let data = json!([
1733            {"wrong_field": 100.0},
1734            1.0,
1735            2.0
1736        ]);
1737
1738        let result = decompressor.decompress_delta(&data);
1739        assert!(result.is_ok());
1740        // Should be processed recursively, not as delta array
1741        assert_eq!(result.unwrap(), data);
1742    }
1743
1744    #[test]
1745    fn test_decompress_delta_array_requires_both_base_and_type() {
1746        let decompressor = StreamingDecompressor::new();
1747
1748        // Has delta_base but not delta_type
1749        let data1 = json!([
1750            {"delta_base": 100.0},
1751            1.0
1752        ]);
1753        let result1 = decompressor.decompress_delta(&data1);
1754        assert!(result1.is_ok());
1755
1756        // Has delta_type but not delta_base
1757        let data2 = json!([
1758            {"delta_type": "numeric_sequence"},
1759            1.0
1760        ]);
1761        let result2 = decompressor.decompress_delta(&data2);
1762        assert!(result2.is_ok()); // in both cases the delta expansion should not kick in
1763    }
1764
1765    #[test]
1766    fn test_decompress_run_length_with_non_objects_in_array() {
1767        let decompressor = StreamingDecompressor::new();
1768
1769        // Mix of RLE objects and plain values
1770        let data = json!([
1771            {"rle_value": "a", "rle_count": 2},
1772            "plain",
1773            42,
1774            true
1775        ]);
1776
1777        let result = decompressor.decompress_run_length(&data);
1778        assert!(result.is_ok());
1779        assert_eq!(result.unwrap(), json!(["a", "a", "plain", 42, true]));
1780    }
1781
1782    #[test]
1783    fn test_decompress_run_length_integrity_check_rle_value_without_count() {
1784        let decompressor = StreamingDecompressor::new();
1785
1786        let data = json!([
1787            {"rle_value": "x"}
1788        ]);
1789
1790        let result = decompressor.decompress_run_length(&data);
1791        assert!(result.is_err());
1792    }
1793
1794    #[test]
1795    fn test_decompress_run_length_integrity_check_rle_count_without_value() {
1796        let decompressor = StreamingDecompressor::new();
1797
1798        let data = json!([
1799            {"rle_count": 3}
1800        ]);
1801
1802        let result = decompressor.decompress_run_length(&data);
1803        assert!(result.is_err());
1804    }
1805
1806    #[test]
1807    fn test_decompress_run_length_non_number_count() {
1808        let decompressor = StreamingDecompressor::new();
1809
1810        let data = json!([
1811            {"rle_value": "x", "rle_count": "three"}
1812        ]);
1813
1814        let result = decompressor.decompress_run_length(&data);
1815        assert!(result.is_err());
1816    }
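
    // Hedged sketch of the decompression-bomb guard: assumes the run-length
    // path rejects counts above the module's security limit instead of
    // allocating the full expansion.
    #[test]
    fn test_decompress_run_length_rejects_oversized_count() {
        let decompressor = StreamingDecompressor::new();

        let data = json!([
            {"rle_value": "x", "rle_count": 10_000_000u64}
        ]);

        let result = decompressor.decompress_run_length(&data);
        assert!(result.is_err());
    }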
1817
1818    #[test]
1819    fn test_compress_frame_with_large_data() {
1820        let mut compressor = StreamingCompressor::new();
1821
1822        let large_data = json!({
1823            "users": (0..100).map(|i| json!({
1824                "id": i,
1825                "name": format!("User{}", i),
1826                "email": format!("user{}@example.com", i),
1827                "age": 20 + (i % 50),
1828                "active": i % 2 == 0
1829            })).collect::<Vec<_>>()
1830        });
1831
1832        let frame = StreamFrame {
1833            data: large_data,
1834            priority: Priority::MEDIUM,
1835            metadata: HashMap::new(),
1836        };
1837
1838        let result = compressor.compress_frame(frame);
1839        assert!(result.is_ok());
1840
1841        let stats = compressor.get_stats();
1842        assert_eq!(stats.frames_processed, 1);
1843        assert!(stats.total_input_bytes > 1000);
1844    }
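
    // Hedged follow-up: assuming compress_frame updates the statistics once
    // per call, totals should accumulate across frames.
    #[test]
    fn test_compress_frame_accumulates_stats_across_frames() {
        let mut compressor = StreamingCompressor::new();

        for i in 0..3 {
            let frame = StreamFrame {
                data: json!({"seq": i}),
                priority: Priority::LOW,
                metadata: HashMap::new(),
            };
            assert!(compressor.compress_frame(frame).is_ok());
        }

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 3);
        assert!(stats.total_input_bytes > 0);
    }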
1845
1846    #[test]
1847    fn test_decompress_delta_with_very_large_deltas() {
1848        let decompressor = StreamingDecompressor::new();
1849
1850        let data = json!([
1851            {"delta_base": 1_000_000.0, "delta_type": "numeric_sequence"},
1852            100_000.0,
1853            200_000.0,
1854            300_000.0
1855        ]);
1856
1857        let result = decompressor.decompress_delta(&data);
1858        assert!(result.is_ok());
1859        assert_eq!(
1860            result.unwrap(),
1861            json!([1_100_000.0, 1_200_000.0, 1_300_000.0])
1862        );
1863    }
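
    // Hedged companion: assuming each element is reconstructed as
    // base + delta (as the test above demonstrates), negative deltas should
    // land below the base.
    #[test]
    fn test_decompress_delta_with_negative_deltas() {
        let decompressor = StreamingDecompressor::new();

        let data = json!([
            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
            -25.0,
            -50.0
        ]);

        let result = decompressor.decompress_delta(&data);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), json!([75.0, 50.0]));
    }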
1864}