rs2_stream/media/
events.rs

1//! Media streaming events for Kafka integration
2
3use super::types::*;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
/// An event describing something that happened to a media stream.
///
/// Every variant carries the `stream_id` and `user_id` it relates to, plus a
/// UTC `timestamp`. The type derives `Serialize`/`Deserialize` with no serde
/// attributes, so it uses serde's default enum representation.
///
/// NOTE(review): the exact semantics of the numeric fields (units, valid
/// ranges) are not established in this file; the hedged notes below are
/// inferred from field names only and should be confirmed with producers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MediaStreamEvent {
    /// A stream was started; `quality` is the `QualityLevel` reported with it.
    StreamStarted {
        stream_id: String,
        user_id: u64,
        quality: QualityLevel,
        timestamp: DateTime<Utc>,
    },
    /// A stream was stopped.
    StreamStopped {
        stream_id: String,
        user_id: u64,
        // Presumably the total streaming time in seconds -- confirm with producer.
        duration_seconds: u64,
        // Presumably the total payload size in bytes -- confirm with producer.
        bytes_transferred: u64,
        timestamp: DateTime<Utc>,
    },
    /// The stream switched from `old_quality` to `new_quality`.
    QualityChanged {
        stream_id: String,
        user_id: u64,
        old_quality: QualityLevel,
        new_quality: QualityLevel,
        timestamp: DateTime<Utc>,
    },
    /// A buffer underrun was reported for the stream.
    BufferUnderrun {
        stream_id: String,
        user_id: u64,
        // NOTE(review): unit/range (fraction, percent, seconds?) is not visible here.
        buffer_level: f64,
        timestamp: DateTime<Utc>,
    },
    /// A chunk of the stream was dropped.
    ChunkDropped {
        stream_id: String,
        user_id: u64,
        // Presumably the sequence number of the dropped chunk -- confirm.
        sequence_number: u64,
        // Free-form text; no fixed vocabulary is defined in this file.
        reason: String,
        timestamp: DateTime<Utc>,
    },
}
43
44impl MediaStreamEvent {
45    pub fn stream_id(&self) -> &str {
46        match self {
47            MediaStreamEvent::StreamStarted { stream_id, .. } => stream_id,
48            MediaStreamEvent::StreamStopped { stream_id, .. } => stream_id,
49            MediaStreamEvent::QualityChanged { stream_id, .. } => stream_id,
50            MediaStreamEvent::BufferUnderrun { stream_id, .. } => stream_id,
51            MediaStreamEvent::ChunkDropped { stream_id, .. } => stream_id,
52        }
53    }
54
55    pub fn user_id(&self) -> u64 {
56        match self {
57            MediaStreamEvent::StreamStarted { user_id, .. } => *user_id,
58            MediaStreamEvent::StreamStopped { user_id, .. } => *user_id,
59            MediaStreamEvent::QualityChanged { user_id, .. } => *user_id,
60            MediaStreamEvent::BufferUnderrun { user_id, .. } => *user_id,
61            MediaStreamEvent::ChunkDropped { user_id, .. } => *user_id,
62        }
63    }
64}
65
66// Integration with your existing activity system
67impl From<MediaStreamEvent> for super::types::UserActivity {
68    fn from(event: MediaStreamEvent) -> Self {
69        use std::collections::HashMap;
70
71        let mut metadata = HashMap::new();
72        let activity_type = match &event {
73            MediaStreamEvent::StreamStarted { quality, .. } => {
74                metadata.insert("quality".to_string(), format!("{:?}", quality));
75                "media_stream_started".to_string()
76            }
77            MediaStreamEvent::StreamStopped {
78                duration_seconds,
79                bytes_transferred,
80                ..
81            } => {
82                metadata.insert("duration_seconds".to_string(), duration_seconds.to_string());
83                metadata.insert(
84                    "bytes_transferred".to_string(),
85                    bytes_transferred.to_string(),
86                );
87                "media_stream_stopped".to_string()
88            }
89            MediaStreamEvent::QualityChanged {
90                old_quality,
91                new_quality,
92                ..
93            } => {
94                metadata.insert("old_quality".to_string(), format!("{:?}", old_quality));
95                metadata.insert("new_quality".to_string(), format!("{:?}", new_quality));
96                "media_quality_changed".to_string()
97            }
98            MediaStreamEvent::BufferUnderrun { buffer_level, .. } => {
99                metadata.insert("buffer_level".to_string(), buffer_level.to_string());
100                "media_buffer_underrun".to_string()
101            }
102            MediaStreamEvent::ChunkDropped {
103                sequence_number,
104                reason,
105                ..
106            } => {
107                metadata.insert("sequence_number".to_string(), sequence_number.to_string());
108                metadata.insert("reason".to_string(), reason.clone());
109                "media_chunk_dropped".to_string()
110            }
111        };
112
113        metadata.insert("stream_id".to_string(), event.stream_id().to_string());
114
115        super::types::UserActivity {
116            id: uuid::Uuid::new_v4().to_string(),
117            user_id: event.user_id(),
118            activity_type,
119            timestamp: chrono::Utc::now(),
120            metadata,
121        }
122    }
123}