Skip to main content

pjson_rs/stream/
mod.rs

1//! Streaming system for PJS protocol
2//!
3//! Provides frame-based streaming with priority-aware processing
4//! and compression integration.
5
6pub mod compression_integration;
7pub mod priority;
8pub mod reconstruction;
9
10use crate::domain::{DomainResult, Priority};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14/// Custom serde for Priority in stream module
15mod serde_priority {
16    use crate::domain::Priority;
17    use serde::{Serialize, Serializer};
18
19    pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20    where
21        S: Serializer,
22    {
23        priority.value().serialize(serializer)
24    }
25}
26
27/// Stream frame with priority and metadata
28#[derive(Debug, Clone, serde::Serialize)]
29pub struct StreamFrame {
30    /// JSON data payload
31    pub data: JsonValue,
32    /// Frame priority for streaming order
33    #[serde(with = "serde_priority")]
34    pub priority: Priority,
35    /// Additional metadata for processing
36    pub metadata: HashMap<String, String>,
37}
38
39/// Stream processing result
40#[derive(Debug, Clone)]
41pub enum ProcessResult {
42    /// Frame processed successfully and ready to stream to the client
43    Processed(StreamFrame),
44    /// Processing completed — returned when the stream is explicitly flushed
45    Complete(Vec<StreamFrame>),
46    /// Frame rejected: exceeds the configured size limit
47    Error(String),
48}
49
50/// Stream processor configuration
51#[derive(Debug, Clone)]
52pub struct StreamConfig {
53    /// Maximum frame size in bytes
54    pub max_frame_size: usize,
55    /// Enable compression
56    pub enable_compression: bool,
57    /// Priority threshold for critical frames
58    pub priority_threshold: u8,
59}
60
61impl Default for StreamConfig {
62    fn default() -> Self {
63        Self {
64            max_frame_size: 1024 * 1024, // 1MB
65            enable_compression: true,
66            priority_threshold: Priority::HIGH.value(),
67        }
68    }
69}
70
71/// Stream processor for handling PJS frames
72#[derive(Debug)]
73pub struct StreamProcessor {
74    config: StreamConfig,
75    processed_count: usize,
76}
77
78impl StreamProcessor {
79    /// Create new stream processor
80    pub fn new(config: StreamConfig) -> Self {
81        Self {
82            config,
83            processed_count: 0,
84        }
85    }
86
87    /// Process a stream frame.
88    ///
89    /// Each accepted frame immediately returns [`ProcessResult::Processed`] so
90    /// callers can stream it to clients without waiting for a buffer to fill.
91    /// Returns [`ProcessResult::Error`] only when the frame exceeds the
92    /// configured size limit.
93    pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
94        let frame_size = serde_json::to_string(&frame.data)
95            .map_err(|e| {
96                crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
97            })?
98            .len();
99
100        if frame_size > self.config.max_frame_size {
101            return Ok(ProcessResult::Error(format!(
102                "Frame size {} exceeds maximum {}",
103                frame_size, self.config.max_frame_size
104            )));
105        }
106
107        self.processed_count += 1;
108        Ok(ProcessResult::Processed(frame))
109    }
110
111    /// Get processing statistics
112    pub fn stats(&self) -> StreamStats {
113        StreamStats {
114            processed_frames: self.processed_count,
115        }
116    }
117}
118
119/// Stream processing statistics
120#[derive(Debug, Clone)]
121pub struct StreamStats {
122    /// Number of frames processed by the stream.
123    pub processed_frames: usize,
124}
125
126// Re-export key types
127pub use compression_integration::{
128    CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
129    StreamingCompressor, StreamingDecompressor,
130};
131pub use priority::{PriorityStreamFrame, PriorityStreamer};
132pub use reconstruction::JsonReconstructor;
133
134#[cfg(test)]
135mod tests {
136    use super::*;
137    use serde_json::json;
138
139    #[test]
140    fn test_process_frame_returns_processed_immediately() {
141        let config = StreamConfig::default();
142        let mut processor = StreamProcessor::new(config);
143
144        let frame = StreamFrame {
145            data: json!({"test": "data"}),
146            priority: Priority::MEDIUM,
147            metadata: HashMap::new(),
148        };
149
150        let result = processor.process_frame(frame).unwrap();
151        assert!(matches!(result, ProcessResult::Processed(_)));
152
153        let stats = processor.stats();
154        assert_eq!(stats.processed_frames, 1);
155    }
156
157    #[test]
158    fn test_processed_result_contains_original_frame() {
159        let config = StreamConfig::default();
160        let mut processor = StreamProcessor::new(config);
161
162        let frame = StreamFrame {
163            data: json!({"message": "hello"}),
164            priority: Priority::HIGH,
165            metadata: HashMap::new(),
166        };
167
168        match processor.process_frame(frame).unwrap() {
169            ProcessResult::Processed(f) => {
170                assert_eq!(f.priority, Priority::HIGH);
171                assert_eq!(f.data, json!({"message": "hello"}));
172            }
173            other => panic!("Expected Processed, got {other:?}"),
174        }
175    }
176
177    #[test]
178    fn test_stream_frame_creation() {
179        let frame = StreamFrame {
180            data: json!({"message": "hello"}),
181            priority: Priority::HIGH,
182            metadata: {
183                let mut meta = HashMap::new();
184                meta.insert("source".to_string(), "test".to_string());
185                meta
186            },
187        };
188
189        assert_eq!(frame.priority, Priority::HIGH);
190        assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
191    }
192
193    #[test]
194    fn test_oversized_frame_returns_error() {
195        let config = StreamConfig {
196            max_frame_size: 10, // very small limit
197            ..StreamConfig::default()
198        };
199        let mut processor = StreamProcessor::new(config);
200
201        let frame = StreamFrame {
202            data: json!({"large_key": "this value is definitely over ten bytes"}),
203            priority: Priority::MEDIUM,
204            metadata: HashMap::new(),
205        };
206
207        let result = processor.process_frame(frame).unwrap();
208        assert!(matches!(result, ProcessResult::Error(_)));
209    }
210
211    #[test]
212    fn test_multiple_frames_each_processed_immediately() {
213        let config = StreamConfig::default();
214        let mut processor = StreamProcessor::new(config);
215
216        for i in 0..3u32 {
217            let frame = StreamFrame {
218                data: json!({"id": i}),
219                priority: Priority::MEDIUM,
220                metadata: HashMap::new(),
221            };
222            assert!(matches!(
223                processor.process_frame(frame).unwrap(),
224                ProcessResult::Processed(_)
225            ));
226        }
227
228        assert_eq!(processor.stats().processed_frames, 3);
229    }
230}