1use crate::value_objects::{SessionId, StreamId};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
/// Lifecycle state of a session.
///
/// Within this file the only consumer is [`DomainEvent::SessionTimedOut`],
/// which records the state a session was in when it timed out.
/// NOTE(review): the allowed transitions between states are not defined
/// here — confirm against the session aggregate.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionState {
    /// The session is being initialized.
    Initializing,
    /// The session is active.
    Active,
    /// The session is closing.
    Closing,
    /// The session has completed.
    Completed,
    /// The session has failed.
    Failed,
}
21
/// Events emitted for sessions and the streams that belong to them.
///
/// Serialized with serde as an internally tagged enum: the variant name is
/// written in `snake_case` under the `event_type` key, next to the variant's
/// own fields. Every variant carries a `session_id` and a `timestamp`;
/// stream-scoped variants additionally carry a `stream_id`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum DomainEvent {
    /// A session was activated.
    SessionActivated {
        /// Session the event refers to.
        session_id: SessionId,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A session was closed.
    SessionClosed {
        /// Session the event refers to.
        session_id: SessionId,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A session expired.
    SessionExpired {
        /// Session the event refers to.
        session_id: SessionId,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A session timed out.
    SessionTimedOut {
        /// Session the event refers to.
        session_id: SessionId,
        /// State the session was in when the timeout occurred.
        original_state: SessionState,
        /// Length of the timeout.
        /// NOTE(review): the unit is not stated in this file (presumably
        /// seconds, by analogy with `additional_seconds`) — confirm.
        timeout_duration: u64,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A session's timeout was extended.
    SessionTimeoutExtended {
        /// Session the event refers to.
        session_id: SessionId,
        /// Number of seconds added to the timeout.
        additional_seconds: u64,
        /// Expiry instant after the extension.
        new_expires_at: DateTime<Utc>,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream was created within a session.
    StreamCreated {
        /// Session the stream belongs to.
        session_id: SessionId,
        /// Stream the event refers to.
        stream_id: StreamId,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream was started.
    StreamStarted {
        /// Session the stream belongs to.
        session_id: SessionId,
        /// Stream the event refers to.
        stream_id: StreamId,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream completed.
    StreamCompleted {
        /// Session the stream belongs to.
        session_id: SessionId,
        /// Stream the event refers to.
        stream_id: StreamId,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream failed.
    StreamFailed {
        /// Session the stream belongs to.
        session_id: SessionId,
        /// Stream the event refers to.
        stream_id: StreamId,
        /// Free-form description of the failure.
        error: String,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A stream was cancelled.
    StreamCancelled {
        /// Session the stream belongs to.
        session_id: SessionId,
        /// Stream the event refers to.
        stream_id: StreamId,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A skeleton frame was generated for a stream.
    SkeletonGenerated {
        /// Session the stream belongs to.
        session_id: SessionId,
        /// Stream the event refers to.
        stream_id: StreamId,
        /// Size of the generated frame in bytes.
        frame_size_bytes: u64,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// A group of patch frames was generated for a stream.
    PatchFramesGenerated {
        /// Session the stream belongs to.
        session_id: SessionId,
        /// Stream the event refers to.
        stream_id: StreamId,
        /// Number of frames generated.
        frame_count: usize,
        /// Combined size of the generated frames in bytes.
        total_bytes: u64,
        /// Highest priority among the generated frames.
        /// NOTE(review): the priority scale is defined elsewhere — confirm
        /// whether larger values mean more urgent.
        highest_priority: u8,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// Frames were batched for a session (not tied to a single stream).
    FramesBatched {
        /// Session the event refers to.
        session_id: SessionId,
        /// Number of frames in the batch.
        frame_count: usize,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// The priority threshold of a session was changed.
    PriorityThresholdAdjusted {
        /// Session the event refers to.
        session_id: SessionId,
        /// Threshold before the adjustment.
        old_threshold: u8,
        /// Threshold after the adjustment.
        new_threshold: u8,
        /// Free-form explanation for the adjustment.
        reason: String,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// The configuration of a stream was updated.
    StreamConfigUpdated {
        /// Session the stream belongs to.
        session_id: SessionId,
        /// Stream the event refers to.
        stream_id: StreamId,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },

    /// Performance metrics were recorded for a session.
    PerformanceMetricsRecorded {
        /// Session the event refers to.
        session_id: SessionId,
        /// The recorded measurements.
        metrics: PerformanceMetrics,
        /// Timestamp attached to the event.
        timestamp: DateTime<Utc>,
    },
}
198
/// Throughput and latency measurements carried by
/// [`DomainEvent::PerformanceMetricsRecorded`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Frame rate, in frames per second.
    pub frames_per_second: f64,
    /// Byte rate, in bytes per second.
    pub bytes_per_second: f64,
    /// Average frame size.
    /// NOTE(review): presumably bytes, matching `bytes_per_second` — confirm.
    pub average_frame_size: f64,
    /// Frame counts broken down by priority level.
    pub priority_distribution: PriorityDistribution,
    /// Latency in milliseconds, when one is available.
    pub latency_ms: Option<u64>,
}
213
/// Frame counters for each of the five priority levels.
///
/// The derived `Default` sets every counter to zero.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PriorityDistribution {
    /// Number of critical-priority frames.
    pub critical_frames: u64,
    /// Number of high-priority frames.
    pub high_frames: u64,
    /// Number of medium-priority frames.
    pub medium_frames: u64,
    /// Number of low-priority frames.
    pub low_frames: u64,
    /// Number of background-priority frames.
    pub background_frames: u64,
}
228
229impl PriorityDistribution {
230 pub fn new() -> Self {
232 Self::default()
233 }
234
235 pub fn total_frames(&self) -> u64 {
237 self.critical_frames
238 + self.high_frames
239 + self.medium_frames
240 + self.low_frames
241 + self.background_frames
242 }
243
244 pub fn as_percentages(&self) -> PriorityPercentages {
246 let total = self.total_frames() as f64;
247 if total == 0.0 {
248 return PriorityPercentages::default();
249 }
250
251 PriorityPercentages {
252 critical: self.critical_frames as f64 / total,
253 high: self.high_frames as f64 / total,
254 medium: self.medium_frames as f64 / total,
255 low: self.low_frames as f64 / total,
256 background: self.background_frames as f64 / total,
257 }
258 }
259
260 pub fn from_counts(
262 critical_count: u64,
263 high_count: u64,
264 medium_count: u64,
265 low_count: u64,
266 background_count: u64,
267 ) -> Self {
268 Self {
269 critical_frames: critical_count,
270 high_frames: high_count,
271 medium_frames: medium_count,
272 low_frames: low_count,
273 background_frames: background_count,
274 }
275 }
276}
277
/// Per-priority share of frames, as produced by
/// [`PriorityDistribution::as_percentages`].
///
/// Values computed there are `count / total`, i.e. fractions between
/// `0.0` and `1.0` rather than numbers scaled to 100.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PriorityPercentages {
    /// Share of critical-priority frames.
    pub critical: f64,
    /// Share of high-priority frames.
    pub high: f64,
    /// Share of medium-priority frames.
    pub medium: f64,
    /// Share of low-priority frames.
    pub low: f64,
    /// Share of background-priority frames.
    pub background: f64,
}
292
293impl Default for PriorityPercentages {
294 fn default() -> Self {
295 Self {
296 critical: 0.0,
297 high: 0.0,
298 medium: 0.0,
299 low: 0.0,
300 background: 0.0,
301 }
302 }
303}
304
305impl DomainEvent {
306 pub fn session_id(&self) -> SessionId {
308 match self {
309 Self::SessionActivated { session_id, .. } => *session_id,
310 Self::SessionClosed { session_id, .. } => *session_id,
311 Self::SessionExpired { session_id, .. } => *session_id,
312 Self::StreamCreated { session_id, .. } => *session_id,
313 Self::StreamStarted { session_id, .. } => *session_id,
314 Self::StreamCompleted { session_id, .. } => *session_id,
315 Self::StreamFailed { session_id, .. } => *session_id,
316 Self::StreamCancelled { session_id, .. } => *session_id,
317 Self::SkeletonGenerated { session_id, .. } => *session_id,
318 Self::PatchFramesGenerated { session_id, .. } => *session_id,
319 Self::FramesBatched { session_id, .. } => *session_id,
320 Self::PriorityThresholdAdjusted { session_id, .. } => *session_id,
321 Self::StreamConfigUpdated { session_id, .. } => *session_id,
322 Self::PerformanceMetricsRecorded { session_id, .. } => *session_id,
323 Self::SessionTimedOut { session_id, .. } => *session_id,
324 Self::SessionTimeoutExtended { session_id, .. } => *session_id,
325 }
326 }
327
328 pub fn stream_id(&self) -> Option<StreamId> {
330 match self {
331 Self::StreamCreated { stream_id, .. } => Some(*stream_id),
332 Self::StreamStarted { stream_id, .. } => Some(*stream_id),
333 Self::StreamCompleted { stream_id, .. } => Some(*stream_id),
334 Self::StreamFailed { stream_id, .. } => Some(*stream_id),
335 Self::StreamCancelled { stream_id, .. } => Some(*stream_id),
336 Self::SkeletonGenerated { stream_id, .. } => Some(*stream_id),
337 Self::PatchFramesGenerated { stream_id, .. } => Some(*stream_id),
338 Self::StreamConfigUpdated { stream_id, .. } => Some(*stream_id),
339 _ => None,
340 }
341 }
342
343 pub fn timestamp(&self) -> DateTime<Utc> {
345 match self {
346 Self::SessionActivated { timestamp, .. } => *timestamp,
347 Self::SessionClosed { timestamp, .. } => *timestamp,
348 Self::SessionExpired { timestamp, .. } => *timestamp,
349 Self::StreamCreated { timestamp, .. } => *timestamp,
350 Self::StreamStarted { timestamp, .. } => *timestamp,
351 Self::StreamCompleted { timestamp, .. } => *timestamp,
352 Self::StreamFailed { timestamp, .. } => *timestamp,
353 Self::StreamCancelled { timestamp, .. } => *timestamp,
354 Self::SkeletonGenerated { timestamp, .. } => *timestamp,
355 Self::PatchFramesGenerated { timestamp, .. } => *timestamp,
356 Self::FramesBatched { timestamp, .. } => *timestamp,
357 Self::PriorityThresholdAdjusted { timestamp, .. } => *timestamp,
358 Self::StreamConfigUpdated { timestamp, .. } => *timestamp,
359 Self::PerformanceMetricsRecorded { timestamp, .. } => *timestamp,
360 Self::SessionTimedOut { timestamp, .. } => *timestamp,
361 Self::SessionTimeoutExtended { timestamp, .. } => *timestamp,
362 }
363 }
364
365 pub fn event_type(&self) -> &'static str {
367 match self {
368 Self::SessionActivated { .. } => "session_activated",
369 Self::SessionClosed { .. } => "session_closed",
370 Self::SessionExpired { .. } => "session_expired",
371 Self::StreamCreated { .. } => "stream_created",
372 Self::StreamStarted { .. } => "stream_started",
373 Self::StreamCompleted { .. } => "stream_completed",
374 Self::StreamFailed { .. } => "stream_failed",
375 Self::StreamCancelled { .. } => "stream_cancelled",
376 Self::SkeletonGenerated { .. } => "skeleton_generated",
377 Self::PatchFramesGenerated { .. } => "patch_frames_generated",
378 Self::FramesBatched { .. } => "frames_batched",
379 Self::PriorityThresholdAdjusted { .. } => "priority_threshold_adjusted",
380 Self::StreamConfigUpdated { .. } => "stream_config_updated",
381 Self::PerformanceMetricsRecorded { .. } => "performance_metrics_recorded",
382 Self::SessionTimedOut { .. } => "session_timed_out",
383 Self::SessionTimeoutExtended { .. } => "session_timeout_extended",
384 }
385 }
386
387 pub fn is_critical(&self) -> bool {
389 matches!(
390 self,
391 Self::StreamFailed { .. } | Self::SessionExpired { .. }
392 )
393 }
394
395 pub fn is_error(&self) -> bool {
397 matches!(self, Self::StreamFailed { .. })
398 }
399
400 pub fn is_completion(&self) -> bool {
402 matches!(
403 self,
404 Self::StreamCompleted { .. } | Self::SessionClosed { .. }
405 )
406 }
407}
408
/// Storage abstraction for [`DomainEvent`]s.
///
/// All operations report failure as a `String` error message.
pub trait EventStore {
    /// Appends `events` to the store.
    ///
    /// # Errors
    /// Returns an error message if the events could not be stored.
    fn append_events(&mut self, events: Vec<DomainEvent>) -> Result<(), String>;

    /// Returns the events whose session id equals `session_id`.
    ///
    /// # Errors
    /// Returns an error message if the lookup fails.
    fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String>;

    /// Returns the events that refer to the stream `stream_id`.
    ///
    /// # Errors
    /// Returns an error message if the lookup fails.
    fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String>;

    /// Returns the events recorded since `since`.
    ///
    /// NOTE(review): the trait does not say whether the bound is inclusive;
    /// the in-memory implementation in this file uses a strict `>` comparison.
    ///
    /// # Errors
    /// Returns an error message if the lookup fails.
    fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String>;
}
439
/// [`EventStore`] implementation that keeps every event in a `Vec`.
///
/// The derived `Default` produces an empty store.
#[derive(Debug, Clone, Default)]
pub struct InMemoryEventStore {
    /// Stored events, in the order they were appended.
    events: Vec<DomainEvent>,
}
445
446impl InMemoryEventStore {
447 pub fn new() -> Self {
449 Self::default()
450 }
451
452 pub fn all_events(&self) -> &[DomainEvent] {
454 &self.events
455 }
456
457 pub fn event_count(&self) -> usize {
459 self.events.len()
460 }
461}
462
463impl EventStore for InMemoryEventStore {
464 fn append_events(&mut self, mut events: Vec<DomainEvent>) -> Result<(), String> {
465 self.events.append(&mut events);
466 Ok(())
467 }
468
469 fn get_events_for_session(&self, session_id: SessionId) -> Result<Vec<DomainEvent>, String> {
470 Ok(self
471 .events
472 .iter()
473 .filter(|e| e.session_id() == session_id)
474 .cloned()
475 .collect())
476 }
477
478 fn get_events_for_stream(&self, stream_id: StreamId) -> Result<Vec<DomainEvent>, String> {
479 Ok(self
480 .events
481 .iter()
482 .filter(|e| e.stream_id() == Some(stream_id))
483 .cloned()
484 .collect())
485 }
486
487 fn get_events_since(&self, since: DateTime<Utc>) -> Result<Vec<DomainEvent>, String> {
488 Ok(self
489 .events
490 .iter()
491 .filter(|e| e.timestamp() > since)
492 .cloned()
493 .collect())
494 }
495}
496
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value_objects::{SessionId, StreamId};

    /// Accessors on a `StreamCreated` event return the values it was built with.
    #[test]
    fn test_domain_event_properties() {
        let sid = SessionId::new();
        let stid = StreamId::new();
        let created_at = Utc::now();

        let created = DomainEvent::StreamCreated {
            session_id: sid,
            stream_id: stid,
            timestamp: created_at,
        };

        assert_eq!(created.session_id(), sid);
        assert_eq!(created.stream_id(), Some(stid));
        assert_eq!(created.timestamp(), created_at);
        assert_eq!(created.event_type(), "stream_created");
        assert!(!created.is_critical());
        assert!(!created.is_error());
    }

    /// A `StreamFailed` event is both critical and an error, but not a completion.
    #[test]
    fn test_critical_events() {
        let failure = DomainEvent::StreamFailed {
            session_id: SessionId::new(),
            stream_id: StreamId::new(),
            error: "Connection lost".to_string(),
            timestamp: Utc::now(),
        };

        assert!(failure.is_critical());
        assert!(failure.is_error());
        assert!(!failure.is_completion());
    }

    /// Appended events can be counted and queried by session and by stream.
    #[test]
    fn test_event_store() {
        let sid = SessionId::new();
        let stid = StreamId::new();
        let mut store = InMemoryEventStore::new();

        let batch = vec![
            DomainEvent::SessionActivated {
                session_id: sid,
                timestamp: Utc::now(),
            },
            DomainEvent::StreamCreated {
                session_id: sid,
                stream_id: stid,
                timestamp: Utc::now(),
            },
        ];

        store
            .append_events(batch)
            .expect("Failed to append events to store in test");
        assert_eq!(store.event_count(), 2);

        // Both events belong to the session...
        let by_session = store
            .get_events_for_session(sid)
            .expect("Failed to retrieve session events in test");
        assert_eq!(by_session.len(), 2);

        // ...but only `StreamCreated` carries the stream id.
        let by_stream = store
            .get_events_for_stream(stid)
            .expect("Failed to retrieve stream events in test");
        assert_eq!(by_stream.len(), 1);
    }

    /// An event survives a JSON round trip unchanged.
    #[test]
    fn test_event_serialization() {
        let original = DomainEvent::SessionActivated {
            session_id: SessionId::new(),
            timestamp: Utc::now(),
        };

        let json = serde_json::to_string(&original).expect("Failed to serialize event in test");
        let restored: DomainEvent =
            serde_json::from_str(&json).expect("Failed to deserialize event in test");

        assert_eq!(original, restored);
    }
}
588
/// Identifier of a domain event: a newtype wrapper around a `uuid::Uuid`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EventId(uuid::Uuid);
592
593impl EventId {
594 pub fn new() -> Self {
596 Self(uuid::Uuid::new_v4())
597 }
598
599 pub fn from_uuid(uuid: uuid::Uuid) -> Self {
601 Self(uuid)
602 }
603
604 pub fn inner(&self) -> uuid::Uuid {
606 self.0
607 }
608}
609
610impl std::fmt::Display for EventId {
611 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
612 write!(f, "{}", self.0)
613 }
614}
615
616impl Default for EventId {
617 fn default() -> Self {
618 Self::new()
619 }
620}
621
/// Asynchronous consumer of [`DomainEvent`]s.
///
/// The future type is a generic associated type, so an implementation can
/// return a future that borrows from the subscriber for the lifetime `'a`.
pub trait EventSubscriber {
    /// Future returned by [`EventSubscriber::handle`].
    ///
    /// It must be `Send` and resolves to `crate::DomainResult<()>`.
    type HandleFuture<'a>: std::future::Future<Output = crate::DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Starts handling `event` and returns the future that completes the work.
    fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
}
632
633impl DomainEvent {
635 pub fn event_id(&self) -> EventId {
637 let content = format!("{self:?}");
640 use std::collections::hash_map::DefaultHasher;
641 use std::hash::{Hash, Hasher};
642 let mut hash = DefaultHasher::new();
643 content.hash(&mut hash);
644 let hash_val = hash.finish();
645 let uuid = uuid::Uuid::from_bytes([
646 (hash_val >> 56) as u8,
647 (hash_val >> 48) as u8,
648 (hash_val >> 40) as u8,
649 (hash_val >> 32) as u8,
650 (hash_val >> 24) as u8,
651 (hash_val >> 16) as u8,
652 (hash_val >> 8) as u8,
653 hash_val as u8,
654 0,
655 0,
656 0,
657 0,
658 0,
659 0,
660 0,
661 0,
662 ]);
663 EventId::from_uuid(uuid)
664 }
665
666 pub fn occurred_at(&self) -> DateTime<Utc> {
668 match self {
669 DomainEvent::SessionActivated { timestamp, .. }
670 | DomainEvent::SessionClosed { timestamp, .. }
671 | DomainEvent::SessionExpired { timestamp, .. }
672 | DomainEvent::StreamCreated { timestamp, .. }
673 | DomainEvent::StreamStarted { timestamp, .. }
674 | DomainEvent::StreamCompleted { timestamp, .. }
675 | DomainEvent::StreamFailed { timestamp, .. }
676 | DomainEvent::StreamCancelled { timestamp, .. }
677 | DomainEvent::SkeletonGenerated { timestamp, .. }
678 | DomainEvent::PatchFramesGenerated { timestamp, .. }
679 | DomainEvent::FramesBatched { timestamp, .. }
680 | DomainEvent::PriorityThresholdAdjusted { timestamp, .. }
681 | DomainEvent::StreamConfigUpdated { timestamp, .. }
682 | DomainEvent::PerformanceMetricsRecorded { timestamp, .. }
683 | DomainEvent::SessionTimedOut { timestamp, .. }
684 | DomainEvent::SessionTimeoutExtended { timestamp, .. } => *timestamp,
685 }
686 }
687
688 pub fn metadata(&self) -> std::collections::HashMap<String, String> {
690 let mut metadata = std::collections::HashMap::new();
691 metadata.insert("event_type".to_string(), self.event_type().to_string());
692 metadata.insert("session_id".to_string(), self.session_id().to_string());
693 metadata.insert("timestamp".to_string(), self.occurred_at().to_rfc3339());
694
695 if let Some(stream_id) = self.stream_id() {
696 metadata.insert("stream_id".to_string(), stream_id.to_string());
697 }
698
699 metadata
700 }
701}