1pub mod compression_integration;
7pub mod priority;
8pub mod reconstruction;
9
10use crate::domain::{DomainResult, Priority};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14mod serde_priority {
16 use crate::domain::Priority;
17 use serde::{Serialize, Serializer};
18
19 pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20 where
21 S: Serializer,
22 {
23 priority.value().serialize(serializer)
24 }
25}
26
27#[derive(Debug, Clone, serde::Serialize)]
29pub struct StreamFrame {
30 pub data: JsonValue,
32 #[serde(with = "serde_priority")]
34 pub priority: Priority,
35 pub metadata: HashMap<String, String>,
37}
38
39#[derive(Debug, Clone)]
41pub enum ProcessResult {
42 Processed(StreamFrame),
44 Complete(Vec<StreamFrame>),
46 Error(String),
48}
49
50#[derive(Debug, Clone)]
52pub struct StreamConfig {
53 pub max_frame_size: usize,
55 pub enable_compression: bool,
57 pub priority_threshold: u8,
59}
60
61impl Default for StreamConfig {
62 fn default() -> Self {
63 Self {
64 max_frame_size: 1024 * 1024, enable_compression: true,
66 priority_threshold: Priority::HIGH.value(),
67 }
68 }
69}
70
71#[derive(Debug)]
73pub struct StreamProcessor {
74 config: StreamConfig,
75 processed_count: usize,
76}
77
78impl StreamProcessor {
79 pub fn new(config: StreamConfig) -> Self {
81 Self {
82 config,
83 processed_count: 0,
84 }
85 }
86
87 pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
94 let frame_size = serde_json::to_string(&frame.data)
95 .map_err(|e| {
96 crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
97 })?
98 .len();
99
100 if frame_size > self.config.max_frame_size {
101 return Ok(ProcessResult::Error(format!(
102 "Frame size {} exceeds maximum {}",
103 frame_size, self.config.max_frame_size
104 )));
105 }
106
107 self.processed_count += 1;
108 Ok(ProcessResult::Processed(frame))
109 }
110
111 pub fn stats(&self) -> StreamStats {
113 StreamStats {
114 processed_frames: self.processed_count,
115 }
116 }
117}
118
119#[derive(Debug, Clone)]
121pub struct StreamStats {
122 pub processed_frames: usize,
123}
124
125pub use compression_integration::{
127 CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
128 StreamingCompressor, StreamingDecompressor,
129};
130pub use priority::{PriorityStreamFrame, PriorityStreamer};
131pub use reconstruction::JsonReconstructor;
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use serde_json::json;
137
138 #[test]
139 fn test_process_frame_returns_processed_immediately() {
140 let config = StreamConfig::default();
141 let mut processor = StreamProcessor::new(config);
142
143 let frame = StreamFrame {
144 data: json!({"test": "data"}),
145 priority: Priority::MEDIUM,
146 metadata: HashMap::new(),
147 };
148
149 let result = processor.process_frame(frame).unwrap();
150 assert!(matches!(result, ProcessResult::Processed(_)));
151
152 let stats = processor.stats();
153 assert_eq!(stats.processed_frames, 1);
154 }
155
156 #[test]
157 fn test_processed_result_contains_original_frame() {
158 let config = StreamConfig::default();
159 let mut processor = StreamProcessor::new(config);
160
161 let frame = StreamFrame {
162 data: json!({"message": "hello"}),
163 priority: Priority::HIGH,
164 metadata: HashMap::new(),
165 };
166
167 match processor.process_frame(frame).unwrap() {
168 ProcessResult::Processed(f) => {
169 assert_eq!(f.priority, Priority::HIGH);
170 assert_eq!(f.data, json!({"message": "hello"}));
171 }
172 other => panic!("Expected Processed, got {other:?}"),
173 }
174 }
175
176 #[test]
177 fn test_stream_frame_creation() {
178 let frame = StreamFrame {
179 data: json!({"message": "hello"}),
180 priority: Priority::HIGH,
181 metadata: {
182 let mut meta = HashMap::new();
183 meta.insert("source".to_string(), "test".to_string());
184 meta
185 },
186 };
187
188 assert_eq!(frame.priority, Priority::HIGH);
189 assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
190 }
191
192 #[test]
193 fn test_oversized_frame_returns_error() {
194 let config = StreamConfig {
195 max_frame_size: 10, ..StreamConfig::default()
197 };
198 let mut processor = StreamProcessor::new(config);
199
200 let frame = StreamFrame {
201 data: json!({"large_key": "this value is definitely over ten bytes"}),
202 priority: Priority::MEDIUM,
203 metadata: HashMap::new(),
204 };
205
206 let result = processor.process_frame(frame).unwrap();
207 assert!(matches!(result, ProcessResult::Error(_)));
208 }
209
210 #[test]
211 fn test_multiple_frames_each_processed_immediately() {
212 let config = StreamConfig::default();
213 let mut processor = StreamProcessor::new(config);
214
215 for i in 0..3u32 {
216 let frame = StreamFrame {
217 data: json!({"id": i}),
218 priority: Priority::MEDIUM,
219 metadata: HashMap::new(),
220 };
221 assert!(matches!(
222 processor.process_frame(frame).unwrap(),
223 ProcessResult::Processed(_)
224 ));
225 }
226
227 assert_eq!(processor.stats().processed_frames, 3);
228 }
229}