pjson_rs/stream/
mod.rs

1//! Streaming system for PJS protocol
2//!
3//! Provides frame-based streaming with priority-aware processing
4//! and compression integration.
5
6pub mod compression_integration;
7pub mod priority;
8pub mod reconstruction;
9
10use crate::domain::{DomainResult, Priority};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14/// Custom serde for Priority in stream module
15mod serde_priority {
16    use crate::domain::Priority;
17    use serde::{Serialize, Serializer};
18
19    pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20    where
21        S: Serializer,
22    {
23        priority.value().serialize(serializer)
24    }
25}
26
/// Stream frame with priority and metadata
///
/// Serializable for transport; `priority` is written as its raw numeric
/// value (via the `serde_priority` helper) rather than a structured form.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamFrame {
    /// JSON data payload
    pub data: JsonValue,
    /// Frame priority for streaming order
    #[serde(with = "serde_priority")]
    pub priority: Priority,
    /// Additional metadata for processing (free-form string key/value pairs)
    pub metadata: HashMap<String, String>,
}
38
/// Stream processing result
///
/// Note: frame-level failures (e.g. oversized frames) are reported through
/// the `Error` variant inside an `Ok(..)`, not through `DomainResult`'s
/// `Err` — callers must match on this enum as well as the outer result.
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// Frame processed successfully
    Processed(StreamFrame),
    /// Frame needs more data to process
    Incomplete,
    /// Processing completed; carries the drained buffer of frames
    Complete(Vec<StreamFrame>),
    /// Processing failed; carries a human-readable reason
    Error(String),
}
51
/// Stream processor configuration
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Maximum frame size in bytes (measured on the serialized JSON payload)
    pub max_frame_size: usize,
    /// Buffer size for incomplete frames; the processor flushes once this
    /// many frames are buffered
    pub buffer_size: usize,
    /// Enable compression
    pub enable_compression: bool,
    /// Priority threshold for critical frames (compared against
    /// `Priority::value()`)
    pub priority_threshold: u8,
}
64
65impl Default for StreamConfig {
66    fn default() -> Self {
67        Self {
68            max_frame_size: 1024 * 1024, // 1MB
69            buffer_size: 64,
70            enable_compression: true,
71            priority_threshold: Priority::HIGH.value(),
72        }
73    }
74}
75
/// Stream processor for handling PJS frames
#[derive(Debug)]
pub struct StreamProcessor {
    // Configuration fixed at construction time.
    config: StreamConfig,
    // Frames accepted but not yet flushed to the caller.
    buffer: Vec<StreamFrame>,
    // Running total of accepted frames (includes frames already flushed).
    processed_count: usize,
}
83
84impl StreamProcessor {
85    /// Create new stream processor
86    pub fn new(config: StreamConfig) -> Self {
87        Self {
88            config,
89            buffer: Vec::new(),
90            processed_count: 0,
91        }
92    }
93
94    /// Process a stream frame
95    pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
96        // Check frame size
97        let frame_size = serde_json::to_string(&frame.data)
98            .map_err(|e| {
99                crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
100            })?
101            .len();
102
103        if frame_size > self.config.max_frame_size {
104            return Ok(ProcessResult::Error(format!(
105                "Frame size {} exceeds maximum {}",
106                frame_size, self.config.max_frame_size
107            )));
108        }
109
110        // Add to buffer
111        self.buffer.push(frame);
112        self.processed_count += 1;
113
114        // Check if we should flush buffer
115        if self.buffer.len() >= self.config.buffer_size {
116            let frames = self.buffer.drain(..).collect();
117            Ok(ProcessResult::Complete(frames))
118        } else {
119            Ok(ProcessResult::Incomplete)
120        }
121    }
122
123    /// Flush remaining frames
124    pub fn flush(&mut self) -> Vec<StreamFrame> {
125        self.buffer.drain(..).collect()
126    }
127
128    /// Get processing statistics
129    pub fn stats(&self) -> StreamStats {
130        StreamStats {
131            processed_frames: self.processed_count,
132            buffered_frames: self.buffer.len(),
133            buffer_utilization: self.buffer.len() as f32 / self.config.buffer_size as f32,
134        }
135    }
136}
137
/// Stream processing statistics
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Total frames accepted since construction (includes flushed frames)
    pub processed_frames: usize,
    /// Frames currently held in the buffer
    pub buffered_frames: usize,
    /// Fraction of the configured buffer currently in use (0.0..=1.0)
    pub buffer_utilization: f32,
}
145
146// Re-export key types
147pub use compression_integration::{
148    CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
149    StreamingCompressor, StreamingDecompressor,
150};
151pub use priority::{PriorityStreamFrame, PriorityStreamer};
152pub use reconstruction::JsonReconstructor;
153
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a frame with the given payload and priority and no metadata.
    fn make_frame(data: JsonValue, priority: Priority) -> StreamFrame {
        StreamFrame {
            data,
            priority,
            metadata: HashMap::new(),
        }
    }

    #[test]
    fn test_stream_processor_basic() {
        let mut processor = StreamProcessor::new(StreamConfig::default());

        let result = processor
            .process_frame(make_frame(json!({"test": "data"}), Priority::MEDIUM))
            .unwrap();
        assert!(matches!(result, ProcessResult::Incomplete));

        let stats = processor.stats();
        assert_eq!(stats.processed_frames, 1);
        assert_eq!(stats.buffered_frames, 1);
    }

    #[test]
    fn test_stream_frame_creation() {
        let mut meta = HashMap::new();
        meta.insert("source".to_string(), "test".to_string());

        let frame = StreamFrame {
            data: json!({"message": "hello"}),
            priority: Priority::HIGH,
            metadata: meta,
        };

        assert_eq!(frame.priority, Priority::HIGH);
        assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
    }

    #[test]
    fn test_buffer_flush() {
        let config = StreamConfig {
            buffer_size: 2,
            ..StreamConfig::default()
        };
        let mut processor = StreamProcessor::new(config);

        // First frame stays buffered.
        let first = processor
            .process_frame(make_frame(json!({"id": 1}), Priority::MEDIUM))
            .unwrap();
        assert!(matches!(first, ProcessResult::Incomplete));

        // Second frame fills the buffer and triggers a flush of both frames.
        let second = processor
            .process_frame(make_frame(json!({"id": 2}), Priority::MEDIUM))
            .unwrap();
        match second {
            ProcessResult::Complete(frames) => assert_eq!(frames.len(), 2),
            _ => panic!("Expected Complete with 2 frames"),
        }
    }
}