pjson_rs_domain/entities/
stream.rs

1//! Stream entity representing a prioritized data stream
2
3use crate::{
4    DomainError, DomainResult,
5    entities::Frame,
6    value_objects::{JsonData, Priority, SessionId, StreamId},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12/// Stream state in its lifecycle
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub enum StreamState {
15    /// Stream is being prepared
16    Preparing,
17    /// Stream is actively sending data
18    Streaming,
19    /// Stream completed successfully
20    Completed,
21    /// Stream failed with error
22    Failed,
23    /// Stream was cancelled
24    Cancelled,
25}
26
27/// Stream configuration and metadata
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct StreamConfig {
30    /// Maximum frame size in bytes
31    pub max_frame_size: usize,
32    /// Maximum frames per batch
33    pub max_frames_per_batch: usize,
34    /// Compression settings
35    pub enable_compression: bool,
36    /// Custom priority rules
37    pub priority_rules: HashMap<String, Priority>,
38}
39
40impl Default for StreamConfig {
41    fn default() -> Self {
42        Self {
43            max_frame_size: 64 * 1024, // 64KB
44            max_frames_per_batch: 10,
45            enable_compression: true,
46            priority_rules: HashMap::new(),
47        }
48    }
49}
50
51/// Stream statistics for monitoring
52#[derive(Debug, Clone, Default, Serialize, Deserialize)]
53pub struct StreamStats {
54    /// Total number of frames generated
55    pub total_frames: u64,
56    /// Number of skeleton frames sent
57    pub skeleton_frames: u64,
58    /// Number of patch frames sent
59    pub patch_frames: u64,
60    /// Number of completion frames sent
61    pub complete_frames: u64,
62    /// Number of error frames sent
63    pub error_frames: u64,
64    /// Total bytes transmitted across all frames
65    pub total_bytes: u64,
66    /// Bytes transmitted in critical priority frames
67    pub critical_bytes: u64,
68    /// Bytes transmitted in high priority frames
69    pub high_priority_bytes: u64,
70    /// Average size of frames in bytes
71    pub average_frame_size: f64,
72}
73
74/// Priority data stream entity
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct Stream {
77    id: StreamId,
78    session_id: SessionId,
79    state: StreamState,
80    config: StreamConfig,
81    stats: StreamStats,
82    created_at: DateTime<Utc>,
83    updated_at: DateTime<Utc>,
84    completed_at: Option<DateTime<Utc>>,
85    next_sequence: u64,
86    source_data: Option<JsonData>,
87    metadata: HashMap<String, String>,
88}
89
90impl Stream {
91    /// Create new stream
92    pub fn new(session_id: SessionId, source_data: JsonData, config: StreamConfig) -> Self {
93        let now = Utc::now();
94
95        Self {
96            id: StreamId::new(),
97            session_id,
98            state: StreamState::Preparing,
99            config,
100            stats: StreamStats::default(),
101            created_at: now,
102            updated_at: now,
103            completed_at: None,
104            next_sequence: 1,
105            source_data: Some(source_data),
106            metadata: HashMap::new(),
107        }
108    }
109
110    /// Get stream ID
111    pub fn id(&self) -> StreamId {
112        self.id
113    }
114
115    /// Get session ID
116    pub fn session_id(&self) -> SessionId {
117        self.session_id
118    }
119
120    /// Get current state
121    pub fn state(&self) -> &StreamState {
122        &self.state
123    }
124
125    /// Get configuration
126    pub fn config(&self) -> &StreamConfig {
127        &self.config
128    }
129
130    /// Get statistics
131    pub fn stats(&self) -> &StreamStats {
132        &self.stats
133    }
134
135    /// Get creation timestamp
136    pub fn created_at(&self) -> DateTime<Utc> {
137        self.created_at
138    }
139
140    /// Get last update timestamp
141    pub fn updated_at(&self) -> DateTime<Utc> {
142        self.updated_at
143    }
144
145    /// Get completion timestamp
146    pub fn completed_at(&self) -> Option<DateTime<Utc>> {
147        self.completed_at
148    }
149
150    /// Get source data
151    pub fn source_data(&self) -> Option<&JsonData> {
152        self.source_data.as_ref()
153    }
154
155    /// Get metadata
156    pub fn metadata(&self) -> &HashMap<String, String> {
157        &self.metadata
158    }
159
160    /// Add metadata
161    pub fn add_metadata(&mut self, key: String, value: String) {
162        self.metadata.insert(key, value);
163        self.update_timestamp();
164    }
165
166    /// Start streaming (transition to Streaming state)
167    pub fn start_streaming(&mut self) -> DomainResult<()> {
168        match self.state {
169            StreamState::Preparing => {
170                self.state = StreamState::Streaming;
171                self.update_timestamp();
172                Ok(())
173            }
174            _ => Err(DomainError::InvalidStateTransition(format!(
175                "Cannot start streaming from state: {:?}",
176                self.state
177            ))),
178        }
179    }
180
181    /// Complete stream successfully
182    pub fn complete(&mut self) -> DomainResult<()> {
183        match self.state {
184            StreamState::Streaming => {
185                self.state = StreamState::Completed;
186                self.completed_at = Some(Utc::now());
187                self.update_timestamp();
188                Ok(())
189            }
190            _ => Err(DomainError::InvalidStateTransition(format!(
191                "Cannot complete stream from state: {:?}",
192                self.state
193            ))),
194        }
195    }
196
197    /// Fail stream with error
198    pub fn fail(&mut self, error: String) -> DomainResult<()> {
199        match self.state {
200            StreamState::Preparing | StreamState::Streaming => {
201                self.state = StreamState::Failed;
202                self.completed_at = Some(Utc::now());
203                self.add_metadata("error".to_string(), error);
204                Ok(())
205            }
206            _ => Err(DomainError::InvalidStateTransition(format!(
207                "Cannot fail stream from state: {:?}",
208                self.state
209            ))),
210        }
211    }
212
213    /// Cancel stream
214    pub fn cancel(&mut self) -> DomainResult<()> {
215        match self.state {
216            StreamState::Preparing | StreamState::Streaming => {
217                self.state = StreamState::Cancelled;
218                self.completed_at = Some(Utc::now());
219                self.update_timestamp();
220                Ok(())
221            }
222            _ => Err(DomainError::InvalidStateTransition(format!(
223                "Cannot cancel stream from state: {:?}",
224                self.state
225            ))),
226        }
227    }
228
229    /// Generate skeleton frame for the stream
230    pub fn create_skeleton_frame(&mut self) -> DomainResult<Frame> {
231        if !matches!(self.state, StreamState::Streaming) {
232            return Err(DomainError::InvalidStreamState(
233                "Stream must be in streaming state to create frames".to_string(),
234            ));
235        }
236
237        let skeleton_data = self.source_data.as_ref().ok_or_else(|| {
238            DomainError::InvalidStreamState("No source data available for skeleton".to_string())
239        })?;
240
241        let skeleton = self.generate_skeleton(skeleton_data)?;
242        let frame = Frame::skeleton(self.id, self.next_sequence, skeleton);
243
244        self.record_frame_created(&frame);
245
246        Ok(frame)
247    }
248
249    /// Create batch of patch frames based on priority
250    pub fn create_patch_frames(
251        &mut self,
252        priority_threshold: Priority,
253        max_frames: usize,
254    ) -> DomainResult<Vec<Frame>> {
255        if !matches!(self.state, StreamState::Streaming) {
256            return Err(DomainError::InvalidStreamState(
257                "Stream must be in streaming state to create frames".to_string(),
258            ));
259        }
260
261        let source_data = self.source_data.as_ref().ok_or_else(|| {
262            DomainError::InvalidStreamState("No source data available for patches".to_string())
263        })?;
264
265        let patches = self.extract_patches(source_data, priority_threshold)?;
266        let frames = self.batch_patches_into_frames(patches, max_frames)?;
267
268        for frame in &frames {
269            self.record_frame_created(frame);
270        }
271
272        Ok(frames)
273    }
274
275    /// Create completion frame
276    pub fn create_completion_frame(&mut self, checksum: Option<String>) -> DomainResult<Frame> {
277        if !matches!(self.state, StreamState::Streaming) {
278            return Err(DomainError::InvalidStreamState(
279                "Stream must be in streaming state to create frames".to_string(),
280            ));
281        }
282
283        let frame = Frame::complete(self.id, self.next_sequence, checksum);
284        self.record_frame_created(&frame);
285
286        Ok(frame)
287    }
288
289    /// Check if stream is active
290    pub fn is_active(&self) -> bool {
291        matches!(self.state, StreamState::Preparing | StreamState::Streaming)
292    }
293
294    /// Check if stream is finished
295    pub fn is_finished(&self) -> bool {
296        matches!(
297            self.state,
298            StreamState::Completed | StreamState::Failed | StreamState::Cancelled
299        )
300    }
301
302    /// Get stream duration
303    pub fn duration(&self) -> Option<chrono::Duration> {
304        self.completed_at.map(|end| end - self.created_at)
305    }
306
307    /// Calculate stream progress (0.0 to 1.0)
308    pub fn progress(&self) -> f64 {
309        match self.state {
310            StreamState::Preparing => 0.0,
311            StreamState::Streaming => {
312                // Estimate based on frames sent vs expected
313                if self.stats.total_frames == 0 {
314                    0.1 // Just started
315                } else {
316                    // Simple heuristic: more frames = more progress
317                    (self.stats.total_frames as f64 / 100.0).min(0.9)
318                }
319            }
320            StreamState::Completed => 1.0,
321            StreamState::Failed | StreamState::Cancelled => {
322                // Partial progress before failure/cancellation
323                (self.stats.total_frames as f64 / 100.0).min(0.99)
324            }
325        }
326    }
327
328    /// Update configuration
329    pub fn update_config(&mut self, config: StreamConfig) -> DomainResult<()> {
330        if !self.is_active() {
331            return Err(DomainError::InvalidStreamState(
332                "Cannot update config of inactive stream".to_string(),
333            ));
334        }
335
336        self.config = config;
337        self.update_timestamp();
338        Ok(())
339    }
340
341    /// Private helper: Update timestamp
342    fn update_timestamp(&mut self) {
343        self.updated_at = Utc::now();
344    }
345
346    /// Private helper: Record frame creation for stats
347    fn record_frame_created(&mut self, frame: &Frame) {
348        self.next_sequence += 1;
349        self.stats.total_frames += 1;
350
351        let frame_size = frame.estimated_size() as u64;
352        self.stats.total_bytes += frame_size;
353
354        match frame.frame_type() {
355            crate::entities::frame::FrameType::Skeleton => {
356                self.stats.skeleton_frames += 1;
357                self.stats.critical_bytes += frame_size;
358            }
359            crate::entities::frame::FrameType::Patch => {
360                self.stats.patch_frames += 1;
361                if frame.is_critical() {
362                    self.stats.critical_bytes += frame_size;
363                } else if frame.is_high_priority() {
364                    self.stats.high_priority_bytes += frame_size;
365                }
366            }
367            crate::entities::frame::FrameType::Complete => {
368                self.stats.complete_frames += 1;
369                self.stats.critical_bytes += frame_size;
370            }
371            crate::entities::frame::FrameType::Error => {
372                self.stats.error_frames += 1;
373                self.stats.critical_bytes += frame_size;
374            }
375        }
376
377        // Update average frame size
378        self.stats.average_frame_size =
379            self.stats.total_bytes as f64 / self.stats.total_frames as f64;
380
381        self.update_timestamp();
382    }
383
384    /// Private helper: Generate skeleton from source data
385    fn generate_skeleton(&self, data: &JsonData) -> DomainResult<JsonData> {
386        // Simplified skeleton generation - create empty structure
387        match data {
388            JsonData::Object(obj) => {
389                let mut skeleton = HashMap::new();
390                for (key, value) in obj.iter() {
391                    skeleton.insert(
392                        key.clone(),
393                        match value {
394                            JsonData::Array(_) => JsonData::Array(Vec::new()),
395                            JsonData::Object(_) => self.generate_skeleton(value)?,
396                            JsonData::Integer(_) => JsonData::Integer(0),
397                            JsonData::Float(_) => JsonData::Float(0.0),
398                            JsonData::String(_) => JsonData::Null,
399                            JsonData::Bool(_) => JsonData::Bool(false),
400                            JsonData::Null => JsonData::Null,
401                        },
402                    );
403                }
404                Ok(JsonData::Object(skeleton))
405            }
406            JsonData::Array(_) => Ok(JsonData::Array(Vec::new())),
407            _ => Ok(JsonData::Null),
408        }
409    }
410
411    /// Private helper: Extract patches with priority filtering
412    fn extract_patches(
413        &self,
414        _data: &JsonData,
415        _threshold: Priority,
416    ) -> DomainResult<Vec<crate::entities::frame::FramePatch>> {
417        // Simplified patch extraction - would need more sophisticated logic
418        Ok(Vec::new())
419    }
420
421    /// Private helper: Batch patches into frames
422    fn batch_patches_into_frames(
423        &mut self,
424        patches: Vec<crate::entities::frame::FramePatch>,
425        max_frames: usize,
426    ) -> DomainResult<Vec<Frame>> {
427        if patches.is_empty() {
428            return Ok(Vec::new());
429        }
430
431        let mut frames = Vec::new();
432        let chunk_size = patches.len().div_ceil(max_frames);
433
434        for patch_chunk in patches.chunks(chunk_size) {
435            let priority = patch_chunk
436                .iter()
437                .map(|_| Priority::MEDIUM) // Simplified - would calculate from patch content
438                .max()
439                .unwrap_or(Priority::MEDIUM);
440
441            let frame = Frame::patch(self.id, self.next_sequence, priority, patch_chunk.to_vec())?;
442
443            frames.push(frame);
444        }
445
446        Ok(frames)
447    }
448}
449
450#[cfg(test)]
451mod tests {
452    use super::*;
453
454    #[test]
455    fn test_stream_creation() {
456        let session_id = SessionId::new();
457        let source_data = serde_json::json!({
458            "users": [
459                {"id": 1, "name": "John"},
460                {"id": 2, "name": "Jane"}
461            ],
462            "total": 2
463        });
464
465        let stream = Stream::new(
466            session_id,
467            source_data.clone().into(),
468            StreamConfig::default(),
469        );
470
471        assert_eq!(stream.session_id(), session_id);
472        assert_eq!(stream.state(), &StreamState::Preparing);
473        assert!(stream.is_active());
474        assert!(!stream.is_finished());
475        assert_eq!(stream.progress(), 0.0);
476    }
477
478    #[test]
479    fn test_stream_state_transitions() {
480        let session_id = SessionId::new();
481        let source_data = serde_json::json!({});
482        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
483
484        // Start streaming
485        assert!(stream.start_streaming().is_ok());
486        assert_eq!(stream.state(), &StreamState::Streaming);
487
488        // Complete stream
489        assert!(stream.complete().is_ok());
490        assert_eq!(stream.state(), &StreamState::Completed);
491        assert!(stream.is_finished());
492        assert_eq!(stream.progress(), 1.0);
493    }
494
495    #[test]
496    fn test_invalid_state_transitions() {
497        let session_id = SessionId::new();
498        let source_data = serde_json::json!({});
499        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
500
501        // Cannot complete from preparing state
502        assert!(stream.complete().is_err());
503
504        // Start and complete
505        assert!(stream.start_streaming().is_ok());
506        assert!(stream.complete().is_ok());
507
508        // Cannot start again from completed state
509        assert!(stream.start_streaming().is_err());
510    }
511
512    #[test]
513    fn test_frame_creation() {
514        let session_id = SessionId::new();
515        let source_data = serde_json::json!({
516            "test": "data"
517        });
518        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
519
520        // Cannot create frames before streaming
521        assert!(stream.create_skeleton_frame().is_err());
522
523        // Start streaming and create skeleton
524        assert!(stream.start_streaming().is_ok());
525        let skeleton = stream
526            .create_skeleton_frame()
527            .expect("Failed to create skeleton frame in test");
528
529        assert_eq!(
530            skeleton.frame_type(),
531            &crate::entities::frame::FrameType::Skeleton
532        );
533        assert_eq!(skeleton.sequence(), 1);
534        assert_eq!(stream.stats().skeleton_frames, 1);
535    }
536
537    #[test]
538    fn test_stream_metadata() {
539        let session_id = SessionId::new();
540        let source_data = serde_json::json!({});
541        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
542
543        stream.add_metadata("source".to_string(), "api".to_string());
544        stream.add_metadata("version".to_string(), "1.0".to_string());
545
546        assert_eq!(stream.metadata().len(), 2);
547        assert_eq!(stream.metadata().get("source"), Some(&"api".to_string()));
548    }
549}