pjson_rs/stream/
compression_integration.rs

1//! Integration of schema-based compression with PJS streaming
2//!
3//! Provides streaming-aware compression that maintains the ability
4//! to progressively decompress data as frames arrive.
5
6use crate::{
7    compression::{CompressedData, CompressionStrategy, SchemaCompressor},
8    domain::{DomainError, DomainResult},
9    stream::{Priority, StreamFrame},
10};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
// Security limits to prevent decompression bomb attacks.

/// Largest `rle_count` accepted for a single run-length encoded run.
const MAX_RLE_COUNT: u64 = 100_000;
/// Largest delta-encoded array (header element included) that will be decoded.
const MAX_DELTA_ARRAY_SIZE: usize = 1_000_000;
/// Upper bound on the amount of data produced by run-length expansion (10 MiB).
const MAX_DECOMPRESSED_SIZE: usize = 10 * 1024 * 1024;
18
19/// Streaming compressor that maintains compression state across frames
20#[derive(Debug, Clone)]
21pub struct StreamingCompressor {
22    /// Primary compressor for skeleton and critical data
23    skeleton_compressor: SchemaCompressor,
24    /// Secondary compressor for non-critical data
25    content_compressor: SchemaCompressor,
26    /// Compression statistics
27    stats: CompressionStats,
28}
29
/// Running totals collected by [`StreamingCompressor`] while compressing frames.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Sum of the uncompressed sizes of all processed frames, in bytes.
    pub total_input_bytes: usize,
    /// Sum of the compressed sizes of all processed frames, in bytes.
    pub total_output_bytes: usize,
    /// How many frames have been processed.
    pub frames_processed: u32,
    /// Compressed/original size ratio keyed by priority value; each entry
    /// holds the ratio of the most recent frame recorded for that priority.
    pub priority_ratios: HashMap<u8, f32>,
}
41
42/// Compressed stream frame with metadata
43#[derive(Debug, Clone)]
44pub struct CompressedFrame {
45    /// Original frame metadata
46    pub frame: StreamFrame,
47    /// Compressed data
48    pub compressed_data: CompressedData,
49    /// Decompression instructions for client
50    pub decompression_metadata: DecompressionMetadata,
51}
52
53#[derive(Debug, Clone)]
54pub struct DecompressionMetadata {
55    /// Compression strategy used
56    pub strategy: CompressionStrategy,
57    /// Dictionary indices mapping
58    pub dictionary_map: HashMap<u16, String>,
59    /// Delta base values for numeric decompression
60    pub delta_bases: HashMap<String, f64>,
61    /// Priority-specific decompression hints
62    pub priority_hints: HashMap<u8, String>,
63}
64
65impl StreamingCompressor {
66    /// Create new streaming compressor
67    pub fn new() -> Self {
68        Self {
69            skeleton_compressor: SchemaCompressor::new(),
70            content_compressor: SchemaCompressor::new(),
71            stats: CompressionStats::default(),
72        }
73    }
74
75    /// Create with custom compression strategies
76    pub fn with_strategies(
77        skeleton_strategy: CompressionStrategy,
78        content_strategy: CompressionStrategy,
79    ) -> Self {
80        Self {
81            skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
82            content_compressor: SchemaCompressor::with_strategy(content_strategy),
83            stats: CompressionStats::default(),
84        }
85    }
86
87    /// Process and compress a stream frame based on its priority
88    pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
89        let compressor = self.select_compressor_for_priority(frame.priority);
90
91        // Calculate original size
92        let original_size = serde_json::to_string(&frame.data)
93            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
94            .len();
95
96        // Compress based on frame content and priority
97        let compressed_data = compressor.compress(&frame.data)?;
98
99        // Update statistics
100        self.update_stats(
101            frame.priority,
102            original_size,
103            compressed_data.compressed_size,
104        );
105
106        // Create decompression metadata
107        let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;
108
109        Ok(CompressedFrame {
110            frame,
111            compressed_data,
112            decompression_metadata,
113        })
114    }
115
116    /// Analyze JSON data to optimize compression strategies
117    pub fn optimize_for_data(
118        &mut self,
119        skeleton: &JsonValue,
120        sample_data: &[JsonValue],
121    ) -> DomainResult<()> {
122        // Optimize skeleton compressor for critical structural data
123        self.skeleton_compressor.analyze_and_optimize(skeleton)?;
124
125        // Analyze sample content data to optimize content compressor
126        if !sample_data.is_empty() {
127            // Combine samples for comprehensive analysis
128            let combined_sample = JsonValue::Array(sample_data.to_vec());
129            self.content_compressor
130                .analyze_and_optimize(&combined_sample)?;
131        }
132
133        Ok(())
134    }
135
136    /// Get current compression statistics
137    pub fn get_stats(&self) -> &CompressionStats {
138        &self.stats
139    }
140
141    /// Reset compression statistics
142    pub fn reset_stats(&mut self) {
143        self.stats = CompressionStats::default();
144    }
145
146    /// Select appropriate compressor based on frame priority
147    fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
148        match priority {
149            // Critical data (skeleton, errors) - use specialized compressor
150            Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
151            // Regular content data - use content compressor
152            _ => &mut self.content_compressor,
153        }
154    }
155
156    /// Update compression statistics
157    fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
158        self.stats.total_input_bytes += original_size;
159        self.stats.total_output_bytes += compressed_size;
160        self.stats.frames_processed += 1;
161
162        let ratio = if original_size > 0 {
163            compressed_size as f32 / original_size as f32
164        } else {
165            1.0
166        };
167
168        self.stats.priority_ratios.insert(priority.value(), ratio);
169    }
170
171    /// Create decompression metadata for client
172    fn create_decompression_metadata(
173        &self,
174        compressed_data: &CompressedData,
175    ) -> DomainResult<DecompressionMetadata> {
176        let mut dictionary_map = HashMap::new();
177        let mut delta_bases = HashMap::new();
178
179        // Extract dictionary mappings
180        for (key, value) in &compressed_data.compression_metadata {
181            if key.starts_with("dict_") {
182                if let Ok(index) = key.strip_prefix("dict_").unwrap().parse::<u16>()
183                    && let Some(string_val) = value.as_str()
184                {
185                    dictionary_map.insert(index, string_val.to_string());
186                }
187            } else if key.starts_with("base_") {
188                let path = key.strip_prefix("base_").unwrap();
189                if let Some(num) = value.as_f64() {
190                    delta_bases.insert(path.to_string(), num);
191                }
192            }
193        }
194
195        Ok(DecompressionMetadata {
196            strategy: compressed_data.strategy.clone(),
197            dictionary_map,
198            delta_bases,
199            priority_hints: HashMap::new(), // TODO: Add priority-specific hints
200        })
201    }
202}
203
204impl CompressionStats {
205    /// Calculate overall compression ratio
206    pub fn overall_compression_ratio(&self) -> f32 {
207        if self.total_input_bytes == 0 {
208            return 1.0;
209        }
210        self.total_output_bytes as f32 / self.total_input_bytes as f32
211    }
212
213    /// Get compression ratio for specific priority level
214    pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
215        self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
216    }
217
218    /// Calculate bytes saved
219    pub fn bytes_saved(&self) -> isize {
220        self.total_input_bytes as isize - self.total_output_bytes as isize
221    }
222
223    /// Calculate percentage saved
224    pub fn percentage_saved(&self) -> f32 {
225        if self.total_input_bytes == 0 {
226            return 0.0;
227        }
228        let ratio = self.overall_compression_ratio();
229        (1.0 - ratio) * 100.0
230    }
231}
232
233/// Client-side decompressor for receiving compressed frames
234#[derive(Debug, Clone)]
235pub struct StreamingDecompressor {
236    /// Active dictionary for string decompression
237    active_dictionary: HashMap<u16, String>,
238    /// Delta base values for numeric decompression  
239    delta_bases: HashMap<String, f64>,
240    /// Decompression statistics
241    stats: DecompressionStats,
242}
243
/// Counters maintained by the streaming decompressor.
#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    /// How many frames have been decompressed.
    pub frames_decompressed: u32,
    /// Sum of the JSON-serialized sizes of all decompressed payloads, in bytes.
    pub total_decompressed_bytes: usize,
    /// Running average of the per-frame decompression time, in microseconds.
    pub avg_decompression_time_us: u64,
}
253
254impl StreamingDecompressor {
255    /// Create new streaming decompressor
256    pub fn new() -> Self {
257        Self {
258            active_dictionary: HashMap::new(),
259            delta_bases: HashMap::new(),
260            stats: DecompressionStats::default(),
261        }
262    }
263
264    /// Decompress a compressed frame
265    pub fn decompress_frame(
266        &mut self,
267        compressed_frame: CompressedFrame,
268    ) -> DomainResult<StreamFrame> {
269        let start_time = std::time::Instant::now();
270
271        // Update decompression context with metadata
272        self.update_context(&compressed_frame.decompression_metadata)?;
273
274        // Decompress data based on strategy
275        let decompressed_data = self.decompress_data(
276            &compressed_frame.compressed_data,
277            &compressed_frame.decompression_metadata.strategy,
278        )?;
279
280        // Update statistics
281        let decompression_time = start_time.elapsed();
282        self.update_decompression_stats(&decompressed_data, decompression_time);
283
284        Ok(StreamFrame {
285            data: decompressed_data,
286            priority: compressed_frame.frame.priority,
287            metadata: compressed_frame.frame.metadata,
288        })
289    }
290
291    /// Update decompression context with new metadata
292    fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
293        // Update dictionary
294        for (&index, string) in &metadata.dictionary_map {
295            self.active_dictionary.insert(index, string.clone());
296        }
297
298        // Update delta bases
299        for (path, &base) in &metadata.delta_bases {
300            self.delta_bases.insert(path.clone(), base);
301        }
302
303        Ok(())
304    }
305
306    /// Decompress data according to strategy
307    fn decompress_data(
308        &self,
309        compressed_data: &CompressedData,
310        strategy: &CompressionStrategy,
311    ) -> DomainResult<JsonValue> {
312        match strategy {
313            CompressionStrategy::None => Ok(compressed_data.data.clone()),
314
315            CompressionStrategy::Dictionary { .. } => {
316                self.decompress_dictionary(&compressed_data.data)
317            }
318
319            CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),
320
321            CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),
322
323            CompressionStrategy::Hybrid { .. } => {
324                // Apply decompression in reverse order: delta first, then dictionary
325                let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
326                self.decompress_dictionary(&delta_decompressed)
327            }
328        }
329    }
330
331    /// Decompress dictionary-encoded strings
332    fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
333        match data {
334            JsonValue::Object(obj) => {
335                let mut decompressed = serde_json::Map::new();
336                for (key, value) in obj {
337                    decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
338                }
339                Ok(JsonValue::Object(decompressed))
340            }
341            JsonValue::Array(arr) => {
342                let decompressed: Result<Vec<_>, _> = arr
343                    .iter()
344                    .map(|item| self.decompress_dictionary(item))
345                    .collect();
346                Ok(JsonValue::Array(decompressed?))
347            }
348            JsonValue::Number(n) => {
349                // Check if this is a dictionary index
350                if let Some(index) = n.as_u64()
351                    && let Some(string_val) = self.active_dictionary.get(&(index as u16))
352                {
353                    return Ok(JsonValue::String(string_val.clone()));
354                }
355                Ok(data.clone())
356            }
357            _ => Ok(data.clone()),
358        }
359    }
360
361    /// Decompress delta-encoded values
362    pub fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
363        match data {
364            JsonValue::Object(obj) => {
365                let mut decompressed_obj = serde_json::Map::new();
366                for (key, value) in obj {
367                    decompressed_obj.insert(key.clone(), self.decompress_delta(value)?);
368                }
369                Ok(JsonValue::Object(decompressed_obj))
370            }
371            JsonValue::Array(arr) => {
372                if arr.is_empty() {
373                    return Ok(JsonValue::Array(arr.clone()));
374                }
375
376                // Check if this is a delta-compressed array
377                if let Some(first) = arr.first()
378                    && let Some(obj) = first.as_object()
379                    && obj.contains_key("delta_base")
380                    && obj.contains_key("delta_type")
381                {
382                    // This is a delta-compressed numeric sequence
383                    return self.decompress_delta_array(arr);
384                }
385
386                // Not a delta-compressed array, process elements recursively
387                let decompressed_arr: Result<Vec<_>, _> =
388                    arr.iter().map(|item| self.decompress_delta(item)).collect();
389                Ok(JsonValue::Array(decompressed_arr?))
390            }
391            _ => Ok(data.clone()),
392        }
393    }
394
395    /// Decompress delta-encoded array back to original values
396    fn decompress_delta_array(&self, arr: &[JsonValue]) -> DomainResult<JsonValue> {
397        if arr.is_empty() {
398            return Ok(JsonValue::Array(Vec::new()));
399        }
400
401        // VULN-002 FIX: Validate array size to prevent memory exhaustion
402        if arr.len() > MAX_DELTA_ARRAY_SIZE {
403            return Err(DomainError::CompressionError(format!(
404                "Delta array size {} exceeds maximum {}",
405                arr.len(),
406                MAX_DELTA_ARRAY_SIZE
407            )));
408        }
409
410        // Extract base value from metadata
411        let base_value = arr[0]
412            .get("delta_base")
413            .and_then(|v| v.as_f64())
414            .ok_or_else(|| {
415                DomainError::CompressionError(
416                    "Missing or invalid delta_base in metadata".to_string(),
417                )
418            })?;
419
420        // Reconstruct original values from deltas
421        let mut original_values = Vec::new();
422        for delta_value in arr.iter().skip(1) {
423            let delta = delta_value.as_f64().ok_or_else(|| {
424                DomainError::CompressionError("Invalid delta value: expected number".to_string())
425            })?;
426
427            let original = base_value + delta;
428            original_values.push(JsonValue::from(original));
429        }
430
431        Ok(JsonValue::Array(original_values))
432    }
433
434    /// Decompress run-length encoded data
435    pub fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
436        match data {
437            JsonValue::Object(obj) => {
438                let mut decompressed_obj = serde_json::Map::new();
439                for (key, value) in obj {
440                    decompressed_obj.insert(key.clone(), self.decompress_run_length(value)?);
441                }
442                Ok(JsonValue::Object(decompressed_obj))
443            }
444            JsonValue::Array(arr) => {
445                let mut decompressed_values = Vec::new();
446                let mut total_size = 0usize;
447
448                for item in arr {
449                    if let Some(obj) = item.as_object() {
450                        // Check if this is an RLE-encoded run
451                        if obj.contains_key("rle_value") && obj.contains_key("rle_count") {
452                            let value = obj
453                                .get("rle_value")
454                                .ok_or_else(|| {
455                                    DomainError::CompressionError("Missing rle_value".to_string())
456                                })?
457                                .clone();
458
459                            let count =
460                                obj.get("rle_count")
461                                    .and_then(|v| v.as_u64())
462                                    .ok_or_else(|| {
463                                        DomainError::CompressionError(
464                                            "Invalid rle_count: expected positive integer"
465                                                .to_string(),
466                                        )
467                                    })?;
468
469                            // VULN-001 FIX: Validate RLE count to prevent decompression bomb
470                            if count > MAX_RLE_COUNT {
471                                return Err(DomainError::CompressionError(format!(
472                                    "RLE count {} exceeds maximum {}",
473                                    count, MAX_RLE_COUNT
474                                )));
475                            }
476
477                            // VULN-003 FIX: Convert u64 to usize safely to prevent overflow
478                            let count_usize = usize::try_from(count).map_err(|_| {
479                                DomainError::CompressionError(format!(
480                                    "RLE count {} exceeds platform maximum",
481                                    count
482                                ))
483                            })?;
484
485                            // Track total decompressed size across all RLE runs
486                            total_size = total_size.checked_add(count_usize).ok_or_else(|| {
487                                DomainError::CompressionError(
488                                    "Total decompressed size overflow".to_string(),
489                                )
490                            })?;
491
492                            if total_size > MAX_DECOMPRESSED_SIZE {
493                                return Err(DomainError::CompressionError(format!(
494                                    "Decompressed size {} exceeds maximum {}",
495                                    total_size, MAX_DECOMPRESSED_SIZE
496                                )));
497                            }
498
499                            // Expand the run
500                            for _ in 0..count {
501                                decompressed_values.push(value.clone());
502                            }
503                        } else {
504                            // Not an RLE object, process recursively
505                            decompressed_values.push(self.decompress_run_length(item)?);
506                        }
507                    } else {
508                        // Not an object, process recursively
509                        decompressed_values.push(self.decompress_run_length(item)?);
510                    }
511                }
512
513                Ok(JsonValue::Array(decompressed_values))
514            }
515            _ => Ok(data.clone()),
516        }
517    }
518
519    /// Update decompression statistics
520    fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
521        self.stats.frames_decompressed += 1;
522
523        if let Ok(serialized) = serde_json::to_string(data) {
524            self.stats.total_decompressed_bytes += serialized.len();
525        }
526
527        let new_time_us = duration.as_micros() as u64;
528        if self.stats.frames_decompressed == 1 {
529            self.stats.avg_decompression_time_us = new_time_us;
530        } else {
531            // Calculate running average
532            let total_frames = self.stats.frames_decompressed as u64;
533            let total_time =
534                self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
535            self.stats.avg_decompression_time_us = total_time / total_frames;
536        }
537    }
538
539    /// Get decompression statistics
540    pub fn get_stats(&self) -> &DecompressionStats {
541        &self.stats
542    }
543}
544
545impl Default for StreamingCompressor {
546    fn default() -> Self {
547        Self::new()
548    }
549}
550
551impl Default for StreamingDecompressor {
552    fn default() -> Self {
553        Self::new()
554    }
555}
556
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Wraps `data` in a frame of the given priority with empty metadata.
    fn frame_of(data: JsonValue, priority: Priority) -> StreamFrame {
        StreamFrame {
            data,
            priority,
            metadata: HashMap::new(),
        }
    }

    /// Runs delta decompression on `input` with a fresh decompressor.
    fn undo_delta(input: JsonValue) -> DomainResult<JsonValue> {
        StreamingDecompressor::new().decompress_delta(&input)
    }

    /// Runs run-length decompression on `input` with a fresh decompressor.
    fn undo_rle(input: JsonValue) -> DomainResult<JsonValue> {
        StreamingDecompressor::new().decompress_run_length(&input)
    }

    // ---- compressor -------------------------------------------------------

    #[test]
    fn test_streaming_compressor_basic() {
        let mut compressor = StreamingCompressor::new();
        let input = frame_of(
            json!({
                "message": "test message",
                "count": 42
            }),
            Priority::MEDIUM,
        );

        let compressed = compressor
            .compress_frame(input)
            .expect("compressing a simple frame should succeed");

        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    }

    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            total_input_bytes: 1000,
            total_output_bytes: 600,
            ..Default::default()
        };

        assert_eq!(stats.overall_compression_ratio(), 0.6);
        assert_eq!(stats.bytes_saved(), 400);

        // Float arithmetic: compare the percentage with a tolerance.
        let deviation = (stats.percentage_saved() - 40.0).abs();
        assert!(deviation < 0.001);
    }

    #[test]
    fn test_priority_based_compression() {
        let mut compressor = StreamingCompressor::new();

        let frames = [
            frame_of(json!({"error": "critical failure"}), Priority::CRITICAL),
            frame_of(json!({"debug": "verbose information"}), Priority::LOW),
        ];
        for frame in frames {
            compressor.compress_frame(frame).unwrap();
        }

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 2);
        assert!(stats.total_input_bytes > 0);
    }

    // ---- decompressor: frames and dictionary ------------------------------

    #[test]
    fn test_streaming_decompressor_basic() {
        let payload = json!({"test": "data"});

        let compressed_frame = CompressedFrame {
            frame: frame_of(payload.clone(), Priority::MEDIUM),
            compressed_data: CompressedData {
                strategy: CompressionStrategy::None,
                compressed_size: 20,
                data: payload.clone(),
                compression_metadata: HashMap::new(),
            },
            decompression_metadata: DecompressionMetadata {
                strategy: CompressionStrategy::None,
                dictionary_map: HashMap::new(),
                delta_bases: HashMap::new(),
                priority_hints: HashMap::new(),
            },
        };

        let mut decompressor = StreamingDecompressor::new();
        let decompressed = decompressor
            .decompress_frame(compressed_frame)
            .expect("an uncompressed frame should decompress");

        assert_eq!(decompressed.data, payload);
    }

    #[test]
    fn test_dictionary_decompression() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor
            .active_dictionary
            .extend([(0, "hello".to_string()), (1, "world".to_string())]);

        // Both values are dictionary indices.
        let encoded = json!({
            "greeting": 0,
            "target": 1
        });
        let expected = json!({
            "greeting": "hello",
            "target": "world"
        });

        assert_eq!(decompressor.decompress_dictionary(&encoded).unwrap(), expected);
    }

    // ---- decompressor: delta ----------------------------------------------

    #[test]
    fn test_delta_decompression_basic() {
        let decoded = undo_delta(json!([
            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
            0.0,
            1.0,
            2.0,
            3.0,
            4.0
        ]))
        .unwrap();

        assert_eq!(decoded, json!([100.0, 101.0, 102.0, 103.0, 104.0]));
    }

    #[test]
    fn test_delta_decompression_negative_deltas() {
        let decoded = undo_delta(json!([
            {"delta_base": 50.0, "delta_type": "numeric_sequence"},
            -10.0,
            0.0,
            10.0,
            20.0
        ]))
        .unwrap();

        assert_eq!(decoded, json!([40.0, 50.0, 60.0, 70.0]));
    }

    #[test]
    fn test_delta_decompression_fractional_deltas() {
        let decoded = undo_delta(json!([
            {"delta_base": 10.0, "delta_type": "numeric_sequence"},
            0.5,
            1.0,
            1.5,
            2.0
        ]))
        .unwrap();

        assert_eq!(decoded, json!([10.5, 11.0, 11.5, 12.0]));
    }

    #[test]
    fn test_delta_decompression_empty_array() {
        assert_eq!(undo_delta(json!([])).unwrap(), json!([]));
    }

    #[test]
    fn test_delta_decompression_single_element() {
        // Header only, no deltas: nothing to reconstruct.
        let decoded = undo_delta(json!([
            {"delta_base": 100.0, "delta_type": "numeric_sequence"}
        ]))
        .unwrap();

        assert_eq!(decoded, json!([]));
    }

    #[test]
    fn test_delta_decompression_nested_structure() {
        let decoded = undo_delta(json!({
            "sequence": [
                {"delta_base": 100.0, "delta_type": "numeric_sequence"},
                0.0,
                1.0,
                2.0
            ],
            "other": "data"
        }))
        .unwrap();

        let expected = json!({
            "sequence": [100.0, 101.0, 102.0],
            "other": "data"
        });
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_delta_decompression_invalid_metadata() {
        // Without the delta header keys the array is not treated as
        // delta-encoded, so decoding succeeds.
        let outcome = undo_delta(json!([
            {"wrong_key": 100.0},
            0.0,
            1.0
        ]));

        assert!(outcome.is_ok());
    }

    #[test]
    fn test_delta_decompression_invalid_delta_value() {
        let outcome = undo_delta(json!([
            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
            "not_a_number"
        ]));

        assert!(outcome.is_err());
    }

    // ---- decompressor: run-length -----------------------------------------

    #[test]
    fn test_rle_decompression_basic() {
        let decoded = undo_rle(json!([
            {"rle_value": 1, "rle_count": 3},
            {"rle_value": 2, "rle_count": 2},
            {"rle_value": 3, "rle_count": 4}
        ]))
        .unwrap();

        assert_eq!(decoded, json!([1, 1, 1, 2, 2, 3, 3, 3, 3]));
    }

    #[test]
    fn test_rle_decompression_mixed_runs() {
        let decoded = undo_rle(json!([
            {"rle_value": "a", "rle_count": 2},
            "b",
            {"rle_value": "c", "rle_count": 3}
        ]))
        .unwrap();

        assert_eq!(decoded, json!(["a", "a", "b", "c", "c", "c"]));
    }

    #[test]
    fn test_rle_decompression_single_count() {
        let decoded = undo_rle(json!([
            {"rle_value": "x", "rle_count": 1}
        ]))
        .unwrap();

        assert_eq!(decoded, json!(["x"]));
    }

    #[test]
    fn test_rle_decompression_zero_count() {
        let decoded = undo_rle(json!([
            {"rle_value": "x", "rle_count": 0}
        ]))
        .unwrap();

        assert_eq!(decoded, json!([]));
    }

    #[test]
    fn test_rle_decompression_nested_values() {
        let decoded = undo_rle(json!([
            {"rle_value": {"name": "test"}, "rle_count": 3}
        ]))
        .unwrap();

        assert_eq!(
            decoded,
            json!([{"name": "test"}, {"name": "test"}, {"name": "test"}])
        );
    }

    #[test]
    fn test_rle_decompression_nested_structure() {
        let decoded = undo_rle(json!({
            "data": [
                {"rle_value": 1, "rle_count": 3},
                {"rle_value": 2, "rle_count": 2}
            ],
            "other": "field"
        }))
        .unwrap();

        let expected = json!({
            "data": [1, 1, 1, 2, 2],
            "other": "field"
        });
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_rle_decompression_empty_array() {
        assert_eq!(undo_rle(json!([])).unwrap(), json!([]));
    }

    #[test]
    fn test_rle_decompression_invalid_count() {
        let outcome = undo_rle(json!([
            {"rle_value": "x", "rle_count": "not_a_number"}
        ]));

        assert!(outcome.is_err());
    }

    #[test]
    fn test_rle_decompression_missing_value() {
        let outcome = undo_rle(json!([
            {"rle_count": 3}
        ]));

        assert!(outcome.is_err());
    }

    #[test]
    fn test_rle_decompression_non_rle_objects() {
        let plain = json!([
            {"regular": "object"},
            {"another": "one"}
        ]);

        // Objects without the RLE keys come back unchanged.
        let decoded = undo_rle(plain.clone()).unwrap();
        assert_eq!(decoded, plain);
    }
}