Skip to main content

pjson_rs_domain/events/
mod.rs

//! Domain events for event sourcing and integration
2
3use crate::value_objects::{BackpressureSignal, SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7/// Custom serde for SessionId (domain purity)
8mod serde_session_id {
9    use crate::value_objects::SessionId;
10    use serde::{Deserialize, Deserializer, Serialize, Serializer};
11
12    pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
13    where
14        S: Serializer,
15    {
16        id.as_uuid().serialize(serializer)
17    }
18
19    pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
20    where
21        D: Deserializer<'de>,
22    {
23        let uuid = uuid::Uuid::deserialize(deserializer)?;
24        Ok(SessionId::from_uuid(uuid))
25    }
26}
27
28/// Custom serde for StreamId (domain purity)
29mod serde_stream_id {
30    use crate::value_objects::StreamId;
31    use serde::{Deserialize, Deserializer, Serialize, Serializer};
32
33    pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
34    where
35        S: Serializer,
36    {
37        id.as_uuid().serialize(serializer)
38    }
39
40    pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
41    where
42        D: Deserializer<'de>,
43    {
44        let uuid = uuid::Uuid::deserialize(deserializer)?;
45        Ok(StreamId::from_uuid(uuid))
46    }
47}
48
49/// Custom serde for `Option<StreamId>`
50mod serde_option_stream_id {
51    use crate::value_objects::StreamId;
52    use serde::{Deserialize, Deserializer, Serialize, Serializer};
53
54    pub fn serialize<S>(id: &Option<StreamId>, serializer: S) -> Result<S::Ok, S::Error>
55    where
56        S: Serializer,
57    {
58        id.map(|i| i.as_uuid()).serialize(serializer)
59    }
60
61    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StreamId>, D::Error>
62    where
63        D: Deserializer<'de>,
64    {
65        let uuid: Option<uuid::Uuid> = Option::deserialize(deserializer)?;
66        Ok(uuid.map(StreamId::from_uuid))
67    }
68}
69
/// Session state in its lifecycle.
///
/// Carried by [`DomainEvent::SessionTimedOut`] to record which state a session
/// was in when the timeout hit. Use [`SessionState::as_str`] for an
/// allocation-free label.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionState {
    /// Session is being initialized
    Initializing,
    /// Session is active with streams
    Active,
    /// Session is gracefully closing
    Closing,
    /// Session completed successfully
    Completed,
    /// Session failed with error
    Failed,
}
84
85impl SessionState {
86    /// Get string representation without allocation
87    pub fn as_str(&self) -> &'static str {
88        match self {
89            SessionState::Initializing => "Initializing",
90            SessionState::Active => "Active",
91            SessionState::Closing => "Closing",
92            SessionState::Completed => "Completed",
93            SessionState::Failed => "Failed",
94        }
95    }
96}
97
/// Domain events that represent business-relevant state changes.
///
/// Every variant carries the `session_id` it belongs to and the `timestamp`
/// at which it occurred; stream-scoped variants additionally carry a
/// `stream_id`.
///
/// # Wire format
///
/// Serialized as an internally tagged enum: the variant name is written in
/// `snake_case` under the `event_type` key next to the variant's own fields.
/// Session and stream IDs go through the `serde_*_id` adapter modules, i.e.
/// they are encoded as their underlying UUIDs.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum DomainEvent {
    /// Session was activated and is ready to accept streams
    SessionActivated {
        /// ID of the activated session
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// When the session was activated
        timestamp: DateTime<Utc>,
    },

    /// Session was closed gracefully
    SessionClosed {
        /// ID of the closed session
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// When the session was closed
        timestamp: DateTime<Utc>,
    },

    /// Session expired due to timeout
    SessionExpired {
        /// ID of the expired session
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// When the session expired
        timestamp: DateTime<Utc>,
    },

    /// Session was forcefully closed due to timeout
    SessionTimedOut {
        /// ID of the timed out session
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// State the session was in before timeout
        original_state: SessionState,
        /// Duration in seconds before timeout occurred
        timeout_duration: u64,
        /// When the timeout occurred
        timestamp: DateTime<Utc>,
    },

    /// Session timeout was extended
    SessionTimeoutExtended {
        /// ID of the session with extended timeout
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Additional seconds added to the timeout
        additional_seconds: u64,
        /// New expiration timestamp
        new_expires_at: DateTime<Utc>,
        /// When the extension was applied
        timestamp: DateTime<Utc>,
    },

    /// New stream was created in the session
    StreamCreated {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the newly created stream
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// When the stream was created
        timestamp: DateTime<Utc>,
    },

    /// Stream started sending data
    StreamStarted {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the stream that started
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// When the stream started
        timestamp: DateTime<Utc>,
    },

    /// Stream completed successfully
    StreamCompleted {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the completed stream
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// When the stream completed
        timestamp: DateTime<Utc>,
    },

    /// Stream failed with error
    StreamFailed {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the failed stream
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Error message describing the failure
        error: String,
        /// When the stream failed
        timestamp: DateTime<Utc>,
    },

    /// Stream was cancelled
    StreamCancelled {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the cancelled stream
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// When the stream was cancelled
        timestamp: DateTime<Utc>,
    },

    /// Skeleton frame was generated for a stream
    SkeletonGenerated {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the stream that generated the skeleton
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Size of the skeleton frame in bytes
        frame_size_bytes: u64,
        /// When the skeleton was generated
        timestamp: DateTime<Utc>,
    },

    /// Patch frames were generated for a stream
    PatchFramesGenerated {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the stream that generated patches
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Number of patch frames generated
        frame_count: usize,
        /// Total size of all patches in bytes
        total_bytes: u64,
        /// Highest priority level among the patches
        highest_priority: u8,
        /// When the patches were generated
        timestamp: DateTime<Utc>,
    },

    /// Multiple frames were batched for efficient sending
    FramesBatched {
        /// ID of the session containing the frames
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Number of frames in the batch
        frame_count: usize,
        /// When the batch was created
        timestamp: DateTime<Utc>,
    },

    /// Priority threshold was adjusted for adaptive streaming
    PriorityThresholdAdjusted {
        /// ID of the session with adjusted threshold
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Previous priority threshold value
        old_threshold: u8,
        /// New priority threshold value
        new_threshold: u8,
        /// Reason for the adjustment
        reason: String,
        /// When the threshold was adjusted
        timestamp: DateTime<Utc>,
    },

    /// Stream configuration was updated
    StreamConfigUpdated {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the stream with updated configuration
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// When the configuration was updated
        timestamp: DateTime<Utc>,
    },

    /// Performance metrics were recorded
    PerformanceMetricsRecorded {
        /// ID of the session being measured
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Recorded performance metrics
        metrics: PerformanceMetrics,
        /// When the metrics were recorded
        timestamp: DateTime<Utc>,
    },

    /// Backpressure signal received from client
    BackpressureReceived {
        /// ID of the session receiving backpressure
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the stream if specific to a stream.
        ///
        /// Omitted from the serialized form when `None`, and read back as
        /// `None` when the key is absent.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        #[serde(with = "serde_option_stream_id")]
        stream_id: Option<StreamId>,
        /// Backpressure signal type
        signal: BackpressureSignal,
        /// When the signal was received
        timestamp: DateTime<Utc>,
    },

    /// Stream was paused due to flow control
    StreamPaused {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the paused stream
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Reason for pausing
        reason: String,
        /// When the stream was paused
        timestamp: DateTime<Utc>,
    },

    /// Stream was resumed after pause
    StreamResumed {
        /// ID of the session containing the stream
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// ID of the resumed stream
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Duration the stream was paused in milliseconds
        paused_duration_ms: u64,
        /// When the stream was resumed
        timestamp: DateTime<Utc>,
    },

    /// Flow control credits were updated
    CreditsUpdated {
        /// ID of the session with updated credits
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// New available credits
        available_credits: usize,
        /// Maximum allowed credits
        max_credits: usize,
        /// When credits were updated
        timestamp: DateTime<Utc>,
    },
}
354
/// Performance metrics for monitoring and optimization.
///
/// Payload of [`DomainEvent::PerformanceMetricsRecorded`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Number of frames transmitted per second
    pub frames_per_second: f64,
    /// Number of bytes transmitted per second
    pub bytes_per_second: f64,
    /// Average size of frames in bytes
    pub average_frame_size: f64,
    /// Distribution of frames across priority levels
    pub priority_distribution: PriorityDistribution,
    /// Network latency in milliseconds, if available
    pub latency_ms: Option<u64>,
}
369
/// Distribution of frames by priority level.
///
/// Holds one absolute frame counter per priority level; the derived
/// `Default` sets every counter to zero.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PriorityDistribution {
    /// Number of critical priority frames
    pub critical_frames: u64,
    /// Number of high priority frames
    pub high_frames: u64,
    /// Number of medium priority frames
    pub medium_frames: u64,
    /// Number of low priority frames
    pub low_frames: u64,
    /// Number of background priority frames
    pub background_frames: u64,
}
384
385impl PriorityDistribution {
386    /// Create new empty distribution
387    pub fn new() -> Self {
388        Self::default()
389    }
390
391    /// Get total frames count
392    pub fn total_frames(&self) -> u64 {
393        self.critical_frames
394            + self.high_frames
395            + self.medium_frames
396            + self.low_frames
397            + self.background_frames
398    }
399
400    /// Convert to percentages (0.0-1.0)
401    pub fn as_percentages(&self) -> PriorityPercentages {
402        let total = self.total_frames() as f64;
403        if total == 0.0 {
404            return PriorityPercentages::default();
405        }
406
407        PriorityPercentages {
408            critical: self.critical_frames as f64 / total,
409            high: self.high_frames as f64 / total,
410            medium: self.medium_frames as f64 / total,
411            low: self.low_frames as f64 / total,
412            background: self.background_frames as f64 / total,
413        }
414    }
415
416    /// Convert from count-based version
417    pub fn from_counts(
418        critical_count: u64,
419        high_count: u64,
420        medium_count: u64,
421        low_count: u64,
422        background_count: u64,
423    ) -> Self {
424        Self {
425            critical_frames: critical_count,
426            high_frames: high_count,
427            medium_frames: medium_count,
428            low_frames: low_count,
429            background_frames: background_count,
430        }
431    }
432}
433
434/// Priority distribution as percentages (for demos and visualization)
435#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
436pub struct PriorityPercentages {
437    /// Critical priority percentage (0.0-1.0)
438    pub critical: f64,
439    /// High priority percentage (0.0-1.0)
440    pub high: f64,
441    /// Medium priority percentage (0.0-1.0)
442    pub medium: f64,
443    /// Low priority percentage (0.0-1.0)
444    pub low: f64,
445    /// Background priority percentage (0.0-1.0)
446    pub background: f64,
447}
448
449impl Default for PriorityPercentages {
450    fn default() -> Self {
451        Self {
452            critical: 0.0,
453            high: 0.0,
454            medium: 0.0,
455            low: 0.0,
456            background: 0.0,
457        }
458    }
459}
460
461impl DomainEvent {
462    /// Get the session ID associated with this event
463    pub fn session_id(&self) -> SessionId {
464        match self {
465            Self::SessionActivated { session_id, .. } => *session_id,
466            Self::SessionClosed { session_id, .. } => *session_id,
467            Self::SessionExpired { session_id, .. } => *session_id,
468            Self::StreamCreated { session_id, .. } => *session_id,
469            Self::StreamStarted { session_id, .. } => *session_id,
470            Self::StreamCompleted { session_id, .. } => *session_id,
471            Self::StreamFailed { session_id, .. } => *session_id,
472            Self::StreamCancelled { session_id, .. } => *session_id,
473            Self::SkeletonGenerated { session_id, .. } => *session_id,
474            Self::PatchFramesGenerated { session_id, .. } => *session_id,
475            Self::FramesBatched { session_id, .. } => *session_id,
476            Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
477            Self::StreamConfigUpdated { session_id, .. } => *session_id,
478            Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
479            Self::SessionTimedOut { session_id, .. } => *session_id,
480            Self::SessionTimeoutExtended { session_id, .. } => *session_id,
481            Self::BackpressureReceived { session_id, .. } => *session_id,
482            Self::StreamPaused { session_id, .. } => *session_id,
483            Self::StreamResumed { session_id, .. } => *session_id,
484            Self::CreditsUpdated { session_id, .. } => *session_id,
485        }
486    }
487
488    /// Get the stream ID if this is a stream-specific event
489    pub fn stream_id(&self) -> Option<StreamId> {
490        match self {
491            Self::StreamCreated { stream_id, .. } => Some(*stream_id),
492            Self::StreamStarted { stream_id, .. } => Some(*stream_id),
493            Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
494            Self::StreamFailed { stream_id, .. } => Some(*stream_id),
495            Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
496            Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
497            Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
498            Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
499            Self::StreamPaused { stream_id, .. } => Some(*stream_id),
500            Self::StreamResumed { stream_id, .. } => Some(*stream_id),
501            Self::BackpressureReceived { stream_id, .. } => *stream_id,
502            _ => None,
503        }
504    }
505
506    /// Get the timestamp of this event
507    pub fn timestamp(&self) -> DateTime<Utc> {
508        match self {
509            Self::SessionActivated { timestamp, .. } => *timestamp,
510            Self::SessionClosed { timestamp, .. } => *timestamp,
511            Self::SessionExpired { timestamp, .. } => *timestamp,
512            Self::StreamCreated { timestamp, .. } => *timestamp,
513            Self::StreamStarted { timestamp, .. } => *timestamp,
514            Self::StreamCompleted { timestamp, .. } => *timestamp,
515            Self::StreamFailed { timestamp, .. } => *timestamp,
516            Self::StreamCancelled { timestamp, .. } => *timestamp,
517            Self::SkeletonGenerated { timestamp, .. } => *timestamp,
518            Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
519            Self::FramesBatched { timestamp, .. } => *timestamp,
520            Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
521            Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
522            Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
523            Self::SessionTimedOut { timestamp, .. } => *timestamp,
524            Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
525            Self::BackpressureReceived { timestamp, .. } => *timestamp,
526            Self::StreamPaused { timestamp, .. } => *timestamp,
527            Self::StreamResumed { timestamp, .. } => *timestamp,
528            Self::CreditsUpdated { timestamp, .. } => *timestamp,
529        }
530    }
531
532    /// Get the event type as a string
533    pub fn event_type(&self) -> &'static str {
534        match self {
535            Self::SessionActivated { .. } => "session_activated",
536            Self::SessionClosed { .. } => "session_closed",
537            Self::SessionExpired { .. } => "session_expired",
538            Self::StreamCreated { .. } => "stream_created",
539            Self::StreamStarted { .. } => "stream_started",
540            Self::StreamCompleted { .. } => "stream_completed",
541            Self::StreamFailed { .. } => "stream_failed",
542            Self::StreamCancelled { .. } => "stream_cancelled",
543            Self::SkeletonGenerated { .. } => "skeleton_generated",
544            Self::PatchFramesGenerated { .. } => "patch_frames_generated",
545            Self::FramesBatched { .. } => "frames_batched",
546            Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
547            Self::StreamConfigUpdated { .. } => "stream_config_updated",
548            Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
549            Self::SessionTimedOut { .. } => "session_timed_out",
550            Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
551            Self::BackpressureReceived { .. } => "backpressure_received",
552            Self::StreamPaused { .. } => "stream_paused",
553            Self::StreamResumed { .. } => "stream_resumed",
554            Self::CreditsUpdated { .. } => "credits_updated",
555        }
556    }
557
558    /// Check if this is a critical event that requires immediate attention
559    pub fn is_critical(&self) -> bool {
560        matches!(
561            self,
562            Self::StreamFailed { .. } | Self::SessionExpired { .. }
563        )
564    }
565
566    /// Check if this is an error event
567    pub fn is_error(&self) -> bool {
568        matches!(self, Self::StreamFailed { .. })
569    }
570
571    /// Check if this is a completion event
572    pub fn is_completion(&self) -> bool {
573        matches!(
574            self,
575            Self::StreamCompleted { .. } | Self::SessionClosed { .. }
576        )
577    }
578}
579
/// Event sourcing support for storing and retrieving domain events.
///
/// All methods are synchronous and report failures as a plain `String`
/// message.
pub trait EventStore {
    /// Append events to the store
    ///
    /// Takes ownership of `events`.
    ///
    /// # Errors
    ///
    /// Returns an error if events cannot be persisted
    fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;

