1pub mod compression_integration;
7pub mod priority;
8pub mod reconstruction;
9
10use crate::domain::{DomainResult, Priority};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14mod serde_priority {
16 use crate::domain::Priority;
17 use serde::{Serialize, Serializer};
18
19 pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20 where
21 S: Serializer,
22 {
23 priority.value().serialize(serializer)
24 }
25}
26
/// A single unit of streamed JSON data together with its priority and metadata.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamFrame {
    /// The JSON payload carried by this frame.
    pub data: JsonValue,
    /// Delivery priority; serialized as its raw numeric value via `serde_priority`.
    #[serde(with = "serde_priority")]
    pub priority: Priority,
    /// Free-form string key/value pairs attached to the frame.
    pub metadata: HashMap<String, String>,
}
38
/// Outcome of feeding one frame to a [`StreamProcessor`].
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// A single frame was processed and is returned directly.
    /// NOTE(review): not produced by `StreamProcessor::process_frame` in this
    /// file — presumably used by other processors; confirm against callers.
    Processed(StreamFrame),
    /// The frame was accepted and buffered; the buffer is not yet full.
    Incomplete,
    /// The buffer reached capacity; the full batch of buffered frames is returned.
    Complete(Vec<StreamFrame>),
    /// The frame was rejected (e.g. its serialized size exceeded the configured maximum).
    Error(String),
}
51
/// Tunable parameters for a [`StreamProcessor`].
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Maximum serialized size (in bytes) of a single frame's JSON payload;
    /// larger frames are rejected with `ProcessResult::Error`.
    pub max_frame_size: usize,
    /// Number of frames accumulated before `process_frame` returns
    /// `ProcessResult::Complete` with the whole batch.
    pub buffer_size: usize,
    /// Whether compression is enabled. Not read by `StreamProcessor` itself —
    /// presumably consumed by the compression submodule; confirm there.
    pub enable_compression: bool,
    /// Raw priority cutoff value. Not read in this file — presumably consumed
    /// by the priority submodule; confirm there.
    pub priority_threshold: u8,
}
64
65impl Default for StreamConfig {
66 fn default() -> Self {
67 Self {
68 max_frame_size: 1024 * 1024, buffer_size: 64,
70 enable_compression: true,
71 priority_threshold: Priority::HIGH.value(),
72 }
73 }
74}
75
/// Accumulates [`StreamFrame`]s into a fixed-size buffer and releases them in
/// batches once the configured `buffer_size` is reached.
#[derive(Debug)]
pub struct StreamProcessor {
    // Size limits and batch size governing this processor.
    config: StreamConfig,
    // Frames accepted but not yet released as a batch.
    buffer: Vec<StreamFrame>,
    // Total number of frames accepted over the processor's lifetime
    // (rejected oversize frames are not counted).
    processed_count: usize,
}
83
84impl StreamProcessor {
85 pub fn new(config: StreamConfig) -> Self {
87 Self {
88 config,
89 buffer: Vec::new(),
90 processed_count: 0,
91 }
92 }
93
94 pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
96 let frame_size = serde_json::to_string(&frame.data)
98 .map_err(|e| {
99 crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
100 })?
101 .len();
102
103 if frame_size > self.config.max_frame_size {
104 return Ok(ProcessResult::Error(format!(
105 "Frame size {} exceeds maximum {}",
106 frame_size, self.config.max_frame_size
107 )));
108 }
109
110 self.buffer.push(frame);
112 self.processed_count += 1;
113
114 if self.buffer.len() >= self.config.buffer_size {
116 let frames = self.buffer.drain(..).collect();
117 Ok(ProcessResult::Complete(frames))
118 } else {
119 Ok(ProcessResult::Incomplete)
120 }
121 }
122
123 pub fn flush(&mut self) -> Vec<StreamFrame> {
125 self.buffer.drain(..).collect()
126 }
127
128 pub fn stats(&self) -> StreamStats {
130 StreamStats {
131 processed_frames: self.processed_count,
132 buffered_frames: self.buffer.len(),
133 buffer_utilization: self.buffer.len() as f32 / self.config.buffer_size as f32,
134 }
135 }
136}
137
/// Point-in-time counters reported by [`StreamProcessor::stats`].
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Total frames accepted over the processor's lifetime.
    pub processed_frames: usize,
    /// Frames currently held in the buffer awaiting a batch release.
    pub buffered_frames: usize,
    /// `buffered_frames / buffer_size` as a fraction (may exceed intuition
    /// if `buffer_size` is 0 — division yields NaN/inf, not a panic).
    pub buffer_utilization: f32,
}
145
146pub use compression_integration::{
148 CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
149 StreamingCompressor, StreamingDecompressor,
150};
151pub use priority::{PriorityStreamFrame, PriorityStreamer};
152pub use reconstruction::JsonReconstructor;
153
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Build a frame with the given payload and priority and empty metadata.
    fn frame_with(data: JsonValue, priority: Priority) -> StreamFrame {
        StreamFrame {
            data,
            priority,
            metadata: HashMap::new(),
        }
    }

    #[test]
    fn test_stream_processor_basic() {
        let mut processor = StreamProcessor::new(StreamConfig::default());

        let result = processor
            .process_frame(frame_with(json!({"test": "data"}), Priority::MEDIUM))
            .unwrap();
        assert!(matches!(result, ProcessResult::Incomplete));

        let stats = processor.stats();
        assert_eq!(stats.processed_frames, 1);
        assert_eq!(stats.buffered_frames, 1);
    }

    #[test]
    fn test_stream_frame_creation() {
        let mut metadata = HashMap::new();
        metadata.insert("source".to_string(), "test".to_string());

        let frame = StreamFrame {
            data: json!({"message": "hello"}),
            priority: Priority::HIGH,
            metadata,
        };

        assert_eq!(frame.priority, Priority::HIGH);
        assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
    }

    #[test]
    fn test_buffer_flush() {
        let mut processor = StreamProcessor::new(StreamConfig {
            buffer_size: 2,
            ..StreamConfig::default()
        });

        let first = processor
            .process_frame(frame_with(json!({"id": 1}), Priority::MEDIUM))
            .unwrap();
        assert!(matches!(first, ProcessResult::Incomplete));

        let second = processor
            .process_frame(frame_with(json!({"id": 2}), Priority::MEDIUM))
            .unwrap();
        match second {
            ProcessResult::Complete(frames) => assert_eq!(frames.len(), 2),
            _ => panic!("Expected Complete with 2 frames"),
        }
    }
}