1use crate::value_objects::{SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7mod serde_session_id {
9 use crate::value_objects::SessionId;
10 use serde::{Deserialize, Deserializer, Serialize, Serializer};
11
12 pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
13 where
14 S: Serializer,
15 {
16 id.as_uuid().serialize(serializer)
17 }
18
19 pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
20 where
21 D: Deserializer<'de>,
22 {
23 let uuid = uuid::Uuid::deserialize(deserializer)?;
24 Ok(SessionId::from_uuid(uuid))
25 }
26}
27
28mod serde_stream_id {
30 use crate::value_objects::StreamId;
31 use serde::{Deserialize, Deserializer, Serialize, Serializer};
32
33 pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
34 where
35 S: Serializer,
36 {
37 id.as_uuid().serialize(serializer)
38 }
39
40 pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
41 where
42 D: Deserializer<'de>,
43 {
44 let uuid = uuid::Uuid::deserialize(deserializer)?;
45 Ok(StreamId::from_uuid(uuid))
46 }
47}
48
49#[allow(dead_code)]
51mod serde_option_stream_id {
52 use crate::value_objects::StreamId;
53 use serde::{Deserialize, Deserializer, Serialize, Serializer};
54
55 pub fn serialize<S>(id: &Option<StreamId>, serializer: S) -> Result<S::Ok, S::Error>
56 where
57 S: Serializer,
58 {
59 id.map(|i| i.as_uuid()).serialize(serializer)
60 }
61
62 pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StreamId>, D::Error>
63 where
64 D: Deserializer<'de>,
65 {
66 let uuid: Option<uuid::Uuid> = Option::deserialize(deserializer)?;
67 Ok(uuid.map(StreamId::from_uuid))
68 }
69}
70
/// State of a session.
///
/// Within the visible part of this file the only use is the `original_state` field of
/// `DomainEvent::SessionTimedOut`. The variant descriptions below are read off the variant
/// names; the transition rules are not defined here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionState {
    /// Session is being set up (presumably not yet usable — confirm against the aggregate).
    Initializing,
    /// Session is active.
    Active,
    /// Session is in the process of closing.
    Closing,
    /// Session finished.
    Completed,
    /// Session ended in failure.
    Failed,
}
85
/// Events describing what happened to sessions, streams and frames.
///
/// Every variant carries the `session_id` of the owning session and a `timestamp`;
/// stream-scoped variants additionally carry a `stream_id`.
///
/// Serde representation: internally tagged. The variant name, converted to `snake_case`,
/// is stored under the `event_type` key next to the variant's own fields. Id fields are
/// routed through the `serde_session_id` / `serde_stream_id` adapters, which write the
/// ids in their UUID form.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum DomainEvent {
    /// A session was activated.
    SessionActivated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        timestamp: DateTime<Utc>,
    },

    /// A session was closed.
    SessionClosed {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        timestamp: DateTime<Utc>,
    },

    /// A session expired.
    SessionExpired {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        timestamp: DateTime<Utc>,
    },

    /// A session timed out.
    SessionTimedOut {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// State the session was in when the timeout hit (as suggested by the name).
        original_state: SessionState,
        /// Length of the timeout. NOTE(review): unit is not visible here — presumably
        /// seconds, like `additional_seconds` below; confirm with the producer.
        timeout_duration: u64,
        timestamp: DateTime<Utc>,
    },

    /// A session's timeout was extended.
    SessionTimeoutExtended {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Number of seconds added to the timeout.
        additional_seconds: u64,
        /// Expiry instant after the extension.
        new_expires_at: DateTime<Utc>,
        timestamp: DateTime<Utc>,
    },

    /// A stream was created within a session.
    StreamCreated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// A stream started.
    StreamStarted {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// A stream completed.
    StreamCompleted {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// A stream failed. This is the only variant `is_error()` reports as an error.
    StreamFailed {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Free-form description of the failure.
        error: String,
        timestamp: DateTime<Utc>,
    },

    /// A stream was cancelled.
    StreamCancelled {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// A skeleton frame was generated for a stream.
    SkeletonGenerated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Size of the generated frame in bytes.
        frame_size_bytes: u64,
        timestamp: DateTime<Utc>,
    },

    /// A group of patch frames was generated for a stream.
    PatchFramesGenerated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Number of frames in the group.
        frame_count: usize,
        /// Combined size of the frames in bytes.
        total_bytes: u64,
        /// Highest priority among the frames. NOTE(review): the numeric scale and its
        /// direction are not defined in this file.
        highest_priority: u8,
        timestamp: DateTime<Utc>,
    },

    /// Frames were batched. Session-scoped: this variant carries no `stream_id`.
    FramesBatched {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Number of frames in the batch.
        frame_count: usize,
        timestamp: DateTime<Utc>,
    },

    /// The priority threshold of a session changed.
    PriorityThresholdAdjusted {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Threshold before the adjustment.
        old_threshold: u8,
        /// Threshold after the adjustment.
        new_threshold: u8,
        /// Free-form explanation of why the threshold changed.
        reason: String,
        timestamp: DateTime<Utc>,
    },

    /// A stream's configuration was updated (the new configuration is not part of the event).
    StreamConfigUpdated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// Performance metrics were recorded for a session.
    PerformanceMetricsRecorded {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// The recorded measurements.
        metrics: PerformanceMetrics,
        timestamp: DateTime<Utc>,
    },
}
286
/// Throughput and latency measurements carried by
/// `DomainEvent::PerformanceMetricsRecorded`.
///
/// NOTE(review): the measurement window and how the averages are computed are decided by
/// the producer and are not visible in this file.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Frame rate, in frames per second.
    pub frames_per_second: f64,
    /// Data rate, in bytes per second.
    pub bytes_per_second: f64,
    /// Average frame size (presumably bytes — confirm with the producer).
    pub average_frame_size: f64,
    /// Frame counts broken down by priority bucket.
    pub priority_distribution: PriorityDistribution,
    /// Latency in milliseconds; `None` when no latency value is available.
    pub latency_ms: Option<u64>,
}
301
/// Frame counts per priority bucket.
///
/// The derived `Default` yields all-zero counters.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PriorityDistribution {
    /// Number of frames in the critical bucket.
    pub critical_frames: u64,
    /// Number of frames in the high bucket.
    pub high_frames: u64,
    /// Number of frames in the medium bucket.
    pub medium_frames: u64,
    /// Number of frames in the low bucket.
    pub low_frames: u64,
    /// Number of frames in the background bucket.
    pub background_frames: u64,
}
316
317impl PriorityDistribution {
318 pub fn new() -> Self {
320 Self::default()
321 }
322
323 pub fn total_frames(&self) -> u64 {
325 self.critical_frames
326 + self.high_frames
327 + self.medium_frames
328 + self.low_frames
329 + self.background_frames
330 }
331
332 pub fn as_percentages(&self) -> PriorityPercentages {
334 let total = self.total_frames() as f64;
335 if total == 0.0 {
336 return PriorityPercentages::default();
337 }
338
339 PriorityPercentages {
340 critical: self.critical_frames as f64 / total,
341 high: self.high_frames as f64 / total,
342 medium: self.medium_frames as f64 / total,
343 low: self.low_frames as f64 / total,
344 background: self.background_frames as f64 / total,
345 }
346 }
347
348 pub fn from_counts(
350 critical_count: u64,
351 high_count: u64,
352 medium_count: u64,
353 low_count: u64,
354 background_count: u64,
355 ) -> Self {
356 Self {
357 critical_frames: critical_count,
358 high_frames: high_count,
359 medium_frames: medium_count,
360 low_frames: low_count,
361 background_frames: background_count,
362 }
363 }
364}
365
/// Per-bucket share of frames, as produced by `PriorityDistribution::as_percentages`.
///
/// That method fills each field with `count / total`, i.e. a fraction between 0.0 and 1.0
/// rather than a value scaled to 100.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PriorityPercentages {
    /// Share of critical frames.
    pub critical: f64,
    /// Share of high-priority frames.
    pub high: f64,
    /// Share of medium-priority frames.
    pub medium: f64,
    /// Share of low-priority frames.
    pub low: f64,
    /// Share of background frames.
    pub background: f64,
}
380
381impl Default for PriorityPercentages {
382 fn default() -> Self {
383 Self {
384 critical: 0.0,
385 high: 0.0,
386 medium: 0.0,
387 low: 0.0,
388 background: 0.0,
389 }
390 }
391}
392
393impl DomainEvent {
394 pub fn session_id(&self) -> SessionId {
396 match self {
397 Self::SessionActivated { session_id, .. } => *session_id,
398 Self::SessionClosed { session_id, .. } => *session_id,
399 Self::SessionExpired { session_id, .. } => *session_id,
400 Self::StreamCreated { session_id, .. } => *session_id,
401 Self::StreamStarted { session_id, .. } => *session_id,
402 Self::StreamCompleted { session_id, .. } => *session_id,
403 Self::StreamFailed { session_id, .. } => *session_id,
404 Self::StreamCancelled { session_id, .. } => *session_id,
405 Self::SkeletonGenerated { session_id, .. } => *session_id,
406 Self::PatchFramesGenerated { session_id, .. } => *session_id,
407 Self::FramesBatched { session_id, .. } => *session_id,
408 Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
409 Self::StreamConfigUpdated { session_id, .. } => *session_id,
410 Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
411 Self::SessionTimedOut { session_id, .. } => *session_id,
412 Self::SessionTimeoutExtended { session_id, .. } => *session_id,
413 }
414 }
415
416 pub fn stream_id(&self) -> Option<StreamId> {
418 match self {
419 Self::StreamCreated { stream_id, .. } => Some(*stream_id),
420 Self::StreamStarted { stream_id, .. } => Some(*stream_id),
421 Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
422 Self::StreamFailed { stream_id, .. } => Some(*stream_id),
423 Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
424 Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
425 Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
426 Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
427 _ => None,
428 }
429 }
430
431 pub fn timestamp(&self) -> DateTime<Utc> {
433 match self {
434 Self::SessionActivated { timestamp, .. } => *timestamp,
435 Self::SessionClosed { timestamp, .. } => *timestamp,
436 Self::SessionExpired { timestamp, .. } => *timestamp,
437 Self::StreamCreated { timestamp, .. } => *timestamp,
438 Self::StreamStarted { timestamp, .. } => *timestamp,
439 Self::StreamCompleted { timestamp, .. } => *timestamp,
440 Self::StreamFailed { timestamp, .. } => *timestamp,
441 Self::StreamCancelled { timestamp, .. } => *timestamp,
442 Self::SkeletonGenerated { timestamp, .. } => *timestamp,
443 Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
444 Self::FramesBatched { timestamp, .. } => *timestamp,
445 Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
446 Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
447 Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
448 Self::SessionTimedOut { timestamp, .. } => *timestamp,
449 Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
450 }
451 }
452
453 pub fn event_type(&self) -> &'static str {
455 match self {
456 Self::SessionActivated { .. } => "session_activated",
457 Self::SessionClosed { .. } => "session_closed",
458 Self::SessionExpired { .. } => "session_expired",
459 Self::StreamCreated { .. } => "stream_created",
460 Self::StreamStarted { .. } => "stream_started",
461 Self::StreamCompleted { .. } => "stream_completed",
462 Self::StreamFailed { .. } => "stream_failed",
463 Self::StreamCancelled { .. } => "stream_cancelled",
464 Self::SkeletonGenerated { .. } => "skeleton_generated",
465 Self::PatchFramesGenerated { .. } => "patch_frames_generated",
466 Self::FramesBatched { .. } => "frames_batched",
467 Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
468 Self::StreamConfigUpdated { .. } => "stream_config_updated",
469 Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
470 Self::SessionTimedOut { .. } => "session_timed_out",
471 Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
472 }
473 }
474
475 pub fn is_critical(&self) -> bool {
477 matches!(
478 self,
479 Self::StreamFailed { .. } | Self::SessionExpired { .. }
480 )
481 }
482
483 pub fn is_error(&self) -> bool {
485 matches!(self, Self::StreamFailed { .. })
486 }
487
488 pub fn is_completion(&self) -> bool {
490 matches!(
491 self,
492 Self::StreamCompleted { .. } | Self::SessionClosed { .. }
493 )
494 }
495}
496
/// Storage abstraction for [`DomainEvent`]s.
///
/// All operations are synchronous and report failures as a plain `String`.
pub trait EventStore {
    /// Appends `events` to the store, taking ownership of them.
    fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;

