1use crate::value_objects::{BackpressureSignal, SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7mod serde_session_id {
9 use crate::value_objects::SessionId;
10 use serde::{Deserialize, Deserializer, Serialize, Serializer};
11
12 pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
13 where
14 S: Serializer,
15 {
16 id.as_uuid().serialize(serializer)
17 }
18
19 pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
20 where
21 D: Deserializer<'de>,
22 {
23 let uuid = uuid::Uuid::deserialize(deserializer)?;
24 Ok(SessionId::from_uuid(uuid))
25 }
26}
27
28mod serde_stream_id {
30 use crate::value_objects::StreamId;
31 use serde::{Deserialize, Deserializer, Serialize, Serializer};
32
33 pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
34 where
35 S: Serializer,
36 {
37 id.as_uuid().serialize(serializer)
38 }
39
40 pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
41 where
42 D: Deserializer<'de>,
43 {
44 let uuid = uuid::Uuid::deserialize(deserializer)?;
45 Ok(StreamId::from_uuid(uuid))
46 }
47}
48
49mod serde_option_stream_id {
51 use crate::value_objects::StreamId;
52 use serde::{Deserialize, Deserializer, Serialize, Serializer};
53
54 pub fn serialize<S>(id: &Option<StreamId>, serializer: S) -> Result<S::Ok, S::Error>
55 where
56 S: Serializer,
57 {
58 id.map(|i| i.as_uuid()).serialize(serializer)
59 }
60
61 pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StreamId>, D::Error>
62 where
63 D: Deserializer<'de>,
64 {
65 let uuid: Option<uuid::Uuid> = Option::deserialize(deserializer)?;
66 Ok(uuid.map(StreamId::from_uuid))
67 }
68}
69
/// State of a session, as recorded in `DomainEvent::SessionTimedOut`.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it. The per-variant notes below are derived
/// from the variant names only; the state machine that moves between them is
/// not part of this file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum SessionState {
    /// NOTE(review): presumably the state before the session is usable — confirm.
    Initializing,
    /// NOTE(review): presumably the normal operating state — confirm.
    Active,
    /// NOTE(review): presumably shutdown in progress — confirm.
    Closing,
    /// NOTE(review): presumably a terminal state after a normal finish — confirm.
    Completed,
    /// NOTE(review): presumably a terminal state after an error — confirm.
    Failed,
}
85
86impl SessionState {
87 pub fn as_str(&self) -> &'static str {
89 match self {
90 SessionState::Initializing => "Initializing",
91 SessionState::Active => "Active",
92 SessionState::Closing => "Closing",
93 SessionState::Completed => "Completed",
94 SessionState::Failed => "Failed",
95 }
96 }
97}
98
/// Events describing what happened to sessions and their streams.
///
/// Serialization is internally tagged: the variant is stored in an
/// `event_type` field whose value is the snake_case variant name
/// (e.g. `"stream_created"`). Every variant carries a `session_id` and a
/// `timestamp`; identifiers go through the `serde_*_id` adapter modules in
/// this file, which write them in their UUID form.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
///
/// NOTE(review): the per-variant summaries below are derived from variant and
/// field names; the code that emits these events is not in this file.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum DomainEvent {
    /// A session was activated.
    SessionActivated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        timestamp: DateTime<Utc>,
    },

    /// A session was closed. Counted as a completion by `is_completion`.
    SessionClosed {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        timestamp: DateTime<Utc>,
    },

    /// A session expired. Counted as critical by `is_critical`.
    SessionExpired {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        timestamp: DateTime<Utc>,
    },

    /// A session timed out while in `original_state`.
    SessionTimedOut {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        original_state: SessionState,
        // NOTE(review): unit is not stated anywhere in this file — confirm (seconds?).
        timeout_duration: u64,
        timestamp: DateTime<Utc>,
    },

    /// A session's timeout was extended.
    SessionTimeoutExtended {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        // Seconds, per the field name.
        additional_seconds: u64,
        new_expires_at: DateTime<Utc>,
        timestamp: DateTime<Utc>,
    },

    /// A stream was created within a session.
    StreamCreated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// A stream started.
    StreamStarted {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// A stream completed. Counted as a completion by `is_completion`.
    StreamCompleted {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// A stream failed with `error`. Reported by both `is_error` and
    /// `is_critical`.
    StreamFailed {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        error: String,
        timestamp: DateTime<Utc>,
    },

    /// A stream was cancelled.
    StreamCancelled {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// A skeleton frame was generated for a stream.
    SkeletonGenerated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        // Bytes, per the field name.
        frame_size_bytes: u64,
        timestamp: DateTime<Utc>,
    },

    /// Patch frames were generated for a stream.
    PatchFramesGenerated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        frame_count: usize,
        total_bytes: u64,
        // NOTE(review): the priority scale (what 0 and 255 mean) is defined elsewhere — confirm.
        highest_priority: u8,
        timestamp: DateTime<Utc>,
    },

    /// Frames were batched for a session; no stream is attached.
    FramesBatched {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        frame_count: usize,
        timestamp: DateTime<Utc>,
    },

    /// The session's priority threshold changed from `old_threshold` to
    /// `new_threshold`, with a free-text `reason`.
    PriorityThresholdAdjusted {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        old_threshold: u8,
        new_threshold: u8,
        reason: String,
        timestamp: DateTime<Utc>,
    },

    /// A stream's configuration was updated; the new configuration itself is
    /// not carried by the event.
    StreamConfigUpdated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        timestamp: DateTime<Utc>,
    },

    /// A `PerformanceMetrics` sample was recorded for a session.
    PerformanceMetricsRecorded {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        metrics: PerformanceMetrics,
        timestamp: DateTime<Utc>,
    },

    /// A backpressure signal was received, optionally scoped to one stream.
    BackpressureReceived {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        // `default` lets the field be absent on input (decoded as `None`);
        // `skip_serializing_if` leaves it out of the output when it is `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        #[serde(with = "serde_option_stream_id")]
        stream_id: Option<StreamId>,
        signal: BackpressureSignal,
        timestamp: DateTime<Utc>,
    },

    /// A stream was paused, with a free-text `reason`.
    StreamPaused {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        reason: String,
        timestamp: DateTime<Utc>,
    },

    /// A paused stream was resumed.
    StreamResumed {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        // Milliseconds, per the field name.
        paused_duration_ms: u64,
        timestamp: DateTime<Utc>,
    },

    /// The session's flow-control credits changed.
    CreditsUpdated {
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        available_credits: usize,
        max_credits: usize,
        timestamp: DateTime<Utc>,
    },
}
365
/// Performance sample carried by `DomainEvent::PerformanceMetricsRecorded`.
///
/// This file only stores and (de)serializes the values; how they are measured
/// is not visible here, so the field notes restate the field names.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Frame rate (frames per second, per the field name).
    pub frames_per_second: f64,
    /// Byte rate (bytes per second, per the field name).
    pub bytes_per_second: f64,
    /// Average frame size. NOTE(review): presumably in bytes — confirm.
    pub average_frame_size: f64,
    /// Per-priority frame counters.
    pub priority_distribution: PriorityDistribution,
    /// Optional latency in milliseconds (per the field name); `None` when absent.
    pub latency_ms: Option<u64>,
}
380
/// Frame counters split into five priority buckets.
///
/// The derived `Default` sets every counter to zero.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PriorityDistribution {
    /// Number of frames in the "critical" bucket.
    pub critical_frames: u64,
    /// Number of frames in the "high" bucket.
    pub high_frames: u64,
    /// Number of frames in the "medium" bucket.
    pub medium_frames: u64,
    /// Number of frames in the "low" bucket.
    pub low_frames: u64,
    /// Number of frames in the "background" bucket.
    pub background_frames: u64,
}
395
396impl PriorityDistribution {
397 pub fn new() -> Self {
399 Self::default()
400 }
401
402 pub fn total_frames(&self) -> u64 {
404 self.critical_frames
405 + self.high_frames
406 + self.medium_frames
407 + self.low_frames
408 + self.background_frames
409 }
410
411 pub fn as_percentages(&self) -> PriorityPercentages {
413 let total = self.total_frames() as f64;
414 if total == 0.0 {
415 return PriorityPercentages::default();
416 }
417
418 PriorityPercentages {
419 critical: self.critical_frames as f64 / total,
420 high: self.high_frames as f64 / total,
421 medium: self.medium_frames as f64 / total,
422 low: self.low_frames as f64 / total,
423 background: self.background_frames as f64 / total,
424 }
425 }
426
427 pub fn from_counts(
429 critical_count: u64,
430 high_count: u64,
431 medium_count: u64,
432 low_count: u64,
433 background_count: u64,
434 ) -> Self {
435 Self {
436 critical_frames: critical_count,
437 high_frames: high_count,
438 medium_frames: medium_count,
439 low_frames: low_count,
440 background_frames: background_count,
441 }
442 }
443}
444
445#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
447pub struct PriorityPercentages {
448 pub critical: f64,
450 pub high: f64,
452 pub medium: f64,
454 pub low: f64,
456 pub background: f64,
458}
459
460impl Default for PriorityPercentages {
461 fn default() -> Self {
462 Self {
463 critical: 0.0,
464 high: 0.0,
465 medium: 0.0,
466 low: 0.0,
467 background: 0.0,
468 }
469 }
470}
471
472impl DomainEvent {
473 pub fn session_id(&self) -> SessionId {
475 match self {
476 Self::SessionActivated { session_id, .. } => *session_id,
477 Self::SessionClosed { session_id, .. } => *session_id,
478 Self::SessionExpired { session_id, .. } => *session_id,
479 Self::StreamCreated { session_id, .. } => *session_id,
480 Self::StreamStarted { session_id, .. } => *session_id,
481 Self::StreamCompleted { session_id, .. } => *session_id,
482 Self::StreamFailed { session_id, .. } => *session_id,
483 Self::StreamCancelled { session_id, .. } => *session_id,
484 Self::SkeletonGenerated { session_id, .. } => *session_id,
485 Self::PatchFramesGenerated { session_id, .. } => *session_id,
486 Self::FramesBatched { session_id, .. } => *session_id,
487 Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
488 Self::StreamConfigUpdated { session_id, .. } => *session_id,
489 Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
490 Self::SessionTimedOut { session_id, .. } => *session_id,
491 Self::SessionTimeoutExtended { session_id, .. } => *session_id,
492 Self::BackpressureReceived { session_id, .. } => *session_id,
493 Self::StreamPaused { session_id, .. } => *session_id,
494 Self::StreamResumed { session_id, .. } => *session_id,
495 Self::CreditsUpdated { session_id, .. } => *session_id,
496 }
497 }
498
499 pub fn stream_id(&self) -> Option<StreamId> {
501 match self {
502 Self::StreamCreated { stream_id, .. } => Some(*stream_id),
503 Self::StreamStarted { stream_id, .. } => Some(*stream_id),
504 Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
505 Self::StreamFailed { stream_id, .. } => Some(*stream_id),
506 Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
507 Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
508 Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
509 Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
510 Self::StreamPaused { stream_id, .. } => Some(*stream_id),
511 Self::StreamResumed { stream_id, .. } => Some(*stream_id),
512 Self::BackpressureReceived { stream_id, .. } => *stream_id,
513 _ => None,
514 }
515 }
516
517 pub fn timestamp(&self) -> DateTime<Utc> {
519 match self {
520 Self::SessionActivated { timestamp, .. } => *timestamp,
521 Self::SessionClosed { timestamp, .. } => *timestamp,
522 Self::SessionExpired { timestamp, .. } => *timestamp,
523 Self::StreamCreated { timestamp, .. } => *timestamp,
524 Self::StreamStarted { timestamp, .. } => *timestamp,
525 Self::StreamCompleted { timestamp, .. } => *timestamp,
526 Self::StreamFailed { timestamp, .. } => *timestamp,
527 Self::StreamCancelled { timestamp, .. } => *timestamp,
528 Self::SkeletonGenerated { timestamp, .. } => *timestamp,
529 Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
530 Self::FramesBatched { timestamp, .. } => *timestamp,
531 Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
532 Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
533 Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
534 Self::SessionTimedOut { timestamp, .. } => *timestamp,
535 Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
536 Self::BackpressureReceived { timestamp, .. } => *timestamp,
537 Self::StreamPaused { timestamp, .. } => *timestamp,
538 Self::StreamResumed { timestamp, .. } => *timestamp,
539 Self::CreditsUpdated { timestamp, .. } => *timestamp,
540 }
541 }
542
543 pub fn event_type(&self) -> &'static str {
545 match self {
546 Self::SessionActivated { .. } => "session_activated",
547 Self::SessionClosed { .. } => "session_closed",
548 Self::SessionExpired { .. } => "session_expired",
549 Self::StreamCreated { .. } => "stream_created",
550 Self::StreamStarted { .. } => "stream_started",
551 Self::StreamCompleted { .. } => "stream_completed",
552 Self::StreamFailed { .. } => "stream_failed",
553 Self::StreamCancelled { .. } => "stream_cancelled",
554 Self::SkeletonGenerated { .. } => "skeleton_generated",
555 Self::PatchFramesGenerated { .. } => "patch_frames_generated",
556 Self::FramesBatched { .. } => "frames_batched",
557 Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
558 Self::StreamConfigUpdated { .. } => "stream_config_updated",
559 Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
560 Self::SessionTimedOut { .. } => "session_timed_out",
561 Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
562 Self::BackpressureReceived { .. } => "backpressure_received",
563 Self::StreamPaused { .. } => "stream_paused",
564 Self::StreamResumed { .. } => "stream_resumed",
565 Self::CreditsUpdated { .. } => "credits_updated",
566 }
567 }
568
569 pub fn is_critical(&self) -> bool {
571 matches!(
572 self,
573 Self::StreamFailed { .. } | Self::SessionExpired { .. }
574 )
575 }
576
577 pub fn is_error(&self) -> bool {
579 matches!(self, Self::StreamFailed { .. })
580 }
581
582 pub fn is_completion(&self) -> bool {
584 matches!(
585 self,
586 Self::StreamCompleted { .. } | Self::SessionClosed { .. }
587 )
588 }
589}
590
/// Storage abstraction for `DomainEvent`s.
///
/// All methods report failure as a plain `String`. The only implementation
/// in this file, `InMemoryEventStore`, never returns an error.
pub trait EventStore {
    /// Appends `events` to the store, taking ownership of them.
    fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;

