rs2_stream/media/
events.rs1use super::types::*;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub enum MediaStreamEvent {
9 StreamStarted {
10 stream_id: String,
11 user_id: u64,
12 quality: QualityLevel,
13 timestamp: DateTime<Utc>,
14 },
15 StreamStopped {
16 stream_id: String,
17 user_id: u64,
18 duration_seconds: u64,
19 bytes_transferred: u64,
20 timestamp: DateTime<Utc>,
21 },
22 QualityChanged {
23 stream_id: String,
24 user_id: u64,
25 old_quality: QualityLevel,
26 new_quality: QualityLevel,
27 timestamp: DateTime<Utc>,
28 },
29 BufferUnderrun {
30 stream_id: String,
31 user_id: u64,
32 buffer_level: f64,
33 timestamp: DateTime<Utc>,
34 },
35 ChunkDropped {
36 stream_id: String,
37 user_id: u64,
38 sequence_number: u64,
39 reason: String,
40 timestamp: DateTime<Utc>,
41 },
42}
43
44impl MediaStreamEvent {
45 pub fn stream_id(&self) -> &str {
46 match self {
47 MediaStreamEvent::StreamStarted { stream_id, .. } => stream_id,
48 MediaStreamEvent::StreamStopped { stream_id, .. } => stream_id,
49 MediaStreamEvent::QualityChanged { stream_id, .. } => stream_id,
50 MediaStreamEvent::BufferUnderrun { stream_id, .. } => stream_id,
51 MediaStreamEvent::ChunkDropped { stream_id, .. } => stream_id,
52 }
53 }
54
55 pub fn user_id(&self) -> u64 {
56 match self {
57 MediaStreamEvent::StreamStarted { user_id, .. } => *user_id,
58 MediaStreamEvent::StreamStopped { user_id, .. } => *user_id,
59 MediaStreamEvent::QualityChanged { user_id, .. } => *user_id,
60 MediaStreamEvent::BufferUnderrun { user_id, .. } => *user_id,
61 MediaStreamEvent::ChunkDropped { user_id, .. } => *user_id,
62 }
63 }
64}
65
66impl From<MediaStreamEvent> for super::types::UserActivity {
68 fn from(event: MediaStreamEvent) -> Self {
69 use std::collections::HashMap;
70
71 let mut metadata = HashMap::new();
72 let activity_type = match &event {
73 MediaStreamEvent::StreamStarted { quality, .. } => {
74 metadata.insert("quality".to_string(), format!("{:?}", quality));
75 "media_stream_started".to_string()
76 }
77 MediaStreamEvent::StreamStopped {
78 duration_seconds,
79 bytes_transferred,
80 ..
81 } => {
82 metadata.insert("duration_seconds".to_string(), duration_seconds.to_string());
83 metadata.insert(
84 "bytes_transferred".to_string(),
85 bytes_transferred.to_string(),
86 );
87 "media_stream_stopped".to_string()
88 }
89 MediaStreamEvent::QualityChanged {
90 old_quality,
91 new_quality,
92 ..
93 } => {
94 metadata.insert("old_quality".to_string(), format!("{:?}", old_quality));
95 metadata.insert("new_quality".to_string(), format!("{:?}", new_quality));
96 "media_quality_changed".to_string()
97 }
98 MediaStreamEvent::BufferUnderrun { buffer_level, .. } => {
99 metadata.insert("buffer_level".to_string(), buffer_level.to_string());
100 "media_buffer_underrun".to_string()
101 }
102 MediaStreamEvent::ChunkDropped {
103 sequence_number,
104 reason,
105 ..
106 } => {
107 metadata.insert("sequence_number".to_string(), sequence_number.to_string());
108 metadata.insert("reason".to_string(), reason.clone());
109 "media_chunk_dropped".to_string()
110 }
111 };
112
113 metadata.insert("stream_id".to_string(), event.stream_id().to_string());
114
115 super::types::UserActivity {
116 id: uuid::Uuid::new_v4().to_string(),
117 user_id: event.user_id(),
118 activity_type,
119 timestamp: chrono::Utc::now(),
120 metadata,
121 }
122 }
123}