Skip to main content

pjson_rs/stream/
mod.rs

1//! Streaming system for PJS protocol
2//!
3//! Provides frame-based streaming with priority-aware processing
4//! and compression integration.
5
6pub mod compression_integration;
7pub mod priority;
8pub mod reconstruction;
9
10use crate::domain::{DomainResult, Priority};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14/// Custom serde for Priority in stream module
15mod serde_priority {
16    use crate::domain::Priority;
17    use serde::{Serialize, Serializer};
18
19    pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20    where
21        S: Serializer,
22    {
23        priority.value().serialize(serializer)
24    }
25}
26
27/// Stream frame with priority and metadata
28#[derive(Debug, Clone, serde::Serialize)]
29pub struct StreamFrame {
30    /// JSON data payload
31    pub data: JsonValue,
32    /// Frame priority for streaming order
33    #[serde(with = "serde_priority")]
34    pub priority: Priority,
35    /// Additional metadata for processing
36    pub metadata: HashMap<String, String>,
37}
38
39/// Stream processing result
40#[derive(Debug, Clone)]
41pub enum ProcessResult {
42    /// Frame processed successfully and ready to stream to the client
43    Processed(StreamFrame),
44    /// Processing completed — returned when the stream is explicitly flushed
45    Complete(Vec<StreamFrame>),
46    /// Frame rejected: exceeds the configured size limit
47    Error(String),
48}
49
/// Tunable settings for a stream processor.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// Upper bound, in bytes, on the serialized JSON payload of one frame.
    pub max_frame_size: usize,
    /// Whether compression is enabled.
    pub enable_compression: bool,
    /// Raw priority value at or above which frames count as critical.
    // NOTE(review): the exact comparison lives outside this block — confirm
    // against the consumer of this field.
    pub priority_threshold: u8,
}
60
61impl Default for StreamConfig {
62    fn default() -> Self {
63        Self {
64            max_frame_size: 1024 * 1024, // 1MB
65            enable_compression: true,
66            priority_threshold: Priority::HIGH.value(),
67        }
68    }
69}
70
71/// Stream processor for handling PJS frames
72#[derive(Debug)]
73pub struct StreamProcessor {
74    config: StreamConfig,
75    processed_count: usize,
76}
77
78impl StreamProcessor {
79    /// Create new stream processor
80    pub fn new(config: StreamConfig) -> Self {
81        Self {
82            config,
83            processed_count: 0,
84        }
85    }
86
87    /// Process a stream frame.
88    ///
89    /// Each accepted frame immediately returns [`ProcessResult::Processed`] so
90    /// callers can stream it to clients without waiting for a buffer to fill.
91    /// Returns [`ProcessResult::Error`] only when the frame exceeds the
92    /// configured size limit.
93    pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
94        let frame_size = serde_json::to_string(&frame.data)
95            .map_err(|e| {
96                crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
97            })?
98            .len();
99
100        if frame_size > self.config.max_frame_size {
101            return Ok(ProcessResult::Error(format!(
102                "Frame size {} exceeds maximum {}",
103                frame_size, self.config.max_frame_size
104            )));
105        }
106
107        self.processed_count += 1;
108        Ok(ProcessResult::Processed(frame))
109    }
110
111    /// Get processing statistics
112    pub fn stats(&self) -> StreamStats {
113        StreamStats {
114            processed_frames: self.processed_count,
115        }
116    }
117}
118
/// Snapshot of a stream processor's counters.
#[derive(Clone, Debug)]
pub struct StreamStats {
    /// Number of frames accepted by the processor when the snapshot was taken.
    pub processed_frames: usize,
}
124
125// Re-export key types
126pub use compression_integration::{
127    CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
128    StreamingCompressor, StreamingDecompressor,
129};
130pub use priority::{PriorityStreamFrame, PriorityStreamer};
131pub use reconstruction::JsonReconstructor;
132
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a metadata-free frame so each test spells out only what it varies.
    fn frame_with(data: JsonValue, priority: Priority) -> StreamFrame {
        StreamFrame {
            data,
            priority,
            metadata: HashMap::new(),
        }
    }

    /// Builds a processor whose only non-default setting is the size limit.
    fn processor_with_limit(max_frame_size: usize) -> StreamProcessor {
        StreamProcessor::new(StreamConfig {
            max_frame_size,
            ..StreamConfig::default()
        })
    }

    #[test]
    fn test_process_frame_returns_processed_immediately() {
        let mut processor = StreamProcessor::new(StreamConfig::default());

        let result = processor
            .process_frame(frame_with(json!({"test": "data"}), Priority::MEDIUM))
            .unwrap();
        assert!(matches!(result, ProcessResult::Processed(_)));

        assert_eq!(processor.stats().processed_frames, 1);
    }

    #[test]
    fn test_processed_result_contains_original_frame() {
        let mut processor = StreamProcessor::new(StreamConfig::default());
        let frame = frame_with(json!({"message": "hello"}), Priority::HIGH);

        match processor.process_frame(frame).unwrap() {
            ProcessResult::Processed(f) => {
                assert_eq!(f.priority, Priority::HIGH);
                assert_eq!(f.data, json!({"message": "hello"}));
            }
            other => panic!("Expected Processed, got {other:?}"),
        }
    }

    #[test]
    fn test_stream_frame_creation() {
        let frame = StreamFrame {
            data: json!({"message": "hello"}),
            priority: Priority::HIGH,
            metadata: {
                let mut meta = HashMap::new();
                meta.insert("source".to_string(), "test".to_string());
                meta
            },
        };

        assert_eq!(frame.priority, Priority::HIGH);
        assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
    }

    #[test]
    fn test_oversized_frame_returns_error() {
        let mut processor = processor_with_limit(10); // very small limit

        let frame = frame_with(
            json!({"large_key": "this value is definitely over ten bytes"}),
            Priority::MEDIUM,
        );

        let result = processor.process_frame(frame).unwrap();
        assert!(matches!(result, ProcessResult::Error(_)));

        // A rejected frame must not be counted as processed.
        assert_eq!(processor.stats().processed_frames, 0);
    }

    #[test]
    fn test_frame_exactly_at_limit_is_accepted() {
        // `"abcd"` serializes to four characters plus two quotes: six bytes.
        let payload = json!("abcd");
        assert_eq!(serde_json::to_string(&payload).unwrap().len(), 6);

        let mut processor = processor_with_limit(6);
        let result = processor
            .process_frame(frame_with(payload, Priority::MEDIUM))
            .unwrap();

        // The limit is inclusive: only frames strictly larger are rejected.
        assert!(matches!(result, ProcessResult::Processed(_)));
        assert_eq!(processor.stats().processed_frames, 1);
    }

    #[test]
    fn test_frame_one_byte_over_limit_reports_sizes() {
        let mut processor = processor_with_limit(5);

        match processor
            .process_frame(frame_with(json!("abcd"), Priority::MEDIUM))
            .unwrap()
        {
            ProcessResult::Error(message) => {
                assert_eq!(message, "Frame size 6 exceeds maximum 5");
            }
            other => panic!("Expected Error, got {other:?}"),
        }
        assert_eq!(processor.stats().processed_frames, 0);
    }

    #[test]
    fn test_multiple_frames_each_processed_immediately() {
        let mut processor = StreamProcessor::new(StreamConfig::default());

        for i in 0..3u32 {
            let frame = frame_with(json!({"id": i}), Priority::MEDIUM);
            assert!(matches!(
                processor.process_frame(frame).unwrap(),
                ProcessResult::Processed(_)
            ));
        }

        assert_eq!(processor.stats().processed_frames, 3);
    }
}
229}