    /// Returns the stored events whose `session_id()` equals `session_id`.
    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;

    /// Returns the stored events whose `stream_id()` equals `stream_id`.
    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;

    /// Returns the stored events that happened after `since`.
    ///
    /// NOTE(review): the trait itself does not say whether the bound is inclusive;
    /// `InMemoryEventStore` treats it as exclusive (`timestamp > since`).
    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
}
527
/// [`EventStore`] implementation that keeps all events in a `Vec` in memory.
///
/// The derived `Default` produces an empty store.
#[derive(Debug, Clone, Default)]
pub struct InMemoryEventStore {
    /// Stored events, in the order they were appended.
    events: Vec<DomainEvent>,
}
533
534impl InMemoryEventStore {
535 pub fn new() -> Self {
537 Self::default()
538 }
539
540 pub fn all_events(&self) -> &[DomainEvent] {
542 &self.events
543 }
544
545 pub fn event_count(&self) -> usize {
547 self.events.len()
548 }
549}
550
551impl EventStore for InMemoryEventStore {
552 fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
553 self.events.append(&mut events);
554 Ok(())
555 }
556
557 fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
558 Ok(self
559 .events
560 .iter()
561 .filter(|e| e.session_id() == session_id)
562 .cloned()
563 .collect())
564 }
565
566 fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
567 Ok(self
568 .events
569 .iter()
570 .filter(|e| e.stream_id() == Some(stream_id))
571 .cloned()
572 .collect())
573 }
574
575 fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
576 Ok(self
577 .events
578 .iter()
579 .filter(|e| e.timestamp() > since)
580 .cloned()
581 .collect())
582 }
583}
584
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value_objects::{SessionId, StreamId};

    /// Accessors of a `StreamCreated` event echo the values it was built from.
    #[test]
    fn test_domain_event_properties() {
        let (sid, stream, created_at) = (SessionId::new(), StreamId::new(), Utc::now());

        let created = DomainEvent::StreamCreated {
            session_id: sid,
            stream_id: stream,
            timestamp: created_at,
        };

        assert_eq!(created.session_id(), sid);
        assert_eq!(created.stream_id(), Some(stream));
        assert_eq!(created.timestamp(), created_at);
        assert_eq!(created.event_type(), "stream_created");
        assert!(!created.is_critical());
        assert!(!created.is_error());
    }

    /// `StreamFailed` is both critical and an error, and is not a completion.
    #[test]
    fn test_critical_events() {
        let failure = DomainEvent::StreamFailed {
            session_id: SessionId::new(),
            stream_id: StreamId::new(),
            error: "Connection lost".to_string(),
            timestamp: Utc::now(),
        };

        assert!(failure.is_critical());
        assert!(failure.is_error());
        assert!(!failure.is_completion());
    }

    /// Appended events can be retrieved by session and by stream.
    #[test]
    fn test_event_store() {
        let sid = SessionId::new();
        let stream = StreamId::new();
        let mut store = InMemoryEventStore::new();

        let batch = vec![
            DomainEvent::SessionActivated {
                session_id: sid,
                timestamp: Utc::now(),
            },
            DomainEvent::StreamCreated {
                session_id: sid,
                stream_id: stream,
                timestamp: Utc::now(),
            },
        ];

        store
            .append_events(batch)
            .expect("Failed to append events to store in test");
        assert_eq!(store.event_count(), 2);

        // Both events belong to the session ...
        let by_session = store
            .get_events_for_session(sid)
            .expect("Failed to retrieve session events in test");
        assert_eq!(by_session.len(), 2);

        // ... but only `StreamCreated` carries the stream id.
        let by_stream = store
            .get_events_for_stream(stream)
            .expect("Failed to retrieve stream events in test");
        assert_eq!(by_stream.len(), 1);
    }

    /// An event survives a JSON round trip unchanged.
    #[test]
    fn test_event_serialization() {
        let original = DomainEvent::SessionActivated {
            session_id: SessionId::new(),
            timestamp: Utc::now(),
        };

        let json = serde_json::to_string(&original).expect("Failed to serialize event in test");
        let restored: DomainEvent =
            serde_json::from_str(&json).expect("Failed to deserialize event in test");

        assert_eq!(original, restored);
    }
}
676
/// Identifier of a single event: a newtype around `uuid::Uuid`.
///
/// Derives `Serialize`/`Deserialize` directly, so it is (de)serialized as the wrapped UUID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EventId(uuid::Uuid);
680
681impl EventId {
682 pub fn new() -> Self {
684 Self(uuid::Uuid::new_v4())
685 }
686
687 pub fn from_uuid(uuid: uuid::Uuid) -> Self {
689 Self(uuid)
690 }
691
692 pub fn inner(&self) -> uuid::Uuid {
694 self.0
695 }
696}
697
698impl std::fmt::Display for EventId {
699 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
700 write!(f, "{}", self.0)
701 }
702}
703
704impl Default for EventId {
705 fn default() -> Self {
706 Self::new()
707 }
708}
709
/// Receiver of [`DomainEvent`]s whose handling is asynchronous.
///
/// The future type is expressed as a generic associated type rather than a boxed future,
/// so each implementor names its own concrete future.
pub trait EventSubscriber {
    /// Future returned by [`handle`](Self::handle).
    ///
    /// It resolves to `crate::DomainResult<()>`, must be `Send`, and may borrow from the
    /// subscriber for the lifetime `'a`.
    type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Starts handling `event` and returns the future that completes the work.
    ///
    /// The `'_` in the return type is tied to the borrow of `self`, not to `event`, so the
    /// returned future cannot hold on to the `event` reference.
    fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
}
720
721impl DomainEvent {
723 pub fn event_id(&self) -> EventId {
725 let content = format!("{self:?}");
728 use std::collections::hash_map::DefaultHasher;
729 use std::hash::{Hash, Hasher};
730 let mut hash = DefaultHasher::new();
731 content.hash(&mut hash);
732 let hash_val = hash.finish();
733 let uuid = uuid::Uuid::from_bytes([
734 (hash_val >> 56) as u8,
735 (hash_val >> 48) as u8,
736 (hash_val >> 40) as u8,
737 (hash_val >> 32) as u8,
738 (hash_val >> 24) as u8,
739 (hash_val >> 16) as u8,
740 (hash_val >> 8) as u8,
741 hash_val as u8,
742 0,
743 0,
744 0,
745 0,
746 0,
747 0,
748 0,
749 0,
750 ]);
751 EventId::from_uuid(uuid)
752 }
753
754 pub fn occurred_at(&self) -> DateTime<Utc> {
756 match self {
757 DomainEvent::SessionActivated { timestamp, .. }
758 | DomainEvent::SessionClosed { timestamp, .. }
759 | DomainEvent::SessionExpired { timestamp, .. }
760 | DomainEvent::StreamCreated { timestamp, .. }
761 | DomainEvent::StreamStarted { timestamp, .. }
762 | DomainEvent::StreamCompleted { timestamp, .. }
763 | DomainEvent::StreamFailed { timestamp, .. }
764 | DomainEvent::StreamCancelled { timestamp, .. }
765 | DomainEvent::SkeletonGenerated { timestamp, .. }
766 | DomainEvent::PatchFramesGenerated { timestamp, .. }
767 | DomainEvent::FramesBatched { timestamp, .. }
768 | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
769 | DomainEvent::StreamConfigUpdated { timestamp, .. }
770 | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
771 | DomainEvent::SessionTimedOut { timestamp, .. }
772 | DomainEvent::SessionTimeoutExtended { timestamp, .. } => *timestamp,
773 }
774 }
775
776 pub fn metadata(&self) -> std::collections::HashMap<String, String> {
778 let mut metadata = std::collections::HashMap::new();
779 metadata.insert("event_type".to_string(), self.event_type().to_string());
780 metadata.insert("session_id".to_string(), self.session_id().to_string());
781 metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
782
783 if let Some(stream_id) = self.stream_id() {
784 metadata.insert("stream_id".to_string(), stream_id.to_string());
785 }
786
787 metadata
788 }
789}