    /// Returns the stored events whose session is `session_id`.
    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;

    /// Returns the stored events that refer to `stream_id`.
    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;

    /// Returns the stored events relative to the `since` cut-off.
    ///
    /// NOTE(review): `InMemoryEventStore` uses a strict comparison
    /// (`timestamp > since`), so an event stamped exactly at `since` is left
    /// out; confirm whether other implementations are expected to do the same.
    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
}
621
/// `EventStore` implementation that keeps all events in a `Vec`.
///
/// The derived `Default` starts with an empty vector.
#[derive(Debug, Clone, Default)]
pub struct InMemoryEventStore {
    // Events in the order they were appended.
    events: Vec<DomainEvent>,
}
627
628impl InMemoryEventStore {
629 pub fn new() -> Self {
631 Self::default()
632 }
633
634 pub fn all_events(&self) -> &[DomainEvent] {
636 &self.events
637 }
638
639 pub fn event_count(&self) -> usize {
641 self.events.len()
642 }
643}
644
645impl EventStore for InMemoryEventStore {
646 fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
647 self.events.append(&mut events);
648 Ok(())
649 }
650
651 fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
652 Ok(self
653 .events
654 .iter()
655 .filter(|e| e.session_id() == session_id)
656 .cloned()
657 .collect())
658 }
659
660 fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
661 Ok(self
662 .events
663 .iter()
664 .filter(|e| e.stream_id() == Some(stream_id))
665 .cloned()
666 .collect())
667 }
668
669 fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
670 Ok(self
671 .events
672 .iter()
673 .filter(|e| e.timestamp() > since)
674 .cloned()
675 .collect())
676 }
677}
678
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value_objects::{SessionId, StreamId};

    /// Accessors on a `StreamCreated` event return the values it was built with.
    #[test]
    fn test_domain_event_properties() {
        let (session, stream, created_at) = (SessionId::new(), StreamId::new(), Utc::now());

        let created = DomainEvent::StreamCreated {
            session_id: session,
            stream_id: stream,
            timestamp: created_at,
        };

        assert_eq!(created.session_id(), session);
        assert_eq!(created.stream_id(), Some(stream));
        assert_eq!(created.timestamp(), created_at);
        assert_eq!(created.event_type(), "stream_created");
        assert!(!created.is_critical());
        assert!(!created.is_error());
    }

    /// `StreamFailed` is critical and an error, but not a completion.
    #[test]
    fn test_critical_events() {
        let session = SessionId::new();
        let stream = StreamId::new();

        let failure = DomainEvent::StreamFailed {
            session_id: session,
            stream_id: stream,
            error: "Connection lost".to_string(),
            timestamp: Utc::now(),
        };

        assert!(failure.is_critical());
        assert!(failure.is_error());
        assert!(!failure.is_completion());
    }

    /// Appended events can be counted and queried by session and by stream.
    #[test]
    fn test_event_store() {
        let mut store = InMemoryEventStore::new();
        let session = SessionId::new();
        let stream = StreamId::new();

        let activated = DomainEvent::SessionActivated {
            session_id: session,
            timestamp: Utc::now(),
        };
        let created = DomainEvent::StreamCreated {
            session_id: session,
            stream_id: stream,
            timestamp: Utc::now(),
        };

        store
            .append_events(vec![activated, created])
            .expect("Failed to append events to store in test");
        assert_eq!(store.event_count(), 2);

        // Both events share the session; only `StreamCreated` carries the stream.
        let by_session = store
            .get_events_for_session(session)
            .expect("Failed to retrieve session events in test");
        assert_eq!(by_session.len(), 2);

        let by_stream = store
            .get_events_for_stream(stream)
            .expect("Failed to retrieve stream events in test");
        assert_eq!(by_stream.len(), 1);
    }

    /// An event survives a JSON round trip unchanged.
    #[test]
    fn test_event_serialization() {
        let original = DomainEvent::SessionActivated {
            session_id: SessionId::new(),
            timestamp: Utc::now(),
        };

        let json = serde_json::to_string(&original).expect("Failed to serialize event in test");
        let restored: DomainEvent =
            serde_json::from_str(&json).expect("Failed to deserialize event in test");

        assert_eq!(original, restored);
    }
}
770
771#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
773pub struct EventId(uuid::Uuid);
774
775impl EventId {
776 pub fn new() -> Self {
778 Self(uuid::Uuid::new_v4())
779 }
780
781 pub fn from_uuid(uuid: uuid::Uuid) -> Self {
783 Self(uuid)
784 }
785
786 pub fn inner(&self) -> uuid::Uuid {
788 self.0
789 }
790}
791
792impl std::fmt::Display for EventId {
793 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
794 write!(f, "{}", self.0)
795 }
796}
797
798impl Default for EventId {
799 fn default() -> Self {
800 Self::new()
801 }
802}
803
/// Handler for `DomainEvent`s whose work is expressed as a future.
///
/// The future type is a generic associated type, so an implementation can
/// name a concrete future that borrows from the subscriber.
pub trait EventSubscriber {
    /// Future returned by `handle`.
    ///
    /// It must be `Send`, may borrow data for `'a`, and resolves to
    /// `crate::DomainResult<()>`.
    type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Starts handling `event` and returns the future that completes it.
    ///
    /// Under lifetime elision the returned future is tied to the borrow of
    /// `self`, not to `event`.
    fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
}
814
815impl DomainEvent {
817 pub fn event_id(&self) -> EventId {
819 let content = format!("{self:?}");
822 use std::collections::hash_map::DefaultHasher;
823 use std::hash::{Hash, Hasher};
824 let mut hash = DefaultHasher::new();
825 content.hash(&mut hash);
826 let hash_val = hash.finish();
827 let uuid = uuid::Uuid::from_bytes([
828 (hash_val >> 56) as u8,
829 (hash_val >> 48) as u8,
830 (hash_val >> 40) as u8,
831 (hash_val >> 32) as u8,
832 (hash_val >> 24) as u8,
833 (hash_val >> 16) as u8,
834 (hash_val >> 8) as u8,
835 hash_val as u8,
836 0,
837 0,
838 0,
839 0,
840 0,
841 0,
842 0,
843 0,
844 ]);
845 EventId::from_uuid(uuid)
846 }
847
848 pub fn occurred_at(&self) -> DateTime<Utc> {
850 match self {
851 DomainEvent::SessionActivated { timestamp, .. }
852 | DomainEvent::SessionClosed { timestamp, .. }
853 | DomainEvent::SessionExpired { timestamp, .. }
854 | DomainEvent::StreamCreated { timestamp, .. }
855 | DomainEvent::StreamStarted { timestamp, .. }
856 | DomainEvent::StreamCompleted { timestamp, .. }
857 | DomainEvent::StreamFailed { timestamp, .. }
858 | DomainEvent::StreamCancelled { timestamp, .. }
859 | DomainEvent::SkeletonGenerated { timestamp, .. }
860 | DomainEvent::PatchFramesGenerated { timestamp, .. }
861 | DomainEvent::FramesBatched { timestamp, .. }
862 | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
863 | DomainEvent::StreamConfigUpdated { timestamp, .. }
864 | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
865 | DomainEvent::SessionTimedOut { timestamp, .. }
866 | DomainEvent::SessionTimeoutExtended { timestamp, .. }
867 | DomainEvent::BackpressureReceived { timestamp, .. }
868 | DomainEvent::StreamPaused { timestamp, .. }
869 | DomainEvent::StreamResumed { timestamp, .. }
870 | DomainEvent::CreditsUpdated { timestamp, .. } => *timestamp,
871 }
872 }
873
874 pub fn metadata(&self) -> std::collections::HashMap<String, String> {
876 let mut metadata = std::collections::HashMap::new();
877 metadata.insert("event_type".to_string(), self.event_type().to_string());
878 metadata.insert("session_id".to_string(), self.session_id().to_string());
879 metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
880
881 if let Some(stream_id) = self.stream_id() {
882 metadata.insert("stream_id".to_string(), stream_id.to_string());
883 }
884
885 metadata
886 }
887}