Skip to main content

pjson_rs/stream/
compression_integration.rs

1//! Integration of schema-based compression with PJS streaming
2//!
3//! Provides streaming-aware compression that maintains the ability
4//! to progressively decompress data as frames arrive.
5
6use crate::{
7    compression::{CompressedData, CompressionStrategy, SchemaCompressor},
8    domain::{DomainError, DomainResult},
9    stream::{Priority, StreamFrame},
10};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14// Security limits to prevent decompression bomb attacks
15const MAX_RLE_COUNT: u64 = 100_000;
16const MAX_DELTA_ARRAY_SIZE: usize = 1_000_000;
17const MAX_DECOMPRESSED_SIZE: usize = 10_485_760; // 10MB
18
19/// Streaming compressor that maintains compression state across frames
20#[derive(Debug, Clone)]
21pub struct StreamingCompressor {
22    /// Primary compressor for skeleton and critical data
23    skeleton_compressor: SchemaCompressor,
24    /// Secondary compressor for non-critical data
25    content_compressor: SchemaCompressor,
26    /// Compression statistics
27    stats: CompressionStats,
28}
29
30/// Compression statistics gathered by [`StreamingCompressor`].
31#[derive(Debug, Clone, Default)]
32pub struct CompressionStats {
33    /// Total bytes processed
34    pub total_input_bytes: usize,
35    /// Total bytes after compression
36    pub total_output_bytes: usize,
37    /// Number of frames processed
38    pub frames_processed: u32,
39    /// Compression ratio by priority level
40    pub priority_ratios: HashMap<u8, f32>,
41}
42
43/// Compressed stream frame with metadata
44#[derive(Debug, Clone)]
45pub struct CompressedFrame {
46    /// Original frame metadata
47    pub frame: StreamFrame,
48    /// Compressed data
49    pub compressed_data: CompressedData,
50    /// Decompression instructions for client
51    pub decompression_metadata: DecompressionMetadata,
52}
53
54/// Side-channel data needed by clients to undo a [`CompressedFrame`].
55#[derive(Debug, Clone)]
56pub struct DecompressionMetadata {
57    /// Compression strategy used
58    pub strategy: CompressionStrategy,
59    /// Dictionary indices mapping
60    pub dictionary_map: HashMap<u16, String>,
61    /// Delta base values for numeric decompression
62    pub delta_bases: HashMap<String, f64>,
63    /// Priority-specific decompression hints
64    pub priority_hints: HashMap<u8, String>,
65}
66
67impl StreamingCompressor {
68    /// Create new streaming compressor
69    pub fn new() -> Self {
70        Self {
71            skeleton_compressor: SchemaCompressor::new(),
72            content_compressor: SchemaCompressor::new(),
73            stats: CompressionStats::default(),
74        }
75    }
76
77    /// Create with custom compression strategies
78    pub fn with_strategies(
79        skeleton_strategy: CompressionStrategy,
80        content_strategy: CompressionStrategy,
81    ) -> Self {
82        Self {
83            skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
84            content_compressor: SchemaCompressor::with_strategy(content_strategy),
85            stats: CompressionStats::default(),
86        }
87    }
88
89    /// Process and compress a stream frame based on its priority
90    pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
91        let compressor = self.select_compressor_for_priority(frame.priority);
92
93        // Calculate original size
94        let original_size = serde_json::to_string(&frame.data)
95            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
96            .len();
97
98        // Compress based on frame content and priority
99        let compressed_data = compressor.compress(&frame.data)?;
100
101        // Update statistics
102        self.update_stats(
103            frame.priority,
104            original_size,
105            compressed_data.compressed_size,
106        );
107
108        // Create decompression metadata
109        let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;
110
111        Ok(CompressedFrame {
112            frame,
113            compressed_data,
114            decompression_metadata,
115        })
116    }
117
118    /// Analyze JSON data to optimize compression strategies
119    pub fn optimize_for_data(
120        &mut self,
121        skeleton: &JsonValue,
122        sample_data: &[JsonValue],
123    ) -> DomainResult<()> {
124        // Optimize skeleton compressor for critical structural data
125        self.skeleton_compressor.analyze_and_optimize(skeleton)?;
126
127        // Analyze sample content data to optimize content compressor
128        if !sample_data.is_empty() {
129            // Combine samples for comprehensive analysis
130            let combined_sample = JsonValue::Array(sample_data.to_vec());
131            self.content_compressor
132                .analyze_and_optimize(&combined_sample)?;
133        }
134
135        Ok(())
136    }
137
138    /// Get current compression statistics
139    pub fn get_stats(&self) -> &CompressionStats {
140        &self.stats
141    }
142
143    /// Reset compression statistics
144    pub fn reset_stats(&mut self) {
145        self.stats = CompressionStats::default();
146    }
147
148    /// Select appropriate compressor based on frame priority
149    fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
150        match priority {
151            // Critical data (skeleton, errors) - use specialized compressor
152            Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
153            // Regular content data - use content compressor
154            _ => &mut self.content_compressor,
155        }
156    }
157
158    /// Update compression statistics
159    fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
160        self.stats.total_input_bytes += original_size;
161        self.stats.total_output_bytes += compressed_size;
162        self.stats.frames_processed += 1;
163
164        let ratio = if original_size > 0 {
165            compressed_size as f32 / original_size as f32
166        } else {
167            1.0
168        };
169
170        self.stats.priority_ratios.insert(priority.value(), ratio);
171    }
172
173    /// Create decompression metadata for client
174    fn create_decompression_metadata(
175        &self,
176        compressed_data: &CompressedData,
177    ) -> DomainResult<DecompressionMetadata> {
178        let mut dictionary_map = HashMap::new();
179        let mut delta_bases = HashMap::new();
180
181        // Extract dictionary mappings
182        for (key, value) in &compressed_data.compression_metadata {
183            if let Some(suffix) = key.strip_prefix("dict_") {
184                if let Ok(index) = suffix.parse::<u16>()
185                    && let Some(string_val) = value.as_str()
186                {
187                    dictionary_map.insert(index, string_val.to_string());
188                }
189            } else if let Some(path) = key.strip_prefix("base_")
190                && let Some(num) = value.as_f64()
191            {
192                delta_bases.insert(path.to_string(), num);
193            }
194        }
195
196        Ok(DecompressionMetadata {
197            strategy: compressed_data.strategy.clone(),
198            dictionary_map,
199            delta_bases,
200            priority_hints: HashMap::new(), // TODO: Add priority-specific hints
201        })
202    }
203}
204
205impl CompressionStats {
206    /// Calculate overall compression ratio
207    pub fn overall_compression_ratio(&self) -> f32 {
208        if self.total_input_bytes == 0 {
209            return 1.0;
210        }
211        self.total_output_bytes as f32 / self.total_input_bytes as f32
212    }
213
214    /// Get compression ratio for specific priority level
215    pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
216        self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
217    }
218
219    /// Calculate bytes saved
220    pub fn bytes_saved(&self) -> isize {
221        self.total_input_bytes as isize - self.total_output_bytes as isize
222    }
223
224    /// Calculate percentage saved
225    pub fn percentage_saved(&self) -> f32 {
226        if self.total_input_bytes == 0 {
227            return 0.0;
228        }
229        let ratio = self.overall_compression_ratio();
230        (1.0 - ratio) * 100.0
231    }
232}
233
234/// Client-side decompressor for receiving compressed frames
235#[derive(Debug, Clone)]
236pub struct StreamingDecompressor {
237    /// Active dictionary for string decompression
238    active_dictionary: HashMap<u16, String>,
239    /// Delta base values for numeric decompression  
240    delta_bases: HashMap<String, f64>,
241    /// Decompression statistics
242    stats: DecompressionStats,
243}
244
245/// Counters tracking decompression activity in [`StreamingDecompressor`].
246#[derive(Debug, Clone, Default)]
247pub struct DecompressionStats {
248    /// Total frames decompressed
249    pub frames_decompressed: u32,
250    /// Total bytes decompressed
251    pub total_decompressed_bytes: usize,
252    /// Average decompression time in microseconds
253    pub avg_decompression_time_us: u64,
254}
255
256impl StreamingDecompressor {
257    /// Create new streaming decompressor
258    pub fn new() -> Self {
259        Self {
260            active_dictionary: HashMap::new(),
261            delta_bases: HashMap::new(),
262            stats: DecompressionStats::default(),
263        }
264    }
265
266    /// Decompress a compressed frame
267    pub fn decompress_frame(
268        &mut self,
269        compressed_frame: CompressedFrame,
270    ) -> DomainResult<StreamFrame> {
271        let start_time = std::time::Instant::now();
272
273        // Update decompression context with metadata
274        self.update_context(&compressed_frame.decompression_metadata)?;
275
276        // Decompress data based on strategy
277        let decompressed_data = self.decompress_data(
278            &compressed_frame.compressed_data,
279            &compressed_frame.decompression_metadata.strategy,
280        )?;
281
282        // Update statistics
283        let decompression_time = start_time.elapsed();
284        self.update_decompression_stats(&decompressed_data, decompression_time);
285
286        Ok(StreamFrame {
287            data: decompressed_data,
288            priority: compressed_frame.frame.priority,
289            metadata: compressed_frame.frame.metadata,
290        })
291    }
292
293    /// Update decompression context with new metadata
294    fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
295        // Update dictionary
296        for (&index, string) in &metadata.dictionary_map {
297            self.active_dictionary.insert(index, string.clone());
298        }
299
300        // Update delta bases
301        for (path, &base) in &metadata.delta_bases {
302            self.delta_bases.insert(path.clone(), base);
303        }
304
305        Ok(())
306    }
307
308    /// Decompress data according to strategy
309    fn decompress_data(
310        &self,
311        compressed_data: &CompressedData,
312        strategy: &CompressionStrategy,
313    ) -> DomainResult<JsonValue> {
314        match strategy {
315            CompressionStrategy::None => Ok(compressed_data.data.clone()),
316
317            CompressionStrategy::Dictionary { .. } => {
318                self.decompress_dictionary(&compressed_data.data)
319            }
320
321            CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),
322
323            CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),
324
325            CompressionStrategy::Hybrid { .. } => {
326                // Apply decompression in reverse order: delta first, then dictionary
327                let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
328                self.decompress_dictionary(&delta_decompressed)
329            }
330        }
331    }
332
333    /// Decompress dictionary-encoded strings
334    fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
335        match data {
336            JsonValue::Object(obj) => {
337                let mut decompressed = serde_json::Map::new();
338                for (key, value) in obj {
339                    decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
340                }
341                Ok(JsonValue::Object(decompressed))
342            }
343            JsonValue::Array(arr) => {
344                let decompressed: Result<Vec<_>, _> = arr
345                    .iter()
346                    .map(|item| self.decompress_dictionary(item))
347                    .collect();
348                Ok(JsonValue::Array(decompressed?))
349            }
350            JsonValue::Number(n) => {
351                // Check if this is a dictionary index
352                if let Some(index) = n.as_u64()
353                    && let Some(string_val) = self.active_dictionary.get(&(index as u16))
354                {
355                    return Ok(JsonValue::String(string_val.clone()));
356                }
357                Ok(data.clone())
358            }
359            _ => Ok(data.clone()),
360        }
361    }
362
363    /// Decompress delta-encoded values
364    pub fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
365        match data {
366            JsonValue::Object(obj) => {
367                let mut decompressed_obj = serde_json::Map::new();
368                for (key, value) in obj {
369                    decompressed_obj.insert(key.clone(), self.decompress_delta(value)?);
370                }
371                Ok(JsonValue::Object(decompressed_obj))
372            }
373            JsonValue::Array(arr) => {
374                if arr.is_empty() {
375                    return Ok(JsonValue::Array(arr.clone()));
376                }
377
378                // Check if this is a delta-compressed array
379                if let Some(first) = arr.first()
380                    && let Some(obj) = first.as_object()
381                    && obj.contains_key("delta_base")
382                    && obj.contains_key("delta_type")
383                {
384                    // This is a delta-compressed numeric sequence
385                    return self.decompress_delta_array(arr);
386                }
387
388                // Not a delta-compressed array, process elements recursively
389                let decompressed_arr: Result<Vec<_>, _> =
390                    arr.iter().map(|item| self.decompress_delta(item)).collect();
391                Ok(JsonValue::Array(decompressed_arr?))
392            }
393            _ => Ok(data.clone()),
394        }
395    }
396
397    /// Decompress delta-encoded array back to original values
398    fn decompress_delta_array(&self, arr: &[JsonValue]) -> DomainResult<JsonValue> {
399        if arr.is_empty() {
400            return Ok(JsonValue::Array(Vec::new()));
401        }
402
403        // VULN-002 FIX: Validate array size to prevent memory exhaustion
404        if arr.len() > MAX_DELTA_ARRAY_SIZE {
405            return Err(DomainError::CompressionError(format!(
406                "Delta array size {} exceeds maximum {}",
407                arr.len(),
408                MAX_DELTA_ARRAY_SIZE
409            )));
410        }
411
412        // Extract base value from metadata
413        let base_value = arr[0]
414            .get("delta_base")
415            .and_then(|v| v.as_f64())
416            .ok_or_else(|| {
417                DomainError::CompressionError(
418                    "Missing or invalid delta_base in metadata".to_string(),
419                )
420            })?;
421
422        // Reconstruct original values from deltas
423        let mut original_values = Vec::new();
424        for delta_value in arr.iter().skip(1) {
425            let delta = delta_value.as_f64().ok_or_else(|| {
426                DomainError::CompressionError("Invalid delta value: expected number".to_string())
427            })?;
428
429            let original = base_value + delta;
430            original_values.push(JsonValue::from(original));
431        }
432
433        Ok(JsonValue::Array(original_values))
434    }
435
436    /// Decompress run-length encoded data
437    pub fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
438        match data {
439            JsonValue::Object(obj) => {
440                let mut decompressed_obj = serde_json::Map::new();
441                for (key, value) in obj {
442                    decompressed_obj.insert(key.clone(), self.decompress_run_length(value)?);
443                }
444                Ok(JsonValue::Object(decompressed_obj))
445            }
446            JsonValue::Array(arr) => {
447                let mut decompressed_values = Vec::new();
448                let mut total_size = 0usize;
449
450                for item in arr {
451                    if let Some(obj) = item.as_object() {
452                        // Validate RLE object integrity: both keys must be present or both absent
453                        let has_rle_value = obj.contains_key("rle_value");
454                        let has_rle_count = obj.contains_key("rle_count");
455
456                        if has_rle_value && !has_rle_count {
457                            return Err(DomainError::CompressionError(
458                                "Malformed RLE object: rle_value without rle_count".to_string(),
459                            ));
460                        }
461                        if has_rle_count && !has_rle_value {
462                            return Err(DomainError::CompressionError(
463                                "Malformed RLE object: rle_count without rle_value".to_string(),
464                            ));
465                        }
466
467                        // Check if this is an RLE-encoded run
468                        if has_rle_value && has_rle_count {
469                            let value = obj
470                                .get("rle_value")
471                                .ok_or_else(|| {
472                                    DomainError::CompressionError("Missing rle_value".to_string())
473                                })?
474                                .clone();
475
476                            let count =
477                                obj.get("rle_count")
478                                    .and_then(|v| v.as_u64())
479                                    .ok_or_else(|| {
480                                        DomainError::CompressionError(
481                                            "Invalid rle_count: expected positive integer"
482                                                .to_string(),
483                                        )
484                                    })?;
485
486                            // VULN-001 FIX: Validate RLE count to prevent decompression bomb
487                            if count > MAX_RLE_COUNT {
488                                return Err(DomainError::CompressionError(format!(
489                                    "RLE count {} exceeds maximum {}",
490                                    count, MAX_RLE_COUNT
491                                )));
492                            }
493
494                            // VULN-003 FIX: Convert u64 to usize safely to prevent overflow
495                            let count_usize = usize::try_from(count).map_err(|_| {
496                                DomainError::CompressionError(format!(
497                                    "RLE count {} exceeds platform maximum",
498                                    count
499                                ))
500                            })?;
501
502                            // Track total decompressed size across all RLE runs
503                            total_size = total_size.checked_add(count_usize).ok_or_else(|| {
504                                DomainError::CompressionError(
505                                    "Total decompressed size overflow".to_string(),
506                                )
507                            })?;
508
509                            if total_size > MAX_DECOMPRESSED_SIZE {
510                                return Err(DomainError::CompressionError(format!(
511                                    "Decompressed size {} exceeds maximum {}",
512                                    total_size, MAX_DECOMPRESSED_SIZE
513                                )));
514                            }
515
516                            // Expand the run
517                            for _ in 0..count {
518                                decompressed_values.push(value.clone());
519                            }
520                        } else {
521                            // Not an RLE object, process recursively
522                            decompressed_values.push(self.decompress_run_length(item)?);
523                        }
524                    } else {
525                        // Not an object, process recursively
526                        decompressed_values.push(self.decompress_run_length(item)?);
527                    }
528                }
529
530                Ok(JsonValue::Array(decompressed_values))
531            }
532            _ => Ok(data.clone()),
533        }
534    }
535
536    /// Update decompression statistics
537    fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
538        self.stats.frames_decompressed += 1;
539
540        if let Ok(serialized) = serde_json::to_string(data) {
541            self.stats.total_decompressed_bytes += serialized.len();
542        }
543
544        let new_time_us = duration.as_micros() as u64;
545        if self.stats.frames_decompressed == 1 {
546            self.stats.avg_decompression_time_us = new_time_us;
547        } else {
548            // Calculate running average
549            let total_frames = self.stats.frames_decompressed as u64;
550            let total_time =
551                self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
552            self.stats.avg_decompression_time_us = total_time / total_frames;
553        }
554    }
555
556    /// Get decompression statistics
557    pub fn get_stats(&self) -> &DecompressionStats {
558        &self.stats
559    }
560}
561
562impl Default for StreamingCompressor {
563    fn default() -> Self {
564        Self::new()
565    }
566}
567
568impl Default for StreamingDecompressor {
569    fn default() -> Self {
570        Self::new()
571    }
572}
573
574#[cfg(test)]
575mod tests {
576    use super::*;
577    use serde_json::json;
578
579    #[test]
580    fn test_streaming_compressor_basic() {
581        let mut compressor = StreamingCompressor::new();
582
583        let frame = StreamFrame {
584            data: json!({
585                "message": "test message",
586                "count": 42
587            }),
588            priority: Priority::MEDIUM,
589            metadata: HashMap::new(),
590        };
591
592        let result = compressor.compress_frame(frame);
593        assert!(result.is_ok());
594
595        let compressed = result.unwrap();
596        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
597    }
598
599    #[test]
600    fn test_compression_stats() {
601        let stats = CompressionStats {
602            total_input_bytes: 1000,
603            total_output_bytes: 600,
604            ..Default::default()
605        };
606
607        assert_eq!(stats.overall_compression_ratio(), 0.6);
608        assert_eq!(stats.bytes_saved(), 400);
609        // Use approximate comparison for float precision
610        let percentage = stats.percentage_saved();
611        assert!((percentage - 40.0).abs() < 0.001);
612    }
613
614    #[test]
615    fn test_streaming_decompressor_basic() {
616        let mut decompressor = StreamingDecompressor::new();
617
618        let compressed_frame = CompressedFrame {
619            frame: StreamFrame {
620                data: json!({"test": "data"}),
621                priority: Priority::MEDIUM,
622                metadata: HashMap::new(),
623            },
624            compressed_data: CompressedData {
625                strategy: CompressionStrategy::None,
626                compressed_size: 20,
627                data: json!({"test": "data"}),
628                compression_metadata: HashMap::new(),
629            },
630            decompression_metadata: DecompressionMetadata {
631                strategy: CompressionStrategy::None,
632                dictionary_map: HashMap::new(),
633                delta_bases: HashMap::new(),
634                priority_hints: HashMap::new(),
635            },
636        };
637
638        let result = decompressor.decompress_frame(compressed_frame);
639        assert!(result.is_ok());
640
641        let decompressed = result.unwrap();
642        assert_eq!(decompressed.data, json!({"test": "data"}));
643    }
644
645    #[test]
646    fn test_dictionary_decompression() {
647        let mut decompressor = StreamingDecompressor::new();
648        decompressor
649            .active_dictionary
650            .insert(0, "hello".to_string());
651        decompressor
652            .active_dictionary
653            .insert(1, "world".to_string());
654
655        // Test with dictionary indices
656        let compressed = json!({
657            "greeting": 0,
658            "target": 1
659        });
660
661        let result = decompressor.decompress_dictionary(&compressed).unwrap();
662        assert_eq!(
663            result,
664            json!({
665                "greeting": "hello",
666                "target": "world"
667            })
668        );
669    }
670
671    #[test]
672    fn test_priority_based_compression() {
673        let mut compressor = StreamingCompressor::new();
674
675        let critical_frame = StreamFrame {
676            data: json!({"error": "critical failure"}),
677            priority: Priority::CRITICAL,
678            metadata: HashMap::new(),
679        };
680
681        let low_frame = StreamFrame {
682            data: json!({"debug": "verbose information"}),
683            priority: Priority::LOW,
684            metadata: HashMap::new(),
685        };
686
687        let _critical_result = compressor.compress_frame(critical_frame).unwrap();
688        let _low_result = compressor.compress_frame(low_frame).unwrap();
689
690        let stats = compressor.get_stats();
691        assert_eq!(stats.frames_processed, 2);
692        assert!(stats.total_input_bytes > 0);
693    }
694
695    #[test]
696    fn test_delta_decompression_basic() {
697        let decompressor = StreamingDecompressor::new();
698
699        let compressed_data = json!([
700            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
701            0.0,
702            1.0,
703            2.0,
704            3.0,
705            4.0
706        ]);
707
708        let result = decompressor.decompress_delta(&compressed_data).unwrap();
709        assert_eq!(result, json!([100.0, 101.0, 102.0, 103.0, 104.0]));
710    }
711
712    #[test]
713    fn test_delta_decompression_negative_deltas() {
714        let decompressor = StreamingDecompressor::new();
715
716        let compressed_data = json!([
717            {"delta_base": 50.0, "delta_type": "numeric_sequence"},
718            -10.0,
719            0.0,
720            10.0,
721            20.0
722        ]);
723
724        let result = decompressor.decompress_delta(&compressed_data).unwrap();
725        assert_eq!(result, json!([40.0, 50.0, 60.0, 70.0]));
726    }
727
728    #[test]
729    fn test_delta_decompression_fractional_deltas() {
730        let decompressor = StreamingDecompressor::new();
731
732        let compressed_data = json!([
733            {"delta_base": 10.0, "delta_type": "numeric_sequence"},
734            0.5,
735            1.0,
736            1.5,
737            2.0
738        ]);
739
740        let result = decompressor.decompress_delta(&compressed_data).unwrap();
741        assert_eq!(result, json!([10.5, 11.0, 11.5, 12.0]));
742    }
743
744    #[test]
745    fn test_delta_decompression_empty_array() {
746        let decompressor = StreamingDecompressor::new();
747
748        let compressed_data = json!([]);
749
750        let result = decompressor.decompress_delta(&compressed_data).unwrap();
751        assert_eq!(result, json!([]));
752    }
753
754    #[test]
755    fn test_delta_decompression_single_element() {
756        let decompressor = StreamingDecompressor::new();
757
758        let compressed_data = json!([
759            {"delta_base": 100.0, "delta_type": "numeric_sequence"}
760        ]);
761
762        let result = decompressor.decompress_delta(&compressed_data).unwrap();
763        assert_eq!(result, json!([]));
764    }
765
766    #[test]
767    fn test_delta_decompression_nested_structure() {
768        let decompressor = StreamingDecompressor::new();
769
770        let compressed_data = json!({
771            "sequence": [
772                {"delta_base": 100.0, "delta_type": "numeric_sequence"},
773                0.0,
774                1.0,
775                2.0
776            ],
777            "other": "data"
778        });
779
780        let result = decompressor.decompress_delta(&compressed_data).unwrap();
781        assert_eq!(
782            result,
783            json!({
784                "sequence": [100.0, 101.0, 102.0],
785                "other": "data"
786            })
787        );
788    }
789
790    #[test]
791    fn test_delta_decompression_invalid_metadata() {
792        let decompressor = StreamingDecompressor::new();
793
794        let compressed_data = json!([
795            {"wrong_key": 100.0},
796            0.0,
797            1.0
798        ]);
799
800        let result = decompressor.decompress_delta(&compressed_data);
801        assert!(result.is_ok());
802        // Should return as-is if not valid delta format
803    }
804
805    #[test]
806    fn test_delta_decompression_invalid_delta_value() {
807        let decompressor = StreamingDecompressor::new();
808
809        let compressed_data = json!([
810            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
811            "not_a_number"
812        ]);
813
814        let result = decompressor.decompress_delta(&compressed_data);
815        assert!(result.is_err());
816    }
817
818    #[test]
819    fn test_rle_decompression_basic() {
820        let decompressor = StreamingDecompressor::new();
821
822        let compressed_data = json!([
823            {"rle_value": 1, "rle_count": 3},
824            {"rle_value": 2, "rle_count": 2},
825            {"rle_value": 3, "rle_count": 4}
826        ]);
827
828        let result = decompressor
829            .decompress_run_length(&compressed_data)
830            .unwrap();
831        assert_eq!(result, json!([1, 1, 1, 2, 2, 3, 3, 3, 3]));
832    }
833
834    #[test]
835    fn test_rle_decompression_mixed_runs() {
836        let decompressor = StreamingDecompressor::new();
837
838        let compressed_data = json!([
839            {"rle_value": "a", "rle_count": 2},
840            "b",
841            {"rle_value": "c", "rle_count": 3}
842        ]);
843
844        let result = decompressor
845            .decompress_run_length(&compressed_data)
846            .unwrap();
847        assert_eq!(result, json!(["a", "a", "b", "c", "c", "c"]));
848    }
849
850    #[test]
851    fn test_rle_decompression_single_count() {
852        let decompressor = StreamingDecompressor::new();
853
854        let compressed_data = json!([
855            {"rle_value": "x", "rle_count": 1}
856        ]);
857
858        let result = decompressor
859            .decompress_run_length(&compressed_data)
860            .unwrap();
861        assert_eq!(result, json!(["x"]));
862    }
863
864    #[test]
865    fn test_rle_decompression_zero_count() {
866        let decompressor = StreamingDecompressor::new();
867
868        let compressed_data = json!([
869            {"rle_value": "x", "rle_count": 0}
870        ]);
871
872        let result = decompressor
873            .decompress_run_length(&compressed_data)
874            .unwrap();
875        assert_eq!(result, json!([]));
876    }
877
878    #[test]
879    fn test_rle_decompression_nested_values() {
880        let decompressor = StreamingDecompressor::new();
881
882        let compressed_data = json!([
883            {"rle_value": {"name": "test"}, "rle_count": 3}
884        ]);
885
886        let result = decompressor
887            .decompress_run_length(&compressed_data)
888            .unwrap();
889        assert_eq!(
890            result,
891            json!([{"name": "test"}, {"name": "test"}, {"name": "test"}])
892        );
893    }
894
895    #[test]
896    fn test_rle_decompression_nested_structure() {
897        let decompressor = StreamingDecompressor::new();
898
899        let compressed_data = json!({
900            "data": [
901                {"rle_value": 1, "rle_count": 3},
902                {"rle_value": 2, "rle_count": 2}
903            ],
904            "other": "field"
905        });
906
907        let result = decompressor
908            .decompress_run_length(&compressed_data)
909            .unwrap();
910        assert_eq!(
911            result,
912            json!({
913                "data": [1, 1, 1, 2, 2],
914                "other": "field"
915            })
916        );
917    }
918
919    #[test]
920    fn test_rle_decompression_empty_array() {
921        let decompressor = StreamingDecompressor::new();
922
923        let compressed_data = json!([]);
924
925        let result = decompressor
926            .decompress_run_length(&compressed_data)
927            .unwrap();
928        assert_eq!(result, json!([]));
929    }
930
931    #[test]
932    fn test_rle_decompression_invalid_count() {
933        let decompressor = StreamingDecompressor::new();
934
935        let compressed_data = json!([
936            {"rle_value": "x", "rle_count": "not_a_number"}
937        ]);
938
939        let result = decompressor.decompress_run_length(&compressed_data);
940        assert!(result.is_err());
941    }
942
943    #[test]
944    fn test_rle_decompression_missing_value() {
945        let decompressor = StreamingDecompressor::new();
946
947        let compressed_data = json!([
948            {"rle_count": 3}
949        ]);
950
951        let result = decompressor.decompress_run_length(&compressed_data);
952        assert!(result.is_err());
953    }
954
955    #[test]
956    fn test_rle_decompression_missing_count() {
957        let decompressor = StreamingDecompressor::new();
958
959        let compressed_data = json!([
960            {"rle_value": "x"}
961        ]);
962
963        let result = decompressor.decompress_run_length(&compressed_data);
964        assert!(result.is_err());
965    }
966
967    #[test]
968    fn test_rle_decompression_non_rle_objects() {
969        let decompressor = StreamingDecompressor::new();
970
971        let compressed_data = json!([
972            {"regular": "object"},
973            {"another": "one"}
974        ]);
975
976        let result = decompressor
977            .decompress_run_length(&compressed_data)
978            .unwrap();
979        // Should return as-is if objects don't have RLE format
980        assert_eq!(
981            result,
982            json!([
983                {"regular": "object"},
984                {"another": "one"}
985            ])
986        );
987    }
988
989    // NEW TESTS FOR COVERAGE IMPROVEMENT (Task P2-TEST-002)
990
991    #[test]
992    fn test_compress_frame_with_custom_strategies() {
993        let mut dict = HashMap::new();
994        dict.insert("test".to_string(), 0);
995
996        let mut bases = HashMap::new();
997        bases.insert("value".to_string(), 100.0);
998
999        let mut compressor = StreamingCompressor::with_strategies(
1000            CompressionStrategy::Dictionary { dictionary: dict },
1001            CompressionStrategy::Delta { base_values: bases },
1002        );
1003
1004        let frame = StreamFrame {
1005            data: json!({"value": 123, "other": 456}),
1006            priority: Priority::HIGH,
1007            metadata: HashMap::new(),
1008        };
1009
1010        let result = compressor.compress_frame(frame);
1011        assert!(result.is_ok());
1012        assert_eq!(compressor.stats.frames_processed, 1);
1013    }
1014
1015    #[test]
1016    fn test_optimize_for_data_with_samples() {
1017        let mut compressor = StreamingCompressor::new();
1018
1019        let skeleton = json!({
1020            "id": null,
1021            "name": null
1022        });
1023
1024        let samples = vec![
1025            json!({"id": 1, "name": "test1"}),
1026            json!({"id": 2, "name": "test2"}),
1027            json!({"id": 3, "name": "test3"}),
1028        ];
1029
1030        let result = compressor.optimize_for_data(&skeleton, &samples);
1031        assert!(result.is_ok());
1032    }
1033
1034    #[test]
1035    fn test_optimize_for_data_empty_samples() {
1036        let mut compressor = StreamingCompressor::new();
1037
1038        let skeleton = json!({"key": "value"});
1039        let result = compressor.optimize_for_data(&skeleton, &[]);
1040        assert!(result.is_ok());
1041    }
1042
1043    #[test]
1044    fn test_reset_stats() {
1045        let mut compressor = StreamingCompressor::new();
1046
1047        compressor.stats.total_input_bytes = 1000;
1048        compressor.stats.total_output_bytes = 500;
1049        compressor.stats.frames_processed = 10;
1050
1051        compressor.reset_stats();
1052
1053        assert_eq!(compressor.stats.total_input_bytes, 0);
1054        assert_eq!(compressor.stats.total_output_bytes, 0);
1055        assert_eq!(compressor.stats.frames_processed, 0);
1056    }
1057
1058    #[test]
1059    fn test_compressor_critical_vs_low_priority() {
1060        let mut compressor = StreamingCompressor::new();
1061
1062        let critical_frame = StreamFrame {
1063            data: json!({"critical": "data"}),
1064            priority: Priority::CRITICAL,
1065            metadata: HashMap::new(),
1066        };
1067
1068        let low_frame = StreamFrame {
1069            data: json!({"low": "data"}),
1070            priority: Priority::LOW,
1071            metadata: HashMap::new(),
1072        };
1073
1074        compressor.compress_frame(critical_frame).unwrap();
1075        compressor.compress_frame(low_frame).unwrap();
1076
1077        assert_eq!(compressor.stats.frames_processed, 2);
1078    }
1079
1080    #[test]
1081    fn test_decompressor_hybrid_strategy() {
1082        let mut decompressor = StreamingDecompressor::new();
1083
1084        // Setup delta bases and dictionary
1085        decompressor.delta_bases.insert("value".to_string(), 100.0);
1086        decompressor.active_dictionary.insert(0, "test".to_string());
1087
1088        let mut string_dict = HashMap::new();
1089        string_dict.insert("test".to_string(), 0);
1090
1091        let mut numeric_deltas = HashMap::new();
1092        numeric_deltas.insert("value".to_string(), 100.0);
1093
1094        let compressed_frame = CompressedFrame {
1095            frame: StreamFrame {
1096                data: json!({"test": "data"}),
1097                priority: Priority::MEDIUM,
1098                metadata: HashMap::new(),
1099            },
1100            compressed_data: CompressedData {
1101                strategy: CompressionStrategy::Hybrid {
1102                    string_dict: string_dict.clone(),
1103                    numeric_deltas: numeric_deltas.clone(),
1104                },
1105                compressed_size: 20,
1106                data: json!({"value": 5.0}), // Delta from base 100
1107                compression_metadata: HashMap::new(),
1108            },
1109            decompression_metadata: DecompressionMetadata {
1110                strategy: CompressionStrategy::Hybrid {
1111                    string_dict,
1112                    numeric_deltas,
1113                },
1114                dictionary_map: HashMap::new(),
1115                delta_bases: HashMap::new(),
1116                priority_hints: HashMap::new(),
1117            },
1118        };
1119
1120        let result = decompressor.decompress_frame(compressed_frame);
1121        assert!(result.is_ok());
1122    }
1123
1124    #[test]
1125    fn test_decompress_dictionary_nested_arrays() {
1126        let mut decompressor = StreamingDecompressor::new();
1127        decompressor
1128            .active_dictionary
1129            .insert(0, "item1".to_string());
1130        decompressor
1131            .active_dictionary
1132            .insert(1, "item2".to_string());
1133
1134        let data = json!([[0, 1], [1, 0]]);
1135        let result = decompressor.decompress_dictionary(&data).unwrap();
1136
1137        assert_eq!(result, json!([["item1", "item2"], ["item2", "item1"]]));
1138    }
1139
1140    #[test]
1141    fn test_decompress_dictionary_non_index_numbers() {
1142        let mut decompressor = StreamingDecompressor::new();
1143        decompressor.active_dictionary.insert(0, "test".to_string());
1144
1145        // Number that doesn't map to dictionary
1146        let data = json!({"value": 999});
1147        let result = decompressor.decompress_dictionary(&data).unwrap();
1148
1149        // Should remain as number since not in dictionary
1150        assert_eq!(result, json!({"value": 999}));
1151    }
1152
1153    #[test]
1154    fn test_decompress_delta_non_array() {
1155        let decompressor = StreamingDecompressor::new();
1156
1157        // Non-array delta encoding (passthrough)
1158        let data = json!({"key": "value"});
1159        let result = decompressor.decompress_delta(&data).unwrap();
1160
1161        assert_eq!(result, json!({"key": "value"}));
1162    }
1163
1164    #[test]
1165    fn test_decompress_delta_array_without_metadata() {
1166        let decompressor = StreamingDecompressor::new();
1167
1168        // Array without delta metadata
1169        let data = json!([1, 2, 3, 4]);
1170        let result = decompressor.decompress_delta(&data).unwrap();
1171
1172        assert_eq!(result, json!([1, 2, 3, 4]));
1173    }
1174
1175    #[test]
1176    fn test_decompress_run_length_nested_objects() {
1177        let decompressor = StreamingDecompressor::new();
1178
1179        let data = json!({
1180            "outer": {
1181                "inner": [
1182                    {"rle_value": {"nested": "obj"}, "rle_count": 2}
1183                ]
1184            }
1185        });
1186
1187        let result = decompressor.decompress_run_length(&data).unwrap();
1188        assert_eq!(
1189            result,
1190            json!({
1191                "outer": {
1192                    "inner": [{"nested": "obj"}, {"nested": "obj"}]
1193                }
1194            })
1195        );
1196    }
1197
1198    #[test]
1199    fn test_decompression_stats_tracking() {
1200        let mut decompressor = StreamingDecompressor::new();
1201
1202        assert_eq!(decompressor.stats.frames_decompressed, 0);
1203
1204        let frame = CompressedFrame {
1205            frame: StreamFrame {
1206                data: json!({"test": "data"}),
1207                priority: Priority::MEDIUM,
1208                metadata: HashMap::new(),
1209            },
1210            compressed_data: CompressedData {
1211                strategy: CompressionStrategy::None,
1212                compressed_size: 15,
1213                data: json!({"test": "data"}),
1214                compression_metadata: HashMap::new(),
1215            },
1216            decompression_metadata: DecompressionMetadata {
1217                strategy: CompressionStrategy::None,
1218                dictionary_map: HashMap::new(),
1219                delta_bases: HashMap::new(),
1220                priority_hints: HashMap::new(),
1221            },
1222        };
1223
1224        decompressor.decompress_frame(frame).unwrap();
1225
1226        assert_eq!(decompressor.stats.frames_decompressed, 1);
1227        assert!(decompressor.stats.total_decompressed_bytes > 0);
1228        assert!(decompressor.stats.avg_decompression_time_us > 0);
1229    }
1230
1231    #[test]
1232    fn test_decompress_delta_array_malformed_metadata() {
1233        let decompressor = StreamingDecompressor::new();
1234
1235        // Array with object but missing delta_base - passes through as-is
1236        let data = json!([
1237            {"delta_type": "numeric_sequence"},
1238            1.0,
1239            2.0
1240        ]);
1241
1242        let result = decompressor.decompress_delta(&data);
1243        // Without both delta_base and delta_type matching, it's treated as regular array
1244        assert!(result.is_ok());
1245        // Should return as-is since it doesn't match delta format
1246        assert_eq!(result.unwrap(), data);
1247    }
1248
1249    #[test]
1250    fn test_decompress_run_length_large_count() {
1251        let decompressor = StreamingDecompressor::new();
1252
1253        // Within MAX_RLE_COUNT
1254        let data = json!([
1255            {"rle_value": "x", "rle_count": 1000}
1256        ]);
1257
1258        let result = decompressor.decompress_run_length(&data);
1259        assert!(result.is_ok());
1260        let decompressed = result.unwrap();
1261        if let Some(arr) = decompressed.as_array() {
1262            assert_eq!(arr.len(), 1000);
1263        }
1264    }
1265
1266    #[test]
1267    fn test_decompress_run_length_exceeds_max_count() {
1268        let decompressor = StreamingDecompressor::new();
1269
1270        // Exceeds MAX_RLE_COUNT (100_000)
1271        let data = json!([
1272            {"rle_value": "x", "rle_count": 200_000}
1273        ]);
1274
1275        let result = decompressor.decompress_run_length(&data);
1276        assert!(result.is_err()); // Should error
1277    }
1278
1279    #[test]
1280    fn test_decompress_run_length_cumulative_overflow() {
1281        let decompressor = StreamingDecompressor::new();
1282
1283        // Multiple runs that sum to exceed MAX_DECOMPRESSED_SIZE
1284        let data = json!([
1285            {"rle_value": "a", "rle_count": 5_000_000},
1286            {"rle_value": "b", "rle_count": 6_000_000}
1287        ]);
1288
1289        let result = decompressor.decompress_run_length(&data);
1290        // Should error when cumulative size exceeds limit
1291        assert!(result.is_err());
1292    }
1293
1294    #[test]
1295    fn test_decompress_delta_array_size_limit() {
1296        let decompressor = StreamingDecompressor::new();
1297
1298        // Create very large array (exceeds MAX_DELTA_ARRAY_SIZE)
1299        let mut large_array = vec![json!({"delta_base": 0.0, "delta_type": "numeric_sequence"})];
1300        for _i in 0..1_000_001 {
1301            large_array.push(json!(0.0));
1302        }
1303
1304        let result = decompressor.decompress_delta(&JsonValue::Array(large_array));
1305        assert!(result.is_err()); // Should error on size limit
1306    }
1307
1308    #[test]
1309    fn test_compression_stats_default() {
1310        let stats = CompressionStats::default();
1311        assert_eq!(stats.total_input_bytes, 0);
1312        assert_eq!(stats.total_output_bytes, 0);
1313        assert_eq!(stats.frames_processed, 0);
1314        assert_eq!(stats.overall_compression_ratio(), 1.0);
1315    }
1316
1317    #[test]
1318    fn test_decompression_stats_default() {
1319        let stats = DecompressionStats::default();
1320        assert_eq!(stats.frames_decompressed, 0);
1321        assert_eq!(stats.total_decompressed_bytes, 0);
1322        assert_eq!(stats.avg_decompression_time_us, 0);
1323    }
1324
1325    // ============================================================================
1326    // Additional coverage tests for P2-TEST-002 (70%+ coverage target)
1327    // ============================================================================
1328
1329    #[test]
1330    fn test_decompress_dictionary_with_strings() {
1331        let decompressor = StreamingDecompressor::new();
1332        let data = json!({"key": "value", "nested": {"inner": "string"}});
1333        let result = decompressor.decompress_dictionary(&data);
1334        assert!(result.is_ok());
1335        assert_eq!(result.unwrap(), data);
1336    }
1337
1338    #[test]
1339    fn test_decompress_dictionary_with_null() {
1340        let decompressor = StreamingDecompressor::new();
1341        let data = json!(null);
1342        let result = decompressor.decompress_dictionary(&data);
1343        assert!(result.is_ok());
1344        assert_eq!(result.unwrap(), json!(null));
1345    }
1346
1347    #[test]
1348    fn test_decompress_dictionary_with_boolean() {
1349        let decompressor = StreamingDecompressor::new();
1350        let data = json!(true);
1351        let result = decompressor.decompress_dictionary(&data);
1352        assert!(result.is_ok());
1353        assert_eq!(result.unwrap(), json!(true));
1354    }
1355
1356    #[test]
1357    fn test_decompress_dictionary_with_string() {
1358        let decompressor = StreamingDecompressor::new();
1359        let data = json!("plain string");
1360        let result = decompressor.decompress_dictionary(&data);
1361        assert!(result.is_ok());
1362        assert_eq!(result.unwrap(), json!("plain string"));
1363    }
1364
1365    #[test]
1366    fn test_decompress_delta_with_object_no_array() {
1367        let decompressor = StreamingDecompressor::new();
1368        let data = json!({"key": "value"});
1369        let result = decompressor.decompress_delta(&data);
1370        assert!(result.is_ok());
1371        assert_eq!(result.unwrap(), json!({"key": "value"}));
1372    }
1373
1374    #[test]
1375    fn test_decompress_delta_with_primitive_values() {
1376        let decompressor = StreamingDecompressor::new();
1377
1378        assert_eq!(
1379            decompressor.decompress_delta(&json!("string")).unwrap(),
1380            json!("string")
1381        );
1382        assert_eq!(
1383            decompressor.decompress_delta(&json!(42)).unwrap(),
1384            json!(42)
1385        );
1386        assert_eq!(
1387            decompressor.decompress_delta(&json!(true)).unwrap(),
1388            json!(true)
1389        );
1390        assert_eq!(
1391            decompressor.decompress_delta(&json!(null)).unwrap(),
1392            json!(null)
1393        );
1394    }
1395
1396    #[test]
1397    fn test_decompress_run_length_with_primitive_values() {
1398        let decompressor = StreamingDecompressor::new();
1399
1400        assert_eq!(
1401            decompressor
1402                .decompress_run_length(&json!("string"))
1403                .unwrap(),
1404            json!("string")
1405        );
1406        assert_eq!(
1407            decompressor.decompress_run_length(&json!(123)).unwrap(),
1408            json!(123)
1409        );
1410        assert_eq!(
1411            decompressor.decompress_run_length(&json!(false)).unwrap(),
1412            json!(false)
1413        );
1414        assert_eq!(
1415            decompressor.decompress_run_length(&json!(null)).unwrap(),
1416            json!(null)
1417        );
1418    }
1419
1420    #[test]
1421    fn test_decompress_data_strategy_dictionary() {
1422        let mut decompressor = StreamingDecompressor::new();
1423        decompressor.active_dictionary.insert(0, "test".to_string());
1424
1425        let mut dict = HashMap::new();
1426        dict.insert("test".to_string(), 0);
1427
1428        let compressed_data = CompressedData {
1429            strategy: CompressionStrategy::Dictionary { dictionary: dict },
1430            compressed_size: 10,
1431            data: json!({"field": 0}),
1432            compression_metadata: HashMap::new(),
1433        };
1434
1435        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1436        assert!(result.is_ok());
1437    }
1438
1439    #[test]
1440    fn test_decompress_data_strategy_delta() {
1441        let decompressor = StreamingDecompressor::new();
1442
1443        let mut bases = HashMap::new();
1444        bases.insert("value".to_string(), 100.0);
1445
1446        let compressed_data = CompressedData {
1447            strategy: CompressionStrategy::Delta {
1448                base_values: bases.clone(),
1449            },
1450            compressed_size: 10,
1451            data: json!({
1452                "sequence": [
1453                    {"delta_base": 100.0, "delta_type": "numeric_sequence"},
1454                    5.0,
1455                    10.0
1456                ]
1457            }),
1458            compression_metadata: HashMap::new(),
1459        };
1460
1461        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1462        assert!(result.is_ok());
1463    }
1464
1465    #[test]
1466    fn test_decompress_data_strategy_run_length() {
1467        let decompressor = StreamingDecompressor::new();
1468
1469        let compressed_data = CompressedData {
1470            strategy: CompressionStrategy::RunLength,
1471            compressed_size: 10,
1472            data: json!([
1473                {"rle_value": "x", "rle_count": 3}
1474            ]),
1475            compression_metadata: HashMap::new(),
1476        };
1477
1478        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1479        assert!(result.is_ok());
1480        assert_eq!(result.unwrap(), json!(["x", "x", "x"]));
1481    }
1482
1483    #[test]
1484    fn test_decompress_data_strategy_hybrid_applies_delta_then_dict() {
1485        let mut decompressor = StreamingDecompressor::new();
1486        decompressor.active_dictionary.insert(0, "test".to_string());
1487
1488        let mut string_dict = HashMap::new();
1489        string_dict.insert("test".to_string(), 0);
1490
1491        let mut numeric_deltas = HashMap::new();
1492        numeric_deltas.insert("value".to_string(), 100.0);
1493
1494        let compressed_data = CompressedData {
1495            strategy: CompressionStrategy::Hybrid {
1496                string_dict,
1497                numeric_deltas,
1498            },
1499            compressed_size: 10,
1500            data: json!({
1501                "field": 0
1502            }),
1503            compression_metadata: HashMap::new(),
1504        };
1505
1506        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1507        assert!(result.is_ok());
1508    }
1509
1510    #[test]
1511    fn test_select_compressor_critical_priority() {
1512        let mut compressor = StreamingCompressor::new();
1513        let _skeleton_comp = compressor.select_compressor_for_priority(Priority::CRITICAL);
1514    }
1515
1516    #[test]
1517    fn test_select_compressor_high_priority() {
1518        let mut compressor = StreamingCompressor::new();
1519        let _skeleton_comp = compressor.select_compressor_for_priority(Priority::HIGH);
1520    }
1521
1522    #[test]
1523    fn test_select_compressor_medium_priority() {
1524        let mut compressor = StreamingCompressor::new();
1525        let _content_comp = compressor.select_compressor_for_priority(Priority::MEDIUM);
1526    }
1527
1528    #[test]
1529    fn test_select_compressor_low_priority() {
1530        let mut compressor = StreamingCompressor::new();
1531        let _content_comp = compressor.select_compressor_for_priority(Priority::LOW);
1532    }
1533
1534    #[test]
1535    fn test_select_compressor_background_priority() {
1536        let mut compressor = StreamingCompressor::new();
1537        let _content_comp = compressor.select_compressor_for_priority(Priority::BACKGROUND);
1538    }
1539
1540    #[test]
1541    fn test_update_stats_with_zero_original_size() {
1542        let mut compressor = StreamingCompressor::new();
1543        compressor.update_stats(Priority::MEDIUM, 0, 10);
1544
1545        let stats = compressor.get_stats();
1546        assert_eq!(stats.frames_processed, 1);
1547        assert_eq!(stats.total_input_bytes, 0);
1548        assert_eq!(stats.total_output_bytes, 10);
1549        assert_eq!(
1550            stats.priority_compression_ratio(Priority::MEDIUM.value()),
1551            1.0
1552        );
1553    }
1554
1555    #[test]
1556    fn test_update_stats_with_normal_compression() {
1557        let mut compressor = StreamingCompressor::new();
1558        compressor.update_stats(Priority::HIGH, 1000, 500);
1559
1560        let stats = compressor.get_stats();
1561        assert_eq!(stats.frames_processed, 1);
1562        assert_eq!(stats.total_input_bytes, 1000);
1563        assert_eq!(stats.total_output_bytes, 500);
1564        assert_eq!(
1565            stats.priority_compression_ratio(Priority::HIGH.value()),
1566            0.5
1567        );
1568    }
1569
1570    #[test]
1571    fn test_update_decompression_stats_first_frame() {
1572        let mut decompressor = StreamingDecompressor::new();
1573        let data = json!({"test": "data"});
1574        let duration = std::time::Duration::from_micros(100);
1575
1576        decompressor.update_decompression_stats(&data, duration);
1577
1578        let stats = decompressor.get_stats();
1579        assert_eq!(stats.frames_decompressed, 1);
1580        assert_eq!(stats.avg_decompression_time_us, 100);
1581    }
1582
1583    #[test]
1584    fn test_update_decompression_stats_multiple_frames() {
1585        let mut decompressor = StreamingDecompressor::new();
1586        let data = json!({"test": "data"});
1587
1588        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(100));
1589        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(200));
1590        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(300));
1591
1592        let stats = decompressor.get_stats();
1593        assert_eq!(stats.frames_decompressed, 3);
1594        assert_eq!(stats.avg_decompression_time_us, 200); // (100 + 200 + 300) / 3
1595    }
1596
1597    #[test]
1598    fn test_create_decompression_metadata_with_dict() {
1599        let compressor = StreamingCompressor::new();
1600        let mut metadata = HashMap::new();
1601        metadata.insert("dict_0".to_string(), json!("hello"));
1602        metadata.insert("dict_1".to_string(), json!("world"));
1603
1604        let compressed_data = CompressedData {
1605            strategy: CompressionStrategy::None,
1606            compressed_size: 10,
1607            data: json!({}),
1608            compression_metadata: metadata,
1609        };
1610
1611        let result = compressor.create_decompression_metadata(&compressed_data);
1612        assert!(result.is_ok());
1613        let meta = result.unwrap();
1614        assert_eq!(meta.dictionary_map.len(), 2);
1615        assert_eq!(meta.dictionary_map.get(&0), Some(&"hello".to_string()));
1616        assert_eq!(meta.dictionary_map.get(&1), Some(&"world".to_string()));
1617    }
1618
1619    #[test]
1620    fn test_create_decompression_metadata_with_delta_bases() {
1621        let compressor = StreamingCompressor::new();
1622        let mut metadata = HashMap::new();
1623        metadata.insert("base_value1".to_string(), json!(100.0));
1624        metadata.insert("base_value2".to_string(), json!(200.0));
1625
1626        let compressed_data = CompressedData {
1627            strategy: CompressionStrategy::None,
1628            compressed_size: 10,
1629            data: json!({}),
1630            compression_metadata: metadata,
1631        };
1632
1633        let result = compressor.create_decompression_metadata(&compressed_data);
1634        assert!(result.is_ok());
1635        let meta = result.unwrap();
1636        assert_eq!(meta.delta_bases.len(), 2);
1637        assert_eq!(meta.delta_bases.get("value1"), Some(&100.0));
1638        assert_eq!(meta.delta_bases.get("value2"), Some(&200.0));
1639    }
1640
1641    #[test]
1642    fn test_create_decompression_metadata_with_invalid_dict_index() {
1643        let compressor = StreamingCompressor::new();
1644        let mut metadata = HashMap::new();
1645        metadata.insert("dict_invalid".to_string(), json!("value"));
1646        metadata.insert("dict_0".to_string(), json!("valid"));
1647
1648        let compressed_data = CompressedData {
1649            strategy: CompressionStrategy::None,
1650            compressed_size: 10,
1651            data: json!({}),
1652            compression_metadata: metadata,
1653        };
1654
1655        let result = compressor.create_decompression_metadata(&compressed_data);
1656        assert!(result.is_ok());
1657        let meta = result.unwrap();
1658        // Only valid index should be parsed
1659        assert_eq!(meta.dictionary_map.len(), 1);
1660        assert_eq!(meta.dictionary_map.get(&0), Some(&"valid".to_string()));
1661    }
1662
1663    #[test]
1664    fn test_update_context_updates_dictionary() {
1665        let mut decompressor = StreamingDecompressor::new();
1666
1667        let mut metadata = DecompressionMetadata {
1668            strategy: CompressionStrategy::None,
1669            dictionary_map: HashMap::new(),
1670            delta_bases: HashMap::new(),
1671            priority_hints: HashMap::new(),
1672        };
1673        metadata.dictionary_map.insert(0, "hello".to_string());
1674        metadata.dictionary_map.insert(1, "world".to_string());
1675
1676        let result = decompressor.update_context(&metadata);
1677        assert!(result.is_ok());
1678        assert_eq!(decompressor.active_dictionary.len(), 2);
1679        assert_eq!(
1680            decompressor.active_dictionary.get(&0),
1681            Some(&"hello".to_string())
1682        );
1683    }
1684
1685    #[test]
1686    fn test_update_context_updates_delta_bases() {
1687        let mut decompressor = StreamingDecompressor::new();
1688
1689        let mut metadata = DecompressionMetadata {
1690            strategy: CompressionStrategy::None,
1691            dictionary_map: HashMap::new(),
1692            delta_bases: HashMap::new(),
1693            priority_hints: HashMap::new(),
1694        };
1695        metadata.delta_bases.insert("value1".to_string(), 100.0);
1696        metadata.delta_bases.insert("value2".to_string(), 200.0);
1697
1698        let result = decompressor.update_context(&metadata);
1699        assert!(result.is_ok());
1700        assert_eq!(decompressor.delta_bases.len(), 2);
1701        assert_eq!(decompressor.delta_bases.get("value1"), Some(&100.0));
1702    }
1703
1704    #[test]
1705    fn test_decompress_dictionary_with_float_that_is_not_u64() {
1706        let mut decompressor = StreamingDecompressor::new();
1707        decompressor.active_dictionary.insert(0, "test".to_string());
1708
1709        // Float that can't be represented as u64
1710        let data = json!({"value": 1.5});
1711        let result = decompressor.decompress_dictionary(&data);
1712        assert!(result.is_ok());
1713        // Should remain as float since not a valid index
1714        assert_eq!(result.unwrap(), json!({"value": 1.5}));
1715    }
1716
1717    #[test]
1718    fn test_decompress_dictionary_with_negative_number() {
1719        let mut decompressor = StreamingDecompressor::new();
1720        decompressor.active_dictionary.insert(0, "test".to_string());
1721
1722        let data = json!({"value": -1});
1723        let result = decompressor.decompress_dictionary(&data);
1724        assert!(result.is_ok());
1725        // Negative numbers can't be indices
1726        assert_eq!(result.unwrap(), json!({"value": -1}));
1727    }
1728
1729    #[test]
1730    fn test_decompress_delta_array_checks_first_element_structure() {
1731        let decompressor = StreamingDecompressor::new();
1732
1733        // Array without proper metadata structure
1734        let data = json!([
1735            {"wrong_field": 100.0},
1736            1.0,
1737            2.0
1738        ]);
1739
1740        let result = decompressor.decompress_delta(&data);
1741        assert!(result.is_ok());
1742        // Should be processed recursively, not as delta array
1743        assert_eq!(result.unwrap(), data);
1744    }
1745
1746    #[test]
1747    fn test_decompress_delta_array_requires_both_base_and_type() {
1748        let decompressor = StreamingDecompressor::new();
1749
1750        // Has delta_base but not delta_type
1751        let data1 = json!([
1752            {"delta_base": 100.0},
1753            1.0
1754        ]);
1755        let result1 = decompressor.decompress_delta(&data1);
1756        assert!(result1.is_ok());
1757
1758        // Has delta_type but not delta_base
1759        let data2 = json!([
1760            {"delta_type": "numeric_sequence"},
1761            1.0
1762        ]);
1763        let result2 = decompressor.decompress_delta(&data2);
1764        assert!(result2.is_ok());
1765    }
1766
1767    #[test]
1768    fn test_decompress_run_length_with_non_objects_in_array() {
1769        let decompressor = StreamingDecompressor::new();
1770
1771        // Mix of RLE objects and plain values
1772        let data = json!([
1773            {"rle_value": "a", "rle_count": 2},
1774            "plain",
1775            42,
1776            true
1777        ]);
1778
1779        let result = decompressor.decompress_run_length(&data);
1780        assert!(result.is_ok());
1781        assert_eq!(result.unwrap(), json!(["a", "a", "plain", 42, true]));
1782    }
1783
1784    #[test]
1785    fn test_decompress_run_length_integrity_check_rle_value_without_count() {
1786        let decompressor = StreamingDecompressor::new();
1787
1788        let data = json!([
1789            {"rle_value": "x"}
1790        ]);
1791
1792        let result = decompressor.decompress_run_length(&data);
1793        assert!(result.is_err());
1794    }
1795
1796    #[test]
1797    fn test_decompress_run_length_integrity_check_rle_count_without_value() {
1798        let decompressor = StreamingDecompressor::new();
1799
1800        let data = json!([
1801            {"rle_count": 3}
1802        ]);
1803
1804        let result = decompressor.decompress_run_length(&data);
1805        assert!(result.is_err());
1806    }
1807
1808    #[test]
1809    fn test_decompress_run_length_non_number_count() {
1810        let decompressor = StreamingDecompressor::new();
1811
1812        let data = json!([
1813            {"rle_value": "x", "rle_count": "three"}
1814        ]);
1815
1816        let result = decompressor.decompress_run_length(&data);
1817        assert!(result.is_err());
1818    }
1819
1820    #[test]
1821    fn test_compress_frame_with_large_data() {
1822        let mut compressor = StreamingCompressor::new();
1823
1824        let large_data = json!({
1825            "users": (0..100).map(|i| json!({
1826                "id": i,
1827                "name": format!("User{}", i),
1828                "email": format!("user{}@example.com", i),
1829                "age": 20 + (i % 50),
1830                "active": i % 2 == 0
1831            })).collect::<Vec<_>>()
1832        });
1833
1834        let frame = StreamFrame {
1835            data: large_data,
1836            priority: Priority::MEDIUM,
1837            metadata: HashMap::new(),
1838        };
1839
1840        let result = compressor.compress_frame(frame);
1841        assert!(result.is_ok());
1842
1843        let stats = compressor.get_stats();
1844        assert_eq!(stats.frames_processed, 1);
1845        assert!(stats.total_input_bytes > 1000);
1846    }
1847
1848    #[test]
1849    fn test_decompress_delta_with_very_large_deltas() {
1850        let decompressor = StreamingDecompressor::new();
1851
1852        let data = json!([
1853            {"delta_base": 1_000_000.0, "delta_type": "numeric_sequence"},
1854            100_000.0,
1855            200_000.0,
1856            300_000.0
1857        ]);
1858
1859        let result = decompressor.decompress_delta(&data);
1860        assert!(result.is_ok());
1861        assert_eq!(
1862            result.unwrap(),
1863            json!([1_100_000.0, 1_200_000.0, 1_300_000.0])
1864        );
1865    }
1866}