1pub mod compression_integration;
7pub mod priority;
8pub mod reconstruction;
9
10use crate::domain::{DomainResult, Priority};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
/// A single unit of streaming data: a JSON payload plus routing information.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamFrame {
    /// The JSON payload carried by this frame.
    pub data: JsonValue,
    /// Frame priority; compared against `StreamConfig::priority_threshold`
    /// by downstream consumers (presumably the `priority` submodule — confirm).
    pub priority: Priority,
    /// Free-form key/value metadata attached to the frame (e.g. "source").
    pub metadata: HashMap<String, String>,
}
24
/// Outcome of feeding one frame into a [`StreamProcessor`].
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// A single frame processed to completion. NOTE(review): not constructed
    /// by `process_frame` in this module — presumably produced elsewhere; confirm.
    Processed(StreamFrame),
    /// Frame was buffered; more frames are needed before a batch is emitted.
    Incomplete,
    /// Buffer reached capacity; carries the drained batch of frames.
    Complete(Vec<StreamFrame>),
    /// Frame was rejected (e.g. oversized); the message describes why.
    /// This is a non-fatal, per-frame condition, distinct from `DomainResult::Err`.
    Error(String),
}
37
/// Tunable parameters for a [`StreamProcessor`].
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Maximum accepted size, in bytes, of a frame's serialized JSON payload.
    pub max_frame_size: usize,
    /// Number of frames accumulated before a `ProcessResult::Complete` batch
    /// is emitted.
    pub buffer_size: usize,
    /// Whether compression should be applied. NOTE(review): not read in this
    /// file — presumably consumed by `compression_integration`; confirm.
    pub enable_compression: bool,
    /// Priority cutoff as a raw priority value. NOTE(review): not read in this
    /// file — presumably consumed by the `priority` submodule; confirm.
    pub priority_threshold: u8,
}
50
51impl Default for StreamConfig {
52 fn default() -> Self {
53 Self {
54 max_frame_size: 1024 * 1024, buffer_size: 64,
56 enable_compression: true,
57 priority_threshold: Priority::HIGH.value(),
58 }
59 }
60}
61
/// Buffers incoming [`StreamFrame`]s and emits them in fixed-size batches.
#[derive(Debug)]
pub struct StreamProcessor {
    // Tunables controlling frame-size limits and batch size.
    config: StreamConfig,
    // Frames accepted so far but not yet emitted as a batch.
    buffer: Vec<StreamFrame>,
    // Total frames accepted over the processor's lifetime (rejected
    // oversized frames are not counted).
    processed_count: usize,
}
69
70impl StreamProcessor {
71 pub fn new(config: StreamConfig) -> Self {
73 Self {
74 config,
75 buffer: Vec::new(),
76 processed_count: 0,
77 }
78 }
79
80 pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
82 let frame_size = serde_json::to_string(&frame.data)
84 .map_err(|e| {
85 crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
86 })?
87 .len();
88
89 if frame_size > self.config.max_frame_size {
90 return Ok(ProcessResult::Error(format!(
91 "Frame size {} exceeds maximum {}",
92 frame_size, self.config.max_frame_size
93 )));
94 }
95
96 self.buffer.push(frame);
98 self.processed_count += 1;
99
100 if self.buffer.len() >= self.config.buffer_size {
102 let frames = self.buffer.drain(..).collect();
103 Ok(ProcessResult::Complete(frames))
104 } else {
105 Ok(ProcessResult::Incomplete)
106 }
107 }
108
109 pub fn flush(&mut self) -> Vec<StreamFrame> {
111 self.buffer.drain(..).collect()
112 }
113
114 pub fn stats(&self) -> StreamStats {
116 StreamStats {
117 processed_frames: self.processed_count,
118 buffered_frames: self.buffer.len(),
119 buffer_utilization: self.buffer.len() as f32 / self.config.buffer_size as f32,
120 }
121 }
122}
123
/// Counters reported by [`StreamProcessor::stats`].
#[derive(Debug, Clone)]
pub struct StreamStats {
    // Total frames accepted over the processor's lifetime.
    pub processed_frames: usize,
    // Frames currently sitting in the batch buffer.
    pub buffered_frames: usize,
    // buffered_frames / configured buffer_size, as a fraction.
    pub buffer_utilization: f32,
}
131
132pub use compression_integration::{
134 CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
135 StreamingCompressor, StreamingDecompressor,
136};
137pub use priority::{PriorityStreamFrame, PriorityStreamer};
138pub use reconstruction::JsonReconstructor;
139
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Builds a frame with empty metadata for the given payload and priority.
    fn make_frame(data: JsonValue, priority: Priority) -> StreamFrame {
        StreamFrame {
            data,
            priority,
            metadata: HashMap::new(),
        }
    }

    #[test]
    fn test_stream_processor_basic() {
        let mut processor = StreamProcessor::new(StreamConfig::default());

        let outcome = processor
            .process_frame(make_frame(json!({"test": "data"}), Priority::MEDIUM))
            .unwrap();
        assert!(matches!(outcome, ProcessResult::Incomplete));

        let stats = processor.stats();
        assert_eq!(stats.processed_frames, 1);
        assert_eq!(stats.buffered_frames, 1);
    }

    #[test]
    fn test_stream_frame_creation() {
        let mut meta = HashMap::new();
        meta.insert("source".to_string(), "test".to_string());

        let frame = StreamFrame {
            data: json!({"message": "hello"}),
            priority: Priority::HIGH,
            metadata: meta,
        };

        assert_eq!(frame.priority, Priority::HIGH);
        assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
    }

    #[test]
    fn test_buffer_flush() {
        // A tiny buffer so the second frame completes a batch.
        let mut processor = StreamProcessor::new(StreamConfig {
            buffer_size: 2,
            ..StreamConfig::default()
        });

        let first = processor
            .process_frame(make_frame(json!({"id": 1}), Priority::MEDIUM))
            .unwrap();
        assert!(matches!(first, ProcessResult::Incomplete));

        let second = processor
            .process_frame(make_frame(json!({"id": 2}), Priority::MEDIUM))
            .unwrap();
        match second {
            ProcessResult::Complete(batch) => assert_eq!(batch.len(), 2),
            _ => panic!("Expected Complete with 2 frames"),
        }
    }
}