use crate::{
    compression::{CompressedData, CompressionStrategy, SchemaCompressor},
    domain::{DomainError, DomainResult},
    stream::{Priority, StreamFrame},
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;

/// Upper bound on a single run-length count, guarding against RLE bombs.
const MAX_RLE_COUNT: u64 = 100_000;
/// Upper bound on the length of a delta-encoded array.
const MAX_DELTA_ARRAY_SIZE: usize = 1_000_000;
/// Upper bound on the cumulative number of elements produced by RLE expansion.
const MAX_DECOMPRESSED_SIZE: usize = 10_485_760;

#[derive(Debug, Clone)]
pub struct StreamingCompressor {
    skeleton_compressor: SchemaCompressor,
    content_compressor: SchemaCompressor,
    stats: CompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    pub total_input_bytes: usize,
    pub total_output_bytes: usize,
    pub frames_processed: u32,
    pub priority_ratios: HashMap<u8, f32>,
}

#[derive(Debug, Clone)]
pub struct CompressedFrame {
    pub frame: StreamFrame,
    pub compressed_data: CompressedData,
    pub decompression_metadata: DecompressionMetadata,
}

#[derive(Debug, Clone)]
pub struct DecompressionMetadata {
    pub strategy: CompressionStrategy,
    pub dictionary_map: HashMap<u16, String>,
    pub delta_bases: HashMap<String, f64>,
    pub priority_hints: HashMap<u8, String>,
}

impl StreamingCompressor {
    pub fn new() -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::new(),
            content_compressor: SchemaCompressor::new(),
            stats: CompressionStats::default(),
        }
    }

    pub fn with_strategies(
        skeleton_strategy: CompressionStrategy,
        content_strategy: CompressionStrategy,
    ) -> Self {
        Self {
            skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
            content_compressor: SchemaCompressor::with_strategy(content_strategy),
            stats: CompressionStats::default(),
        }
    }
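
    /// Compresses a single frame. CRITICAL and HIGH priority frames are
    /// routed through the skeleton compressor, everything else through the
    /// content compressor; per-priority stats are updated afterwards.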
    pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
        let compressor = self.select_compressor_for_priority(frame.priority);

        let original_size = serde_json::to_string(&frame.data)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        let compressed_data = compressor.compress(&frame.data)?;

        self.update_stats(
            frame.priority,
            original_size,
            compressed_data.compressed_size,
        );

        let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;

        Ok(CompressedFrame {
            frame,
            compressed_data,
            decompression_metadata,
        })
    }

    pub fn optimize_for_data(
        &mut self,
        skeleton: &JsonValue,
        sample_data: &[JsonValue],
    ) -> DomainResult<()> {
        self.skeleton_compressor.analyze_and_optimize(skeleton)?;

        if !sample_data.is_empty() {
            let combined_sample = JsonValue::Array(sample_data.to_vec());
            self.content_compressor
                .analyze_and_optimize(&combined_sample)?;
        }

        Ok(())
    }

    pub fn get_stats(&self) -> &CompressionStats {
        &self.stats
    }

    pub fn reset_stats(&mut self) {
        self.stats = CompressionStats::default();
    }

    fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
        match priority {
            Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
            _ => &mut self.content_compressor,
        }
    }

    fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
        self.stats.total_input_bytes += original_size;
        self.stats.total_output_bytes += compressed_size;
        self.stats.frames_processed += 1;

        let ratio = if original_size > 0 {
            compressed_size as f32 / original_size as f32
        } else {
            1.0
        };

        // Records the ratio of the most recent frame per priority level,
        // not a running average across frames.
        self.stats.priority_ratios.insert(priority.value(), ratio);
    }

    fn create_decompression_metadata(
        &self,
        compressed_data: &CompressedData,
    ) -> DomainResult<DecompressionMetadata> {
        let mut dictionary_map = HashMap::new();
        let mut delta_bases = HashMap::new();

        // Metadata keys use the prefixes "dict_<index>" and "base_<path>";
        // entries that fail to parse are skipped rather than treated as errors.
        for (key, value) in &compressed_data.compression_metadata {
            if let Some(index_str) = key.strip_prefix("dict_") {
                if let (Ok(index), Some(string_val)) = (index_str.parse::<u16>(), value.as_str()) {
                    dictionary_map.insert(index, string_val.to_string());
                }
            } else if let Some(path) = key.strip_prefix("base_") {
                if let Some(num) = value.as_f64() {
                    delta_bases.insert(path.to_string(), num);
                }
            }
        }

        Ok(DecompressionMetadata {
            strategy: compressed_data.strategy.clone(),
            dictionary_map,
            delta_bases,
            priority_hints: HashMap::new(),
        })
    }
}

impl CompressionStats {
    pub fn overall_compression_ratio(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 1.0;
        }
        self.total_output_bytes as f32 / self.total_input_bytes as f32
    }

    pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
        self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
    }

    pub fn bytes_saved(&self) -> isize {
        self.total_input_bytes as isize - self.total_output_bytes as isize
    }

    pub fn percentage_saved(&self) -> f32 {
        if self.total_input_bytes == 0 {
            return 0.0;
        }
        let ratio = self.overall_compression_ratio();
        (1.0 - ratio) * 100.0
    }
}

#[derive(Debug, Clone)]
pub struct StreamingDecompressor {
    active_dictionary: HashMap<u16, String>,
    delta_bases: HashMap<String, f64>,
    stats: DecompressionStats,
}

#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
    pub frames_decompressed: u32,
    pub total_decompressed_bytes: usize,
    pub avg_decompression_time_us: u64,
}

impl StreamingDecompressor {
    pub fn new() -> Self {
        Self {
            active_dictionary: HashMap::new(),
            delta_bases: HashMap::new(),
            stats: DecompressionStats::default(),
        }
    }
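
    /// Decompresses a single frame. The frame's dictionary and delta-base
    /// metadata are merged into this decompressor's running context before
    /// the payload is decoded according to its strategy.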
    pub fn decompress_frame(
        &mut self,
        compressed_frame: CompressedFrame,
    ) -> DomainResult<StreamFrame> {
        let start_time = std::time::Instant::now();

        self.update_context(&compressed_frame.decompression_metadata)?;

        let decompressed_data = self.decompress_data(
            &compressed_frame.compressed_data,
            &compressed_frame.decompression_metadata.strategy,
        )?;

        let decompression_time = start_time.elapsed();
        self.update_decompression_stats(&decompressed_data, decompression_time);

        Ok(StreamFrame {
            data: decompressed_data,
            priority: compressed_frame.frame.priority,
            metadata: compressed_frame.frame.metadata,
        })
    }

    fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
        for (&index, string) in &metadata.dictionary_map {
            self.active_dictionary.insert(index, string.clone());
        }

        for (path, &base) in &metadata.delta_bases {
            self.delta_bases.insert(path.clone(), base);
        }

        Ok(())
    }

    fn decompress_data(
        &self,
        compressed_data: &CompressedData,
        strategy: &CompressionStrategy,
    ) -> DomainResult<JsonValue> {
        match strategy {
            CompressionStrategy::None => Ok(compressed_data.data.clone()),

            CompressionStrategy::Dictionary { .. } => {
                self.decompress_dictionary(&compressed_data.data)
            }

            CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),

            CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),

            // Hybrid undoes the two passes in reverse order of the assumed
            // compression pipeline: delta first, then dictionary.
            CompressionStrategy::Hybrid { .. } => {
                let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
                self.decompress_dictionary(&delta_decompressed)
            }
        }
    }

    fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
                }
                Ok(JsonValue::Object(decompressed))
            }
            JsonValue::Array(arr) => {
                let decompressed: Result<Vec<_>, _> = arr
                    .iter()
                    .map(|item| self.decompress_dictionary(item))
                    .collect();
                Ok(JsonValue::Array(decompressed?))
            }
            JsonValue::Number(n) => {
                // Only numbers that fit in u16 can be dictionary indices; a
                // plain `as u16` cast would silently truncate larger values
                // into false dictionary hits.
                if let Some(index) = n.as_u64().and_then(|i| u16::try_from(i).ok())
                    && let Some(string_val) = self.active_dictionary.get(&index)
                {
                    return Ok(JsonValue::String(string_val.clone()));
                }
                Ok(data.clone())
            }
            _ => Ok(data.clone()),
        }
    }
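
    /// Undoes delta encoding. A delta-encoded array starts with a marker
    /// object carrying `delta_base` and `delta_type`, followed by numeric
    /// offsets from the base, e.g.
    /// `[{"delta_base": 100.0, "delta_type": "numeric_sequence"}, 0.0, 1.0]`
    /// decodes to `[100.0, 101.0]`. Arrays without the marker, and all other
    /// values, are traversed recursively and passed through unchanged.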
    pub fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed_obj = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed_obj.insert(key.clone(), self.decompress_delta(value)?);
                }
                Ok(JsonValue::Object(decompressed_obj))
            }
            JsonValue::Array(arr) => {
                if arr.is_empty() {
                    return Ok(JsonValue::Array(arr.clone()));
                }

                if let Some(first) = arr.first()
                    && let Some(obj) = first.as_object()
                    && obj.contains_key("delta_base")
                    && obj.contains_key("delta_type")
                {
                    return self.decompress_delta_array(arr);
                }

                let decompressed_arr: Result<Vec<_>, _> =
                    arr.iter().map(|item| self.decompress_delta(item)).collect();
                Ok(JsonValue::Array(decompressed_arr?))
            }
            _ => Ok(data.clone()),
        }
    }

    fn decompress_delta_array(&self, arr: &[JsonValue]) -> DomainResult<JsonValue> {
        if arr.is_empty() {
            return Ok(JsonValue::Array(Vec::new()));
        }

        if arr.len() > MAX_DELTA_ARRAY_SIZE {
            return Err(DomainError::CompressionError(format!(
                "Delta array size {} exceeds maximum {}",
                arr.len(),
                MAX_DELTA_ARRAY_SIZE
            )));
        }

        let base_value = arr[0]
            .get("delta_base")
            .and_then(|v| v.as_f64())
            .ok_or_else(|| {
                DomainError::CompressionError(
                    "Missing or invalid delta_base in metadata".to_string(),
                )
            })?;

        let mut original_values = Vec::new();
        for delta_value in arr.iter().skip(1) {
            let delta = delta_value.as_f64().ok_or_else(|| {
                DomainError::CompressionError("Invalid delta value: expected number".to_string())
            })?;

            original_values.push(JsonValue::from(base_value + delta));
        }

        Ok(JsonValue::Array(original_values))
    }
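
    /// Undoes run-length encoding. A run is a marker object of the form
    /// `{"rle_value": v, "rle_count": n}` and expands to `n` copies of `v`;
    /// all other elements are traversed recursively. Individual counts are
    /// capped at `MAX_RLE_COUNT` and the cumulative expanded element count at
    /// `MAX_DECOMPRESSED_SIZE`, so a malformed payload cannot force unbounded
    /// allocation.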
    pub fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut decompressed_obj = serde_json::Map::new();
                for (key, value) in obj {
                    decompressed_obj.insert(key.clone(), self.decompress_run_length(value)?);
                }
                Ok(JsonValue::Object(decompressed_obj))
            }
            JsonValue::Array(arr) => {
                let mut decompressed_values = Vec::new();
                let mut total_size = 0usize;

                for item in arr {
                    match item.as_object() {
                        // Any object carrying `rle_count` is an RLE marker; a
                        // marker without `rle_value` is malformed rather than
                        // a regular object.
                        Some(obj) if obj.contains_key("rle_count") => {
                            let value = obj
                                .get("rle_value")
                                .ok_or_else(|| {
                                    DomainError::CompressionError("Missing rle_value".to_string())
                                })?
                                .clone();

                            let count = obj
                                .get("rle_count")
                                .and_then(|v| v.as_u64())
                                .ok_or_else(|| {
                                    DomainError::CompressionError(
                                        "Invalid rle_count: expected positive integer".to_string(),
                                    )
                                })?;

                            if count > MAX_RLE_COUNT {
                                return Err(DomainError::CompressionError(format!(
                                    "RLE count {} exceeds maximum {}",
                                    count, MAX_RLE_COUNT
                                )));
                            }

                            let count_usize = usize::try_from(count).map_err(|_| {
                                DomainError::CompressionError(format!(
                                    "RLE count {} exceeds platform maximum",
                                    count
                                ))
                            })?;

                            // Guard the cumulative output size before expanding.
                            total_size = total_size.checked_add(count_usize).ok_or_else(|| {
                                DomainError::CompressionError(
                                    "Total decompressed size overflow".to_string(),
                                )
                            })?;
                            if total_size > MAX_DECOMPRESSED_SIZE {
                                return Err(DomainError::CompressionError(format!(
                                    "Decompressed size {} exceeds maximum {}",
                                    total_size, MAX_DECOMPRESSED_SIZE
                                )));
                            }

                            for _ in 0..count_usize {
                                decompressed_values.push(value.clone());
                            }
                        }
                        _ => decompressed_values.push(self.decompress_run_length(item)?),
                    }
                }

                Ok(JsonValue::Array(decompressed_values))
            }
            _ => Ok(data.clone()),
        }
    }

    fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
        self.stats.frames_decompressed += 1;

        if let Ok(serialized) = serde_json::to_string(data) {
            self.stats.total_decompressed_bytes += serialized.len();
        }

        // Incremental mean: rebuild the previous total from the stored
        // average (avg * (n - 1)), add the new sample, and divide by n.
        let new_time_us = duration.as_micros() as u64;
        if self.stats.frames_decompressed == 1 {
            self.stats.avg_decompression_time_us = new_time_us;
        } else {
            let total_frames = self.stats.frames_decompressed as u64;
            let total_time =
                self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
            self.stats.avg_decompression_time_us = total_time / total_frames;
        }
    }

    pub fn get_stats(&self) -> &DecompressionStats {
        &self.stats
    }
}

impl Default for StreamingCompressor {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for StreamingDecompressor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_streaming_compressor_basic() {
        let mut compressor = StreamingCompressor::new();

        let frame = StreamFrame {
            data: json!({
                "message": "test message",
                "count": 42
            }),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        let result = compressor.compress_frame(frame);
        assert!(result.is_ok());

        let compressed = result.unwrap();
        assert_eq!(compressed.frame.priority, Priority::MEDIUM);
    }

    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            total_input_bytes: 1000,
            total_output_bytes: 600,
            ..Default::default()
        };

        assert_eq!(stats.overall_compression_ratio(), 0.6);
        assert_eq!(stats.bytes_saved(), 400);
        let percentage = stats.percentage_saved();
        assert!((percentage - 40.0).abs() < 0.001);
    }
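
    // Added coverage: the zero-input edge cases documented in the
    // CompressionStats helpers above.
    #[test]
    fn test_compression_stats_zero_input() {
        let stats = CompressionStats::default();
        assert_eq!(stats.overall_compression_ratio(), 1.0);
        assert_eq!(stats.bytes_saved(), 0);
        assert_eq!(stats.percentage_saved(), 0.0);
        // 42 is an arbitrary priority value with no recorded ratio.
        assert_eq!(stats.priority_compression_ratio(42), 1.0);
    }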

    #[test]
    fn test_streaming_decompressor_basic() {
        let mut decompressor = StreamingDecompressor::new();

        let compressed_frame = CompressedFrame {
            frame: StreamFrame {
                data: json!({"test": "data"}),
                priority: Priority::MEDIUM,
                metadata: HashMap::new(),
            },
            compressed_data: CompressedData {
                strategy: CompressionStrategy::None,
                compressed_size: 20,
                data: json!({"test": "data"}),
                compression_metadata: HashMap::new(),
            },
            decompression_metadata: DecompressionMetadata {
                strategy: CompressionStrategy::None,
                dictionary_map: HashMap::new(),
                delta_bases: HashMap::new(),
                priority_hints: HashMap::new(),
            },
        };

        let result = decompressor.decompress_frame(compressed_frame);
        assert!(result.is_ok());

        let decompressed = result.unwrap();
        assert_eq!(decompressed.data, json!({"test": "data"}));
    }
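
    // Added coverage (sketch): exercises the two passes in the same order as
    // the Hybrid branch of decompress_data (delta first, then dictionary),
    // calling them directly because Hybrid's variant fields are not
    // constructed here.
    #[test]
    fn test_hybrid_order_delta_then_dictionary() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor.active_dictionary.insert(0, "hello".to_string());

        // A bare number passes through the delta pass untouched and is then
        // resolved as a dictionary index.
        let data = json!({"greeting": 0});
        let after_delta = decompressor.decompress_delta(&data).unwrap();
        let result = decompressor.decompress_dictionary(&after_delta).unwrap();
        assert_eq!(result, json!({"greeting": "hello"}));
    }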

    #[test]
    fn test_dictionary_decompression() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor
            .active_dictionary
            .insert(0, "hello".to_string());
        decompressor
            .active_dictionary
            .insert(1, "world".to_string());

        let compressed = json!({
            "greeting": 0,
            "target": 1
        });

        let result = decompressor.decompress_dictionary(&compressed).unwrap();
        assert_eq!(
            result,
            json!({
                "greeting": "hello",
                "target": "world"
            })
        );
    }
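
    // Added coverage: indices outside u16 range must not be truncated into
    // false dictionary hits (65536 would alias index 0 under an `as u16` cast).
    #[test]
    fn test_dictionary_decompression_ignores_out_of_range_indices() {
        let mut decompressor = StreamingDecompressor::new();
        decompressor.active_dictionary.insert(0, "hello".to_string());

        let data = json!({"value": 65536});
        let result = decompressor.decompress_dictionary(&data).unwrap();
        assert_eq!(result, json!({"value": 65536}));
    }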

    #[test]
    fn test_priority_based_compression() {
        let mut compressor = StreamingCompressor::new();

        let critical_frame = StreamFrame {
            data: json!({"error": "critical failure"}),
            priority: Priority::CRITICAL,
            metadata: HashMap::new(),
        };

        let low_frame = StreamFrame {
            data: json!({"debug": "verbose information"}),
            priority: Priority::LOW,
            metadata: HashMap::new(),
        };

        let _critical_result = compressor.compress_frame(critical_frame).unwrap();
        let _low_result = compressor.compress_frame(low_frame).unwrap();

        let stats = compressor.get_stats();
        assert_eq!(stats.frames_processed, 2);
        assert!(stats.total_input_bytes > 0);
    }
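
    // Added coverage: update_stats keys the most recent compression ratio by
    // priority value, so a compressed frame should leave an entry behind.
    #[test]
    fn test_priority_ratio_recorded_per_frame() {
        let mut compressor = StreamingCompressor::new();

        let frame = StreamFrame {
            data: json!({"message": "ratio check"}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };
        compressor.compress_frame(frame).unwrap();

        let stats = compressor.get_stats();
        assert!(stats
            .priority_ratios
            .contains_key(&Priority::MEDIUM.value()));
    }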

    #[test]
    fn test_delta_decompression_basic() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
            0.0,
            1.0,
            2.0,
            3.0,
            4.0
        ]);

        let result = decompressor.decompress_delta(&compressed_data).unwrap();
        assert_eq!(result, json!([100.0, 101.0, 102.0, 103.0, 104.0]));
    }

    #[test]
    fn test_delta_decompression_negative_deltas() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"delta_base": 50.0, "delta_type": "numeric_sequence"},
            -10.0,
            0.0,
            10.0,
            20.0
        ]);

        let result = decompressor.decompress_delta(&compressed_data).unwrap();
        assert_eq!(result, json!([40.0, 50.0, 60.0, 70.0]));
    }

    #[test]
    fn test_delta_decompression_fractional_deltas() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"delta_base": 10.0, "delta_type": "numeric_sequence"},
            0.5,
            1.0,
            1.5,
            2.0
        ]);

        let result = decompressor.decompress_delta(&compressed_data).unwrap();
        assert_eq!(result, json!([10.5, 11.0, 11.5, 12.0]));
    }

    #[test]
    fn test_delta_decompression_empty_array() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([]);

        let result = decompressor.decompress_delta(&compressed_data).unwrap();
        assert_eq!(result, json!([]));
    }

    #[test]
    fn test_delta_decompression_single_element() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"delta_base": 100.0, "delta_type": "numeric_sequence"}
        ]);

        let result = decompressor.decompress_delta(&compressed_data).unwrap();
        assert_eq!(result, json!([]));
    }

    #[test]
    fn test_delta_decompression_nested_structure() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!({
            "sequence": [
                {"delta_base": 100.0, "delta_type": "numeric_sequence"},
                0.0,
                1.0,
                2.0
            ],
            "other": "data"
        });

        let result = decompressor.decompress_delta(&compressed_data).unwrap();
        assert_eq!(
            result,
            json!({
                "sequence": [100.0, 101.0, 102.0],
                "other": "data"
            })
        );
    }

    #[test]
    fn test_delta_decompression_invalid_metadata() {
        let decompressor = StreamingDecompressor::new();

        // An array without the marker keys is not delta-encoded; it passes
        // through element-wise rather than producing an error.
        let compressed_data = json!([
            {"wrong_key": 100.0},
            0.0,
            1.0
        ]);

        let result = decompressor.decompress_delta(&compressed_data);
        assert!(result.is_ok());
    }
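
    // Added coverage: only arrays whose first element carries BOTH marker
    // keys are treated as delta-encoded; with just delta_base the array
    // passes through unchanged.
    #[test]
    fn test_delta_decompression_marker_requires_both_keys() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"delta_base": 100.0},
            0.0,
            1.0
        ]);

        let result = decompressor.decompress_delta(&compressed_data).unwrap();
        assert_eq!(result, compressed_data);
    }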

    #[test]
    fn test_delta_decompression_invalid_delta_value() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"delta_base": 100.0, "delta_type": "numeric_sequence"},
            "not_a_number"
        ]);

        let result = decompressor.decompress_delta(&compressed_data);
        assert!(result.is_err());
    }

    #[test]
    fn test_rle_decompression_basic() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"rle_value": 1, "rle_count": 3},
            {"rle_value": 2, "rle_count": 2},
            {"rle_value": 3, "rle_count": 4}
        ]);

        let result = decompressor
            .decompress_run_length(&compressed_data)
            .unwrap();
        assert_eq!(result, json!([1, 1, 1, 2, 2, 3, 3, 3, 3]));
    }

    #[test]
    fn test_rle_decompression_mixed_runs() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"rle_value": "a", "rle_count": 2},
            "b",
            {"rle_value": "c", "rle_count": 3}
        ]);

        let result = decompressor
            .decompress_run_length(&compressed_data)
            .unwrap();
        assert_eq!(result, json!(["a", "a", "b", "c", "c", "c"]));
    }

    #[test]
    fn test_rle_decompression_single_count() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"rle_value": "x", "rle_count": 1}
        ]);

        let result = decompressor
            .decompress_run_length(&compressed_data)
            .unwrap();
        assert_eq!(result, json!(["x"]));
    }

    #[test]
    fn test_rle_decompression_zero_count() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"rle_value": "x", "rle_count": 0}
        ]);

        let result = decompressor
            .decompress_run_length(&compressed_data)
            .unwrap();
        assert_eq!(result, json!([]));
    }

    #[test]
    fn test_rle_decompression_nested_values() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"rle_value": {"name": "test"}, "rle_count": 3}
        ]);

        let result = decompressor
            .decompress_run_length(&compressed_data)
            .unwrap();
        assert_eq!(
            result,
            json!([{"name": "test"}, {"name": "test"}, {"name": "test"}])
        );
    }

    #[test]
    fn test_rle_decompression_nested_structure() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!({
            "data": [
                {"rle_value": 1, "rle_count": 3},
                {"rle_value": 2, "rle_count": 2}
            ],
            "other": "field"
        });

        let result = decompressor
            .decompress_run_length(&compressed_data)
            .unwrap();
        assert_eq!(
            result,
            json!({
                "data": [1, 1, 1, 2, 2],
                "other": "field"
            })
        );
    }

    #[test]
    fn test_rle_decompression_empty_array() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([]);

        let result = decompressor
            .decompress_run_length(&compressed_data)
            .unwrap();
        assert_eq!(result, json!([]));
    }

    #[test]
    fn test_rle_decompression_invalid_count() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"rle_value": "x", "rle_count": "not_a_number"}
        ]);

        let result = decompressor.decompress_run_length(&compressed_data);
        assert!(result.is_err());
    }
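
    // Added coverage: a run whose count exceeds MAX_RLE_COUNT must be
    // rejected rather than expanded.
    #[test]
    fn test_rle_decompression_count_exceeds_maximum() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"rle_value": "x", "rle_count": MAX_RLE_COUNT + 1}
        ]);

        let result = decompressor.decompress_run_length(&compressed_data);
        assert!(result.is_err());
    }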

    #[test]
    fn test_rle_decompression_missing_value() {
        let decompressor = StreamingDecompressor::new();

        // An object with rle_count but no rle_value is a malformed RLE marker.
        let compressed_data = json!([
            {"rle_count": 3}
        ]);

        let result = decompressor.decompress_run_length(&compressed_data);
        assert!(result.is_err());
    }

    #[test]
    fn test_rle_decompression_non_rle_objects() {
        let decompressor = StreamingDecompressor::new();

        let compressed_data = json!([
            {"regular": "object"},
            {"another": "one"}
        ]);

        let result = decompressor
            .decompress_run_length(&compressed_data)
            .unwrap();
        assert_eq!(
            result,
            json!([
                {"regular": "object"},
                {"another": "one"}
            ])
        );
    }
}