1use crate::value_objects::{BackpressureSignal, SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7mod serde_session_id {
9 use crate::value_objects::SessionId;
10 use serde::{Deserialize, Deserializer, Serialize, Serializer};
11
12 pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
13 where
14 S: Serializer,
15 {
16 id.as_uuid().serialize(serializer)
17 }
18
19 pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
20 where
21 D: Deserializer<'de>,
22 {
23 let uuid = uuid::Uuid::deserialize(deserializer)?;
24 Ok(SessionId::from_uuid(uuid))
25 }
26}
27
28mod serde_stream_id {
30 use crate::value_objects::StreamId;
31 use serde::{Deserialize, Deserializer, Serialize, Serializer};
32
33 pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
34 where
35 S: Serializer,
36 {
37 id.as_uuid().serialize(serializer)
38 }
39
40 pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
41 where
42 D: Deserializer<'de>,
43 {
44 let uuid = uuid::Uuid::deserialize(deserializer)?;
45 Ok(StreamId::from_uuid(uuid))
46 }
47}
48
49mod serde_option_stream_id {
51 use crate::value_objects::StreamId;
52 use serde::{Deserialize, Deserializer, Serialize, Serializer};
53
54 pub fn serialize<S>(id: &Option<StreamId>, serializer: S) -> Result<S::Ok, S::Error>
55 where
56 S: Serializer,
57 {
58 id.map(|i| i.as_uuid()).serialize(serializer)
59 }
60
61 pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StreamId>, D::Error>
62 where
63 D: Deserializer<'de>,
64 {
65 let uuid: Option<uuid::Uuid> = Option::deserialize(deserializer)?;
66 Ok(uuid.map(StreamId::from_uuid))
67 }
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72pub enum SessionState {
73 Initializing,
75 Active,
77 Closing,
79 Completed,
81 Failed,
83}
84
85impl SessionState {
86 pub fn as_str(&self) -> &'static str {
88 match self {
89 SessionState::Initializing => "Initializing",
90 SessionState::Active => "Active",
91 SessionState::Closing => "Closing",
92 SessionState::Completed => "Completed",
93 SessionState::Failed => "Failed",
94 }
95 }
96}
97
98#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
100#[serde(tag = "event_type", rename_all = "snake_case")]
101pub enum DomainEvent {
102 SessionActivated {
104 #[serde(with = "serde_session_id")]
106 session_id: SessionId,
107 timestamp: DateTime<Utc>,
109 },
110
111 SessionClosed {
113 #[serde(with = "serde_session_id")]
115 session_id: SessionId,
116 timestamp: DateTime<Utc>,
118 },
119
120 SessionExpired {
122 #[serde(with = "serde_session_id")]
124 session_id: SessionId,
125 timestamp: DateTime<Utc>,
127 },
128
129 SessionTimedOut {
131 #[serde(with = "serde_session_id")]
133 session_id: SessionId,
134 original_state: SessionState,
136 timeout_duration: u64,
138 timestamp: DateTime<Utc>,
140 },
141
142 SessionTimeoutExtended {
144 #[serde(with = "serde_session_id")]
146 session_id: SessionId,
147 additional_seconds: u64,
149 new_expires_at: DateTime<Utc>,
151 timestamp: DateTime<Utc>,
153 },
154
155 StreamCreated {
157 #[serde(with = "serde_session_id")]
159 session_id: SessionId,
160 #[serde(with = "serde_stream_id")]
162 stream_id: StreamId,
163 timestamp: DateTime<Utc>,
165 },
166
167 StreamStarted {
169 #[serde(with = "serde_session_id")]
171 session_id: SessionId,
172 #[serde(with = "serde_stream_id")]
174 stream_id: StreamId,
175 timestamp: DateTime<Utc>,
177 },
178
179 StreamCompleted {
181 #[serde(with = "serde_session_id")]
183 session_id: SessionId,
184 #[serde(with = "serde_stream_id")]
186 stream_id: StreamId,
187 timestamp: DateTime<Utc>,
189 },
190
191 StreamFailed {
193 #[serde(with = "serde_session_id")]
195 session_id: SessionId,
196 #[serde(with = "serde_stream_id")]
198 stream_id: StreamId,
199 error: String,
201 timestamp: DateTime<Utc>,
203 },
204
205 StreamCancelled {
207 #[serde(with = "serde_session_id")]
209 session_id: SessionId,
210 #[serde(with = "serde_stream_id")]
212 stream_id: StreamId,
213 timestamp: DateTime<Utc>,
215 },
216
217 SkeletonGenerated {
219 #[serde(with = "serde_session_id")]
221 session_id: SessionId,
222 #[serde(with = "serde_stream_id")]
224 stream_id: StreamId,
225 frame_size_bytes: u64,
227 timestamp: DateTime<Utc>,
229 },
230
231 PatchFramesGenerated {
233 #[serde(with = "serde_session_id")]
235 session_id: SessionId,
236 #[serde(with = "serde_stream_id")]
238 stream_id: StreamId,
239 frame_count: usize,
241 total_bytes: u64,
243 highest_priority: u8,
245 timestamp: DateTime<Utc>,
247 },
248
249 FramesBatched {
251 #[serde(with = "serde_session_id")]
253 session_id: SessionId,
254 frame_count: usize,
256 timestamp: DateTime<Utc>,
258 },
259
260 PriorityThresholdAdjusted {
262 #[serde(with = "serde_session_id")]
264 session_id: SessionId,
265 old_threshold: u8,
267 new_threshold: u8,
269 reason: String,
271 timestamp: DateTime<Utc>,
273 },
274
275 StreamConfigUpdated {
277 #[serde(with = "serde_session_id")]
279 session_id: SessionId,
280 #[serde(with = "serde_stream_id")]
282 stream_id: StreamId,
283 timestamp: DateTime<Utc>,
285 },
286
287 PerformanceMetricsRecorded {
289 #[serde(with = "serde_session_id")]
291 session_id: SessionId,
292 metrics: PerformanceMetrics,
294 timestamp: DateTime<Utc>,
296 },
297
298 BackpressureReceived {
300 #[serde(with = "serde_session_id")]
302 session_id: SessionId,
303 #[serde(default, skip_serializing_if = "Option::is_none")]
305 #[serde(with = "serde_option_stream_id")]
306 stream_id: Option<StreamId>,
307 signal: BackpressureSignal,
309 timestamp: DateTime<Utc>,
311 },
312
313 StreamPaused {
315 #[serde(with = "serde_session_id")]
317 session_id: SessionId,
318 #[serde(with = "serde_stream_id")]
320 stream_id: StreamId,
321 reason: String,
323 timestamp: DateTime<Utc>,
325 },
326
327 StreamResumed {
329 #[serde(with = "serde_session_id")]
331 session_id: SessionId,
332 #[serde(with = "serde_stream_id")]
334 stream_id: StreamId,
335 paused_duration_ms: u64,
337 timestamp: DateTime<Utc>,
339 },
340
341 CreditsUpdated {
343 #[serde(with = "serde_session_id")]
345 session_id: SessionId,
346 available_credits: usize,
348 max_credits: usize,
350 timestamp: DateTime<Utc>,
352 },
353}
354
355#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
357pub struct PerformanceMetrics {
358 pub frames_per_second: f64,
360 pub bytes_per_second: f64,
362 pub average_frame_size: f64,
364 pub priority_distribution: PriorityDistribution,
366 pub latency_ms: Option<u64>,
368}
369
370#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
372pub struct PriorityDistribution {
373 pub critical_frames: u64,
375 pub high_frames: u64,
377 pub medium_frames: u64,
379 pub low_frames: u64,
381 pub background_frames: u64,
383}
384
385impl PriorityDistribution {
386 pub fn new() -> Self {
388 Self::default()
389 }
390
391 pub fn total_frames(&self) -> u64 {
393 self.critical_frames
394 + self.high_frames
395 + self.medium_frames
396 + self.low_frames
397 + self.background_frames
398 }
399
400 pub fn as_percentages(&self) -> PriorityPercentages {
402 let total = self.total_frames() as f64;
403 if total == 0.0 {
404 return PriorityPercentages::default();
405 }
406
407 PriorityPercentages {
408 critical: self.critical_frames as f64 / total,
409 high: self.high_frames as f64 / total,
410 medium: self.medium_frames as f64 / total,
411 low: self.low_frames as f64 / total,
412 background: self.background_frames as f64 / total,
413 }
414 }
415
416 pub fn from_counts(
418 critical_count: u64,
419 high_count: u64,
420 medium_count: u64,
421 low_count: u64,
422 background_count: u64,
423 ) -> Self {
424 Self {
425 critical_frames: critical_count,
426 high_frames: high_count,
427 medium_frames: medium_count,
428 low_frames: low_count,
429 background_frames: background_count,
430 }
431 }
432}
433
434#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
436pub struct PriorityPercentages {
437 pub critical: f64,
439 pub high: f64,
441 pub medium: f64,
443 pub low: f64,
445 pub background: f64,
447}
448
449impl Default for PriorityPercentages {
450 fn default() -> Self {
451 Self {
452 critical: 0.0,
453 high: 0.0,
454 medium: 0.0,
455 low: 0.0,
456 background: 0.0,
457 }
458 }
459}
460
461impl DomainEvent {
462 pub fn session_id(&self) -> SessionId {
464 match self {
465 Self::SessionActivated { session_id, .. } => *session_id,
466 Self::SessionClosed { session_id, .. } => *session_id,
467 Self::SessionExpired { session_id, .. } => *session_id,
468 Self::StreamCreated { session_id, .. } => *session_id,
469 Self::StreamStarted { session_id, .. } => *session_id,
470 Self::StreamCompleted { session_id, .. } => *session_id,
471 Self::StreamFailed { session_id, .. } => *session_id,
472 Self::StreamCancelled { session_id, .. } => *session_id,
473 Self::SkeletonGenerated { session_id, .. } => *session_id,
474 Self::PatchFramesGenerated { session_id, .. } => *session_id,
475 Self::FramesBatched { session_id, .. } => *session_id,
476 Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
477 Self::StreamConfigUpdated { session_id, .. } => *session_id,
478 Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
479 Self::SessionTimedOut { session_id, .. } => *session_id,
480 Self::SessionTimeoutExtended { session_id, .. } => *session_id,
481 Self::BackpressureReceived { session_id, .. } => *session_id,
482 Self::StreamPaused { session_id, .. } => *session_id,
483 Self::StreamResumed { session_id, .. } => *session_id,
484 Self::CreditsUpdated { session_id, .. } => *session_id,
485 }
486 }
487
488 pub fn stream_id(&self) -> Option<StreamId> {
490 match self {
491 Self::StreamCreated { stream_id, .. } => Some(*stream_id),
492 Self::StreamStarted { stream_id, .. } => Some(*stream_id),
493 Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
494 Self::StreamFailed { stream_id, .. } => Some(*stream_id),
495 Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
496 Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
497 Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
498 Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
499 Self::StreamPaused { stream_id, .. } => Some(*stream_id),
500 Self::StreamResumed { stream_id, .. } => Some(*stream_id),
501 Self::BackpressureReceived { stream_id, .. } => *stream_id,
502 _ => None,
503 }
504 }
505
506 pub fn timestamp(&self) -> DateTime<Utc> {
508 match self {
509 Self::SessionActivated { timestamp, .. } => *timestamp,
510 Self::SessionClosed { timestamp, .. } => *timestamp,
511 Self::SessionExpired { timestamp, .. } => *timestamp,
512 Self::StreamCreated { timestamp, .. } => *timestamp,
513 Self::StreamStarted { timestamp, .. } => *timestamp,
514 Self::StreamCompleted { timestamp, .. } => *timestamp,
515 Self::StreamFailed { timestamp, .. } => *timestamp,
516 Self::StreamCancelled { timestamp, .. } => *timestamp,
517 Self::SkeletonGenerated { timestamp, .. } => *timestamp,
518 Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
519 Self::FramesBatched { timestamp, .. } => *timestamp,
520 Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
521 Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
522 Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
523 Self::SessionTimedOut { timestamp, .. } => *timestamp,
524 Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
525 Self::BackpressureReceived { timestamp, .. } => *timestamp,
526 Self::StreamPaused { timestamp, .. } => *timestamp,
527 Self::StreamResumed { timestamp, .. } => *timestamp,
528 Self::CreditsUpdated { timestamp, .. } => *timestamp,
529 }
530 }
531
532 pub fn event_type(&self) -> &'static str {
534 match self {
535 Self::SessionActivated { .. } => "session_activated",
536 Self::SessionClosed { .. } => "session_closed",
537 Self::SessionExpired { .. } => "session_expired",
538 Self::StreamCreated { .. } => "stream_created",
539 Self::StreamStarted { .. } => "stream_started",
540 Self::StreamCompleted { .. } => "stream_completed",
541 Self::StreamFailed { .. } => "stream_failed",
542 Self::StreamCancelled { .. } => "stream_cancelled",
543 Self::SkeletonGenerated { .. } => "skeleton_generated",
544 Self::PatchFramesGenerated { .. } => "patch_frames_generated",
545 Self::FramesBatched { .. } => "frames_batched",
546 Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
547 Self::StreamConfigUpdated { .. } => "stream_config_updated",
548 Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
549 Self::SessionTimedOut { .. } => "session_timed_out",
550 Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
551 Self::BackpressureReceived { .. } => "backpressure_received",
552 Self::StreamPaused { .. } => "stream_paused",
553 Self::StreamResumed { .. } => "stream_resumed",
554 Self::CreditsUpdated { .. } => "credits_updated",
555 }
556 }
557
558 pub fn is_critical(&self) -> bool {
560 matches!(
561 self,
562 Self::StreamFailed { .. } | Self::SessionExpired { .. }
563 )
564 }
565
566 pub fn is_error(&self) -> bool {
568 matches!(self, Self::StreamFailed { .. })
569 }
570
571 pub fn is_completion(&self) -> bool {
573 matches!(
574 self,
575 Self::StreamCompleted { .. } | Self::SessionClosed { .. }
576 )
577 }
578}
579
580pub trait EventStore {
582 fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;
588
589 fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;
595
596 fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;
602
603 fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
609}
610
611#[derive(Debug, Clone, Default)]
613pub struct InMemoryEventStore {
614 events: Vec<DomainEvent>,
615}
616
617impl InMemoryEventStore {
618 pub fn new() -> Self {
620 Self::default()
621 }
622
623 pub fn all_events(&self) -> &[DomainEvent] {
625 &self.events
626 }
627
628 pub fn event_count(&self) -> usize {
630 self.events.len()
631 }
632}
633
634impl EventStore for InMemoryEventStore {
635 fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
636 self.events.append(&mut events);
637 Ok(())
638 }
639
640 fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
641 Ok(self
642 .events
643 .iter()
644 .filter(|e| e.session_id() == session_id)
645 .cloned()
646 .collect())
647 }
648
649 fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
650 Ok(self
651 .events
652 .iter()
653 .filter(|e| e.stream_id() == Some(stream_id))
654 .cloned()
655 .collect())
656 }
657
658 fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
659 Ok(self
660 .events
661 .iter()
662 .filter(|e| e.timestamp() > since)
663 .cloned()
664 .collect())
665 }
666}
667
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value_objects::{SessionId, StreamId};

    /// Accessors and classification helpers on a `StreamCreated` event.
    #[test]
    fn test_domain_event_properties() {
        let (session_id, stream_id) = (SessionId::new(), StreamId::new());
        let created_at = Utc::now();

        let created = DomainEvent::StreamCreated {
            session_id,
            stream_id,
            timestamp: created_at,
        };

        assert_eq!(created.session_id(), session_id);
        assert_eq!(created.stream_id(), Some(stream_id));
        assert_eq!(created.timestamp(), created_at);
        assert_eq!(created.event_type(), "stream_created");
        assert!(!created.is_critical());
        assert!(!created.is_error());
    }

    /// A `StreamFailed` event is critical and an error, but not a completion.
    #[test]
    fn test_critical_events() {
        let failure = DomainEvent::StreamFailed {
            session_id: SessionId::new(),
            stream_id: StreamId::new(),
            error: "Connection lost".to_string(),
            timestamp: Utc::now(),
        };

        assert!(failure.is_critical());
        assert!(failure.is_error());
        assert!(!failure.is_completion());
    }

    /// Appending two events and querying them back by session and by stream.
    #[test]
    fn test_event_store() {
        let session_id = SessionId::new();
        let stream_id = StreamId::new();
        let mut store = InMemoryEventStore::new();

        let batch = vec![
            DomainEvent::SessionActivated {
                session_id,
                timestamp: Utc::now(),
            },
            DomainEvent::StreamCreated {
                session_id,
                stream_id,
                timestamp: Utc::now(),
            },
        ];

        store
            .append_events(batch)
            .expect("Failed to append events to store in test");
        assert_eq!(store.event_count(), 2);

        let by_session = store
            .get_events_for_session(session_id)
            .expect("Failed to retrieve session events in test");
        assert_eq!(by_session.len(), 2);

        let by_stream = store
            .get_events_for_stream(stream_id)
            .expect("Failed to retrieve stream events in test");
        assert_eq!(by_stream.len(), 1);
    }

    /// An event survives a JSON round trip unchanged.
    #[test]
    fn test_event_serialization() {
        let original = DomainEvent::SessionActivated {
            session_id: SessionId::new(),
            timestamp: Utc::now(),
        };

        let json = serde_json::to_string(&original).expect("Failed to serialize event in test");
        let restored: DomainEvent =
            serde_json::from_str(&json).expect("Failed to deserialize event in test");

        assert_eq!(original, restored);
    }
}
759
760#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
762pub struct EventId(uuid::Uuid);
763
764impl EventId {
765 pub fn new() -> Self {
767 Self(uuid::Uuid::new_v4())
768 }
769
770 pub fn from_uuid(uuid: uuid::Uuid) -> Self {
772 Self(uuid)
773 }
774
775 pub fn inner(&self) -> uuid::Uuid {
777 self.0
778 }
779}
780
781impl std::fmt::Display for EventId {
782 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
783 write!(f, "{}", self.0)
784 }
785}
786
787impl Default for EventId {
788 fn default() -> Self {
789 Self::new()
790 }
791}
792
793pub trait EventSubscriber {
795 type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
797 where
798 Self: 'a;
799
800 fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
802}
803
804impl DomainEvent {
806 pub fn event_id(&self) -> EventId {
808 let content = format!("{self:?}");
811 use std::collections::hash_map::DefaultHasher;
812 use std::hash::{Hash, Hasher};
813 let mut hash = DefaultHasher::new();
814 content.hash(&mut hash);
815 let hash_val = hash.finish();
816 let uuid = uuid::Uuid::from_bytes([
817 (hash_val >> 56) as u8,
818 (hash_val >> 48) as u8,
819 (hash_val >> 40) as u8,
820 (hash_val >> 32) as u8,
821 (hash_val >> 24) as u8,
822 (hash_val >> 16) as u8,
823 (hash_val >> 8) as u8,
824 hash_val as u8,
825 0,
826 0,
827 0,
828 0,
829 0,
830 0,
831 0,
832 0,
833 ]);
834 EventId::from_uuid(uuid)
835 }
836
837 pub fn occurred_at(&self) -> DateTime<Utc> {
839 match self {
840 DomainEvent::SessionActivated { timestamp, .. }
841 | DomainEvent::SessionClosed { timestamp, .. }
842 | DomainEvent::SessionExpired { timestamp, .. }
843 | DomainEvent::StreamCreated { timestamp, .. }
844 | DomainEvent::StreamStarted { timestamp, .. }
845 | DomainEvent::StreamCompleted { timestamp, .. }
846 | DomainEvent::StreamFailed { timestamp, .. }
847 | DomainEvent::StreamCancelled { timestamp, .. }
848 | DomainEvent::SkeletonGenerated { timestamp, .. }
849 | DomainEvent::PatchFramesGenerated { timestamp, .. }
850 | DomainEvent::FramesBatched { timestamp, .. }
851 | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
852 | DomainEvent::StreamConfigUpdated { timestamp, .. }
853 | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
854 | DomainEvent::SessionTimedOut { timestamp, .. }
855 | DomainEvent::SessionTimeoutExtended { timestamp, .. }
856 | DomainEvent::BackpressureReceived { timestamp, .. }
857 | DomainEvent::StreamPaused { timestamp, .. }
858 | DomainEvent::StreamResumed { timestamp, .. }
859 | DomainEvent::CreditsUpdated { timestamp, .. } => *timestamp,
860 }
861 }
862
863 pub fn metadata(&self) -> std::collections::HashMap<String, String> {
865 let mut metadata = std::collections::HashMap::new();
866 metadata.insert("event_type".to_string(), self.event_type().to_string());
867 metadata.insert("session_id".to_string(), self.session_id().to_string());
868 metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
869
870 if let Some(stream_id) = self.stream_id() {
871 metadata.insert("stream_id".to_string(), stream_id.to_string());
872 }
873
874 metadata
875 }
876}