1use crate::{
7 compression::{CompressedData, CompressionStrategy, SchemaCompressor},
8 domain::{DomainError, DomainResult},
9 stream::{Priority, StreamFrame},
10};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14const MAX_RLE_COUNT: u64 = 100_000;
16const MAX_DELTA_ARRAY_SIZE: usize = 1_000_000;
17const MAX_DECOMPRESSED_SIZE: usize = 10_485_760; #[derive(Debug, Clone)]
21pub struct StreamingCompressor {
22 skeleton_compressor: SchemaCompressor,
24 content_compressor: SchemaCompressor,
26 stats: CompressionStats,
28}
29
/// Running totals collected by a compressor.
///
/// All counters start at zero and the ratio map starts empty (`Default`).
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Sum of the serialized sizes of every input payload, in bytes.
    pub total_input_bytes: usize,
    /// Sum of the reported compressed sizes, in bytes.
    pub total_output_bytes: usize,
    /// Number of frames that have been compressed.
    pub frames_processed: u32,
    /// Compressed/original size ratio keyed by the numeric priority value.
    /// Each entry holds the ratio of the most recent frame at that priority.
    pub priority_ratios: HashMap<u8, f32>,
}
42
43#[derive(Debug, Clone)]
45pub struct CompressedFrame {
46 pub frame: StreamFrame,
48 pub compressed_data: CompressedData,
50 pub decompression_metadata: DecompressionMetadata,
52}
53
54#[derive(Debug, Clone)]
56pub struct DecompressionMetadata {
57 pub strategy: CompressionStrategy,
59 pub dictionary_map: HashMap<u16, String>,
61 pub delta_bases: HashMap<String, f64>,
63 pub priority_hints: HashMap<u8, String>,
65}
66
67impl StreamingCompressor {
68 pub fn new() -> Self {
70 Self {
71 skeleton_compressor: SchemaCompressor::new(),
72 content_compressor: SchemaCompressor::new(),
73 stats: CompressionStats::default(),
74 }
75 }
76
77 pub fn with_strategies(
79 skeleton_strategy: CompressionStrategy,
80 content_strategy: CompressionStrategy,
81 ) -> Self {
82 Self {
83 skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
84 content_compressor: SchemaCompressor::with_strategy(content_strategy),
85 stats: CompressionStats::default(),
86 }
87 }
88
89 pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
91 let compressor = self.select_compressor_for_priority(frame.priority);
92
93 let original_size = serde_json::to_string(&frame.data)
95 .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
96 .len();
97
98 let compressed_data = compressor.compress(&frame.data)?;
100
101 self.update_stats(
103 frame.priority,
104 original_size,
105 compressed_data.compressed_size,
106 );
107
108 let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;
110
111 Ok(CompressedFrame {
112 frame,
113 compressed_data,
114 decompression_metadata,
115 })
116 }
117
118 pub fn optimize_for_data(
120 &mut self,
121 skeleton: &JsonValue,
122 sample_data: &[JsonValue],
123 ) -> DomainResult<()> {
124 self.skeleton_compressor.analyze_and_optimize(skeleton)?;
126
127 if !sample_data.is_empty() {
129 let combined_sample = JsonValue::Array(sample_data.to_vec());
131 self.content_compressor
132 .analyze_and_optimize(&combined_sample)?;
133 }
134
135 Ok(())
136 }
137
138 pub fn get_stats(&self) -> &CompressionStats {
140 &self.stats
141 }
142
143 pub fn reset_stats(&mut self) {
145 self.stats = CompressionStats::default();
146 }
147
148 fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
150 match priority {
151 Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
153 _ => &mut self.content_compressor,
155 }
156 }
157
158 fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
160 self.stats.total_input_bytes += original_size;
161 self.stats.total_output_bytes += compressed_size;
162 self.stats.frames_processed += 1;
163
164 let ratio = if original_size > 0 {
165 compressed_size as f32 / original_size as f32
166 } else {
167 1.0
168 };
169
170 self.stats.priority_ratios.insert(priority.value(), ratio);
171 }
172
173 fn create_decompression_metadata(
175 &self,
176 compressed_data: &CompressedData,
177 ) -> DomainResult<DecompressionMetadata> {
178 let mut dictionary_map = HashMap::new();
179 let mut delta_bases = HashMap::new();
180
181 for (key, value) in &compressed_data.compression_metadata {
183 if let Some(suffix) = key.strip_prefix("dict_") {
184 if let Ok(index) = suffix.parse::<u16>()
185 && let Some(string_val) = value.as_str()
186 {
187 dictionary_map.insert(index, string_val.to_string());
188 }
189 } else if let Some(path) = key.strip_prefix("base_")
190 && let Some(num) = value.as_f64()
191 {
192 delta_bases.insert(path.to_string(), num);
193 }
194 }
195
196 Ok(DecompressionMetadata {
197 strategy: compressed_data.strategy.clone(),
198 dictionary_map,
199 delta_bases,
200 priority_hints: HashMap::new(), })
202 }
203}
204
205impl CompressionStats {
206 pub fn overall_compression_ratio(&self) -> f32 {
208 if self.total_input_bytes == 0 {
209 return 1.0;
210 }
211 self.total_output_bytes as f32 / self.total_input_bytes as f32
212 }
213
214 pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
216 self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
217 }
218
219 pub fn bytes_saved(&self) -> isize {
221 self.total_input_bytes as isize - self.total_output_bytes as isize
222 }
223
224 pub fn percentage_saved(&self) -> f32 {
226 if self.total_input_bytes == 0 {
227 return 0.0;
228 }
229 let ratio = self.overall_compression_ratio();
230 (1.0 - ratio) * 100.0
231 }
232}
233
234#[derive(Debug, Clone)]
236pub struct StreamingDecompressor {
237 active_dictionary: HashMap<u16, String>,
239 delta_bases: HashMap<String, f64>,
241 stats: DecompressionStats,
243}
244
/// Running totals collected by a decompressor; everything starts at zero.
#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    /// Number of frames that have been decompressed.
    pub frames_decompressed: u32,
    /// Sum of the JSON-serialized sizes of all decompressed payloads, in bytes.
    pub total_decompressed_bytes: usize,
    /// Mean time spent per frame, in whole microseconds.
    pub avg_decompression_time_us: u64,
}
255
256impl StreamingDecompressor {
257 pub fn new() -> Self {
259 Self {
260 active_dictionary: HashMap::new(),
261 delta_bases: HashMap::new(),
262 stats: DecompressionStats::default(),
263 }
264 }
265
266 pub fn decompress_frame(
268 &mut self,
269 compressed_frame: CompressedFrame,
270 ) -> DomainResult<StreamFrame> {
271 let start_time = std::time::Instant::now();
272
273 self.update_context(&compressed_frame.decompression_metadata)?;
275
276 let decompressed_data = self.decompress_data(
278 &compressed_frame.compressed_data,
279 &compressed_frame.decompression_metadata.strategy,
280 )?;
281
282 let decompression_time = start_time.elapsed();
284 self.update_decompression_stats(&decompressed_data, decompression_time);
285
286 Ok(StreamFrame {
287 data: decompressed_data,
288 priority: compressed_frame.frame.priority,
289 metadata: compressed_frame.frame.metadata,
290 })
291 }
292
293 fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
295 for (&index, string) in &metadata.dictionary_map {
297 self.active_dictionary.insert(index, string.clone());
298 }
299
300 for (path, &base) in &metadata.delta_bases {
302 self.delta_bases.insert(path.clone(), base);
303 }
304
305 Ok(())
306 }
307
308 fn decompress_data(
310 &self,
311 compressed_data: &CompressedData,
312 strategy: &CompressionStrategy,
313 ) -> DomainResult<JsonValue> {
314 match strategy {
315 CompressionStrategy::None => Ok(compressed_data.data.clone()),
316
317 CompressionStrategy::Dictionary { .. } => {
318 self.decompress_dictionary(&compressed_data.data)
319 }
320
321 CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),
322
323 CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),
324
325 CompressionStrategy::Hybrid { .. } => {
326 let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
328 self.decompress_dictionary(&delta_decompressed)
329 }
330 }
331 }
332
333 fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
335 match data {
336 JsonValue::Object(obj) => {
337 let mut decompressed = serde_json::Map::new();
338 for (key, value) in obj {
339 decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
340 }
341 Ok(JsonValue::Object(decompressed))
342 }
343 JsonValue::Array(arr) => {
344 let decompressed: Result<Vec<_>, _> = arr
345 .iter()
346 .map(|item| self.decompress_dictionary(item))
347 .collect();
348 Ok(JsonValue::Array(decompressed?))
349 }
350 JsonValue::Number(n) => {
351 if let Some(index) = n.as_u64()
353 && let Some(string_val) = self.active_dictionary.get(&(index as u16))
354 {
355 return Ok(JsonValue::String(string_val.clone()));
356 }
357 Ok(data.clone())
358 }
359 _ => Ok(data.clone()),
360 }
361 }
362
363 pub fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
365 match data {
366 JsonValue::Object(obj) => {
367 let mut decompressed_obj = serde_json::Map::new();
368 for (key, value) in obj {
369 decompressed_obj.insert(key.clone(), self.decompress_delta(value)?);
370 }
371 Ok(JsonValue::Object(decompressed_obj))
372 }
373 JsonValue::Array(arr) => {
374 if arr.is_empty() {
375 return Ok(JsonValue::Array(arr.clone()));
376 }
377
378 if let Some(first) = arr.first()
380 && let Some(obj) = first.as_object()
381 && obj.contains_key("delta_base")
382 && obj.contains_key("delta_type")
383 {
384 return self.decompress_delta_array(arr);
386 }
387
388 let decompressed_arr: Result<Vec<_>, _> =
390 arr.iter().map(|item| self.decompress_delta(item)).collect();
391 Ok(JsonValue::Array(decompressed_arr?))
392 }
393 _ => Ok(data.clone()),
394 }
395 }
396
397 fn decompress_delta_array(&self, arr: &[JsonValue]) -> DomainResult<JsonValue> {
399 if arr.is_empty() {
400 return Ok(JsonValue::Array(Vec::new()));
401 }
402
403 if arr.len() > MAX_DELTA_ARRAY_SIZE {
405 return Err(DomainError::CompressionError(format!(
406 "Delta array size {} exceeds maximum {}",
407 arr.len(),
408 MAX_DELTA_ARRAY_SIZE
409 )));
410 }
411
412 let base_value = arr[0]
414 .get("delta_base")
415 .and_then(|v| v.as_f64())
416 .ok_or_else(|| {
417 DomainError::CompressionError(
418 "Missing or invalid delta_base in metadata".to_string(),
419 )
420 })?;
421
422 let mut original_values = Vec::new();
424 for delta_value in arr.iter().skip(1) {
425 let delta = delta_value.as_f64().ok_or_else(|| {
426 DomainError::CompressionError("Invalid delta value: expected number".to_string())
427 })?;
428
429 let original = base_value + delta;
430 original_values.push(JsonValue::from(original));
431 }
432
433 Ok(JsonValue::Array(original_values))
434 }
435
436 pub fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
438 match data {
439 JsonValue::Object(obj) => {
440 let mut decompressed_obj = serde_json::Map::new();
441 for (key, value) in obj {
442 decompressed_obj.insert(key.clone(), self.decompress_run_length(value)?);
443 }
444 Ok(JsonValue::Object(decompressed_obj))
445 }
446 JsonValue::Array(arr) => {
447 let mut decompressed_values = Vec::new();
448 let mut total_size = 0usize;
449
450 for item in arr {
451 if let Some(obj) = item.as_object() {
452 let has_rle_value = obj.contains_key("rle_value");
454 let has_rle_count = obj.contains_key("rle_count");
455
456 if has_rle_value && !has_rle_count {
457 return Err(DomainError::CompressionError(
458 "Malformed RLE object: rle_value without rle_count".to_string(),
459 ));
460 }
461 if has_rle_count && !has_rle_value {
462 return Err(DomainError::CompressionError(
463 "Malformed RLE object: rle_count without rle_value".to_string(),
464 ));
465 }
466
467 if has_rle_value && has_rle_count {
469 let value = obj
470 .get("rle_value")
471 .ok_or_else(|| {
472 DomainError::CompressionError("Missing rle_value".to_string())
473 })?
474 .clone();
475
476 let count =
477 obj.get("rle_count")
478 .and_then(|v| v.as_u64())
479 .ok_or_else(|| {
480 DomainError::CompressionError(
481 "Invalid rle_count: expected positive integer"
482 .to_string(),
483 )
484 })?;
485
486 if count > MAX_RLE_COUNT {
488 return Err(DomainError::CompressionError(format!(
489 "RLE count {} exceeds maximum {}",
490 count, MAX_RLE_COUNT
491 )));
492 }
493
494 let count_usize = usize::try_from(count).map_err(|_| {
496 DomainError::CompressionError(format!(
497 "RLE count {} exceeds platform maximum",
498 count
499 ))
500 })?;
501
502 total_size = total_size.checked_add(count_usize).ok_or_else(|| {
504 DomainError::CompressionError(
505 "Total decompressed size overflow".to_string(),
506 )
507 })?;
508
509 if total_size > MAX_DECOMPRESSED_SIZE {
510 return Err(DomainError::CompressionError(format!(
511 "Decompressed size {} exceeds maximum {}",
512 total_size, MAX_DECOMPRESSED_SIZE
513 )));
514 }
515
516 for _ in 0..count {
518 decompressed_values.push(value.clone());
519 }
520 } else {
521 decompressed_values.push(self.decompress_run_length(item)?);
523 }
524 } else {
525 decompressed_values.push(self.decompress_run_length(item)?);
527 }
528 }
529
530 Ok(JsonValue::Array(decompressed_values))
531 }
532 _ => Ok(data.clone()),
533 }
534 }
535
536 fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
538 self.stats.frames_decompressed += 1;
539
540 if let Ok(serialized) = serde_json::to_string(data) {
541 self.stats.total_decompressed_bytes += serialized.len();
542 }
543
544 let new_time_us = duration.as_micros() as u64;
545 if self.stats.frames_decompressed == 1 {
546 self.stats.avg_decompression_time_us = new_time_us;
547 } else {
548 let total_frames = self.stats.frames_decompressed as u64;
550 let total_time =
551 self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
552 self.stats.avg_decompression_time_us = total_time / total_frames;
553 }
554 }
555
556 pub fn get_stats(&self) -> &DecompressionStats {
558 &self.stats
559 }
560}
561
562impl Default for StreamingCompressor {
563 fn default() -> Self {
564 Self::new()
565 }
566}
567
568impl Default for StreamingDecompressor {
569 fn default() -> Self {
570 Self::new()
571 }
572}
573
574#[cfg(test)]
575mod tests {
576 use super::*;
577 use serde_json::json;
578
/// Compressing a small medium-priority frame succeeds and keeps the frame's
/// priority.
#[test]
fn test_streaming_compressor_basic() {
    let input = StreamFrame {
        data: json!({ "message": "test message", "count": 42 }),
        priority: Priority::MEDIUM,
        metadata: HashMap::new(),
    };

    let mut compressor = StreamingCompressor::new();
    let outcome = compressor.compress_frame(input);
    assert!(outcome.is_ok());

    // The original frame travels alongside the compressed payload.
    let compressed_frame = outcome.unwrap();
    assert_eq!(compressed_frame.frame.priority, Priority::MEDIUM);
}
598
/// 600 output bytes for 1000 input bytes: ratio 0.6, 400 bytes and 40 % saved.
#[test]
fn test_compression_stats() {
    let stats = CompressionStats {
        total_output_bytes: 600,
        total_input_bytes: 1000,
        ..CompressionStats::default()
    };

    assert_eq!(stats.overall_compression_ratio(), 0.6);
    assert_eq!(stats.bytes_saved(), 400);

    // Compare with a tolerance: the percentage goes through f32 arithmetic.
    let saved = stats.percentage_saved();
    assert!((saved - 40.0).abs() < 0.001);
}
613
/// A frame compressed with the `None` strategy comes back unchanged.
#[test]
fn test_streaming_decompressor_basic() {
    let payload = json!({"test": "data"});

    let input = CompressedFrame {
        frame: StreamFrame {
            data: payload.clone(),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        },
        compressed_data: CompressedData {
            strategy: CompressionStrategy::None,
            compressed_size: 20,
            data: payload.clone(),
            compression_metadata: HashMap::new(),
        },
        decompression_metadata: DecompressionMetadata {
            strategy: CompressionStrategy::None,
            dictionary_map: HashMap::new(),
            delta_bases: HashMap::new(),
            priority_hints: HashMap::new(),
        },
    };

    let mut decompressor = StreamingDecompressor::new();
    let outcome = decompressor.decompress_frame(input);
    assert!(outcome.is_ok());

    let restored = outcome.unwrap();
    assert_eq!(restored.data, payload);
}
644
/// Numeric dictionary indices inside an object are replaced by their strings.
#[test]
fn test_dictionary_decompression() {
    let mut decompressor = StreamingDecompressor::new();
    for (index, word) in [(0u16, "hello"), (1u16, "world")] {
        decompressor.active_dictionary.insert(index, word.to_string());
    }

    let encoded = json!({ "greeting": 0, "target": 1 });
    let expected = json!({ "greeting": "hello", "target": "world" });

    let decoded = decompressor.decompress_dictionary(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
670
/// Frames of different priorities are both counted in the statistics.
#[test]
fn test_priority_based_compression() {
    let critical_frame = StreamFrame {
        data: json!({"error": "critical failure"}),
        priority: Priority::CRITICAL,
        metadata: HashMap::new(),
    };
    let low_frame = StreamFrame {
        data: json!({"debug": "verbose information"}),
        priority: Priority::LOW,
        metadata: HashMap::new(),
    };

    let mut compressor = StreamingCompressor::new();
    for frame in [critical_frame, low_frame] {
        compressor.compress_frame(frame).unwrap();
    }

    let stats = compressor.get_stats();
    assert_eq!(stats.frames_processed, 2);
    assert!(stats.total_input_bytes > 0);
}
694
/// The header carries the base; every following entry is an offset from it.
#[test]
fn test_delta_decompression_basic() {
    let encoded = json!([
        {"delta_base": 100.0, "delta_type": "numeric_sequence"},
        0.0, 1.0, 2.0, 3.0, 4.0
    ]);
    let expected = json!([100.0, 101.0, 102.0, 103.0, 104.0]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_delta(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
711
/// Negative offsets produce values below the base.
#[test]
fn test_delta_decompression_negative_deltas() {
    let encoded = json!([
        {"delta_base": 50.0, "delta_type": "numeric_sequence"},
        -10.0, 0.0, 10.0, 20.0
    ]);
    let expected = json!([40.0, 50.0, 60.0, 70.0]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_delta(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
727
/// Fractional offsets are added to the base without rounding.
#[test]
fn test_delta_decompression_fractional_deltas() {
    let encoded = json!([
        {"delta_base": 10.0, "delta_type": "numeric_sequence"},
        0.5, 1.0, 1.5, 2.0
    ]);
    let expected = json!([10.5, 11.0, 11.5, 12.0]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_delta(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
743
/// An empty array stays an empty array.
#[test]
fn test_delta_decompression_empty_array() {
    let empty = json!([]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_delta(&empty).unwrap();

    assert_eq!(decoded, json!([]));
}
753
/// A header without any offsets decodes to an empty array.
#[test]
fn test_delta_decompression_single_element() {
    let header_only = json!([{"delta_base": 100.0, "delta_type": "numeric_sequence"}]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_delta(&header_only).unwrap();

    assert_eq!(decoded, json!([]));
}
765
/// Delta arrays nested in an object are decoded; sibling fields are kept.
#[test]
fn test_delta_decompression_nested_structure() {
    let encoded = json!({
        "sequence": [
            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
            0.0, 1.0, 2.0
        ],
        "other": "data"
    });
    let expected = json!({
        "sequence": [100.0, 101.0, 102.0],
        "other": "data"
    });

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_delta(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
789
/// An array whose first object lacks the delta keys is not treated as
/// delta-encoded: decoding succeeds and returns the input unchanged.
#[test]
fn test_delta_decompression_invalid_metadata() {
    let decompressor = StreamingDecompressor::new();

    let compressed_data = json!([
        {"wrong_key": 100.0},
        0.0,
        1.0
    ]);

    let result = decompressor.decompress_delta(&compressed_data);
    assert!(result.is_ok());
    // Pin the pass-through result as well; `is_ok()` alone would also accept
    // a decoder that mangles or drops the elements.
    assert_eq!(result.unwrap(), compressed_data);
}
804
/// A non-numeric offset after a valid header is an error.
#[test]
fn test_delta_decompression_invalid_delta_value() {
    let encoded = json!([
        {"delta_base": 100.0, "delta_type": "numeric_sequence"},
        "not_a_number"
    ]);

    let decoder = StreamingDecompressor::new();
    let outcome = decoder.decompress_delta(&encoded);

    assert!(outcome.is_err());
}
817
/// Consecutive runs are expanded in order.
#[test]
fn test_rle_decompression_basic() {
    let encoded = json!([
        {"rle_value": 1, "rle_count": 3},
        {"rle_value": 2, "rle_count": 2},
        {"rle_value": 3, "rle_count": 4}
    ]);
    let expected = json!([1, 1, 1, 2, 2, 3, 3, 3, 3]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_run_length(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
833
/// Plain elements between runs keep their position.
#[test]
fn test_rle_decompression_mixed_runs() {
    let encoded = json!([
        {"rle_value": "a", "rle_count": 2},
        "b",
        {"rle_value": "c", "rle_count": 3}
    ]);
    let expected = json!(["a", "a", "b", "c", "c", "c"]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_run_length(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
849
/// A run of length one yields exactly one value.
#[test]
fn test_rle_decompression_single_count() {
    let encoded = json!([{"rle_value": "x", "rle_count": 1}]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_run_length(&encoded).unwrap();

    assert_eq!(decoded, json!(["x"]));
}
863
/// A run of length zero is accepted and contributes nothing.
#[test]
fn test_rle_decompression_zero_count() {
    let encoded = json!([{"rle_value": "x", "rle_count": 0}]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_run_length(&encoded).unwrap();

    assert_eq!(decoded, json!([]));
}
877
/// The repeated value may itself be an object.
#[test]
fn test_rle_decompression_nested_values() {
    let encoded = json!([{"rle_value": {"name": "test"}, "rle_count": 3}]);
    let expected = json!([{"name": "test"}, {"name": "test"}, {"name": "test"}]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_run_length(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
894
/// Run-length arrays nested in an object are expanded; sibling fields are
/// kept.
#[test]
fn test_rle_decompression_nested_structure() {
    let encoded = json!({
        "data": [
            {"rle_value": 1, "rle_count": 3},
            {"rle_value": 2, "rle_count": 2}
        ],
        "other": "field"
    });
    let expected = json!({
        "data": [1, 1, 1, 2, 2],
        "other": "field"
    });

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_run_length(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
918
/// An empty array stays an empty array.
#[test]
fn test_rle_decompression_empty_array() {
    let empty = json!([]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_run_length(&empty).unwrap();

    assert_eq!(decoded, json!([]));
}
930
/// A count that is not a number is an error.
#[test]
fn test_rle_decompression_invalid_count() {
    let encoded = json!([{"rle_value": "x", "rle_count": "not_a_number"}]);

    let decoder = StreamingDecompressor::new();
    let outcome = decoder.decompress_run_length(&encoded);

    assert!(outcome.is_err());
}
942
/// `rle_count` without `rle_value` is an error.
#[test]
fn test_rle_decompression_missing_value() {
    let encoded = json!([{"rle_count": 3}]);

    let decoder = StreamingDecompressor::new();
    let outcome = decoder.decompress_run_length(&encoded);

    assert!(outcome.is_err());
}
954
/// `rle_value` without `rle_count` is an error.
#[test]
fn test_rle_decompression_missing_count() {
    let encoded = json!([{"rle_value": "x"}]);

    let decoder = StreamingDecompressor::new();
    let outcome = decoder.decompress_run_length(&encoded);

    assert!(outcome.is_err());
}
966
/// Objects without RLE keys pass through untouched.
#[test]
fn test_rle_decompression_non_rle_objects() {
    let plain = json!([
        {"regular": "object"},
        {"another": "one"}
    ]);
    let expected = json!([
        {"regular": "object"},
        {"another": "one"}
    ]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_run_length(&plain).unwrap();
    assert_eq!(decoded, expected);
}
988
/// A compressor built from explicit strategies compresses a high-priority
/// frame and counts it.
#[test]
fn test_compress_frame_with_custom_strategies() {
    let dictionary = HashMap::from([("test".to_string(), 0)]);
    let base_values = HashMap::from([("value".to_string(), 100.0)]);

    let mut compressor = StreamingCompressor::with_strategies(
        CompressionStrategy::Dictionary { dictionary },
        CompressionStrategy::Delta { base_values },
    );

    let input = StreamFrame {
        data: json!({"value": 123, "other": 456}),
        priority: Priority::HIGH,
        metadata: HashMap::new(),
    };

    let outcome = compressor.compress_frame(input);
    assert!(outcome.is_ok());
    assert_eq!(compressor.stats.frames_processed, 1);
}
1014
/// Optimizing with a skeleton and three samples succeeds.
#[test]
fn test_optimize_for_data_with_samples() {
    let skeleton = json!({ "id": null, "name": null });

    // Samples {"id": 1, "name": "test1"} .. {"id": 3, "name": "test3"}.
    let samples: Vec<JsonValue> = (1..=3)
        .map(|id| json!({"id": id, "name": format!("test{id}")}))
        .collect();

    let mut compressor = StreamingCompressor::new();
    let outcome = compressor.optimize_for_data(&skeleton, &samples);

    assert!(outcome.is_ok());
}
1033
/// Optimizing with no samples at all still succeeds.
#[test]
fn test_optimize_for_data_empty_samples() {
    let skeleton = json!({"key": "value"});
    let no_samples: &[JsonValue] = &[];

    let mut compressor = StreamingCompressor::new();
    let outcome = compressor.optimize_for_data(&skeleton, no_samples);

    assert!(outcome.is_ok());
}
1042
/// `reset_stats` zeroes previously recorded counters.
#[test]
fn test_reset_stats() {
    let mut compressor = StreamingCompressor::new();
    compressor.stats = CompressionStats {
        total_input_bytes: 1000,
        total_output_bytes: 500,
        frames_processed: 10,
        ..CompressionStats::default()
    };

    compressor.reset_stats();

    let stats = &compressor.stats;
    assert_eq!(stats.total_input_bytes, 0);
    assert_eq!(stats.total_output_bytes, 0);
    assert_eq!(stats.frames_processed, 0);
}
1057
/// One critical and one low-priority frame are both processed.
#[test]
fn test_compressor_critical_vs_low_priority() {
    let frames = [
        StreamFrame {
            data: json!({"critical": "data"}),
            priority: Priority::CRITICAL,
            metadata: HashMap::new(),
        },
        StreamFrame {
            data: json!({"low": "data"}),
            priority: Priority::LOW,
            metadata: HashMap::new(),
        },
    ];

    let mut compressor = StreamingCompressor::new();
    for frame in frames {
        compressor.compress_frame(frame).unwrap();
    }

    assert_eq!(compressor.stats.frames_processed, 2);
}
1079
/// A frame tagged with the `Hybrid` strategy decompresses without error.
#[test]
fn test_decompressor_hybrid_strategy() {
    let mut decompressor = StreamingDecompressor::new();
    decompressor.delta_bases.insert("value".to_string(), 100.0);
    decompressor.active_dictionary.insert(0, "test".to_string());

    // The same strategy value is used for the payload and for the metadata.
    let strategy = CompressionStrategy::Hybrid {
        string_dict: HashMap::from([("test".to_string(), 0)]),
        numeric_deltas: HashMap::from([("value".to_string(), 100.0)]),
    };

    let input = CompressedFrame {
        frame: StreamFrame {
            data: json!({"test": "data"}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        },
        compressed_data: CompressedData {
            strategy: strategy.clone(),
            compressed_size: 20,
            data: json!({"value": 5.0}),
            compression_metadata: HashMap::new(),
        },
        decompression_metadata: DecompressionMetadata {
            strategy,
            dictionary_map: HashMap::new(),
            delta_bases: HashMap::new(),
            priority_hints: HashMap::new(),
        },
    };

    let outcome = decompressor.decompress_frame(input);
    assert!(outcome.is_ok());
}
1123
/// Dictionary indices are resolved inside nested arrays as well.
#[test]
fn test_decompress_dictionary_nested_arrays() {
    let mut decompressor = StreamingDecompressor::new();
    for (index, word) in [(0u16, "item1"), (1u16, "item2")] {
        decompressor.active_dictionary.insert(index, word.to_string());
    }

    let encoded = json!([[0, 1], [1, 0]]);
    let expected = json!([["item1", "item2"], ["item2", "item1"]]);

    let decoded = decompressor.decompress_dictionary(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
1139
/// A number with no dictionary entry is left as a number.
#[test]
fn test_decompress_dictionary_non_index_numbers() {
    let mut decompressor = StreamingDecompressor::new();
    decompressor.active_dictionary.insert(0, "test".to_string());

    // 999 is not a key of the dictionary, so it must survive untouched.
    let input = json!({"value": 999});
    let decoded = decompressor.decompress_dictionary(&input).unwrap();

    assert_eq!(decoded, json!({"value": 999}));
}
1152
/// An object without any arrays is returned unchanged by delta decoding.
#[test]
fn test_decompress_delta_non_array() {
    let input = json!({"key": "value"});

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_delta(&input).unwrap();

    assert_eq!(decoded, json!({"key": "value"}));
}
1163
/// A plain numeric array (no header object) is returned unchanged.
#[test]
fn test_decompress_delta_array_without_metadata() {
    let input = json!([1, 2, 3, 4]);

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_delta(&input).unwrap();

    assert_eq!(decoded, json!([1, 2, 3, 4]));
}
1174
/// Runs are expanded even when the array sits two objects deep.
#[test]
fn test_decompress_run_length_nested_objects() {
    let encoded = json!({
        "outer": {
            "inner": [{"rle_value": {"nested": "obj"}, "rle_count": 2}]
        }
    });
    let expected = json!({
        "outer": {
            "inner": [{"nested": "obj"}, {"nested": "obj"}]
        }
    });

    let decoder = StreamingDecompressor::new();
    let decoded = decoder.decompress_run_length(&encoded).unwrap();
    assert_eq!(decoded, expected);
}
1197
/// Decompressing one frame bumps the frame counter and records the
/// serialized size of the restored payload.
#[test]
fn test_decompression_stats_tracking() {
    let mut decompressor = StreamingDecompressor::new();

    assert_eq!(decompressor.stats.frames_decompressed, 0);

    let frame = CompressedFrame {
        frame: StreamFrame {
            data: json!({"test": "data"}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        },
        compressed_data: CompressedData {
            strategy: CompressionStrategy::None,
            compressed_size: 15,
            data: json!({"test": "data"}),
            compression_metadata: HashMap::new(),
        },
        decompression_metadata: DecompressionMetadata {
            strategy: CompressionStrategy::None,
            dictionary_map: HashMap::new(),
            delta_bases: HashMap::new(),
            priority_hints: HashMap::new(),
        },
    };

    decompressor.decompress_frame(frame).unwrap();

    assert_eq!(decompressor.stats.frames_decompressed, 1);
    // The payload serializes to exactly this compact JSON text.
    assert_eq!(
        decompressor.stats.total_decompressed_bytes,
        r#"{"test":"data"}"#.len()
    );
    // The average time is deliberately not asserted to be non-zero: it is
    // stored in whole microseconds, and cloning such a tiny payload can take
    // less than one microsecond, which made the old `> 0` check flaky.
}
1230
/// A header with `delta_type` but no `delta_base` does not mark the array as
/// delta-encoded, so the input comes back unchanged.
#[test]
fn test_decompress_delta_array_malformed_metadata() {
    let input = json!([
        {"delta_type": "numeric_sequence"},
        1.0, 2.0
    ]);

    let decoder = StreamingDecompressor::new();
    let outcome = decoder.decompress_delta(&input);

    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), input);
}
1248
/// A run of 1000 (well below the per-run limit) expands to 1000 values.
#[test]
fn test_decompress_run_length_large_count() {
    let decompressor = StreamingDecompressor::new();

    let data = json!([
        {"rle_value": "x", "rle_count": 1000}
    ]);

    let result = decompressor.decompress_run_length(&data);
    assert!(result.is_ok());
    let decompressed = result.unwrap();

    // Fail loudly if the result is not an array; the former `if let` skipped
    // the length check entirely in that case.
    let arr = decompressed
        .as_array()
        .expect("run-length decoding of an array must yield an array");
    assert_eq!(arr.len(), 1000);
    assert!(arr.iter().all(|value| *value == json!("x")));
}
1265
/// A single run of 200 000 is above the per-run limit of 100 000 and is
/// rejected.
#[test]
fn test_decompress_run_length_exceeds_max_count() {
    let oversized_run = json!([{"rle_value": "x", "rle_count": 200_000}]);

    let decoder = StreamingDecompressor::new();
    let outcome = decoder.decompress_run_length(&oversized_run);

    assert!(outcome.is_err());
}
1278
/// Two huge runs are rejected.
///
/// NOTE(review): each count here already exceeds `MAX_RLE_COUNT`, so the
/// per-run limit rejects the first entry before the cumulative size check is
/// ever reached. Confirm whether a dedicated cumulative-limit case is wanted.
#[test]
fn test_decompress_run_length_cumulative_overflow() {
    let two_huge_runs = json!([
        {"rle_value": "a", "rle_count": 5_000_000},
        {"rle_value": "b", "rle_count": 6_000_000}
    ]);

    let decoder = StreamingDecompressor::new();
    let outcome = decoder.decompress_run_length(&two_huge_runs);

    assert!(outcome.is_err());
}
1293
/// A delta array made of a valid header plus 1_000_001 deltas is expected to
/// be rejected.
#[test]
fn test_decompress_delta_array_size_limit() {
    let decompressor = StreamingDecompressor::new();

    // One well-formed delta header followed by 1_000_001 zero deltas.
    let header = json!({"delta_base": 0.0, "delta_type": "numeric_sequence"});
    let oversized: Vec<JsonValue> = std::iter::once(header)
        .chain(std::iter::repeat(json!(0.0)).take(1_000_001))
        .collect();

    let outcome = decompressor.decompress_delta(&JsonValue::Array(oversized));
    assert!(outcome.is_err());
}
1307
/// Default compression stats have zeroed counters and report an overall
/// compression ratio of 1.0.
#[test]
fn test_compression_stats_default() {
    let fresh = CompressionStats::default();

    assert_eq!(
        (
            fresh.total_input_bytes,
            fresh.total_output_bytes,
            fresh.frames_processed
        ),
        (0, 0, 0)
    );
    assert_eq!(fresh.overall_compression_ratio(), 1.0);
}
1316
/// Default decompression stats start with every counter at zero.
#[test]
fn test_decompression_stats_default() {
    let fresh = DecompressionStats::default();

    assert_eq!(
        (
            fresh.frames_decompressed,
            fresh.total_decompressed_bytes,
            fresh.avg_decompression_time_us
        ),
        (0, 0, 0)
    );
}
1324
/// Objects holding only string values pass through dictionary decompression
/// unchanged.
#[test]
fn test_decompress_dictionary_with_strings() {
    let decompressor = StreamingDecompressor::new();
    let input = json!({"key": "value", "nested": {"inner": "string"}});

    let output = decompressor.decompress_dictionary(&input);

    assert!(output.is_ok());
    assert_eq!(output.unwrap(), input);
}
1337
/// A JSON `null` passes through dictionary decompression unchanged.
#[test]
fn test_decompress_dictionary_with_null() {
    let decompressor = StreamingDecompressor::new();

    // `unwrap` fails the test on `Err`, so no separate `is_ok` check is needed.
    let output = decompressor
        .decompress_dictionary(&JsonValue::Null)
        .unwrap();

    assert_eq!(output, JsonValue::Null);
}
1346
/// A JSON boolean passes through dictionary decompression unchanged.
#[test]
fn test_decompress_dictionary_with_boolean() {
    let decompressor = StreamingDecompressor::new();
    let flag = JsonValue::Bool(true);

    // `unwrap` fails the test on `Err`, so no separate `is_ok` check is needed.
    let output = decompressor.decompress_dictionary(&flag).unwrap();

    assert_eq!(output, flag);
}
1355
/// A bare JSON string passes through dictionary decompression unchanged.
#[test]
fn test_decompress_dictionary_with_string() {
    let decompressor = StreamingDecompressor::new();
    let text = JsonValue::from("plain string");

    // `unwrap` fails the test on `Err`, so no separate `is_ok` check is needed.
    let output = decompressor.decompress_dictionary(&text).unwrap();

    assert_eq!(output, text);
}
1364
/// An object that contains no arrays comes out of delta decompression
/// unchanged.
#[test]
fn test_decompress_delta_with_object_no_array() {
    let decompressor = StreamingDecompressor::new();
    let input = json!({"key": "value"});

    let output = decompressor.decompress_delta(&input);

    assert!(output.is_ok());
    assert_eq!(output.unwrap(), input);
}
1373
/// Primitive JSON values (string, number, bool, null) are returned as-is by
/// delta decompression.
#[test]
fn test_decompress_delta_with_primitive_values() {
    let decompressor = StreamingDecompressor::new();

    for primitive in [json!("string"), json!(42), json!(true), json!(null)] {
        let output = decompressor.decompress_delta(&primitive).unwrap();
        assert_eq!(output, primitive);
    }
}
1395
/// Primitive JSON values (string, number, bool, null) are returned as-is by
/// run-length decompression.
#[test]
fn test_decompress_run_length_with_primitive_values() {
    let decompressor = StreamingDecompressor::new();

    for primitive in [json!("string"), json!(123), json!(false), json!(null)] {
        let output = decompressor.decompress_run_length(&primitive).unwrap();
        assert_eq!(output, primitive);
    }
}
1419
/// With index 0 registered as "test" in the active dictionary, a
/// `Dictionary`-strategy payload must decompress without error.
#[test]
fn test_decompress_data_strategy_dictionary() {
    let mut decompressor = StreamingDecompressor::new();
    decompressor.active_dictionary.insert(0, "test".to_string());

    let dictionary = HashMap::from([("test".to_string(), 0)]);
    let compressed_data = CompressedData {
        strategy: CompressionStrategy::Dictionary { dictionary },
        compressed_size: 10,
        data: json!({"field": 0}),
        compression_metadata: HashMap::new(),
    };

    // Only success is asserted; the decoded value is not inspected.
    assert!(decompressor
        .decompress_data(&compressed_data, &compressed_data.strategy)
        .is_ok());
}
1438
/// A `Delta`-strategy payload containing a well-formed delta sequence must
/// decompress without error.
#[test]
fn test_decompress_data_strategy_delta() {
    let decompressor = StreamingDecompressor::new();

    // The base-value map is moved into the strategy; it is not needed again,
    // so no clone is required.
    let base_values = HashMap::from([("value".to_string(), 100.0)]);

    let compressed_data = CompressedData {
        strategy: CompressionStrategy::Delta { base_values },
        compressed_size: 10,
        data: json!({
            "sequence": [
                {"delta_base": 100.0, "delta_type": "numeric_sequence"},
                5.0,
                10.0
            ]
        }),
        compression_metadata: HashMap::new(),
    };

    // Only success is asserted; the decoded value is not inspected.
    let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
    assert!(result.is_ok());
}
1464
/// A `RunLength`-strategy payload with one run of three "x" values expands
/// to `["x", "x", "x"]`.
#[test]
fn test_decompress_data_strategy_run_length() {
    let decompressor = StreamingDecompressor::new();

    let single_run = json!({"rle_value": "x", "rle_count": 3});
    let compressed_data = CompressedData {
        strategy: CompressionStrategy::RunLength,
        compressed_size: 10,
        data: JsonValue::Array(vec![single_run]),
        compression_metadata: HashMap::new(),
    };

    let outcome = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
    assert!(outcome.is_ok());

    let expected = JsonValue::Array(vec![json!("x"); 3]);
    assert_eq!(outcome.unwrap(), expected);
}
1482
/// A `Hybrid`-strategy payload must decompress without error.
///
/// Only success is asserted; the ordering implied by the test name is not
/// verified here.
#[test]
fn test_decompress_data_strategy_hybrid_applies_delta_then_dict() {
    let mut decompressor = StreamingDecompressor::new();
    decompressor.active_dictionary.insert(0, "test".to_string());

    let compressed_data = CompressedData {
        strategy: CompressionStrategy::Hybrid {
            string_dict: HashMap::from([("test".to_string(), 0)]),
            numeric_deltas: HashMap::from([("value".to_string(), 100.0)]),
        },
        compressed_size: 10,
        data: json!({ "field": 0 }),
        compression_metadata: HashMap::new(),
    };

    assert!(decompressor
        .decompress_data(&compressed_data, &compressed_data.strategy)
        .is_ok());
}
1509
/// Smoke test: selecting a compressor for `Priority::CRITICAL` must not panic.
#[test]
fn test_select_compressor_critical_priority() {
    let mut compressor = StreamingCompressor::new();
    // The selected compressor is not inspected; the call itself is the check.
    let _ = compressor.select_compressor_for_priority(Priority::CRITICAL);
}
1515
/// Smoke test: selecting a compressor for `Priority::HIGH` must not panic.
#[test]
fn test_select_compressor_high_priority() {
    let mut compressor = StreamingCompressor::new();
    // The selected compressor is not inspected; the call itself is the check.
    let _ = compressor.select_compressor_for_priority(Priority::HIGH);
}
1521
/// Smoke test: selecting a compressor for `Priority::MEDIUM` must not panic.
#[test]
fn test_select_compressor_medium_priority() {
    let mut compressor = StreamingCompressor::new();
    // The selected compressor is not inspected; the call itself is the check.
    let _ = compressor.select_compressor_for_priority(Priority::MEDIUM);
}
1527
/// Smoke test: selecting a compressor for `Priority::LOW` must not panic.
#[test]
fn test_select_compressor_low_priority() {
    let mut compressor = StreamingCompressor::new();
    // The selected compressor is not inspected; the call itself is the check.
    let _ = compressor.select_compressor_for_priority(Priority::LOW);
}
1533
/// Smoke test: selecting a compressor for `Priority::BACKGROUND` must not panic.
#[test]
fn test_select_compressor_background_priority() {
    let mut compressor = StreamingCompressor::new();
    // The selected compressor is not inspected; the call itself is the check.
    let _ = compressor.select_compressor_for_priority(Priority::BACKGROUND);
}
1539
/// Recording a frame with an original size of 0 still counts the frame and
/// its output bytes, and the per-priority ratio is reported as 1.0.
#[test]
fn test_update_stats_with_zero_original_size() {
    let mut compressor = StreamingCompressor::new();
    compressor.update_stats(Priority::MEDIUM, 0, 10);

    let snapshot = compressor.get_stats();
    assert_eq!(
        (
            snapshot.frames_processed,
            snapshot.total_input_bytes,
            snapshot.total_output_bytes
        ),
        (1, 0, 10)
    );

    let medium_ratio = snapshot.priority_compression_ratio(Priority::MEDIUM.value());
    assert_eq!(medium_ratio, 1.0);
}
1554
/// Recording 1000 input bytes compressed to 500 output bytes yields a
/// per-priority ratio of 0.5.
#[test]
fn test_update_stats_with_normal_compression() {
    let mut compressor = StreamingCompressor::new();
    compressor.update_stats(Priority::HIGH, 1000, 500);

    let snapshot = compressor.get_stats();
    assert_eq!(
        (
            snapshot.frames_processed,
            snapshot.total_input_bytes,
            snapshot.total_output_bytes
        ),
        (1, 1000, 500)
    );

    let high_ratio = snapshot.priority_compression_ratio(Priority::HIGH.value());
    assert_eq!(high_ratio, 0.5);
}
1569
/// After a single 100 µs frame, the frame count is 1 and the average
/// decompression time is 100 µs.
#[test]
fn test_update_decompression_stats_first_frame() {
    let mut decompressor = StreamingDecompressor::new();
    let payload = json!({"test": "data"});

    decompressor.update_decompression_stats(&payload, std::time::Duration::from_micros(100));

    let snapshot = decompressor.get_stats();
    assert_eq!(
        (
            snapshot.frames_decompressed,
            snapshot.avg_decompression_time_us
        ),
        (1, 100)
    );
}
1582
/// After frames taking 100, 200 and 300 µs, the frame count is 3 and the
/// average decompression time is 200 µs.
#[test]
fn test_update_decompression_stats_multiple_frames() {
    let mut decompressor = StreamingDecompressor::new();
    let payload = json!({"test": "data"});

    for micros in [100, 200, 300] {
        let elapsed = std::time::Duration::from_micros(micros);
        decompressor.update_decompression_stats(&payload, elapsed);
    }

    let snapshot = decompressor.get_stats();
    assert_eq!(snapshot.frames_decompressed, 3);
    assert_eq!(snapshot.avg_decompression_time_us, 200);
}
1596
/// `dict_<n>` entries in the compression metadata become index → string
/// entries in the dictionary map.
#[test]
fn test_create_decompression_metadata_with_dict() {
    let compressor = StreamingCompressor::new();

    let compressed_data = CompressedData {
        strategy: CompressionStrategy::None,
        compressed_size: 10,
        data: json!({}),
        compression_metadata: HashMap::from([
            ("dict_0".to_string(), json!("hello")),
            ("dict_1".to_string(), json!("world")),
        ]),
    };

    let outcome = compressor.create_decompression_metadata(&compressed_data);
    assert!(outcome.is_ok());

    let dictionary = outcome.unwrap().dictionary_map;
    assert_eq!(dictionary.len(), 2);
    assert_eq!(dictionary.get(&0).map(String::as_str), Some("hello"));
    assert_eq!(dictionary.get(&1).map(String::as_str), Some("world"));
}
1618
/// `base_<name>` entries in the compression metadata become `<name>` → value
/// entries in the delta-base map.
#[test]
fn test_create_decompression_metadata_with_delta_bases() {
    let compressor = StreamingCompressor::new();

    let compressed_data = CompressedData {
        strategy: CompressionStrategy::None,
        compressed_size: 10,
        data: json!({}),
        compression_metadata: HashMap::from([
            ("base_value1".to_string(), json!(100.0)),
            ("base_value2".to_string(), json!(200.0)),
        ]),
    };

    let outcome = compressor.create_decompression_metadata(&compressed_data);
    assert!(outcome.is_ok());

    let bases = outcome.unwrap().delta_bases;
    assert_eq!(bases.len(), 2);
    assert_eq!(bases.get("value1").copied(), Some(100.0));
    assert_eq!(bases.get("value2").copied(), Some(200.0));
}
1640
/// A `dict_` key whose suffix is not a number is skipped, while valid
/// `dict_<n>` entries are still collected.
#[test]
fn test_create_decompression_metadata_with_invalid_dict_index() {
    let compressor = StreamingCompressor::new();

    let compressed_data = CompressedData {
        strategy: CompressionStrategy::None,
        compressed_size: 10,
        data: json!({}),
        compression_metadata: HashMap::from([
            ("dict_invalid".to_string(), json!("value")),
            ("dict_0".to_string(), json!("valid")),
        ]),
    };

    let outcome = compressor.create_decompression_metadata(&compressed_data);
    assert!(outcome.is_ok());

    // Only the entry with a numeric index survives.
    let dictionary = outcome.unwrap().dictionary_map;
    assert_eq!(dictionary.len(), 1);
    assert_eq!(dictionary.get(&0).map(String::as_str), Some("valid"));
}
1662
/// `update_context` copies the metadata's dictionary entries into the
/// decompressor's active dictionary.
#[test]
fn test_update_context_updates_dictionary() {
    let mut decompressor = StreamingDecompressor::new();

    let metadata = DecompressionMetadata {
        strategy: CompressionStrategy::None,
        dictionary_map: HashMap::from([(0, "hello".to_string()), (1, "world".to_string())]),
        delta_bases: HashMap::new(),
        priority_hints: HashMap::new(),
    };

    assert!(decompressor.update_context(&metadata).is_ok());

    assert_eq!(decompressor.active_dictionary.len(), 2);
    assert_eq!(
        decompressor.active_dictionary.get(&0).map(String::as_str),
        Some("hello")
    );
}
1684
/// `update_context` copies the metadata's delta bases into the
/// decompressor's own delta-base map.
#[test]
fn test_update_context_updates_delta_bases() {
    let mut decompressor = StreamingDecompressor::new();

    let metadata = DecompressionMetadata {
        strategy: CompressionStrategy::None,
        dictionary_map: HashMap::new(),
        delta_bases: HashMap::from([
            ("value1".to_string(), 100.0),
            ("value2".to_string(), 200.0),
        ]),
        priority_hints: HashMap::new(),
    };

    assert!(decompressor.update_context(&metadata).is_ok());

    assert_eq!(decompressor.delta_bases.len(), 2);
    assert_eq!(decompressor.delta_bases.get("value1").copied(), Some(100.0));
}
1703
/// A fractional number is left unchanged by dictionary decompression, even
/// when the active dictionary has entries.
#[test]
fn test_decompress_dictionary_with_float_that_is_not_u64() {
    let mut decompressor = StreamingDecompressor::new();
    decompressor.active_dictionary.insert(0, "test".to_string());

    let input = json!({"value": 1.5});

    let output = decompressor.decompress_dictionary(&input);
    assert!(output.is_ok());
    assert_eq!(output.unwrap(), input);
}
1716
/// A negative integer is left unchanged by dictionary decompression, even
/// when the active dictionary has entries.
#[test]
fn test_decompress_dictionary_with_negative_number() {
    let mut decompressor = StreamingDecompressor::new();
    decompressor.active_dictionary.insert(0, "test".to_string());

    let input = json!({"value": -1});

    let output = decompressor.decompress_dictionary(&input);
    assert!(output.is_ok());
    assert_eq!(output.unwrap(), input);
}
1728
/// An array whose first element is an object without the delta header
/// fields is returned unchanged.
#[test]
fn test_decompress_delta_array_checks_first_element_structure() {
    let decompressor = StreamingDecompressor::new();

    // The leading object has neither `delta_base` nor `delta_type`.
    let not_a_header = json!({"wrong_field": 100.0});
    let data = JsonValue::Array(vec![not_a_header, json!(1.0), json!(2.0)]);

    let outcome = decompressor.decompress_delta(&data);
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), data);
}
1745
/// Arrays whose header carries only one of `delta_base` / `delta_type` must
/// still decompress without error.
#[test]
fn test_decompress_delta_array_requires_both_base_and_type() {
    let decompressor = StreamingDecompressor::new();

    // First a header with only the base, then one with only the type.
    let partial_headers = [
        json!({"delta_base": 100.0}),
        json!({"delta_type": "numeric_sequence"}),
    ];

    for header in partial_headers {
        let data = JsonValue::Array(vec![header, json!(1.0)]);
        assert!(decompressor.decompress_delta(&data).is_ok());
    }
}
1766
/// Run objects are expanded in place while non-object array elements are
/// kept as they are.
#[test]
fn test_decompress_run_length_with_non_objects_in_array() {
    let decompressor = StreamingDecompressor::new();

    let run = json!({"rle_value": "a", "rle_count": 2});
    let data = JsonValue::Array(vec![run, json!("plain"), json!(42), json!(true)]);

    let outcome = decompressor.decompress_run_length(&data);
    assert!(outcome.is_ok());

    let expected = json!(["a", "a", "plain", 42, true]);
    assert_eq!(outcome.unwrap(), expected);
}
1783
/// A run object with `rle_value` but no `rle_count` is rejected.
#[test]
fn test_decompress_run_length_integrity_check_rle_value_without_count() {
    let decompressor = StreamingDecompressor::new();

    let incomplete_run = json!({"rle_value": "x"});
    let data = JsonValue::Array(vec![incomplete_run]);

    assert!(decompressor.decompress_run_length(&data).is_err());
}
1795
/// A run object with `rle_count` but no `rle_value` is rejected.
#[test]
fn test_decompress_run_length_integrity_check_rle_count_without_value() {
    let decompressor = StreamingDecompressor::new();

    let incomplete_run = json!({"rle_count": 3});
    let data = JsonValue::Array(vec![incomplete_run]);

    assert!(decompressor.decompress_run_length(&data).is_err());
}
1807
/// A run object whose `rle_count` is a string rather than a number is
/// rejected.
#[test]
fn test_decompress_run_length_non_number_count() {
    let decompressor = StreamingDecompressor::new();

    let bad_run = json!({"rle_value": "x", "rle_count": "three"});
    let data = JsonValue::Array(vec![bad_run]);

    assert!(decompressor.decompress_run_length(&data).is_err());
}
1819
/// Compressing a frame with 100 user records succeeds and is reflected in
/// the compressor's stats (one frame, more than 1000 input bytes).
#[test]
fn test_compress_frame_with_large_data() {
    let mut compressor = StreamingCompressor::new();

    // Build 100 synthetic user records.
    let users: Vec<JsonValue> = (0..100)
        .map(|i| {
            json!({
                "id": i,
                "name": format!("User{}", i),
                "email": format!("user{}@example.com", i),
                "age": 20 + (i % 50),
                "active": i % 2 == 0
            })
        })
        .collect();

    let frame = StreamFrame {
        data: json!({ "users": users }),
        priority: Priority::MEDIUM,
        metadata: HashMap::new(),
    };

    assert!(compressor.compress_frame(frame).is_ok());

    let snapshot = compressor.get_stats();
    assert_eq!(snapshot.frames_processed, 1);
    assert!(snapshot.total_input_bytes > 1000);
}
1847
/// With a base of 1_000_000, deltas of 100_000 / 200_000 / 300_000 decode to
/// 1_100_000 / 1_200_000 / 1_300_000.
#[test]
fn test_decompress_delta_with_very_large_deltas() {
    let decompressor = StreamingDecompressor::new();

    let header = json!({"delta_base": 1_000_000.0, "delta_type": "numeric_sequence"});
    let data = JsonValue::Array(vec![
        header,
        json!(100_000.0),
        json!(200_000.0),
        json!(300_000.0),
    ]);

    let outcome = decompressor.decompress_delta(&data);
    assert!(outcome.is_ok());

    let expected = json!([1_100_000.0, 1_200_000.0, 1_300_000.0]);
    assert_eq!(outcome.unwrap(), expected);
}
1866}