pub mod compression_integration;
pub mod priority;
pub mod reconstruction;
use crate::domain::{DomainResult, Priority};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
mod serde_priority {
use crate::domain::Priority;
use serde::{Serialize, Serializer};
pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
priority.value().serialize(serializer)
}
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamFrame {
pub data: JsonValue,
#[serde(with = "serde_priority")]
pub priority: Priority,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone)]
pub enum ProcessResult {
Processed(StreamFrame),
Incomplete,
Complete(Vec<StreamFrame>),
Error(String),
}
#[derive(Debug, Clone)]
pub struct StreamConfig {
pub max_frame_size: usize,
pub buffer_size: usize,
pub enable_compression: bool,
pub priority_threshold: u8,
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
max_frame_size: 1024 * 1024, buffer_size: 64,
enable_compression: true,
priority_threshold: Priority::HIGH.value(),
}
}
}
#[derive(Debug)]
pub struct StreamProcessor {
config: StreamConfig,
buffer: Vec<StreamFrame>,
processed_count: usize,
}
impl StreamProcessor {
pub fn new(config: StreamConfig) -> Self {
Self {
config,
buffer: Vec::new(),
processed_count: 0,
}
}
pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
let frame_size = serde_json::to_string(&frame.data)
.map_err(|e| {
crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
})?
.len();
if frame_size > self.config.max_frame_size {
return Ok(ProcessResult::Error(format!(
"Frame size {} exceeds maximum {}",
frame_size, self.config.max_frame_size
)));
}
self.buffer.push(frame);
self.processed_count += 1;
if self.buffer.len() >= self.config.buffer_size {
let frames = self.buffer.drain(..).collect();
Ok(ProcessResult::Complete(frames))
} else {
Ok(ProcessResult::Incomplete)
}
}
pub fn flush(&mut self) -> Vec<StreamFrame> {
self.buffer.drain(..).collect()
}
pub fn stats(&self) -> StreamStats {
StreamStats {
processed_frames: self.processed_count,
buffered_frames: self.buffer.len(),
buffer_utilization: self.buffer.len() as f32 / self.config.buffer_size as f32,
}
}
}
#[derive(Debug, Clone)]
pub struct StreamStats {
pub processed_frames: usize,
pub buffered_frames: usize,
pub buffer_utilization: f32,
}
pub use compression_integration::{
CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
StreamingCompressor, StreamingDecompressor,
};
pub use priority::{PriorityStreamFrame, PriorityStreamer};
pub use reconstruction::JsonReconstructor;
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_stream_processor_basic() {
let config = StreamConfig::default();
let mut processor = StreamProcessor::new(config);
let frame = StreamFrame {
data: json!({"test": "data"}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
};
let result = processor.process_frame(frame).unwrap();
assert!(matches!(result, ProcessResult::Incomplete));
let stats = processor.stats();
assert_eq!(stats.processed_frames, 1);
assert_eq!(stats.buffered_frames, 1);
}
#[test]
fn test_stream_frame_creation() {
let frame = StreamFrame {
data: json!({"message": "hello"}),
priority: Priority::HIGH,
metadata: {
let mut meta = HashMap::new();
meta.insert("source".to_string(), "test".to_string());
meta
},
};
assert_eq!(frame.priority, Priority::HIGH);
assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
}
#[test]
fn test_buffer_flush() {
let config = StreamConfig {
buffer_size: 2,
..StreamConfig::default()
};
let mut processor = StreamProcessor::new(config);
let frame1 = StreamFrame {
data: json!({"id": 1}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
};
let result1 = processor.process_frame(frame1).unwrap();
assert!(matches!(result1, ProcessResult::Incomplete));
let frame2 = StreamFrame {
data: json!({"id": 2}),
priority: Priority::MEDIUM,
metadata: HashMap::new(),
};
let result2 = processor.process_frame(frame2).unwrap();
match result2 {
ProcessResult::Complete(frames) => {
assert_eq!(frames.len(), 2);
}
_ => panic!("Expected Complete with 2 frames"),
}
}
}