1use crate::{
7 compression::{CompressedData, CompressionStrategy, SchemaCompressor},
8 domain::{DomainError, DomainResult},
9 stream::{Priority, StreamFrame},
10};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14const MAX_RLE_COUNT: u64 = 100_000;
16const MAX_DELTA_ARRAY_SIZE: usize = 1_000_000;
17const MAX_DECOMPRESSED_SIZE: usize = 10_485_760; #[derive(Debug, Clone)]
21pub struct StreamingCompressor {
22 skeleton_compressor: SchemaCompressor,
24 content_compressor: SchemaCompressor,
26 stats: CompressionStats,
28}
29
/// Size counters gathered by `StreamingCompressor` while compressing frames.
#[derive(Clone, Debug, Default)]
pub struct CompressionStats {
    /// Sum of the serialized JSON lengths of all frame payloads seen so far.
    pub total_input_bytes: usize,
    /// Sum of the `compressed_size` values reported for those payloads.
    pub total_output_bytes: usize,
    /// Number of frames that have been compressed.
    pub frames_processed: u32,
    /// Compressed/original size ratio keyed by the numeric priority value.
    /// Each entry holds the ratio of the most recent frame at that priority
    /// (later frames overwrite earlier ones).
    pub priority_ratios: HashMap<u8, f32>,
}
41
42#[derive(Debug, Clone)]
44pub struct CompressedFrame {
45 pub frame: StreamFrame,
47 pub compressed_data: CompressedData,
49 pub decompression_metadata: DecompressionMetadata,
51}
52
53#[derive(Debug, Clone)]
54pub struct DecompressionMetadata {
55 pub strategy: CompressionStrategy,
57 pub dictionary_map: HashMap<u16, String>,
59 pub delta_bases: HashMap<String, f64>,
61 pub priority_hints: HashMap<u8, String>,
63}
64
65impl StreamingCompressor {
66 pub fn new() -> Self {
68 Self {
69 skeleton_compressor: SchemaCompressor::new(),
70 content_compressor: SchemaCompressor::new(),
71 stats: CompressionStats::default(),
72 }
73 }
74
75 pub fn with_strategies(
77 skeleton_strategy: CompressionStrategy,
78 content_strategy: CompressionStrategy,
79 ) -> Self {
80 Self {
81 skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
82 content_compressor: SchemaCompressor::with_strategy(content_strategy),
83 stats: CompressionStats::default(),
84 }
85 }
86
87 pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
89 let compressor = self.select_compressor_for_priority(frame.priority);
90
91 let original_size = serde_json::to_string(&frame.data)
93 .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
94 .len();
95
96 let compressed_data = compressor.compress(&frame.data)?;
98
99 self.update_stats(
101 frame.priority,
102 original_size,
103 compressed_data.compressed_size,
104 );
105
106 let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;
108
109 Ok(CompressedFrame {
110 frame,
111 compressed_data,
112 decompression_metadata,
113 })
114 }
115
116 pub fn optimize_for_data(
118 &mut self,
119 skeleton: &JsonValue,
120 sample_data: &[JsonValue],
121 ) -> DomainResult<()> {
122 self.skeleton_compressor.analyze_and_optimize(skeleton)?;
124
125 if !sample_data.is_empty() {
127 let combined_sample = JsonValue::Array(sample_data.to_vec());
129 self.content_compressor
130 .analyze_and_optimize(&combined_sample)?;
131 }
132
133 Ok(())
134 }
135
136 pub fn get_stats(&self) -> &CompressionStats {
138 &self.stats
139 }
140
141 pub fn reset_stats(&mut self) {
143 self.stats = CompressionStats::default();
144 }
145
146 fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
148 match priority {
149 Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
151 _ => &mut self.content_compressor,
153 }
154 }
155
156 fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
158 self.stats.total_input_bytes += original_size;
159 self.stats.total_output_bytes += compressed_size;
160 self.stats.frames_processed += 1;
161
162 let ratio = if original_size > 0 {
163 compressed_size as f32 / original_size as f32
164 } else {
165 1.0
166 };
167
168 self.stats.priority_ratios.insert(priority.value(), ratio);
169 }
170
171 fn create_decompression_metadata(
173 &self,
174 compressed_data: &CompressedData,
175 ) -> DomainResult<DecompressionMetadata> {
176 let mut dictionary_map = HashMap::new();
177 let mut delta_bases = HashMap::new();
178
179 for (key, value) in &compressed_data.compression_metadata {
181 if let Some(suffix) = key.strip_prefix("dict_") {
182 if let Ok(index) = suffix.parse::<u16>()
183 && let Some(string_val) = value.as_str()
184 {
185 dictionary_map.insert(index, string_val.to_string());
186 }
187 } else if let Some(path) = key.strip_prefix("base_")
188 && let Some(num) = value.as_f64()
189 {
190 delta_bases.insert(path.to_string(), num);
191 }
192 }
193
194 Ok(DecompressionMetadata {
195 strategy: compressed_data.strategy.clone(),
196 dictionary_map,
197 delta_bases,
198 priority_hints: HashMap::new(), })
200 }
201}
202
203impl CompressionStats {
204 pub fn overall_compression_ratio(&self) -> f32 {
206 if self.total_input_bytes == 0 {
207 return 1.0;
208 }
209 self.total_output_bytes as f32 / self.total_input_bytes as f32
210 }
211
212 pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
214 self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
215 }
216
217 pub fn bytes_saved(&self) -> isize {
219 self.total_input_bytes as isize - self.total_output_bytes as isize
220 }
221
222 pub fn percentage_saved(&self) -> f32 {
224 if self.total_input_bytes == 0 {
225 return 0.0;
226 }
227 let ratio = self.overall_compression_ratio();
228 (1.0 - ratio) * 100.0
229 }
230}
231
232#[derive(Debug, Clone)]
234pub struct StreamingDecompressor {
235 active_dictionary: HashMap<u16, String>,
237 delta_bases: HashMap<String, f64>,
239 stats: DecompressionStats,
241}
242
/// Counters gathered by `StreamingDecompressor` while restoring frames.
#[derive(Clone, Debug, Default)]
pub struct DecompressionStats {
    /// Number of frames decompressed successfully.
    pub frames_decompressed: u32,
    /// Sum of the serialized JSON lengths of all decompressed payloads.
    pub total_decompressed_bytes: usize,
    /// Running mean of per-frame decompression time in whole microseconds.
    pub avg_decompression_time_us: u64,
}
252
253impl StreamingDecompressor {
254 pub fn new() -> Self {
256 Self {
257 active_dictionary: HashMap::new(),
258 delta_bases: HashMap::new(),
259 stats: DecompressionStats::default(),
260 }
261 }
262
263 pub fn decompress_frame(
265 &mut self,
266 compressed_frame: CompressedFrame,
267 ) -> DomainResult<StreamFrame> {
268 let start_time = std::time::Instant::now();
269
270 self.update_context(&compressed_frame.decompression_metadata)?;
272
273 let decompressed_data = self.decompress_data(
275 &compressed_frame.compressed_data,
276 &compressed_frame.decompression_metadata.strategy,
277 )?;
278
279 let decompression_time = start_time.elapsed();
281 self.update_decompression_stats(&decompressed_data, decompression_time);
282
283 Ok(StreamFrame {
284 data: decompressed_data,
285 priority: compressed_frame.frame.priority,
286 metadata: compressed_frame.frame.metadata,
287 })
288 }
289
290 fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
292 for (&index, string) in &metadata.dictionary_map {
294 self.active_dictionary.insert(index, string.clone());
295 }
296
297 for (path, &base) in &metadata.delta_bases {
299 self.delta_bases.insert(path.clone(), base);
300 }
301
302 Ok(())
303 }
304
305 fn decompress_data(
307 &self,
308 compressed_data: &CompressedData,
309 strategy: &CompressionStrategy,
310 ) -> DomainResult<JsonValue> {
311 match strategy {
312 CompressionStrategy::None => Ok(compressed_data.data.clone()),
313
314 CompressionStrategy::Dictionary { .. } => {
315 self.decompress_dictionary(&compressed_data.data)
316 }
317
318 CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),
319
320 CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),
321
322 CompressionStrategy::Hybrid { .. } => {
323 let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
325 self.decompress_dictionary(&delta_decompressed)
326 }
327 }
328 }
329
330 fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
332 match data {
333 JsonValue::Object(obj) => {
334 let mut decompressed = serde_json::Map::new();
335 for (key, value) in obj {
336 decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
337 }
338 Ok(JsonValue::Object(decompressed))
339 }
340 JsonValue::Array(arr) => {
341 let decompressed: Result<Vec<_>, _> = arr
342 .iter()
343 .map(|item| self.decompress_dictionary(item))
344 .collect();
345 Ok(JsonValue::Array(decompressed?))
346 }
347 JsonValue::Number(n) => {
348 if let Some(index) = n.as_u64()
350 && let Some(string_val) = self.active_dictionary.get(&(index as u16))
351 {
352 return Ok(JsonValue::String(string_val.clone()));
353 }
354 Ok(data.clone())
355 }
356 _ => Ok(data.clone()),
357 }
358 }
359
360 pub fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
362 match data {
363 JsonValue::Object(obj) => {
364 let mut decompressed_obj = serde_json::Map::new();
365 for (key, value) in obj {
366 decompressed_obj.insert(key.clone(), self.decompress_delta(value)?);
367 }
368 Ok(JsonValue::Object(decompressed_obj))
369 }
370 JsonValue::Array(arr) => {
371 if arr.is_empty() {
372 return Ok(JsonValue::Array(arr.clone()));
373 }
374
375 if let Some(first) = arr.first()
377 && let Some(obj) = first.as_object()
378 && obj.contains_key("delta_base")
379 && obj.contains_key("delta_type")
380 {
381 return self.decompress_delta_array(arr);
383 }
384
385 let decompressed_arr: Result<Vec<_>, _> =
387 arr.iter().map(|item| self.decompress_delta(item)).collect();
388 Ok(JsonValue::Array(decompressed_arr?))
389 }
390 _ => Ok(data.clone()),
391 }
392 }
393
394 fn decompress_delta_array(&self, arr: &[JsonValue]) -> DomainResult<JsonValue> {
396 if arr.is_empty() {
397 return Ok(JsonValue::Array(Vec::new()));
398 }
399
400 if arr.len() > MAX_DELTA_ARRAY_SIZE {
402 return Err(DomainError::CompressionError(format!(
403 "Delta array size {} exceeds maximum {}",
404 arr.len(),
405 MAX_DELTA_ARRAY_SIZE
406 )));
407 }
408
409 let base_value = arr[0]
411 .get("delta_base")
412 .and_then(|v| v.as_f64())
413 .ok_or_else(|| {
414 DomainError::CompressionError(
415 "Missing or invalid delta_base in metadata".to_string(),
416 )
417 })?;
418
419 let mut original_values = Vec::new();
421 for delta_value in arr.iter().skip(1) {
422 let delta = delta_value.as_f64().ok_or_else(|| {
423 DomainError::CompressionError("Invalid delta value: expected number".to_string())
424 })?;
425
426 let original = base_value + delta;
427 original_values.push(JsonValue::from(original));
428 }
429
430 Ok(JsonValue::Array(original_values))
431 }
432
433 pub fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
435 match data {
436 JsonValue::Object(obj) => {
437 let mut decompressed_obj = serde_json::Map::new();
438 for (key, value) in obj {
439 decompressed_obj.insert(key.clone(), self.decompress_run_length(value)?);
440 }
441 Ok(JsonValue::Object(decompressed_obj))
442 }
443 JsonValue::Array(arr) => {
444 let mut decompressed_values = Vec::new();
445 let mut total_size = 0usize;
446
447 for item in arr {
448 if let Some(obj) = item.as_object() {
449 let has_rle_value = obj.contains_key("rle_value");
451 let has_rle_count = obj.contains_key("rle_count");
452
453 if has_rle_value && !has_rle_count {
454 return Err(DomainError::CompressionError(
455 "Malformed RLE object: rle_value without rle_count".to_string(),
456 ));
457 }
458 if has_rle_count && !has_rle_value {
459 return Err(DomainError::CompressionError(
460 "Malformed RLE object: rle_count without rle_value".to_string(),
461 ));
462 }
463
464 if has_rle_value && has_rle_count {
466 let value = obj
467 .get("rle_value")
468 .ok_or_else(|| {
469 DomainError::CompressionError("Missing rle_value".to_string())
470 })?
471 .clone();
472
473 let count =
474 obj.get("rle_count")
475 .and_then(|v| v.as_u64())
476 .ok_or_else(|| {
477 DomainError::CompressionError(
478 "Invalid rle_count: expected positive integer"
479 .to_string(),
480 )
481 })?;
482
483 if count > MAX_RLE_COUNT {
485 return Err(DomainError::CompressionError(format!(
486 "RLE count {} exceeds maximum {}",
487 count, MAX_RLE_COUNT
488 )));
489 }
490
491 let count_usize = usize::try_from(count).map_err(|_| {
493 DomainError::CompressionError(format!(
494 "RLE count {} exceeds platform maximum",
495 count
496 ))
497 })?;
498
499 total_size = total_size.checked_add(count_usize).ok_or_else(|| {
501 DomainError::CompressionError(
502 "Total decompressed size overflow".to_string(),
503 )
504 })?;
505
506 if total_size > MAX_DECOMPRESSED_SIZE {
507 return Err(DomainError::CompressionError(format!(
508 "Decompressed size {} exceeds maximum {}",
509 total_size, MAX_DECOMPRESSED_SIZE
510 )));
511 }
512
513 for _ in 0..count {
515 decompressed_values.push(value.clone());
516 }
517 } else {
518 decompressed_values.push(self.decompress_run_length(item)?);
520 }
521 } else {
522 decompressed_values.push(self.decompress_run_length(item)?);
524 }
525 }
526
527 Ok(JsonValue::Array(decompressed_values))
528 }
529 _ => Ok(data.clone()),
530 }
531 }
532
533 fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
535 self.stats.frames_decompressed += 1;
536
537 if let Ok(serialized) = serde_json::to_string(data) {
538 self.stats.total_decompressed_bytes += serialized.len();
539 }
540
541 let new_time_us = duration.as_micros() as u64;
542 if self.stats.frames_decompressed == 1 {
543 self.stats.avg_decompression_time_us = new_time_us;
544 } else {
545 let total_frames = self.stats.frames_decompressed as u64;
547 let total_time =
548 self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
549 self.stats.avg_decompression_time_us = total_time / total_frames;
550 }
551 }
552
553 pub fn get_stats(&self) -> &DecompressionStats {
555 &self.stats
556 }
557}
558
559impl Default for StreamingCompressor {
560 fn default() -> Self {
561 Self::new()
562 }
563}
564
565impl Default for StreamingDecompressor {
566 fn default() -> Self {
567 Self::new()
568 }
569}
570
571#[cfg(test)]
572mod tests {
573 use super::*;
574 use serde_json::json;
575
576 #[test]
577 fn test_streaming_compressor_basic() {
578 let mut compressor = StreamingCompressor::new();
579
580 let frame = StreamFrame {
581 data: json!({
582 "message": "test message",
583 "count": 42
584 }),
585 priority: Priority::MEDIUM,
586 metadata: HashMap::new(),
587 };
588
589 let result = compressor.compress_frame(frame);
590 assert!(result.is_ok());
591
592 let compressed = result.unwrap();
593 assert_eq!(compressed.frame.priority, Priority::MEDIUM);
594 }
595
596 #[test]
597 fn test_compression_stats() {
598 let stats = CompressionStats {
599 total_input_bytes: 1000,
600 total_output_bytes: 600,
601 ..Default::default()
602 };
603
604 assert_eq!(stats.overall_compression_ratio(), 0.6);
605 assert_eq!(stats.bytes_saved(), 400);
606 let percentage = stats.percentage_saved();
608 assert!((percentage - 40.0).abs() < 0.001);
609 }
610
611 #[test]
612 fn test_streaming_decompressor_basic() {
613 let mut decompressor = StreamingDecompressor::new();
614
615 let compressed_frame = CompressedFrame {
616 frame: StreamFrame {
617 data: json!({"test": "data"}),
618 priority: Priority::MEDIUM,
619 metadata: HashMap::new(),
620 },
621 compressed_data: CompressedData {
622 strategy: CompressionStrategy::None,
623 compressed_size: 20,
624 data: json!({"test": "data"}),
625 compression_metadata: HashMap::new(),
626 },
627 decompression_metadata: DecompressionMetadata {
628 strategy: CompressionStrategy::None,
629 dictionary_map: HashMap::new(),
630 delta_bases: HashMap::new(),
631 priority_hints: HashMap::new(),
632 },
633 };
634
635 let result = decompressor.decompress_frame(compressed_frame);
636 assert!(result.is_ok());
637
638 let decompressed = result.unwrap();
639 assert_eq!(decompressed.data, json!({"test": "data"}));
640 }
641
642 #[test]
643 fn test_dictionary_decompression() {
644 let mut decompressor = StreamingDecompressor::new();
645 decompressor
646 .active_dictionary
647 .insert(0, "hello".to_string());
648 decompressor
649 .active_dictionary
650 .insert(1, "world".to_string());
651
652 let compressed = json!({
654 "greeting": 0,
655 "target": 1
656 });
657
658 let result = decompressor.decompress_dictionary(&compressed).unwrap();
659 assert_eq!(
660 result,
661 json!({
662 "greeting": "hello",
663 "target": "world"
664 })
665 );
666 }
667
668 #[test]
669 fn test_priority_based_compression() {
670 let mut compressor = StreamingCompressor::new();
671
672 let critical_frame = StreamFrame {
673 data: json!({"error": "critical failure"}),
674 priority: Priority::CRITICAL,
675 metadata: HashMap::new(),
676 };
677
678 let low_frame = StreamFrame {
679 data: json!({"debug": "verbose information"}),
680 priority: Priority::LOW,
681 metadata: HashMap::new(),
682 };
683
684 let _critical_result = compressor.compress_frame(critical_frame).unwrap();
685 let _low_result = compressor.compress_frame(low_frame).unwrap();
686
687 let stats = compressor.get_stats();
688 assert_eq!(stats.frames_processed, 2);
689 assert!(stats.total_input_bytes > 0);
690 }
691
692 #[test]
693 fn test_delta_decompression_basic() {
694 let decompressor = StreamingDecompressor::new();
695
696 let compressed_data = json!([
697 {"delta_base": 100.0, "delta_type": "numeric_sequence"},
698 0.0,
699 1.0,
700 2.0,
701 3.0,
702 4.0
703 ]);
704
705 let result = decompressor.decompress_delta(&compressed_data).unwrap();
706 assert_eq!(result, json!([100.0, 101.0, 102.0, 103.0, 104.0]));
707 }
708
709 #[test]
710 fn test_delta_decompression_negative_deltas() {
711 let decompressor = StreamingDecompressor::new();
712
713 let compressed_data = json!([
714 {"delta_base": 50.0, "delta_type": "numeric_sequence"},
715 -10.0,
716 0.0,
717 10.0,
718 20.0
719 ]);
720
721 let result = decompressor.decompress_delta(&compressed_data).unwrap();
722 assert_eq!(result, json!([40.0, 50.0, 60.0, 70.0]));
723 }
724
725 #[test]
726 fn test_delta_decompression_fractional_deltas() {
727 let decompressor = StreamingDecompressor::new();
728
729 let compressed_data = json!([
730 {"delta_base": 10.0, "delta_type": "numeric_sequence"},
731 0.5,
732 1.0,
733 1.5,
734 2.0
735 ]);
736
737 let result = decompressor.decompress_delta(&compressed_data).unwrap();
738 assert_eq!(result, json!([10.5, 11.0, 11.5, 12.0]));
739 }
740
741 #[test]
742 fn test_delta_decompression_empty_array() {
743 let decompressor = StreamingDecompressor::new();
744
745 let compressed_data = json!([]);
746
747 let result = decompressor.decompress_delta(&compressed_data).unwrap();
748 assert_eq!(result, json!([]));
749 }
750
751 #[test]
752 fn test_delta_decompression_single_element() {
753 let decompressor = StreamingDecompressor::new();
754
755 let compressed_data = json!([
756 {"delta_base": 100.0, "delta_type": "numeric_sequence"}
757 ]);
758
759 let result = decompressor.decompress_delta(&compressed_data).unwrap();
760 assert_eq!(result, json!([]));
761 }
762
763 #[test]
764 fn test_delta_decompression_nested_structure() {
765 let decompressor = StreamingDecompressor::new();
766
767 let compressed_data = json!({
768 "sequence": [
769 {"delta_base": 100.0, "delta_type": "numeric_sequence"},
770 0.0,
771 1.0,
772 2.0
773 ],
774 "other": "data"
775 });
776
777 let result = decompressor.decompress_delta(&compressed_data).unwrap();
778 assert_eq!(
779 result,
780 json!({
781 "sequence": [100.0, 101.0, 102.0],
782 "other": "data"
783 })
784 );
785 }
786
787 #[test]
788 fn test_delta_decompression_invalid_metadata() {
789 let decompressor = StreamingDecompressor::new();
790
791 let compressed_data = json!([
792 {"wrong_key": 100.0},
793 0.0,
794 1.0
795 ]);
796
797 let result = decompressor.decompress_delta(&compressed_data);
798 assert!(result.is_ok());
799 }
801
802 #[test]
803 fn test_delta_decompression_invalid_delta_value() {
804 let decompressor = StreamingDecompressor::new();
805
806 let compressed_data = json!([
807 {"delta_base": 100.0, "delta_type": "numeric_sequence"},
808 "not_a_number"
809 ]);
810
811 let result = decompressor.decompress_delta(&compressed_data);
812 assert!(result.is_err());
813 }
814
815 #[test]
816 fn test_rle_decompression_basic() {
817 let decompressor = StreamingDecompressor::new();
818
819 let compressed_data = json!([
820 {"rle_value": 1, "rle_count": 3},
821 {"rle_value": 2, "rle_count": 2},
822 {"rle_value": 3, "rle_count": 4}
823 ]);
824
825 let result = decompressor
826 .decompress_run_length(&compressed_data)
827 .unwrap();
828 assert_eq!(result, json!([1, 1, 1, 2, 2, 3, 3, 3, 3]));
829 }
830
831 #[test]
832 fn test_rle_decompression_mixed_runs() {
833 let decompressor = StreamingDecompressor::new();
834
835 let compressed_data = json!([
836 {"rle_value": "a", "rle_count": 2},
837 "b",
838 {"rle_value": "c", "rle_count": 3}
839 ]);
840
841 let result = decompressor
842 .decompress_run_length(&compressed_data)
843 .unwrap();
844 assert_eq!(result, json!(["a", "a", "b", "c", "c", "c"]));
845 }
846
847 #[test]
848 fn test_rle_decompression_single_count() {
849 let decompressor = StreamingDecompressor::new();
850
851 let compressed_data = json!([
852 {"rle_value": "x", "rle_count": 1}
853 ]);
854
855 let result = decompressor
856 .decompress_run_length(&compressed_data)
857 .unwrap();
858 assert_eq!(result, json!(["x"]));
859 }
860
861 #[test]
862 fn test_rle_decompression_zero_count() {
863 let decompressor = StreamingDecompressor::new();
864
865 let compressed_data = json!([
866 {"rle_value": "x", "rle_count": 0}
867 ]);
868
869 let result = decompressor
870 .decompress_run_length(&compressed_data)
871 .unwrap();
872 assert_eq!(result, json!([]));
873 }
874
875 #[test]
876 fn test_rle_decompression_nested_values() {
877 let decompressor = StreamingDecompressor::new();
878
879 let compressed_data = json!([
880 {"rle_value": {"name": "test"}, "rle_count": 3}
881 ]);
882
883 let result = decompressor
884 .decompress_run_length(&compressed_data)
885 .unwrap();
886 assert_eq!(
887 result,
888 json!([{"name": "test"}, {"name": "test"}, {"name": "test"}])
889 );
890 }
891
892 #[test]
893 fn test_rle_decompression_nested_structure() {
894 let decompressor = StreamingDecompressor::new();
895
896 let compressed_data = json!({
897 "data": [
898 {"rle_value": 1, "rle_count": 3},
899 {"rle_value": 2, "rle_count": 2}
900 ],
901 "other": "field"
902 });
903
904 let result = decompressor
905 .decompress_run_length(&compressed_data)
906 .unwrap();
907 assert_eq!(
908 result,
909 json!({
910 "data": [1, 1, 1, 2, 2],
911 "other": "field"
912 })
913 );
914 }
915
916 #[test]
917 fn test_rle_decompression_empty_array() {
918 let decompressor = StreamingDecompressor::new();
919
920 let compressed_data = json!([]);
921
922 let result = decompressor
923 .decompress_run_length(&compressed_data)
924 .unwrap();
925 assert_eq!(result, json!([]));
926 }
927
928 #[test]
929 fn test_rle_decompression_invalid_count() {
930 let decompressor = StreamingDecompressor::new();
931
932 let compressed_data = json!([
933 {"rle_value": "x", "rle_count": "not_a_number"}
934 ]);
935
936 let result = decompressor.decompress_run_length(&compressed_data);
937 assert!(result.is_err());
938 }
939
940 #[test]
941 fn test_rle_decompression_missing_value() {
942 let decompressor = StreamingDecompressor::new();
943
944 let compressed_data = json!([
945 {"rle_count": 3}
946 ]);
947
948 let result = decompressor.decompress_run_length(&compressed_data);
949 assert!(result.is_err());
950 }
951
952 #[test]
953 fn test_rle_decompression_missing_count() {
954 let decompressor = StreamingDecompressor::new();
955
956 let compressed_data = json!([
957 {"rle_value": "x"}
958 ]);
959
960 let result = decompressor.decompress_run_length(&compressed_data);
961 assert!(result.is_err());
962 }
963
964 #[test]
965 fn test_rle_decompression_non_rle_objects() {
966 let decompressor = StreamingDecompressor::new();
967
968 let compressed_data = json!([
969 {"regular": "object"},
970 {"another": "one"}
971 ]);
972
973 let result = decompressor
974 .decompress_run_length(&compressed_data)
975 .unwrap();
976 assert_eq!(
978 result,
979 json!([
980 {"regular": "object"},
981 {"another": "one"}
982 ])
983 );
984 }
985
986 #[test]
989 fn test_compress_frame_with_custom_strategies() {
990 let mut dict = HashMap::new();
991 dict.insert("test".to_string(), 0);
992
993 let mut bases = HashMap::new();
994 bases.insert("value".to_string(), 100.0);
995
996 let mut compressor = StreamingCompressor::with_strategies(
997 CompressionStrategy::Dictionary { dictionary: dict },
998 CompressionStrategy::Delta { base_values: bases },
999 );
1000
1001 let frame = StreamFrame {
1002 data: json!({"value": 123, "other": 456}),
1003 priority: Priority::HIGH,
1004 metadata: HashMap::new(),
1005 };
1006
1007 let result = compressor.compress_frame(frame);
1008 assert!(result.is_ok());
1009 assert_eq!(compressor.stats.frames_processed, 1);
1010 }
1011
1012 #[test]
1013 fn test_optimize_for_data_with_samples() {
1014 let mut compressor = StreamingCompressor::new();
1015
1016 let skeleton = json!({
1017 "id": null,
1018 "name": null
1019 });
1020
1021 let samples = vec![
1022 json!({"id": 1, "name": "test1"}),
1023 json!({"id": 2, "name": "test2"}),
1024 json!({"id": 3, "name": "test3"}),
1025 ];
1026
1027 let result = compressor.optimize_for_data(&skeleton, &samples);
1028 assert!(result.is_ok());
1029 }
1030
1031 #[test]
1032 fn test_optimize_for_data_empty_samples() {
1033 let mut compressor = StreamingCompressor::new();
1034
1035 let skeleton = json!({"key": "value"});
1036 let result = compressor.optimize_for_data(&skeleton, &[]);
1037 assert!(result.is_ok());
1038 }
1039
1040 #[test]
1041 fn test_reset_stats() {
1042 let mut compressor = StreamingCompressor::new();
1043
1044 compressor.stats.total_input_bytes = 1000;
1045 compressor.stats.total_output_bytes = 500;
1046 compressor.stats.frames_processed = 10;
1047
1048 compressor.reset_stats();
1049
1050 assert_eq!(compressor.stats.total_input_bytes, 0);
1051 assert_eq!(compressor.stats.total_output_bytes, 0);
1052 assert_eq!(compressor.stats.frames_processed, 0);
1053 }
1054
1055 #[test]
1056 fn test_compressor_critical_vs_low_priority() {
1057 let mut compressor = StreamingCompressor::new();
1058
1059 let critical_frame = StreamFrame {
1060 data: json!({"critical": "data"}),
1061 priority: Priority::CRITICAL,
1062 metadata: HashMap::new(),
1063 };
1064
1065 let low_frame = StreamFrame {
1066 data: json!({"low": "data"}),
1067 priority: Priority::LOW,
1068 metadata: HashMap::new(),
1069 };
1070
1071 compressor.compress_frame(critical_frame).unwrap();
1072 compressor.compress_frame(low_frame).unwrap();
1073
1074 assert_eq!(compressor.stats.frames_processed, 2);
1075 }
1076
1077 #[test]
1078 fn test_decompressor_hybrid_strategy() {
1079 let mut decompressor = StreamingDecompressor::new();
1080
1081 decompressor.delta_bases.insert("value".to_string(), 100.0);
1083 decompressor.active_dictionary.insert(0, "test".to_string());
1084
1085 let mut string_dict = HashMap::new();
1086 string_dict.insert("test".to_string(), 0);
1087
1088 let mut numeric_deltas = HashMap::new();
1089 numeric_deltas.insert("value".to_string(), 100.0);
1090
1091 let compressed_frame = CompressedFrame {
1092 frame: StreamFrame {
1093 data: json!({"test": "data"}),
1094 priority: Priority::MEDIUM,
1095 metadata: HashMap::new(),
1096 },
1097 compressed_data: CompressedData {
1098 strategy: CompressionStrategy::Hybrid {
1099 string_dict: string_dict.clone(),
1100 numeric_deltas: numeric_deltas.clone(),
1101 },
1102 compressed_size: 20,
1103 data: json!({"value": 5.0}), compression_metadata: HashMap::new(),
1105 },
1106 decompression_metadata: DecompressionMetadata {
1107 strategy: CompressionStrategy::Hybrid {
1108 string_dict,
1109 numeric_deltas,
1110 },
1111 dictionary_map: HashMap::new(),
1112 delta_bases: HashMap::new(),
1113 priority_hints: HashMap::new(),
1114 },
1115 };
1116
1117 let result = decompressor.decompress_frame(compressed_frame);
1118 assert!(result.is_ok());
1119 }
1120
1121 #[test]
1122 fn test_decompress_dictionary_nested_arrays() {
1123 let mut decompressor = StreamingDecompressor::new();
1124 decompressor
1125 .active_dictionary
1126 .insert(0, "item1".to_string());
1127 decompressor
1128 .active_dictionary
1129 .insert(1, "item2".to_string());
1130
1131 let data = json!([[0, 1], [1, 0]]);
1132 let result = decompressor.decompress_dictionary(&data).unwrap();
1133
1134 assert_eq!(result, json!([["item1", "item2"], ["item2", "item1"]]));
1135 }
1136
1137 #[test]
1138 fn test_decompress_dictionary_non_index_numbers() {
1139 let mut decompressor = StreamingDecompressor::new();
1140 decompressor.active_dictionary.insert(0, "test".to_string());
1141
1142 let data = json!({"value": 999});
1144 let result = decompressor.decompress_dictionary(&data).unwrap();
1145
1146 assert_eq!(result, json!({"value": 999}));
1148 }
1149
1150 #[test]
1151 fn test_decompress_delta_non_array() {
1152 let decompressor = StreamingDecompressor::new();
1153
1154 let data = json!({"key": "value"});
1156 let result = decompressor.decompress_delta(&data).unwrap();
1157
1158 assert_eq!(result, json!({"key": "value"}));
1159 }
1160
1161 #[test]
1162 fn test_decompress_delta_array_without_metadata() {
1163 let decompressor = StreamingDecompressor::new();
1164
1165 let data = json!([1, 2, 3, 4]);
1167 let result = decompressor.decompress_delta(&data).unwrap();
1168
1169 assert_eq!(result, json!([1, 2, 3, 4]));
1170 }
1171
1172 #[test]
1173 fn test_decompress_run_length_nested_objects() {
1174 let decompressor = StreamingDecompressor::new();
1175
1176 let data = json!({
1177 "outer": {
1178 "inner": [
1179 {"rle_value": {"nested": "obj"}, "rle_count": 2}
1180 ]
1181 }
1182 });
1183
1184 let result = decompressor.decompress_run_length(&data).unwrap();
1185 assert_eq!(
1186 result,
1187 json!({
1188 "outer": {
1189 "inner": [{"nested": "obj"}, {"nested": "obj"}]
1190 }
1191 })
1192 );
1193 }
1194
1195 #[test]
1196 fn test_decompression_stats_tracking() {
1197 let mut decompressor = StreamingDecompressor::new();
1198
1199 assert_eq!(decompressor.stats.frames_decompressed, 0);
1200
1201 let frame = CompressedFrame {
1202 frame: StreamFrame {
1203 data: json!({"test": "data"}),
1204 priority: Priority::MEDIUM,
1205 metadata: HashMap::new(),
1206 },
1207 compressed_data: CompressedData {
1208 strategy: CompressionStrategy::None,
1209 compressed_size: 15,
1210 data: json!({"test": "data"}),
1211 compression_metadata: HashMap::new(),
1212 },
1213 decompression_metadata: DecompressionMetadata {
1214 strategy: CompressionStrategy::None,
1215 dictionary_map: HashMap::new(),
1216 delta_bases: HashMap::new(),
1217 priority_hints: HashMap::new(),
1218 },
1219 };
1220
1221 decompressor.decompress_frame(frame).unwrap();
1222
1223 assert_eq!(decompressor.stats.frames_decompressed, 1);
1224 assert!(decompressor.stats.total_decompressed_bytes > 0);
1225 assert!(decompressor.stats.avg_decompression_time_us > 0);
1226 }
1227
1228 #[test]
1229 fn test_decompress_delta_array_malformed_metadata() {
1230 let decompressor = StreamingDecompressor::new();
1231
1232 let data = json!([
1234 {"delta_type": "numeric_sequence"},
1235 1.0,
1236 2.0
1237 ]);
1238
1239 let result = decompressor.decompress_delta(&data);
1240 assert!(result.is_ok());
1242 assert_eq!(result.unwrap(), data);
1244 }
1245
1246 #[test]
1247 fn test_decompress_run_length_large_count() {
1248 let decompressor = StreamingDecompressor::new();
1249
1250 let data = json!([
1252 {"rle_value": "x", "rle_count": 1000}
1253 ]);
1254
1255 let result = decompressor.decompress_run_length(&data);
1256 assert!(result.is_ok());
1257 let decompressed = result.unwrap();
1258 if let Some(arr) = decompressed.as_array() {
1259 assert_eq!(arr.len(), 1000);
1260 }
1261 }
1262
1263 #[test]
1264 fn test_decompress_run_length_exceeds_max_count() {
1265 let decompressor = StreamingDecompressor::new();
1266
1267 let data = json!([
1269 {"rle_value": "x", "rle_count": 200_000}
1270 ]);
1271
1272 let result = decompressor.decompress_run_length(&data);
1273 assert!(result.is_err()); }
1275
1276 #[test]
1277 fn test_decompress_run_length_cumulative_overflow() {
1278 let decompressor = StreamingDecompressor::new();
1279
1280 let data = json!([
1282 {"rle_value": "a", "rle_count": 5_000_000},
1283 {"rle_value": "b", "rle_count": 6_000_000}
1284 ]);
1285
1286 let result = decompressor.decompress_run_length(&data);
1287 assert!(result.is_err());
1289 }
1290
1291 #[test]
1292 fn test_decompress_delta_array_size_limit() {
1293 let decompressor = StreamingDecompressor::new();
1294
1295 let mut large_array = vec![json!({"delta_base": 0.0, "delta_type": "numeric_sequence"})];
1297 for _i in 0..1_000_001 {
1298 large_array.push(json!(0.0));
1299 }
1300
1301 let result = decompressor.decompress_delta(&JsonValue::Array(large_array));
1302 assert!(result.is_err()); }
1304
1305 #[test]
1306 fn test_compression_stats_default() {
1307 let stats = CompressionStats::default();
1308 assert_eq!(stats.total_input_bytes, 0);
1309 assert_eq!(stats.total_output_bytes, 0);
1310 assert_eq!(stats.frames_processed, 0);
1311 assert_eq!(stats.overall_compression_ratio(), 1.0);
1312 }
1313
1314 #[test]
1315 fn test_decompression_stats_default() {
1316 let stats = DecompressionStats::default();
1317 assert_eq!(stats.frames_decompressed, 0);
1318 assert_eq!(stats.total_decompressed_bytes, 0);
1319 assert_eq!(stats.avg_decompression_time_us, 0);
1320 }
1321
1322 #[test]
1327 fn test_decompress_dictionary_with_strings() {
1328 let decompressor = StreamingDecompressor::new();
1329 let data = json!({"key": "value", "nested": {"inner": "string"}});
1330 let result = decompressor.decompress_dictionary(&data);
1331 assert!(result.is_ok());
1332 assert_eq!(result.unwrap(), data);
1333 }
1334
1335 #[test]
1336 fn test_decompress_dictionary_with_null() {
1337 let decompressor = StreamingDecompressor::new();
1338 let data = json!(null);
1339 let result = decompressor.decompress_dictionary(&data);
1340 assert!(result.is_ok());
1341 assert_eq!(result.unwrap(), json!(null));
1342 }
1343
1344 #[test]
1345 fn test_decompress_dictionary_with_boolean() {
1346 let decompressor = StreamingDecompressor::new();
1347 let data = json!(true);
1348 let result = decompressor.decompress_dictionary(&data);
1349 assert!(result.is_ok());
1350 assert_eq!(result.unwrap(), json!(true));
1351 }
1352
1353 #[test]
1354 fn test_decompress_dictionary_with_string() {
1355 let decompressor = StreamingDecompressor::new();
1356 let data = json!("plain string");
1357 let result = decompressor.decompress_dictionary(&data);
1358 assert!(result.is_ok());
1359 assert_eq!(result.unwrap(), json!("plain string"));
1360 }
1361
1362 #[test]
1363 fn test_decompress_delta_with_object_no_array() {
1364 let decompressor = StreamingDecompressor::new();
1365 let data = json!({"key": "value"});
1366 let result = decompressor.decompress_delta(&data);
1367 assert!(result.is_ok());
1368 assert_eq!(result.unwrap(), json!({"key": "value"}));
1369 }
1370
1371 #[test]
1372 fn test_decompress_delta_with_primitive_values() {
1373 let decompressor = StreamingDecompressor::new();
1374
1375 assert_eq!(
1376 decompressor.decompress_delta(&json!("string")).unwrap(),
1377 json!("string")
1378 );
1379 assert_eq!(
1380 decompressor.decompress_delta(&json!(42)).unwrap(),
1381 json!(42)
1382 );
1383 assert_eq!(
1384 decompressor.decompress_delta(&json!(true)).unwrap(),
1385 json!(true)
1386 );
1387 assert_eq!(
1388 decompressor.decompress_delta(&json!(null)).unwrap(),
1389 json!(null)
1390 );
1391 }
1392
1393 #[test]
1394 fn test_decompress_run_length_with_primitive_values() {
1395 let decompressor = StreamingDecompressor::new();
1396
1397 assert_eq!(
1398 decompressor
1399 .decompress_run_length(&json!("string"))
1400 .unwrap(),
1401 json!("string")
1402 );
1403 assert_eq!(
1404 decompressor.decompress_run_length(&json!(123)).unwrap(),
1405 json!(123)
1406 );
1407 assert_eq!(
1408 decompressor.decompress_run_length(&json!(false)).unwrap(),
1409 json!(false)
1410 );
1411 assert_eq!(
1412 decompressor.decompress_run_length(&json!(null)).unwrap(),
1413 json!(null)
1414 );
1415 }
1416
1417 #[test]
1418 fn test_decompress_data_strategy_dictionary() {
1419 let mut decompressor = StreamingDecompressor::new();
1420 decompressor.active_dictionary.insert(0, "test".to_string());
1421
1422 let mut dict = HashMap::new();
1423 dict.insert("test".to_string(), 0);
1424
1425 let compressed_data = CompressedData {
1426 strategy: CompressionStrategy::Dictionary { dictionary: dict },
1427 compressed_size: 10,
1428 data: json!({"field": 0}),
1429 compression_metadata: HashMap::new(),
1430 };
1431
1432 let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1433 assert!(result.is_ok());
1434 }
1435
1436 #[test]
1437 fn test_decompress_data_strategy_delta() {
1438 let decompressor = StreamingDecompressor::new();
1439
1440 let mut bases = HashMap::new();
1441 bases.insert("value".to_string(), 100.0);
1442
1443 let compressed_data = CompressedData {
1444 strategy: CompressionStrategy::Delta {
1445 base_values: bases.clone(),
1446 },
1447 compressed_size: 10,
1448 data: json!({
1449 "sequence": [
1450 {"delta_base": 100.0, "delta_type": "numeric_sequence"},
1451 5.0,
1452 10.0
1453 ]
1454 }),
1455 compression_metadata: HashMap::new(),
1456 };
1457
1458 let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1459 assert!(result.is_ok());
1460 }
1461
1462 #[test]
1463 fn test_decompress_data_strategy_run_length() {
1464 let decompressor = StreamingDecompressor::new();
1465
1466 let compressed_data = CompressedData {
1467 strategy: CompressionStrategy::RunLength,
1468 compressed_size: 10,
1469 data: json!([
1470 {"rle_value": "x", "rle_count": 3}
1471 ]),
1472 compression_metadata: HashMap::new(),
1473 };
1474
1475 let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1476 assert!(result.is_ok());
1477 assert_eq!(result.unwrap(), json!(["x", "x", "x"]));
1478 }
1479
1480 #[test]
1481 fn test_decompress_data_strategy_hybrid_applies_delta_then_dict() {
1482 let mut decompressor = StreamingDecompressor::new();
1483 decompressor.active_dictionary.insert(0, "test".to_string());
1484
1485 let mut string_dict = HashMap::new();
1486 string_dict.insert("test".to_string(), 0);
1487
1488 let mut numeric_deltas = HashMap::new();
1489 numeric_deltas.insert("value".to_string(), 100.0);
1490
1491 let compressed_data = CompressedData {
1492 strategy: CompressionStrategy::Hybrid {
1493 string_dict,
1494 numeric_deltas,
1495 },
1496 compressed_size: 10,
1497 data: json!({
1498 "field": 0
1499 }),
1500 compression_metadata: HashMap::new(),
1501 };
1502
1503 let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1504 assert!(result.is_ok());
1505 }
1506
1507 #[test]
1508 fn test_select_compressor_critical_priority() {
1509 let mut compressor = StreamingCompressor::new();
1510 let _skeleton_comp = compressor.select_compressor_for_priority(Priority::CRITICAL);
1511 }
1512
1513 #[test]
1514 fn test_select_compressor_high_priority() {
1515 let mut compressor = StreamingCompressor::new();
1516 let _skeleton_comp = compressor.select_compressor_for_priority(Priority::HIGH);
1517 }
1518
1519 #[test]
1520 fn test_select_compressor_medium_priority() {
1521 let mut compressor = StreamingCompressor::new();
1522 let _content_comp = compressor.select_compressor_for_priority(Priority::MEDIUM);
1523 }
1524
1525 #[test]
1526 fn test_select_compressor_low_priority() {
1527 let mut compressor = StreamingCompressor::new();
1528 let _content_comp = compressor.select_compressor_for_priority(Priority::LOW);
1529 }
1530
1531 #[test]
1532 fn test_select_compressor_background_priority() {
1533 let mut compressor = StreamingCompressor::new();
1534 let _content_comp = compressor.select_compressor_for_priority(Priority::BACKGROUND);
1535 }
1536
1537 #[test]
1538 fn test_update_stats_with_zero_original_size() {
1539 let mut compressor = StreamingCompressor::new();
1540 compressor.update_stats(Priority::MEDIUM, 0, 10);
1541
1542 let stats = compressor.get_stats();
1543 assert_eq!(stats.frames_processed, 1);
1544 assert_eq!(stats.total_input_bytes, 0);
1545 assert_eq!(stats.total_output_bytes, 10);
1546 assert_eq!(
1547 stats.priority_compression_ratio(Priority::MEDIUM.value()),
1548 1.0
1549 );
1550 }
1551
1552 #[test]
1553 fn test_update_stats_with_normal_compression() {
1554 let mut compressor = StreamingCompressor::new();
1555 compressor.update_stats(Priority::HIGH, 1000, 500);
1556
1557 let stats = compressor.get_stats();
1558 assert_eq!(stats.frames_processed, 1);
1559 assert_eq!(stats.total_input_bytes, 1000);
1560 assert_eq!(stats.total_output_bytes, 500);
1561 assert_eq!(
1562 stats.priority_compression_ratio(Priority::HIGH.value()),
1563 0.5
1564 );
1565 }
1566
1567 #[test]
1568 fn test_update_decompression_stats_first_frame() {
1569 let mut decompressor = StreamingDecompressor::new();
1570 let data = json!({"test": "data"});
1571 let duration = std::time::Duration::from_micros(100);
1572
1573 decompressor.update_decompression_stats(&data, duration);
1574
1575 let stats = decompressor.get_stats();
1576 assert_eq!(stats.frames_decompressed, 1);
1577 assert_eq!(stats.avg_decompression_time_us, 100);
1578 }
1579
1580 #[test]
1581 fn test_update_decompression_stats_multiple_frames() {
1582 let mut decompressor = StreamingDecompressor::new();
1583 let data = json!({"test": "data"});
1584
1585 decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(100));
1586 decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(200));
1587 decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(300));
1588
1589 let stats = decompressor.get_stats();
1590 assert_eq!(stats.frames_decompressed, 3);
1591 assert_eq!(stats.avg_decompression_time_us, 200); }
1593
1594 #[test]
1595 fn test_create_decompression_metadata_with_dict() {
1596 let compressor = StreamingCompressor::new();
1597 let mut metadata = HashMap::new();
1598 metadata.insert("dict_0".to_string(), json!("hello"));
1599 metadata.insert("dict_1".to_string(), json!("world"));
1600
1601 let compressed_data = CompressedData {
1602 strategy: CompressionStrategy::None,
1603 compressed_size: 10,
1604 data: json!({}),
1605 compression_metadata: metadata,
1606 };
1607
1608 let result = compressor.create_decompression_metadata(&compressed_data);
1609 assert!(result.is_ok());
1610 let meta = result.unwrap();
1611 assert_eq!(meta.dictionary_map.len(), 2);
1612 assert_eq!(meta.dictionary_map.get(&0), Some(&"hello".to_string()));
1613 assert_eq!(meta.dictionary_map.get(&1), Some(&"world".to_string()));
1614 }
1615
1616 #[test]
1617 fn test_create_decompression_metadata_with_delta_bases() {
1618 let compressor = StreamingCompressor::new();
1619 let mut metadata = HashMap::new();
1620 metadata.insert("base_value1".to_string(), json!(100.0));
1621 metadata.insert("base_value2".to_string(), json!(200.0));
1622
1623 let compressed_data = CompressedData {
1624 strategy: CompressionStrategy::None,
1625 compressed_size: 10,
1626 data: json!({}),
1627 compression_metadata: metadata,
1628 };
1629
1630 let result = compressor.create_decompression_metadata(&compressed_data);
1631 assert!(result.is_ok());
1632 let meta = result.unwrap();
1633 assert_eq!(meta.delta_bases.len(), 2);
1634 assert_eq!(meta.delta_bases.get("value1"), Some(&100.0));
1635 assert_eq!(meta.delta_bases.get("value2"), Some(&200.0));
1636 }
1637
1638 #[test]
1639 fn test_create_decompression_metadata_with_invalid_dict_index() {
1640 let compressor = StreamingCompressor::new();
1641 let mut metadata = HashMap::new();
1642 metadata.insert("dict_invalid".to_string(), json!("value"));
1643 metadata.insert("dict_0".to_string(), json!("valid"));
1644
1645 let compressed_data = CompressedData {
1646 strategy: CompressionStrategy::None,
1647 compressed_size: 10,
1648 data: json!({}),
1649 compression_metadata: metadata,
1650 };
1651
1652 let result = compressor.create_decompression_metadata(&compressed_data);
1653 assert!(result.is_ok());
1654 let meta = result.unwrap();
1655 assert_eq!(meta.dictionary_map.len(), 1);
1657 assert_eq!(meta.dictionary_map.get(&0), Some(&"valid".to_string()));
1658 }
1659
1660 #[test]
1661 fn test_update_context_updates_dictionary() {
1662 let mut decompressor = StreamingDecompressor::new();
1663
1664 let mut metadata = DecompressionMetadata {
1665 strategy: CompressionStrategy::None,
1666 dictionary_map: HashMap::new(),
1667 delta_bases: HashMap::new(),
1668 priority_hints: HashMap::new(),
1669 };
1670 metadata.dictionary_map.insert(0, "hello".to_string());
1671 metadata.dictionary_map.insert(1, "world".to_string());
1672
1673 let result = decompressor.update_context(&metadata);
1674 assert!(result.is_ok());
1675 assert_eq!(decompressor.active_dictionary.len(), 2);
1676 assert_eq!(
1677 decompressor.active_dictionary.get(&0),
1678 Some(&"hello".to_string())
1679 );
1680 }
1681
1682 #[test]
1683 fn test_update_context_updates_delta_bases() {
1684 let mut decompressor = StreamingDecompressor::new();
1685
1686 let mut metadata = DecompressionMetadata {
1687 strategy: CompressionStrategy::None,
1688 dictionary_map: HashMap::new(),
1689 delta_bases: HashMap::new(),
1690 priority_hints: HashMap::new(),
1691 };
1692 metadata.delta_bases.insert("value1".to_string(), 100.0);
1693 metadata.delta_bases.insert("value2".to_string(), 200.0);
1694
1695 let result = decompressor.update_context(&metadata);
1696 assert!(result.is_ok());
1697 assert_eq!(decompressor.delta_bases.len(), 2);
1698 assert_eq!(decompressor.delta_bases.get("value1"), Some(&100.0));
1699 }
1700
1701 #[test]
1702 fn test_decompress_dictionary_with_float_that_is_not_u64() {
1703 let mut decompressor = StreamingDecompressor::new();
1704 decompressor.active_dictionary.insert(0, "test".to_string());
1705
1706 let data = json!({"value": 1.5});
1708 let result = decompressor.decompress_dictionary(&data);
1709 assert!(result.is_ok());
1710 assert_eq!(result.unwrap(), json!({"value": 1.5}));
1712 }
1713
1714 #[test]
1715 fn test_decompress_dictionary_with_negative_number() {
1716 let mut decompressor = StreamingDecompressor::new();
1717 decompressor.active_dictionary.insert(0, "test".to_string());
1718
1719 let data = json!({"value": -1});
1720 let result = decompressor.decompress_dictionary(&data);
1721 assert!(result.is_ok());
1722 assert_eq!(result.unwrap(), json!({"value": -1}));
1724 }
1725
1726 #[test]
1727 fn test_decompress_delta_array_checks_first_element_structure() {
1728 let decompressor = StreamingDecompressor::new();
1729
1730 let data = json!([
1732 {"wrong_field": 100.0},
1733 1.0,
1734 2.0
1735 ]);
1736
1737 let result = decompressor.decompress_delta(&data);
1738 assert!(result.is_ok());
1739 assert_eq!(result.unwrap(), data);
1741 }
1742
1743 #[test]
1744 fn test_decompress_delta_array_requires_both_base_and_type() {
1745 let decompressor = StreamingDecompressor::new();
1746
1747 let data1 = json!([
1749 {"delta_base": 100.0},
1750 1.0
1751 ]);
1752 let result1 = decompressor.decompress_delta(&data1);
1753 assert!(result1.is_ok());
1754
1755 let data2 = json!([
1757 {"delta_type": "numeric_sequence"},
1758 1.0
1759 ]);
1760 let result2 = decompressor.decompress_delta(&data2);
1761 assert!(result2.is_ok());
1762 }
1763
1764 #[test]
1765 fn test_decompress_run_length_with_non_objects_in_array() {
1766 let decompressor = StreamingDecompressor::new();
1767
1768 let data = json!([
1770 {"rle_value": "a", "rle_count": 2},
1771 "plain",
1772 42,
1773 true
1774 ]);
1775
1776 let result = decompressor.decompress_run_length(&data);
1777 assert!(result.is_ok());
1778 assert_eq!(result.unwrap(), json!(["a", "a", "plain", 42, true]));
1779 }
1780
1781 #[test]
1782 fn test_decompress_run_length_integrity_check_rle_value_without_count() {
1783 let decompressor = StreamingDecompressor::new();
1784
1785 let data = json!([
1786 {"rle_value": "x"}
1787 ]);
1788
1789 let result = decompressor.decompress_run_length(&data);
1790 assert!(result.is_err());
1791 }
1792
1793 #[test]
1794 fn test_decompress_run_length_integrity_check_rle_count_without_value() {
1795 let decompressor = StreamingDecompressor::new();
1796
1797 let data = json!([
1798 {"rle_count": 3}
1799 ]);
1800
1801 let result = decompressor.decompress_run_length(&data);
1802 assert!(result.is_err());
1803 }
1804
1805 #[test]
1806 fn test_decompress_run_length_non_number_count() {
1807 let decompressor = StreamingDecompressor::new();
1808
1809 let data = json!([
1810 {"rle_value": "x", "rle_count": "three"}
1811 ]);
1812
1813 let result = decompressor.decompress_run_length(&data);
1814 assert!(result.is_err());
1815 }
1816
1817 #[test]
1818 fn test_compress_frame_with_large_data() {
1819 let mut compressor = StreamingCompressor::new();
1820
1821 let large_data = json!({
1822 "users": (0..100).map(|i| json!({
1823 "id": i,
1824 "name": format!("User{}", i),
1825 "email": format!("user{}@example.com", i),
1826 "age": 20 + (i % 50),
1827 "active": i % 2 == 0
1828 })).collect::<Vec<_>>()
1829 });
1830
1831 let frame = StreamFrame {
1832 data: large_data,
1833 priority: Priority::MEDIUM,
1834 metadata: HashMap::new(),
1835 };
1836
1837 let result = compressor.compress_frame(frame);
1838 assert!(result.is_ok());
1839
1840 let stats = compressor.get_stats();
1841 assert_eq!(stats.frames_processed, 1);
1842 assert!(stats.total_input_bytes > 1000);
1843 }
1844
1845 #[test]
1846 fn test_decompress_delta_with_very_large_deltas() {
1847 let decompressor = StreamingDecompressor::new();
1848
1849 let data = json!([
1850 {"delta_base": 1_000_000.0, "delta_type": "numeric_sequence"},
1851 100_000.0,
1852 200_000.0,
1853 300_000.0
1854 ]);
1855
1856 let result = decompressor.decompress_delta(&data);
1857 assert!(result.is_ok());
1858 assert_eq!(
1859 result.unwrap(),
1860 json!([1_100_000.0, 1_200_000.0, 1_300_000.0])
1861 );
1862 }
1863}