pjson_rs_domain/entities/
stream.rs

1//! Stream entity representing a prioritized data stream
2
3use crate::{
4    DomainError, DomainResult,
5    entities::Frame,
6    value_objects::{JsonData, Priority, SessionId, StreamId},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12/// Custom serde for SessionId within entities
13mod serde_session_id {
14    use crate::value_objects::SessionId;
15    use serde::{Deserialize, Deserializer, Serialize, Serializer};
16
17    pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
18    where
19        S: Serializer,
20    {
21        id.as_uuid().serialize(serializer)
22    }
23
24    pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
25    where
26        D: Deserializer<'de>,
27    {
28        let uuid = uuid::Uuid::deserialize(deserializer)?;
29        Ok(SessionId::from_uuid(uuid))
30    }
31}
32
33/// Custom serde for StreamId within entities
34mod serde_stream_id {
35    use crate::value_objects::StreamId;
36    use serde::{Deserialize, Deserializer, Serialize, Serializer};
37
38    pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
39    where
40        S: Serializer,
41    {
42        id.as_uuid().serialize(serializer)
43    }
44
45    pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
46    where
47        D: Deserializer<'de>,
48    {
49        let uuid = uuid::Uuid::deserialize(deserializer)?;
50        Ok(StreamId::from_uuid(uuid))
51    }
52}
53
54/// Custom serde for Priority within entities
55#[allow(dead_code)]
56mod serde_priority {
57    use crate::value_objects::Priority;
58    use serde::{Deserialize, Deserializer, Serialize, Serializer};
59
60    pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
61    where
62        S: Serializer,
63    {
64        priority.value().serialize(serializer)
65    }
66
67    pub fn deserialize<'de, D>(deserializer: D) -> Result<Priority, D::Error>
68    where
69        D: Deserializer<'de>,
70    {
71        let value = u8::deserialize(deserializer)?;
72        Priority::new(value).map_err(serde::de::Error::custom)
73    }
74}
75
76/// Custom serde for HashMap<String, Priority>
77mod serde_priority_map {
78    use crate::value_objects::Priority;
79    use serde::{Deserialize, Deserializer, Serialize, Serializer};
80    use std::collections::HashMap;
81
82    pub fn serialize<S>(map: &HashMap<String, Priority>, serializer: S) -> Result<S::Ok, S::Error>
83    where
84        S: Serializer,
85    {
86        let u8_map: HashMap<String, u8> = map.iter().map(|(k, v)| (k.clone(), v.value())).collect();
87        u8_map.serialize(serializer)
88    }
89
90    pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<String, Priority>, D::Error>
91    where
92        D: Deserializer<'de>,
93    {
94        let u8_map: HashMap<String, u8> = HashMap::deserialize(deserializer)?;
95        u8_map
96            .into_iter()
97            .map(|(k, v)| {
98                Priority::new(v)
99                    .map(|p| (k, p))
100                    .map_err(serde::de::Error::custom)
101            })
102            .collect()
103    }
104}
105
106/// Stream state in its lifecycle
107#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
108pub enum StreamState {
109    /// Stream is being prepared
110    Preparing,
111    /// Stream is actively sending data
112    Streaming,
113    /// Stream completed successfully
114    Completed,
115    /// Stream failed with error
116    Failed,
117    /// Stream was cancelled
118    Cancelled,
119}
120
121/// Stream configuration and metadata
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct StreamConfig {
124    /// Maximum frame size in bytes
125    pub max_frame_size: usize,
126    /// Maximum frames per batch
127    pub max_frames_per_batch: usize,
128    /// Compression settings
129    pub enable_compression: bool,
130    /// Custom priority rules
131    #[serde(with = "serde_priority_map")]
132    pub priority_rules: HashMap<String, Priority>,
133}
134
135impl Default for StreamConfig {
136    fn default() -> Self {
137        Self {
138            max_frame_size: 64 * 1024, // 64KB
139            max_frames_per_batch: 10,
140            enable_compression: true,
141            priority_rules: HashMap::new(),
142        }
143    }
144}
145
146/// Stream statistics for monitoring
147#[derive(Debug, Clone, Default, Serialize, Deserialize)]
148pub struct StreamStats {
149    /// Total number of frames generated
150    pub total_frames: u64,
151    /// Number of skeleton frames sent
152    pub skeleton_frames: u64,
153    /// Number of patch frames sent
154    pub patch_frames: u64,
155    /// Number of completion frames sent
156    pub complete_frames: u64,
157    /// Number of error frames sent
158    pub error_frames: u64,
159    /// Total bytes transmitted across all frames
160    pub total_bytes: u64,
161    /// Bytes transmitted in critical priority frames
162    pub critical_bytes: u64,
163    /// Bytes transmitted in high priority frames
164    pub high_priority_bytes: u64,
165    /// Average size of frames in bytes
166    pub average_frame_size: f64,
167}
168
169/// Priority data stream entity
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct Stream {
172    #[serde(with = "serde_stream_id")]
173    id: StreamId,
174    #[serde(with = "serde_session_id")]
175    session_id: SessionId,
176    state: StreamState,
177    config: StreamConfig,
178    stats: StreamStats,
179    created_at: DateTime<Utc>,
180    updated_at: DateTime<Utc>,
181    completed_at: Option<DateTime<Utc>>,
182    next_sequence: u64,
183    source_data: Option<JsonData>,
184    metadata: HashMap<String, String>,
185}
186
187impl Stream {
188    /// Create new stream
189    pub fn new(session_id: SessionId, source_data: JsonData, config: StreamConfig) -> Self {
190        let now = Utc::now();
191
192        Self {
193            id: StreamId::new(),
194            session_id,
195            state: StreamState::Preparing,
196            config,
197            stats: StreamStats::default(),
198            created_at: now,
199            updated_at: now,
200            completed_at: None,
201            next_sequence: 1,
202            source_data: Some(source_data),
203            metadata: HashMap::new(),
204        }
205    }
206
207    /// Get stream ID
208    pub fn id(&self) -> StreamId {
209        self.id
210    }
211
212    /// Get session ID
213    pub fn session_id(&self) -> SessionId {
214        self.session_id
215    }
216
217    /// Get current state
218    pub fn state(&self) -> &StreamState {
219        &self.state
220    }
221
222    /// Get configuration
223    pub fn config(&self) -> &StreamConfig {
224        &self.config
225    }
226
227    /// Get statistics
228    pub fn stats(&self) -> &StreamStats {
229        &self.stats
230    }
231
232    /// Get creation timestamp
233    pub fn created_at(&self) -> DateTime<Utc> {
234        self.created_at
235    }
236
237    /// Get last update timestamp
238    pub fn updated_at(&self) -> DateTime<Utc> {
239        self.updated_at
240    }
241
242    /// Get completion timestamp
243    pub fn completed_at(&self) -> Option<DateTime<Utc>> {
244        self.completed_at
245    }
246
247    /// Get source data
248    pub fn source_data(&self) -> Option<&JsonData> {
249        self.source_data.as_ref()
250    }
251
252    /// Get metadata
253    pub fn metadata(&self) -> &HashMap<String, String> {
254        &self.metadata
255    }
256
257    /// Add metadata
258    pub fn add_metadata(&mut self, key: String, value: String) {
259        self.metadata.insert(key, value);
260        self.update_timestamp();
261    }
262
263    /// Start streaming (transition to Streaming state)
264    pub fn start_streaming(&mut self) -> DomainResult<()> {
265        match self.state {
266            StreamState::Preparing => {
267                self.state = StreamState::Streaming;
268                self.update_timestamp();
269                Ok(())
270            }
271            _ => Err(DomainError::InvalidStateTransition(format!(
272                "Cannot start streaming from state: {:?}",
273                self.state
274            ))),
275        }
276    }
277
278    /// Complete stream successfully
279    pub fn complete(&mut self) -> DomainResult<()> {
280        match self.state {
281            StreamState::Streaming => {
282                self.state = StreamState::Completed;
283                self.completed_at = Some(Utc::now());
284                self.update_timestamp();
285                Ok(())
286            }
287            _ => Err(DomainError::InvalidStateTransition(format!(
288                "Cannot complete stream from state: {:?}",
289                self.state
290            ))),
291        }
292    }
293
294    /// Fail stream with error
295    pub fn fail(&mut self, error: String) -> DomainResult<()> {
296        match self.state {
297            StreamState::Preparing | StreamState::Streaming => {
298                self.state = StreamState::Failed;
299                self.completed_at = Some(Utc::now());
300                self.add_metadata("error".to_string(), error);
301                Ok(())
302            }
303            _ => Err(DomainError::InvalidStateTransition(format!(
304                "Cannot fail stream from state: {:?}",
305                self.state
306            ))),
307        }
308    }
309
310    /// Cancel stream
311    pub fn cancel(&mut self) -> DomainResult<()> {
312        match self.state {
313            StreamState::Preparing | StreamState::Streaming => {
314                self.state = StreamState::Cancelled;
315                self.completed_at = Some(Utc::now());
316                self.update_timestamp();
317                Ok(())
318            }
319            _ => Err(DomainError::InvalidStateTransition(format!(
320                "Cannot cancel stream from state: {:?}",
321                self.state
322            ))),
323        }
324    }
325
326    /// Generate skeleton frame for the stream
327    pub fn create_skeleton_frame(&mut self) -> DomainResult<Frame> {
328        if !matches!(self.state, StreamState::Streaming) {
329            return Err(DomainError::InvalidStreamState(
330                "Stream must be in streaming state to create frames".to_string(),
331            ));
332        }
333
334        let skeleton_data = self.source_data.as_ref().ok_or_else(|| {
335            DomainError::InvalidStreamState("No source data available for skeleton".to_string())
336        })?;
337
338        let skeleton = self.generate_skeleton(skeleton_data)?;
339        let frame = Frame::skeleton(self.id, self.next_sequence, skeleton);
340
341        self.record_frame_created(&frame);
342
343        Ok(frame)
344    }
345
346    /// Create batch of patch frames based on priority
347    pub fn create_patch_frames(
348        &mut self,
349        priority_threshold: Priority,
350        max_frames: usize,
351    ) -> DomainResult<Vec<Frame>> {
352        if !matches!(self.state, StreamState::Streaming) {
353            return Err(DomainError::InvalidStreamState(
354                "Stream must be in streaming state to create frames".to_string(),
355            ));
356        }
357
358        let source_data = self.source_data.as_ref().ok_or_else(|| {
359            DomainError::InvalidStreamState("No source data available for patches".to_string())
360        })?;
361
362        let patches = self.extract_patches(source_data, priority_threshold)?;
363        let frames = self.batch_patches_into_frames(patches, max_frames)?;
364
365        for frame in &frames {
366            self.record_frame_created(frame);
367        }
368
369        Ok(frames)
370    }
371
372    /// Create completion frame
373    pub fn create_completion_frame(&mut self, checksum: Option<String>) -> DomainResult<Frame> {
374        if !matches!(self.state, StreamState::Streaming) {
375            return Err(DomainError::InvalidStreamState(
376                "Stream must be in streaming state to create frames".to_string(),
377            ));
378        }
379
380        let frame = Frame::complete(self.id, self.next_sequence, checksum);
381        self.record_frame_created(&frame);
382
383        Ok(frame)
384    }
385
386    /// Check if stream is active
387    pub fn is_active(&self) -> bool {
388        matches!(self.state, StreamState::Preparing | StreamState::Streaming)
389    }
390
391    /// Check if stream is finished
392    pub fn is_finished(&self) -> bool {
393        matches!(
394            self.state,
395            StreamState::Completed | StreamState::Failed | StreamState::Cancelled
396        )
397    }
398
399    /// Get stream duration
400    pub fn duration(&self) -> Option<chrono::Duration> {
401        self.completed_at.map(|end| end - self.created_at)
402    }
403
404    /// Calculate stream progress (0.0 to 1.0)
405    pub fn progress(&self) -> f64 {
406        match self.state {
407            StreamState::Preparing => 0.0,
408            StreamState::Streaming => {
409                // Estimate based on frames sent vs expected
410                if self.stats.total_frames == 0 {
411                    0.1 // Just started
412                } else {
413                    // Simple heuristic: more frames = more progress
414                    (self.stats.total_frames as f64 / 100.0).min(0.9)
415                }
416            }
417            StreamState::Completed => 1.0,
418            StreamState::Failed | StreamState::Cancelled => {
419                // Partial progress before failure/cancellation
420                (self.stats.total_frames as f64 / 100.0).min(0.99)
421            }
422        }
423    }
424
425    /// Update configuration
426    pub fn update_config(&mut self, config: StreamConfig) -> DomainResult<()> {
427        if !self.is_active() {
428            return Err(DomainError::InvalidStreamState(
429                "Cannot update config of inactive stream".to_string(),
430            ));
431        }
432
433        self.config = config;
434        self.update_timestamp();
435        Ok(())
436    }
437
438    /// Private helper: Update timestamp
439    fn update_timestamp(&mut self) {
440        self.updated_at = Utc::now();
441    }
442
443    /// Private helper: Record frame creation for stats
444    fn record_frame_created(&mut self, frame: &Frame) {
445        self.next_sequence += 1;
446        self.stats.total_frames += 1;
447
448        let frame_size = frame.estimated_size() as u64;
449        self.stats.total_bytes += frame_size;
450
451        match frame.frame_type() {
452            crate::entities::frame::FrameType::Skeleton => {
453                self.stats.skeleton_frames += 1;
454                self.stats.critical_bytes += frame_size;
455            }
456            crate::entities::frame::FrameType::Patch => {
457                self.stats.patch_frames += 1;
458                if frame.is_critical() {
459                    self.stats.critical_bytes += frame_size;
460                } else if frame.is_high_priority() {
461                    self.stats.high_priority_bytes += frame_size;
462                }
463            }
464            crate::entities::frame::FrameType::Complete => {
465                self.stats.complete_frames += 1;
466                self.stats.critical_bytes += frame_size;
467            }
468            crate::entities::frame::FrameType::Error => {
469                self.stats.error_frames += 1;
470                self.stats.critical_bytes += frame_size;
471            }
472        }
473
474        // Update average frame size
475        self.stats.average_frame_size =
476            self.stats.total_bytes as f64 / self.stats.total_frames as f64;
477
478        self.update_timestamp();
479    }
480
481    /// Private helper: Generate skeleton from source data
482    fn generate_skeleton(&self, data: &JsonData) -> DomainResult<JsonData> {
483        // Simplified skeleton generation - create empty structure
484        match data {
485            JsonData::Object(obj) => {
486                let mut skeleton = HashMap::new();
487                for (key, value) in obj.iter() {
488                    skeleton.insert(
489                        key.clone(),
490                        match value {
491                            JsonData::Array(_) => JsonData::Array(Vec::new()),
492                            JsonData::Object(_) => self.generate_skeleton(value)?,
493                            JsonData::Integer(_) => JsonData::Integer(0),
494                            JsonData::Float(_) => JsonData::Float(0.0),
495                            JsonData::String(_) => JsonData::Null,
496                            JsonData::Bool(_) => JsonData::Bool(false),
497                            JsonData::Null => JsonData::Null,
498                        },
499                    );
500                }
501                Ok(JsonData::Object(skeleton))
502            }
503            JsonData::Array(_) => Ok(JsonData::Array(Vec::new())),
504            _ => Ok(JsonData::Null),
505        }
506    }
507
508    /// Private helper: Extract patches with priority filtering
509    fn extract_patches(
510        &self,
511        _data: &JsonData,
512        _threshold: Priority,
513    ) -> DomainResult<Vec<crate::entities::frame::FramePatch>> {
514        // Simplified patch extraction - would need more sophisticated logic
515        Ok(Vec::new())
516    }
517
518    /// Private helper: Batch patches into frames
519    fn batch_patches_into_frames(
520        &mut self,
521        patches: Vec<crate::entities::frame::FramePatch>,
522        max_frames: usize,
523    ) -> DomainResult<Vec<Frame>> {
524        if patches.is_empty() {
525            return Ok(Vec::new());
526        }
527
528        let mut frames = Vec::new();
529        let chunk_size = patches.len().div_ceil(max_frames);
530
531        for patch_chunk in patches.chunks(chunk_size) {
532            let priority = patch_chunk
533                .iter()
534                .map(|_| Priority::MEDIUM) // Simplified - would calculate from patch content
535                .max()
536                .unwrap_or(Priority::MEDIUM);
537
538            let frame = Frame::patch(self.id, self.next_sequence, priority, patch_chunk.to_vec())?;
539
540            frames.push(frame);
541        }
542
543        Ok(frames)
544    }
545}
546
547#[cfg(test)]
548mod tests {
549    use super::*;
550
551    #[test]
552    fn test_stream_creation() {
553        let session_id = SessionId::new();
554        let source_data = serde_json::json!({
555            "users": [
556                {"id": 1, "name": "John"},
557                {"id": 2, "name": "Jane"}
558            ],
559            "total": 2
560        });
561
562        let stream = Stream::new(
563            session_id,
564            source_data.clone().into(),
565            StreamConfig::default(),
566        );
567
568        assert_eq!(stream.session_id(), session_id);
569        assert_eq!(stream.state(), &StreamState::Preparing);
570        assert!(stream.is_active());
571        assert!(!stream.is_finished());
572        assert_eq!(stream.progress(), 0.0);
573    }
574
575    #[test]
576    fn test_stream_state_transitions() {
577        let session_id = SessionId::new();
578        let source_data = serde_json::json!({});
579        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
580
581        // Start streaming
582        assert!(stream.start_streaming().is_ok());
583        assert_eq!(stream.state(), &StreamState::Streaming);
584
585        // Complete stream
586        assert!(stream.complete().is_ok());
587        assert_eq!(stream.state(), &StreamState::Completed);
588        assert!(stream.is_finished());
589        assert_eq!(stream.progress(), 1.0);
590    }
591
592    #[test]
593    fn test_invalid_state_transitions() {
594        let session_id = SessionId::new();
595        let source_data = serde_json::json!({});
596        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
597
598        // Cannot complete from preparing state
599        assert!(stream.complete().is_err());
600
601        // Start and complete
602        assert!(stream.start_streaming().is_ok());
603        assert!(stream.complete().is_ok());
604
605        // Cannot start again from completed state
606        assert!(stream.start_streaming().is_err());
607    }
608
609    #[test]
610    fn test_frame_creation() {
611        let session_id = SessionId::new();
612        let source_data = serde_json::json!({
613            "test": "data"
614        });
615        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
616
617        // Cannot create frames before streaming
618        assert!(stream.create_skeleton_frame().is_err());
619
620        // Start streaming and create skeleton
621        assert!(stream.start_streaming().is_ok());
622        let skeleton = stream
623            .create_skeleton_frame()
624            .expect("Failed to create skeleton frame in test");
625
626        assert_eq!(
627            skeleton.frame_type(),
628            &crate::entities::frame::FrameType::Skeleton
629        );
630        assert_eq!(skeleton.sequence(), 1);
631        assert_eq!(stream.stats().skeleton_frames, 1);
632    }
633
634    #[test]
635    fn test_stream_metadata() {
636        let session_id = SessionId::new();
637        let source_data = serde_json::json!({});
638        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
639
640        stream.add_metadata("source".to_string(), "api".to_string());
641        stream.add_metadata("version".to_string(), "1.0".to_string());
642
643        assert_eq!(stream.metadata().len(), 2);
644        assert_eq!(stream.metadata().get("source"), Some(&"api".to_string()));
645    }
646}