1use crate::{
7 compression::{CompressedData, CompressionStrategy, SchemaCompressor},
8 domain::{DomainError, DomainResult},
9 stream::{Priority, StreamFrame},
10};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14const MAX_RLE_COUNT: u64 = 100_000;
16const MAX_DELTA_ARRAY_SIZE: usize = 1_000_000;
17const MAX_DECOMPRESSED_SIZE: usize = 10_485_760; #[derive(Debug, Clone)]
21pub struct StreamingCompressor {
22 skeleton_compressor: SchemaCompressor,
24 content_compressor: SchemaCompressor,
26 stats: CompressionStats,
28}
29
30#[derive(Debug, Clone, Default)]
31pub struct CompressionStats {
32 pub total_input_bytes: usize,
34 pub total_output_bytes: usize,
36 pub frames_processed: u32,
38 pub priority_ratios: HashMap<u8, f32>,
40}
41
42#[derive(Debug, Clone)]
44pub struct CompressedFrame {
45 pub frame: StreamFrame,
47 pub compressed_data: CompressedData,
49 pub decompression_metadata: DecompressionMetadata,
51}
52
53#[derive(Debug, Clone)]
54pub struct DecompressionMetadata {
55 pub strategy: CompressionStrategy,
57 pub dictionary_map: HashMap<u16, String>,
59 pub delta_bases: HashMap<String, f64>,
61 pub priority_hints: HashMap<u8, String>,
63}
64
65impl StreamingCompressor {
66 pub fn new() -> Self {
68 Self {
69 skeleton_compressor: SchemaCompressor::new(),
70 content_compressor: SchemaCompressor::new(),
71 stats: CompressionStats::default(),
72 }
73 }
74
75 pub fn with_strategies(
77 skeleton_strategy: CompressionStrategy,
78 content_strategy: CompressionStrategy,
79 ) -> Self {
80 Self {
81 skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
82 content_compressor: SchemaCompressor::with_strategy(content_strategy),
83 stats: CompressionStats::default(),
84 }
85 }
86
87 pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
89 let compressor = self.select_compressor_for_priority(frame.priority);
90
91 let original_size = serde_json::to_string(&frame.data)
93 .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
94 .len();
95
96 let compressed_data = compressor.compress(&frame.data)?;
98
99 self.update_stats(
101 frame.priority,
102 original_size,
103 compressed_data.compressed_size,
104 );
105
106 let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;
108
109 Ok(CompressedFrame {
110 frame,
111 compressed_data,
112 decompression_metadata,
113 })
114 }
115
116 pub fn optimize_for_data(
118 &mut self,
119 skeleton: &JsonValue,
120 sample_data: &[JsonValue],
121 ) -> DomainResult<()> {
122 self.skeleton_compressor.analyze_and_optimize(skeleton)?;
124
125 if !sample_data.is_empty() {
127 let combined_sample = JsonValue::Array(sample_data.to_vec());
129 self.content_compressor
130 .analyze_and_optimize(&combined_sample)?;
131 }
132
133 Ok(())
134 }
135
136 pub fn get_stats(&self) -> &CompressionStats {
138 &self.stats
139 }
140
141 pub fn reset_stats(&mut self) {
143 self.stats = CompressionStats::default();
144 }
145
146 fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
148 match priority {
149 Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
151 _ => &mut self.content_compressor,
153 }
154 }
155
156 fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
158 self.stats.total_input_bytes += original_size;
159 self.stats.total_output_bytes += compressed_size;
160 self.stats.frames_processed += 1;
161
162 let ratio = if original_size > 0 {
163 compressed_size as f32 / original_size as f32
164 } else {
165 1.0
166 };
167
168 self.stats.priority_ratios.insert(priority.value(), ratio);
169 }
170
171 fn create_decompression_metadata(
173 &self,
174 compressed_data: &CompressedData,
175 ) -> DomainResult<DecompressionMetadata> {
176 let mut dictionary_map = HashMap::new();
177 let mut delta_bases = HashMap::new();
178
179 for (key, value) in &compressed_data.compression_metadata {
181 if key.starts_with("dict_") {
182 if let Ok(index) = key.strip_prefix("dict_").unwrap().parse::<u16>()
183 && let Some(string_val) = value.as_str()
184 {
185 dictionary_map.insert(index, string_val.to_string());
186 }
187 } else if key.starts_with("base_") {
188 let path = key.strip_prefix("base_").unwrap();
189 if let Some(num) = value.as_f64() {
190 delta_bases.insert(path.to_string(), num);
191 }
192 }
193 }
194
195 Ok(DecompressionMetadata {
196 strategy: compressed_data.strategy.clone(),
197 dictionary_map,
198 delta_bases,
199 priority_hints: HashMap::new(), })
201 }
202}
203
204impl CompressionStats {
205 pub fn overall_compression_ratio(&self) -> f32 {
207 if self.total_input_bytes == 0 {
208 return 1.0;
209 }
210 self.total_output_bytes as f32 / self.total_input_bytes as f32
211 }
212
213 pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
215 self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
216 }
217
218 pub fn bytes_saved(&self) -> isize {
220 self.total_input_bytes as isize - self.total_output_bytes as isize
221 }
222
223 pub fn percentage_saved(&self) -> f32 {
225 if self.total_input_bytes == 0 {
226 return 0.0;
227 }
228 let ratio = self.overall_compression_ratio();
229 (1.0 - ratio) * 100.0
230 }
231}
232
233#[derive(Debug, Clone)]
235pub struct StreamingDecompressor {
236 active_dictionary: HashMap<u16, String>,
238 delta_bases: HashMap<String, f64>,
240 stats: DecompressionStats,
242}
243
244#[derive(Debug, Clone, Default)]
245pub struct DecompressionStats {
246 pub frames_decompressed: u32,
248 pub total_decompressed_bytes: usize,
250 pub avg_decompression_time_us: u64,
252}
253
254impl StreamingDecompressor {
255 pub fn new() -> Self {
257 Self {
258 active_dictionary: HashMap::new(),
259 delta_bases: HashMap::new(),
260 stats: DecompressionStats::default(),
261 }
262 }
263
264 pub fn decompress_frame(
266 &mut self,
267 compressed_frame: CompressedFrame,
268 ) -> DomainResult<StreamFrame> {
269 let start_time = std::time::Instant::now();
270
271 self.update_context(&compressed_frame.decompression_metadata)?;
273
274 let decompressed_data = self.decompress_data(
276 &compressed_frame.compressed_data,
277 &compressed_frame.decompression_metadata.strategy,
278 )?;
279
280 let decompression_time = start_time.elapsed();
282 self.update_decompression_stats(&decompressed_data, decompression_time);
283
284 Ok(StreamFrame {
285 data: decompressed_data,
286 priority: compressed_frame.frame.priority,
287 metadata: compressed_frame.frame.metadata,
288 })
289 }
290
291 fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
293 for (&index, string) in &metadata.dictionary_map {
295 self.active_dictionary.insert(index, string.clone());
296 }
297
298 for (path, &base) in &metadata.delta_bases {
300 self.delta_bases.insert(path.clone(), base);
301 }
302
303 Ok(())
304 }
305
306 fn decompress_data(
308 &self,
309 compressed_data: &CompressedData,
310 strategy: &CompressionStrategy,
311 ) -> DomainResult<JsonValue> {
312 match strategy {
313 CompressionStrategy::None => Ok(compressed_data.data.clone()),
314
315 CompressionStrategy::Dictionary { .. } => {
316 self.decompress_dictionary(&compressed_data.data)
317 }
318
319 CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),
320
321 CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),
322
323 CompressionStrategy::Hybrid { .. } => {
324 let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
326 self.decompress_dictionary(&delta_decompressed)
327 }
328 }
329 }
330
331 fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
333 match data {
334 JsonValue::Object(obj) => {
335 let mut decompressed = serde_json::Map::new();
336 for (key, value) in obj {
337 decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
338 }
339 Ok(JsonValue::Object(decompressed))
340 }
341 JsonValue::Array(arr) => {
342 let decompressed: Result<Vec<_>, _> = arr
343 .iter()
344 .map(|item| self.decompress_dictionary(item))
345 .collect();
346 Ok(JsonValue::Array(decompressed?))
347 }
348 JsonValue::Number(n) => {
349 if let Some(index) = n.as_u64()
351 && let Some(string_val) = self.active_dictionary.get(&(index as u16))
352 {
353 return Ok(JsonValue::String(string_val.clone()));
354 }
355 Ok(data.clone())
356 }
357 _ => Ok(data.clone()),
358 }
359 }
360
361 pub fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
363 match data {
364 JsonValue::Object(obj) => {
365 let mut decompressed_obj = serde_json::Map::new();
366 for (key, value) in obj {
367 decompressed_obj.insert(key.clone(), self.decompress_delta(value)?);
368 }
369 Ok(JsonValue::Object(decompressed_obj))
370 }
371 JsonValue::Array(arr) => {
372 if arr.is_empty() {
373 return Ok(JsonValue::Array(arr.clone()));
374 }
375
376 if let Some(first) = arr.first()
378 && let Some(obj) = first.as_object()
379 && obj.contains_key("delta_base")
380 && obj.contains_key("delta_type")
381 {
382 return self.decompress_delta_array(arr);
384 }
385
386 let decompressed_arr: Result<Vec<_>, _> =
388 arr.iter().map(|item| self.decompress_delta(item)).collect();
389 Ok(JsonValue::Array(decompressed_arr?))
390 }
391 _ => Ok(data.clone()),
392 }
393 }
394
395 fn decompress_delta_array(&self, arr: &[JsonValue]) -> DomainResult<JsonValue> {
397 if arr.is_empty() {
398 return Ok(JsonValue::Array(Vec::new()));
399 }
400
401 if arr.len() > MAX_DELTA_ARRAY_SIZE {
403 return Err(DomainError::CompressionError(format!(
404 "Delta array size {} exceeds maximum {}",
405 arr.len(),
406 MAX_DELTA_ARRAY_SIZE
407 )));
408 }
409
410 let base_value = arr[0]
412 .get("delta_base")
413 .and_then(|v| v.as_f64())
414 .ok_or_else(|| {
415 DomainError::CompressionError(
416 "Missing or invalid delta_base in metadata".to_string(),
417 )
418 })?;
419
420 let mut original_values = Vec::new();
422 for delta_value in arr.iter().skip(1) {
423 let delta = delta_value.as_f64().ok_or_else(|| {
424 DomainError::CompressionError("Invalid delta value: expected number".to_string())
425 })?;
426
427 let original = base_value + delta;
428 original_values.push(JsonValue::from(original));
429 }
430
431 Ok(JsonValue::Array(original_values))
432 }
433
434 pub fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
436 match data {
437 JsonValue::Object(obj) => {
438 let mut decompressed_obj = serde_json::Map::new();
439 for (key, value) in obj {
440 decompressed_obj.insert(key.clone(), self.decompress_run_length(value)?);
441 }
442 Ok(JsonValue::Object(decompressed_obj))
443 }
444 JsonValue::Array(arr) => {
445 let mut decompressed_values = Vec::new();
446 let mut total_size = 0usize;
447
448 for item in arr {
449 if let Some(obj) = item.as_object() {
450 let has_rle_value = obj.contains_key("rle_value");
452 let has_rle_count = obj.contains_key("rle_count");
453
454 if has_rle_value && !has_rle_count {
455 return Err(DomainError::CompressionError(
456 "Malformed RLE object: rle_value without rle_count".to_string(),
457 ));
458 }
459 if has_rle_count && !has_rle_value {
460 return Err(DomainError::CompressionError(
461 "Malformed RLE object: rle_count without rle_value".to_string(),
462 ));
463 }
464
465 if has_rle_value && has_rle_count {
467 let value = obj
468 .get("rle_value")
469 .ok_or_else(|| {
470 DomainError::CompressionError("Missing rle_value".to_string())
471 })?
472 .clone();
473
474 let count =
475 obj.get("rle_count")
476 .and_then(|v| v.as_u64())
477 .ok_or_else(|| {
478 DomainError::CompressionError(
479 "Invalid rle_count: expected positive integer"
480 .to_string(),
481 )
482 })?;
483
484 if count > MAX_RLE_COUNT {
486 return Err(DomainError::CompressionError(format!(
487 "RLE count {} exceeds maximum {}",
488 count, MAX_RLE_COUNT
489 )));
490 }
491
492 let count_usize = usize::try_from(count).map_err(|_| {
494 DomainError::CompressionError(format!(
495 "RLE count {} exceeds platform maximum",
496 count
497 ))
498 })?;
499
500 total_size = total_size.checked_add(count_usize).ok_or_else(|| {
502 DomainError::CompressionError(
503 "Total decompressed size overflow".to_string(),
504 )
505 })?;
506
507 if total_size > MAX_DECOMPRESSED_SIZE {
508 return Err(DomainError::CompressionError(format!(
509 "Decompressed size {} exceeds maximum {}",
510 total_size, MAX_DECOMPRESSED_SIZE
511 )));
512 }
513
514 for _ in 0..count {
516 decompressed_values.push(value.clone());
517 }
518 } else {
519 decompressed_values.push(self.decompress_run_length(item)?);
521 }
522 } else {
523 decompressed_values.push(self.decompress_run_length(item)?);
525 }
526 }
527
528 Ok(JsonValue::Array(decompressed_values))
529 }
530 _ => Ok(data.clone()),
531 }
532 }
533
534 fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
536 self.stats.frames_decompressed += 1;
537
538 if let Ok(serialized) = serde_json::to_string(data) {
539 self.stats.total_decompressed_bytes += serialized.len();
540 }
541
542 let new_time_us = duration.as_micros() as u64;
543 if self.stats.frames_decompressed == 1 {
544 self.stats.avg_decompression_time_us = new_time_us;
545 } else {
546 let total_frames = self.stats.frames_decompressed as u64;
548 let total_time =
549 self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
550 self.stats.avg_decompression_time_us = total_time / total_frames;
551 }
552 }
553
554 pub fn get_stats(&self) -> &DecompressionStats {
556 &self.stats
557 }
558}
559
560impl Default for StreamingCompressor {
561 fn default() -> Self {
562 Self::new()
563 }
564}
565
566impl Default for StreamingDecompressor {
567 fn default() -> Self {
568 Self::new()
569 }
570}
571
572#[cfg(test)]
573mod tests {
574 use super::*;
575 use serde_json::json;
576
577 #[test]
578 fn test_streaming_compressor_basic() {
579 let mut compressor = StreamingCompressor::new();
580
581 let frame = StreamFrame {
582 data: json!({
583 "message": "test message",
584 "count": 42
585 }),
586 priority: Priority::MEDIUM,
587 metadata: HashMap::new(),
588 };
589
590 let result = compressor.compress_frame(frame);
591 assert!(result.is_ok());
592
593 let compressed = result.unwrap();
594 assert_eq!(compressed.frame.priority, Priority::MEDIUM);
595 }
596
597 #[test]
598 fn test_compression_stats() {
599 let stats = CompressionStats {
600 total_input_bytes: 1000,
601 total_output_bytes: 600,
602 ..Default::default()
603 };
604
605 assert_eq!(stats.overall_compression_ratio(), 0.6);
606 assert_eq!(stats.bytes_saved(), 400);
607 let percentage = stats.percentage_saved();
609 assert!((percentage - 40.0).abs() < 0.001);
610 }
611
612 #[test]
613 fn test_streaming_decompressor_basic() {
614 let mut decompressor = StreamingDecompressor::new();
615
616 let compressed_frame = CompressedFrame {
617 frame: StreamFrame {
618 data: json!({"test": "data"}),
619 priority: Priority::MEDIUM,
620 metadata: HashMap::new(),
621 },
622 compressed_data: CompressedData {
623 strategy: CompressionStrategy::None,
624 compressed_size: 20,
625 data: json!({"test": "data"}),
626 compression_metadata: HashMap::new(),
627 },
628 decompression_metadata: DecompressionMetadata {
629 strategy: CompressionStrategy::None,
630 dictionary_map: HashMap::new(),
631 delta_bases: HashMap::new(),
632 priority_hints: HashMap::new(),
633 },
634 };
635
636 let result = decompressor.decompress_frame(compressed_frame);
637 assert!(result.is_ok());
638
639 let decompressed = result.unwrap();
640 assert_eq!(decompressed.data, json!({"test": "data"}));
641 }
642
643 #[test]
644 fn test_dictionary_decompression() {
645 let mut decompressor = StreamingDecompressor::new();
646 decompressor
647 .active_dictionary
648 .insert(0, "hello".to_string());
649 decompressor
650 .active_dictionary
651 .insert(1, "world".to_string());
652
653 let compressed = json!({
655 "greeting": 0,
656 "target": 1
657 });
658
659 let result = decompressor.decompress_dictionary(&compressed).unwrap();
660 assert_eq!(
661 result,
662 json!({
663 "greeting": "hello",
664 "target": "world"
665 })
666 );
667 }
668
669 #[test]
670 fn test_priority_based_compression() {
671 let mut compressor = StreamingCompressor::new();
672
673 let critical_frame = StreamFrame {
674 data: json!({"error": "critical failure"}),
675 priority: Priority::CRITICAL,
676 metadata: HashMap::new(),
677 };
678
679 let low_frame = StreamFrame {
680 data: json!({"debug": "verbose information"}),
681 priority: Priority::LOW,
682 metadata: HashMap::new(),
683 };
684
685 let _critical_result = compressor.compress_frame(critical_frame).unwrap();
686 let _low_result = compressor.compress_frame(low_frame).unwrap();
687
688 let stats = compressor.get_stats();
689 assert_eq!(stats.frames_processed, 2);
690 assert!(stats.total_input_bytes > 0);
691 }
692
693 #[test]
694 fn test_delta_decompression_basic() {
695 let decompressor = StreamingDecompressor::new();
696
697 let compressed_data = json!([
698 {"delta_base": 100.0, "delta_type": "numeric_sequence"},
699 0.0,
700 1.0,
701 2.0,
702 3.0,
703 4.0
704 ]);
705
706 let result = decompressor.decompress_delta(&compressed_data).unwrap();
707 assert_eq!(result, json!([100.0, 101.0, 102.0, 103.0, 104.0]));
708 }
709
710 #[test]
711 fn test_delta_decompression_negative_deltas() {
712 let decompressor = StreamingDecompressor::new();
713
714 let compressed_data = json!([
715 {"delta_base": 50.0, "delta_type": "numeric_sequence"},
716 -10.0,
717 0.0,
718 10.0,
719 20.0
720 ]);
721
722 let result = decompressor.decompress_delta(&compressed_data).unwrap();
723 assert_eq!(result, json!([40.0, 50.0, 60.0, 70.0]));
724 }
725
726 #[test]
727 fn test_delta_decompression_fractional_deltas() {
728 let decompressor = StreamingDecompressor::new();
729
730 let compressed_data = json!([
731 {"delta_base": 10.0, "delta_type": "numeric_sequence"},
732 0.5,
733 1.0,
734 1.5,
735 2.0
736 ]);
737
738 let result = decompressor.decompress_delta(&compressed_data).unwrap();
739 assert_eq!(result, json!([10.5, 11.0, 11.5, 12.0]));
740 }
741
742 #[test]
743 fn test_delta_decompression_empty_array() {
744 let decompressor = StreamingDecompressor::new();
745
746 let compressed_data = json!([]);
747
748 let result = decompressor.decompress_delta(&compressed_data).unwrap();
749 assert_eq!(result, json!([]));
750 }
751
752 #[test]
753 fn test_delta_decompression_single_element() {
754 let decompressor = StreamingDecompressor::new();
755
756 let compressed_data = json!([
757 {"delta_base": 100.0, "delta_type": "numeric_sequence"}
758 ]);
759
760 let result = decompressor.decompress_delta(&compressed_data).unwrap();
761 assert_eq!(result, json!([]));
762 }
763
764 #[test]
765 fn test_delta_decompression_nested_structure() {
766 let decompressor = StreamingDecompressor::new();
767
768 let compressed_data = json!({
769 "sequence": [
770 {"delta_base": 100.0, "delta_type": "numeric_sequence"},
771 0.0,
772 1.0,
773 2.0
774 ],
775 "other": "data"
776 });
777
778 let result = decompressor.decompress_delta(&compressed_data).unwrap();
779 assert_eq!(
780 result,
781 json!({
782 "sequence": [100.0, 101.0, 102.0],
783 "other": "data"
784 })
785 );
786 }
787
788 #[test]
789 fn test_delta_decompression_invalid_metadata() {
790 let decompressor = StreamingDecompressor::new();
791
792 let compressed_data = json!([
793 {"wrong_key": 100.0},
794 0.0,
795 1.0
796 ]);
797
798 let result = decompressor.decompress_delta(&compressed_data);
799 assert!(result.is_ok());
800 }
802
803 #[test]
804 fn test_delta_decompression_invalid_delta_value() {
805 let decompressor = StreamingDecompressor::new();
806
807 let compressed_data = json!([
808 {"delta_base": 100.0, "delta_type": "numeric_sequence"},
809 "not_a_number"
810 ]);
811
812 let result = decompressor.decompress_delta(&compressed_data);
813 assert!(result.is_err());
814 }
815
816 #[test]
817 fn test_rle_decompression_basic() {
818 let decompressor = StreamingDecompressor::new();
819
820 let compressed_data = json!([
821 {"rle_value": 1, "rle_count": 3},
822 {"rle_value": 2, "rle_count": 2},
823 {"rle_value": 3, "rle_count": 4}
824 ]);
825
826 let result = decompressor
827 .decompress_run_length(&compressed_data)
828 .unwrap();
829 assert_eq!(result, json!([1, 1, 1, 2, 2, 3, 3, 3, 3]));
830 }
831
832 #[test]
833 fn test_rle_decompression_mixed_runs() {
834 let decompressor = StreamingDecompressor::new();
835
836 let compressed_data = json!([
837 {"rle_value": "a", "rle_count": 2},
838 "b",
839 {"rle_value": "c", "rle_count": 3}
840 ]);
841
842 let result = decompressor
843 .decompress_run_length(&compressed_data)
844 .unwrap();
845 assert_eq!(result, json!(["a", "a", "b", "c", "c", "c"]));
846 }
847
848 #[test]
849 fn test_rle_decompression_single_count() {
850 let decompressor = StreamingDecompressor::new();
851
852 let compressed_data = json!([
853 {"rle_value": "x", "rle_count": 1}
854 ]);
855
856 let result = decompressor
857 .decompress_run_length(&compressed_data)
858 .unwrap();
859 assert_eq!(result, json!(["x"]));
860 }
861
862 #[test]
863 fn test_rle_decompression_zero_count() {
864 let decompressor = StreamingDecompressor::new();
865
866 let compressed_data = json!([
867 {"rle_value": "x", "rle_count": 0}
868 ]);
869
870 let result = decompressor
871 .decompress_run_length(&compressed_data)
872 .unwrap();
873 assert_eq!(result, json!([]));
874 }
875
876 #[test]
877 fn test_rle_decompression_nested_values() {
878 let decompressor = StreamingDecompressor::new();
879
880 let compressed_data = json!([
881 {"rle_value": {"name": "test"}, "rle_count": 3}
882 ]);
883
884 let result = decompressor
885 .decompress_run_length(&compressed_data)
886 .unwrap();
887 assert_eq!(
888 result,
889 json!([{"name": "test"}, {"name": "test"}, {"name": "test"}])
890 );
891 }
892
893 #[test]
894 fn test_rle_decompression_nested_structure() {
895 let decompressor = StreamingDecompressor::new();
896
897 let compressed_data = json!({
898 "data": [
899 {"rle_value": 1, "rle_count": 3},
900 {"rle_value": 2, "rle_count": 2}
901 ],
902 "other": "field"
903 });
904
905 let result = decompressor
906 .decompress_run_length(&compressed_data)
907 .unwrap();
908 assert_eq!(
909 result,
910 json!({
911 "data": [1, 1, 1, 2, 2],
912 "other": "field"
913 })
914 );
915 }
916
917 #[test]
918 fn test_rle_decompression_empty_array() {
919 let decompressor = StreamingDecompressor::new();
920
921 let compressed_data = json!([]);
922
923 let result = decompressor
924 .decompress_run_length(&compressed_data)
925 .unwrap();
926 assert_eq!(result, json!([]));
927 }
928
929 #[test]
930 fn test_rle_decompression_invalid_count() {
931 let decompressor = StreamingDecompressor::new();
932
933 let compressed_data = json!([
934 {"rle_value": "x", "rle_count": "not_a_number"}
935 ]);
936
937 let result = decompressor.decompress_run_length(&compressed_data);
938 assert!(result.is_err());
939 }
940
941 #[test]
942 fn test_rle_decompression_missing_value() {
943 let decompressor = StreamingDecompressor::new();
944
945 let compressed_data = json!([
946 {"rle_count": 3}
947 ]);
948
949 let result = decompressor.decompress_run_length(&compressed_data);
950 assert!(result.is_err());
951 }
952
953 #[test]
954 fn test_rle_decompression_missing_count() {
955 let decompressor = StreamingDecompressor::new();
956
957 let compressed_data = json!([
958 {"rle_value": "x"}
959 ]);
960
961 let result = decompressor.decompress_run_length(&compressed_data);
962 assert!(result.is_err());
963 }
964
965 #[test]
966 fn test_rle_decompression_non_rle_objects() {
967 let decompressor = StreamingDecompressor::new();
968
969 let compressed_data = json!([
970 {"regular": "object"},
971 {"another": "one"}
972 ]);
973
974 let result = decompressor
975 .decompress_run_length(&compressed_data)
976 .unwrap();
977 assert_eq!(
979 result,
980 json!([
981 {"regular": "object"},
982 {"another": "one"}
983 ])
984 );
985 }
986
987 #[test]
990 fn test_compress_frame_with_custom_strategies() {
991 let mut dict = HashMap::new();
992 dict.insert("test".to_string(), 0);
993
994 let mut bases = HashMap::new();
995 bases.insert("value".to_string(), 100.0);
996
997 let mut compressor = StreamingCompressor::with_strategies(
998 CompressionStrategy::Dictionary { dictionary: dict },
999 CompressionStrategy::Delta { base_values: bases },
1000 );
1001
1002 let frame = StreamFrame {
1003 data: json!({"value": 123, "other": 456}),
1004 priority: Priority::HIGH,
1005 metadata: HashMap::new(),
1006 };
1007
1008 let result = compressor.compress_frame(frame);
1009 assert!(result.is_ok());
1010 assert_eq!(compressor.stats.frames_processed, 1);
1011 }
1012
1013 #[test]
1014 fn test_optimize_for_data_with_samples() {
1015 let mut compressor = StreamingCompressor::new();
1016
1017 let skeleton = json!({
1018 "id": null,
1019 "name": null
1020 });
1021
1022 let samples = vec![
1023 json!({"id": 1, "name": "test1"}),
1024 json!({"id": 2, "name": "test2"}),
1025 json!({"id": 3, "name": "test3"}),
1026 ];
1027
1028 let result = compressor.optimize_for_data(&skeleton, &samples);
1029 assert!(result.is_ok());
1030 }
1031
1032 #[test]
1033 fn test_optimize_for_data_empty_samples() {
1034 let mut compressor = StreamingCompressor::new();
1035
1036 let skeleton = json!({"key": "value"});
1037 let result = compressor.optimize_for_data(&skeleton, &[]);
1038 assert!(result.is_ok());
1039 }
1040
1041 #[test]
1042 fn test_reset_stats() {
1043 let mut compressor = StreamingCompressor::new();
1044
1045 compressor.stats.total_input_bytes = 1000;
1046 compressor.stats.total_output_bytes = 500;
1047 compressor.stats.frames_processed = 10;
1048
1049 compressor.reset_stats();
1050
1051 assert_eq!(compressor.stats.total_input_bytes, 0);
1052 assert_eq!(compressor.stats.total_output_bytes, 0);
1053 assert_eq!(compressor.stats.frames_processed, 0);
1054 }
1055
1056 #[test]
1057 fn test_compressor_critical_vs_low_priority() {
1058 let mut compressor = StreamingCompressor::new();
1059
1060 let critical_frame = StreamFrame {
1061 data: json!({"critical": "data"}),
1062 priority: Priority::CRITICAL,
1063 metadata: HashMap::new(),
1064 };
1065
1066 let low_frame = StreamFrame {
1067 data: json!({"low": "data"}),
1068 priority: Priority::LOW,
1069 metadata: HashMap::new(),
1070 };
1071
1072 compressor.compress_frame(critical_frame).unwrap();
1073 compressor.compress_frame(low_frame).unwrap();
1074
1075 assert_eq!(compressor.stats.frames_processed, 2);
1076 }
1077
1078 #[test]
1079 fn test_decompressor_hybrid_strategy() {
1080 let mut decompressor = StreamingDecompressor::new();
1081
1082 decompressor.delta_bases.insert("value".to_string(), 100.0);
1084 decompressor.active_dictionary.insert(0, "test".to_string());
1085
1086 let mut string_dict = HashMap::new();
1087 string_dict.insert("test".to_string(), 0);
1088
1089 let mut numeric_deltas = HashMap::new();
1090 numeric_deltas.insert("value".to_string(), 100.0);
1091
1092 let compressed_frame = CompressedFrame {
1093 frame: StreamFrame {
1094 data: json!({"test": "data"}),
1095 priority: Priority::MEDIUM,
1096 metadata: HashMap::new(),
1097 },
1098 compressed_data: CompressedData {
1099 strategy: CompressionStrategy::Hybrid {
1100 string_dict: string_dict.clone(),
1101 numeric_deltas: numeric_deltas.clone(),
1102 },
1103 compressed_size: 20,
1104 data: json!({"value": 5.0}), compression_metadata: HashMap::new(),
1106 },
1107 decompression_metadata: DecompressionMetadata {
1108 strategy: CompressionStrategy::Hybrid {
1109 string_dict,
1110 numeric_deltas,
1111 },
1112 dictionary_map: HashMap::new(),
1113 delta_bases: HashMap::new(),
1114 priority_hints: HashMap::new(),
1115 },
1116 };
1117
1118 let result = decompressor.decompress_frame(compressed_frame);
1119 assert!(result.is_ok());
1120 }
1121
1122 #[test]
1123 fn test_decompress_dictionary_nested_arrays() {
1124 let mut decompressor = StreamingDecompressor::new();
1125 decompressor
1126 .active_dictionary
1127 .insert(0, "item1".to_string());
1128 decompressor
1129 .active_dictionary
1130 .insert(1, "item2".to_string());
1131
1132 let data = json!([[0, 1], [1, 0]]);
1133 let result = decompressor.decompress_dictionary(&data).unwrap();
1134
1135 assert_eq!(result, json!([["item1", "item2"], ["item2", "item1"]]));
1136 }
1137
1138 #[test]
1139 fn test_decompress_dictionary_non_index_numbers() {
1140 let mut decompressor = StreamingDecompressor::new();
1141 decompressor.active_dictionary.insert(0, "test".to_string());
1142
1143 let data = json!({"value": 999});
1145 let result = decompressor.decompress_dictionary(&data).unwrap();
1146
1147 assert_eq!(result, json!({"value": 999}));
1149 }
1150
1151 #[test]
1152 fn test_decompress_delta_non_array() {
1153 let decompressor = StreamingDecompressor::new();
1154
1155 let data = json!({"key": "value"});
1157 let result = decompressor.decompress_delta(&data).unwrap();
1158
1159 assert_eq!(result, json!({"key": "value"}));
1160 }
1161
1162 #[test]
1163 fn test_decompress_delta_array_without_metadata() {
1164 let decompressor = StreamingDecompressor::new();
1165
1166 let data = json!([1, 2, 3, 4]);
1168 let result = decompressor.decompress_delta(&data).unwrap();
1169
1170 assert_eq!(result, json!([1, 2, 3, 4]));
1171 }
1172
1173 #[test]
1174 fn test_decompress_run_length_nested_objects() {
1175 let decompressor = StreamingDecompressor::new();
1176
1177 let data = json!({
1178 "outer": {
1179 "inner": [
1180 {"rle_value": {"nested": "obj"}, "rle_count": 2}
1181 ]
1182 }
1183 });
1184
1185 let result = decompressor.decompress_run_length(&data).unwrap();
1186 assert_eq!(
1187 result,
1188 json!({
1189 "outer": {
1190 "inner": [{"nested": "obj"}, {"nested": "obj"}]
1191 }
1192 })
1193 );
1194 }
1195
1196 #[test]
1197 fn test_decompression_stats_tracking() {
1198 let mut decompressor = StreamingDecompressor::new();
1199
1200 assert_eq!(decompressor.stats.frames_decompressed, 0);
1201
1202 let frame = CompressedFrame {
1203 frame: StreamFrame {
1204 data: json!({"test": "data"}),
1205 priority: Priority::MEDIUM,
1206 metadata: HashMap::new(),
1207 },
1208 compressed_data: CompressedData {
1209 strategy: CompressionStrategy::None,
1210 compressed_size: 15,
1211 data: json!({"test": "data"}),
1212 compression_metadata: HashMap::new(),
1213 },
1214 decompression_metadata: DecompressionMetadata {
1215 strategy: CompressionStrategy::None,
1216 dictionary_map: HashMap::new(),
1217 delta_bases: HashMap::new(),
1218 priority_hints: HashMap::new(),
1219 },
1220 };
1221
1222 decompressor.decompress_frame(frame).unwrap();
1223
1224 assert_eq!(decompressor.stats.frames_decompressed, 1);
1225 assert!(decompressor.stats.total_decompressed_bytes > 0);
1226 assert!(decompressor.stats.avg_decompression_time_us > 0);
1227 }
1228
1229 #[test]
1230 fn test_decompress_delta_array_malformed_metadata() {
1231 let decompressor = StreamingDecompressor::new();
1232
1233 let data = json!([
1235 {"delta_type": "numeric_sequence"},
1236 1.0,
1237 2.0
1238 ]);
1239
1240 let result = decompressor.decompress_delta(&data);
1241 assert!(result.is_ok());
1243 assert_eq!(result.unwrap(), data);
1245 }
1246
1247 #[test]
1248 fn test_decompress_run_length_large_count() {
1249 let decompressor = StreamingDecompressor::new();
1250
1251 let data = json!([
1253 {"rle_value": "x", "rle_count": 1000}
1254 ]);
1255
1256 let result = decompressor.decompress_run_length(&data);
1257 assert!(result.is_ok());
1258 let decompressed = result.unwrap();
1259 if let Some(arr) = decompressed.as_array() {
1260 assert_eq!(arr.len(), 1000);
1261 }
1262 }
1263
1264 #[test]
1265 fn test_decompress_run_length_exceeds_max_count() {
1266 let decompressor = StreamingDecompressor::new();
1267
1268 let data = json!([
1270 {"rle_value": "x", "rle_count": 200_000}
1271 ]);
1272
1273 let result = decompressor.decompress_run_length(&data);
1274 assert!(result.is_err()); }
1276
1277 #[test]
1278 fn test_decompress_run_length_cumulative_overflow() {
1279 let decompressor = StreamingDecompressor::new();
1280
1281 let data = json!([
1283 {"rle_value": "a", "rle_count": 5_000_000},
1284 {"rle_value": "b", "rle_count": 6_000_000}
1285 ]);
1286
1287 let result = decompressor.decompress_run_length(&data);
1288 assert!(result.is_err());
1290 }
1291
1292 #[test]
1293 fn test_decompress_delta_array_size_limit() {
1294 let decompressor = StreamingDecompressor::new();
1295
1296 let mut large_array = vec![json!({"delta_base": 0.0, "delta_type": "numeric_sequence"})];
1298 for _i in 0..1_000_001 {
1299 large_array.push(json!(0.0));
1300 }
1301
1302 let result = decompressor.decompress_delta(&JsonValue::Array(large_array));
1303 assert!(result.is_err()); }
1305
1306 #[test]
1307 fn test_compression_stats_default() {
1308 let stats = CompressionStats::default();
1309 assert_eq!(stats.total_input_bytes, 0);
1310 assert_eq!(stats.total_output_bytes, 0);
1311 assert_eq!(stats.frames_processed, 0);
1312 assert_eq!(stats.overall_compression_ratio(), 1.0);
1313 }
1314
1315 #[test]
1316 fn test_decompression_stats_default() {
1317 let stats = DecompressionStats::default();
1318 assert_eq!(stats.frames_decompressed, 0);
1319 assert_eq!(stats.total_decompressed_bytes, 0);
1320 assert_eq!(stats.avg_decompression_time_us, 0);
1321 }
1322
1323 #[test]
1328 fn test_decompress_dictionary_with_strings() {
1329 let decompressor = StreamingDecompressor::new();
1330 let data = json!({"key": "value", "nested": {"inner": "string"}});
1331 let result = decompressor.decompress_dictionary(&data);
1332 assert!(result.is_ok());
1333 assert_eq!(result.unwrap(), data);
1334 }
1335
1336 #[test]
1337 fn test_decompress_dictionary_with_null() {
1338 let decompressor = StreamingDecompressor::new();
1339 let data = json!(null);
1340 let result = decompressor.decompress_dictionary(&data);
1341 assert!(result.is_ok());
1342 assert_eq!(result.unwrap(), json!(null));
1343 }
1344
1345 #[test]
1346 fn test_decompress_dictionary_with_boolean() {
1347 let decompressor = StreamingDecompressor::new();
1348 let data = json!(true);
1349 let result = decompressor.decompress_dictionary(&data);
1350 assert!(result.is_ok());
1351 assert_eq!(result.unwrap(), json!(true));
1352 }
1353
1354 #[test]
1355 fn test_decompress_dictionary_with_string() {
1356 let decompressor = StreamingDecompressor::new();
1357 let data = json!("plain string");
1358 let result = decompressor.decompress_dictionary(&data);
1359 assert!(result.is_ok());
1360 assert_eq!(result.unwrap(), json!("plain string"));
1361 }
1362
1363 #[test]
1364 fn test_decompress_delta_with_object_no_array() {
1365 let decompressor = StreamingDecompressor::new();
1366 let data = json!({"key": "value"});
1367 let result = decompressor.decompress_delta(&data);
1368 assert!(result.is_ok());
1369 assert_eq!(result.unwrap(), json!({"key": "value"}));
1370 }
1371
1372 #[test]
1373 fn test_decompress_delta_with_primitive_values() {
1374 let decompressor = StreamingDecompressor::new();
1375
1376 assert_eq!(
1377 decompressor.decompress_delta(&json!("string")).unwrap(),
1378 json!("string")
1379 );
1380 assert_eq!(
1381 decompressor.decompress_delta(&json!(42)).unwrap(),
1382 json!(42)
1383 );
1384 assert_eq!(
1385 decompressor.decompress_delta(&json!(true)).unwrap(),
1386 json!(true)
1387 );
1388 assert_eq!(
1389 decompressor.decompress_delta(&json!(null)).unwrap(),
1390 json!(null)
1391 );
1392 }
1393
1394 #[test]
1395 fn test_decompress_run_length_with_primitive_values() {
1396 let decompressor = StreamingDecompressor::new();
1397
1398 assert_eq!(
1399 decompressor
1400 .decompress_run_length(&json!("string"))
1401 .unwrap(),
1402 json!("string")
1403 );
1404 assert_eq!(
1405 decompressor.decompress_run_length(&json!(123)).unwrap(),
1406 json!(123)
1407 );
1408 assert_eq!(
1409 decompressor.decompress_run_length(&json!(false)).unwrap(),
1410 json!(false)
1411 );
1412 assert_eq!(
1413 decompressor.decompress_run_length(&json!(null)).unwrap(),
1414 json!(null)
1415 );
1416 }
1417
1418 #[test]
1419 fn test_decompress_data_strategy_dictionary() {
1420 let mut decompressor = StreamingDecompressor::new();
1421 decompressor.active_dictionary.insert(0, "test".to_string());
1422
1423 let mut dict = HashMap::new();
1424 dict.insert("test".to_string(), 0);
1425
1426 let compressed_data = CompressedData {
1427 strategy: CompressionStrategy::Dictionary { dictionary: dict },
1428 compressed_size: 10,
1429 data: json!({"field": 0}),
1430 compression_metadata: HashMap::new(),
1431 };
1432
1433 let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1434 assert!(result.is_ok());
1435 }
1436
1437 #[test]
1438 fn test_decompress_data_strategy_delta() {
1439 let decompressor = StreamingDecompressor::new();
1440
1441 let mut bases = HashMap::new();
1442 bases.insert("value".to_string(), 100.0);
1443
1444 let compressed_data = CompressedData {
1445 strategy: CompressionStrategy::Delta {
1446 base_values: bases.clone(),
1447 },
1448 compressed_size: 10,
1449 data: json!({
1450 "sequence": [
1451 {"delta_base": 100.0, "delta_type": "numeric_sequence"},
1452 5.0,
1453 10.0
1454 ]
1455 }),
1456 compression_metadata: HashMap::new(),
1457 };
1458
1459 let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1460 assert!(result.is_ok());
1461 }
1462
1463 #[test]
1464 fn test_decompress_data_strategy_run_length() {
1465 let decompressor = StreamingDecompressor::new();
1466
1467 let compressed_data = CompressedData {
1468 strategy: CompressionStrategy::RunLength,
1469 compressed_size: 10,
1470 data: json!([
1471 {"rle_value": "x", "rle_count": 3}
1472 ]),
1473 compression_metadata: HashMap::new(),
1474 };
1475
1476 let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1477 assert!(result.is_ok());
1478 assert_eq!(result.unwrap(), json!(["x", "x", "x"]));
1479 }
1480
1481 #[test]
1482 fn test_decompress_data_strategy_hybrid_applies_delta_then_dict() {
1483 let mut decompressor = StreamingDecompressor::new();
1484 decompressor.active_dictionary.insert(0, "test".to_string());
1485
1486 let mut string_dict = HashMap::new();
1487 string_dict.insert("test".to_string(), 0);
1488
1489 let mut numeric_deltas = HashMap::new();
1490 numeric_deltas.insert("value".to_string(), 100.0);
1491
1492 let compressed_data = CompressedData {
1493 strategy: CompressionStrategy::Hybrid {
1494 string_dict,
1495 numeric_deltas,
1496 },
1497 compressed_size: 10,
1498 data: json!({
1499 "field": 0
1500 }),
1501 compression_metadata: HashMap::new(),
1502 };
1503
1504 let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
1505 assert!(result.is_ok());
1506 }
1507
1508 #[test]
1509 fn test_select_compressor_critical_priority() {
1510 let mut compressor = StreamingCompressor::new();
1511 let _skeleton_comp = compressor.select_compressor_for_priority(Priority::CRITICAL);
1512 }
1513
1514 #[test]
1515 fn test_select_compressor_high_priority() {
1516 let mut compressor = StreamingCompressor::new();
1517 let _skeleton_comp = compressor.select_compressor_for_priority(Priority::HIGH);
1518 }
1519
1520 #[test]
1521 fn test_select_compressor_medium_priority() {
1522 let mut compressor = StreamingCompressor::new();
1523 let _content_comp = compressor.select_compressor_for_priority(Priority::MEDIUM);
1524 }
1525
1526 #[test]
1527 fn test_select_compressor_low_priority() {
1528 let mut compressor = StreamingCompressor::new();
1529 let _content_comp = compressor.select_compressor_for_priority(Priority::LOW);
1530 }
1531
1532 #[test]
1533 fn test_select_compressor_background_priority() {
1534 let mut compressor = StreamingCompressor::new();
1535 let _content_comp = compressor.select_compressor_for_priority(Priority::BACKGROUND);
1536 }
1537
1538 #[test]
1539 fn test_update_stats_with_zero_original_size() {
1540 let mut compressor = StreamingCompressor::new();
1541 compressor.update_stats(Priority::MEDIUM, 0, 10);
1542
1543 let stats = compressor.get_stats();
1544 assert_eq!(stats.frames_processed, 1);
1545 assert_eq!(stats.total_input_bytes, 0);
1546 assert_eq!(stats.total_output_bytes, 10);
1547 assert_eq!(
1548 stats.priority_compression_ratio(Priority::MEDIUM.value()),
1549 1.0
1550 );
1551 }
1552
1553 #[test]
1554 fn test_update_stats_with_normal_compression() {
1555 let mut compressor = StreamingCompressor::new();
1556 compressor.update_stats(Priority::HIGH, 1000, 500);
1557
1558 let stats = compressor.get_stats();
1559 assert_eq!(stats.frames_processed, 1);
1560 assert_eq!(stats.total_input_bytes, 1000);
1561 assert_eq!(stats.total_output_bytes, 500);
1562 assert_eq!(
1563 stats.priority_compression_ratio(Priority::HIGH.value()),
1564 0.5
1565 );
1566 }
1567
1568 #[test]
1569 fn test_update_decompression_stats_first_frame() {
1570 let mut decompressor = StreamingDecompressor::new();
1571 let data = json!({"test": "data"});
1572 let duration = std::time::Duration::from_micros(100);
1573
1574 decompressor.update_decompression_stats(&data, duration);
1575
1576 let stats = decompressor.get_stats();
1577 assert_eq!(stats.frames_decompressed, 1);
1578 assert_eq!(stats.avg_decompression_time_us, 100);
1579 }
1580
1581 #[test]
1582 fn test_update_decompression_stats_multiple_frames() {
1583 let mut decompressor = StreamingDecompressor::new();
1584 let data = json!({"test": "data"});
1585
1586 decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(100));
1587 decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(200));
1588 decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(300));
1589
1590 let stats = decompressor.get_stats();
1591 assert_eq!(stats.frames_decompressed, 3);
1592 assert_eq!(stats.avg_decompression_time_us, 200); }
1594
1595 #[test]
1596 fn test_create_decompression_metadata_with_dict() {
1597 let compressor = StreamingCompressor::new();
1598 let mut metadata = HashMap::new();
1599 metadata.insert("dict_0".to_string(), json!("hello"));
1600 metadata.insert("dict_1".to_string(), json!("world"));
1601
1602 let compressed_data = CompressedData {
1603 strategy: CompressionStrategy::None,
1604 compressed_size: 10,
1605 data: json!({}),
1606 compression_metadata: metadata,
1607 };
1608
1609 let result = compressor.create_decompression_metadata(&compressed_data);
1610 assert!(result.is_ok());
1611 let meta = result.unwrap();
1612 assert_eq!(meta.dictionary_map.len(), 2);
1613 assert_eq!(meta.dictionary_map.get(&0), Some(&"hello".to_string()));
1614 assert_eq!(meta.dictionary_map.get(&1), Some(&"world".to_string()));
1615 }
1616
1617 #[test]
1618 fn test_create_decompression_metadata_with_delta_bases() {
1619 let compressor = StreamingCompressor::new();
1620 let mut metadata = HashMap::new();
1621 metadata.insert("base_value1".to_string(), json!(100.0));
1622 metadata.insert("base_value2".to_string(), json!(200.0));
1623
1624 let compressed_data = CompressedData {
1625 strategy: CompressionStrategy::None,
1626 compressed_size: 10,
1627 data: json!({}),
1628 compression_metadata: metadata,
1629 };
1630
1631 let result = compressor.create_decompression_metadata(&compressed_data);
1632 assert!(result.is_ok());
1633 let meta = result.unwrap();
1634 assert_eq!(meta.delta_bases.len(), 2);
1635 assert_eq!(meta.delta_bases.get("value1"), Some(&100.0));
1636 assert_eq!(meta.delta_bases.get("value2"), Some(&200.0));
1637 }
1638
1639 #[test]
1640 fn test_create_decompression_metadata_with_invalid_dict_index() {
1641 let compressor = StreamingCompressor::new();
1642 let mut metadata = HashMap::new();
1643 metadata.insert("dict_invalid".to_string(), json!("value"));
1644 metadata.insert("dict_0".to_string(), json!("valid"));
1645
1646 let compressed_data = CompressedData {
1647 strategy: CompressionStrategy::None,
1648 compressed_size: 10,
1649 data: json!({}),
1650 compression_metadata: metadata,
1651 };
1652
1653 let result = compressor.create_decompression_metadata(&compressed_data);
1654 assert!(result.is_ok());
1655 let meta = result.unwrap();
1656 assert_eq!(meta.dictionary_map.len(), 1);
1658 assert_eq!(meta.dictionary_map.get(&0), Some(&"valid".to_string()));
1659 }
1660
1661 #[test]
1662 fn test_update_context_updates_dictionary() {
1663 let mut decompressor = StreamingDecompressor::new();
1664
1665 let mut metadata = DecompressionMetadata {
1666 strategy: CompressionStrategy::None,
1667 dictionary_map: HashMap::new(),
1668 delta_bases: HashMap::new(),
1669 priority_hints: HashMap::new(),
1670 };
1671 metadata.dictionary_map.insert(0, "hello".to_string());
1672 metadata.dictionary_map.insert(1, "world".to_string());
1673
1674 let result = decompressor.update_context(&metadata);
1675 assert!(result.is_ok());
1676 assert_eq!(decompressor.active_dictionary.len(), 2);
1677 assert_eq!(
1678 decompressor.active_dictionary.get(&0),
1679 Some(&"hello".to_string())
1680 );
1681 }
1682
1683 #[test]
1684 fn test_update_context_updates_delta_bases() {
1685 let mut decompressor = StreamingDecompressor::new();
1686
1687 let mut metadata = DecompressionMetadata {
1688 strategy: CompressionStrategy::None,
1689 dictionary_map: HashMap::new(),
1690 delta_bases: HashMap::new(),
1691 priority_hints: HashMap::new(),
1692 };
1693 metadata.delta_bases.insert("value1".to_string(), 100.0);
1694 metadata.delta_bases.insert("value2".to_string(), 200.0);
1695
1696 let result = decompressor.update_context(&metadata);
1697 assert!(result.is_ok());
1698 assert_eq!(decompressor.delta_bases.len(), 2);
1699 assert_eq!(decompressor.delta_bases.get("value1"), Some(&100.0));
1700 }
1701
1702 #[test]
1703 fn test_decompress_dictionary_with_float_that_is_not_u64() {
1704 let mut decompressor = StreamingDecompressor::new();
1705 decompressor.active_dictionary.insert(0, "test".to_string());
1706
1707 let data = json!({"value": 1.5});
1709 let result = decompressor.decompress_dictionary(&data);
1710 assert!(result.is_ok());
1711 assert_eq!(result.unwrap(), json!({"value": 1.5}));
1713 }
1714
1715 #[test]
1716 fn test_decompress_dictionary_with_negative_number() {
1717 let mut decompressor = StreamingDecompressor::new();
1718 decompressor.active_dictionary.insert(0, "test".to_string());
1719
1720 let data = json!({"value": -1});
1721 let result = decompressor.decompress_dictionary(&data);
1722 assert!(result.is_ok());
1723 assert_eq!(result.unwrap(), json!({"value": -1}));
1725 }
1726
1727 #[test]
1728 fn test_decompress_delta_array_checks_first_element_structure() {
1729 let decompressor = StreamingDecompressor::new();
1730
1731 let data = json!([
1733 {"wrong_field": 100.0},
1734 1.0,
1735 2.0
1736 ]);
1737
1738 let result = decompressor.decompress_delta(&data);
1739 assert!(result.is_ok());
1740 assert_eq!(result.unwrap(), data);
1742 }
1743
1744 #[test]
1745 fn test_decompress_delta_array_requires_both_base_and_type() {
1746 let decompressor = StreamingDecompressor::new();
1747
1748 let data1 = json!([
1750 {"delta_base": 100.0},
1751 1.0
1752 ]);
1753 let result1 = decompressor.decompress_delta(&data1);
1754 assert!(result1.is_ok());
1755
1756 let data2 = json!([
1758 {"delta_type": "numeric_sequence"},
1759 1.0
1760 ]);
1761 let result2 = decompressor.decompress_delta(&data2);
1762 assert!(result2.is_ok());
1763 }
1764
1765 #[test]
1766 fn test_decompress_run_length_with_non_objects_in_array() {
1767 let decompressor = StreamingDecompressor::new();
1768
1769 let data = json!([
1771 {"rle_value": "a", "rle_count": 2},
1772 "plain",
1773 42,
1774 true
1775 ]);
1776
1777 let result = decompressor.decompress_run_length(&data);
1778 assert!(result.is_ok());
1779 assert_eq!(result.unwrap(), json!(["a", "a", "plain", 42, true]));
1780 }
1781
1782 #[test]
1783 fn test_decompress_run_length_integrity_check_rle_value_without_count() {
1784 let decompressor = StreamingDecompressor::new();
1785
1786 let data = json!([
1787 {"rle_value": "x"}
1788 ]);
1789
1790 let result = decompressor.decompress_run_length(&data);
1791 assert!(result.is_err());
1792 }
1793
1794 #[test]
1795 fn test_decompress_run_length_integrity_check_rle_count_without_value() {
1796 let decompressor = StreamingDecompressor::new();
1797
1798 let data = json!([
1799 {"rle_count": 3}
1800 ]);
1801
1802 let result = decompressor.decompress_run_length(&data);
1803 assert!(result.is_err());
1804 }
1805
1806 #[test]
1807 fn test_decompress_run_length_non_number_count() {
1808 let decompressor = StreamingDecompressor::new();
1809
1810 let data = json!([
1811 {"rle_value": "x", "rle_count": "three"}
1812 ]);
1813
1814 let result = decompressor.decompress_run_length(&data);
1815 assert!(result.is_err());
1816 }
1817
1818 #[test]
1819 fn test_compress_frame_with_large_data() {
1820 let mut compressor = StreamingCompressor::new();
1821
1822 let large_data = json!({
1823 "users": (0..100).map(|i| json!({
1824 "id": i,
1825 "name": format!("User{}", i),
1826 "email": format!("user{}@example.com", i),
1827 "age": 20 + (i % 50),
1828 "active": i % 2 == 0
1829 })).collect::<Vec<_>>()
1830 });
1831
1832 let frame = StreamFrame {
1833 data: large_data,
1834 priority: Priority::MEDIUM,
1835 metadata: HashMap::new(),
1836 };
1837
1838 let result = compressor.compress_frame(frame);
1839 assert!(result.is_ok());
1840
1841 let stats = compressor.get_stats();
1842 assert_eq!(stats.frames_processed, 1);
1843 assert!(stats.total_input_bytes > 1000);
1844 }
1845
1846 #[test]
1847 fn test_decompress_delta_with_very_large_deltas() {
1848 let decompressor = StreamingDecompressor::new();
1849
1850 let data = json!([
1851 {"delta_base": 1_000_000.0, "delta_type": "numeric_sequence"},
1852 100_000.0,
1853 200_000.0,
1854 300_000.0
1855 ]);
1856
1857 let result = decompressor.decompress_delta(&data);
1858 assert!(result.is_ok());
1859 assert_eq!(
1860 result.unwrap(),
1861 json!([1_100_000.0, 1_200_000.0, 1_300_000.0])
1862 );
1863 }
1864}