Skip to main content

pjson_rs_domain/entities/
stream.rs

1//! Stream entity representing a prioritized data stream
2
3use crate::{
4    DomainError, DomainResult,
5    entities::Frame,
6    value_objects::{JsonData, Priority, SessionId, StreamId},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12/// Custom serde for SessionId within entities
13mod serde_session_id {
14    use crate::value_objects::SessionId;
15    use serde::{Deserialize, Deserializer, Serialize, Serializer};
16
17    pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
18    where
19        S: Serializer,
20    {
21        id.as_uuid().serialize(serializer)
22    }
23
24    pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
25    where
26        D: Deserializer<'de>,
27    {
28        let uuid = uuid::Uuid::deserialize(deserializer)?;
29        Ok(SessionId::from_uuid(uuid))
30    }
31}
32
33/// Custom serde for StreamId within entities
34mod serde_stream_id {
35    use crate::value_objects::StreamId;
36    use serde::{Deserialize, Deserializer, Serialize, Serializer};
37
38    pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
39    where
40        S: Serializer,
41    {
42        id.as_uuid().serialize(serializer)
43    }
44
45    pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
46    where
47        D: Deserializer<'de>,
48    {
49        let uuid = uuid::Uuid::deserialize(deserializer)?;
50        Ok(StreamId::from_uuid(uuid))
51    }
52}
53
54/// Custom serde for HashMap<String, Priority>
55mod serde_priority_map {
56    use crate::value_objects::Priority;
57    use serde::{Deserialize, Deserializer, Serialize, Serializer};
58    use std::collections::HashMap;
59
60    pub fn serialize<S>(map: &HashMap<String, Priority>, serializer: S) -> Result<S::Ok, S::Error>
61    where
62        S: Serializer,
63    {
64        let u8_map: HashMap<String, u8> = map.iter().map(|(k, v)| (k.clone(), v.value())).collect();
65        u8_map.serialize(serializer)
66    }
67
68    pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<String, Priority>, D::Error>
69    where
70        D: Deserializer<'de>,
71    {
72        let u8_map: HashMap<String, u8> = HashMap::deserialize(deserializer)?;
73        u8_map
74            .into_iter()
75            .map(|(k, v)| {
76                Priority::new(v)
77                    .map(|p| (k, p))
78                    .map_err(serde::de::Error::custom)
79            })
80            .collect()
81    }
82}
83
84/// Stream state in its lifecycle
85#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
86pub enum StreamState {
87    /// Stream is being prepared
88    Preparing,
89    /// Stream is actively sending data
90    Streaming,
91    /// Stream completed successfully
92    Completed,
93    /// Stream failed with error
94    Failed,
95    /// Stream was cancelled
96    Cancelled,
97}
98
99/// Stream configuration and metadata
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct StreamConfig {
102    /// Maximum frame size in bytes
103    pub max_frame_size: usize,
104    /// Maximum frames per batch
105    pub max_frames_per_batch: usize,
106    /// Compression settings
107    pub enable_compression: bool,
108    /// Custom priority rules
109    #[serde(with = "serde_priority_map")]
110    pub priority_rules: HashMap<String, Priority>,
111}
112
113impl Default for StreamConfig {
114    fn default() -> Self {
115        Self {
116            max_frame_size: 64 * 1024, // 64KB
117            max_frames_per_batch: 10,
118            enable_compression: true,
119            priority_rules: HashMap::new(),
120        }
121    }
122}
123
124/// Stream statistics for monitoring
125#[derive(Debug, Clone, Default, Serialize, Deserialize)]
126pub struct StreamStats {
127    /// Total number of frames generated
128    pub total_frames: u64,
129    /// Number of skeleton frames sent
130    pub skeleton_frames: u64,
131    /// Number of patch frames sent
132    pub patch_frames: u64,
133    /// Number of completion frames sent
134    pub complete_frames: u64,
135    /// Number of error frames sent
136    pub error_frames: u64,
137    /// Total bytes transmitted across all frames
138    pub total_bytes: u64,
139    /// Bytes transmitted in critical priority frames
140    pub critical_bytes: u64,
141    /// Bytes transmitted in high priority frames
142    pub high_priority_bytes: u64,
143    /// Average size of frames in bytes
144    pub average_frame_size: f64,
145}
146
147/// Priority data stream entity
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct Stream {
150    #[serde(with = "serde_stream_id")]
151    id: StreamId,
152    #[serde(with = "serde_session_id")]
153    session_id: SessionId,
154    state: StreamState,
155    config: StreamConfig,
156    stats: StreamStats,
157    created_at: DateTime<Utc>,
158    updated_at: DateTime<Utc>,
159    completed_at: Option<DateTime<Utc>>,
160    next_sequence: u64,
161    source_data: Option<JsonData>,
162    metadata: HashMap<String, String>,
163}
164
165impl Stream {
166    /// Create new stream
167    pub fn new(session_id: SessionId, source_data: JsonData, config: StreamConfig) -> Self {
168        let now = Utc::now();
169
170        Self {
171            id: StreamId::new(),
172            session_id,
173            state: StreamState::Preparing,
174            config,
175            stats: StreamStats::default(),
176            created_at: now,
177            updated_at: now,
178            completed_at: None,
179            next_sequence: 1,
180            source_data: Some(source_data),
181            metadata: HashMap::new(),
182        }
183    }
184
185    /// Get stream ID
186    pub fn id(&self) -> StreamId {
187        self.id
188    }
189
190    /// Get session ID
191    pub fn session_id(&self) -> SessionId {
192        self.session_id
193    }
194
195    /// Get current state
196    pub fn state(&self) -> &StreamState {
197        &self.state
198    }
199
200    /// Get configuration
201    pub fn config(&self) -> &StreamConfig {
202        &self.config
203    }
204
205    /// Get statistics
206    pub fn stats(&self) -> &StreamStats {
207        &self.stats
208    }
209
210    /// Get creation timestamp
211    pub fn created_at(&self) -> DateTime<Utc> {
212        self.created_at
213    }
214
215    /// Get last update timestamp
216    pub fn updated_at(&self) -> DateTime<Utc> {
217        self.updated_at
218    }
219
220    /// Get completion timestamp
221    pub fn completed_at(&self) -> Option<DateTime<Utc>> {
222        self.completed_at
223    }
224
225    /// Get source data
226    pub fn source_data(&self) -> Option<&JsonData> {
227        self.source_data.as_ref()
228    }
229
230    /// Get metadata
231    pub fn metadata(&self) -> &HashMap<String, String> {
232        &self.metadata
233    }
234
235    /// Add metadata
236    pub fn add_metadata(&mut self, key: String, value: String) {
237        self.metadata.insert(key, value);
238        self.update_timestamp();
239    }
240
241    /// Start streaming (transition to Streaming state)
242    pub fn start_streaming(&mut self) -> DomainResult<()> {
243        match self.state {
244            StreamState::Preparing => {
245                self.state = StreamState::Streaming;
246                self.update_timestamp();
247                Ok(())
248            }
249            _ => Err(DomainError::InvalidStateTransition(format!(
250                "Cannot start streaming from state: {:?}",
251                self.state
252            ))),
253        }
254    }
255
256    /// Complete stream successfully
257    pub fn complete(&mut self) -> DomainResult<()> {
258        match self.state {
259            StreamState::Streaming => {
260                self.state = StreamState::Completed;
261                self.completed_at = Some(Utc::now());
262                self.update_timestamp();
263                Ok(())
264            }
265            _ => Err(DomainError::InvalidStateTransition(format!(
266                "Cannot complete stream from state: {:?}",
267                self.state
268            ))),
269        }
270    }
271
272    /// Fail stream with error
273    pub fn fail(&mut self, error: String) -> DomainResult<()> {
274        match self.state {
275            StreamState::Preparing | StreamState::Streaming => {
276                self.state = StreamState::Failed;
277                self.completed_at = Some(Utc::now());
278                self.add_metadata("error".to_string(), error);
279                Ok(())
280            }
281            _ => Err(DomainError::InvalidStateTransition(format!(
282                "Cannot fail stream from state: {:?}",
283                self.state
284            ))),
285        }
286    }
287
288    /// Cancel stream
289    pub fn cancel(&mut self) -> DomainResult<()> {
290        match self.state {
291            StreamState::Preparing | StreamState::Streaming => {
292                self.state = StreamState::Cancelled;
293                self.completed_at = Some(Utc::now());
294                self.update_timestamp();
295                Ok(())
296            }
297            _ => Err(DomainError::InvalidStateTransition(format!(
298                "Cannot cancel stream from state: {:?}",
299                self.state
300            ))),
301        }
302    }
303
304    /// Generate skeleton frame for the stream
305    pub fn create_skeleton_frame(&mut self) -> DomainResult<Frame> {
306        if !matches!(self.state, StreamState::Streaming) {
307            return Err(DomainError::InvalidStreamState(
308                "Stream must be in streaming state to create frames".to_string(),
309            ));
310        }
311
312        let skeleton_data = self.source_data.as_ref().ok_or_else(|| {
313            DomainError::InvalidStreamState("No source data available for skeleton".to_string())
314        })?;
315
316        let skeleton = self.generate_skeleton(skeleton_data)?;
317        let frame = Frame::skeleton(self.id, self.next_sequence, skeleton);
318
319        self.record_frame_created(&frame);
320
321        Ok(frame)
322    }
323
324    /// Create batch of patch frames based on priority
325    pub fn create_patch_frames(
326        &mut self,
327        priority_threshold: Priority,
328        max_frames: usize,
329    ) -> DomainResult<Vec<Frame>> {
330        if !matches!(self.state, StreamState::Streaming) {
331            return Err(DomainError::InvalidStreamState(
332                "Stream must be in streaming state to create frames".to_string(),
333            ));
334        }
335
336        let source_data = self.source_data.as_ref().ok_or_else(|| {
337            DomainError::InvalidStreamState("No source data available for patches".to_string())
338        })?;
339
340        let patches = self.extract_patches(source_data, priority_threshold)?;
341        let frames = self.batch_patches_into_frames(patches, max_frames)?;
342
343        for frame in &frames {
344            self.record_frame_created(frame);
345        }
346
347        Ok(frames)
348    }
349
350    /// Create completion frame
351    pub fn create_completion_frame(&mut self, checksum: Option<String>) -> DomainResult<Frame> {
352        if !matches!(self.state, StreamState::Streaming) {
353            return Err(DomainError::InvalidStreamState(
354                "Stream must be in streaming state to create frames".to_string(),
355            ));
356        }
357
358        let frame = Frame::complete(self.id, self.next_sequence, checksum);
359        self.record_frame_created(&frame);
360
361        Ok(frame)
362    }
363
364    /// Check if stream is active
365    pub fn is_active(&self) -> bool {
366        matches!(self.state, StreamState::Preparing | StreamState::Streaming)
367    }
368
369    /// Check if stream is finished
370    pub fn is_finished(&self) -> bool {
371        matches!(
372            self.state,
373            StreamState::Completed | StreamState::Failed | StreamState::Cancelled
374        )
375    }
376
377    /// Get stream duration
378    pub fn duration(&self) -> Option<chrono::Duration> {
379        self.completed_at.map(|end| end - self.created_at)
380    }
381
382    /// Calculate stream progress (0.0 to 1.0)
383    pub fn progress(&self) -> f64 {
384        match self.state {
385            StreamState::Preparing => 0.0,
386            StreamState::Streaming => {
387                // Estimate based on frames sent vs expected
388                if self.stats.total_frames == 0 {
389                    0.1 // Just started
390                } else {
391                    // Simple heuristic: more frames = more progress
392                    (self.stats.total_frames as f64 / 100.0).min(0.9)
393                }
394            }
395            StreamState::Completed => 1.0,
396            StreamState::Failed | StreamState::Cancelled => {
397                // Partial progress before failure/cancellation
398                (self.stats.total_frames as f64 / 100.0).min(0.99)
399            }
400        }
401    }
402
403    /// Update configuration
404    pub fn update_config(&mut self, config: StreamConfig) -> DomainResult<()> {
405        if !self.is_active() {
406            return Err(DomainError::InvalidStreamState(
407                "Cannot update config of inactive stream".to_string(),
408            ));
409        }
410
411        self.config = config;
412        self.update_timestamp();
413        Ok(())
414    }
415
416    /// Private helper: Update timestamp
417    fn update_timestamp(&mut self) {
418        self.updated_at = Utc::now();
419    }
420
421    /// Private helper: Record frame creation for stats
422    fn record_frame_created(&mut self, frame: &Frame) {
423        self.next_sequence += 1;
424        self.stats.total_frames += 1;
425
426        let frame_size = frame.estimated_size() as u64;
427        self.stats.total_bytes += frame_size;
428
429        match frame.frame_type() {
430            crate::entities::frame::FrameType::Skeleton => {
431                self.stats.skeleton_frames += 1;
432                self.stats.critical_bytes += frame_size;
433            }
434            crate::entities::frame::FrameType::Patch => {
435                self.stats.patch_frames += 1;
436                if frame.is_critical() {
437                    self.stats.critical_bytes += frame_size;
438                } else if frame.is_high_priority() {
439                    self.stats.high_priority_bytes += frame_size;
440                }
441            }
442            crate::entities::frame::FrameType::Complete => {
443                self.stats.complete_frames += 1;
444                self.stats.critical_bytes += frame_size;
445            }
446            crate::entities::frame::FrameType::Error => {
447                self.stats.error_frames += 1;
448                self.stats.critical_bytes += frame_size;
449            }
450        }
451
452        // Update average frame size
453        self.stats.average_frame_size =
454            self.stats.total_bytes as f64 / self.stats.total_frames as f64;
455
456        self.update_timestamp();
457    }
458
459    /// Private helper: Generate skeleton from source data
460    fn generate_skeleton(&self, data: &JsonData) -> DomainResult<JsonData> {
461        // Simplified skeleton generation - create empty structure
462        match data {
463            JsonData::Object(obj) => {
464                let mut skeleton = HashMap::new();
465                for (key, value) in obj.iter() {
466                    skeleton.insert(
467                        key.clone(),
468                        match value {
469                            JsonData::Array(_) => JsonData::Array(Vec::new()),
470                            JsonData::Object(_) => self.generate_skeleton(value)?,
471                            JsonData::Integer(_) => JsonData::Integer(0),
472                            JsonData::Float(_) => JsonData::Float(0.0),
473                            JsonData::String(_) => JsonData::Null,
474                            JsonData::Bool(_) => JsonData::Bool(false),
475                            JsonData::Null => JsonData::Null,
476                        },
477                    );
478                }
479                Ok(JsonData::Object(skeleton))
480            }
481            JsonData::Array(_) => Ok(JsonData::Array(Vec::new())),
482            _ => Ok(JsonData::Null),
483        }
484    }
485
486    /// Private helper: Extract patches with priority filtering
487    fn extract_patches(
488        &self,
489        _data: &JsonData,
490        _threshold: Priority,
491    ) -> DomainResult<Vec<crate::entities::frame::FramePatch>> {
492        // Simplified patch extraction - would need more sophisticated logic
493        Ok(Vec::new())
494    }
495
496    /// Private helper: Batch patches into frames
497    fn batch_patches_into_frames(
498        &mut self,
499        patches: Vec<crate::entities::frame::FramePatch>,
500        max_frames: usize,
501    ) -> DomainResult<Vec<Frame>> {
502        if patches.is_empty() {
503            return Ok(Vec::new());
504        }
505
506        let mut frames = Vec::new();
507        let chunk_size = patches.len().div_ceil(max_frames);
508
509        for patch_chunk in patches.chunks(chunk_size) {
510            let priority = patch_chunk
511                .iter()
512                .map(|_| Priority::MEDIUM) // Simplified - would calculate from patch content
513                .max()
514                .unwrap_or(Priority::MEDIUM);
515
516            let frame = Frame::patch(self.id, self.next_sequence, priority, patch_chunk.to_vec())?;
517
518            frames.push(frame);
519        }
520
521        Ok(frames)
522    }
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::entities::frame::FrameType;

    /// Builds a stream with the default configuration over `document`.
    fn stream_over(document: serde_json::Value) -> Stream {
        Stream::new(SessionId::new(), document.into(), StreamConfig::default())
    }

    /// A new stream is `Preparing`, active, unfinished and at zero progress.
    #[test]
    fn test_stream_creation() {
        let session_id = SessionId::new();
        let document = serde_json::json!({
            "users": [
                {"id": 1, "name": "John"},
                {"id": 2, "name": "Jane"}
            ],
            "total": 2
        });

        let stream = Stream::new(session_id, document.into(), StreamConfig::default());

        assert_eq!(stream.session_id(), session_id);
        assert_eq!(stream.state(), &StreamState::Preparing);
        assert!(stream.is_active());
        assert!(!stream.is_finished());
        assert_eq!(stream.progress(), 0.0);
    }

    /// The happy path goes `Preparing` -> `Streaming` -> `Completed`.
    #[test]
    fn test_stream_state_transitions() {
        let mut stream = stream_over(serde_json::json!({}));

        // Start streaming
        assert!(stream.start_streaming().is_ok());
        assert_eq!(stream.state(), &StreamState::Streaming);

        // Complete stream
        assert!(stream.complete().is_ok());
        assert_eq!(stream.state(), &StreamState::Completed);
        assert!(stream.is_finished());
        assert_eq!(stream.progress(), 1.0);
    }

    /// Transitions that skip a state or leave a terminal state are rejected.
    #[test]
    fn test_invalid_state_transitions() {
        let mut stream = stream_over(serde_json::json!({}));

        // Cannot complete from preparing state
        assert!(stream.complete().is_err());

        // Start and complete
        assert!(stream.start_streaming().is_ok());
        assert!(stream.complete().is_ok());

        // Cannot start again from completed state
        assert!(stream.start_streaming().is_err());
    }

    /// Frames can only be created while streaming; the first frame has sequence 1.
    #[test]
    fn test_frame_creation() {
        let mut stream = stream_over(serde_json::json!({
            "test": "data"
        }));

        // Cannot create frames before streaming
        assert!(stream.create_skeleton_frame().is_err());

        // Start streaming and create skeleton
        assert!(stream.start_streaming().is_ok());
        let skeleton = stream
            .create_skeleton_frame()
            .expect("Failed to create skeleton frame in test");

        assert_eq!(skeleton.frame_type(), &FrameType::Skeleton);
        assert_eq!(skeleton.sequence(), 1);
        assert_eq!(stream.stats().skeleton_frames, 1);
    }

    /// Metadata entries are stored and retrievable by key.
    #[test]
    fn test_stream_metadata() {
        let mut stream = stream_over(serde_json::json!({}));

        stream.add_metadata("source".to_string(), "api".to_string());
        stream.add_metadata("version".to_string(), "1.0".to_string());

        assert_eq!(stream.metadata().len(), 2);
        assert_eq!(stream.metadata().get("source"), Some(&"api".to_string()));
    }
}