pjson_rs/stream/compression_integration.rs

1//! Integration of schema-based compression with PJS streaming
2//!
3//! Provides streaming-aware compression that maintains the ability
4//! to progressively decompress data as frames arrive.
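//!
//! # Example
//!
//! A minimal sketch of a compress/decompress round trip (not compiled as a
//! doctest; the frame fields mirror the tests at the bottom of this module):
//!
//! ```ignore
//! use serde_json::json;
//! use std::collections::HashMap;
//!
//! let mut compressor = StreamingCompressor::new();
//! let frame = StreamFrame {
//!     data: json!({"message": "hello"}),
//!     priority: Priority::MEDIUM,
//!     metadata: HashMap::new(),
//! };
//! let compressed = compressor.compress_frame(frame).unwrap();
//!
//! let mut decompressor = StreamingDecompressor::new();
//! let restored = decompressor.decompress_frame(compressed).unwrap();
//! assert_eq!(restored.priority, Priority::MEDIUM);
//! ```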
5
6use crate::{
7    compression::{CompressedData, CompressionStrategy, SchemaCompressor},
8    domain::{DomainError, DomainResult},
9    stream::{Priority, StreamFrame},
10};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14// Security limits to prevent decompression bomb attacks
15const MAX_RLE_COUNT: u64 = 100_000;
16const MAX_DELTA_ARRAY_SIZE: usize = 1_000_000;
17const MAX_DECOMPRESSED_SIZE: usize = 10_485_760; // 10MB
18
19/// Streaming compressor that maintains compression state across frames
20#[derive(Debug, Clone)]
21pub struct StreamingCompressor {
22    /// Primary compressor for skeleton and critical data
23    skeleton_compressor: SchemaCompressor,
24    /// Secondary compressor for non-critical data
25    content_compressor: SchemaCompressor,
26    /// Compression statistics
27    stats: CompressionStats,
28}
29
30#[derive(Debug, Clone, Default)]
31pub struct CompressionStats {
32    /// Total bytes processed
33    pub total_input_bytes: usize,
34    /// Total bytes after compression
35    pub total_output_bytes: usize,
36    /// Number of frames processed
37    pub frames_processed: u32,
38    /// Compression ratio by priority level
39    pub priority_ratios: HashMap<u8, f32>,
40}
41
42/// Compressed stream frame with metadata
43#[derive(Debug, Clone)]
44pub struct CompressedFrame {
46    /// Original frame, retained for its priority and metadata
46    pub frame: StreamFrame,
47    /// Compressed data
48    pub compressed_data: CompressedData,
49    /// Decompression instructions for the client
50    pub decompression_metadata: DecompressionMetadata,
51}
52
53#[derive(Debug, Clone)]
54pub struct DecompressionMetadata {
55    /// Compression strategy used
56    pub strategy: CompressionStrategy,
57    /// Dictionary indices mapping
58    pub dictionary_map: HashMap<u16, String>,
59    /// Delta base values for numeric decompression
60    pub delta_bases: HashMap<String, f64>,
61    /// Priority-specific decompression hints
62    pub priority_hints: HashMap<u8, String>,
63}
64
65impl StreamingCompressor {
66    /// Create new streaming compressor
67    pub fn new() -> Self {
68        Self {
69            skeleton_compressor: SchemaCompressor::new(),
70            content_compressor: SchemaCompressor::new(),
71            stats: CompressionStats::default(),
72        }
73    }
74
75    /// Create with custom compression strategies
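    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), mirroring the strategy
    /// construction used in this module's tests:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let mut dict = HashMap::new();
    /// dict.insert("test".to_string(), 0);
    ///
    /// let mut bases = HashMap::new();
    /// bases.insert("value".to_string(), 100.0);
    ///
    /// let compressor = StreamingCompressor::with_strategies(
    ///     CompressionStrategy::Dictionary { dictionary: dict },
    ///     CompressionStrategy::Delta { base_values: bases },
    /// );
    /// ```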
76    pub fn with_strategies(
77        skeleton_strategy: CompressionStrategy,
78        content_strategy: CompressionStrategy,
79    ) -> Self {
80        Self {
81            skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
82            content_compressor: SchemaCompressor::with_strategy(content_strategy),
83            stats: CompressionStats::default(),
84        }
85    }
86
87    /// Process and compress a stream frame based on its priority
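    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest). `CRITICAL`/`HIGH` frames are
    /// routed to the skeleton compressor, all other priorities to the content
    /// compressor:
    ///
    /// ```ignore
    /// let mut compressor = StreamingCompressor::new();
    /// let frame = StreamFrame {
    ///     data: json!({"error": "critical failure"}),
    ///     priority: Priority::CRITICAL,
    ///     metadata: HashMap::new(),
    /// };
    /// let compressed = compressor.compress_frame(frame).unwrap();
    /// assert_eq!(compressed.frame.priority, Priority::CRITICAL);
    /// assert_eq!(compressor.get_stats().frames_processed, 1);
    /// ```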
88    pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
89        let compressor = self.select_compressor_for_priority(frame.priority);
90
91        // Calculate original size
92        let original_size = serde_json::to_string(&frame.data)
93            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
94            .len();
95
96        // Compress based on frame content and priority
97        let compressed_data = compressor.compress(&frame.data)?;
98
99        // Update statistics
100        self.update_stats(
101            frame.priority,
102            original_size,
103            compressed_data.compressed_size,
104        );
105
106        // Create decompression metadata
107        let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;
108
109        Ok(CompressedFrame {
110            frame,
111            compressed_data,
112            decompression_metadata,
113        })
114    }
115
116    /// Analyze JSON data to optimize compression strategies
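    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest): the skeleton tunes the
    /// skeleton compressor, while the combined samples tune the content
    /// compressor.
    ///
    /// ```ignore
    /// let mut compressor = StreamingCompressor::new();
    /// let skeleton = json!({"id": null, "name": null});
    /// let samples = vec![
    ///     json!({"id": 1, "name": "test1"}),
    ///     json!({"id": 2, "name": "test2"}),
    /// ];
    /// compressor.optimize_for_data(&skeleton, &samples).unwrap();
    /// ```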
117    pub fn optimize_for_data(
118        &mut self,
119        skeleton: &JsonValue,
120        sample_data: &[JsonValue],
121    ) -> DomainResult<()> {
122        // Optimize skeleton compressor for critical structural data
123        self.skeleton_compressor.analyze_and_optimize(skeleton)?;
124
125        // Analyze sample content data to optimize content compressor
126        if !sample_data.is_empty() {
127            // Combine samples for comprehensive analysis
128            let combined_sample = JsonValue::Array(sample_data.to_vec());
129            self.content_compressor
130                .analyze_and_optimize(&combined_sample)?;
131        }
132
133        Ok(())
134    }
135
136    /// Get current compression statistics
137    pub fn get_stats(&self) -> &CompressionStats {
138        &self.stats
139    }
140
141    /// Reset compression statistics
142    pub fn reset_stats(&mut self) {
143        self.stats = CompressionStats::default();
144    }
145
146    /// Select appropriate compressor based on frame priority
147    fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
148        match priority {
149            // Critical data (skeleton, errors) - use specialized compressor
150            Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
151            // Regular content data - use content compressor
152            _ => &mut self.content_compressor,
153        }
154    }
155
156    /// Update compression statistics
157    fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
158        self.stats.total_input_bytes += original_size;
159        self.stats.total_output_bytes += compressed_size;
160        self.stats.frames_processed += 1;
161
162        let ratio = if original_size > 0 {
163            compressed_size as f32 / original_size as f32
164        } else {
165            1.0
166        };
167
168        self.stats.priority_ratios.insert(priority.value(), ratio);
169    }
170
171    /// Create decompression metadata for the client
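    ///
    /// Compression metadata keys are interpreted by prefix: `dict_<index>` entries
    /// (e.g. `dict_0` -> `"hello"`) populate the dictionary map, and `base_<path>`
    /// entries (e.g. `base_value1` -> `100.0`) populate the delta bases.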
172    fn create_decompression_metadata(
173        &self,
174        compressed_data: &CompressedData,
175    ) -> DomainResult<DecompressionMetadata> {
176        let mut dictionary_map = HashMap::new();
177        let mut delta_bases = HashMap::new();
178
179        // Extract dictionary mappings
180        for (key, value) in &compressed_data.compression_metadata {
181            if let Some(suffix) = key.strip_prefix("dict_") {
182                if let Ok(index) = suffix.parse::<u16>()
183                    && let Some(string_val) = value.as_str()
184                {
185                    dictionary_map.insert(index, string_val.to_string());
186                }
187            } else if let Some(path) = key.strip_prefix("base_")
188                && let Some(num) = value.as_f64()
189            {
190                delta_bases.insert(path.to_string(), num);
191            }
192        }
193
194        Ok(DecompressionMetadata {
195            strategy: compressed_data.strategy.clone(),
196            dictionary_map,
197            delta_bases,
198            priority_hints: HashMap::new(), // TODO: Add priority-specific hints
199        })
200    }
201}
202
203impl CompressionStats {
204    /// Calculate overall compression ratio
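    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), matching the figures used in
    /// this module's tests:
    ///
    /// ```ignore
    /// let stats = CompressionStats {
    ///     total_input_bytes: 1000,
    ///     total_output_bytes: 600,
    ///     ..Default::default()
    /// };
    /// assert_eq!(stats.overall_compression_ratio(), 0.6); // output / input
    /// assert_eq!(stats.bytes_saved(), 400);
    /// assert!((stats.percentage_saved() - 40.0).abs() < 0.001);
    /// ```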
205    pub fn overall_compression_ratio(&self) -> f32 {
206        if self.total_input_bytes == 0 {
207            return 1.0;
208        }
209        self.total_output_bytes as f32 / self.total_input_bytes as f32
210    }
211
212    /// Get compression ratio for specific priority level
213    pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
214        self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
215    }
216
217    /// Calculate bytes saved
218    pub fn bytes_saved(&self) -> isize {
219        self.total_input_bytes as isize - self.total_output_bytes as isize
220    }
221
222    /// Calculate percentage saved
223    pub fn percentage_saved(&self) -> f32 {
224        if self.total_input_bytes == 0 {
225            return 0.0;
226        }
227        let ratio = self.overall_compression_ratio();
228        (1.0 - ratio) * 100.0
229    }
230}
231
232/// Client-side decompressor for receiving compressed frames
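///
/// Dictionary entries and delta bases delivered in each frame's
/// `DecompressionMetadata` are merged into the decompressor's state, so later
/// frames can rely on mappings established by earlier ones.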
233#[derive(Debug, Clone)]
234pub struct StreamingDecompressor {
235    /// Active dictionary for string decompression
236    active_dictionary: HashMap<u16, String>,
237    /// Delta base values for numeric decompression  
238    delta_bases: HashMap<String, f64>,
239    /// Decompression statistics
240    stats: DecompressionStats,
241}
242
243#[derive(Debug, Clone, Default)]
244pub struct DecompressionStats {
245    /// Total frames decompressed
246    pub frames_decompressed: u32,
247    /// Total bytes decompressed
248    pub total_decompressed_bytes: usize,
249    /// Average decompression time in microseconds
250    pub avg_decompression_time_us: u64,
251}
252
253impl StreamingDecompressor {
254    /// Create new streaming decompressor
255    pub fn new() -> Self {
256        Self {
257            active_dictionary: HashMap::new(),
258            delta_bases: HashMap::new(),
259            stats: DecompressionStats::default(),
260        }
261    }
262
263    /// Decompress a compressed frame
264    pub fn decompress_frame(
265        &mut self,
266        compressed_frame: CompressedFrame,
267    ) -> DomainResult<StreamFrame> {
268        let start_time = std::time::Instant::now();
269
270        // Update decompression context with metadata
271        self.update_context(&compressed_frame.decompression_metadata)?;
272
273        // Decompress data based on strategy
274        let decompressed_data = self.decompress_data(
275            &compressed_frame.compressed_data,
276            &compressed_frame.decompression_metadata.strategy,
277        )?;
278
279        // Update statistics
280        let decompression_time = start_time.elapsed();
281        self.update_decompression_stats(&decompressed_data, decompression_time);
282
283        Ok(StreamFrame {
284            data: decompressed_data,
285            priority: compressed_frame.frame.priority,
286            metadata: compressed_frame.frame.metadata,
287        })
288    }
289
290    /// Update decompression context with new metadata
291    fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
292        // Update dictionary
293        for (&index, string) in &metadata.dictionary_map {
294            self.active_dictionary.insert(index, string.clone());
295        }
296
297        // Update delta bases
298        for (path, &base) in &metadata.delta_bases {
299            self.delta_bases.insert(path.clone(), base);
300        }
301
302        Ok(())
303    }
304
305    /// Decompress data according to strategy
306    fn decompress_data(
307        &self,
308        compressed_data: &CompressedData,
309        strategy: &CompressionStrategy,
310    ) -> DomainResult<JsonValue> {
311        match strategy {
312            CompressionStrategy::None => Ok(compressed_data.data.clone()),
313
314            CompressionStrategy::Dictionary { .. } => {
315                self.decompress_dictionary(&compressed_data.data)
316            }
317
318            CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),
319
320            CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),
321
322            CompressionStrategy::Hybrid { .. } => {
323                // Apply decompression in reverse order: delta first, then dictionary
324                let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
325                self.decompress_dictionary(&delta_decompressed)
326            }
327        }
328    }
329
330    /// Decompress dictionary-encoded strings
331    fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
332        match data {
333            JsonValue::Object(obj) => {
334                let mut decompressed = serde_json::Map::new();
335                for (key, value) in obj {
336                    decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
337                }
338                Ok(JsonValue::Object(decompressed))
339            }
340            JsonValue::Array(arr) => {
341                let decompressed: Result<Vec<_>, _> = arr
342                    .iter()
343                    .map(|item| self.decompress_dictionary(item))
344                    .collect();
345                Ok(JsonValue::Array(decompressed?))
346            }
347            JsonValue::Number(n) => {
348                // Check if this is a dictionary index
349                if let Some(index) = n.as_u64()
350                    && let Some(string_val) = self.active_dictionary.get(&(index as u16))
351                {
352                    return Ok(JsonValue::String(string_val.clone()));
353                }
354                Ok(data.clone())
355            }
356            _ => Ok(data.clone()),
357        }
358    }
359
360    /// Decompress delta-encoded values
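    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest): the first array element carries
    /// the `delta_base`/`delta_type` metadata, and the remaining elements are deltas
    /// added to the base.
    ///
    /// ```ignore
    /// let decompressor = StreamingDecompressor::new();
    /// let compressed = json!([
    ///     {"delta_base": 100.0, "delta_type": "numeric_sequence"},
    ///     0.0, 1.0, 2.0
    /// ]);
    /// let result = decompressor.decompress_delta(&compressed).unwrap();
    /// assert_eq!(result, json!([100.0, 101.0, 102.0]));
    /// ```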
361    pub fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
362        match data {
363            JsonValue::Object(obj) => {
364                let mut decompressed_obj = serde_json::Map::new();
365                for (key, value) in obj {
366                    decompressed_obj.insert(key.clone(), self.decompress_delta(value)?);
367                }
368                Ok(JsonValue::Object(decompressed_obj))
369            }
370            JsonValue::Array(arr) => {
371                if arr.is_empty() {
372                    return Ok(JsonValue::Array(arr.clone()));
373                }
374
375                // Check if this is a delta-compressed array
376                if let Some(first) = arr.first()
377                    && let Some(obj) = first.as_object()
378                    && obj.contains_key("delta_base")
379                    && obj.contains_key("delta_type")
380                {
381                    // This is a delta-compressed numeric sequence
382                    return self.decompress_delta_array(arr);
383                }
384
385                // Not a delta-compressed array, process elements recursively
386                let decompressed_arr: Result<Vec<_>, _> =
387                    arr.iter().map(|item| self.decompress_delta(item)).collect();
388                Ok(JsonValue::Array(decompressed_arr?))
389            }
390            _ => Ok(data.clone()),
391        }
392    }
393
394    /// Decompress delta-encoded array back to original values
395    fn decompress_delta_array(&self, arr: &[JsonValue]) -> DomainResult<JsonValue> {
396        if arr.is_empty() {
397            return Ok(JsonValue::Array(Vec::new()));
398        }
399
400        // VULN-002 FIX: Validate array size to prevent memory exhaustion
401        if arr.len() > MAX_DELTA_ARRAY_SIZE {
402            return Err(DomainError::CompressionError(format!(
403                "Delta array size {} exceeds maximum {}",
404                arr.len(),
405                MAX_DELTA_ARRAY_SIZE
406            )));
407        }
408
409        // Extract base value from metadata
410        let base_value = arr[0]
411            .get("delta_base")
412            .and_then(|v| v.as_f64())
413            .ok_or_else(|| {
414                DomainError::CompressionError(
415                    "Missing or invalid delta_base in metadata".to_string(),
416                )
417            })?;
418
419        // Reconstruct original values from deltas
420        let mut original_values = Vec::new();
421        for delta_value in arr.iter().skip(1) {
422            let delta = delta_value.as_f64().ok_or_else(|| {
423                DomainError::CompressionError("Invalid delta value: expected number".to_string())
424            })?;
425
426            let original = base_value + delta;
427            original_values.push(JsonValue::from(original));
428        }
429
430        Ok(JsonValue::Array(original_values))
431    }
432
433    /// Decompress run-length encoded data
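    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest): each `{rle_value, rle_count}`
    /// object expands into `rle_count` copies of the value, subject to the
    /// decompression-bomb limits defined at the top of this module.
    ///
    /// ```ignore
    /// let decompressor = StreamingDecompressor::new();
    /// let compressed = json!([
    ///     {"rle_value": 1, "rle_count": 3},
    ///     {"rle_value": 2, "rle_count": 2}
    /// ]);
    /// let result = decompressor.decompress_run_length(&compressed).unwrap();
    /// assert_eq!(result, json!([1, 1, 1, 2, 2]));
    /// ```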
434    pub fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
435        match data {
436            JsonValue::Object(obj) => {
437                let mut decompressed_obj = serde_json::Map::new();
438                for (key, value) in obj {
439                    decompressed_obj.insert(key.clone(), self.decompress_run_length(value)?);
440                }
441                Ok(JsonValue::Object(decompressed_obj))
442            }
443            JsonValue::Array(arr) => {
444                let mut decompressed_values = Vec::new();
445                let mut total_size = 0usize;
446
447                for item in arr {
448                    if let Some(obj) = item.as_object() {
449                        // Validate RLE object integrity: both keys must be present or both absent
450                        let has_rle_value = obj.contains_key("rle_value");
451                        let has_rle_count = obj.contains_key("rle_count");
452
453                        if has_rle_value && !has_rle_count {
454                            return Err(DomainError::CompressionError(
455                                "Malformed RLE object: rle_value without rle_count".to_string(),
456                            ));
457                        }
458                        if has_rle_count && !has_rle_value {
459                            return Err(DomainError::CompressionError(
460                                "Malformed RLE object: rle_count without rle_value".to_string(),
461                            ));
462                        }
463
464                        // Check if this is an RLE-encoded run
465                        if has_rle_value && has_rle_count {
466                            let value = obj
467                                .get("rle_value")
468                                .ok_or_else(|| {
469                                    DomainError::CompressionError("Missing rle_value".to_string())
470                                })?
471                                .clone();
472
473                            let count =
474                                obj.get("rle_count")
475                                    .and_then(|v| v.as_u64())
476                                    .ok_or_else(|| {
477                                        DomainError::CompressionError(
478                                            "Invalid rle_count: expected positive integer"
479                                                .to_string(),
480                                        )
481                                    })?;
482
483                            // VULN-001 FIX: Validate RLE count to prevent decompression bomb
484                            if count > MAX_RLE_COUNT {
485                                return Err(DomainError::CompressionError(format!(
486                                    "RLE count {} exceeds maximum {}",
487                                    count, MAX_RLE_COUNT
488                                )));
489                            }
490
491                            // VULN-003 FIX: Convert u64 to usize safely to avoid silent truncation on 32-bit targets
492                            let count_usize = usize::try_from(count).map_err(|_| {
493                                DomainError::CompressionError(format!(
494                                    "RLE count {} exceeds platform maximum",
495                                    count
496                                ))
497                            })?;
498
499                            // Track total decompressed size across all RLE runs
500                            total_size = total_size.checked_add(count_usize).ok_or_else(|| {
501                                DomainError::CompressionError(
502                                    "Total decompressed size overflow".to_string(),
503                                )
504                            })?;
505
506                            if total_size > MAX_DECOMPRESSED_SIZE {
507                                return Err(DomainError::CompressionError(format!(
508                                    "Decompressed size {} exceeds maximum {}",
509                                    total_size, MAX_DECOMPRESSED_SIZE
510                                )));
511                            }
512
513                            // Expand the run
514                            for _ in 0..count {
515                                decompressed_values.push(value.clone());
516                            }
517                        } else {
518                            // Not an RLE object, process recursively
519                            decompressed_values.push(self.decompress_run_length(item)?);
520                        }
521                    } else {
522                        // Not an object, process recursively
523                        decompressed_values.push(self.decompress_run_length(item)?);
524                    }
525                }
526
527                Ok(JsonValue::Array(decompressed_values))
528            }
529            _ => Ok(data.clone()),
530        }
531    }
532
533    /// Update decompression statistics
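    ///
    /// The average time is kept as a running mean in integer microseconds:
    /// `avg_n = (avg_{n-1} * (n - 1) + t_n) / n`.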
534    fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
535        self.stats.frames_decompressed += 1;
536
537        if let Ok(serialized) = serde_json::to_string(data) {
538            self.stats.total_decompressed_bytes += serialized.len();
539        }
540
541        let new_time_us = duration.as_micros() as u64;
542        if self.stats.frames_decompressed == 1 {
543            self.stats.avg_decompression_time_us = new_time_us;
544        } else {
545            // Calculate running average
546            let total_frames = self.stats.frames_decompressed as u64;
547            let total_time =
548                self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
549            self.stats.avg_decompression_time_us = total_time / total_frames;
550        }
551    }
552
553    /// Get decompression statistics
554    pub fn get_stats(&self) -> &DecompressionStats {
555        &self.stats
556    }
557}
558
559impl Default for StreamingCompressor {
560    fn default() -> Self {
561        Self::new()
562    }
563}
564
565impl Default for StreamingDecompressor {
566    fn default() -> Self {
567        Self::new()
568    }
569}
570
571#[cfg(test)]
572mod tests {
573    use super::*;
574    use serde_json::json;
575
576    #[test]
577    fn test_streaming_compressor_basic() {
578        let mut compressor = StreamingCompressor::new();
579
580        let frame = StreamFrame {
581            data: json!({
582                "message": "test message",
583                "count": 42
584            }),
585            priority: Priority::MEDIUM,
586            metadata: HashMap::new(),
587        };
588
589        let result = compressor.compress_frame(frame);
590        assert!(result.is_ok());
591
592        let compressed = result.unwrap();
593        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
594    }
595
596    #[test]
597    fn test_compression_stats() {
598        let stats = CompressionStats {
599            total_input_bytes: 1000,
600            total_output_bytes: 600,
601            ..Default::default()
602        };
603
604        assert_eq!(stats.overall_compression_ratio(), 0.6);
605        assert_eq!(stats.bytes_saved(), 400);
606        // Use approximate comparison for float precision
607        let percentage = stats.percentage_saved();
608        assert!((percentage - 40.0).abs() < 0.001);
609    }
610
611    #[test]
612    fn test_streaming_decompressor_basic() {
613        let mut decompressor = StreamingDecompressor::new();
614
615        let compressed_frame = CompressedFrame {
616            frame: StreamFrame {
617                data: json!({"test": "data"}),
618                priority: Priority::MEDIUM,
619                metadata: HashMap::new(),
620            },
621            compressed_data: CompressedData {
622                strategy: CompressionStrategy::None,
623                compressed_size: 20,
624                data: json!({"test": "data"}),
625                compression_metadata: HashMap::new(),
626            },
627            decompression_metadata: DecompressionMetadata {
628                strategy: CompressionStrategy::None,
629                dictionary_map: HashMap::new(),
630                delta_bases: HashMap::new(),
631                priority_hints: HashMap::new(),
632            },
633        };
634
635        let result = decompressor.decompress_frame(compressed_frame);
636        assert!(result.is_ok());
637
638        let decompressed = result.unwrap();
639        assert_eq!(decompressed.data, json!({"test": "data"}));
640    }
641
642    #[test]
643    fn test_dictionary_decompression() {
644        let mut decompressor = StreamingDecompressor::new();
645        decompressor
646            .active_dictionary
647            .insert(0, "hello".to_string());
648        decompressor
649            .active_dictionary
650            .insert(1, "world".to_string());
651
652        // Test with dictionary indices
653        let compressed = json!({
654            "greeting": 0,
655            "target": 1
656        });
657
658        let result = decompressor.decompress_dictionary(&compressed).unwrap();
659        assert_eq!(
660            result,
661            json!({
662                "greeting": "hello",
663                "target": "world"
664            })
665        );
666    }
667
668    #[test]
669    fn test_priority_based_compression() {
670        let mut compressor = StreamingCompressor::new();
671
672        let critical_frame = StreamFrame {
673            data: json!({"error": "critical failure"}),
674            priority: Priority::CRITICAL,
675            metadata: HashMap::new(),
676        };
677
678        let low_frame = StreamFrame {
679            data: json!({"debug": "verbose information"}),
680            priority: Priority::LOW,
681            metadata: HashMap::new(),
682        };
683
684        let _critical_result = compressor.compress_frame(critical_frame).unwrap();
685        let _low_result = compressor.compress_frame(low_frame).unwrap();
686
687        let stats = compressor.get_stats();
688        assert_eq!(stats.frames_processed, 2);
689        assert!(stats.total_input_bytes > 0);
690    }
691
692    #[test]
693    fn test_delta_decompression_basic() {
694        let decompressor = StreamingDecompressor::new();
695
696        let compressed_data = json!([
697            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
698            0.0,
699            1.0,
700            2.0,
701            3.0,
702            4.0
703        ]);
704
705        let result = decompressor.decompress_delta(&compressed_data).unwrap();
706        assert_eq!(result, json!([100.0, 101.0, 102.0, 103.0, 104.0]));
707    }
708
709    #[test]
710    fn test_delta_decompression_negative_deltas() {
711        let decompressor = StreamingDecompressor::new();
712
713        let compressed_data = json!([
714            {"delta_base": 50.0, "delta_type": "numeric_sequence"},
715            -10.0,
716            0.0,
717            10.0,
718            20.0
719        ]);
720
721        let result = decompressor.decompress_delta(&compressed_data).unwrap();
722        assert_eq!(result, json!([40.0, 50.0, 60.0, 70.0]));
723    }
724
725    #[test]
726    fn test_delta_decompression_fractional_deltas() {
727        let decompressor = StreamingDecompressor::new();
728
729        let compressed_data = json!([
730            {"delta_base": 10.0, "delta_type": "numeric_sequence"},
731            0.5,
732            1.0,
733            1.5,
734            2.0
735        ]);
736
737        let result = decompressor.decompress_delta(&compressed_data).unwrap();
738        assert_eq!(result, json!([10.5, 11.0, 11.5, 12.0]));
739    }
740
741    #[test]
742    fn test_delta_decompression_empty_array() {
743        let decompressor = StreamingDecompressor::new();
744
745        let compressed_data = json!([]);
746
747        let result = decompressor.decompress_delta(&compressed_data).unwrap();
748        assert_eq!(result, json!([]));
749    }
750
751    #[test]
752    fn test_delta_decompression_single_element() {
753        let decompressor = StreamingDecompressor::new();
754
755        let compressed_data = json!([
756            {"delta_base": 100.0, "delta_type": "numeric_sequence"}
757        ]);
758
759        let result = decompressor.decompress_delta(&compressed_data).unwrap();
760        assert_eq!(result, json!([]));
761    }
762
763    #[test]
764    fn test_delta_decompression_nested_structure() {
765        let decompressor = StreamingDecompressor::new();
766
767        let compressed_data = json!({
768            "sequence": [
769                {"delta_base": 100.0, "delta_type": "numeric_sequence"},
770                0.0,
771                1.0,
772                2.0
773            ],
774            "other": "data"
775        });
776
777        let result = decompressor.decompress_delta(&compressed_data).unwrap();
778        assert_eq!(
779            result,
780            json!({
781                "sequence": [100.0, 101.0, 102.0],
782                "other": "data"
783            })
784        );
785    }
786
787    #[test]
788    fn test_delta_decompression_invalid_metadata() {
789        let decompressor = StreamingDecompressor::new();
790
791        let compressed_data = json!([
792            {"wrong_key": 100.0},
793            0.0,
794            1.0
795        ]);
796
797        let result = decompressor.decompress_delta(&compressed_data);
798        assert!(result.is_ok());
799        // Should return as-is if not valid delta format
800    }
801
802    #[test]
803    fn test_delta_decompression_invalid_delta_value() {
804        let decompressor = StreamingDecompressor::new();
805
806        let compressed_data = json!([
807            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
808            "not_a_number"
809        ]);
810
811        let result = decompressor.decompress_delta(&compressed_data);
812        assert!(result.is_err());
813    }
814
815    #[test]
816    fn test_rle_decompression_basic() {
817        let decompressor = StreamingDecompressor::new();
818
819        let compressed_data = json!([
820            {"rle_value": 1, "rle_count": 3},
821            {"rle_value": 2, "rle_count": 2},
822            {"rle_value": 3, "rle_count": 4}
823        ]);
824
825        let result = decompressor
826            .decompress_run_length(&compressed_data)
827            .unwrap();
828        assert_eq!(result, json!([1, 1, 1, 2, 2, 3, 3, 3, 3]));
829    }
830
831    #[test]
832    fn test_rle_decompression_mixed_runs() {
833        let decompressor = StreamingDecompressor::new();
834
835        let compressed_data = json!([
836            {"rle_value": "a", "rle_count": 2},
837            "b",
838            {"rle_value": "c", "rle_count": 3}
839        ]);
840
841        let result = decompressor
842            .decompress_run_length(&compressed_data)
843            .unwrap();
844        assert_eq!(result, json!(["a", "a", "b", "c", "c", "c"]));
845    }
846
847    #[test]
848    fn test_rle_decompression_single_count() {
849        let decompressor = StreamingDecompressor::new();
850
851        let compressed_data = json!([
852            {"rle_value": "x", "rle_count": 1}
853        ]);
854
855        let result = decompressor
856            .decompress_run_length(&compressed_data)
857            .unwrap();
858        assert_eq!(result, json!(["x"]));
859    }
860
861    #[test]
862    fn test_rle_decompression_zero_count() {
863        let decompressor = StreamingDecompressor::new();
864
865        let compressed_data = json!([
866            {"rle_value": "x", "rle_count": 0}
867        ]);
868
869        let result = decompressor
870            .decompress_run_length(&compressed_data)
871            .unwrap();
872        assert_eq!(result, json!([]));
873    }
874
875    #[test]
876    fn test_rle_decompression_nested_values() {
877        let decompressor = StreamingDecompressor::new();
878
879        let compressed_data = json!([
880            {"rle_value": {"name": "test"}, "rle_count": 3}
881        ]);
882
883        let result = decompressor
884            .decompress_run_length(&compressed_data)
885            .unwrap();
886        assert_eq!(
887            result,
888            json!([{"name": "test"}, {"name": "test"}, {"name": "test"}])
889        );
890    }
891
892    #[test]
893    fn test_rle_decompression_nested_structure() {
894        let decompressor = StreamingDecompressor::new();
895
896        let compressed_data = json!({
897            "data": [
898                {"rle_value": 1, "rle_count": 3},
899                {"rle_value": 2, "rle_count": 2}
900            ],
901            "other": "field"
902        });
903
904        let result = decompressor
905            .decompress_run_length(&compressed_data)
906            .unwrap();
907        assert_eq!(
908            result,
909            json!({
910                "data": [1, 1, 1, 2, 2],
911                "other": "field"
912            })
913        );
914    }
915
916    #[test]
917    fn test_rle_decompression_empty_array() {
918        let decompressor = StreamingDecompressor::new();
919
920        let compressed_data = json!([]);
921
922        let result = decompressor
923            .decompress_run_length(&compressed_data)
924            .unwrap();
925        assert_eq!(result, json!([]));
926    }
927
928    #[test]
929    fn test_rle_decompression_invalid_count() {
930        let decompressor = StreamingDecompressor::new();
931
932        let compressed_data = json!([
933            {"rle_value": "x", "rle_count": "not_a_number"}
934        ]);
935
936        let result = decompressor.decompress_run_length(&compressed_data);
937        assert!(result.is_err());
938    }
939
940    #[test]
941    fn test_rle_decompression_missing_value() {
942        let decompressor = StreamingDecompressor::new();
943
944        let compressed_data = json!([
945            {"rle_count": 3}
946        ]);
947
948        let result = decompressor.decompress_run_length(&compressed_data);
949        assert!(result.is_err());
950    }
951
952    #[test]
953    fn test_rle_decompression_missing_count() {
954        let decompressor = StreamingDecompressor::new();
955
956        let compressed_data = json!([
957            {"rle_value": "x"}
958        ]);
959
960        let result = decompressor.decompress_run_length(&compressed_data);
961        assert!(result.is_err());
962    }
963
964    #[test]
965    fn test_rle_decompression_non_rle_objects() {
966        let decompressor = StreamingDecompressor::new();
967
968        let compressed_data = json!([
969            {"regular": "object"},
970            {"another": "one"}
971        ]);
972
973        let result = decompressor
974            .decompress_run_length(&compressed_data)
975            .unwrap();
976        // Should return as-is if objects don't have RLE format
977        assert_eq!(
978            result,
979            json!([
980                {"regular": "object"},
981                {"another": "one"}
982            ])
983        );
984    }
985
986    // NEW TESTS FOR COVERAGE IMPROVEMENT (Task P2-TEST-002)
987
988    #[test]
989    fn test_compress_frame_with_custom_strategies() {
990        let mut dict = HashMap::new();
991        dict.insert("test".to_string(), 0);
992
993        let mut bases = HashMap::new();
994        bases.insert("value".to_string(), 100.0);
995
996        let mut compressor = StreamingCompressor::with_strategies(
997            CompressionStrategy::Dictionary { dictionary: dict },
998            CompressionStrategy::Delta { base_values: bases },
999        );
1000
1001        let frame = StreamFrame {
1002            data: json!({"value": 123, "other": 456}),
1003            priority: Priority::HIGH,
1004            metadata: HashMap::new(),
1005        };
1006
1007        let result = compressor.compress_frame(frame);
1008        assert!(result.is_ok());
1009        assert_eq!(compressor.stats.frames_processed, 1);
1010    }
1011
1012    #[test]
1013    fn test_optimize_for_data_with_samples() {
1014        let mut compressor = StreamingCompressor::new();
1015
1016        let skeleton = json!({
1017            "id": null,
1018            "name": null
1019        });
1020
1021        let samples = vec![
1022            json!({"id": 1, "name": "test1"}),
1023            json!({"id": 2, "name": "test2"}),
1024            json!({"id": 3, "name": "test3"}),
1025        ];
1026
1027        let result = compressor.optimize_for_data(&skeleton, &samples);
1028        assert!(result.is_ok());
1029    }
1030
1031    #[test]
1032    fn test_optimize_for_data_empty_samples() {
1033        let mut compressor = StreamingCompressor::new();
1034
1035        let skeleton = json!({"key": "value"});
1036        let result = compressor.optimize_for_data(&skeleton, &[]);
1037        assert!(result.is_ok());
1038    }
1039
1040    #[test]
1041    fn test_reset_stats() {
1042        let mut compressor = StreamingCompressor::new();
1043
1044        compressor.stats.total_input_bytes = 1000;
1045        compressor.stats.total_output_bytes = 500;
1046        compressor.stats.frames_processed = 10;
1047
1048        compressor.reset_stats();
1049
1050        assert_eq!(compressor.stats.total_input_bytes, 0);
1051        assert_eq!(compressor.stats.total_output_bytes, 0);
1052        assert_eq!(compressor.stats.frames_processed, 0);
1053    }
1054
1055    #[test]
1056    fn test_compressor_critical_vs_low_priority() {
1057        let mut compressor = StreamingCompressor::new();
1058
1059        let critical_frame = StreamFrame {
1060            data: json!({"critical": "data"}),
1061            priority: Priority::CRITICAL,
1062            metadata: HashMap::new(),
1063        };
1064
1065        let low_frame = StreamFrame {
1066            data: json!({"low": "data"}),
1067            priority: Priority::LOW,
1068            metadata: HashMap::new(),
1069        };
1070
1071        compressor.compress_frame(critical_frame).unwrap();
1072        compressor.compress_frame(low_frame).unwrap();
1073
1074        assert_eq!(compressor.stats.frames_processed, 2);
1075    }
1076
1077    #[test]
1078    fn test_decompressor_hybrid_strategy() {
1079        let mut decompressor = StreamingDecompressor::new();
1080
1081        // Setup delta bases and dictionary
1082        decompressor.delta_bases.insert("value".to_string(), 100.0);
1083        decompressor.active_dictionary.insert(0, "test".to_string());
1084
1085        let mut string_dict = HashMap::new();
1086        string_dict.insert("test".to_string(), 0);
1087
1088        let mut numeric_deltas = HashMap::new();
1089        numeric_deltas.insert("value".to_string(), 100.0);
1090
1091        let compressed_frame = CompressedFrame {
1092            frame: StreamFrame {
1093                data: json!({"test": "data"}),
1094                priority: Priority::MEDIUM,
1095                metadata: HashMap::new(),
1096            },
1097            compressed_data: CompressedData {
1098                strategy: CompressionStrategy::Hybrid {
1099                    string_dict: string_dict.clone(),
1100                    numeric_deltas: numeric_deltas.clone(),
1101                },
1102                compressed_size: 20,
1103                data: json!({"value": 5.0}), // Delta from base 100
1104                compression_metadata: HashMap::new(),
1105            },
1106            decompression_metadata: DecompressionMetadata {
1107                strategy: CompressionStrategy::Hybrid {
1108                    string_dict,
1109                    numeric_deltas,
1110                },
1111                dictionary_map: HashMap::new(),
1112                delta_bases: HashMap::new(),
1113                priority_hints: HashMap::new(),
1114            },
1115        };
1116
1117        let result = decompressor.decompress_frame(compressed_frame);
1118        assert!(result.is_ok());
1119    }
1120
1121    #[test]
1122    fn test_decompress_dictionary_nested_arrays() {
1123        let mut decompressor = StreamingDecompressor::new();
1124        decompressor
1125            .active_dictionary
1126            .insert(0, "item1".to_string());
1127        decompressor
1128            .active_dictionary
1129            .insert(1, "item2".to_string());
1130
1131        let data = json!([[0, 1], [1, 0]]);
1132        let result = decompressor.decompress_dictionary(&data).unwrap();
1133
1134        assert_eq!(result, json!([["item1", "item2"], ["item2", "item1"]]));
1135    }
1136
1137    #[test]
1138    fn test_decompress_dictionary_non_index_numbers() {
1139        let mut decompressor = StreamingDecompressor::new();
1140        decompressor.active_dictionary.insert(0, "test".to_string());
1141
1142        // Number that doesn't map to dictionary
1143        let data = json!({"value": 999});
1144        let result = decompressor.decompress_dictionary(&data).unwrap();
1145
1146        // Should remain as number since not in dictionary
1147        assert_eq!(result, json!({"value": 999}));
1148    }
1149
1150    #[test]
1151    fn test_decompress_delta_non_array() {
1152        let decompressor = StreamingDecompressor::new();
1153
1154        // Non-array delta encoding (passthrough)
1155        let data = json!({"key": "value"});
1156        let result = decompressor.decompress_delta(&data).unwrap();
1157
1158        assert_eq!(result, json!({"key": "value"}));
1159    }
1160
1161    #[test]
1162    fn test_decompress_delta_array_without_metadata() {
1163        let decompressor = StreamingDecompressor::new();
1164
1165        // Array without delta metadata
1166        let data = json!([1, 2, 3, 4]);
1167        let result = decompressor.decompress_delta(&data).unwrap();
1168
1169        assert_eq!(result, json!([1, 2, 3, 4]));
1170    }
1171
1172    #[test]
1173    fn test_decompress_run_length_nested_objects() {
1174        let decompressor = StreamingDecompressor::new();
1175
1176        let data = json!({
1177            "outer": {
1178                "inner": [
1179                    {"rle_value": {"nested": "obj"}, "rle_count": 2}
1180                ]
1181            }
1182        });
1183
1184        let result = decompressor.decompress_run_length(&data).unwrap();
1185        assert_eq!(
1186            result,
1187            json!({
1188                "outer": {
1189                    "inner": [{"nested": "obj"}, {"nested": "obj"}]
1190                }
1191            })
1192        );
1193    }
1194
1195    #[test]
1196    fn test_decompression_stats_tracking() {
1197        let mut decompressor = StreamingDecompressor::new();
1198
1199        assert_eq!(decompressor.stats.frames_decompressed, 0);
1200
1201        let frame = CompressedFrame {
1202            frame: StreamFrame {
1203                data: json!({"test": "data"}),
1204                priority: Priority::MEDIUM,
1205                metadata: HashMap::new(),
1206            },
1207            compressed_data: CompressedData {
1208                strategy: CompressionStrategy::None,
1209                compressed_size: 15,
1210                data: json!({"test": "data"}),
1211                compression_metadata: HashMap::new(),
1212            },
1213            decompression_metadata: DecompressionMetadata {
1214                strategy: CompressionStrategy::None,
1215                dictionary_map: HashMap::new(),
1216                delta_bases: HashMap::new(),
1217                priority_hints: HashMap::new(),
1218            },
1219        };
1220
1221        decompressor.decompress_frame(frame).unwrap();
1222
1223        assert_eq!(decompressor.stats.frames_decompressed, 1);
1224        assert!(decompressor.stats.total_decompressed_bytes > 0);
1225        assert!(decompressor.stats.avg_decompression_time_us > 0);
1226    }
1227
1228    #[test]
1229    fn test_decompress_delta_array_malformed_metadata() {
1230        let decompressor = StreamingDecompressor::new();
1231
1232        // Array with object but missing delta_base - passes through as-is
1233        let data = json!([
1234            {"delta_type": "numeric_sequence"},
1235            1.0,
1236            2.0
1237        ]);
1238
1239        let result = decompressor.decompress_delta(&data);
1240        // Without both delta_base and delta_type matching, it's treated as regular array
1241        assert!(result.is_ok());
1242        // Should return as-is since it doesn't match delta format
1243        assert_eq!(result.unwrap(), data);
1244    }
1245
1246    #[test]
1247    fn test_decompress_run_length_large_count() {
1248        let decompressor = StreamingDecompressor::new();
1249
1250        // Within MAX_RLE_COUNT
1251        let data = json!([
1252            {"rle_value": "x", "rle_count": 1000}
1253        ]);
1254
1255        let result = decompressor.decompress_run_length(&data);
1256        assert!(result.is_ok());
1257        let decompressed = result.unwrap();
1258        if let Some(arr) = decompressed.as_array() {
1259            assert_eq!(arr.len(), 1000);
1260        }
1261    }
1262
1263    #[test]
1264    fn test_decompress_run_length_exceeds_max_count() {
1265        let decompressor = StreamingDecompressor::new();
1266
1267        // Exceeds MAX_RLE_COUNT (100_000)
1268        let data = json!([
1269            {"rle_value": "x", "rle_count": 200_000}
1270        ]);
1271
1272        let result = decompressor.decompress_run_length(&data);
1273        assert!(result.is_err()); // Should error
1274    }
1275
1276    #[test]
1277    fn test_decompress_run_length_cumulative_overflow() {
1278        let decompressor = StreamingDecompressor::new();
1279
1280        // Multiple runs that sum to exceed MAX_DECOMPRESSED_SIZE
1281        let data = json!([
1282            {"rle_value": "a", "rle_count": 5_000_000},
1283            {"rle_value": "b", "rle_count": 6_000_000}
1284        ]);
1285
1286        let result = decompressor.decompress_run_length(&data);
1287        // Should error when cumulative size exceeds limit
1288        assert!(result.is_err());
1289    }
1290
1291    #[test]
1292    fn test_decompress_delta_array_size_limit() {
1293        let decompressor = StreamingDecompressor::new();
1294
1295        // Create very large array (exceeds MAX_DELTA_ARRAY_SIZE)
1296        let mut large_array = vec![json!({"delta_base": 0.0, "delta_type": "numeric_sequence"})];
1297        for _i in 0..1_000_001 {
1298            large_array.push(json!(0.0));
1299        }
1300
1301        let result = decompressor.decompress_delta(&JsonValue::Array(large_array));
1302        assert!(result.is_err()); // Should error on size limit
1303    }
1304
1305    #[test]
1306    fn test_compression_stats_default() {
1307        let stats = CompressionStats::default();
1308        assert_eq!(stats.total_input_bytes, 0);
1309        assert_eq!(stats.total_output_bytes, 0);
1310        assert_eq!(stats.frames_processed, 0);
1311        assert_eq!(stats.overall_compression_ratio(), 1.0);
1312    }
1313
1314    #[test]
1315    fn test_decompression_stats_default() {
1316        let stats = DecompressionStats::default();
1317        assert_eq!(stats.frames_decompressed, 0);
1318        assert_eq!(stats.total_decompressed_bytes, 0);
1319        assert_eq!(stats.avg_decompression_time_us, 0);
1320    }
1321
1322    // ============================================================================
1323    // Additional coverage tests for P2-TEST-002 (70%+ coverage target)
1324    // ============================================================================
1325
1326    #[test]
1327    fn test_decompress_dictionary_with_strings() {
1328        let decompressor = StreamingDecompressor::new();
1329        let data = json!({"key": "value", "nested": {"inner": "string"}});
1330        let result = decompressor.decompress_dictionary(&data);
1331        assert!(result.is_ok());
1332        assert_eq!(result.unwrap(), data);
1333    }
1334
1335    #[test]
1336    fn test_decompress_dictionary_with_null() {
1337        let decompressor = StreamingDecompressor::new();
1338        let data = json!(null);
1339        let result = decompressor.decompress_dictionary(&data);
1340        assert!(result.is_ok());
1341        assert_eq!(result.unwrap(), json!(null));
1342    }
1343
1344    #[test]
1345    fn test_decompress_dictionary_with_boolean() {
1346        let decompressor = StreamingDecompressor::new();
1347        let data = json!(true);
1348        let result = decompressor.decompress_dictionary(&data);
1349        assert!(result.is_ok());
1350        assert_eq!(result.unwrap(), json!(true));
1351    }
1352
1353    #[test]
1354    fn test_decompress_dictionary_with_string() {
1355        let decompressor = StreamingDecompressor::new();
1356        let data = json!("plain string");
1357        let result = decompressor.decompress_dictionary(&data);
1358        assert!(result.is_ok());
1359        assert_eq!(result.unwrap(), json!("plain string"));
1360    }
1361
1362    #[test]
1363    fn test_decompress_delta_with_object_no_array() {
1364        let decompressor = StreamingDecompressor::new();
1365        let data = json!({"key": "value"});
1366        let result = decompressor.decompress_delta(&data);
1367        assert!(result.is_ok());
1368        assert_eq!(result.unwrap(), json!({"key": "value"}));
1369    }
1370
1371    #[test]
1372    fn test_decompress_delta_with_primitive_values() {
1373        let decompressor = StreamingDecompressor::new();
1374
1375        assert_eq!(
1376            decompressor.decompress_delta(&json!("string")).unwrap(),
1377            json!("string")
1378        );
1379        assert_eq!(
1380            decompressor.decompress_delta(&json!(42)).unwrap(),
1381            json!(42)
1382        );
1383        assert_eq!(
1384            decompressor.decompress_delta(&json!(true)).unwrap(),
1385            json!(true)
1386        );
1387        assert_eq!(
1388            decompressor.decompress_delta(&json!(null)).unwrap(),
1389            json!(null)
1390        );
1391    }
1392
1393    #[test]
1394    fn test_decompress_run_length_with_primitive_values() {
1395        let decompressor = StreamingDecompressor::new();
1396
1397        assert_eq!(
1398            decompressor
1399                .decompress_run_length(&json!("string"))
1400                .unwrap(),
1401            json!("string")
1402        );
1403        assert_eq!(
1404            decompressor.decompress_run_length(&json!(123)).unwrap(),
1405            json!(123)
1406        );
1407        assert_eq!(
1408            decompressor.decompress_run_length(&json!(false)).unwrap(),
1409            json!(false)
1410        );
1411        assert_eq!(
1412            decompressor.decompress_run_length(&json!(null)).unwrap(),
1413            json!(null)
1414        );
1415    }
1416
1417    #[test]
1418    fn test_decompress_data_strategy_dictionary() {
1419        let mut decompressor = StreamingDecompressor::new();
1420        decompressor.active_dictionary.insert(0, "test".to_string());
1421
1422        let mut dict = HashMap::new();
1423        dict.insert("test".to_string(), 0);
1424
1425        let compressed_data = CompressedData {
1426            strategy: CompressionStrategy::Dictionary { dictionary: dict },
1427            compressed_size: 10,
1428            data: json!({"field": 0}),
1429            compression_metadata: HashMap::new(),
1430        };
1431
1432        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1433        assert!(result.is_ok());
1434    }
1435
1436    #[test]
1437    fn test_decompress_data_strategy_delta() {
1438        let decompressor = StreamingDecompressor::new();
1439
1440        let mut bases = HashMap::new();
1441        bases.insert("value".to_string(), 100.0);
1442
1443        let compressed_data = CompressedData {
1444            strategy: CompressionStrategy::Delta {
1445                base_values: bases.clone(),
1446            },
1447            compressed_size: 10,
1448            data: json!({
1449                "sequence": [
1450                    {"delta_base": 100.0, "delta_type": "numeric_sequence"},
1451                    5.0,
1452                    10.0
1453                ]
1454            }),
1455            compression_metadata: HashMap::new(),
1456        };
1457
1458        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1459        assert!(result.is_ok());
1460    }
1461
1462    #[test]
1463    fn test_decompress_data_strategy_run_length() {
1464        let decompressor = StreamingDecompressor::new();
1465
1466        let compressed_data = CompressedData {
1467            strategy: CompressionStrategy::RunLength,
1468            compressed_size: 10,
1469            data: json!([
1470                {"rle_value": "x", "rle_count": 3}
1471            ]),
1472            compression_metadata: HashMap::new(),
1473        };
1474
1475        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1476        assert!(result.is_ok());
1477        assert_eq!(result.unwrap(), json!(["x", "x", "x"]));
1478    }
1479
1480    #[test]
1481    fn test_decompress_data_strategy_hybrid_applies_delta_then_dict() {
1482        let mut decompressor = StreamingDecompressor::new();
1483        decompressor.active_dictionary.insert(0, "test".to_string());
1484
1485        let mut string_dict = HashMap::new();
1486        string_dict.insert("test".to_string(), 0);
1487
1488        let mut numeric_deltas = HashMap::new();
1489        numeric_deltas.insert("value".to_string(), 100.0);
1490
1491        let compressed_data = CompressedData {
1492            strategy: CompressionStrategy::Hybrid {
1493                string_dict,
1494                numeric_deltas,
1495            },
1496            compressed_size: 10,
1497            data: json!({
1498                "field": 0
1499            }),
1500            compression_metadata: HashMap::new(),
1501        };
1502
1503        let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1504        assert!(result.is_ok());
1505    }
1506
1507    #[test]
1508    fn test_select_compressor_critical_priority() {
1509        let mut compressor = StreamingCompressor::new();
1510        let _skeleton_comp = compressor.select_compressor_for_priority(Priority::CRITICAL);
1511    }
1512
1513    #[test]
1514    fn test_select_compressor_high_priority() {
1515        let mut compressor = StreamingCompressor::new();
1516        let _skeleton_comp = compressor.select_compressor_for_priority(Priority::HIGH);
1517    }
1518
1519    #[test]
1520    fn test_select_compressor_medium_priority() {
1521        let mut compressor = StreamingCompressor::new();
1522        let _content_comp = compressor.select_compressor_for_priority(Priority::MEDIUM);
1523    }
1524
1525    #[test]
1526    fn test_select_compressor_low_priority() {
1527        let mut compressor = StreamingCompressor::new();
1528        let _content_comp = compressor.select_compressor_for_priority(Priority::LOW);
1529    }
1530
1531    #[test]
1532    fn test_select_compressor_background_priority() {
1533        let mut compressor = StreamingCompressor::new();
1534        let _content_comp = compressor.select_compressor_for_priority(Priority::BACKGROUND);
1535    }
1536
1537    #[test]
1538    fn test_update_stats_with_zero_original_size() {
1539        let mut compressor = StreamingCompressor::new();
1540        compressor.update_stats(Priority::MEDIUM, 0, 10);
1541
1542        let stats = compressor.get_stats();
1543        assert_eq!(stats.frames_processed, 1);
1544        assert_eq!(stats.total_input_bytes, 0);
1545        assert_eq!(stats.total_output_bytes, 10);
1546        assert_eq!(
1547            stats.priority_compression_ratio(Priority::MEDIUM.value()),
1548            1.0
1549        );
1550    }
1551
1552    #[test]
1553    fn test_update_stats_with_normal_compression() {
1554        let mut compressor = StreamingCompressor::new();
1555        compressor.update_stats(Priority::HIGH, 1000, 500);
1556
1557        let stats = compressor.get_stats();
1558        assert_eq!(stats.frames_processed, 1);
1559        assert_eq!(stats.total_input_bytes, 1000);
1560        assert_eq!(stats.total_output_bytes, 500);
1561        assert_eq!(
1562            stats.priority_compression_ratio(Priority::HIGH.value()),
1563            0.5
1564        );
1565    }
1566
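    // Decompression timing is tracked as a running average in microseconds over all frames
    // seen so far; the next two tests cover the single-frame and multi-frame cases.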
1567    #[test]
1568    fn test_update_decompression_stats_first_frame() {
1569        let mut decompressor = StreamingDecompressor::new();
1570        let data = json!({"test": "data"});
1571        let duration = std::time::Duration::from_micros(100);
1572
1573        decompressor.update_decompression_stats(&data, duration);
1574
1575        let stats = decompressor.get_stats();
1576        assert_eq!(stats.frames_decompressed, 1);
1577        assert_eq!(stats.avg_decompression_time_us, 100);
1578    }
1579
1580    #[test]
1581    fn test_update_decompression_stats_multiple_frames() {
1582        let mut decompressor = StreamingDecompressor::new();
1583        let data = json!({"test": "data"});
1584
1585        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(100));
1586        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(200));
1587        decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(300));
1588
1589        let stats = decompressor.get_stats();
1590        assert_eq!(stats.frames_decompressed, 3);
1591        assert_eq!(stats.avg_decompression_time_us, 200); // (100 + 200 + 300) / 3
1592    }
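    // Decompression metadata is rebuilt from the compression_metadata map: keys shaped like
    // "dict_<index>" populate the dictionary map and keys shaped like "base_<field>" populate
    // the delta bases, as the following tests demonstrate.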
1593
1594    #[test]
1595    fn test_create_decompression_metadata_with_dict() {
1596        let compressor = StreamingCompressor::new();
1597        let mut metadata = HashMap::new();
1598        metadata.insert("dict_0".to_string(), json!("hello"));
1599        metadata.insert("dict_1".to_string(), json!("world"));
1600
1601        let compressed_data = CompressedData {
1602            strategy: CompressionStrategy::None,
1603            compressed_size: 10,
1604            data: json!({}),
1605            compression_metadata: metadata,
1606        };
1607
1608        let result = compressor.create_decompression_metadata(&compressed_data);
1609        assert!(result.is_ok());
1610        let meta = result.unwrap();
1611        assert_eq!(meta.dictionary_map.len(), 2);
1612        assert_eq!(meta.dictionary_map.get(&0), Some(&"hello".to_string()));
1613        assert_eq!(meta.dictionary_map.get(&1), Some(&"world".to_string()));
1614    }
1615
1616    #[test]
1617    fn test_create_decompression_metadata_with_delta_bases() {
1618        let compressor = StreamingCompressor::new();
1619        let mut metadata = HashMap::new();
1620        metadata.insert("base_value1".to_string(), json!(100.0));
1621        metadata.insert("base_value2".to_string(), json!(200.0));
1622
1623        let compressed_data = CompressedData {
1624            strategy: CompressionStrategy::None,
1625            compressed_size: 10,
1626            data: json!({}),
1627            compression_metadata: metadata,
1628        };
1629
1630        let result = compressor.create_decompression_metadata(&compressed_data);
1631        assert!(result.is_ok());
1632        let meta = result.unwrap();
1633        assert_eq!(meta.delta_bases.len(), 2);
1634        assert_eq!(meta.delta_bases.get("value1"), Some(&100.0));
1635        assert_eq!(meta.delta_bases.get("value2"), Some(&200.0));
1636    }
1637
1638    #[test]
1639    fn test_create_decompression_metadata_with_invalid_dict_index() {
1640        let compressor = StreamingCompressor::new();
1641        let mut metadata = HashMap::new();
1642        metadata.insert("dict_invalid".to_string(), json!("value"));
1643        metadata.insert("dict_0".to_string(), json!("valid"));
1644
1645        let compressed_data = CompressedData {
1646            strategy: CompressionStrategy::None,
1647            compressed_size: 10,
1648            data: json!({}),
1649            compression_metadata: metadata,
1650        };
1651
1652        let result = compressor.create_decompression_metadata(&compressed_data);
1653        assert!(result.is_ok());
1654        let meta = result.unwrap();
1655        // Only the entry whose key suffix parses as a numeric index should be kept
1656        assert_eq!(meta.dictionary_map.len(), 1);
1657        assert_eq!(meta.dictionary_map.get(&0), Some(&"valid".to_string()));
1658    }
1659
1660    #[test]
1661    fn test_update_context_updates_dictionary() {
1662        let mut decompressor = StreamingDecompressor::new();
1663
1664        let mut metadata = DecompressionMetadata {
1665            strategy: CompressionStrategy::None,
1666            dictionary_map: HashMap::new(),
1667            delta_bases: HashMap::new(),
1668            priority_hints: HashMap::new(),
1669        };
1670        metadata.dictionary_map.insert(0, "hello".to_string());
1671        metadata.dictionary_map.insert(1, "world".to_string());
1672
1673        let result = decompressor.update_context(&metadata);
1674        assert!(result.is_ok());
1675        assert_eq!(decompressor.active_dictionary.len(), 2);
1676        assert_eq!(
1677            decompressor.active_dictionary.get(&0),
1678            Some(&"hello".to_string())
1679        );
1680    }
1681
1682    #[test]
1683    fn test_update_context_updates_delta_bases() {
1684        let mut decompressor = StreamingDecompressor::new();
1685
1686        let mut metadata = DecompressionMetadata {
1687            strategy: CompressionStrategy::None,
1688            dictionary_map: HashMap::new(),
1689            delta_bases: HashMap::new(),
1690            priority_hints: HashMap::new(),
1691        };
1692        metadata.delta_bases.insert("value1".to_string(), 100.0);
1693        metadata.delta_bases.insert("value2".to_string(), 200.0);
1694
1695        let result = decompressor.update_context(&metadata);
1696        assert!(result.is_ok());
1697        assert_eq!(decompressor.delta_bases.len(), 2);
1698        assert_eq!(decompressor.delta_bases.get("value1"), Some(&100.0));
1699    }
1700
1701    #[test]
1702    fn test_decompress_dictionary_with_float_that_is_not_u64() {
1703        let mut decompressor = StreamingDecompressor::new();
1704        decompressor.active_dictionary.insert(0, "test".to_string());
1705
1706        // Float that can't be represented as u64
1707        let data = json!({"value": 1.5});
1708        let result = decompressor.decompress_dictionary(&data);
1709        assert!(result.is_ok());
1710        // Should remain a float, since 1.5 is not a valid dictionary index
1711        assert_eq!(result.unwrap(), json!({"value": 1.5}));
1712    }
1713
1714    #[test]
1715    fn test_decompress_dictionary_with_negative_number() {
1716        let mut decompressor = StreamingDecompressor::new();
1717        decompressor.active_dictionary.insert(0, "test".to_string());
1718
1719        let data = json!({"value": -1});
1720        let result = decompressor.decompress_dictionary(&data);
1721        assert!(result.is_ok());
1722        // Negative numbers can never be valid dictionary indices, so the value is unchanged
1723        assert_eq!(result.unwrap(), json!({"value": -1}));
1724    }
1725
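    // A delta-compressed array is recognized only when its first element is an object carrying
    // both "delta_base" and "delta_type"; anything else is treated as a plain array and
    // recursed into unchanged.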
1726    #[test]
1727    fn test_decompress_delta_array_checks_first_element_structure() {
1728        let decompressor = StreamingDecompressor::new();
1729
1730        // Array whose first element is not a well-formed delta metadata object
1731        let data = json!([
1732            {"wrong_field": 100.0},
1733            1.0,
1734            2.0
1735        ]);
1736
1737        let result = decompressor.decompress_delta(&data);
1738        assert!(result.is_ok());
1739        // Should be processed recursively rather than expanded as a delta array
1740        assert_eq!(result.unwrap(), data);
1741    }
1742
1743    #[test]
1744    fn test_decompress_delta_array_requires_both_base_and_type() {
1745        let decompressor = StreamingDecompressor::new();
1746
1747        // Has delta_base but not delta_type
1748        let data1 = json!([
1749            {"delta_base": 100.0},
1750            1.0
1751        ]);
1752        let result1 = decompressor.decompress_delta(&data1);
1753        assert!(result1.is_ok());
1754
1755        // Has delta_type but not delta_base
1756        let data2 = json!([
1757            {"delta_type": "numeric_sequence"},
1758            1.0
1759        ]);
1760        let result2 = decompressor.decompress_delta(&data2);
1761        assert!(result2.is_ok());
1762    }
1763
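    // Run-length entries are objects carrying both "rle_value" and "rle_count"; plain values
    // pass through untouched, while an entry with only one of the two fields or a non-numeric
    // count is rejected as corrupt input.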
1764    #[test]
1765    fn test_decompress_run_length_with_non_objects_in_array() {
1766        let decompressor = StreamingDecompressor::new();
1767
1768        // Mix of RLE objects and plain values
1769        let data = json!([
1770            {"rle_value": "a", "rle_count": 2},
1771            "plain",
1772            42,
1773            true
1774        ]);
1775
1776        let result = decompressor.decompress_run_length(&data);
1777        assert!(result.is_ok());
1778        assert_eq!(result.unwrap(), json!(["a", "a", "plain", 42, true]));
1779    }
1780
1781    #[test]
1782    fn test_decompress_run_length_integrity_check_rle_value_without_count() {
1783        let decompressor = StreamingDecompressor::new();
1784
1785        let data = json!([
1786            {"rle_value": "x"}
1787        ]);
1788
1789        let result = decompressor.decompress_run_length(&data);
1790        assert!(result.is_err());
1791    }
1792
1793    #[test]
1794    fn test_decompress_run_length_integrity_check_rle_count_without_value() {
1795        let decompressor = StreamingDecompressor::new();
1796
1797        let data = json!([
1798            {"rle_count": 3}
1799        ]);
1800
1801        let result = decompressor.decompress_run_length(&data);
1802        assert!(result.is_err());
1803    }
1804
1805    #[test]
1806    fn test_decompress_run_length_non_number_count() {
1807        let decompressor = StreamingDecompressor::new();
1808
1809        let data = json!([
1810            {"rle_value": "x", "rle_count": "three"}
1811        ]);
1812
1813        let result = decompressor.decompress_run_length(&data);
1814        assert!(result.is_err());
1815    }
1816
1817    #[test]
1818    fn test_compress_frame_with_large_data() {
1819        let mut compressor = StreamingCompressor::new();
1820
1821        let large_data = json!({
1822            "users": (0..100).map(|i| json!({
1823                "id": i,
1824                "name": format!("User{}", i),
1825                "email": format!("user{}@example.com", i),
1826                "age": 20 + (i % 50),
1827                "active": i % 2 == 0
1828            })).collect::<Vec<_>>()
1829        });
1830
1831        let frame = StreamFrame {
1832            data: large_data,
1833            priority: Priority::MEDIUM,
1834            metadata: HashMap::new(),
1835        };
1836
1837        let result = compressor.compress_frame(frame);
1838        assert!(result.is_ok());
1839
1840        let stats = compressor.get_stats();
1841        assert_eq!(stats.frames_processed, 1);
1842        assert!(stats.total_input_bytes > 1000);
1843    }
1844
1845    #[test]
1846    fn test_decompress_delta_with_very_large_deltas() {
1847        let decompressor = StreamingDecompressor::new();
1848
1849        let data = json!([
1850            {"delta_base": 1_000_000.0, "delta_type": "numeric_sequence"},
1851            100_000.0,
1852            200_000.0,
1853            300_000.0
1854        ]);
1855
1856        let result = decompressor.decompress_delta(&data);
1857        assert!(result.is_ok());
1858        assert_eq!(
1859            result.unwrap(),
1860            json!([1_100_000.0, 1_200_000.0, 1_300_000.0])
1861        );
1862    }
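
    // A minimal end-to-end sketch added for illustration: compress a frame, prime a
    // StreamingDecompressor with the frame's metadata, and decompress the payload. It relies
    // only on APIs already exercised above (compress_frame, update_context, decompress_data)
    // and does not assert byte-for-byte round-trip equality, since that depends on the
    // strategy the content compressor happens to choose.
    #[test]
    fn test_round_trip_compress_then_decompress_sketch() {
        let mut compressor = StreamingCompressor::new();
        let frame = StreamFrame {
            data: json!({"id": 1, "name": "example", "tags": ["a", "a", "a"]}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        // Compress with whatever strategy the content compressor selects for MEDIUM priority.
        let compressed = compressor
            .compress_frame(frame)
            .expect("compression should succeed");

        // Prime the decompressor with the dictionary entries and delta bases shipped
        // alongside the frame.
        let mut decompressor = StreamingDecompressor::new();
        decompressor
            .update_context(&compressed.decompression_metadata)
            .expect("context update should succeed");

        // Decompress using the strategy recorded in the compressed payload.
        let result = decompressor.decompress_data(
            &compressed.compressed_data,
            &compressed.compressed_data.strategy,
        );
        assert!(result.is_ok());
    }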
1863}