rs2_stream/media/
events.rs1use super::types::*;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub enum MediaStreamEvent {
10 StreamStarted {
11 stream_id: String,
12 user_id: u64,
13 quality: QualityLevel,
14 timestamp: DateTime<Utc>,
15 },
16 StreamStopped {
17 stream_id: String,
18 user_id: u64,
19 duration_seconds: u64,
20 bytes_transferred: u64,
21 timestamp: DateTime<Utc>,
22 },
23 QualityChanged {
24 stream_id: String,
25 user_id: u64,
26 old_quality: QualityLevel,
27 new_quality: QualityLevel,
28 timestamp: DateTime<Utc>,
29 },
30 BufferUnderrun {
31 stream_id: String,
32 user_id: u64,
33 buffer_level: f64,
34 timestamp: DateTime<Utc>,
35 },
36 ChunkDropped {
37 stream_id: String,
38 user_id: u64,
39 sequence_number: u64,
40 reason: String,
41 timestamp: DateTime<Utc>,
42 },
43}
44
45impl MediaStreamEvent {
46 pub fn stream_id(&self) -> &str {
47 match self {
48 MediaStreamEvent::StreamStarted { stream_id, .. } => stream_id,
49 MediaStreamEvent::StreamStopped { stream_id, .. } => stream_id,
50 MediaStreamEvent::QualityChanged { stream_id, .. } => stream_id,
51 MediaStreamEvent::BufferUnderrun { stream_id, .. } => stream_id,
52 MediaStreamEvent::ChunkDropped { stream_id, .. } => stream_id,
53 }
54 }
55
56 pub fn user_id(&self) -> u64 {
57 match self {
58 MediaStreamEvent::StreamStarted { user_id, .. } => *user_id,
59 MediaStreamEvent::StreamStopped { user_id, .. } => *user_id,
60 MediaStreamEvent::QualityChanged { user_id, .. } => *user_id,
61 MediaStreamEvent::BufferUnderrun { user_id, .. } => *user_id,
62 MediaStreamEvent::ChunkDropped { user_id, .. } => *user_id,
63 }
64 }
65}
66
67impl From<MediaStreamEvent> for super::types::UserActivity {
69 fn from(event: MediaStreamEvent) -> Self {
70 use std::collections::HashMap;
71
72 let mut metadata = HashMap::new();
73 let activity_type = match &event {
74 MediaStreamEvent::StreamStarted { quality, .. } => {
75 metadata.insert("quality".to_string(), format!("{:?}", quality));
76 "media_stream_started".to_string()
77 },
78 MediaStreamEvent::StreamStopped { duration_seconds, bytes_transferred, .. } => {
79 metadata.insert("duration_seconds".to_string(), duration_seconds.to_string());
80 metadata.insert("bytes_transferred".to_string(), bytes_transferred.to_string());
81 "media_stream_stopped".to_string()
82 },
83 MediaStreamEvent::QualityChanged { old_quality, new_quality, .. } => {
84 metadata.insert("old_quality".to_string(), format!("{:?}", old_quality));
85 metadata.insert("new_quality".to_string(), format!("{:?}", new_quality));
86 "media_quality_changed".to_string()
87 },
88 MediaStreamEvent::BufferUnderrun { buffer_level, .. } => {
89 metadata.insert("buffer_level".to_string(), buffer_level.to_string());
90 "media_buffer_underrun".to_string()
91 },
92 MediaStreamEvent::ChunkDropped { sequence_number, reason, .. } => {
93 metadata.insert("sequence_number".to_string(), sequence_number.to_string());
94 metadata.insert("reason".to_string(), reason.clone());
95 "media_chunk_dropped".to_string()
96 },
97 };
98
99 metadata.insert("stream_id".to_string(), event.stream_id().to_string());
100
101 super::types::UserActivity {
102 id: uuid::Uuid::new_v4().to_string(),
103 user_id: event.user_id(),
104 activity_type,
105 timestamp: chrono::Utc::now(),
106 metadata,
107 }
108 }
109}