Skip to main content

pjson_rs_domain/events/
mod.rs

1//! Domain events for event sourcing and integration
2
3use crate::value_objects::{BackpressureSignal, SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7/// Custom serde for SessionId (domain purity)
8mod serde_session_id {
9    use crate::value_objects::SessionId;
10    use serde::{Deserialize, Deserializer, Serialize, Serializer};
11
12    pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
13    where
14        S: Serializer,
15    {
16        id.as_uuid().serialize(serializer)
17    }
18
19    pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
20    where
21        D: Deserializer<'de>,
22    {
23        let uuid = uuid::Uuid::deserialize(deserializer)?;
24        Ok(SessionId::from_uuid(uuid))
25    }
26}
27
28/// Custom serde for StreamId (domain purity)
29mod serde_stream_id {
30    use crate::value_objects::StreamId;
31    use serde::{Deserialize, Deserializer, Serialize, Serializer};
32
33    pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
34    where
35        S: Serializer,
36    {
37        id.as_uuid().serialize(serializer)
38    }
39
40    pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
41    where
42        D: Deserializer<'de>,
43    {
44        let uuid = uuid::Uuid::deserialize(deserializer)?;
45        Ok(StreamId::from_uuid(uuid))
46    }
47}
48
49/// Custom serde for `Option<StreamId>`
50#[allow(dead_code)]
51mod serde_option_stream_id {
52    use crate::value_objects::StreamId;
53    use serde::{Deserialize, Deserializer, Serialize, Serializer};
54
55    pub fn serialize<S>(id: &Option<StreamId>, serializer: S) -> Result<S::Ok, S::Error>
56    where
57        S: Serializer,
58    {
59        id.map(|i| i.as_uuid()).serialize(serializer)
60    }
61
62    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StreamId>, D::Error>
63    where
64        D: Deserializer<'de>,
65    {
66        let uuid: Option<uuid::Uuid> = Option::deserialize(deserializer)?;
67        Ok(uuid.map(StreamId::from_uuid))
68    }
69}
70
71/// Session state in its lifecycle
72#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
73pub enum SessionState {
74    /// Session is being initialized
75    Initializing,
76    /// Session is active with streams
77    Active,
78    /// Session is gracefully closing
79    Closing,
80    /// Session completed successfully
81    Completed,
82    /// Session failed with error
83    Failed,
84}
85
86impl SessionState {
87    /// Get string representation without allocation
88    pub fn as_str(&self) -> &'static str {
89        match self {
90            SessionState::Initializing => "Initializing",
91            SessionState::Active => "Active",
92            SessionState::Closing => "Closing",
93            SessionState::Completed => "Completed",
94            SessionState::Failed => "Failed",
95        }
96    }
97}
98
99/// Domain events that represent business-relevant state changes
100#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
101#[serde(tag = "event_type", rename_all = "snake_case")]
102pub enum DomainEvent {
103    /// Session was activated and is ready to accept streams
104    SessionActivated {
105        /// ID of the activated session
106        #[serde(with = "serde_session_id")]
107        session_id: SessionId,
108        /// When the session was activated
109        timestamp: DateTime<Utc>,
110    },
111
112    /// Session was closed gracefully
113    SessionClosed {
114        /// ID of the closed session
115        #[serde(with = "serde_session_id")]
116        session_id: SessionId,
117        /// When the session was closed
118        timestamp: DateTime<Utc>,
119    },
120
121    /// Session expired due to timeout
122    SessionExpired {
123        /// ID of the expired session
124        #[serde(with = "serde_session_id")]
125        session_id: SessionId,
126        /// When the session expired
127        timestamp: DateTime<Utc>,
128    },
129
130    /// Session was forcefully closed due to timeout
131    SessionTimedOut {
132        /// ID of the timed out session
133        #[serde(with = "serde_session_id")]
134        session_id: SessionId,
135        /// State the session was in before timeout
136        original_state: SessionState,
137        /// Duration in seconds before timeout occurred
138        timeout_duration: u64,
139        /// When the timeout occurred
140        timestamp: DateTime<Utc>,
141    },
142
143    /// Session timeout was extended
144    SessionTimeoutExtended {
145        /// ID of the session with extended timeout
146        #[serde(with = "serde_session_id")]
147        session_id: SessionId,
148        /// Additional seconds added to the timeout
149        additional_seconds: u64,
150        /// New expiration timestamp
151        new_expires_at: DateTime<Utc>,
152        /// When the extension was applied
153        timestamp: DateTime<Utc>,
154    },
155
156    /// New stream was created in the session
157    StreamCreated {
158        /// ID of the session containing the stream
159        #[serde(with = "serde_session_id")]
160        session_id: SessionId,
161        /// ID of the newly created stream
162        #[serde(with = "serde_stream_id")]
163        stream_id: StreamId,
164        /// When the stream was created
165        timestamp: DateTime<Utc>,
166    },
167
168    /// Stream started sending data
169    StreamStarted {
170        /// ID of the session containing the stream
171        #[serde(with = "serde_session_id")]
172        session_id: SessionId,
173        /// ID of the stream that started
174        #[serde(with = "serde_stream_id")]
175        stream_id: StreamId,
176        /// When the stream started
177        timestamp: DateTime<Utc>,
178    },
179
180    /// Stream completed successfully
181    StreamCompleted {
182        /// ID of the session containing the stream
183        #[serde(with = "serde_session_id")]
184        session_id: SessionId,
185        /// ID of the completed stream
186        #[serde(with = "serde_stream_id")]
187        stream_id: StreamId,
188        /// When the stream completed
189        timestamp: DateTime<Utc>,
190    },
191
192    /// Stream failed with error
193    StreamFailed {
194        /// ID of the session containing the stream
195        #[serde(with = "serde_session_id")]
196        session_id: SessionId,
197        /// ID of the failed stream
198        #[serde(with = "serde_stream_id")]
199        stream_id: StreamId,
200        /// Error message describing the failure
201        error: String,
202        /// When the stream failed
203        timestamp: DateTime<Utc>,
204    },
205
206    /// Stream was cancelled
207    StreamCancelled {
208        /// ID of the session containing the stream
209        #[serde(with = "serde_session_id")]
210        session_id: SessionId,
211        /// ID of the cancelled stream
212        #[serde(with = "serde_stream_id")]
213        stream_id: StreamId,
214        /// When the stream was cancelled
215        timestamp: DateTime<Utc>,
216    },
217
218    /// Skeleton frame was generated for a stream
219    SkeletonGenerated {
220        /// ID of the session containing the stream
221        #[serde(with = "serde_session_id")]
222        session_id: SessionId,
223        /// ID of the stream that generated the skeleton
224        #[serde(with = "serde_stream_id")]
225        stream_id: StreamId,
226        /// Size of the skeleton frame in bytes
227        frame_size_bytes: u64,
228        /// When the skeleton was generated
229        timestamp: DateTime<Utc>,
230    },
231
232    /// Patch frames were generated for a stream
233    PatchFramesGenerated {
234        /// ID of the session containing the stream
235        #[serde(with = "serde_session_id")]
236        session_id: SessionId,
237        /// ID of the stream that generated patches
238        #[serde(with = "serde_stream_id")]
239        stream_id: StreamId,
240        /// Number of patch frames generated
241        frame_count: usize,
242        /// Total size of all patches in bytes
243        total_bytes: u64,
244        /// Highest priority level among the patches
245        highest_priority: u8,
246        /// When the patches were generated
247        timestamp: DateTime<Utc>,
248    },
249
250    /// Multiple frames were batched for efficient sending
251    FramesBatched {
252        /// ID of the session containing the frames
253        #[serde(with = "serde_session_id")]
254        session_id: SessionId,
255        /// Number of frames in the batch
256        frame_count: usize,
257        /// When the batch was created
258        timestamp: DateTime<Utc>,
259    },
260
261    /// Priority threshold was adjusted for adaptive streaming
262    PriorityThresholdAdjusted {
263        /// ID of the session with adjusted threshold
264        #[serde(with = "serde_session_id")]
265        session_id: SessionId,
266        /// Previous priority threshold value
267        old_threshold: u8,
268        /// New priority threshold value
269        new_threshold: u8,
270        /// Reason for the adjustment
271        reason: String,
272        /// When the threshold was adjusted
273        timestamp: DateTime<Utc>,
274    },
275
276    /// Stream configuration was updated
277    StreamConfigUpdated {
278        /// ID of the session containing the stream
279        #[serde(with = "serde_session_id")]
280        session_id: SessionId,
281        /// ID of the stream with updated configuration
282        #[serde(with = "serde_stream_id")]
283        stream_id: StreamId,
284        /// When the configuration was updated
285        timestamp: DateTime<Utc>,
286    },
287
288    /// Performance metrics were recorded
289    PerformanceMetricsRecorded {
290        /// ID of the session being measured
291        #[serde(with = "serde_session_id")]
292        session_id: SessionId,
293        /// Recorded performance metrics
294        metrics: PerformanceMetrics,
295        /// When the metrics were recorded
296        timestamp: DateTime<Utc>,
297    },
298
299    /// Backpressure signal received from client
300    BackpressureReceived {
301        /// ID of the session receiving backpressure
302        #[serde(with = "serde_session_id")]
303        session_id: SessionId,
304        /// ID of the stream if specific to a stream
305        #[serde(default, skip_serializing_if = "Option::is_none")]
306        #[serde(with = "serde_option_stream_id")]
307        stream_id: Option<StreamId>,
308        /// Backpressure signal type
309        signal: BackpressureSignal,
310        /// When the signal was received
311        timestamp: DateTime<Utc>,
312    },
313
314    /// Stream was paused due to flow control
315    StreamPaused {
316        /// ID of the session containing the stream
317        #[serde(with = "serde_session_id")]
318        session_id: SessionId,
319        /// ID of the paused stream
320        #[serde(with = "serde_stream_id")]
321        stream_id: StreamId,
322        /// Reason for pausing
323        reason: String,
324        /// When the stream was paused
325        timestamp: DateTime<Utc>,
326    },
327
328    /// Stream was resumed after pause
329    StreamResumed {
330        /// ID of the session containing the stream
331        #[serde(with = "serde_session_id")]
332        session_id: SessionId,
333        /// ID of the resumed stream
334        #[serde(with = "serde_stream_id")]
335        stream_id: StreamId,
336        /// Duration the stream was paused in milliseconds
337        paused_duration_ms: u64,
338        /// When the stream was resumed
339        timestamp: DateTime<Utc>,
340    },
341
342    /// Flow control credits were updated
343    CreditsUpdated {
344        /// ID of the session with updated credits
345        #[serde(with = "serde_session_id")]
346        session_id: SessionId,
347        /// New available credits
348        available_credits: usize,
349        /// Maximum allowed credits
350        max_credits: usize,
351        /// When credits were updated
352        timestamp: DateTime<Utc>,
353    },
354}
355
356/// Performance metrics for monitoring and optimization
357#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
358pub struct PerformanceMetrics {
359    /// Number of frames transmitted per second
360    pub frames_per_second: f64,
361    /// Number of bytes transmitted per second
362    pub bytes_per_second: f64,
363    /// Average size of frames in bytes
364    pub average_frame_size: f64,
365    /// Distribution of frames across priority levels
366    pub priority_distribution: PriorityDistribution,
367    /// Network latency in milliseconds, if available
368    pub latency_ms: Option<u64>,
369}
370
371/// Distribution of frames by priority level
372#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
373pub struct PriorityDistribution {
374    /// Number of critical priority frames
375    pub critical_frames: u64,
376    /// Number of high priority frames
377    pub high_frames: u64,
378    /// Number of medium priority frames
379    pub medium_frames: u64,
380    /// Number of low priority frames
381    pub low_frames: u64,
382    /// Number of background priority frames
383    pub background_frames: u64,
384}
385
386impl PriorityDistribution {
387    /// Create new empty distribution
388    pub fn new() -> Self {
389        Self::default()
390    }
391
392    /// Get total frames count
393    pub fn total_frames(&self) -> u64 {
394        self.critical_frames
395            + self.high_frames
396            + self.medium_frames
397            + self.low_frames
398            + self.background_frames
399    }
400
401    /// Convert to percentages (0.0-1.0)
402    pub fn as_percentages(&self) -> PriorityPercentages {
403        let total = self.total_frames() as f64;
404        if total == 0.0 {
405            return PriorityPercentages::default();
406        }
407
408        PriorityPercentages {
409            critical: self.critical_frames as f64 / total,
410            high: self.high_frames as f64 / total,
411            medium: self.medium_frames as f64 / total,
412            low: self.low_frames as f64 / total,
413            background: self.background_frames as f64 / total,
414        }
415    }
416
417    /// Convert from count-based version
418    pub fn from_counts(
419        critical_count: u64,
420        high_count: u64,
421        medium_count: u64,
422        low_count: u64,
423        background_count: u64,
424    ) -> Self {
425        Self {
426            critical_frames: critical_count,
427            high_frames: high_count,
428            medium_frames: medium_count,
429            low_frames: low_count,
430            background_frames: background_count,
431        }
432    }
433}
434
435/// Priority distribution as percentages (for demos and visualization)
436#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
437pub struct PriorityPercentages {
438    /// Critical priority percentage (0.0-1.0)
439    pub critical: f64,
440    /// High priority percentage (0.0-1.0)
441    pub high: f64,
442    /// Medium priority percentage (0.0-1.0)
443    pub medium: f64,
444    /// Low priority percentage (0.0-1.0)
445    pub low: f64,
446    /// Background priority percentage (0.0-1.0)
447    pub background: f64,
448}
449
450impl Default for PriorityPercentages {
451    fn default() -> Self {
452        Self {
453            critical: 0.0,
454            high: 0.0,
455            medium: 0.0,
456            low: 0.0,
457            background: 0.0,
458        }
459    }
460}
461
462impl DomainEvent {
463    /// Get the session ID associated with this event
464    pub fn session_id(&self) -> SessionId {
465        match self {
466            Self::SessionActivated { session_id, .. } => *session_id,
467            Self::SessionClosed { session_id, .. } => *session_id,
468            Self::SessionExpired { session_id, .. } => *session_id,
469            Self::StreamCreated { session_id, .. } => *session_id,
470            Self::StreamStarted { session_id, .. } => *session_id,
471            Self::StreamCompleted { session_id, .. } => *session_id,
472            Self::StreamFailed { session_id, .. } => *session_id,
473            Self::StreamCancelled { session_id, .. } => *session_id,
474            Self::SkeletonGenerated { session_id, .. } => *session_id,
475            Self::PatchFramesGenerated { session_id, .. } => *session_id,
476            Self::FramesBatched { session_id, .. } => *session_id,
477            Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
478            Self::StreamConfigUpdated { session_id, .. } => *session_id,
479            Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
480            Self::SessionTimedOut { session_id, .. } => *session_id,
481            Self::SessionTimeoutExtended { session_id, .. } => *session_id,
482            Self::BackpressureReceived { session_id, .. } => *session_id,
483            Self::StreamPaused { session_id, .. } => *session_id,
484            Self::StreamResumed { session_id, .. } => *session_id,
485            Self::CreditsUpdated { session_id, .. } => *session_id,
486        }
487    }
488
489    /// Get the stream ID if this is a stream-specific event
490    pub fn stream_id(&self) -> Option<StreamId> {
491        match self {
492            Self::StreamCreated { stream_id, .. } => Some(*stream_id),
493            Self::StreamStarted { stream_id, .. } => Some(*stream_id),
494            Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
495            Self::StreamFailed { stream_id, .. } => Some(*stream_id),
496            Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
497            Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
498            Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
499            Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
500            Self::StreamPaused { stream_id, .. } => Some(*stream_id),
501            Self::StreamResumed { stream_id, .. } => Some(*stream_id),
502            Self::BackpressureReceived { stream_id, .. } => *stream_id,
503            _ => None,
504        }
505    }
506
507    /// Get the timestamp of this event
508    pub fn timestamp(&self) -> DateTime<Utc> {
509        match self {
510            Self::SessionActivated { timestamp, .. } => *timestamp,
511            Self::SessionClosed { timestamp, .. } => *timestamp,
512            Self::SessionExpired { timestamp, .. } => *timestamp,
513            Self::StreamCreated { timestamp, .. } => *timestamp,
514            Self::StreamStarted { timestamp, .. } => *timestamp,
515            Self::StreamCompleted { timestamp, .. } => *timestamp,
516            Self::StreamFailed { timestamp, .. } => *timestamp,
517            Self::StreamCancelled { timestamp, .. } => *timestamp,
518            Self::SkeletonGenerated { timestamp, .. } => *timestamp,
519            Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
520            Self::FramesBatched { timestamp, .. } => *timestamp,
521            Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
522            Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
523            Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
524            Self::SessionTimedOut { timestamp, .. } => *timestamp,
525            Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
526            Self::BackpressureReceived { timestamp, .. } => *timestamp,
527            Self::StreamPaused { timestamp, .. } => *timestamp,
528            Self::StreamResumed { timestamp, .. } => *timestamp,
529            Self::CreditsUpdated { timestamp, .. } => *timestamp,
530        }
531    }
532
533    /// Get the event type as a string
534    pub fn event_type(&self) -> &'static str {
535        match self {
536            Self::SessionActivated { .. } => "session_activated",
537            Self::SessionClosed { .. } => "session_closed",
538            Self::SessionExpired { .. } => "session_expired",
539            Self::StreamCreated { .. } => "stream_created",
540            Self::StreamStarted { .. } => "stream_started",
541            Self::StreamCompleted { .. } => "stream_completed",
542            Self::StreamFailed { .. } => "stream_failed",
543            Self::StreamCancelled { .. } => "stream_cancelled",
544            Self::SkeletonGenerated { .. } => "skeleton_generated",
545            Self::PatchFramesGenerated { .. } => "patch_frames_generated",
546            Self::FramesBatched { .. } => "frames_batched",
547            Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
548            Self::StreamConfigUpdated { .. } => "stream_config_updated",
549            Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
550            Self::SessionTimedOut { .. } => "session_timed_out",
551            Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
552            Self::BackpressureReceived { .. } => "backpressure_received",
553            Self::StreamPaused { .. } => "stream_paused",
554            Self::StreamResumed { .. } => "stream_resumed",
555            Self::CreditsUpdated { .. } => "credits_updated",
556        }
557    }
558
559    /// Check if this is a critical event that requires immediate attention
560    pub fn is_critical(&self) -> bool {
561        matches!(
562            self,
563            Self::StreamFailed { .. } | Self::SessionExpired { .. }
564        )
565    }
566
567    /// Check if this is an error event
568    pub fn is_error(&self) -> bool {
569        matches!(self, Self::StreamFailed { .. })
570    }
571
572    /// Check if this is a completion event
573    pub fn is_completion(&self) -> bool {
574        matches!(
575            self,
576            Self::StreamCompleted { .. } | Self::SessionClosed { .. }
577        )
578    }
579}
580
581/// Event sourcing support for storing and retrieving domain events
582pub trait EventStore {
583    /// Append events to the store
584    ///
585    /// # Errors
586    ///
587    /// Returns an error if events cannot be persisted
588    fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;
589
590    /// Get events for a specific session
591    ///
592    /// # Errors
593    ///
594    /// Returns an error if events cannot be retrieved
595    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;
596
597    /// Get events for a specific stream
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if events cannot be retrieved
602    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;
603
604    /// Get all events since a specific timestamp
605    ///
606    /// # Errors
607    ///
608    /// Returns an error if events cannot be retrieved
609    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
610}
611
612/// Simple in-memory event store for testing
613#[derive(Debug, Clone, Default)]
614pub struct InMemoryEventStore {
615    events: Vec<DomainEvent>,
616}
617
618impl InMemoryEventStore {
619    /// Create a new empty event store
620    pub fn new() -> Self {
621        Self::default()
622    }
623
624    /// Get all events in the store
625    pub fn all_events(&self) -> &[DomainEvent] {
626        &self.events
627    }
628
629    /// Get the total number of events in the store
630    pub fn event_count(&self) -> usize {
631        self.events.len()
632    }
633}
634
635impl EventStore for InMemoryEventStore {
636    fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
637        self.events.append(&mut events);
638        Ok(())
639    }
640
641    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
642        Ok(self
643            .events
644            .iter()
645            .filter(|e| e.session_id() == session_id)
646            .cloned()
647            .collect())
648    }
649
650    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
651        Ok(self
652            .events
653            .iter()
654            .filter(|e| e.stream_id() == Some(stream_id))
655            .cloned()
656            .collect())
657    }
658
659    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
660        Ok(self
661            .events
662            .iter()
663            .filter(|e| e.timestamp() > since)
664            .cloned()
665            .collect())
666    }
667}
668
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value_objects::{SessionId, StreamId};

    /// Accessors on a `StreamCreated` event hand back exactly what went in.
    #[test]
    fn test_domain_event_properties() {
        let sid = SessionId::new();
        let stid = StreamId::new();
        let created_at = Utc::now();

        let created = DomainEvent::StreamCreated {
            session_id: sid,
            stream_id: stid,
            timestamp: created_at,
        };

        assert_eq!(created.session_id(), sid);
        assert_eq!(created.stream_id(), Some(stid));
        assert_eq!(created.timestamp(), created_at);
        assert_eq!(created.event_type(), "stream_created");
        assert!(!created.is_critical());
        assert!(!created.is_error());
    }

    /// A failed stream is critical and an error, but not a completion.
    #[test]
    fn test_critical_events() {
        let failure = DomainEvent::StreamFailed {
            session_id: SessionId::new(),
            stream_id: StreamId::new(),
            error: "Connection lost".to_string(),
            timestamp: Utc::now(),
        };

        assert!(failure.is_critical());
        assert!(failure.is_error());
        assert!(!failure.is_completion());
    }

    /// The in-memory store filters appended events by session and by stream.
    #[test]
    fn test_event_store() {
        let sid = SessionId::new();
        let stid = StreamId::new();
        let mut store = InMemoryEventStore::new();

        let batch = vec![
            DomainEvent::SessionActivated {
                session_id: sid,
                timestamp: Utc::now(),
            },
            DomainEvent::StreamCreated {
                session_id: sid,
                stream_id: stid,
                timestamp: Utc::now(),
            },
        ];

        store
            .append_events(batch)
            .expect("Failed to append events to store in test");
        assert_eq!(store.event_count(), 2);

        let by_session = store
            .get_events_for_session(sid)
            .expect("Failed to retrieve session events in test");
        assert_eq!(by_session.len(), 2);

        let by_stream = store
            .get_events_for_stream(stid)
            .expect("Failed to retrieve stream events in test");
        assert_eq!(by_stream.len(), 1);
    }

    /// An event survives a JSON round trip unchanged.
    #[test]
    fn test_event_serialization() {
        let original = DomainEvent::SessionActivated {
            session_id: SessionId::new(),
            timestamp: Utc::now(),
        };

        let json = serde_json::to_string(&original).expect("Failed to serialize event in test");
        let restored: DomainEvent =
            serde_json::from_str(&json).expect("Failed to deserialize event in test");

        assert_eq!(original, restored);
    }
}
760
761/// Event identifier for tracking and correlation
762#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
763pub struct EventId(uuid::Uuid);
764
765impl EventId {
766    /// Generate new unique event ID
767    pub fn new() -> Self {
768        Self(uuid::Uuid::new_v4())
769    }
770
771    /// Create from existing UUID
772    pub fn from_uuid(uuid: uuid::Uuid) -> Self {
773        Self(uuid)
774    }
775
776    /// Get inner UUID
777    pub fn inner(&self) -> uuid::Uuid {
778        self.0
779    }
780}
781
782impl std::fmt::Display for EventId {
783    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
784        write!(f, "{}", self.0)
785    }
786}
787
788impl Default for EventId {
789    fn default() -> Self {
790        Self::new()
791    }
792}
793
794/// GAT-based trait for event subscribers that handle domain events
795pub trait EventSubscriber {
796    /// Future type for handling events
797    type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
798    where
799        Self: 'a;
800
801    /// Handle a domain event
802    fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
803}
804
805/// Extension methods for DomainEvent
806impl DomainEvent {
807    /// Get event ID for tracking (generated if not exists)
808    pub fn event_id(&self) -> EventId {
809        // For now, generate deterministic ID based on event content
810        // In future versions, this should be stored with the event
811        let content = format!("{self:?}");
812        use std::collections::hash_map::DefaultHasher;
813        use std::hash::{Hash, Hasher};
814        let mut hash = DefaultHasher::new();
815        content.hash(&mut hash);
816        let hash_val = hash.finish();
817        let uuid = uuid::Uuid::from_bytes([
818            (hash_val >> 56) as u8,
819            (hash_val >> 48) as u8,
820            (hash_val >> 40) as u8,
821            (hash_val >> 32) as u8,
822            (hash_val >> 24) as u8,
823            (hash_val >> 16) as u8,
824            (hash_val >> 8) as u8,
825            hash_val as u8,
826            0,
827            0,
828            0,
829            0,
830            0,
831            0,
832            0,
833            0,
834        ]);
835        EventId::from_uuid(uuid)
836    }
837
838    /// Get event timestamp
839    pub fn occurred_at(&self) -> DateTime<Utc> {
840        match self {
841            DomainEvent::SessionActivated { timestamp, .. }
842            | DomainEvent::SessionClosed { timestamp, .. }
843            | DomainEvent::SessionExpired { timestamp, .. }
844            | DomainEvent::StreamCreated { timestamp, .. }
845            | DomainEvent::StreamStarted { timestamp, .. }
846            | DomainEvent::StreamCompleted { timestamp, .. }
847            | DomainEvent::StreamFailed { timestamp, .. }
848            | DomainEvent::StreamCancelled { timestamp, .. }
849            | DomainEvent::SkeletonGenerated { timestamp, .. }
850            | DomainEvent::PatchFramesGenerated { timestamp, .. }
851            | DomainEvent::FramesBatched { timestamp, .. }
852            | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
853            | DomainEvent::StreamConfigUpdated { timestamp, .. }
854            | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
855            | DomainEvent::SessionTimedOut { timestamp, .. }
856            | DomainEvent::SessionTimeoutExtended { timestamp, .. }
857            | DomainEvent::BackpressureReceived { timestamp, .. }
858            | DomainEvent::StreamPaused { timestamp, .. }
859            | DomainEvent::StreamResumed { timestamp, .. }
860            | DomainEvent::CreditsUpdated { timestamp, .. } => *timestamp,
861        }
862    }
863
864    /// Get event metadata as key-value pairs
865    pub fn metadata(&self) -> std::collections::HashMap<String, String> {
866        let mut metadata = std::collections::HashMap::new();
867        metadata.insert("event_type".to_string(), self.event_type().to_string());
868        metadata.insert("session_id".to_string(), self.session_id().to_string());
869        metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
870
871        if let Some(stream_id) = self.stream_id() {
872            metadata.insert("stream_id".to_string(), stream_id.to_string());
873        }
874
875        metadata
876    }
877}