rs2_stream/media/
events.rs

1//! Media streaming events for Kafka integration
2
3use super::types::*;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub enum MediaStreamEvent {
10    StreamStarted {
11        stream_id: String,
12        user_id: u64,
13        quality: QualityLevel,
14        timestamp: DateTime<Utc>,
15    },
16    StreamStopped {
17        stream_id: String,
18        user_id: u64,
19        duration_seconds: u64,
20        bytes_transferred: u64,
21        timestamp: DateTime<Utc>,
22    },
23    QualityChanged {
24        stream_id: String,
25        user_id: u64,
26        old_quality: QualityLevel,
27        new_quality: QualityLevel,
28        timestamp: DateTime<Utc>,
29    },
30    BufferUnderrun {
31        stream_id: String,
32        user_id: u64,
33        buffer_level: f64,
34        timestamp: DateTime<Utc>,
35    },
36    ChunkDropped {
37        stream_id: String,
38        user_id: u64,
39        sequence_number: u64,
40        reason: String,
41        timestamp: DateTime<Utc>,
42    },
43}
44
45impl MediaStreamEvent {
46    pub fn stream_id(&self) -> &str {
47        match self {
48            MediaStreamEvent::StreamStarted { stream_id, .. } => stream_id,
49            MediaStreamEvent::StreamStopped { stream_id, .. } => stream_id,
50            MediaStreamEvent::QualityChanged { stream_id, .. } => stream_id,
51            MediaStreamEvent::BufferUnderrun { stream_id, .. } => stream_id,
52            MediaStreamEvent::ChunkDropped { stream_id, .. } => stream_id,
53        }
54    }
55
56    pub fn user_id(&self) -> u64 {
57        match self {
58            MediaStreamEvent::StreamStarted { user_id, .. } => *user_id,
59            MediaStreamEvent::StreamStopped { user_id, .. } => *user_id,
60            MediaStreamEvent::QualityChanged { user_id, .. } => *user_id,
61            MediaStreamEvent::BufferUnderrun { user_id, .. } => *user_id,
62            MediaStreamEvent::ChunkDropped { user_id, .. } => *user_id,
63        }
64    }
65}
66
67// Integration with your existing activity system
68impl From<MediaStreamEvent> for super::types::UserActivity {
69    fn from(event: MediaStreamEvent) -> Self {
70        use std::collections::HashMap;
71
72        let mut metadata = HashMap::new();
73        let activity_type = match &event {
74            MediaStreamEvent::StreamStarted { quality, .. } => {
75                metadata.insert("quality".to_string(), format!("{:?}", quality));
76                "media_stream_started".to_string()
77            },
78            MediaStreamEvent::StreamStopped { duration_seconds, bytes_transferred, .. } => {
79                metadata.insert("duration_seconds".to_string(), duration_seconds.to_string());
80                metadata.insert("bytes_transferred".to_string(), bytes_transferred.to_string());
81                "media_stream_stopped".to_string()
82            },
83            MediaStreamEvent::QualityChanged { old_quality, new_quality, .. } => {
84                metadata.insert("old_quality".to_string(), format!("{:?}", old_quality));
85                metadata.insert("new_quality".to_string(), format!("{:?}", new_quality));
86                "media_quality_changed".to_string()
87            },
88            MediaStreamEvent::BufferUnderrun { buffer_level, .. } => {
89                metadata.insert("buffer_level".to_string(), buffer_level.to_string());
90                "media_buffer_underrun".to_string()
91            },
92            MediaStreamEvent::ChunkDropped { sequence_number, reason, .. } => {
93                metadata.insert("sequence_number".to_string(), sequence_number.to_string());
94                metadata.insert("reason".to_string(), reason.clone());
95                "media_chunk_dropped".to_string()
96            },
97        };
98
99        metadata.insert("stream_id".to_string(), event.stream_id().to_string());
100
101        super::types::UserActivity {
102            id: uuid::Uuid::new_v4().to_string(),
103            user_id: event.user_id(),
104            activity_type,
105            timestamp: chrono::Utc::now(),
106            metadata,
107        }
108    }
109}