Skip to main content

pjson_rs_domain/events/
mod.rs

1//! Domain events for event sourcing and integration
2
3use crate::value_objects::{BackpressureSignal, SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7/// Custom serde for SessionId (domain purity)
8mod serde_session_id {
9    use crate::value_objects::SessionId;
10    use serde::{Deserialize, Deserializer, Serialize, Serializer};
11
12    pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
13    where
14        S: Serializer,
15    {
16        id.as_uuid().serialize(serializer)
17    }
18
19    pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
20    where
21        D: Deserializer<'de>,
22    {
23        let uuid = uuid::Uuid::deserialize(deserializer)?;
24        Ok(SessionId::from_uuid(uuid))
25    }
26}
27
28/// Custom serde for StreamId (domain purity)
29mod serde_stream_id {
30    use crate::value_objects::StreamId;
31    use serde::{Deserialize, Deserializer, Serialize, Serializer};
32
33    pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
34    where
35        S: Serializer,
36    {
37        id.as_uuid().serialize(serializer)
38    }
39
40    pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
41    where
42        D: Deserializer<'de>,
43    {
44        let uuid = uuid::Uuid::deserialize(deserializer)?;
45        Ok(StreamId::from_uuid(uuid))
46    }
47}
48
49/// Custom serde for `Option<StreamId>`
50mod serde_option_stream_id {
51    use crate::value_objects::StreamId;
52    use serde::{Deserialize, Deserializer, Serialize, Serializer};
53
54    pub fn serialize<S>(id: &Option<StreamId>, serializer: S) -> Result<S::Ok, S::Error>
55    where
56        S: Serializer,
57    {
58        id.map(|i| i.as_uuid()).serialize(serializer)
59    }
60
61    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StreamId>, D::Error>
62    where
63        D: Deserializer<'de>,
64    {
65        let uuid: Option<uuid::Uuid> = Option::deserialize(deserializer)?;
66        Ok(uuid.map(StreamId::from_uuid))
67    }
68}
69
70/// Session state in its lifecycle
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72#[non_exhaustive]
73pub enum SessionState {
74    /// Session is being initialized
75    Initializing,
76    /// Session is active with streams
77    Active,
78    /// Session is gracefully closing
79    Closing,
80    /// Session completed successfully
81    Completed,
82    /// Session failed with error
83    Failed,
84}
85
86impl SessionState {
87    /// Get string representation without allocation
88    pub fn as_str(&self) -> &'static str {
89        match self {
90            SessionState::Initializing => "Initializing",
91            SessionState::Active => "Active",
92            SessionState::Closing => "Closing",
93            SessionState::Completed => "Completed",
94            SessionState::Failed => "Failed",
95        }
96    }
97}
98
99/// Domain events that represent business-relevant state changes
100///
101/// # Wire format
102///
103/// `DomainEvent` is serialized as a JSON object with an `event_type` discriminator
104/// (`#[serde(tag = "event_type")]`). Adding a new variant changes the wire format —
105/// older deserializers will fail on unknown `event_type` values. The `#[non_exhaustive]`
106/// marker prevents downstream Rust crates from breaking on `match` exhaustiveness, but
107/// it does not address the wire-format compatibility concern; consumers should be
108/// prepared to handle unknown events at the deserialization boundary.
109#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
110#[serde(tag = "event_type", rename_all = "snake_case")]
111#[non_exhaustive]
112pub enum DomainEvent {
113    /// Session was activated and is ready to accept streams
114    SessionActivated {
115        /// ID of the activated session
116        #[serde(with = "serde_session_id")]
117        session_id: SessionId,
118        /// When the session was activated
119        timestamp: DateTime<Utc>,
120    },
121
122    /// Session was closed gracefully
123    SessionClosed {
124        /// ID of the closed session
125        #[serde(with = "serde_session_id")]
126        session_id: SessionId,
127        /// When the session was closed
128        timestamp: DateTime<Utc>,
129    },
130
131    /// Session expired due to timeout
132    SessionExpired {
133        /// ID of the expired session
134        #[serde(with = "serde_session_id")]
135        session_id: SessionId,
136        /// When the session expired
137        timestamp: DateTime<Utc>,
138    },
139
140    /// Session was forcefully closed due to timeout
141    SessionTimedOut {
142        /// ID of the timed out session
143        #[serde(with = "serde_session_id")]
144        session_id: SessionId,
145        /// State the session was in before timeout
146        original_state: SessionState,
147        /// Duration in seconds before timeout occurred
148        timeout_duration: u64,
149        /// When the timeout occurred
150        timestamp: DateTime<Utc>,
151    },
152
153    /// Session timeout was extended
154    SessionTimeoutExtended {
155        /// ID of the session with extended timeout
156        #[serde(with = "serde_session_id")]
157        session_id: SessionId,
158        /// Additional seconds added to the timeout
159        additional_seconds: u64,
160        /// New expiration timestamp
161        new_expires_at: DateTime<Utc>,
162        /// When the extension was applied
163        timestamp: DateTime<Utc>,
164    },
165
166    /// New stream was created in the session
167    StreamCreated {
168        /// ID of the session containing the stream
169        #[serde(with = "serde_session_id")]
170        session_id: SessionId,
171        /// ID of the newly created stream
172        #[serde(with = "serde_stream_id")]
173        stream_id: StreamId,
174        /// When the stream was created
175        timestamp: DateTime<Utc>,
176    },
177
178    /// Stream started sending data
179    StreamStarted {
180        /// ID of the session containing the stream
181        #[serde(with = "serde_session_id")]
182        session_id: SessionId,
183        /// ID of the stream that started
184        #[serde(with = "serde_stream_id")]
185        stream_id: StreamId,
186        /// When the stream started
187        timestamp: DateTime<Utc>,
188    },
189
190    /// Stream completed successfully
191    StreamCompleted {
192        /// ID of the session containing the stream
193        #[serde(with = "serde_session_id")]
194        session_id: SessionId,
195        /// ID of the completed stream
196        #[serde(with = "serde_stream_id")]
197        stream_id: StreamId,
198        /// When the stream completed
199        timestamp: DateTime<Utc>,
200    },
201
202    /// Stream failed with error
203    StreamFailed {
204        /// ID of the session containing the stream
205        #[serde(with = "serde_session_id")]
206        session_id: SessionId,
207        /// ID of the failed stream
208        #[serde(with = "serde_stream_id")]
209        stream_id: StreamId,
210        /// Error message describing the failure
211        error: String,
212        /// When the stream failed
213        timestamp: DateTime<Utc>,
214    },
215
216    /// Stream was cancelled
217    StreamCancelled {
218        /// ID of the session containing the stream
219        #[serde(with = "serde_session_id")]
220        session_id: SessionId,
221        /// ID of the cancelled stream
222        #[serde(with = "serde_stream_id")]
223        stream_id: StreamId,
224        /// When the stream was cancelled
225        timestamp: DateTime<Utc>,
226    },
227
228    /// Skeleton frame was generated for a stream
229    SkeletonGenerated {
230        /// ID of the session containing the stream
231        #[serde(with = "serde_session_id")]
232        session_id: SessionId,
233        /// ID of the stream that generated the skeleton
234        #[serde(with = "serde_stream_id")]
235        stream_id: StreamId,
236        /// Size of the skeleton frame in bytes
237        frame_size_bytes: u64,
238        /// When the skeleton was generated
239        timestamp: DateTime<Utc>,
240    },
241
242    /// Patch frames were generated for a stream
243    PatchFramesGenerated {
244        /// ID of the session containing the stream
245        #[serde(with = "serde_session_id")]
246        session_id: SessionId,
247        /// ID of the stream that generated patches
248        #[serde(with = "serde_stream_id")]
249        stream_id: StreamId,
250        /// Number of patch frames generated
251        frame_count: usize,
252        /// Total size of all patches in bytes
253        total_bytes: u64,
254        /// Highest priority level among the patches
255        highest_priority: u8,
256        /// When the patches were generated
257        timestamp: DateTime<Utc>,
258    },
259
260    /// Multiple frames were batched for efficient sending
261    FramesBatched {
262        /// ID of the session containing the frames
263        #[serde(with = "serde_session_id")]
264        session_id: SessionId,
265        /// Number of frames in the batch
266        frame_count: usize,
267        /// When the batch was created
268        timestamp: DateTime<Utc>,
269    },
270
271    /// Priority threshold was adjusted for adaptive streaming
272    PriorityThresholdAdjusted {
273        /// ID of the session with adjusted threshold
274        #[serde(with = "serde_session_id")]
275        session_id: SessionId,
276        /// Previous priority threshold value
277        old_threshold: u8,
278        /// New priority threshold value
279        new_threshold: u8,
280        /// Reason for the adjustment
281        reason: String,
282        /// When the threshold was adjusted
283        timestamp: DateTime<Utc>,
284    },
285
286    /// Stream configuration was updated
287    StreamConfigUpdated {
288        /// ID of the session containing the stream
289        #[serde(with = "serde_session_id")]
290        session_id: SessionId,
291        /// ID of the stream with updated configuration
292        #[serde(with = "serde_stream_id")]
293        stream_id: StreamId,
294        /// When the configuration was updated
295        timestamp: DateTime<Utc>,
296    },
297
298    /// Performance metrics were recorded
299    PerformanceMetricsRecorded {
300        /// ID of the session being measured
301        #[serde(with = "serde_session_id")]
302        session_id: SessionId,
303        /// Recorded performance metrics
304        metrics: PerformanceMetrics,
305        /// When the metrics were recorded
306        timestamp: DateTime<Utc>,
307    },
308
309    /// Backpressure signal received from client
310    BackpressureReceived {
311        /// ID of the session receiving backpressure
312        #[serde(with = "serde_session_id")]
313        session_id: SessionId,
314        /// ID of the stream if specific to a stream
315        #[serde(default, skip_serializing_if = "Option::is_none")]
316        #[serde(with = "serde_option_stream_id")]
317        stream_id: Option<StreamId>,
318        /// Backpressure signal type
319        signal: BackpressureSignal,
320        /// When the signal was received
321        timestamp: DateTime<Utc>,
322    },
323
324    /// Stream was paused due to flow control
325    StreamPaused {
326        /// ID of the session containing the stream
327        #[serde(with = "serde_session_id")]
328        session_id: SessionId,
329        /// ID of the paused stream
330        #[serde(with = "serde_stream_id")]
331        stream_id: StreamId,
332        /// Reason for pausing
333        reason: String,
334        /// When the stream was paused
335        timestamp: DateTime<Utc>,
336    },
337
338    /// Stream was resumed after pause
339    StreamResumed {
340        /// ID of the session containing the stream
341        #[serde(with = "serde_session_id")]
342        session_id: SessionId,
343        /// ID of the resumed stream
344        #[serde(with = "serde_stream_id")]
345        stream_id: StreamId,
346        /// Duration the stream was paused in milliseconds
347        paused_duration_ms: u64,
348        /// When the stream was resumed
349        timestamp: DateTime<Utc>,
350    },
351
352    /// Flow control credits were updated
353    CreditsUpdated {
354        /// ID of the session with updated credits
355        #[serde(with = "serde_session_id")]
356        session_id: SessionId,
357        /// New available credits
358        available_credits: usize,
359        /// Maximum allowed credits
360        max_credits: usize,
361        /// When credits were updated
362        timestamp: DateTime<Utc>,
363    },
364}
365
366/// Performance metrics for monitoring and optimization
367#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
368pub struct PerformanceMetrics {
369    /// Number of frames transmitted per second
370    pub frames_per_second: f64,
371    /// Number of bytes transmitted per second
372    pub bytes_per_second: f64,
373    /// Average size of frames in bytes
374    pub average_frame_size: f64,
375    /// Distribution of frames across priority levels
376    pub priority_distribution: PriorityDistribution,
377    /// Network latency in milliseconds, if available
378    pub latency_ms: Option<u64>,
379}
380
381/// Distribution of frames by priority level
382#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
383pub struct PriorityDistribution {
384    /// Number of critical priority frames
385    pub critical_frames: u64,
386    /// Number of high priority frames
387    pub high_frames: u64,
388    /// Number of medium priority frames
389    pub medium_frames: u64,
390    /// Number of low priority frames
391    pub low_frames: u64,
392    /// Number of background priority frames
393    pub background_frames: u64,
394}
395
396impl PriorityDistribution {
397    /// Create new empty distribution
398    pub fn new() -> Self {
399        Self::default()
400    }
401
402    /// Get total frames count
403    pub fn total_frames(&self) -> u64 {
404        self.critical_frames
405            + self.high_frames
406            + self.medium_frames
407            + self.low_frames
408            + self.background_frames
409    }
410
411    /// Convert to percentages (0.0-1.0)
412    pub fn as_percentages(&self) -> PriorityPercentages {
413        let total = self.total_frames() as f64;
414        if total == 0.0 {
415            return PriorityPercentages::default();
416        }
417
418        PriorityPercentages {
419            critical: self.critical_frames as f64 / total,
420            high: self.high_frames as f64 / total,
421            medium: self.medium_frames as f64 / total,
422            low: self.low_frames as f64 / total,
423            background: self.background_frames as f64 / total,
424        }
425    }
426
427    /// Convert from count-based version
428    pub fn from_counts(
429        critical_count: u64,
430        high_count: u64,
431        medium_count: u64,
432        low_count: u64,
433        background_count: u64,
434    ) -> Self {
435        Self {
436            critical_frames: critical_count,
437            high_frames: high_count,
438            medium_frames: medium_count,
439            low_frames: low_count,
440            background_frames: background_count,
441        }
442    }
443}
444
445/// Priority distribution as percentages (for demos and visualization)
446#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
447pub struct PriorityPercentages {
448    /// Critical priority percentage (0.0-1.0)
449    pub critical: f64,
450    /// High priority percentage (0.0-1.0)
451    pub high: f64,
452    /// Medium priority percentage (0.0-1.0)
453    pub medium: f64,
454    /// Low priority percentage (0.0-1.0)
455    pub low: f64,
456    /// Background priority percentage (0.0-1.0)
457    pub background: f64,
458}
459
460impl Default for PriorityPercentages {
461    fn default() -> Self {
462        Self {
463            critical: 0.0,
464            high: 0.0,
465            medium: 0.0,
466            low: 0.0,
467            background: 0.0,
468        }
469    }
470}
471
472impl DomainEvent {
473    /// Get the session ID associated with this event
474    pub fn session_id(&self) -> SessionId {
475        match self {
476            Self::SessionActivated { session_id, .. } => *session_id,
477            Self::SessionClosed { session_id, .. } => *session_id,
478            Self::SessionExpired { session_id, .. } => *session_id,
479            Self::StreamCreated { session_id, .. } => *session_id,
480            Self::StreamStarted { session_id, .. } => *session_id,
481            Self::StreamCompleted { session_id, .. } => *session_id,
482            Self::StreamFailed { session_id, .. } => *session_id,
483            Self::StreamCancelled { session_id, .. } => *session_id,
484            Self::SkeletonGenerated { session_id, .. } => *session_id,
485            Self::PatchFramesGenerated { session_id, .. } => *session_id,
486            Self::FramesBatched { session_id, .. } => *session_id,
487            Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
488            Self::StreamConfigUpdated { session_id, .. } => *session_id,
489            Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
490            Self::SessionTimedOut { session_id, .. } => *session_id,
491            Self::SessionTimeoutExtended { session_id, .. } => *session_id,
492            Self::BackpressureReceived { session_id, .. } => *session_id,
493            Self::StreamPaused { session_id, .. } => *session_id,
494            Self::StreamResumed { session_id, .. } => *session_id,
495            Self::CreditsUpdated { session_id, .. } => *session_id,
496        }
497    }
498
499    /// Get the stream ID if this is a stream-specific event
500    pub fn stream_id(&self) -> Option<StreamId> {
501        match self {
502            Self::StreamCreated { stream_id, .. } => Some(*stream_id),
503            Self::StreamStarted { stream_id, .. } => Some(*stream_id),
504            Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
505            Self::StreamFailed { stream_id, .. } => Some(*stream_id),
506            Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
507            Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
508            Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
509            Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
510            Self::StreamPaused { stream_id, .. } => Some(*stream_id),
511            Self::StreamResumed { stream_id, .. } => Some(*stream_id),
512            Self::BackpressureReceived { stream_id, .. } => *stream_id,
513            _ => None,
514        }
515    }
516
517    /// Get the timestamp of this event
518    pub fn timestamp(&self) -> DateTime<Utc> {
519        match self {
520            Self::SessionActivated { timestamp, .. } => *timestamp,
521            Self::SessionClosed { timestamp, .. } => *timestamp,
522            Self::SessionExpired { timestamp, .. } => *timestamp,
523            Self::StreamCreated { timestamp, .. } => *timestamp,
524            Self::StreamStarted { timestamp, .. } => *timestamp,
525            Self::StreamCompleted { timestamp, .. } => *timestamp,
526            Self::StreamFailed { timestamp, .. } => *timestamp,
527            Self::StreamCancelled { timestamp, .. } => *timestamp,
528            Self::SkeletonGenerated { timestamp, .. } => *timestamp,
529            Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
530            Self::FramesBatched { timestamp, .. } => *timestamp,
531            Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
532            Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
533            Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
534            Self::SessionTimedOut { timestamp, .. } => *timestamp,
535            Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
536            Self::BackpressureReceived { timestamp, .. } => *timestamp,
537            Self::StreamPaused { timestamp, .. } => *timestamp,
538            Self::StreamResumed { timestamp, .. } => *timestamp,
539            Self::CreditsUpdated { timestamp, .. } => *timestamp,
540        }
541    }
542
543    /// Get the event type as a string
544    pub fn event_type(&self) -> &'static str {
545        match self {
546            Self::SessionActivated { .. } => "session_activated",
547            Self::SessionClosed { .. } => "session_closed",
548            Self::SessionExpired { .. } => "session_expired",
549            Self::StreamCreated { .. } => "stream_created",
550            Self::StreamStarted { .. } => "stream_started",
551            Self::StreamCompleted { .. } => "stream_completed",
552            Self::StreamFailed { .. } => "stream_failed",
553            Self::StreamCancelled { .. } => "stream_cancelled",
554            Self::SkeletonGenerated { .. } => "skeleton_generated",
555            Self::PatchFramesGenerated { .. } => "patch_frames_generated",
556            Self::FramesBatched { .. } => "frames_batched",
557            Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
558            Self::StreamConfigUpdated { .. } => "stream_config_updated",
559            Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
560            Self::SessionTimedOut { .. } => "session_timed_out",
561            Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
562            Self::BackpressureReceived { .. } => "backpressure_received",
563            Self::StreamPaused { .. } => "stream_paused",
564            Self::StreamResumed { .. } => "stream_resumed",
565            Self::CreditsUpdated { .. } => "credits_updated",
566        }
567    }
568
569    /// Check if this is a critical event that requires immediate attention
570    pub fn is_critical(&self) -> bool {
571        matches!(
572            self,
573            Self::StreamFailed { .. } | Self::SessionExpired { .. }
574        )
575    }
576
577    /// Check if this is an error event
578    pub fn is_error(&self) -> bool {
579        matches!(self, Self::StreamFailed { .. })
580    }
581
582    /// Check if this is a completion event
583    pub fn is_completion(&self) -> bool {
584        matches!(
585            self,
586            Self::StreamCompleted { .. } | Self::SessionClosed { .. }
587        )
588    }
589}
590
591/// Event sourcing support for storing and retrieving domain events
592pub trait EventStore {
593    /// Append events to the store
594    ///
595    /// # Errors
596    ///
597    /// Returns an error if events cannot be persisted
598    fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;
599
600    /// Get events for a specific session
601    ///
602    /// # Errors
603    ///
604    /// Returns an error if events cannot be retrieved
605    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;
606
607    /// Get events for a specific stream
608    ///
609    /// # Errors
610    ///
611    /// Returns an error if events cannot be retrieved
612    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;
613
614    /// Get all events since a specific timestamp
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if events cannot be retrieved
619    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
620}
621
622/// Simple in-memory event store for testing
623#[derive(Debug, Clone, Default)]
624pub struct InMemoryEventStore {
625    events: Vec<DomainEvent>,
626}
627
628impl InMemoryEventStore {
629    /// Create a new empty event store
630    pub fn new() -> Self {
631        Self::default()
632    }
633
634    /// Get all events in the store
635    pub fn all_events(&self) -> &[DomainEvent] {
636        &self.events
637    }
638
639    /// Get the total number of events in the store
640    pub fn event_count(&self) -> usize {
641        self.events.len()
642    }
643}
644
645impl EventStore for InMemoryEventStore {
646    fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
647        self.events.append(&mut events);
648        Ok(())
649    }
650
651    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
652        Ok(self
653            .events
654            .iter()
655            .filter(|e| e.session_id() == session_id)
656            .cloned()
657            .collect())
658    }
659
660    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
661        Ok(self
662            .events
663            .iter()
664            .filter(|e| e.stream_id() == Some(stream_id))
665            .cloned()
666            .collect())
667    }
668
669    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
670        Ok(self
671            .events
672            .iter()
673            .filter(|e| e.timestamp() > since)
674            .cloned()
675            .collect())
676    }
677}
678
679#[cfg(test)]
680mod tests {
681    use super::*;
682    use crate::value_objects::{SessionId, StreamId};
683
684    #[test]
685    fn test_domain_event_properties() {
686        let session_id = SessionId::new();
687        let stream_id = StreamId::new();
688        let timestamp = Utc::now();
689
690        let event = DomainEvent::StreamCreated {
691            session_id,
692            stream_id,
693            timestamp,
694        };
695
696        assert_eq!(event.session_id(), session_id);
697        assert_eq!(event.stream_id(), Some(stream_id));
698        assert_eq!(event.timestamp(), timestamp);
699        assert_eq!(event.event_type(), "stream_created");
700        assert!(!event.is_critical());
701        assert!(!event.is_error());
702    }
703
704    #[test]
705    fn test_critical_events() {
706        let session_id = SessionId::new();
707        let stream_id = StreamId::new();
708
709        let error_event = DomainEvent::StreamFailed {
710            session_id,
711            stream_id,
712            error: "Connection lost".to_string(),
713            timestamp: Utc::now(),
714        };
715
716        assert!(error_event.is_critical());
717        assert!(error_event.is_error());
718        assert!(!error_event.is_completion());
719    }
720
721    #[test]
722    fn test_event_store() {
723        let mut store = InMemoryEventStore::new();
724        let session_id = SessionId::new();
725        let stream_id = StreamId::new();
726
727        let events = vec![
728            DomainEvent::SessionActivated {
729                session_id,
730                timestamp: Utc::now(),
731            },
732            DomainEvent::StreamCreated {
733                session_id,
734                stream_id,
735                timestamp: Utc::now(),
736            },
737        ];
738
739        store
740            .append_events(events.clone())
741            .expect("Failed to append events to store in test");
742        assert_eq!(store.event_count(), 2);
743
744        let session_events = store
745            .get_events_for_session(session_id)
746            .expect("Failed to retrieve session events in test");
747        assert_eq!(session_events.len(), 2);
748
749        let stream_events = store
750            .get_events_for_stream(stream_id)
751            .expect("Failed to retrieve stream events in test");
752        assert_eq!(stream_events.len(), 1);
753    }
754
755    #[test]
756    fn test_event_serialization() {
757        let session_id = SessionId::new();
758        let event = DomainEvent::SessionActivated {
759            session_id,
760            timestamp: Utc::now(),
761        };
762
763        let serialized = serde_json::to_string(&event).expect("Failed to serialize event in test");
764        let deserialized: DomainEvent =
765            serde_json::from_str(&serialized).expect("Failed to deserialize event in test");
766
767        assert_eq!(event, deserialized);
768    }
769}
770
771/// Event identifier for tracking and correlation
772#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
773pub struct EventId(uuid::Uuid);
774
775impl EventId {
776    /// Generate new unique event ID
777    pub fn new() -> Self {
778        Self(uuid::Uuid::new_v4())
779    }
780
781    /// Create from existing UUID
782    pub fn from_uuid(uuid: uuid::Uuid) -> Self {
783        Self(uuid)
784    }
785
786    /// Get inner UUID
787    pub fn inner(&self) -> uuid::Uuid {
788        self.0
789    }
790}
791
792impl std::fmt::Display for EventId {
793    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
794        write!(f, "{}", self.0)
795    }
796}
797
798impl Default for EventId {
799    fn default() -> Self {
800        Self::new()
801    }
802}
803
804/// GAT-based trait for event subscribers that handle domain events
805pub trait EventSubscriber {
806    /// Future type for handling events
807    type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
808    where
809        Self: 'a;
810
811    /// Handle a domain event
812    fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
813}
814
815/// Extension methods for DomainEvent
816impl DomainEvent {
817    /// Get event ID for tracking (generated if not exists)
818    pub fn event_id(&self) -> EventId {
819        // For now, generate deterministic ID based on event content
820        // In future versions, this should be stored with the event
821        let content = format!("{self:?}");
822        use std::collections::hash_map::DefaultHasher;
823        use std::hash::{Hash, Hasher};
824        let mut hash = DefaultHasher::new();
825        content.hash(&mut hash);
826        let hash_val = hash.finish();
827        let uuid = uuid::Uuid::from_bytes([
828            (hash_val >> 56) as u8,
829            (hash_val >> 48) as u8,
830            (hash_val >> 40) as u8,
831            (hash_val >> 32) as u8,
832            (hash_val >> 24) as u8,
833            (hash_val >> 16) as u8,
834            (hash_val >> 8) as u8,
835            hash_val as u8,
836            0,
837            0,
838            0,
839            0,
840            0,
841            0,
842            0,
843            0,
844        ]);
845        EventId::from_uuid(uuid)
846    }
847
848    /// Get event timestamp
849    pub fn occurred_at(&self) -> DateTime<Utc> {
850        match self {
851            DomainEvent::SessionActivated { timestamp, .. }
852            | DomainEvent::SessionClosed { timestamp, .. }
853            | DomainEvent::SessionExpired { timestamp, .. }
854            | DomainEvent::StreamCreated { timestamp, .. }
855            | DomainEvent::StreamStarted { timestamp, .. }
856            | DomainEvent::StreamCompleted { timestamp, .. }
857            | DomainEvent::StreamFailed { timestamp, .. }
858            | DomainEvent::StreamCancelled { timestamp, .. }
859            | DomainEvent::SkeletonGenerated { timestamp, .. }
860            | DomainEvent::PatchFramesGenerated { timestamp, .. }
861            | DomainEvent::FramesBatched { timestamp, .. }
862            | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
863            | DomainEvent::StreamConfigUpdated { timestamp, .. }
864            | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
865            | DomainEvent::SessionTimedOut { timestamp, .. }
866            | DomainEvent::SessionTimeoutExtended { timestamp, .. }
867            | DomainEvent::BackpressureReceived { timestamp, .. }
868            | DomainEvent::StreamPaused { timestamp, .. }
869            | DomainEvent::StreamResumed { timestamp, .. }
870            | DomainEvent::CreditsUpdated { timestamp, .. } => *timestamp,
871        }
872    }
873
874    /// Get event metadata as key-value pairs
875    pub fn metadata(&self) -> std::collections::HashMap<String, String> {
876        let mut metadata = std::collections::HashMap::new();
877        metadata.insert("event_type".to_string(), self.event_type().to_string());
878        metadata.insert("session_id".to_string(), self.session_id().to_string());
879        metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
880
881        if let Some(stream_id) = self.stream_id() {
882            metadata.insert("stream_id".to_string(), stream_id.to_string());
883        }
884
885        metadata
886    }
887}