1pub mod compression_integration;
7pub mod priority;
8pub mod reconstruction;
9
10use crate::domain::{DomainResult, Priority};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14mod serde_priority {
16 use crate::domain::Priority;
17 use serde::{Serialize, Serializer};
18
19 pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20 where
21 S: Serializer,
22 {
23 priority.value().serialize(serializer)
24 }
25}
26
27#[derive(Debug, Clone, serde::Serialize)]
29pub struct StreamFrame {
30 pub data: JsonValue,
32 #[serde(with = "serde_priority")]
34 pub priority: Priority,
35 pub metadata: HashMap<String, String>,
37}
38
39#[derive(Debug, Clone)]
41pub enum ProcessResult {
42 Processed(StreamFrame),
44 Complete(Vec<StreamFrame>),
46 Error(String),
48}
49
50#[derive(Debug, Clone)]
52pub struct StreamConfig {
53 pub max_frame_size: usize,
55 pub enable_compression: bool,
57 pub priority_threshold: u8,
59}
60
61impl Default for StreamConfig {
62 fn default() -> Self {
63 Self {
64 max_frame_size: 1024 * 1024, enable_compression: true,
66 priority_threshold: Priority::HIGH.value(),
67 }
68 }
69}
70
71#[derive(Debug)]
73pub struct StreamProcessor {
74 config: StreamConfig,
75 processed_count: usize,
76}
77
78impl StreamProcessor {
79 pub fn new(config: StreamConfig) -> Self {
81 Self {
82 config,
83 processed_count: 0,
84 }
85 }
86
87 pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
94 let frame_size = serde_json::to_string(&frame.data)
95 .map_err(|e| {
96 crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
97 })?
98 .len();
99
100 if frame_size > self.config.max_frame_size {
101 return Ok(ProcessResult::Error(format!(
102 "Frame size {} exceeds maximum {}",
103 frame_size, self.config.max_frame_size
104 )));
105 }
106
107 self.processed_count += 1;
108 Ok(ProcessResult::Processed(frame))
109 }
110
111 pub fn stats(&self) -> StreamStats {
113 StreamStats {
114 processed_frames: self.processed_count,
115 }
116 }
117}
118
119#[derive(Debug, Clone)]
121pub struct StreamStats {
122 pub processed_frames: usize,
124}
125
126pub use compression_integration::{
128 CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
129 StreamingCompressor, StreamingDecompressor,
130};
131pub use priority::{PriorityStreamFrame, PriorityStreamer};
132pub use reconstruction::JsonReconstructor;
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137 use serde_json::json;
138
139 #[test]
140 fn test_process_frame_returns_processed_immediately() {
141 let config = StreamConfig::default();
142 let mut processor = StreamProcessor::new(config);
143
144 let frame = StreamFrame {
145 data: json!({"test": "data"}),
146 priority: Priority::MEDIUM,
147 metadata: HashMap::new(),
148 };
149
150 let result = processor.process_frame(frame).unwrap();
151 assert!(matches!(result, ProcessResult::Processed(_)));
152
153 let stats = processor.stats();
154 assert_eq!(stats.processed_frames, 1);
155 }
156
157 #[test]
158 fn test_processed_result_contains_original_frame() {
159 let config = StreamConfig::default();
160 let mut processor = StreamProcessor::new(config);
161
162 let frame = StreamFrame {
163 data: json!({"message": "hello"}),
164 priority: Priority::HIGH,
165 metadata: HashMap::new(),
166 };
167
168 match processor.process_frame(frame).unwrap() {
169 ProcessResult::Processed(f) => {
170 assert_eq!(f.priority, Priority::HIGH);
171 assert_eq!(f.data, json!({"message": "hello"}));
172 }
173 other => panic!("Expected Processed, got {other:?}"),
174 }
175 }
176
177 #[test]
178 fn test_stream_frame_creation() {
179 let frame = StreamFrame {
180 data: json!({"message": "hello"}),
181 priority: Priority::HIGH,
182 metadata: {
183 let mut meta = HashMap::new();
184 meta.insert("source".to_string(), "test".to_string());
185 meta
186 },
187 };
188
189 assert_eq!(frame.priority, Priority::HIGH);
190 assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
191 }
192
193 #[test]
194 fn test_oversized_frame_returns_error() {
195 let config = StreamConfig {
196 max_frame_size: 10, ..StreamConfig::default()
198 };
199 let mut processor = StreamProcessor::new(config);
200
201 let frame = StreamFrame {
202 data: json!({"large_key": "this value is definitely over ten bytes"}),
203 priority: Priority::MEDIUM,
204 metadata: HashMap::new(),
205 };
206
207 let result = processor.process_frame(frame).unwrap();
208 assert!(matches!(result, ProcessResult::Error(_)));
209 }
210
211 #[test]
212 fn test_multiple_frames_each_processed_immediately() {
213 let config = StreamConfig::default();
214 let mut processor = StreamProcessor::new(config);
215
216 for i in 0..3u32 {
217 let frame = StreamFrame {
218 data: json!({"id": i}),
219 priority: Priority::MEDIUM,
220 metadata: HashMap::new(),
221 };
222 assert!(matches!(
223 processor.process_frame(frame).unwrap(),
224 ProcessResult::Processed(_)
225 ));
226 }
227
228 assert_eq!(processor.stats().processed_frames, 3);
229 }
230}