    /// Get events for a specific session
    ///
    /// # Errors
    ///
    /// Returns an error if events cannot be retrieved
    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;

    /// Get events for a specific stream
    ///
    /// # Errors
    ///
    /// Returns an error if events cannot be retrieved
    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;

    /// Get all events since a specific timestamp
    ///
    /// NOTE(review): whether `since` itself is included is left to the
    /// implementation; `InMemoryEventStore` excludes it (strict `>`).
    ///
    /// # Errors
    ///
    /// Returns an error if events cannot be retrieved
    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
}
610
611/// Simple in-memory event store for testing
612#[derive(Debug, Clone, Default)]
613pub struct InMemoryEventStore {
614    events: Vec<DomainEvent>,
615}
616
617impl InMemoryEventStore {
618    /// Create a new empty event store
619    pub fn new() -> Self {
620        Self::default()
621    }
622
623    /// Get all events in the store
624    pub fn all_events(&self) -> &[DomainEvent] {
625        &self.events
626    }
627
628    /// Get the total number of events in the store
629    pub fn event_count(&self) -> usize {
630        self.events.len()
631    }
632}
633
634impl EventStore for InMemoryEventStore {
635    fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
636        self.events.append(&mut events);
637        Ok(())
638    }
639
640    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
641        Ok(self
642            .events
643            .iter()
644            .filter(|e| e.session_id() == session_id)
645            .cloned()
646            .collect())
647    }
648
649    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
650        Ok(self
651            .events
652            .iter()
653            .filter(|e| e.stream_id() == Some(stream_id))
654            .cloned()
655            .collect())
656    }
657
658    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
659        Ok(self
660            .events
661            .iter()
662            .filter(|e| e.timestamp() > since)
663            .cloned()
664            .collect())
665    }
666}
667
/// Unit tests for the event accessors, the in-memory store and the serde
/// round trip.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value_objects::{SessionId, StreamId};

    /// A `StreamCreated` event exposes its fields through the accessors and
    /// is neither critical nor an error.
    #[test]
    fn test_domain_event_properties() {
        let session_id = SessionId::new();
        let stream_id = StreamId::new();
        let timestamp = Utc::now();

        let event = DomainEvent::StreamCreated {
            session_id,
            stream_id,
            timestamp,
        };

        assert_eq!(event.session_id(), session_id);
        assert_eq!(event.stream_id(), Some(stream_id));
        assert_eq!(event.timestamp(), timestamp);
        assert_eq!(event.event_type(), "stream_created");
        assert!(!event.is_critical());
        assert!(!event.is_error());
    }

    /// `StreamFailed` is both critical and an error, but not a completion.
    #[test]
    fn test_critical_events() {
        let session_id = SessionId::new();
        let stream_id = StreamId::new();

        let error_event = DomainEvent::StreamFailed {
            session_id,
            stream_id,
            error: "Connection lost".to_string(),
            timestamp: Utc::now(),
        };

        assert!(error_event.is_critical());
        assert!(error_event.is_error());
        assert!(!error_event.is_completion());
    }

    /// Appended events can be queried back by session and by stream.
    #[test]
    fn test_event_store() {
        let mut store = InMemoryEventStore::new();
        let session_id = SessionId::new();
        let stream_id = StreamId::new();

        let events = vec![
            DomainEvent::SessionActivated {
                session_id,
                timestamp: Utc::now(),
            },
            DomainEvent::StreamCreated {
                session_id,
                stream_id,
                timestamp: Utc::now(),
            },
        ];

        store
            .append_events(events.clone())
            .expect("Failed to append events to store in test");
        assert_eq!(store.event_count(), 2);

        // Both events share the session ...
        let session_events = store
            .get_events_for_session(session_id)
            .expect("Failed to retrieve session events in test");
        assert_eq!(session_events.len(), 2);

        // ... but only `StreamCreated` carries the stream ID.
        let stream_events = store
            .get_events_for_stream(stream_id)
            .expect("Failed to retrieve stream events in test");
        assert_eq!(stream_events.len(), 1);
    }

    /// An event survives a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn test_event_serialization() {
        let session_id = SessionId::new();
        let event = DomainEvent::SessionActivated {
            session_id,
            timestamp: Utc::now(),
        };

        let serialized = serde_json::to_string(&event).expect("Failed to serialize event in test");
        let deserialized: DomainEvent =
            serde_json::from_str(&serialized).expect("Failed to deserialize event in test");

        assert_eq!(event, deserialized);
    }
}
759
760/// Event identifier for tracking and correlation
761#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
762pub struct EventId(uuid::Uuid);
763
764impl EventId {
765    /// Generate new unique event ID
766    pub fn new() -> Self {
767        Self(uuid::Uuid::new_v4())
768    }
769
770    /// Create from existing UUID
771    pub fn from_uuid(uuid: uuid::Uuid) -> Self {
772        Self(uuid)
773    }
774
775    /// Get inner UUID
776    pub fn inner(&self) -> uuid::Uuid {
777        self.0
778    }
779}
780
781impl std::fmt::Display for EventId {
782    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
783        write!(f, "{}", self.0)
784    }
785}
786
787impl Default for EventId {
788    fn default() -> Self {
789        Self::new()
790    }
791}
792
/// GAT-based trait for event subscribers that handle domain events.
///
/// The returned future is a generic associated type parameterised by a
/// lifetime, so it can be tied to the `&self` borrow passed to
/// [`EventSubscriber::handle`].
pub trait EventSubscriber {
    /// Future type for handling events.
    ///
    /// Must be `Send` and resolve to `crate::DomainResult<()>`.
    type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Handle a domain event
    fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
}
803
804/// Extension methods for DomainEvent
805impl DomainEvent {
806    /// Get event ID for tracking (generated if not exists)
807    pub fn event_id(&self) -> EventId {
808        // For now, generate deterministic ID based on event content
809        // In future versions, this should be stored with the event
810        let content = format!("{self:?}");
811        use std::collections::hash_map::DefaultHasher;
812        use std::hash::{Hash, Hasher};
813        let mut hash = DefaultHasher::new();
814        content.hash(&mut hash);
815        let hash_val = hash.finish();
816        let uuid = uuid::Uuid::from_bytes([
817            (hash_val >> 56) as u8,
818            (hash_val >> 48) as u8,
819            (hash_val >> 40) as u8,
820            (hash_val >> 32) as u8,
821            (hash_val >> 24) as u8,
822            (hash_val >> 16) as u8,
823            (hash_val >> 8) as u8,
824            hash_val as u8,
825            0,
826            0,
827            0,
828            0,
829            0,
830            0,
831            0,
832            0,
833        ]);
834        EventId::from_uuid(uuid)
835    }
836
837    /// Get event timestamp
838    pub fn occurred_at(&self) -> DateTime<Utc> {
839        match self {
840            DomainEvent::SessionActivated { timestamp, .. }
841            | DomainEvent::SessionClosed { timestamp, .. }
842            | DomainEvent::SessionExpired { timestamp, .. }
843            | DomainEvent::StreamCreated { timestamp, .. }
844            | DomainEvent::StreamStarted { timestamp, .. }
845            | DomainEvent::StreamCompleted { timestamp, .. }
846            | DomainEvent::StreamFailed { timestamp, .. }
847            | DomainEvent::StreamCancelled { timestamp, .. }
848            | DomainEvent::SkeletonGenerated { timestamp, .. }
849            | DomainEvent::PatchFramesGenerated { timestamp, .. }
850            | DomainEvent::FramesBatched { timestamp, .. }
851            | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
852            | DomainEvent::StreamConfigUpdated { timestamp, .. }
853            | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
854            | DomainEvent::SessionTimedOut { timestamp, .. }
855            | DomainEvent::SessionTimeoutExtended { timestamp, .. }
856            | DomainEvent::BackpressureReceived { timestamp, .. }
857            | DomainEvent::StreamPaused { timestamp, .. }
858            | DomainEvent::StreamResumed { timestamp, .. }
859            | DomainEvent::CreditsUpdated { timestamp, .. } => *timestamp,
860        }
861    }
862
863    /// Get event metadata as key-value pairs
864    pub fn metadata(&self) -> std::collections::HashMap<String, String> {
865        let mut metadata = std::collections::HashMap::new();
866        metadata.insert("event_type".to_string(), self.event_type().to_string());
867        metadata.insert("session_id".to_string(), self.session_id().to_string());
868        metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
869
870        if let Some(stream_id) = self.stream_id() {
871            metadata.insert("stream_id".to_string(), stream_id.to_string());
872        }
873
874        metadata
875    }
876}