pjson_rs/stream/
mod.rs

1//! Streaming system for PJS protocol
2//!
3//! Provides frame-based streaming with priority-aware processing
4//! and compression integration.
5
6pub mod compression_integration;
7pub mod priority;
8pub mod reconstruction;
9
10use crate::domain::{DomainResult, Priority};
11use serde_json::Value as JsonValue;
12use std::collections::HashMap;
13
14/// Stream frame with priority and metadata
15#[derive(Debug, Clone, serde::Serialize)]
16pub struct StreamFrame {
17    /// JSON data payload
18    pub data: JsonValue,
19    /// Frame priority for streaming order
20    pub priority: Priority,
21    /// Additional metadata for processing
22    pub metadata: HashMap<String, String>,
23}
24
25/// Stream processing result
26#[derive(Debug, Clone)]
27pub enum ProcessResult {
28    /// Frame processed successfully
29    Processed(StreamFrame),
30    /// Frame needs more data to process
31    Incomplete,
32    /// Processing completed
33    Complete(Vec<StreamFrame>),
34    /// Processing failed
35    Error(String),
36}
37
/// Tunable limits and switches for a [`StreamProcessor`].
///
/// Use [`StreamConfig::default`] and struct-update syntax to override
/// individual settings.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Largest accepted frame payload, measured as the byte length of the
    /// payload serialized to a JSON string.
    pub max_frame_size: usize,
    /// Number of buffered frames at which the processor emits a batch.
    pub buffer_size: usize,
    /// Whether compression is enabled.
    // NOTE(review): not read anywhere in this file; presumably consumed by the
    // compression integration — confirm.
    pub enable_compression: bool,
    /// Priority value at or above which a frame counts as critical.
    // NOTE(review): not read anywhere in this file — confirm where it is applied.
    pub priority_threshold: u8,
}
50
51impl Default for StreamConfig {
52    fn default() -> Self {
53        Self {
54            max_frame_size: 1024 * 1024, // 1MB
55            buffer_size: 64,
56            enable_compression: true,
57            priority_threshold: Priority::HIGH.value(),
58        }
59    }
60}
61
62/// Stream processor for handling PJS frames
63#[derive(Debug)]
64pub struct StreamProcessor {
65    config: StreamConfig,
66    buffer: Vec<StreamFrame>,
67    processed_count: usize,
68}
69
70impl StreamProcessor {
71    /// Create new stream processor
72    pub fn new(config: StreamConfig) -> Self {
73        Self {
74            config,
75            buffer: Vec::new(),
76            processed_count: 0,
77        }
78    }
79
80    /// Process a stream frame
81    pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
82        // Check frame size
83        let frame_size = serde_json::to_string(&frame.data)
84            .map_err(|e| {
85                crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}"))
86            })?
87            .len();
88
89        if frame_size > self.config.max_frame_size {
90            return Ok(ProcessResult::Error(format!(
91                "Frame size {} exceeds maximum {}",
92                frame_size, self.config.max_frame_size
93            )));
94        }
95
96        // Add to buffer
97        self.buffer.push(frame);
98        self.processed_count += 1;
99
100        // Check if we should flush buffer
101        if self.buffer.len() >= self.config.buffer_size {
102            let frames = self.buffer.drain(..).collect();
103            Ok(ProcessResult::Complete(frames))
104        } else {
105            Ok(ProcessResult::Incomplete)
106        }
107    }
108
109    /// Flush remaining frames
110    pub fn flush(&mut self) -> Vec<StreamFrame> {
111        self.buffer.drain(..).collect()
112    }
113
114    /// Get processing statistics
115    pub fn stats(&self) -> StreamStats {
116        StreamStats {
117            processed_frames: self.processed_count,
118            buffered_frames: self.buffer.len(),
119            buffer_utilization: self.buffer.len() as f32 / self.config.buffer_size as f32,
120        }
121    }
122}
123
/// Point-in-time counters reported by `StreamProcessor::stats`.
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Total number of frames accepted by the processor so far.
    pub processed_frames: usize,
    /// Number of frames currently waiting in the buffer.
    pub buffered_frames: usize,
    /// Buffered frame count divided by the configured buffer size.
    pub buffer_utilization: f32,
}
131
132// Re-export key types
133pub use compression_integration::{
134    CompressedFrame, CompressionStats, DecompressionMetadata, DecompressionStats,
135    StreamingCompressor, StreamingDecompressor,
136};
137pub use priority::{PriorityStreamFrame, PriorityStreamer};
138pub use reconstruction::JsonReconstructor;
139
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Wraps `data` in a medium-priority frame with no metadata.
    fn medium_frame(data: JsonValue) -> StreamFrame {
        StreamFrame {
            data,
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        }
    }

    /// A single frame under the default batch size is buffered, not emitted.
    #[test]
    fn test_stream_processor_basic() {
        let mut processor = StreamProcessor::new(StreamConfig::default());

        let outcome = processor
            .process_frame(medium_frame(json!({"test": "data"})))
            .unwrap();
        assert!(matches!(outcome, ProcessResult::Incomplete));

        let StreamStats {
            processed_frames,
            buffered_frames,
            ..
        } = processor.stats();
        assert_eq!(processed_frames, 1);
        assert_eq!(buffered_frames, 1);
    }

    /// Frames keep the priority and metadata they were built with.
    #[test]
    fn test_stream_frame_creation() {
        let metadata = HashMap::from([("source".to_string(), "test".to_string())]);
        let frame = StreamFrame {
            data: json!({"message": "hello"}),
            priority: Priority::HIGH,
            metadata,
        };

        assert_eq!(frame.priority, Priority::HIGH);
        assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
    }

    /// Reaching the configured batch size releases every buffered frame.
    #[test]
    fn test_buffer_flush() {
        let mut processor = StreamProcessor::new(StreamConfig {
            buffer_size: 2,
            ..StreamConfig::default()
        });

        // First frame: still below the batch size.
        let first = processor
            .process_frame(medium_frame(json!({"id": 1})))
            .unwrap();
        assert!(matches!(first, ProcessResult::Incomplete));

        // Second frame: reaches the batch size and triggers the flush.
        let second = processor
            .process_frame(medium_frame(json!({"id": 2})))
            .unwrap();

        match second {
            ProcessResult::Complete(frames) => assert_eq!(frames.len(), 2),
            _ => panic!("Expected Complete with 2 frames"),
        }
    }
}
212}