pjson_rs/stream/mod.rs

//! Streaming system for PJS protocol
//!
//! Provides frame-based streaming with priority-aware processing
//! and compression integration.
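//!
//! A minimal usage sketch (illustrative only; it assumes the crate is named
//! `pjson_rs` and that `Priority::MEDIUM` is exposed by the domain layer):
//!
//! ```ignore
//! use pjson_rs::stream::{ProcessResult, StreamConfig, StreamFrame, StreamProcessor};
//! use std::collections::HashMap;
//!
//! let mut processor = StreamProcessor::new(StreamConfig::default());
//! let frame = StreamFrame {
//!     data: serde_json::json!({"id": 1}),
//!     priority: pjson_rs::domain::Priority::MEDIUM,
//!     metadata: HashMap::new(),
//! };
//!
//! // Frames are buffered until `buffer_size` is reached, then emitted as a batch.
//! match processor.process_frame(frame).unwrap() {
//!     ProcessResult::Complete(batch) => println!("flushed {} frames", batch.len()),
//!     other => println!("{other:?}"),
//! }
//!
//! // Drain whatever is still buffered at end of stream.
//! let remaining = processor.flush();
//! ```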

pub mod compression_integration;
// TODO: Reconcile with legacy priority and reconstruction modules
pub mod priority;
// pub mod reconstruction;

use crate::domain::{Priority, DomainResult};
use serde_json::Value as JsonValue;
use std::collections::HashMap;

/// Stream frame with priority and metadata
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamFrame {
    /// JSON data payload
    pub data: JsonValue,
    /// Frame priority for streaming order
    pub priority: Priority,
    /// Additional metadata for processing
    pub metadata: HashMap<String, String>,
}

/// Stream processing result
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// Frame processed successfully
    Processed(StreamFrame),
    /// Frame accepted, but processing is not yet complete (e.g. buffered awaiting a full batch)
    Incomplete,
    /// Processing completed
    Complete(Vec<StreamFrame>),
    /// Processing failed
    Error(String),
}

/// Stream processor configuration
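///
/// A hedged configuration sketch (the override values below are illustrative,
/// not recommendations), using struct-update syntax over `Default`:
///
/// ```ignore
/// let config = StreamConfig {
///     max_frame_size: 256 * 1024, // cap frames at 256 KB for this stream
///     buffer_size: 16,            // flush batches of 16 frames
///     ..StreamConfig::default()
/// };
/// ```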
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Maximum frame size in bytes
    pub max_frame_size: usize,
    /// Number of frames buffered before a batch is flushed
    pub buffer_size: usize,
    /// Enable compression
    pub enable_compression: bool,
    /// Priority threshold for critical frames
    pub priority_threshold: u8,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            max_frame_size: 1024 * 1024, // 1MB
            buffer_size: 64,
            enable_compression: true,
            priority_threshold: Priority::HIGH.value(),
        }
    }
}

/// Stream processor for handling PJS frames
#[derive(Debug)]
pub struct StreamProcessor {
    config: StreamConfig,
    buffer: Vec<StreamFrame>,
    processed_count: usize,
}

impl StreamProcessor {
    /// Create a new stream processor with the given configuration
    pub fn new(config: StreamConfig) -> Self {
        Self {
            config,
            buffer: Vec::new(),
            processed_count: 0,
        }
    }

    /// Process a stream frame
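    ///
    /// The frame is size-checked, then buffered; once the buffer reaches
    /// `StreamConfig::buffer_size`, the whole batch is drained and returned as
    /// `ProcessResult::Complete`. Oversized frames produce `ProcessResult::Error`
    /// and are not buffered.
    ///
    /// ```ignore
    /// // Illustrative flow, assuming `frame` is built as in the module example
    /// // and `send_batch` is a hypothetical downstream sink:
    /// match processor.process_frame(frame).unwrap() {
    ///     ProcessResult::Complete(batch) => send_batch(batch),
    ///     ProcessResult::Incomplete => {} // still buffering
    ///     ProcessResult::Error(reason) => eprintln!("dropped frame: {reason}"),
    ///     ProcessResult::Processed(_) => {} // not produced by this method
    /// }
    /// ```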
    pub fn process_frame(&mut self, frame: StreamFrame) -> DomainResult<ProcessResult> {
        // Check frame size
        let frame_size = serde_json::to_string(&frame.data)
            .map_err(|e| crate::domain::DomainError::Logic(format!("JSON serialization failed: {e}")))?
            .len();

        if frame_size > self.config.max_frame_size {
            return Ok(ProcessResult::Error(format!(
                "Frame size {} exceeds maximum {}",
                frame_size,
                self.config.max_frame_size
            )));
        }

        // Add to buffer
        self.buffer.push(frame);
        self.processed_count += 1;

        // Check if we should flush buffer
        if self.buffer.len() >= self.config.buffer_size {
            let frames = self.buffer.drain(..).collect();
            Ok(ProcessResult::Complete(frames))
        } else {
            Ok(ProcessResult::Incomplete)
        }
    }

    /// Flush remaining frames
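    ///
    /// Call this at end-of-stream to drain any partial batch that never reached
    /// `buffer_size`; the internal buffer is left empty afterwards.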
    pub fn flush(&mut self) -> Vec<StreamFrame> {
        self.buffer.drain(..).collect()
    }

    /// Get processing statistics
    pub fn stats(&self) -> StreamStats {
        StreamStats {
            processed_frames: self.processed_count,
            buffered_frames: self.buffer.len(),
            buffer_utilization: self.buffer.len() as f32 / self.config.buffer_size as f32,
        }
    }
}

/// Stream processing statistics
#[derive(Debug, Clone)]
pub struct StreamStats {
    pub processed_frames: usize,
    pub buffered_frames: usize,
    pub buffer_utilization: f32,
}

// Re-export key types
pub use compression_integration::{
    CompressedFrame, StreamingCompressor, StreamingDecompressor,
    CompressionStats, DecompressionStats, DecompressionMetadata,
};
pub use priority::PriorityStreamer;

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_stream_processor_basic() {
        let config = StreamConfig::default();
        let mut processor = StreamProcessor::new(config);

        let frame = StreamFrame {
            data: json!({"test": "data"}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };

        let result = processor.process_frame(frame).unwrap();
        assert!(matches!(result, ProcessResult::Incomplete));

        let stats = processor.stats();
        assert_eq!(stats.processed_frames, 1);
        assert_eq!(stats.buffered_frames, 1);
    }

    #[test]
    fn test_stream_frame_creation() {
        let frame = StreamFrame {
            data: json!({"message": "hello"}),
            priority: Priority::HIGH,
            metadata: {
                let mut meta = HashMap::new();
                meta.insert("source".to_string(), "test".to_string());
                meta
            },
        };

        assert_eq!(frame.priority, Priority::HIGH);
        assert_eq!(frame.metadata.get("source"), Some(&"test".to_string()));
    }

    #[test]
    fn test_buffer_flush() {
        let config = StreamConfig {
            buffer_size: 2,
            ..StreamConfig::default()
        };
        let mut processor = StreamProcessor::new(config);

        // Add first frame
        let frame1 = StreamFrame {
            data: json!({"id": 1}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };
        let result1 = processor.process_frame(frame1).unwrap();
        assert!(matches!(result1, ProcessResult::Incomplete));

        // Add second frame - should trigger flush
        let frame2 = StreamFrame {
            data: json!({"id": 2}),
            priority: Priority::MEDIUM,
            metadata: HashMap::new(),
        };
        let result2 = processor.process_frame(frame2).unwrap();

        match result2 {
            ProcessResult::Complete(frames) => {
                assert_eq!(frames.len(), 2);
            }
            _ => panic!("Expected Complete with 2 frames"),
        }
    }
}