1pub mod compression_integration;
7pub mod priority;
8pub mod reconstruction;
9
10use crate::domain::{Priority, DomainResult};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
/// A single frame of streaming data: a JSON payload plus routing info.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamFrame {
    /// The JSON payload carried by this frame.
    pub data: JsonValue,
    /// Priority level attached to this frame — not read by `StreamProcessor`
    /// itself; presumably consumed by the `priority` module (confirm there).
    pub priority: Priority,
    /// Free-form key/value annotations (e.g. a "source" tag) set by producers.
    pub metadata: HashMap<String, String>,
}
24
25
/// Outcome of feeding one frame to a [`StreamProcessor`].
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// A single processed frame ready for downstream use.
    /// NOTE(review): never constructed in this file — presumably produced by
    /// other processors in this module; confirm before relying on it.
    Processed(StreamFrame),
    /// The frame was buffered; more input is needed before a batch is emitted.
    Incomplete,
    /// The buffer reached capacity; the drained batch of frames is returned.
    Complete(Vec<StreamFrame>),
    /// Processing failed; the payload is a human-readable description.
    Error(String),
}
38
/// Tunable parameters for a [`StreamProcessor`].
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Maximum serialized size (in bytes) of one frame's JSON payload;
    /// larger frames are rejected with `ProcessResult::Error`.
    pub max_frame_size: usize,
    /// Number of frames to accumulate before a batch is emitted.
    pub buffer_size: usize,
    /// Whether compression should be applied. Not read by `StreamProcessor`
    /// in this file — presumably used by `compression_integration`; confirm.
    pub enable_compression: bool,
    /// Priority cutoff value. NOTE(review): not read in this file — verify
    /// its semantics against the `priority` module before changing.
    pub priority_threshold: u8,
}
51
52impl Default for StreamConfig {
53 fn default() -> Self {
54 Self {
55 max_frame_size: 1024 * 1024, buffer_size: 64,
57 enable_compression: true,
58 priority_threshold: Priority::HIGH.value(),
59 }
60 }
61}
62
/// Accumulates [`StreamFrame`]s into fixed-size batches.
///
/// Frames are size-checked against `config.max_frame_size`, buffered, and
/// released as a batch once `config.buffer_size` frames have accumulated.
#[derive(Debug)]
pub struct StreamProcessor {
    /// Configuration supplied at construction time.
    config: StreamConfig,
    /// Frames accepted but not yet emitted as a batch.
    buffer: Vec<StreamFrame>,
    /// Total frames accepted since construction (never reset, even by flush).
    processed_count: usize,
}
70
71impl StreamProcessor {
72 pub fn new(config: StreamConfig) -> Self {
74 Self {
75 config,
76 buffer: Vec::new(),
77 processed_count: 0,
78 }
79 }
80
81 pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
83 let frame_size = serde_json::to_string(&frame.data)
85 .map_err(|e| crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}")))?
86 .len();
87
88 if frame_size > self.config.max_frame_size {
89 return Ok(ProcessResult::Error(format!(
90 "Frame size {} exceeds maximum {}",
91 frame_size,
92 self.config.max_frame_size
93 )));
94 }
95
96 self.buffer.push(frame);
98 self.processed_count += 1;
99
100 if self.buffer.len() >= self.config.buffer_size {
102 let frames = self.buffer.drain(..).collect();
103 Ok(ProcessResult::Complete(frames))
104 } else {
105 Ok(ProcessResult::Incomplete)
106 }
107 }
108
109 pub fn flush(&mut self) -> Vec<StreamFrame> {
111 self.buffer.drain(..).collect()
112 }
113
114 pub fn stats(&self) -> StreamStats {
116 StreamStats {
117 processed_frames: self.processed_count,
118 buffered_frames: self.buffer.len(),
119 buffer_utilization: self.buffer.len() as f32 / self.config.buffer_size as f32,
120 }
121 }
122}
123
/// A point-in-time snapshot of a [`StreamProcessor`]'s counters.
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Total frames accepted since the processor was created.
    pub processed_frames: usize,
    /// Frames currently waiting in the buffer.
    pub buffered_frames: usize,
    /// Buffer fill ratio: `buffered_frames / buffer_size` (0.0 = empty,
    /// values approach 1.0 as the buffer nears its flush threshold).
    pub buffer_utilization: f32,
}
131
132pub use compression_integration::{
134 CompressedFrame, StreamingCompressor, StreamingDecompressor,
135 CompressionStats, DecompressionStats, DecompressionMetadata,
136};
137pub use priority::{PriorityStreamer, PriorityStreamFrame};
138pub use reconstruction::JsonReconstructor;
139
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a frame with the given payload/priority and empty metadata.
    fn make_frame(data: JsonValue, priority: Priority) -> StreamFrame {
        StreamFrame {
            data,
            priority,
            metadata: HashMap::new(),
        }
    }

    #[test]
    fn test_stream_processor_basic() {
        let mut processor = StreamProcessor::new(StreamConfig::default());

        let outcome = processor
            .process_frame(make_frame(json!({"test": "data"}), Priority::MEDIUM))
            .unwrap();
        assert!(matches!(outcome, ProcessResult::Incomplete));

        let stats = processor.stats();
        assert_eq!(stats.processed_frames, 1);
        assert_eq!(stats.buffered_frames, 1);
    }

    #[test]
    fn test_stream_frame_creation() {
        let mut meta = HashMap::new();
        meta.insert("source".to_string(), "test".to_string());

        let frame = StreamFrame {
            data: json!({"message": "hello"}),
            priority: Priority::HIGH,
            metadata: meta,
        };

        assert_eq!(frame.priority, Priority::HIGH);
        assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
    }

    #[test]
    fn test_buffer_flush() {
        let config = StreamConfig {
            buffer_size: 2,
            ..StreamConfig::default()
        };
        let mut processor = StreamProcessor::new(config);

        // First frame only fills half the 2-slot buffer.
        let first = processor
            .process_frame(make_frame(json!({"id": 1}), Priority::MEDIUM))
            .unwrap();
        assert!(matches!(first, ProcessResult::Incomplete));

        // Second frame fills the buffer and triggers a batch emission.
        let second = processor
            .process_frame(make_frame(json!({"id": 2}), Priority::MEDIUM))
            .unwrap();
        match second {
            ProcessResult::Complete(frames) => assert_eq!(frames.len(), 2),
            _ => panic!("Expected Complete with 2 frames"),
        }
    }
}