use crate::{
compression::{CompressedData, CompressionStrategy, SchemaCompressor},
domain::{DomainError, DomainResult},
stream::{Priority, StreamFrame},
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
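// Safety limits against decompression bombs: the per-run RLE count, the
// length of a delta-encoded array, and the cumulative decompressed size
// (10 MiB) are each capped.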
const MAX_RLE_COUNT: u64 = 100_000;
const MAX_DELTA_ARRAY_SIZE: usize = 1_000_000;
const MAX_DECOMPRESSED_SIZE: usize = 10_485_760;
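/// Priority-aware streaming compressor.
///
/// Routes each [`StreamFrame`] to one of two [`SchemaCompressor`]s: CRITICAL
/// and HIGH priority frames share the skeleton compressor, all other
/// priorities share the content compressor.
///
/// # Examples
///
/// A minimal sketch (ignored doctest; assumes a `frame: StreamFrame` is in
/// scope):
///
/// ```ignore
/// let mut compressor = StreamingCompressor::new();
/// let compressed = compressor.compress_frame(frame)?;
/// println!("ratio: {}", compressor.get_stats().overall_compression_ratio());
/// ```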
#[derive(Debug, Clone)]
pub struct StreamingCompressor {
skeleton_compressor: SchemaCompressor,
content_compressor: SchemaCompressor,
stats: CompressionStats,
}
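/// Running compression counters. `priority_ratios` keeps the most recent
/// compressed/original ratio observed for each priority level.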
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
pub total_input_bytes: usize,
pub total_output_bytes: usize,
pub frames_processed: u32,
pub priority_ratios: HashMap<u8, f32>,
}
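/// A stream frame paired with its compressed payload and the metadata a
/// [`StreamingDecompressor`] needs to reverse the transformation.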
#[derive(Debug, Clone)]
pub struct CompressedFrame {
pub frame: StreamFrame,
pub compressed_data: CompressedData,
pub decompression_metadata: DecompressionMetadata,
}
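/// Decompression side data: the strategy used, dictionary index-to-string
/// mappings, per-path delta bases, and per-priority hints (currently always
/// empty as produced by `create_decompression_metadata`).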
#[derive(Debug, Clone)]
pub struct DecompressionMetadata {
pub strategy: CompressionStrategy,
pub dictionary_map: HashMap<u16, String>,
pub delta_bases: HashMap<String, f64>,
pub priority_hints: HashMap<u8, String>,
}
impl StreamingCompressor {
pub fn new() -> Self {
Self {
skeleton_compressor: SchemaCompressor::new(),
content_compressor: SchemaCompressor::new(),
stats: CompressionStats::default(),
}
}
pub fn with_strategies(
skeleton_strategy: CompressionStrategy,
content_strategy: CompressionStrategy,
) -> Self {
Self {
skeleton_compressor: SchemaCompressor::with_strategy(skeleton_strategy),
content_compressor: SchemaCompressor::with_strategy(content_strategy),
stats: CompressionStats::default(),
}
}
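    /// Compresses one frame, recording the serialized-JSON input size against
    /// the compressed output size in the running statistics.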
pub fn compress_frame(&mut self, frame: StreamFrame) -> DomainResult<CompressedFrame> {
let compressor = self.select_compressor_for_priority(frame.priority);
let original_size = serde_json::to_string(&frame.data)
.map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
.len();
let compressed_data = compressor.compress(&frame.data)?;
self.update_stats(
frame.priority,
original_size,
compressed_data.compressed_size,
);
let decompression_metadata = self.create_decompression_metadata(&compressed_data)?;
Ok(CompressedFrame {
frame,
compressed_data,
decompression_metadata,
})
}
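    /// Tunes the skeleton compressor against the schema skeleton and, when
    /// samples are provided, the content compressor against a combined array
    /// of the sample payloads.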
pub fn optimize_for_data(
&mut self,
skeleton: &JsonValue,
sample_data: &[JsonValue],
) -> DomainResult<()> {
self.skeleton_compressor.analyze_and_optimize(skeleton)?;
if !sample_data.is_empty() {
let combined_sample = JsonValue::Array(sample_data.to_vec());
self.content_compressor
.analyze_and_optimize(&combined_sample)?;
}
Ok(())
}
pub fn get_stats(&self) -> &CompressionStats {
&self.stats
}
pub fn reset_stats(&mut self) {
self.stats = CompressionStats::default();
}
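    /// CRITICAL and HIGH frames share the skeleton compressor; all lower
    /// priorities share the content compressor.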
fn select_compressor_for_priority(&mut self, priority: Priority) -> &mut SchemaCompressor {
match priority {
Priority::CRITICAL | Priority::HIGH => &mut self.skeleton_compressor,
_ => &mut self.content_compressor,
}
}
fn update_stats(&mut self, priority: Priority, original_size: usize, compressed_size: usize) {
self.stats.total_input_bytes += original_size;
self.stats.total_output_bytes += compressed_size;
self.stats.frames_processed += 1;
let ratio = if original_size > 0 {
compressed_size as f32 / original_size as f32
} else {
1.0
};
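        // Stores the most recent ratio observed for this priority level;
        // earlier ratios for the same priority are overwritten.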
self.stats.priority_ratios.insert(priority.value(), ratio);
}
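    /// Extracts `dict_<index>` and `base_<path>` entries from the compressor's
    /// metadata map; entries with unparseable keys or wrong value types are
    /// silently skipped.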
fn create_decompression_metadata(
&self,
compressed_data: &CompressedData,
) -> DomainResult<DecompressionMetadata> {
let mut dictionary_map = HashMap::new();
let mut delta_bases = HashMap::new();
for (key, value) in &compressed_data.compression_metadata {
if let Some(suffix) = key.strip_prefix("dict_") {
if let Ok(index) = suffix.parse::<u16>()
&& let Some(string_val) = value.as_str()
{
dictionary_map.insert(index, string_val.to_string());
}
} else if let Some(path) = key.strip_prefix("base_")
&& let Some(num) = value.as_f64()
{
delta_bases.insert(path.to_string(), num);
}
}
Ok(DecompressionMetadata {
strategy: compressed_data.strategy.clone(),
dictionary_map,
delta_bases,
            priority_hints: HashMap::new(),
        })
    }
}
impl CompressionStats {
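    /// Compressed-to-original byte ratio across all frames; defined as 1.0
    /// when no bytes have been processed.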
pub fn overall_compression_ratio(&self) -> f32 {
if self.total_input_bytes == 0 {
return 1.0;
}
self.total_output_bytes as f32 / self.total_input_bytes as f32
}
pub fn priority_compression_ratio(&self, priority: u8) -> f32 {
self.priority_ratios.get(&priority).copied().unwrap_or(1.0)
}
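    /// Net bytes saved; negative when compression expanded the data.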
pub fn bytes_saved(&self) -> isize {
self.total_input_bytes as isize - self.total_output_bytes as isize
}
pub fn percentage_saved(&self) -> f32 {
if self.total_input_bytes == 0 {
return 0.0;
}
let ratio = self.overall_compression_ratio();
(1.0 - ratio) * 100.0
}
}
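/// Reverses [`StreamingCompressor`] output. Dictionary entries and delta
/// bases accumulate across frames, so decompression context carries over
/// between calls to `decompress_frame`.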
#[derive(Debug, Clone)]
pub struct StreamingDecompressor {
active_dictionary: HashMap<u16, String>,
delta_bases: HashMap<String, f64>,
stats: DecompressionStats,
}
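/// Running decompression counters; the average time is an integer-microsecond
/// running mean over all frames.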
#[derive(Debug, Clone, Default)]
pub struct DecompressionStats {
pub frames_decompressed: u32,
pub total_decompressed_bytes: usize,
pub avg_decompression_time_us: u64,
}
impl StreamingDecompressor {
pub fn new() -> Self {
Self {
active_dictionary: HashMap::new(),
delta_bases: HashMap::new(),
stats: DecompressionStats::default(),
}
}
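    /// Decompresses a single frame: first merges its metadata into the
    /// accumulated dictionary/delta context, then reverses the strategy
    /// recorded in the metadata, tracking timing in the stats.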
pub fn decompress_frame(
&mut self,
compressed_frame: CompressedFrame,
) -> DomainResult<StreamFrame> {
let start_time = std::time::Instant::now();
self.update_context(&compressed_frame.decompression_metadata)?;
let decompressed_data = self.decompress_data(
&compressed_frame.compressed_data,
&compressed_frame.decompression_metadata.strategy,
)?;
let decompression_time = start_time.elapsed();
self.update_decompression_stats(&decompressed_data, decompression_time);
Ok(StreamFrame {
data: decompressed_data,
priority: compressed_frame.frame.priority,
metadata: compressed_frame.frame.metadata,
})
}
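    /// Merges the frame's dictionary entries and delta bases into the
    /// accumulated context; entries persist across subsequent frames.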
fn update_context(&mut self, metadata: &DecompressionMetadata) -> DomainResult<()> {
for (&index, string) in &metadata.dictionary_map {
self.active_dictionary.insert(index, string.clone());
}
for (path, &base) in &metadata.delta_bases {
self.delta_bases.insert(path.clone(), base);
}
Ok(())
}
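    /// Dispatches on the compression strategy. `Hybrid` undoes delta encoding
    /// first, then dictionary substitution.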
fn decompress_data(
&self,
compressed_data: &CompressedData,
strategy: &CompressionStrategy,
) -> DomainResult<JsonValue> {
match strategy {
CompressionStrategy::None => Ok(compressed_data.data.clone()),
CompressionStrategy::Dictionary { .. } => {
self.decompress_dictionary(&compressed_data.data)
}
CompressionStrategy::Delta { .. } => self.decompress_delta(&compressed_data.data),
CompressionStrategy::RunLength => self.decompress_run_length(&compressed_data.data),
CompressionStrategy::Hybrid { .. } => {
let delta_decompressed = self.decompress_delta(&compressed_data.data)?;
self.decompress_dictionary(&delta_decompressed)
}
}
}
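    /// Recursively replaces numeric dictionary indices with their strings.
    /// Only numbers that fit in `u16` and are present in the active
    /// dictionary are substituted; everything else passes through unchanged.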
fn decompress_dictionary(&self, data: &JsonValue) -> DomainResult<JsonValue> {
match data {
JsonValue::Object(obj) => {
let mut decompressed = serde_json::Map::new();
for (key, value) in obj {
decompressed.insert(key.clone(), self.decompress_dictionary(value)?);
}
Ok(JsonValue::Object(decompressed))
}
JsonValue::Array(arr) => {
let decompressed: Result<Vec<_>, _> = arr
.iter()
.map(|item| self.decompress_dictionary(item))
.collect();
Ok(JsonValue::Array(decompressed?))
}
            JsonValue::Number(n) => {
                // `u16::try_from` avoids the silent wrap-around of `as u16`,
                // which could alias out-of-range indices onto real entries.
                if let Some(index) = n.as_u64()
                    && let Ok(index) = u16::try_from(index)
                    && let Some(string_val) = self.active_dictionary.get(&index)
                {
                    return Ok(JsonValue::String(string_val.clone()));
                }
                Ok(data.clone())
            }
_ => Ok(data.clone()),
}
}
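    /// Recursively expands delta-encoded arrays. An array is treated as
    /// delta-encoded only when its first element is an object containing both
    /// `delta_base` and `delta_type`; all other values are traversed as-is.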
pub fn decompress_delta(&self, data: &JsonValue) -> DomainResult<JsonValue> {
match data {
JsonValue::Object(obj) => {
let mut decompressed_obj = serde_json::Map::new();
for (key, value) in obj {
decompressed_obj.insert(key.clone(), self.decompress_delta(value)?);
}
Ok(JsonValue::Object(decompressed_obj))
}
JsonValue::Array(arr) => {
if arr.is_empty() {
return Ok(JsonValue::Array(arr.clone()));
}
if let Some(first) = arr.first()
&& let Some(obj) = first.as_object()
&& obj.contains_key("delta_base")
&& obj.contains_key("delta_type")
{
return self.decompress_delta_array(arr);
}
let decompressed_arr: Result<Vec<_>, _> =
arr.iter().map(|item| self.decompress_delta(item)).collect();
Ok(JsonValue::Array(decompressed_arr?))
}
_ => Ok(data.clone()),
}
}
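    /// Reconstructs `base + delta` for every element after the leading
    /// metadata object, rejecting arrays longer than `MAX_DELTA_ARRAY_SIZE`.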
fn decompress_delta_array(&self, arr: &[JsonValue]) -> DomainResult<JsonValue> {
if arr.is_empty() {
return Ok(JsonValue::Array(Vec::new()));
}
if arr.len() > MAX_DELTA_ARRAY_SIZE {
return Err(DomainError::CompressionError(format!(
"Delta array size {} exceeds maximum {}",
arr.len(),
MAX_DELTA_ARRAY_SIZE
)));
}
let base_value = arr[0]
.get("delta_base")
.and_then(|v| v.as_f64())
.ok_or_else(|| {
DomainError::CompressionError(
"Missing or invalid delta_base in metadata".to_string(),
)
})?;
let mut original_values = Vec::new();
for delta_value in arr.iter().skip(1) {
let delta = delta_value.as_f64().ok_or_else(|| {
DomainError::CompressionError("Invalid delta value: expected number".to_string())
})?;
let original = base_value + delta;
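            // serde_json maps non-finite sums (NaN/infinity) to JSON null.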
original_values.push(JsonValue::from(original));
}
Ok(JsonValue::Array(original_values))
}
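    /// Recursively expands run-length encoded entries of the form
    /// `{"rle_value": v, "rle_count": n}` into `n` copies of `v`, enforcing
    /// the per-run and cumulative size limits. An object carrying only one of
    /// the two RLE keys is rejected as malformed.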
pub fn decompress_run_length(&self, data: &JsonValue) -> DomainResult<JsonValue> {
match data {
JsonValue::Object(obj) => {
let mut decompressed_obj = serde_json::Map::new();
for (key, value) in obj {
decompressed_obj.insert(key.clone(), self.decompress_run_length(value)?);
}
Ok(JsonValue::Object(decompressed_obj))
}
JsonValue::Array(arr) => {
let mut decompressed_values = Vec::new();
let mut total_size = 0usize;
for item in arr {
if let Some(obj) = item.as_object() {
let has_rle_value = obj.contains_key("rle_value");
let has_rle_count = obj.contains_key("rle_count");
if has_rle_value && !has_rle_count {
return Err(DomainError::CompressionError(
"Malformed RLE object: rle_value without rle_count".to_string(),
));
}
if has_rle_count && !has_rle_value {
return Err(DomainError::CompressionError(
"Malformed RLE object: rle_count without rle_value".to_string(),
));
}
if has_rle_value && has_rle_count {
let value = obj
.get("rle_value")
.ok_or_else(|| {
DomainError::CompressionError("Missing rle_value".to_string())
})?
.clone();
                            let count = obj
                                .get("rle_count")
                                .and_then(|v| v.as_u64())
                                .ok_or_else(|| {
                                    DomainError::CompressionError(
                                        "Invalid rle_count: expected unsigned integer".to_string(),
                                    )
                                })?;
if count > MAX_RLE_COUNT {
return Err(DomainError::CompressionError(format!(
"RLE count {} exceeds maximum {}",
count, MAX_RLE_COUNT
)));
}
let count_usize = usize::try_from(count).map_err(|_| {
DomainError::CompressionError(format!(
"RLE count {} exceeds platform maximum",
count
))
})?;
total_size = total_size.checked_add(count_usize).ok_or_else(|| {
DomainError::CompressionError(
"Total decompressed size overflow".to_string(),
)
})?;
if total_size > MAX_DECOMPRESSED_SIZE {
return Err(DomainError::CompressionError(format!(
"Decompressed size {} exceeds maximum {}",
total_size, MAX_DECOMPRESSED_SIZE
)));
}
for _ in 0..count {
decompressed_values.push(value.clone());
}
} else {
decompressed_values.push(self.decompress_run_length(item)?);
}
} else {
decompressed_values.push(self.decompress_run_length(item)?);
}
}
Ok(JsonValue::Array(decompressed_values))
}
_ => Ok(data.clone()),
}
}
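    /// Updates counters and folds the new timing into a running average:
    /// `avg_n = (avg_{n-1} * (n - 1) + t_n) / n`, in integer microseconds.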
fn update_decompression_stats(&mut self, data: &JsonValue, duration: std::time::Duration) {
self.stats.frames_decompressed += 1;
if let Ok(serialized) = serde_json::to_string(data) {
self.stats.total_decompressed_bytes += serialized.len();
}
let new_time_us = duration.as_micros() as u64;
if self.stats.frames_decompressed == 1 {
self.stats.avg_decompression_time_us = new_time_us;
} else {
let total_frames = self.stats.frames_decompressed as u64;
let total_time =
self.stats.avg_decompression_time_us * (total_frames - 1) + new_time_us;
self.stats.avg_decompression_time_us = total_time / total_frames;
}
}
pub fn get_stats(&self) -> &DecompressionStats {
&self.stats
}
}
impl Default for StreamingCompressor {
fn default() -> Self {
Self::new()
}
}
impl Default for StreamingDecompressor {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_streaming_compressor_basic() {
let mut compressor = StreamingCompressor::new();
let frame = StreamFrame {
data: json!({
"message": "test message",
"count": 42
}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
};
let result = compressor.compress_frame(frame);
assert!(result.is_ok());
let compressed = result.unwrap();
assert_eq!(compressed.frame.priority, Priority::MEDIUM);
}
#[test]
fn test_compression_stats() {
let stats = CompressionStats {
total_input_bytes: 1000,
total_output_bytes: 600,
..Default::default()
};
assert_eq!(stats.overall_compression_ratio(), 0.6);
assert_eq!(stats.bytes_saved(), 400);
let percentage = stats.percentage_saved();
assert!((percentage - 40.0).abs() < 0.001);
}
#[test]
fn test_streaming_decompressor_basic() {
let mut decompressor = StreamingDecompressor::new();
let compressed_frame = CompressedFrame {
frame: StreamFrame {
data: json!({"test": "data"}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
},
compressed_data: CompressedData {
strategy: CompressionStrategy::None,
compressed_size: 20,
data: json!({"test": "data"}),
compression_metadata: HashMap::new(),
},
decompression_metadata: DecompressionMetadata {
strategy: CompressionStrategy::None,
dictionary_map: HashMap::new(),
delta_bases: HashMap::new(),
priority_hints: HashMap::new(),
},
};
let result = decompressor.decompress_frame(compressed_frame);
assert!(result.is_ok());
let decompressed = result.unwrap();
assert_eq!(decompressed.data, json!({"test": "data"}));
}
#[test]
fn test_dictionary_decompression() {
let mut decompressor = StreamingDecompressor::new();
decompressor
.active_dictionary
.insert(0, "hello".to_string());
decompressor
.active_dictionary
.insert(1, "world".to_string());
let compressed = json!({
"greeting": 0,
"target": 1
});
let result = decompressor.decompress_dictionary(&compressed).unwrap();
assert_eq!(
result,
json!({
"greeting": "hello",
"target": "world"
})
);
}
#[test]
fn test_priority_based_compression() {
let mut compressor = StreamingCompressor::new();
let critical_frame = StreamFrame {
data: json!({"error": "critical failure"}),
priority: Priority::CRITICAL,
metadata: HashMap::new(),
};
let low_frame = StreamFrame {
data: json!({"debug": "verbose information"}),
priority: Priority::LOW,
metadata: HashMap::new(),
};
let _critical_result = compressor.compress_frame(critical_frame).unwrap();
let _low_result = compressor.compress_frame(low_frame).unwrap();
let stats = compressor.get_stats();
assert_eq!(stats.frames_processed, 2);
assert!(stats.total_input_bytes > 0);
}
#[test]
fn test_delta_decompression_basic() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"delta_base": 100.0, "delta_type": "numeric_sequence"},
0.0,
1.0,
2.0,
3.0,
4.0
]);
let result = decompressor.decompress_delta(&compressed_data).unwrap();
assert_eq!(result, json!([100.0, 101.0, 102.0, 103.0, 104.0]));
}
#[test]
fn test_delta_decompression_negative_deltas() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"delta_base": 50.0, "delta_type": "numeric_sequence"},
-10.0,
0.0,
10.0,
20.0
]);
let result = decompressor.decompress_delta(&compressed_data).unwrap();
assert_eq!(result, json!([40.0, 50.0, 60.0, 70.0]));
}
#[test]
fn test_delta_decompression_fractional_deltas() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"delta_base": 10.0, "delta_type": "numeric_sequence"},
0.5,
1.0,
1.5,
2.0
]);
let result = decompressor.decompress_delta(&compressed_data).unwrap();
assert_eq!(result, json!([10.5, 11.0, 11.5, 12.0]));
}
#[test]
fn test_delta_decompression_empty_array() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([]);
let result = decompressor.decompress_delta(&compressed_data).unwrap();
assert_eq!(result, json!([]));
}
#[test]
fn test_delta_decompression_single_element() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"delta_base": 100.0, "delta_type": "numeric_sequence"}
]);
let result = decompressor.decompress_delta(&compressed_data).unwrap();
assert_eq!(result, json!([]));
}
#[test]
fn test_delta_decompression_nested_structure() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!({
"sequence": [
{"delta_base": 100.0, "delta_type": "numeric_sequence"},
0.0,
1.0,
2.0
],
"other": "data"
});
let result = decompressor.decompress_delta(&compressed_data).unwrap();
assert_eq!(
result,
json!({
"sequence": [100.0, 101.0, 102.0],
"other": "data"
})
);
}
#[test]
fn test_delta_decompression_invalid_metadata() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"wrong_key": 100.0},
0.0,
1.0
]);
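        // `wrong_key` means the array is not recognized as delta-encoded, so
        // it passes through unchanged instead of erroring.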
let result = decompressor.decompress_delta(&compressed_data);
assert!(result.is_ok());
}
#[test]
fn test_delta_decompression_invalid_delta_value() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"delta_base": 100.0, "delta_type": "numeric_sequence"},
"not_a_number"
]);
let result = decompressor.decompress_delta(&compressed_data);
assert!(result.is_err());
}
#[test]
fn test_rle_decompression_basic() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"rle_value": 1, "rle_count": 3},
{"rle_value": 2, "rle_count": 2},
{"rle_value": 3, "rle_count": 4}
]);
let result = decompressor
.decompress_run_length(&compressed_data)
.unwrap();
assert_eq!(result, json!([1, 1, 1, 2, 2, 3, 3, 3, 3]));
}
#[test]
fn test_rle_decompression_mixed_runs() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"rle_value": "a", "rle_count": 2},
"b",
{"rle_value": "c", "rle_count": 3}
]);
let result = decompressor
.decompress_run_length(&compressed_data)
.unwrap();
assert_eq!(result, json!(["a", "a", "b", "c", "c", "c"]));
}
#[test]
fn test_rle_decompression_single_count() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"rle_value": "x", "rle_count": 1}
]);
let result = decompressor
.decompress_run_length(&compressed_data)
.unwrap();
assert_eq!(result, json!(["x"]));
}
#[test]
fn test_rle_decompression_zero_count() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"rle_value": "x", "rle_count": 0}
]);
let result = decompressor
.decompress_run_length(&compressed_data)
.unwrap();
assert_eq!(result, json!([]));
}
#[test]
fn test_rle_decompression_nested_values() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"rle_value": {"name": "test"}, "rle_count": 3}
]);
let result = decompressor
.decompress_run_length(&compressed_data)
.unwrap();
assert_eq!(
result,
json!([{"name": "test"}, {"name": "test"}, {"name": "test"}])
);
}
#[test]
fn test_rle_decompression_nested_structure() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!({
"data": [
{"rle_value": 1, "rle_count": 3},
{"rle_value": 2, "rle_count": 2}
],
"other": "field"
});
let result = decompressor
.decompress_run_length(&compressed_data)
.unwrap();
assert_eq!(
result,
json!({
"data": [1, 1, 1, 2, 2],
"other": "field"
})
);
}
#[test]
fn test_rle_decompression_empty_array() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([]);
let result = decompressor
.decompress_run_length(&compressed_data)
.unwrap();
assert_eq!(result, json!([]));
}
#[test]
fn test_rle_decompression_invalid_count() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"rle_value": "x", "rle_count": "not_a_number"}
]);
let result = decompressor.decompress_run_length(&compressed_data);
assert!(result.is_err());
}
#[test]
fn test_rle_decompression_missing_value() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"rle_count": 3}
]);
let result = decompressor.decompress_run_length(&compressed_data);
assert!(result.is_err());
}
#[test]
fn test_rle_decompression_missing_count() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"rle_value": "x"}
]);
let result = decompressor.decompress_run_length(&compressed_data);
assert!(result.is_err());
}
#[test]
fn test_rle_decompression_non_rle_objects() {
let decompressor = StreamingDecompressor::new();
let compressed_data = json!([
{"regular": "object"},
{"another": "one"}
]);
let result = decompressor
.decompress_run_length(&compressed_data)
.unwrap();
assert_eq!(
result,
json!([
{"regular": "object"},
{"another": "one"}
])
);
}
#[test]
fn test_compress_frame_with_custom_strategies() {
let mut dict = HashMap::new();
dict.insert("test".to_string(), 0);
let mut bases = HashMap::new();
bases.insert("value".to_string(), 100.0);
let mut compressor = StreamingCompressor::with_strategies(
CompressionStrategy::Dictionary { dictionary: dict },
CompressionStrategy::Delta { base_values: bases },
);
let frame = StreamFrame {
data: json!({"value": 123, "other": 456}),
priority: Priority::HIGH,
metadata: HashMap::new(),
};
let result = compressor.compress_frame(frame);
assert!(result.is_ok());
assert_eq!(compressor.stats.frames_processed, 1);
}
#[test]
fn test_optimize_for_data_with_samples() {
let mut compressor = StreamingCompressor::new();
let skeleton = json!({
"id": null,
"name": null
});
let samples = vec![
json!({"id": 1, "name": "test1"}),
json!({"id": 2, "name": "test2"}),
json!({"id": 3, "name": "test3"}),
];
let result = compressor.optimize_for_data(&skeleton, &samples);
assert!(result.is_ok());
}
#[test]
fn test_optimize_for_data_empty_samples() {
let mut compressor = StreamingCompressor::new();
let skeleton = json!({"key": "value"});
let result = compressor.optimize_for_data(&skeleton, &[]);
assert!(result.is_ok());
}
#[test]
fn test_reset_stats() {
let mut compressor = StreamingCompressor::new();
compressor.stats.total_input_bytes = 1000;
compressor.stats.total_output_bytes = 500;
compressor.stats.frames_processed = 10;
compressor.reset_stats();
assert_eq!(compressor.stats.total_input_bytes, 0);
assert_eq!(compressor.stats.total_output_bytes, 0);
assert_eq!(compressor.stats.frames_processed, 0);
}
#[test]
fn test_compressor_critical_vs_low_priority() {
let mut compressor = StreamingCompressor::new();
let critical_frame = StreamFrame {
data: json!({"critical": "data"}),
priority: Priority::CRITICAL,
metadata: HashMap::new(),
};
let low_frame = StreamFrame {
data: json!({"low": "data"}),
priority: Priority::LOW,
metadata: HashMap::new(),
};
compressor.compress_frame(critical_frame).unwrap();
compressor.compress_frame(low_frame).unwrap();
assert_eq!(compressor.stats.frames_processed, 2);
}
#[test]
fn test_decompressor_hybrid_strategy() {
let mut decompressor = StreamingDecompressor::new();
decompressor.delta_bases.insert("value".to_string(), 100.0);
decompressor.active_dictionary.insert(0, "test".to_string());
let mut string_dict = HashMap::new();
string_dict.insert("test".to_string(), 0);
let mut numeric_deltas = HashMap::new();
numeric_deltas.insert("value".to_string(), 100.0);
let compressed_frame = CompressedFrame {
frame: StreamFrame {
data: json!({"test": "data"}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
},
compressed_data: CompressedData {
strategy: CompressionStrategy::Hybrid {
string_dict: string_dict.clone(),
numeric_deltas: numeric_deltas.clone(),
},
compressed_size: 20,
data: json!({"value": 5.0}), compression_metadata: HashMap::new(),
},
decompression_metadata: DecompressionMetadata {
strategy: CompressionStrategy::Hybrid {
string_dict,
numeric_deltas,
},
dictionary_map: HashMap::new(),
delta_bases: HashMap::new(),
priority_hints: HashMap::new(),
},
};
let result = decompressor.decompress_frame(compressed_frame);
assert!(result.is_ok());
}
#[test]
fn test_decompress_dictionary_nested_arrays() {
let mut decompressor = StreamingDecompressor::new();
decompressor
.active_dictionary
.insert(0, "item1".to_string());
decompressor
.active_dictionary
.insert(1, "item2".to_string());
let data = json!([[0, 1], [1, 0]]);
let result = decompressor.decompress_dictionary(&data).unwrap();
assert_eq!(result, json!([["item1", "item2"], ["item2", "item1"]]));
}
#[test]
fn test_decompress_dictionary_non_index_numbers() {
let mut decompressor = StreamingDecompressor::new();
decompressor.active_dictionary.insert(0, "test".to_string());
let data = json!({"value": 999});
let result = decompressor.decompress_dictionary(&data).unwrap();
assert_eq!(result, json!({"value": 999}));
}
#[test]
fn test_decompress_delta_non_array() {
let decompressor = StreamingDecompressor::new();
let data = json!({"key": "value"});
let result = decompressor.decompress_delta(&data).unwrap();
assert_eq!(result, json!({"key": "value"}));
}
#[test]
fn test_decompress_delta_array_without_metadata() {
let decompressor = StreamingDecompressor::new();
let data = json!([1, 2, 3, 4]);
let result = decompressor.decompress_delta(&data).unwrap();
assert_eq!(result, json!([1, 2, 3, 4]));
}
#[test]
fn test_decompress_run_length_nested_objects() {
let decompressor = StreamingDecompressor::new();
let data = json!({
"outer": {
"inner": [
{"rle_value": {"nested": "obj"}, "rle_count": 2}
]
}
});
let result = decompressor.decompress_run_length(&data).unwrap();
assert_eq!(
result,
json!({
"outer": {
"inner": [{"nested": "obj"}, {"nested": "obj"}]
}
})
);
}
#[test]
fn test_decompression_stats_tracking() {
let mut decompressor = StreamingDecompressor::new();
assert_eq!(decompressor.stats.frames_decompressed, 0);
let frame = CompressedFrame {
frame: StreamFrame {
data: json!({"test": "data"}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
},
compressed_data: CompressedData {
strategy: CompressionStrategy::None,
compressed_size: 15,
data: json!({"test": "data"}),
compression_metadata: HashMap::new(),
},
decompression_metadata: DecompressionMetadata {
strategy: CompressionStrategy::None,
dictionary_map: HashMap::new(),
delta_bases: HashMap::new(),
priority_hints: HashMap::new(),
},
};
decompressor.decompress_frame(frame).unwrap();
assert_eq!(decompressor.stats.frames_decompressed, 1);
assert!(decompressor.stats.total_decompressed_bytes > 0);
assert!(decompressor.stats.avg_decompression_time_us > 0);
}
#[test]
fn test_decompress_delta_array_malformed_metadata() {
let decompressor = StreamingDecompressor::new();
let data = json!([
{"delta_type": "numeric_sequence"},
1.0,
2.0
]);
let result = decompressor.decompress_delta(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), data);
}
#[test]
fn test_decompress_run_length_large_count() {
let decompressor = StreamingDecompressor::new();
let data = json!([
{"rle_value": "x", "rle_count": 1000}
]);
let result = decompressor.decompress_run_length(&data);
assert!(result.is_ok());
        let decompressed = result.unwrap();
        let arr = decompressed
            .as_array()
            .expect("RLE decompression should yield an array");
        assert_eq!(arr.len(), 1000);
}
#[test]
fn test_decompress_run_length_exceeds_max_count() {
let decompressor = StreamingDecompressor::new();
let data = json!([
{"rle_value": "x", "rle_count": 200_000}
]);
let result = decompressor.decompress_run_length(&data);
        assert!(result.is_err());
    }
#[test]
fn test_decompress_run_length_cumulative_overflow() {
        let decompressor = StreamingDecompressor::new();
        // Each run stays within MAX_RLE_COUNT, so only the cumulative
        // MAX_DECOMPRESSED_SIZE check (105 * 100_000 > 10_485_760) can fire.
        let runs: Vec<JsonValue> = (0..105)
            .map(|_| json!({"rle_value": "a", "rle_count": 100_000}))
            .collect();
        let result = decompressor.decompress_run_length(&JsonValue::Array(runs));
        assert!(result.is_err());
}
#[test]
fn test_decompress_delta_array_size_limit() {
let decompressor = StreamingDecompressor::new();
let mut large_array = vec![json!({"delta_base": 0.0, "delta_type": "numeric_sequence"})];
for _i in 0..1_000_001 {
large_array.push(json!(0.0));
}
let result = decompressor.decompress_delta(&JsonValue::Array(large_array));
        assert!(result.is_err());
    }
#[test]
fn test_compression_stats_default() {
let stats = CompressionStats::default();
assert_eq!(stats.total_input_bytes, 0);
assert_eq!(stats.total_output_bytes, 0);
assert_eq!(stats.frames_processed, 0);
assert_eq!(stats.overall_compression_ratio(), 1.0);
}
#[test]
fn test_decompression_stats_default() {
let stats = DecompressionStats::default();
assert_eq!(stats.frames_decompressed, 0);
assert_eq!(stats.total_decompressed_bytes, 0);
assert_eq!(stats.avg_decompression_time_us, 0);
}
#[test]
fn test_decompress_dictionary_with_strings() {
let decompressor = StreamingDecompressor::new();
let data = json!({"key": "value", "nested": {"inner": "string"}});
let result = decompressor.decompress_dictionary(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), data);
}
#[test]
fn test_decompress_dictionary_with_null() {
let decompressor = StreamingDecompressor::new();
let data = json!(null);
let result = decompressor.decompress_dictionary(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), json!(null));
}
#[test]
fn test_decompress_dictionary_with_boolean() {
let decompressor = StreamingDecompressor::new();
let data = json!(true);
let result = decompressor.decompress_dictionary(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), json!(true));
}
#[test]
fn test_decompress_dictionary_with_string() {
let decompressor = StreamingDecompressor::new();
let data = json!("plain string");
let result = decompressor.decompress_dictionary(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), json!("plain string"));
}
#[test]
fn test_decompress_delta_with_object_no_array() {
let decompressor = StreamingDecompressor::new();
let data = json!({"key": "value"});
let result = decompressor.decompress_delta(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), json!({"key": "value"}));
}
#[test]
fn test_decompress_delta_with_primitive_values() {
let decompressor = StreamingDecompressor::new();
assert_eq!(
decompressor.decompress_delta(&json!("string")).unwrap(),
json!("string")
);
assert_eq!(
decompressor.decompress_delta(&json!(42)).unwrap(),
json!(42)
);
assert_eq!(
decompressor.decompress_delta(&json!(true)).unwrap(),
json!(true)
);
assert_eq!(
decompressor.decompress_delta(&json!(null)).unwrap(),
json!(null)
);
}
#[test]
fn test_decompress_run_length_with_primitive_values() {
let decompressor = StreamingDecompressor::new();
assert_eq!(
decompressor
.decompress_run_length(&json!("string"))
.unwrap(),
json!("string")
);
assert_eq!(
decompressor.decompress_run_length(&json!(123)).unwrap(),
json!(123)
);
assert_eq!(
decompressor.decompress_run_length(&json!(false)).unwrap(),
json!(false)
);
assert_eq!(
decompressor.decompress_run_length(&json!(null)).unwrap(),
json!(null)
);
}
#[test]
fn test_decompress_data_strategy_dictionary() {
let mut decompressor = StreamingDecompressor::new();
decompressor.active_dictionary.insert(0, "test".to_string());
let mut dict = HashMap::new();
dict.insert("test".to_string(), 0);
let compressed_data = CompressedData {
strategy: CompressionStrategy::Dictionary { dictionary: dict },
compressed_size: 10,
data: json!({"field": 0}),
compression_metadata: HashMap::new(),
};
let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
assert!(result.is_ok());
}
#[test]
fn test_decompress_data_strategy_delta() {
let decompressor = StreamingDecompressor::new();
let mut bases = HashMap::new();
bases.insert("value".to_string(), 100.0);
let compressed_data = CompressedData {
strategy: CompressionStrategy::Delta {
base_values: bases.clone(),
},
compressed_size: 10,
data: json!({
"sequence": [
{"delta_base": 100.0, "delta_type": "numeric_sequence"},
5.0,
10.0
]
}),
compression_metadata: HashMap::new(),
};
let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
assert!(result.is_ok());
}
#[test]
fn test_decompress_data_strategy_run_length() {
let decompressor = StreamingDecompressor::new();
let compressed_data = CompressedData {
strategy: CompressionStrategy::RunLength,
compressed_size: 10,
data: json!([
{"rle_value": "x", "rle_count": 3}
]),
compression_metadata: HashMap::new(),
};
let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
assert!(result.is_ok());
assert_eq!(result.unwrap(), json!(["x", "x", "x"]));
}
#[test]
fn test_decompress_data_strategy_hybrid_applies_delta_then_dict() {
let mut decompressor = StreamingDecompressor::new();
decompressor.active_dictionary.insert(0, "test".to_string());
let mut string_dict = HashMap::new();
string_dict.insert("test".to_string(), 0);
let mut numeric_deltas = HashMap::new();
numeric_deltas.insert("value".to_string(), 100.0);
let compressed_data = CompressedData {
strategy: CompressionStrategy::Hybrid {
string_dict,
numeric_deltas,
},
compressed_size: 10,
data: json!({
"field": 0
}),
compression_metadata: HashMap::new(),
};
let result = decompressor.decompress_data(&compressed_data, &compressed_data.strategy);
assert!(result.is_ok());
}
#[test]
fn test_select_compressor_critical_priority() {
let mut compressor = StreamingCompressor::new();
let _skeleton_comp = compressor.select_compressor_for_priority(Priority::CRITICAL);
}
#[test]
fn test_select_compressor_high_priority() {
let mut compressor = StreamingCompressor::new();
let _skeleton_comp = compressor.select_compressor_for_priority(Priority::HIGH);
}
#[test]
fn test_select_compressor_medium_priority() {
let mut compressor = StreamingCompressor::new();
let _content_comp = compressor.select_compressor_for_priority(Priority::MEDIUM);
}
#[test]
fn test_select_compressor_low_priority() {
let mut compressor = StreamingCompressor::new();
let _content_comp = compressor.select_compressor_for_priority(Priority::LOW);
}
#[test]
fn test_select_compressor_background_priority() {
let mut compressor = StreamingCompressor::new();
let _content_comp = compressor.select_compressor_for_priority(Priority::BACKGROUND);
}
#[test]
fn test_update_stats_with_zero_original_size() {
let mut compressor = StreamingCompressor::new();
compressor.update_stats(Priority::MEDIUM, 0, 10);
let stats = compressor.get_stats();
assert_eq!(stats.frames_processed, 1);
assert_eq!(stats.total_input_bytes, 0);
assert_eq!(stats.total_output_bytes, 10);
assert_eq!(
stats.priority_compression_ratio(Priority::MEDIUM.value()),
1.0
);
}
#[test]
fn test_update_stats_with_normal_compression() {
let mut compressor = StreamingCompressor::new();
compressor.update_stats(Priority::HIGH, 1000, 500);
let stats = compressor.get_stats();
assert_eq!(stats.frames_processed, 1);
assert_eq!(stats.total_input_bytes, 1000);
assert_eq!(stats.total_output_bytes, 500);
assert_eq!(
stats.priority_compression_ratio(Priority::HIGH.value()),
0.5
);
}
#[test]
fn test_update_decompression_stats_first_frame() {
let mut decompressor = StreamingDecompressor::new();
let data = json!({"test": "data"});
let duration = std::time::Duration::from_micros(100);
decompressor.update_decompression_stats(&data, duration);
let stats = decompressor.get_stats();
assert_eq!(stats.frames_decompressed, 1);
assert_eq!(stats.avg_decompression_time_us, 100);
}
#[test]
fn test_update_decompression_stats_multiple_frames() {
let mut decompressor = StreamingDecompressor::new();
let data = json!({"test": "data"});
decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(100));
decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(200));
decompressor.update_decompression_stats(&data, std::time::Duration::from_micros(300));
let stats = decompressor.get_stats();
assert_eq!(stats.frames_decompressed, 3);
        assert_eq!(stats.avg_decompression_time_us, 200);
    }
#[test]
fn test_create_decompression_metadata_with_dict() {
let compressor = StreamingCompressor::new();
let mut metadata = HashMap::new();
metadata.insert("dict_0".to_string(), json!("hello"));
metadata.insert("dict_1".to_string(), json!("world"));
let compressed_data = CompressedData {
strategy: CompressionStrategy::None,
compressed_size: 10,
data: json!({}),
compression_metadata: metadata,
};
let result = compressor.create_decompression_metadata(&compressed_data);
assert!(result.is_ok());
let meta = result.unwrap();
assert_eq!(meta.dictionary_map.len(), 2);
assert_eq!(meta.dictionary_map.get(&0), Some(&"hello".to_string()));
assert_eq!(meta.dictionary_map.get(&1), Some(&"world".to_string()));
}
#[test]
fn test_create_decompression_metadata_with_delta_bases() {
let compressor = StreamingCompressor::new();
let mut metadata = HashMap::new();
metadata.insert("base_value1".to_string(), json!(100.0));
metadata.insert("base_value2".to_string(), json!(200.0));
let compressed_data = CompressedData {
strategy: CompressionStrategy::None,
compressed_size: 10,
data: json!({}),
compression_metadata: metadata,
};
let result = compressor.create_decompression_metadata(&compressed_data);
assert!(result.is_ok());
let meta = result.unwrap();
assert_eq!(meta.delta_bases.len(), 2);
assert_eq!(meta.delta_bases.get("value1"), Some(&100.0));
assert_eq!(meta.delta_bases.get("value2"), Some(&200.0));
}
#[test]
fn test_create_decompression_metadata_with_invalid_dict_index() {
let compressor = StreamingCompressor::new();
let mut metadata = HashMap::new();
metadata.insert("dict_invalid".to_string(), json!("value"));
metadata.insert("dict_0".to_string(), json!("valid"));
let compressed_data = CompressedData {
strategy: CompressionStrategy::None,
compressed_size: 10,
data: json!({}),
compression_metadata: metadata,
};
let result = compressor.create_decompression_metadata(&compressed_data);
assert!(result.is_ok());
let meta = result.unwrap();
assert_eq!(meta.dictionary_map.len(), 1);
assert_eq!(meta.dictionary_map.get(&0), Some(&"valid".to_string()));
}
#[test]
fn test_update_context_updates_dictionary() {
let mut decompressor = StreamingDecompressor::new();
let mut metadata = DecompressionMetadata {
strategy: CompressionStrategy::None,
dictionary_map: HashMap::new(),
delta_bases: HashMap::new(),
priority_hints: HashMap::new(),
};
metadata.dictionary_map.insert(0, "hello".to_string());
metadata.dictionary_map.insert(1, "world".to_string());
let result = decompressor.update_context(&metadata);
assert!(result.is_ok());
assert_eq!(decompressor.active_dictionary.len(), 2);
assert_eq!(
decompressor.active_dictionary.get(&0),
Some(&"hello".to_string())
);
}
#[test]
fn test_update_context_updates_delta_bases() {
let mut decompressor = StreamingDecompressor::new();
let mut metadata = DecompressionMetadata {
strategy: CompressionStrategy::None,
dictionary_map: HashMap::new(),
delta_bases: HashMap::new(),
priority_hints: HashMap::new(),
};
metadata.delta_bases.insert("value1".to_string(), 100.0);
metadata.delta_bases.insert("value2".to_string(), 200.0);
let result = decompressor.update_context(&metadata);
assert!(result.is_ok());
assert_eq!(decompressor.delta_bases.len(), 2);
assert_eq!(decompressor.delta_bases.get("value1"), Some(&100.0));
}
#[test]
fn test_decompress_dictionary_with_float_that_is_not_u64() {
let mut decompressor = StreamingDecompressor::new();
decompressor.active_dictionary.insert(0, "test".to_string());
let data = json!({"value": 1.5});
let result = decompressor.decompress_dictionary(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), json!({"value": 1.5}));
}
#[test]
fn test_decompress_dictionary_with_negative_number() {
let mut decompressor = StreamingDecompressor::new();
decompressor.active_dictionary.insert(0, "test".to_string());
let data = json!({"value": -1});
let result = decompressor.decompress_dictionary(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), json!({"value": -1}));
}
#[test]
fn test_decompress_delta_array_checks_first_element_structure() {
let decompressor = StreamingDecompressor::new();
let data = json!([
{"wrong_field": 100.0},
1.0,
2.0
]);
let result = decompressor.decompress_delta(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), data);
}
#[test]
fn test_decompress_delta_array_requires_both_base_and_type() {
let decompressor = StreamingDecompressor::new();
let data1 = json!([
{"delta_base": 100.0},
1.0
]);
let result1 = decompressor.decompress_delta(&data1);
assert!(result1.is_ok());
let data2 = json!([
{"delta_type": "numeric_sequence"},
1.0
]);
let result2 = decompressor.decompress_delta(&data2);
assert!(result2.is_ok());
}
#[test]
fn test_decompress_run_length_with_non_objects_in_array() {
let decompressor = StreamingDecompressor::new();
let data = json!([
{"rle_value": "a", "rle_count": 2},
"plain",
42,
true
]);
let result = decompressor.decompress_run_length(&data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), json!(["a", "a", "plain", 42, true]));
}
#[test]
fn test_decompress_run_length_integrity_check_rle_value_without_count() {
let decompressor = StreamingDecompressor::new();
let data = json!([
{"rle_value": "x"}
]);
let result = decompressor.decompress_run_length(&data);
assert!(result.is_err());
}
#[test]
fn test_decompress_run_length_integrity_check_rle_count_without_value() {
let decompressor = StreamingDecompressor::new();
let data = json!([
{"rle_count": 3}
]);
let result = decompressor.decompress_run_length(&data);
assert!(result.is_err());
}
#[test]
fn test_decompress_run_length_non_number_count() {
let decompressor = StreamingDecompressor::new();
let data = json!([
{"rle_value": "x", "rle_count": "three"}
]);
let result = decompressor.decompress_run_length(&data);
assert!(result.is_err());
}
#[test]
fn test_compress_frame_with_large_data() {
let mut compressor = StreamingCompressor::new();
let large_data = json!({
"users": (0..100).map(|i| json!({
"id": i,
"name": format!("User{}", i),
"email": format!("user{}@example.com", i),
"age": 20 + (i % 50),
"active": i % 2 == 0
})).collect::<Vec<_>>()
});
let frame = StreamFrame {
data: large_data,
priority: Priority::MEDIUM,
metadata: HashMap::new(),
};
let result = compressor.compress_frame(frame);
assert!(result.is_ok());
let stats = compressor.get_stats();
assert_eq!(stats.frames_processed, 1);
assert!(stats.total_input_bytes > 1000);
}
#[test]
fn test_decompress_delta_with_very_large_deltas() {
let decompressor = StreamingDecompressor::new();
let data = json!([
{"delta_base": 1_000_000.0, "delta_type": "numeric_sequence"},
100_000.0,
200_000.0,
300_000.0
]);
let result = decompressor.decompress_delta(&data);
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
json!([1_100_000.0, 1_200_000.0, 1_300_000.0])
);
}
}