pub mod compression_integration;
pub mod priority;
pub mod reconstruction;
use crate::domain::{DomainResult, Priority};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
mod serde_priority {
use crate::domain::Priority;
use serde::{Serialize, Serializer};
pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
priority.value().serialize(serializer)
}
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamFrame {
pub data: JsonValue,
#[serde(with = "serde_priority")]
pub priority: Priority,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone)]
pub enum ProcessResult {
Processed(StreamFrame),
Complete(Vec<StreamFrame>),
Error(String),
}
#[derive(Debug, Clone)]
pub struct StreamConfig {
pub max_frame_size: usize,
pub enable_compression: bool,
pub priority_threshold: u8,
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
max_frame_size: 1024 * 1024, enable_compression: true,
priority_threshold: Priority::HIGH.value(),
}
}
}
#[derive(Debug)]
pub struct StreamProcessor {
config: StreamConfig,
processed_count: usize,
}
impl StreamProcessor {
pub fn new(config: StreamConfig) -> Self {
Self {
config,
processed_count: 0,
}
}
pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
let frame_size = serde_json::to_string(&frame.data)
.map_err(|e| {
crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
})?
.len();
if frame_size > self.config.max_frame_size {
return Ok(ProcessResult::Error(format!(
"Frame size {} exceeds maximum {}",
frame_size, self.config.max_frame_size
)));
}
self.processed_count += 1;
Ok(ProcessResult::Processed(frame))
}
pub fn stats(&self) -> StreamStats {
StreamStats {
processed_frames: self.processed_count,
}
}
}
#[derive(Debug, Clone)]
pub struct StreamStats {
pub processed_frames: usize,
}
pub use compression_integration::{
CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
StreamingCompressor, StreamingDecompressor,
};
pub use priority::{PriorityStreamFrame, PriorityStreamer};
pub use reconstruction::JsonReconstructor;
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_process_frame_returns_processed_immediately() {
let config = StreamConfig::default();
let mut processor = StreamProcessor::new(config);
let frame = StreamFrame {
data: json!({"test": "data"}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
};
let result = processor.process_frame(frame).unwrap();
assert!(matches!(result, ProcessResult::Processed(_)));
let stats = processor.stats();
assert_eq!(stats.processed_frames, 1);
}
#[test]
fn test_processed_result_contains_original_frame() {
let config = StreamConfig::default();
let mut processor = StreamProcessor::new(config);
let frame = StreamFrame {
data: json!({"message": "hello"}),
priority: Priority::HIGH,
metadata: HashMap::new(),
};
match processor.process_frame(frame).unwrap() {
ProcessResult::Processed(f) => {
assert_eq!(f.priority, Priority::HIGH);
assert_eq!(f.data, json!({"message": "hello"}));
}
other => panic!("Expected Processed, got {other:?}"),
}
}
#[test]
fn test_stream_frame_creation() {
let frame = StreamFrame {
data: json!({"message": "hello"}),
priority: Priority::HIGH,
metadata: {
let mut meta = HashMap::new();
meta.insert("source".to_string(), "test".to_string());
meta
},
};
assert_eq!(frame.priority, Priority::HIGH);
assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
}
#[test]
fn test_oversized_frame_returns_error() {
let config = StreamConfig {
max_frame_size: 10, ..StreamConfig::default()
};
let mut processor = StreamProcessor::new(config);
let frame = StreamFrame {
data: json!({"large_key": "this value is definitely over ten bytes"}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
};
let result = processor.process_frame(frame).unwrap();
assert!(matches!(result, ProcessResult::Error(_)));
}
#[test]
fn test_multiple_frames_each_processed_immediately() {
let config = StreamConfig::default();
let mut processor = StreamProcessor::new(config);
for i in 0..3u32 {
let frame = StreamFrame {
data: json!({"id": i}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
};
assert!(matches!(
processor.process_frame(frame).unwrap(),
ProcessResult::Processed(_)
));
}
assert_eq!(processor.stats().processed_frames, 3);
}
}