pjson_rs_domain/events/
mod.rs

1//! Domain events for event sourcing and integration
2
3use crate::value_objects::{SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
/// Lifecycle state of a session.
///
/// Carried by [`DomainEvent::SessionTimedOut`] (as `original_state`) to record
/// which state the session was in when the timeout fired. Variants are
/// serialized under their Rust names because no serde rename is applied here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionState {
    /// The session is still being set up.
    Initializing,
    /// The session is active with streams.
    Active,
    /// The session is shutting down gracefully.
    Closing,
    /// The session finished successfully.
    Completed,
    /// The session ended with an error.
    Failed,
}
21
/// Domain events that represent business-relevant state changes.
///
/// Every variant carries the `session_id` it belongs to and the `timestamp`
/// at which it happened; stream-scoped variants additionally carry a
/// `stream_id`.
///
/// # Serialization
///
/// The enum is internally tagged: the serialized form contains an
/// `event_type` field holding the variant name in `snake_case`
/// (e.g. `SessionActivated` is tagged `"session_activated"`). Renaming or
/// reordering variants or fields therefore changes the wire format.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum DomainEvent {
    /// Session was activated and is ready to accept streams
    SessionActivated {
        /// ID of the activated session
        session_id: SessionId,
        /// When the session was activated
        timestamp: DateTime<Utc>,
    },

    /// Session was closed gracefully
    SessionClosed {
        /// ID of the closed session
        session_id: SessionId,
        /// When the session was closed
        timestamp: DateTime<Utc>,
    },

    /// Session expired due to timeout
    SessionExpired {
        /// ID of the expired session
        session_id: SessionId,
        /// When the session expired
        timestamp: DateTime<Utc>,
    },

    /// Session was forcefully closed due to timeout
    SessionTimedOut {
        /// ID of the timed out session
        session_id: SessionId,
        /// State the session was in before the timeout
        original_state: SessionState,
        /// Duration in seconds before the timeout occurred
        timeout_duration: u64,
        /// When the timeout occurred
        timestamp: DateTime<Utc>,
    },

    /// Session timeout was extended
    SessionTimeoutExtended {
        /// ID of the session whose timeout was extended
        session_id: SessionId,
        /// Additional seconds added to the timeout
        additional_seconds: u64,
        /// Expiration timestamp after the extension
        new_expires_at: DateTime<Utc>,
        /// When the extension was applied
        timestamp: DateTime<Utc>,
    },

    /// New stream was created in the session
    StreamCreated {
        /// ID of the session containing the stream
        session_id: SessionId,
        /// ID of the newly created stream
        stream_id: StreamId,
        /// When the stream was created
        timestamp: DateTime<Utc>,
    },

    /// Stream started sending data
    StreamStarted {
        /// ID of the session containing the stream
        session_id: SessionId,
        /// ID of the stream that started
        stream_id: StreamId,
        /// When the stream started
        timestamp: DateTime<Utc>,
    },

    /// Stream completed successfully
    StreamCompleted {
        /// ID of the session containing the stream
        session_id: SessionId,
        /// ID of the completed stream
        stream_id: StreamId,
        /// When the stream completed
        timestamp: DateTime<Utc>,
    },

    /// Stream failed with error
    StreamFailed {
        /// ID of the session containing the stream
        session_id: SessionId,
        /// ID of the failed stream
        stream_id: StreamId,
        /// Human-readable message describing the failure
        error: String,
        /// When the stream failed
        timestamp: DateTime<Utc>,
    },

    /// Stream was cancelled
    StreamCancelled {
        /// ID of the session containing the stream
        session_id: SessionId,
        /// ID of the cancelled stream
        stream_id: StreamId,
        /// When the stream was cancelled
        timestamp: DateTime<Utc>,
    },

    /// Skeleton frame was generated for a stream
    SkeletonGenerated {
        /// ID of the session containing the stream
        session_id: SessionId,
        /// ID of the stream that generated the skeleton
        stream_id: StreamId,
        /// Size of the skeleton frame in bytes
        frame_size_bytes: u64,
        /// When the skeleton was generated
        timestamp: DateTime<Utc>,
    },

    /// Patch frames were generated for a stream
    PatchFramesGenerated {
        /// ID of the session containing the stream
        session_id: SessionId,
        /// ID of the stream that generated patches
        stream_id: StreamId,
        /// Number of patch frames generated
        frame_count: usize,
        /// Total size of all patches in bytes
        total_bytes: u64,
        /// Highest priority level among the patches
        highest_priority: u8,
        /// When the patches were generated
        timestamp: DateTime<Utc>,
    },

    /// Multiple frames were batched for efficient sending.
    ///
    /// This is a session-level event: it carries no `stream_id`.
    FramesBatched {
        /// ID of the session containing the frames
        session_id: SessionId,
        /// Number of frames in the batch
        frame_count: usize,
        /// When the batch was created
        timestamp: DateTime<Utc>,
    },

    /// Priority threshold was adjusted for adaptive streaming
    PriorityThresholdAdjusted {
        /// ID of the session with adjusted threshold
        session_id: SessionId,
        /// Priority threshold before the adjustment
        old_threshold: u8,
        /// Priority threshold after the adjustment
        new_threshold: u8,
        /// Free-form reason for the adjustment
        reason: String,
        /// When the threshold was adjusted
        timestamp: DateTime<Utc>,
    },

    /// Stream configuration was updated
    StreamConfigUpdated {
        /// ID of the session containing the stream
        session_id: SessionId,
        /// ID of the stream with updated configuration
        stream_id: StreamId,
        /// When the configuration was updated
        timestamp: DateTime<Utc>,
    },

    /// Performance metrics were recorded.
    ///
    /// This is a session-level event: it carries no `stream_id`.
    PerformanceMetricsRecorded {
        /// ID of the session being measured
        session_id: SessionId,
        /// Snapshot of the recorded performance metrics
        metrics: PerformanceMetrics,
        /// When the metrics were recorded
        timestamp: DateTime<Utc>,
    },
}
198
/// Performance metrics for monitoring and optimization.
///
/// Attached to [`DomainEvent::PerformanceMetricsRecorded`] as a snapshot of
/// throughput, frame sizing and priority mix for one session.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Number of frames transmitted per second
    pub frames_per_second: f64,
    /// Number of bytes transmitted per second
    pub bytes_per_second: f64,
    /// Average size of frames in bytes
    pub average_frame_size: f64,
    /// How the counted frames are spread across priority levels
    pub priority_distribution: PriorityDistribution,
    /// Network latency in milliseconds; `None` when no measurement is available
    pub latency_ms: Option<u64>,
}
213
/// Distribution of frames by priority level, stored as absolute frame counts.
///
/// The derived `Default` yields a distribution with every counter at zero.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PriorityDistribution {
    /// Number of critical priority frames
    pub critical_frames: u64,
    /// Number of high priority frames
    pub high_frames: u64,
    /// Number of medium priority frames
    pub medium_frames: u64,
    /// Number of low priority frames
    pub low_frames: u64,
    /// Number of background priority frames
    pub background_frames: u64,
}
228
229impl PriorityDistribution {
230    /// Create new empty distribution
231    pub fn new() -> Self {
232        Self::default()
233    }
234
235    /// Get total frames count
236    pub fn total_frames(&self) -> u64 {
237        self.critical_frames
238            + self.high_frames
239            + self.medium_frames
240            + self.low_frames
241            + self.background_frames
242    }
243
244    /// Convert to percentages (0.0-1.0)
245    pub fn as_percentages(&self) -> PriorityPercentages {
246        let total = self.total_frames() as f64;
247        if total == 0.0 {
248            return PriorityPercentages::default();
249        }
250
251        PriorityPercentages {
252            critical: self.critical_frames as f64 / total,
253            high: self.high_frames as f64 / total,
254            medium: self.medium_frames as f64 / total,
255            low: self.low_frames as f64 / total,
256            background: self.background_frames as f64 / total,
257        }
258    }
259
260    /// Convert from count-based version
261    pub fn from_counts(
262        critical_count: u64,
263        high_count: u64,
264        medium_count: u64,
265        low_count: u64,
266        background_count: u64,
267    ) -> Self {
268        Self {
269            critical_frames: critical_count,
270            high_frames: high_count,
271            medium_frames: medium_count,
272            low_frames: low_count,
273            background_frames: background_count,
274        }
275    }
276}
277
278/// Priority distribution as percentages (for demos and visualization)
279#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
280pub struct PriorityPercentages {
281    /// Critical priority percentage (0.0-1.0)
282    pub critical: f64,
283    /// High priority percentage (0.0-1.0)
284    pub high: f64,
285    /// Medium priority percentage (0.0-1.0)
286    pub medium: f64,
287    /// Low priority percentage (0.0-1.0)
288    pub low: f64,
289    /// Background priority percentage (0.0-1.0)
290    pub background: f64,
291}
292
293impl Default for PriorityPercentages {
294    fn default() -> Self {
295        Self {
296            critical: 0.0,
297            high: 0.0,
298            medium: 0.0,
299            low: 0.0,
300            background: 0.0,
301        }
302    }
303}
304
305impl DomainEvent {
306    /// Get the session ID associated with this event
307    pub fn session_id(&self) -> SessionId {
308        match self {
309            Self::SessionActivated { session_id, .. } => *session_id,
310            Self::SessionClosed { session_id, .. } => *session_id,
311            Self::SessionExpired { session_id, .. } => *session_id,
312            Self::StreamCreated { session_id, .. } => *session_id,
313            Self::StreamStarted { session_id, .. } => *session_id,
314            Self::StreamCompleted { session_id, .. } => *session_id,
315            Self::StreamFailed { session_id, .. } => *session_id,
316            Self::StreamCancelled { session_id, .. } => *session_id,
317            Self::SkeletonGenerated { session_id, .. } => *session_id,
318            Self::PatchFramesGenerated { session_id, .. } => *session_id,
319            Self::FramesBatched { session_id, .. } => *session_id,
320            Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
321            Self::StreamConfigUpdated { session_id, .. } => *session_id,
322            Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
323            Self::SessionTimedOut { session_id, .. } => *session_id,
324            Self::SessionTimeoutExtended { session_id, .. } => *session_id,
325        }
326    }
327
328    /// Get the stream ID if this is a stream-specific event
329    pub fn stream_id(&self) -> Option<StreamId> {
330        match self {
331            Self::StreamCreated { stream_id, .. } => Some(*stream_id),
332            Self::StreamStarted { stream_id, .. } => Some(*stream_id),
333            Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
334            Self::StreamFailed { stream_id, .. } => Some(*stream_id),
335            Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
336            Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
337            Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
338            Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
339            _ => None,
340        }
341    }
342
343    /// Get the timestamp of this event
344    pub fn timestamp(&self) -> DateTime<Utc> {
345        match self {
346            Self::SessionActivated { timestamp, .. } => *timestamp,
347            Self::SessionClosed { timestamp, .. } => *timestamp,
348            Self::SessionExpired { timestamp, .. } => *timestamp,
349            Self::StreamCreated { timestamp, .. } => *timestamp,
350            Self::StreamStarted { timestamp, .. } => *timestamp,
351            Self::StreamCompleted { timestamp, .. } => *timestamp,
352            Self::StreamFailed { timestamp, .. } => *timestamp,
353            Self::StreamCancelled { timestamp, .. } => *timestamp,
354            Self::SkeletonGenerated { timestamp, .. } => *timestamp,
355            Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
356            Self::FramesBatched { timestamp, .. } => *timestamp,
357            Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
358            Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
359            Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
360            Self::SessionTimedOut { timestamp, .. } => *timestamp,
361            Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
362        }
363    }
364
365    /// Get the event type as a string
366    pub fn event_type(&self) -> &'static str {
367        match self {
368            Self::SessionActivated { .. } => "session_activated",
369            Self::SessionClosed { .. } => "session_closed",
370            Self::SessionExpired { .. } => "session_expired",
371            Self::StreamCreated { .. } => "stream_created",
372            Self::StreamStarted { .. } => "stream_started",
373            Self::StreamCompleted { .. } => "stream_completed",
374            Self::StreamFailed { .. } => "stream_failed",
375            Self::StreamCancelled { .. } => "stream_cancelled",
376            Self::SkeletonGenerated { .. } => "skeleton_generated",
377            Self::PatchFramesGenerated { .. } => "patch_frames_generated",
378            Self::FramesBatched { .. } => "frames_batched",
379            Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
380            Self::StreamConfigUpdated { .. } => "stream_config_updated",
381            Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
382            Self::SessionTimedOut { .. } => "session_timed_out",
383            Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
384        }
385    }
386
387    /// Check if this is a critical event that requires immediate attention
388    pub fn is_critical(&self) -> bool {
389        matches!(
390            self,
391            Self::StreamFailed { .. } | Self::SessionExpired { .. }
392        )
393    }
394
395    /// Check if this is an error event
396    pub fn is_error(&self) -> bool {
397        matches!(self, Self::StreamFailed { .. })
398    }
399
400    /// Check if this is a completion event
401    pub fn is_completion(&self) -> bool {
402        matches!(
403            self,
404            Self::StreamCompleted { .. } | Self::SessionClosed { .. }
405        )
406    }
407}
408
/// Event sourcing support for storing and retrieving domain events.
///
/// Appending takes `&mut self` while all queries take `&self`. Every method
/// reports failure as a plain `String` message.
pub trait EventStore {
    /// Append events to the store.
    ///
    /// Takes ownership of `events`.
    ///
    /// # Errors
    ///
    /// Returns an error if events cannot be persisted
    fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;

