pjson_rs_domain/events/
mod.rs

1//! Domain events for event sourcing and integration
2
3use crate::value_objects::{SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7/// Custom serde for SessionId (domain purity)
8mod serde_session_id {
9    use crate::value_objects::SessionId;
10    use serde::{Deserialize, Deserializer, Serialize, Serializer};
11
12    pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
13    where
14        S: Serializer,
15    {
16        id.as_uuid().serialize(serializer)
17    }
18
19    pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
20    where
21        D: Deserializer<'de>,
22    {
23        let uuid = uuid::Uuid::deserialize(deserializer)?;
24        Ok(SessionId::from_uuid(uuid))
25    }
26}
27
28/// Custom serde for StreamId (domain purity)
29mod serde_stream_id {
30    use crate::value_objects::StreamId;
31    use serde::{Deserialize, Deserializer, Serialize, Serializer};
32
33    pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
34    where
35        S: Serializer,
36    {
37        id.as_uuid().serialize(serializer)
38    }
39
40    pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
41    where
42        D: Deserializer<'de>,
43    {
44        let uuid = uuid::Uuid::deserialize(deserializer)?;
45        Ok(StreamId::from_uuid(uuid))
46    }
47}
48
49/// Custom serde for `Option<StreamId>`
50#[allow(dead_code)]
51mod serde_option_stream_id {
52    use crate::value_objects::StreamId;
53    use serde::{Deserialize, Deserializer, Serialize, Serializer};
54
55    pub fn serialize<S>(id: &Option<StreamId>, serializer: S) -> Result<S::Ok, S::Error>
56    where
57        S: Serializer,
58    {
59        id.map(|i| i.as_uuid()).serialize(serializer)
60    }
61
62    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StreamId>, D::Error>
63    where
64        D: Deserializer<'de>,
65    {
66        let uuid: Option<uuid::Uuid> = Option::deserialize(deserializer)?;
67        Ok(uuid.map(StreamId::from_uuid))
68    }
69}
70
71/// Session state in its lifecycle
72#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
73pub enum SessionState {
74    /// Session is being initialized
75    Initializing,
76    /// Session is active with streams
77    Active,
78    /// Session is gracefully closing
79    Closing,
80    /// Session completed successfully
81    Completed,
82    /// Session failed with error
83    Failed,
84}
85
86/// Domain events that represent business-relevant state changes
87#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
88#[serde(tag = "event_type", rename_all = "snake_case")]
89pub enum DomainEvent {
90    /// Session was activated and is ready to accept streams
91    SessionActivated {
92        /// ID of the activated session
93        #[serde(with = "serde_session_id")]
94        session_id: SessionId,
95        /// When the session was activated
96        timestamp: DateTime<Utc>,
97    },
98
99    /// Session was closed gracefully
100    SessionClosed {
101        /// ID of the closed session
102        #[serde(with = "serde_session_id")]
103        session_id: SessionId,
104        /// When the session was closed
105        timestamp: DateTime<Utc>,
106    },
107
108    /// Session expired due to timeout
109    SessionExpired {
110        /// ID of the expired session
111        #[serde(with = "serde_session_id")]
112        session_id: SessionId,
113        /// When the session expired
114        timestamp: DateTime<Utc>,
115    },
116
117    /// Session was forcefully closed due to timeout
118    SessionTimedOut {
119        /// ID of the timed out session
120        #[serde(with = "serde_session_id")]
121        session_id: SessionId,
122        /// State the session was in before timeout
123        original_state: SessionState,
124        /// Duration in seconds before timeout occurred
125        timeout_duration: u64,
126        /// When the timeout occurred
127        timestamp: DateTime<Utc>,
128    },
129
130    /// Session timeout was extended
131    SessionTimeoutExtended {
132        /// ID of the session with extended timeout
133        #[serde(with = "serde_session_id")]
134        session_id: SessionId,
135        /// Additional seconds added to the timeout
136        additional_seconds: u64,
137        /// New expiration timestamp
138        new_expires_at: DateTime<Utc>,
139        /// When the extension was applied
140        timestamp: DateTime<Utc>,
141    },
142
143    /// New stream was created in the session
144    StreamCreated {
145        /// ID of the session containing the stream
146        #[serde(with = "serde_session_id")]
147        session_id: SessionId,
148        /// ID of the newly created stream
149        #[serde(with = "serde_stream_id")]
150        stream_id: StreamId,
151        /// When the stream was created
152        timestamp: DateTime<Utc>,
153    },
154
155    /// Stream started sending data
156    StreamStarted {
157        /// ID of the session containing the stream
158        #[serde(with = "serde_session_id")]
159        session_id: SessionId,
160        /// ID of the stream that started
161        #[serde(with = "serde_stream_id")]
162        stream_id: StreamId,
163        /// When the stream started
164        timestamp: DateTime<Utc>,
165    },
166
167    /// Stream completed successfully
168    StreamCompleted {
169        /// ID of the session containing the stream
170        #[serde(with = "serde_session_id")]
171        session_id: SessionId,
172        /// ID of the completed stream
173        #[serde(with = "serde_stream_id")]
174        stream_id: StreamId,
175        /// When the stream completed
176        timestamp: DateTime<Utc>,
177    },
178
179    /// Stream failed with error
180    StreamFailed {
181        /// ID of the session containing the stream
182        #[serde(with = "serde_session_id")]
183        session_id: SessionId,
184        /// ID of the failed stream
185        #[serde(with = "serde_stream_id")]
186        stream_id: StreamId,
187        /// Error message describing the failure
188        error: String,
189        /// When the stream failed
190        timestamp: DateTime<Utc>,
191    },
192
193    /// Stream was cancelled
194    StreamCancelled {
195        /// ID of the session containing the stream
196        #[serde(with = "serde_session_id")]
197        session_id: SessionId,
198        /// ID of the cancelled stream
199        #[serde(with = "serde_stream_id")]
200        stream_id: StreamId,
201        /// When the stream was cancelled
202        timestamp: DateTime<Utc>,
203    },
204
205    /// Skeleton frame was generated for a stream
206    SkeletonGenerated {
207        /// ID of the session containing the stream
208        #[serde(with = "serde_session_id")]
209        session_id: SessionId,
210        /// ID of the stream that generated the skeleton
211        #[serde(with = "serde_stream_id")]
212        stream_id: StreamId,
213        /// Size of the skeleton frame in bytes
214        frame_size_bytes: u64,
215        /// When the skeleton was generated
216        timestamp: DateTime<Utc>,
217    },
218
219    /// Patch frames were generated for a stream
220    PatchFramesGenerated {
221        /// ID of the session containing the stream
222        #[serde(with = "serde_session_id")]
223        session_id: SessionId,
224        /// ID of the stream that generated patches
225        #[serde(with = "serde_stream_id")]
226        stream_id: StreamId,
227        /// Number of patch frames generated
228        frame_count: usize,
229        /// Total size of all patches in bytes
230        total_bytes: u64,
231        /// Highest priority level among the patches
232        highest_priority: u8,
233        /// When the patches were generated
234        timestamp: DateTime<Utc>,
235    },
236
237    /// Multiple frames were batched for efficient sending
238    FramesBatched {
239        /// ID of the session containing the frames
240        #[serde(with = "serde_session_id")]
241        session_id: SessionId,
242        /// Number of frames in the batch
243        frame_count: usize,
244        /// When the batch was created
245        timestamp: DateTime<Utc>,
246    },
247
248    /// Priority threshold was adjusted for adaptive streaming
249    PriorityThresholdAdjusted {
250        /// ID of the session with adjusted threshold
251        #[serde(with = "serde_session_id")]
252        session_id: SessionId,
253        /// Previous priority threshold value
254        old_threshold: u8,
255        /// New priority threshold value
256        new_threshold: u8,
257        /// Reason for the adjustment
258        reason: String,
259        /// When the threshold was adjusted
260        timestamp: DateTime<Utc>,
261    },
262
263    /// Stream configuration was updated
264    StreamConfigUpdated {
265        /// ID of the session containing the stream
266        #[serde(with = "serde_session_id")]
267        session_id: SessionId,
268        /// ID of the stream with updated configuration
269        #[serde(with = "serde_stream_id")]
270        stream_id: StreamId,
271        /// When the configuration was updated
272        timestamp: DateTime<Utc>,
273    },
274
275    /// Performance metrics were recorded
276    PerformanceMetricsRecorded {
277        /// ID of the session being measured
278        #[serde(with = "serde_session_id")]
279        session_id: SessionId,
280        /// Recorded performance metrics
281        metrics: PerformanceMetrics,
282        /// When the metrics were recorded
283        timestamp: DateTime<Utc>,
284    },
285}
286
287/// Performance metrics for monitoring and optimization
288#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
289pub struct PerformanceMetrics {
290    /// Number of frames transmitted per second
291    pub frames_per_second: f64,
292    /// Number of bytes transmitted per second
293    pub bytes_per_second: f64,
294    /// Average size of frames in bytes
295    pub average_frame_size: f64,
296    /// Distribution of frames across priority levels
297    pub priority_distribution: PriorityDistribution,
298    /// Network latency in milliseconds, if available
299    pub latency_ms: Option<u64>,
300}
301
302/// Distribution of frames by priority level
303#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
304pub struct PriorityDistribution {
305    /// Number of critical priority frames
306    pub critical_frames: u64,
307    /// Number of high priority frames
308    pub high_frames: u64,
309    /// Number of medium priority frames
310    pub medium_frames: u64,
311    /// Number of low priority frames
312    pub low_frames: u64,
313    /// Number of background priority frames
314    pub background_frames: u64,
315}
316
317impl PriorityDistribution {
318    /// Create new empty distribution
319    pub fn new() -> Self {
320        Self::default()
321    }
322
323    /// Get total frames count
324    pub fn total_frames(&self) -> u64 {
325        self.critical_frames
326            + self.high_frames
327            + self.medium_frames
328            + self.low_frames
329            + self.background_frames
330    }
331
332    /// Convert to percentages (0.0-1.0)
333    pub fn as_percentages(&self) -> PriorityPercentages {
334        let total = self.total_frames() as f64;
335        if total == 0.0 {
336            return PriorityPercentages::default();
337        }
338
339        PriorityPercentages {
340            critical: self.critical_frames as f64 / total,
341            high: self.high_frames as f64 / total,
342            medium: self.medium_frames as f64 / total,
343            low: self.low_frames as f64 / total,
344            background: self.background_frames as f64 / total,
345        }
346    }
347
348    /// Convert from count-based version
349    pub fn from_counts(
350        critical_count: u64,
351        high_count: u64,
352        medium_count: u64,
353        low_count: u64,
354        background_count: u64,
355    ) -> Self {
356        Self {
357            critical_frames: critical_count,
358            high_frames: high_count,
359            medium_frames: medium_count,
360            low_frames: low_count,
361            background_frames: background_count,
362        }
363    }
364}
365
366/// Priority distribution as percentages (for demos and visualization)
367#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
368pub struct PriorityPercentages {
369    /// Critical priority percentage (0.0-1.0)
370    pub critical: f64,
371    /// High priority percentage (0.0-1.0)
372    pub high: f64,
373    /// Medium priority percentage (0.0-1.0)
374    pub medium: f64,
375    /// Low priority percentage (0.0-1.0)
376    pub low: f64,
377    /// Background priority percentage (0.0-1.0)
378    pub background: f64,
379}
380
381impl Default for PriorityPercentages {
382    fn default() -> Self {
383        Self {
384            critical: 0.0,
385            high: 0.0,
386            medium: 0.0,
387            low: 0.0,
388            background: 0.0,
389        }
390    }
391}
392
393impl DomainEvent {
394    /// Get the session ID associated with this event
395    pub fn session_id(&self) -> SessionId {
396        match self {
397            Self::SessionActivated { session_id, .. } => *session_id,
398            Self::SessionClosed { session_id, .. } => *session_id,
399            Self::SessionExpired { session_id, .. } => *session_id,
400            Self::StreamCreated { session_id, .. } => *session_id,
401            Self::StreamStarted { session_id, .. } => *session_id,
402            Self::StreamCompleted { session_id, .. } => *session_id,
403            Self::StreamFailed { session_id, .. } => *session_id,
404            Self::StreamCancelled { session_id, .. } => *session_id,
405            Self::SkeletonGenerated { session_id, .. } => *session_id,
406            Self::PatchFramesGenerated { session_id, .. } => *session_id,
407            Self::FramesBatched { session_id, .. } => *session_id,
408            Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
409            Self::StreamConfigUpdated { session_id, .. } => *session_id,
410            Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
411            Self::SessionTimedOut { session_id, .. } => *session_id,
412            Self::SessionTimeoutExtended { session_id, .. } => *session_id,
413        }
414    }
415
416    /// Get the stream ID if this is a stream-specific event
417    pub fn stream_id(&self) -> Option<StreamId> {
418        match self {
419            Self::StreamCreated { stream_id, .. } => Some(*stream_id),
420            Self::StreamStarted { stream_id, .. } => Some(*stream_id),
421            Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
422            Self::StreamFailed { stream_id, .. } => Some(*stream_id),
423            Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
424            Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
425            Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
426            Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
427            _ => None,
428        }
429    }
430
431    /// Get the timestamp of this event
432    pub fn timestamp(&self) -> DateTime<Utc> {
433        match self {
434            Self::SessionActivated { timestamp, .. } => *timestamp,
435            Self::SessionClosed { timestamp, .. } => *timestamp,
436            Self::SessionExpired { timestamp, .. } => *timestamp,
437            Self::StreamCreated { timestamp, .. } => *timestamp,
438            Self::StreamStarted { timestamp, .. } => *timestamp,
439            Self::StreamCompleted { timestamp, .. } => *timestamp,
440            Self::StreamFailed { timestamp, .. } => *timestamp,
441            Self::StreamCancelled { timestamp, .. } => *timestamp,
442            Self::SkeletonGenerated { timestamp, .. } => *timestamp,
443            Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
444            Self::FramesBatched { timestamp, .. } => *timestamp,
445            Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
446            Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
447            Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
448            Self::SessionTimedOut { timestamp, .. } => *timestamp,
449            Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
450        }
451    }
452
453    /// Get the event type as a string
454    pub fn event_type(&self) -> &'static str {
455        match self {
456            Self::SessionActivated { .. } => "session_activated",
457            Self::SessionClosed { .. } => "session_closed",
458            Self::SessionExpired { .. } => "session_expired",
459            Self::StreamCreated { .. } => "stream_created",
460            Self::StreamStarted { .. } => "stream_started",
461            Self::StreamCompleted { .. } => "stream_completed",
462            Self::StreamFailed { .. } => "stream_failed",
463            Self::StreamCancelled { .. } => "stream_cancelled",
464            Self::SkeletonGenerated { .. } => "skeleton_generated",
465            Self::PatchFramesGenerated { .. } => "patch_frames_generated",
466            Self::FramesBatched { .. } => "frames_batched",
467            Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
468            Self::StreamConfigUpdated { .. } => "stream_config_updated",
469            Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
470            Self::SessionTimedOut { .. } => "session_timed_out",
471            Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
472        }
473    }
474
475    /// Check if this is a critical event that requires immediate attention
476    pub fn is_critical(&self) -> bool {
477        matches!(
478            self,
479            Self::StreamFailed { .. } | Self::SessionExpired { .. }
480        )
481    }
482
483    /// Check if this is an error event
484    pub fn is_error(&self) -> bool {
485        matches!(self, Self::StreamFailed { .. })
486    }
487
488    /// Check if this is a completion event
489    pub fn is_completion(&self) -> bool {
490        matches!(
491            self,
492            Self::StreamCompleted { .. } | Self::SessionClosed { .. }
493        )
494    }
495}
496
497/// Event sourcing support for storing and retrieving domain events
498pub trait EventStore {
499    /// Append events to the store
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if events cannot be persisted
504    fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;
505
506    /// Get events for a specific session
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if events cannot be retrieved
511    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;
512
513    /// Get events for a specific stream
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if events cannot be retrieved
518    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;
519
520    /// Get all events since a specific timestamp
521    ///
522    /// # Errors
523    ///
524    /// Returns an error if events cannot be retrieved
525    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
526}
527
528/// Simple in-memory event store for testing
529#[derive(Debug, Clone, Default)]
530pub struct InMemoryEventStore {
531    events: Vec<DomainEvent>,
532}
533
534impl InMemoryEventStore {
535    /// Create a new empty event store
536    pub fn new() -> Self {
537        Self::default()
538    }
539
540    /// Get all events in the store
541    pub fn all_events(&self) -> &[DomainEvent] {
542        &self.events
543    }
544
545    /// Get the total number of events in the store
546    pub fn event_count(&self) -> usize {
547        self.events.len()
548    }
549}
550
551impl EventStore for InMemoryEventStore {
552    fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
553        self.events.append(&mut events);
554        Ok(())
555    }
556
557    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
558        Ok(self
559            .events
560            .iter()
561            .filter(|e| e.session_id() == session_id)
562            .cloned()
563            .collect())
564    }
565
566    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
567        Ok(self
568            .events
569            .iter()
570            .filter(|e| e.stream_id() == Some(stream_id))
571            .cloned()
572            .collect())
573    }
574
575    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
576        Ok(self
577            .events
578            .iter()
579            .filter(|e| e.timestamp() > since)
580            .cloned()
581            .collect())
582    }
583}
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588    use crate::value_objects::{SessionId, StreamId};
589
590    #[test]
591    fn test_domain_event_properties() {
592        let session_id = SessionId::new();
593        let stream_id = StreamId::new();
594        let timestamp = Utc::now();
595
596        let event = DomainEvent::StreamCreated {
597            session_id,
598            stream_id,
599            timestamp,
600        };
601
602        assert_eq!(event.session_id(), session_id);
603        assert_eq!(event.stream_id(), Some(stream_id));
604        assert_eq!(event.timestamp(), timestamp);
605        assert_eq!(event.event_type(), "stream_created");
606        assert!(!event.is_critical());
607        assert!(!event.is_error());
608    }
609
610    #[test]
611    fn test_critical_events() {
612        let session_id = SessionId::new();
613        let stream_id = StreamId::new();
614
615        let error_event = DomainEvent::StreamFailed {
616            session_id,
617            stream_id,
618            error: "Connection lost".to_string(),
619            timestamp: Utc::now(),
620        };
621
622        assert!(error_event.is_critical());
623        assert!(error_event.is_error());
624        assert!(!error_event.is_completion());
625    }
626
627    #[test]
628    fn test_event_store() {
629        let mut store = InMemoryEventStore::new();
630        let session_id = SessionId::new();
631        let stream_id = StreamId::new();
632
633        let events = vec![
634            DomainEvent::SessionActivated {
635                session_id,
636                timestamp: Utc::now(),
637            },
638            DomainEvent::StreamCreated {
639                session_id,
640                stream_id,
641                timestamp: Utc::now(),
642            },
643        ];
644
645        store
646            .append_events(events.clone())
647            .expect("Failed to append events to store in test");
648        assert_eq!(store.event_count(), 2);
649
650        let session_events = store
651            .get_events_for_session(session_id)
652            .expect("Failed to retrieve session events in test");
653        assert_eq!(session_events.len(), 2);
654
655        let stream_events = store
656            .get_events_for_stream(stream_id)
657            .expect("Failed to retrieve stream events in test");
658        assert_eq!(stream_events.len(), 1);
659    }
660
661    #[test]
662    fn test_event_serialization() {
663        let session_id = SessionId::new();
664        let event = DomainEvent::SessionActivated {
665            session_id,
666            timestamp: Utc::now(),
667        };
668
669        let serialized = serde_json::to_string(&event).expect("Failed to serialize event in test");
670        let deserialized: DomainEvent =
671            serde_json::from_str(&serialized).expect("Failed to deserialize event in test");
672
673        assert_eq!(event, deserialized);
674    }
675}
676
677/// Event identifier for tracking and correlation
678#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
679pub struct EventId(uuid::Uuid);
680
681impl EventId {
682    /// Generate new unique event ID
683    pub fn new() -> Self {
684        Self(uuid::Uuid::new_v4())
685    }
686
687    /// Create from existing UUID
688    pub fn from_uuid(uuid: uuid::Uuid) -> Self {
689        Self(uuid)
690    }
691
692    /// Get inner UUID
693    pub fn inner(&self) -> uuid::Uuid {
694        self.0
695    }
696}
697
698impl std::fmt::Display for EventId {
699    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
700        write!(f, "{}", self.0)
701    }
702}
703
704impl Default for EventId {
705    fn default() -> Self {
706        Self::new()
707    }
708}
709
710/// GAT-based trait for event subscribers that handle domain events
711pub trait EventSubscriber {
712    /// Future type for handling events
713    type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
714    where
715        Self: 'a;
716
717    /// Handle a domain event
718    fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
719}
720
721/// Extension methods for DomainEvent
722impl DomainEvent {
723    /// Get event ID for tracking (generated if not exists)
724    pub fn event_id(&self) -> EventId {
725        // For now, generate deterministic ID based on event content
726        // In future versions, this should be stored with the event
727        let content = format!("{self:?}");
728        use std::collections::hash_map::DefaultHasher;
729        use std::hash::{Hash, Hasher};
730        let mut hash = DefaultHasher::new();
731        content.hash(&mut hash);
732        let hash_val = hash.finish();
733        let uuid = uuid::Uuid::from_bytes([
734            (hash_val >> 56) as u8,
735            (hash_val >> 48) as u8,
736            (hash_val >> 40) as u8,
737            (hash_val >> 32) as u8,
738            (hash_val >> 24) as u8,
739            (hash_val >> 16) as u8,
740            (hash_val >> 8) as u8,
741            hash_val as u8,
742            0,
743            0,
744            0,
745            0,
746            0,
747            0,
748            0,
749            0,
750        ]);
751        EventId::from_uuid(uuid)
752    }
753
754    /// Get event timestamp
755    pub fn occurred_at(&self) -> DateTime<Utc> {
756        match self {
757            DomainEvent::SessionActivated { timestamp, .. }
758            | DomainEvent::SessionClosed { timestamp, .. }
759            | DomainEvent::SessionExpired { timestamp, .. }
760            | DomainEvent::StreamCreated { timestamp, .. }
761            | DomainEvent::StreamStarted { timestamp, .. }
762            | DomainEvent::StreamCompleted { timestamp, .. }
763            | DomainEvent::StreamFailed { timestamp, .. }
764            | DomainEvent::StreamCancelled { timestamp, .. }
765            | DomainEvent::SkeletonGenerated { timestamp, .. }
766            | DomainEvent::PatchFramesGenerated { timestamp, .. }
767            | DomainEvent::FramesBatched { timestamp, .. }
768            | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
769            | DomainEvent::StreamConfigUpdated { timestamp, .. }
770            | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
771            | DomainEvent::SessionTimedOut { timestamp, .. }
772            | DomainEvent::SessionTimeoutExtended { timestamp, .. } => *timestamp,
773        }
774    }
775
776    /// Get event metadata as key-value pairs
777    pub fn metadata(&self) -> std::collections::HashMap<String, String> {
778        let mut metadata = std::collections::HashMap::new();
779        metadata.insert("event_type".to_string(), self.event_type().to_string());
780        metadata.insert("session_id".to_string(), self.session_id().to_string());
781        metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
782
783        if let Some(stream_id) = self.stream_id() {
784            metadata.insert("stream_id".to_string(), stream_id.to_string());
785        }
786
787        metadata
788    }
789}