1use crate::value_objects::{BackpressureSignal, SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7mod serde_session_id {
9 use crate::value_objects::SessionId;
10 use serde::{Deserialize, Deserializer, Serialize, Serializer};
11
12 pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
13 where
14 S: Serializer,
15 {
16 id.as_uuid().serialize(serializer)
17 }
18
19 pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
20 where
21 D: Deserializer<'de>,
22 {
23 let uuid = uuid::Uuid::deserialize(deserializer)?;
24 Ok(SessionId::from_uuid(uuid))
25 }
26}
27
28mod serde_stream_id {
30 use crate::value_objects::StreamId;
31 use serde::{Deserialize, Deserializer, Serialize, Serializer};
32
33 pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
34 where
35 S: Serializer,
36 {
37 id.as_uuid().serialize(serializer)
38 }
39
40 pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
41 where
42 D: Deserializer<'de>,
43 {
44 let uuid = uuid::Uuid::deserialize(deserializer)?;
45 Ok(StreamId::from_uuid(uuid))
46 }
47}
48
49#[allow(dead_code)]
51mod serde_option_stream_id {
52 use crate::value_objects::StreamId;
53 use serde::{Deserialize, Deserializer, Serialize, Serializer};
54
55 pub fn serialize<S>(id: &Option<StreamId>, serializer: S) -> Result<S::Ok, S::Error>
56 where
57 S: Serializer,
58 {
59 id.map(|i| i.as_uuid()).serialize(serializer)
60 }
61
62 pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StreamId>, D::Error>
63 where
64 D: Deserializer<'de>,
65 {
66 let uuid: Option<uuid::Uuid> = Option::deserialize(deserializer)?;
67 Ok(uuid.map(StreamId::from_uuid))
68 }
69}
70
/// Lifecycle state of a session.
///
/// The transitions between states are not defined in this file; the variant
/// descriptions below follow the variant names only.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionState {
    /// The session is being set up.
    Initializing,
    /// The session is in use.
    Active,
    /// The session is shutting down.
    Closing,
    /// The session has finished.
    Completed,
    /// The session ended in failure.
    Failed,
}
85
86impl SessionState {
87 pub fn as_str(&self) -> &'static str {
89 match self {
90 SessionState::Initializing => "Initializing",
91 SessionState::Active => "Active",
92 SessionState::Closing => "Closing",
93 SessionState::Completed => "Completed",
94 SessionState::Failed => "Failed",
95 }
96 }
97}
98
/// Events emitted for sessions and their streams.
///
/// Serialized with an `event_type` tag field holding the variant name in
/// snake_case (see the `serde` attribute below). Every variant carries the
/// owning `session_id` and a `timestamp`; ids are (de)serialized as UUIDs via
/// the `serde_*_id` adapter modules.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum DomainEvent {
    /// A session was activated.
    SessionActivated {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A session was closed.
    SessionClosed {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A session expired.
    SessionExpired {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A session timed out.
    SessionTimedOut {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Presumably the state the session was in when it timed out — confirm with the emitter.
        original_state: SessionState,
        /// Timeout length; the unit is not visible in this file — TODO confirm.
        timeout_duration: u64,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A session's timeout was extended.
    SessionTimeoutExtended {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Extension amount, in seconds per the field name.
        additional_seconds: u64,
        /// Expiry time after the extension, per the field name.
        new_expires_at: DateTime<Utc>,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream was created within a session.
    StreamCreated {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream was started.
    StreamStarted {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream completed.
    StreamCompleted {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream failed.
    StreamFailed {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Free-form error text describing the failure.
        error: String,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream was cancelled.
    StreamCancelled {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A skeleton frame was generated for a stream.
    SkeletonGenerated {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Frame size, in bytes per the field name.
        frame_size_bytes: u64,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// Patch frames were generated for a stream.
    PatchFramesGenerated {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Number of frames generated.
        frame_count: usize,
        /// Combined size of the frames, in bytes per the field name.
        total_bytes: u64,
        /// Highest priority among the frames; the priority scale is defined elsewhere.
        highest_priority: u8,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// Frames were batched for a session.
    FramesBatched {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Number of frames in the batch.
        frame_count: usize,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// The priority threshold of a session was changed.
    PriorityThresholdAdjusted {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Threshold before the adjustment.
        old_threshold: u8,
        /// Threshold after the adjustment.
        new_threshold: u8,
        /// Free-form explanation of the adjustment.
        reason: String,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream's configuration was updated.
    StreamConfigUpdated {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// Performance metrics were recorded for a session.
    PerformanceMetricsRecorded {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// The recorded metrics snapshot.
        metrics: PerformanceMetrics,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A backpressure signal was received for a session, optionally scoped to one stream.
    BackpressureReceived {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the signal applies to, if any. Omitted from the serialized
        /// form when `None` and defaulted to `None` when absent on input.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        #[serde(with = "serde_option_stream_id")]
        stream_id: Option<StreamId>,
        /// The backpressure signal that was received.
        signal: BackpressureSignal,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream was paused.
    StreamPaused {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// Free-form explanation of why the stream was paused.
        reason: String,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream was resumed.
    StreamResumed {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Stream the event refers to.
        #[serde(with = "serde_stream_id")]
        stream_id: StreamId,
        /// How long the stream was paused, in milliseconds per the field name.
        paused_duration_ms: u64,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },

    /// The flow-control credits of a session changed.
    CreditsUpdated {
        /// Session the event belongs to.
        #[serde(with = "serde_session_id")]
        session_id: SessionId,
        /// Credits currently available.
        available_credits: usize,
        /// Maximum number of credits.
        max_credits: usize,
        /// Time associated with the event.
        timestamp: DateTime<Utc>,
    },
}
355
/// Snapshot of throughput figures carried by
/// `DomainEvent::PerformanceMetricsRecorded`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Frame rate, in frames per second per the field name.
    pub frames_per_second: f64,
    /// Data rate, in bytes per second per the field name.
    pub bytes_per_second: f64,
    /// Mean frame size; presumably in bytes — TODO confirm with the producer.
    pub average_frame_size: f64,
    /// Frame counts broken down by priority bucket.
    pub priority_distribution: PriorityDistribution,
    /// Latency in milliseconds per the field name, when available.
    pub latency_ms: Option<u64>,
}
370
/// Frame counts per priority bucket. `Default` yields all-zero counts.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PriorityDistribution {
    /// Number of frames counted in the critical bucket.
    pub critical_frames: u64,
    /// Number of frames counted in the high bucket.
    pub high_frames: u64,
    /// Number of frames counted in the medium bucket.
    pub medium_frames: u64,
    /// Number of frames counted in the low bucket.
    pub low_frames: u64,
    /// Number of frames counted in the background bucket.
    pub background_frames: u64,
}
385
386impl PriorityDistribution {
387 pub fn new() -> Self {
389 Self::default()
390 }
391
392 pub fn total_frames(&self) -> u64 {
394 self.critical_frames
395 + self.high_frames
396 + self.medium_frames
397 + self.low_frames
398 + self.background_frames
399 }
400
401 pub fn as_percentages(&self) -> PriorityPercentages {
403 let total = self.total_frames() as f64;
404 if total == 0.0 {
405 return PriorityPercentages::default();
406 }
407
408 PriorityPercentages {
409 critical: self.critical_frames as f64 / total,
410 high: self.high_frames as f64 / total,
411 medium: self.medium_frames as f64 / total,
412 low: self.low_frames as f64 / total,
413 background: self.background_frames as f64 / total,
414 }
415 }
416
417 pub fn from_counts(
419 critical_count: u64,
420 high_count: u64,
421 medium_count: u64,
422 low_count: u64,
423 background_count: u64,
424 ) -> Self {
425 Self {
426 critical_frames: critical_count,
427 high_frames: high_count,
428 medium_frames: medium_count,
429 low_frames: low_count,
430 background_frames: background_count,
431 }
432 }
433}
434
/// Per-bucket shares as produced by `PriorityDistribution::as_percentages`,
/// which computes each field as `bucket / total` (a fraction, not multiplied by 100).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PriorityPercentages {
    /// Share of the critical bucket.
    pub critical: f64,
    /// Share of the high bucket.
    pub high: f64,
    /// Share of the medium bucket.
    pub medium: f64,
    /// Share of the low bucket.
    pub low: f64,
    /// Share of the background bucket.
    pub background: f64,
}
449
450impl Default for PriorityPercentages {
451 fn default() -> Self {
452 Self {
453 critical: 0.0,
454 high: 0.0,
455 medium: 0.0,
456 low: 0.0,
457 background: 0.0,
458 }
459 }
460}
461
462impl DomainEvent {
463 pub fn session_id(&self) -> SessionId {
465 match self {
466 Self::SessionActivated { session_id, .. } => *session_id,
467 Self::SessionClosed { session_id, .. } => *session_id,
468 Self::SessionExpired { session_id, .. } => *session_id,
469 Self::StreamCreated { session_id, .. } => *session_id,
470 Self::StreamStarted { session_id, .. } => *session_id,
471 Self::StreamCompleted { session_id, .. } => *session_id,
472 Self::StreamFailed { session_id, .. } => *session_id,
473 Self::StreamCancelled { session_id, .. } => *session_id,
474 Self::SkeletonGenerated { session_id, .. } => *session_id,
475 Self::PatchFramesGenerated { session_id, .. } => *session_id,
476 Self::FramesBatched { session_id, .. } => *session_id,
477 Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
478 Self::StreamConfigUpdated { session_id, .. } => *session_id,
479 Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
480 Self::SessionTimedOut { session_id, .. } => *session_id,
481 Self::SessionTimeoutExtended { session_id, .. } => *session_id,
482 Self::BackpressureReceived { session_id, .. } => *session_id,
483 Self::StreamPaused { session_id, .. } => *session_id,
484 Self::StreamResumed { session_id, .. } => *session_id,
485 Self::CreditsUpdated { session_id, .. } => *session_id,
486 }
487 }
488
489 pub fn stream_id(&self) -> Option<StreamId> {
491 match self {
492 Self::StreamCreated { stream_id, .. } => Some(*stream_id),
493 Self::StreamStarted { stream_id, .. } => Some(*stream_id),
494 Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
495 Self::StreamFailed { stream_id, .. } => Some(*stream_id),
496 Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
497 Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
498 Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
499 Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
500 Self::StreamPaused { stream_id, .. } => Some(*stream_id),
501 Self::StreamResumed { stream_id, .. } => Some(*stream_id),
502 Self::BackpressureReceived { stream_id, .. } => *stream_id,
503 _ => None,
504 }
505 }
506
507 pub fn timestamp(&self) -> DateTime<Utc> {
509 match self {
510 Self::SessionActivated { timestamp, .. } => *timestamp,
511 Self::SessionClosed { timestamp, .. } => *timestamp,
512 Self::SessionExpired { timestamp, .. } => *timestamp,
513 Self::StreamCreated { timestamp, .. } => *timestamp,
514 Self::StreamStarted { timestamp, .. } => *timestamp,
515 Self::StreamCompleted { timestamp, .. } => *timestamp,
516 Self::StreamFailed { timestamp, .. } => *timestamp,
517 Self::StreamCancelled { timestamp, .. } => *timestamp,
518 Self::SkeletonGenerated { timestamp, .. } => *timestamp,
519 Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
520 Self::FramesBatched { timestamp, .. } => *timestamp,
521 Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
522 Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
523 Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
524 Self::SessionTimedOut { timestamp, .. } => *timestamp,
525 Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
526 Self::BackpressureReceived { timestamp, .. } => *timestamp,
527 Self::StreamPaused { timestamp, .. } => *timestamp,
528 Self::StreamResumed { timestamp, .. } => *timestamp,
529 Self::CreditsUpdated { timestamp, .. } => *timestamp,
530 }
531 }
532
533 pub fn event_type(&self) -> &'static str {
535 match self {
536 Self::SessionActivated { .. } => "session_activated",
537 Self::SessionClosed { .. } => "session_closed",
538 Self::SessionExpired { .. } => "session_expired",
539 Self::StreamCreated { .. } => "stream_created",
540 Self::StreamStarted { .. } => "stream_started",
541 Self::StreamCompleted { .. } => "stream_completed",
542 Self::StreamFailed { .. } => "stream_failed",
543 Self::StreamCancelled { .. } => "stream_cancelled",
544 Self::SkeletonGenerated { .. } => "skeleton_generated",
545 Self::PatchFramesGenerated { .. } => "patch_frames_generated",
546 Self::FramesBatched { .. } => "frames_batched",
547 Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
548 Self::StreamConfigUpdated { .. } => "stream_config_updated",
549 Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
550 Self::SessionTimedOut { .. } => "session_timed_out",
551 Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
552 Self::BackpressureReceived { .. } => "backpressure_received",
553 Self::StreamPaused { .. } => "stream_paused",
554 Self::StreamResumed { .. } => "stream_resumed",
555 Self::CreditsUpdated { .. } => "credits_updated",
556 }
557 }
558
559 pub fn is_critical(&self) -> bool {
561 matches!(
562 self,
563 Self::StreamFailed { .. } | Self::SessionExpired { .. }
564 )
565 }
566
567 pub fn is_error(&self) -> bool {
569 matches!(self, Self::StreamFailed { .. })
570 }
571
572 pub fn is_completion(&self) -> bool {
574 matches!(
575 self,
576 Self::StreamCompleted { .. } | Self::SessionClosed { .. }
577 )
578 }
579}
580
/// Storage abstraction for `DomainEvent`s.
///
/// All operations report failure as a `String`; what that string contains is
/// left to the implementation.
pub trait EventStore {
    /// Adds `events` to the store.
    fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;

    /// Returns the stored events that belong to `session_id`.
    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;

    /// Returns the stored events that refer to `stream_id`.
    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;

    /// Returns the stored events relative to the cut-off `since`.
    ///
    /// NOTE(review): the in-memory implementation in this file treats the
    /// cut-off as exclusive (`timestamp > since`); confirm other
    /// implementations agree.
    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
}
611
/// `EventStore` implementation that keeps events in a `Vec`.
/// `Default` yields an empty store.
#[derive(Debug, Clone, Default)]
pub struct InMemoryEventStore {
    /// Stored events, in the order they were appended.
    events: Vec<DomainEvent>,
}
617
618impl InMemoryEventStore {
619 pub fn new() -> Self {
621 Self::default()
622 }
623
624 pub fn all_events(&self) -> &[DomainEvent] {
626 &self.events
627 }
628
629 pub fn event_count(&self) -> usize {
631 self.events.len()
632 }
633}
634
635impl EventStore for InMemoryEventStore {
636 fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
637 self.events.append(&mut events);
638 Ok(())
639 }
640
641 fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
642 Ok(self
643 .events
644 .iter()
645 .filter(|e| e.session_id() == session_id)
646 .cloned()
647 .collect())
648 }
649
650 fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
651 Ok(self
652 .events
653 .iter()
654 .filter(|e| e.stream_id() == Some(stream_id))
655 .cloned()
656 .collect())
657 }
658
659 fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
660 Ok(self
661 .events
662 .iter()
663 .filter(|e| e.timestamp() > since)
664 .cloned()
665 .collect())
666 }
667}
668
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value_objects::{SessionId, StreamId};

    /// Accessors of a `StreamCreated` event return the values it was built with.
    #[test]
    fn test_domain_event_properties() {
        let sid = SessionId::new();
        let stid = StreamId::new();
        let now = Utc::now();

        let created = DomainEvent::StreamCreated {
            session_id: sid,
            stream_id: stid,
            timestamp: now,
        };

        assert_eq!(created.session_id(), sid);
        assert_eq!(created.stream_id(), Some(stid));
        assert_eq!(created.timestamp(), now);
        assert_eq!(created.event_type(), "stream_created");
        assert!(!created.is_critical());
        assert!(!created.is_error());
    }

    /// `StreamFailed` is critical and an error, but not a completion.
    #[test]
    fn test_critical_events() {
        let failure = DomainEvent::StreamFailed {
            session_id: SessionId::new(),
            stream_id: StreamId::new(),
            error: "Connection lost".to_string(),
            timestamp: Utc::now(),
        };

        assert!(failure.is_critical());
        assert!(failure.is_error());
        assert!(!failure.is_completion());
    }

    /// Appended events can be counted and looked up by session and by stream.
    #[test]
    fn test_event_store() {
        let sid = SessionId::new();
        let stid = StreamId::new();
        let mut store = InMemoryEventStore::new();

        let batch = vec![
            DomainEvent::SessionActivated {
                session_id: sid,
                timestamp: Utc::now(),
            },
            DomainEvent::StreamCreated {
                session_id: sid,
                stream_id: stid,
                timestamp: Utc::now(),
            },
        ];

        store
            .append_events(batch)
            .expect("Failed to append events to store in test");
        assert_eq!(store.event_count(), 2);

        let by_session = store
            .get_events_for_session(sid)
            .expect("Failed to retrieve session events in test");
        assert_eq!(by_session.len(), 2);

        let by_stream = store
            .get_events_for_stream(stid)
            .expect("Failed to retrieve stream events in test");
        assert_eq!(by_stream.len(), 1);
    }

    /// An event survives a JSON round trip unchanged.
    #[test]
    fn test_event_serialization() {
        let original = DomainEvent::SessionActivated {
            session_id: SessionId::new(),
            timestamp: Utc::now(),
        };

        let json = serde_json::to_string(&original).expect("Failed to serialize event in test");
        let restored: DomainEvent =
            serde_json::from_str(&json).expect("Failed to deserialize event in test");

        assert_eq!(original, restored);
    }
}
760
/// Identifier of an event: a newtype around a UUID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EventId(uuid::Uuid);
764
765impl EventId {
766 pub fn new() -> Self {
768 Self(uuid::Uuid::new_v4())
769 }
770
771 pub fn from_uuid(uuid: uuid::Uuid) -> Self {
773 Self(uuid)
774 }
775
776 pub fn inner(&self) -> uuid::Uuid {
778 self.0
779 }
780}
781
782impl std::fmt::Display for EventId {
783 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
784 write!(f, "{}", self.0)
785 }
786}
787
788impl Default for EventId {
789 fn default() -> Self {
790 Self::new()
791 }
792}
793
/// Receiver of `DomainEvent`s whose handling is expressed as a future.
pub trait EventSubscriber {
    /// Future returned by [`handle`](Self::handle). It must be `Send`, may
    /// borrow from the subscriber for `'a`, and resolves to
    /// `crate::DomainResult<()>`.
    type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Starts handling `event` and returns the future that completes it.
    fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
}
804
805impl DomainEvent {
807 pub fn event_id(&self) -> EventId {
809 let content = format!("{self:?}");
812 use std::collections::hash_map::DefaultHasher;
813 use std::hash::{Hash, Hasher};
814 let mut hash = DefaultHasher::new();
815 content.hash(&mut hash);
816 let hash_val = hash.finish();
817 let uuid = uuid::Uuid::from_bytes([
818 (hash_val >> 56) as u8,
819 (hash_val >> 48) as u8,
820 (hash_val >> 40) as u8,
821 (hash_val >> 32) as u8,
822 (hash_val >> 24) as u8,
823 (hash_val >> 16) as u8,
824 (hash_val >> 8) as u8,
825 hash_val as u8,
826 0,
827 0,
828 0,
829 0,
830 0,
831 0,
832 0,
833 0,
834 ]);
835 EventId::from_uuid(uuid)
836 }
837
838 pub fn occurred_at(&self) -> DateTime<Utc> {
840 match self {
841 DomainEvent::SessionActivated { timestamp, .. }
842 | DomainEvent::SessionClosed { timestamp, .. }
843 | DomainEvent::SessionExpired { timestamp, .. }
844 | DomainEvent::StreamCreated { timestamp, .. }
845 | DomainEvent::StreamStarted { timestamp, .. }
846 | DomainEvent::StreamCompleted { timestamp, .. }
847 | DomainEvent::StreamFailed { timestamp, .. }
848 | DomainEvent::StreamCancelled { timestamp, .. }
849 | DomainEvent::SkeletonGenerated { timestamp, .. }
850 | DomainEvent::PatchFramesGenerated { timestamp, .. }
851 | DomainEvent::FramesBatched { timestamp, .. }
852 | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
853 | DomainEvent::StreamConfigUpdated { timestamp, .. }
854 | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
855 | DomainEvent::SessionTimedOut { timestamp, .. }
856 | DomainEvent::SessionTimeoutExtended { timestamp, .. }
857 | DomainEvent::BackpressureReceived { timestamp, .. }
858 | DomainEvent::StreamPaused { timestamp, .. }
859 | DomainEvent::StreamResumed { timestamp, .. }
860 | DomainEvent::CreditsUpdated { timestamp, .. } => *timestamp,
861 }
862 }
863
864 pub fn metadata(&self) -> std::collections::HashMap<String, String> {
866 let mut metadata = std::collections::HashMap::new();
867 metadata.insert("event_type".to_string(), self.event_type().to_string());
868 metadata.insert("session_id".to_string(), self.session_id().to_string());
869 metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
870
871 if let Some(stream_id) = self.stream_id() {
872 metadata.insert("stream_id".to_string(), stream_id.to_string());
873 }
874
875 metadata
876 }
877}