    /// Get events for a specific session.
    ///
    /// # Errors
    ///
    /// Returns an error if events cannot be retrieved
    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;

    /// Get events for a specific stream.
    ///
    /// # Errors
    ///
    /// Returns an error if events cannot be retrieved
    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;

    /// Get all events since a specific timestamp.
    ///
    /// The trait does not state whether an event stamped exactly at `since`
    /// is included; the in-memory implementation in this file excludes it
    /// (strict "later than" comparison).
    ///
    /// # Errors
    ///
    /// Returns an error if events cannot be retrieved
    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
}
439
440/// Simple in-memory event store for testing
441#[derive(Debug, Clone, Default)]
442pub struct InMemoryEventStore {
443    events: Vec<DomainEvent>,
444}
445
446impl InMemoryEventStore {
447    /// Create a new empty event store
448    pub fn new() -> Self {
449        Self::default()
450    }
451
452    /// Get all events in the store
453    pub fn all_events(&self) -> &[DomainEvent] {
454        &self.events
455    }
456
457    /// Get the total number of events in the store
458    pub fn event_count(&self) -> usize {
459        self.events.len()
460    }
461}
462
463impl EventStore for InMemoryEventStore {
464    fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
465        self.events.append(&mut events);
466        Ok(())
467    }
468
469    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
470        Ok(self
471            .events
472            .iter()
473            .filter(|e| e.session_id() == session_id)
474            .cloned()
475            .collect())
476    }
477
478    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
479        Ok(self
480            .events
481            .iter()
482            .filter(|e| e.stream_id() == Some(stream_id))
483            .cloned()
484            .collect())
485    }
486
487    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
488        Ok(self
489            .events
490            .iter()
491            .filter(|e| e.timestamp() > since)
492            .cloned()
493            .collect())
494    }
495}
496
// Unit tests for the event accessors, the classification predicates, the
// in-memory store and the serde round trip.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value_objects::{SessionId, StreamId};

    // A stream-scoped event exposes its session ID, stream ID, timestamp and
    // type tag, and `StreamCreated` is neither critical nor an error.
    #[test]
    fn test_domain_event_properties() {
        let session_id = SessionId::new();
        let stream_id = StreamId::new();
        let timestamp = Utc::now();

        let event = DomainEvent::StreamCreated {
            session_id,
            stream_id,
            timestamp,
        };

        assert_eq!(event.session_id(), session_id);
        assert_eq!(event.stream_id(), Some(stream_id));
        assert_eq!(event.timestamp(), timestamp);
        assert_eq!(event.event_type(), "stream_created");
        assert!(!event.is_critical());
        assert!(!event.is_error());
    }

    // `StreamFailed` is both critical and an error, but not a completion.
    #[test]
    fn test_critical_events() {
        let session_id = SessionId::new();
        let stream_id = StreamId::new();

        let error_event = DomainEvent::StreamFailed {
            session_id,
            stream_id,
            error: "Connection lost".to_string(),
            timestamp: Utc::now(),
        };

        assert!(error_event.is_critical());
        assert!(error_event.is_error());
        assert!(!error_event.is_completion());
    }

    // Two events in the same session, only one of them stream-scoped: the
    // session query returns both, the stream query returns one.
    #[test]
    fn test_event_store() {
        let mut store = InMemoryEventStore::new();
        let session_id = SessionId::new();
        let stream_id = StreamId::new();

        let events = vec![
            DomainEvent::SessionActivated {
                session_id,
                timestamp: Utc::now(),
            },
            DomainEvent::StreamCreated {
                session_id,
                stream_id,
                timestamp: Utc::now(),
            },
        ];

        store
            .append_events(events.clone())
            .expect("Failed to append events to store in test");
        assert_eq!(store.event_count(), 2);

        let session_events = store
            .get_events_for_session(session_id)
            .expect("Failed to retrieve session events in test");
        assert_eq!(session_events.len(), 2);

        let stream_events = store
            .get_events_for_stream(stream_id)
            .expect("Failed to retrieve stream events in test");
        assert_eq!(stream_events.len(), 1);
    }

    // An event survives a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn test_event_serialization() {
        let session_id = SessionId::new();
        let event = DomainEvent::SessionActivated {
            session_id,
            timestamp: Utc::now(),
        };

        let serialized = serde_json::to_string(&event).expect("Failed to serialize event in test");
        let deserialized: DomainEvent =
            serde_json::from_str(&serialized).expect("Failed to deserialize event in test");

        assert_eq!(event, deserialized);
    }
}
588
589/// Event identifier for tracking and correlation
590#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
591pub struct EventId(uuid::Uuid);
592
593impl EventId {
594    /// Generate new unique event ID
595    pub fn new() -> Self {
596        Self(uuid::Uuid::new_v4())
597    }
598
599    /// Create from existing UUID
600    pub fn from_uuid(uuid: uuid::Uuid) -> Self {
601        Self(uuid)
602    }
603
604    /// Get inner UUID
605    pub fn inner(&self) -> uuid::Uuid {
606        self.0
607    }
608}
609
610impl std::fmt::Display for EventId {
611    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
612        write!(f, "{}", self.0)
613    }
614}
615
616impl Default for EventId {
617    fn default() -> Self {
618        Self::new()
619    }
620}
621
/// GAT-based trait for event subscribers that handle domain events.
///
/// Using a generic associated type lets each implementation name its own
/// future type instead of returning a boxed future.
pub trait EventSubscriber {
    /// Future returned by [`EventSubscriber::handle`].
    ///
    /// It must be `Send`, resolve to `crate::DomainResult<()>`, and may
    /// borrow from the subscriber for the lifetime `'a`.
    type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Handle a domain event.
    ///
    /// By lifetime elision the returned future is tied to the borrow of
    /// `self`, not to `event`; an implementation that needs event data
    /// inside the future has to copy or clone it out before returning.
    fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
}
632
633/// Extension methods for DomainEvent
634impl DomainEvent {
635    /// Get event ID for tracking (generated if not exists)
636    pub fn event_id(&self) -> EventId {
637        // For now, generate deterministic ID based on event content
638        // In future versions, this should be stored with the event
639        let content = format!("{self:?}");
640        use std::collections::hash_map::DefaultHasher;
641        use std::hash::{Hash, Hasher};
642        let mut hash = DefaultHasher::new();
643        content.hash(&mut hash);
644        let hash_val = hash.finish();
645        let uuid = uuid::Uuid::from_bytes([
646            (hash_val >> 56) as u8,
647            (hash_val >> 48) as u8,
648            (hash_val >> 40) as u8,
649            (hash_val >> 32) as u8,
650            (hash_val >> 24) as u8,
651            (hash_val >> 16) as u8,
652            (hash_val >> 8) as u8,
653            hash_val as u8,
654            0,
655            0,
656            0,
657            0,
658            0,
659            0,
660            0,
661            0,
662        ]);
663        EventId::from_uuid(uuid)
664    }
665
666    /// Get event timestamp
667    pub fn occurred_at(&self) -> DateTime<Utc> {
668        match self {
669            DomainEvent::SessionActivated { timestamp, .. }
670            | DomainEvent::SessionClosed { timestamp, .. }
671            | DomainEvent::SessionExpired { timestamp, .. }
672            | DomainEvent::StreamCreated { timestamp, .. }
673            | DomainEvent::StreamStarted { timestamp, .. }
674            | DomainEvent::StreamCompleted { timestamp, .. }
675            | DomainEvent::StreamFailed { timestamp, .. }
676            | DomainEvent::StreamCancelled { timestamp, .. }
677            | DomainEvent::SkeletonGenerated { timestamp, .. }
678            | DomainEvent::PatchFramesGenerated { timestamp, .. }
679            | DomainEvent::FramesBatched { timestamp, .. }
680            | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
681            | DomainEvent::StreamConfigUpdated { timestamp, .. }
682            | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
683            | DomainEvent::SessionTimedOut { timestamp, .. }
684            | DomainEvent::SessionTimeoutExtended { timestamp, .. } => *timestamp,
685        }
686    }
687
688    /// Get event metadata as key-value pairs
689    pub fn metadata(&self) -> std::collections::HashMap<String, String> {
690        let mut metadata = std::collections::HashMap::new();
691        metadata.insert("event_type".to_string(), self.event_type().to_string());
692        metadata.insert("session_id".to_string(), self.session_id().to_string());
693        metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
694
695        if let Some(stream_id) = self.stream_id() {
696            metadata.insert("stream_id".to_string(), stream_id.to_string());
697        }
698
699        metadata
700    }
701}