/// Streaming compression and decompression support.
pub mod compression_integration;
/// Priority-based frame streaming.
pub mod priority;

use crate::domain::{Priority, DomainResult};
use serde_json::Value as JsonValue;
use std::collections::HashMap;

/// A single unit of streamed data, together with its priority and metadata.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamFrame {
    /// Frame payload as arbitrary JSON.
    pub data: JsonValue,
    /// Priority used when scheduling this frame.
    pub priority: Priority,
    /// Free-form key/value metadata (for example, the frame's source).
    pub metadata: HashMap<String, String>,
}

/// Outcome of feeding a frame to a [`StreamProcessor`].
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// A single processed frame, ready to emit.
    Processed(StreamFrame),
    /// The frame was buffered; more input is needed before a batch is ready.
    Incomplete,
    /// The buffer filled up; the completed batch is returned.
    Complete(Vec<StreamFrame>),
    /// The frame was rejected (for example, it exceeded the maximum frame size).
    Error(String),
}

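/// Tuning parameters for [`StreamProcessor`].
///
/// A minimal sketch of overriding a single field (the same struct-update
/// pattern the tests below use); the values shown are illustrative only:
///
/// ```ignore
/// let config = StreamConfig {
///     buffer_size: 16,
///     ..StreamConfig::default()
/// };
/// ```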
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Maximum serialized frame size, in bytes.
    pub max_frame_size: usize,
    /// Number of frames to buffer before emitting a complete batch.
    pub buffer_size: usize,
    /// Whether stream compression is enabled.
    pub enable_compression: bool,
    /// Priority cutoff, expressed as a raw priority value (defaults to [`Priority::HIGH`]).
    pub priority_threshold: u8,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            max_frame_size: 1024 * 1024,
            buffer_size: 64,
            enable_compression: true,
            priority_threshold: Priority::HIGH.value(),
        }
    }
}

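/// Buffers incoming [`StreamFrame`]s and releases them in batches.
///
/// A minimal usage sketch, assuming this module is reachable as
/// `crate::streaming` (adjust the paths to the actual crate layout):
///
/// ```ignore
/// use crate::streaming::{ProcessResult, StreamConfig, StreamFrame, StreamProcessor};
///
/// let mut processor = StreamProcessor::new(StreamConfig::default());
/// let frame = StreamFrame {
///     data: serde_json::json!({"id": 1}),
///     priority: crate::domain::Priority::MEDIUM,
///     metadata: std::collections::HashMap::new(),
/// };
///
/// match processor.process_frame(frame)? {
///     ProcessResult::Complete(batch) => { /* buffer full: forward the whole batch */ }
///     ProcessResult::Incomplete => { /* frame buffered; keep feeding input */ }
///     ProcessResult::Processed(single) => { /* a single frame ready to emit */ }
///     ProcessResult::Error(msg) => eprintln!("frame rejected: {msg}"),
/// }
///
/// // At end of stream, drain anything still buffered.
/// let remaining = processor.flush();
/// ```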
#[derive(Debug)]
pub struct StreamProcessor {
    config: StreamConfig,
    buffer: Vec<StreamFrame>,
    processed_count: usize,
}

impl StreamProcessor {
    /// Creates a processor with an empty buffer.
    pub fn new(config: StreamConfig) -> Self {
        Self {
            config,
            buffer: Vec::new(),
            processed_count: 0,
        }
    }

    /// Buffers a frame and emits the completed batch once the buffer is full.
    ///
    /// Frames whose serialized payload exceeds `max_frame_size` are rejected
    /// with [`ProcessResult::Error`] instead of being buffered.
    pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
        // Measure the frame by the length of its serialized JSON payload.
        let frame_size = serde_json::to_string(&frame.data)
            .map_err(|e| crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}")))?
            .len();

        if frame_size > self.config.max_frame_size {
            return Ok(ProcessResult::Error(format!(
                "Frame size {} exceeds maximum {}",
                frame_size,
                self.config.max_frame_size
            )));
        }

        self.buffer.push(frame);
        self.processed_count += 1;

        if self.buffer.len() >= self.config.buffer_size {
            let frames = self.buffer.drain(..).collect();
            Ok(ProcessResult::Complete(frames))
        } else {
            Ok(ProcessResult::Incomplete)
        }
    }

    /// Drains and returns all currently buffered frames.
    pub fn flush(&mut self) -> Vec<StreamFrame> {
        self.buffer.drain(..).collect()
    }

    /// Returns counters describing processing progress and buffer usage.
    pub fn stats(&self) -> StreamStats {
        StreamStats {
            processed_frames: self.processed_count,
            buffered_frames: self.buffer.len(),
            buffer_utilization: self.buffer.len() as f32 / self.config.buffer_size as f32,
        }
    }
}

/// Snapshot of processor activity returned by [`StreamProcessor::stats`].
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Total frames accepted since the processor was created.
    pub processed_frames: usize,
    /// Frames currently waiting in the buffer.
    pub buffered_frames: usize,
    /// Buffered frames as a fraction of `buffer_size`.
    pub buffer_utilization: f32,
}

pub use compression_integration::{
    CompressedFrame, StreamingCompressor, StreamingDecompressor,
    CompressionStats, DecompressionStats, DecompressionMetadata,
};
pub use priority::PriorityStreamer;

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_stream_processor_basic() {
        let config = StreamConfig::default();
        let mut processor = StreamProcessor::new(config);

        let frame = StreamFrame {
            data: json!({"test": "data"}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        let result = processor.process_frame(frame).unwrap();
        assert!(matches!(result, ProcessResult::Incomplete));

        let stats = processor.stats();
        assert_eq!(stats.processed_frames, 1);
        assert_eq!(stats.buffered_frames, 1);
    }

    #[test]
    fn test_stream_frame_creation() {
        let frame = StreamFrame {
            data: json!({"message": "hello"}),
            priority: Priority::HIGH,
            metadata: {
                let mut meta = HashMap::new();
                meta.insert("source".to_string(), "test".to_string());
                meta
            },
        };

        assert_eq!(frame.priority, Priority::HIGH);
        assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
    }

    // Fills a two-slot buffer and verifies that the completed batch is returned.
    #[test]
    fn test_buffer_completion() {
        let config = StreamConfig {
            buffer_size: 2,
            ..StreamConfig::default()
        };
        let mut processor = StreamProcessor::new(config);

        let frame1 = StreamFrame {
            data: json!({"id": 1}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };
        let result1 = processor.process_frame(frame1).unwrap();
        assert!(matches!(result1, ProcessResult::Incomplete));

        let frame2 = StreamFrame {
            data: json!({"id": 2}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };
        let result2 = processor.process_frame(frame2).unwrap();

        match result2 {
            ProcessResult::Complete(frames) => {
                assert_eq!(frames.len(), 2);
            }
            _ => panic!("Expected Complete with 2 frames"),
        }
    }
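
    // A supplementary sketch, not part of the original test suite: exercises
    // `flush` and the oversized-frame rejection path. The `max_frame_size` of
    // 16 bytes is an arbitrary illustrative choice.
    #[test]
    fn test_flush_and_oversized_frame() {
        let config = StreamConfig {
            max_frame_size: 16,
            ..StreamConfig::default()
        };
        let mut processor = StreamProcessor::new(config);

        // A payload whose JSON encoding exceeds 16 bytes is rejected, not buffered.
        let oversized = StreamFrame {
            data: json!({"payload": "0123456789abcdef"}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };
        let result = processor.process_frame(oversized).unwrap();
        assert!(matches!(result, ProcessResult::Error(_)));
        assert_eq!(processor.stats().buffered_frames, 0);

        // A small frame is buffered, then drained by `flush`.
        let small = StreamFrame {
            data: json!({"id": 1}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };
        processor.process_frame(small).unwrap();
        assert_eq!(processor.flush().len(), 1);
        assert_eq!(processor.stats().buffered_frames, 0);
    }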
}