1use crate::types::*;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use tokio::sync::{mpsc, RwLock};
13use tracing::{debug, info};
14use uuid::Uuid;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "event", rename_all = "snake_case")]
19pub enum AnalyticsEvent {
20 Load {
22 url: String,
23 is_live: bool,
24 },
25
26 Play {
28 position: f64,
29 },
30
31 Pause {
33 position: f64,
34 },
35
36 Seek {
38 from: f64,
39 to: f64,
40 },
41
42 Rebuffer {
44 position: f64,
45 buffer_level: f64,
46 },
47
48 RebufferEnd {
50 position: f64,
51 duration: f64,
52 },
53
54 QualityChange {
56 from_bitrate: u64,
57 to_bitrate: u64,
58 from_resolution: Option<Resolution>,
59 to_resolution: Option<Resolution>,
60 reason: QualityChangeReason,
61 },
62
63 StateChange {
65 from: PlayerState,
66 to: PlayerState,
67 position: f64,
68 },
69
70 End {
72 position: f64,
73 watch_time: f64,
74 },
75
76 Error {
78 code: String,
79 message: String,
80 fatal: bool,
81 position: f64,
82 },
83
84 Heartbeat {
86 position: f64,
87 buffer_level: f64,
88 bitrate: u64,
89 dropped_frames: u64,
90 decoded_frames: u64,
91 },
92
93 Custom {
95 name: String,
96 data: serde_json::Value,
97 },
98}
99
100#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
102#[serde(rename_all = "snake_case")]
103pub enum QualityChangeReason {
104 Abr,
106 Manual,
108 Buffer,
110 Initial,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct AnalyticsEventRecord {
117 pub id: Uuid,
119 pub session_id: SessionId,
121 pub timestamp: DateTime<Utc>,
123 pub sequence: u64,
125 #[serde(flatten)]
127 pub event: AnalyticsEvent,
128}
129
130pub struct AnalyticsEmitter {
132 session_id: SessionId,
134 sequence: RwLock<u64>,
136 buffer: RwLock<Vec<AnalyticsEventRecord>>,
138 max_buffer_size: usize,
140 event_tx: mpsc::Sender<AnalyticsEventRecord>,
142 beacon_url: Option<String>,
144}
145
146impl AnalyticsEmitter {
147 pub fn new() -> Self {
149 let (event_tx, mut event_rx) = mpsc::channel::<AnalyticsEventRecord>(1000);
150
151 tokio::spawn(async move {
153 while let Some(event) = event_rx.recv().await {
154 debug!(
156 event_id = %event.id,
157 event = ?event.event,
158 "Analytics event"
159 );
160 }
161 });
162
163 Self {
164 session_id: SessionId::new(),
165 sequence: RwLock::new(0),
166 buffer: RwLock::new(Vec::new()),
167 max_buffer_size: 50,
168 event_tx,
169 beacon_url: None,
170 }
171 }
172
173 pub fn with_beacon(beacon_url: String) -> Self {
175 let mut emitter = Self::new();
176 emitter.beacon_url = Some(beacon_url);
177 emitter
178 }
179
180 pub async fn emit(&self, event: AnalyticsEvent) {
182 let mut seq = self.sequence.write().await;
183 *seq += 1;
184 let sequence = *seq;
185
186 let record = AnalyticsEventRecord {
187 id: Uuid::new_v4(),
188 session_id: self.session_id,
189 timestamp: Utc::now(),
190 sequence,
191 event,
192 };
193
194 let mut buffer = self.buffer.write().await;
196 buffer.push(record.clone());
197
198 if buffer.len() >= self.max_buffer_size {
200 let events: Vec<_> = buffer.drain(..).collect();
201 drop(buffer);
202 self.flush_events(events).await;
203 }
204
205 let _ = self.event_tx.send(record).await;
207 }
208
209 async fn flush_events(&self, events: Vec<AnalyticsEventRecord>) {
211 if events.is_empty() {
212 return;
213 }
214
215 info!(count = events.len(), "Flushing analytics events");
216
217 if let Some(ref url) = self.beacon_url {
219 let client = reqwest::Client::new();
222 let _ = client.post(url)
223 .json(&events)
224 .send()
225 .await;
226 }
227 }
228
229 pub async fn get_events(&self) -> Vec<AnalyticsEventRecord> {
231 self.buffer.read().await.clone()
232 }
233
234 pub async fn clear(&self) {
236 self.buffer.write().await.clear();
237 }
238
239 pub fn set_beacon_url(&mut self, url: String) {
241 self.beacon_url = Some(url);
242 }
243}
244
245impl Default for AnalyticsEmitter {
246 fn default() -> Self {
247 Self::new()
248 }
249}
250
/// Accumulates playback measurements used to compute a quality-of-experience
/// (QoE) score.
///
/// Derives `Debug` and `Clone` so the calculator can be logged and
/// snapshotted; all fields are plain standard-library types.
#[derive(Debug, Clone)]
pub struct QoeCalculator {
    /// Most recently recorded initial buffering time.
    initial_buffer_time: f64,
    /// Number of rebuffer occurrences recorded.
    rebuffer_count: u32,
    /// Sum of all recorded rebuffer durations.
    rebuffer_duration: f64,
    /// Currently unused; initialised to zero.
    _start_time: f64,
    /// Recorded quality switches as `(timestamp, bitrate)` pairs.
    quality_switches: Vec<(f64, u64)>,
    /// Recorded bitrate samples as `(duration, bitrate)` pairs.
    bitrate_samples: Vec<(f64, u64)>,
}
266
267impl QoeCalculator {
268 pub fn new() -> Self {
269 Self {
270 initial_buffer_time: 0.0,
271 rebuffer_count: 0,
272 rebuffer_duration: 0.0,
273 _start_time: 0.0,
274 quality_switches: Vec::new(),
275 bitrate_samples: Vec::new(),
276 }
277 }
278
279 pub fn record_initial_buffer(&mut self, duration: f64) {
281 self.initial_buffer_time = duration;
282 }
283
284 pub fn record_rebuffer(&mut self, duration: f64) {
286 self.rebuffer_count += 1;
287 self.rebuffer_duration += duration;
288 }
289
290 pub fn record_quality_switch(&mut self, timestamp: f64, bitrate: u64) {
292 self.quality_switches.push((timestamp, bitrate));
293 }
294
295 pub fn record_bitrate(&mut self, duration: f64, bitrate: u64) {
297 self.bitrate_samples.push((duration, bitrate));
298 }
299
300 pub fn calculate_qoe(&self) -> f64 {
302 let mut score = 100.0;
309
310 if self.initial_buffer_time > 2.0 {
313 score -= (self.initial_buffer_time - 2.0) * 5.0;
314 }
315
316 score -= self.rebuffer_count as f64 * 10.0;
319
320 score -= self.rebuffer_duration * 5.0;
323
324 score -= self.quality_switches.len() as f64 * 2.0;
327
328 let avg_bitrate = self.average_bitrate();
330 if avg_bitrate > 5_000_000 {
331 score += 5.0;
332 } else if avg_bitrate > 2_000_000 {
333 score += 2.0;
334 }
335
336 score.clamp(0.0, 100.0)
337 }
338
339 fn average_bitrate(&self) -> u64 {
341 if self.bitrate_samples.is_empty() {
342 return 0;
343 }
344
345 let total_duration: f64 = self.bitrate_samples.iter().map(|(d, _)| d).sum();
346 if total_duration == 0.0 {
347 return 0;
348 }
349
350 let weighted_sum: f64 = self.bitrate_samples
351 .iter()
352 .map(|(d, b)| d * *b as f64)
353 .sum();
354
355 (weighted_sum / total_duration) as u64
356 }
357
358 pub fn breakdown(&self) -> QoeBreakdown {
360 QoeBreakdown {
361 score: self.calculate_qoe(),
362 initial_buffer_time: self.initial_buffer_time,
363 rebuffer_count: self.rebuffer_count,
364 rebuffer_duration: self.rebuffer_duration,
365 quality_switches: self.quality_switches.len() as u32,
366 average_bitrate: self.average_bitrate(),
367 }
368 }
369}
370
371impl Default for QoeCalculator {
372 fn default() -> Self {
373 Self::new()
374 }
375}
376
377#[derive(Debug, Clone, Serialize, Deserialize)]
379pub struct QoeBreakdown {
380 pub score: f64,
381 pub initial_buffer_time: f64,
382 pub rebuffer_count: u32,
383 pub rebuffer_duration: f64,
384 pub quality_switches: u32,
385 pub average_bitrate: u64,
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Largest deviation tolerated when comparing floating-point scores.
    const SCORE_TOLERANCE: f64 = 0.1;

    /// Returns `true` when `actual` is within the tolerance of `expected`.
    fn close_to(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < SCORE_TOLERANCE
    }

    /// A calculator with nothing recorded scores exactly 100.
    #[test]
    fn test_qoe_perfect() {
        let score = QoeCalculator::new().calculate_qoe();
        assert_eq!(score, 100.0);
    }

    /// Two rebuffers totalling 3.0 cost 2 * 10 + 3.0 * 5 = 35 points.
    #[test]
    fn test_qoe_with_rebuffers() {
        let mut calc = QoeCalculator::new();
        for duration in [1.0, 2.0].iter().copied() {
            calc.record_rebuffer(duration);
        }

        assert!(close_to(calc.calculate_qoe(), 65.0));
    }

    /// Initial buffering of 5.0 exceeds the 2.0 allowance by 3.0, costing
    /// 3.0 * 5 = 15 points.
    #[test]
    fn test_qoe_with_initial_buffer() {
        let mut calc = QoeCalculator::new();
        calc.record_initial_buffer(5.0);

        assert!(close_to(calc.calculate_qoe(), 85.0));
    }

    /// Events emitted below the flush threshold stay in the buffer.
    #[tokio::test]
    async fn test_analytics_emitter() {
        let emitter = AnalyticsEmitter::new();

        let inputs = vec![
            AnalyticsEvent::Play { position: 0.0 },
            AnalyticsEvent::Pause { position: 10.0 },
        ];
        for event in inputs {
            emitter.emit(event).await;
        }

        let buffered = emitter.get_events().await;
        assert_eq!(buffered.len(), 2);
    }
}
427}