use crate::types::*;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info};
use uuid::Uuid;
/// Player analytics events that can be emitted during a session.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written in snake_case under the `event` key (for example `quality_change`
/// or `rebuffer_end`) next to the variant's own fields.
///
/// NOTE(review): the unit of every `position`/`duration`/`buffer_level` field
/// is not stated in this file — presumably seconds; confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum AnalyticsEvent {
/// A source was loaded; carries its URL and whether it is flagged as live.
Load {
url: String,
is_live: bool,
},
/// Play event at the given playback position.
Play {
position: f64,
},
/// Pause event at the given playback position.
Pause {
position: f64,
},
/// Seek event with the position before (`from`) and after (`to`).
Seek {
from: f64,
to: f64,
},
/// Rebuffer event with the playback position and the buffer level
/// reported alongside it.
Rebuffer {
position: f64,
buffer_level: f64,
},
/// End of a rebuffer, with the playback position and a `duration` value
/// (presumably how long the stall lasted — confirm).
RebufferEnd {
position: f64,
duration: f64,
},
/// Rendition change: previous and new bitrate, optional previous and new
/// resolution, and the reason for the change.
QualityChange {
from_bitrate: u64,
to_bitrate: u64,
from_resolution: Option<Resolution>,
to_resolution: Option<Resolution>,
reason: QualityChangeReason,
},
/// Player state transition from `from` to `to` at the given position.
StateChange {
from: PlayerState,
to: PlayerState,
position: f64,
},
/// End event with the final position and a `watch_time` value.
End {
position: f64,
watch_time: f64,
},
/// Error report: a code, a message, whether it is flagged as fatal, and
/// the playback position.
Error {
code: String,
message: String,
fatal: bool,
position: f64,
},
/// Status sample carrying position, buffer level, bitrate and
/// dropped/decoded frame counters.
Heartbeat {
position: f64,
buffer_level: f64,
bitrate: u64,
dropped_frames: u64,
decoded_frames: u64,
},
/// Free-form event identified by `name` with an arbitrary JSON payload.
Custom {
name: String,
data: serde_json::Value,
},
}
/// Reason attached to an [`AnalyticsEvent::QualityChange`].
///
/// Serialized as a snake_case string (`abr`, `manual`, `buffer`, `initial`).
///
/// NOTE(review): the variant meanings below are inferred from their names —
/// confirm with the code that produces these events.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QualityChangeReason {
/// Presumably a switch decided by adaptive-bitrate logic.
Abr,
/// Presumably a switch explicitly requested by the user.
Manual,
/// Presumably a switch triggered by buffer conditions.
Buffer,
/// Presumably the first rendition selected for the session.
Initial,
}
/// An [`AnalyticsEvent`] wrapped with the metadata attached when it is emitted.
///
/// Because `event` is marked `#[serde(flatten)]`, the event's `event` tag and
/// its fields are serialized at the same level as the metadata fields instead
/// of inside a nested object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsEventRecord {
/// Unique identifier of this record.
pub id: Uuid,
/// Identifier of the session the record belongs to.
pub session_id: SessionId,
/// UTC time associated with the record.
pub timestamp: DateTime<Utc>,
/// Sequence number of the record.
pub sequence: u64,
/// The event payload, flattened into the record when serialized.
#[serde(flatten)]
pub event: AnalyticsEvent,
}
/// Stamps analytics events with session metadata, buffers them, and hands
/// them to a channel and (optionally) an HTTP beacon endpoint.
pub struct AnalyticsEmitter {
/// Session identifier copied into every emitted record.
session_id: SessionId,
/// Counter used to number records; incremented before each record is
/// built, so the first record gets sequence `1`.
sequence: RwLock<u64>,
/// Records waiting to be flushed.
buffer: RwLock<Vec<AnalyticsEventRecord>>,
/// Buffer length at which the buffered records are flushed.
max_buffer_size: usize,
/// Sending half of the channel every emitted record is forwarded to.
event_tx: mpsc::Sender<AnalyticsEventRecord>,
/// Endpoint that flushed batches are POSTed to; `None` disables the POST.
beacon_url: Option<String>,
}
impl AnalyticsEmitter {
pub fn new() -> Self {
let (event_tx, mut event_rx) = mpsc::channel::<AnalyticsEventRecord>(1000);
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
debug!(
event_id = %event.id,
event = ?event.event,
"Analytics event"
);
}
});
Self {
session_id: SessionId::new(),
sequence: RwLock::new(0),
buffer: RwLock::new(Vec::new()),
max_buffer_size: 50,
event_tx,
beacon_url: None,
}
}
pub fn with_beacon(beacon_url: String) -> Self {
let mut emitter = Self::new();
emitter.beacon_url = Some(beacon_url);
emitter
}
pub async fn emit(&self, event: AnalyticsEvent) {
let mut seq = self.sequence.write().await;
*seq += 1;
let sequence = *seq;
let record = AnalyticsEventRecord {
id: Uuid::new_v4(),
session_id: self.session_id,
timestamp: Utc::now(),
sequence,
event,
};
let mut buffer = self.buffer.write().await;
buffer.push(record.clone());
if buffer.len() >= self.max_buffer_size {
let events: Vec<_> = buffer.drain(..).collect();
drop(buffer);
self.flush_events(events).await;
}
let _ = self.event_tx.send(record).await;
}
async fn flush_events(&self, events: Vec<AnalyticsEventRecord>) {
if events.is_empty() {
return;
}
info!(count = events.len(), "Flushing analytics events");
if let Some(ref url) = self.beacon_url {
let client = reqwest::Client::new();
let _ = client.post(url)
.json(&events)
.send()
.await;
}
}
pub async fn get_events(&self) -> Vec<AnalyticsEventRecord> {
self.buffer.read().await.clone()
}
pub async fn clear(&self) {
self.buffer.write().await.clear();
}
pub fn set_beacon_url(&mut self, url: String) {
self.beacon_url = Some(url);
}
}
impl Default for AnalyticsEmitter {
fn default() -> Self {
Self::new()
}
}
/// Accumulates playback measurements and derives a quality-of-experience
/// score from them.
pub struct QoeCalculator {
/// Most recently recorded initial buffering time.
initial_buffer_time: f64,
/// Number of rebuffer events recorded.
rebuffer_count: u32,
/// Sum of the durations of all recorded rebuffer events.
rebuffer_duration: f64,
/// Initialized to `0.0` and not read anywhere in this file.
_start_time: f64,
// `quality_switches` holds one `(timestamp, bitrate)` pair per recorded
// quality switch; `bitrate_samples` holds one `(duration, bitrate)` pair per
// recorded bitrate sample and feeds the duration-weighted average bitrate.
quality_switches: Vec<(f64, u64)>, bitrate_samples: Vec<(f64, u64)>, }
impl QoeCalculator {
    /// Creates a calculator with no measurements; its score is `100.0`.
    pub fn new() -> Self {
        Self {
            initial_buffer_time: 0.0,
            rebuffer_count: 0,
            rebuffer_duration: 0.0,
            _start_time: 0.0,
            quality_switches: Vec::new(),
            bitrate_samples: Vec::new(),
        }
    }

    /// Returns `duration` if it is finite and positive, otherwise `0.0`.
    ///
    /// NaN would survive the final `clamp` in [`Self::calculate_qoe`] and a
    /// negative value would turn a penalty into a bonus, so both are rejected
    /// at the point of recording.
    fn sanitize_duration(duration: f64) -> f64 {
        if duration.is_finite() && duration > 0.0 {
            duration
        } else {
            0.0
        }
    }

    /// Records the initial buffering time, replacing any previous value.
    ///
    /// Non-finite or negative values are stored as `0.0`.
    pub fn record_initial_buffer(&mut self, duration: f64) {
        self.initial_buffer_time = Self::sanitize_duration(duration);
    }

    /// Records one rebuffer event lasting `duration`.
    ///
    /// The event is always counted; a non-finite or negative `duration`
    /// contributes `0.0` to the accumulated rebuffer time.
    pub fn record_rebuffer(&mut self, duration: f64) {
        self.rebuffer_count = self.rebuffer_count.saturating_add(1);
        self.rebuffer_duration += Self::sanitize_duration(duration);
    }

    /// Records a quality switch to `bitrate` at `timestamp`.
    pub fn record_quality_switch(&mut self, timestamp: f64, bitrate: u64) {
        self.quality_switches.push((timestamp, bitrate));
    }

    /// Records that `bitrate` was in effect for `duration`.
    ///
    /// Samples whose duration is zero, negative or non-finite carry no valid
    /// weight and are ignored.
    pub fn record_bitrate(&mut self, duration: f64, bitrate: u64) {
        let duration = Self::sanitize_duration(duration);
        if duration > 0.0 {
            self.bitrate_samples.push((duration, bitrate));
        }
    }

    /// Computes the QoE score in the range `0.0..=100.0`.
    ///
    /// Starting from 100:
    /// * minus 5 per unit of initial buffering time above 2.0,
    /// * minus 10 per rebuffer event,
    /// * minus 5 per unit of accumulated rebuffer duration,
    /// * minus 2 per quality switch,
    /// * plus 5 if the average bitrate exceeds 5,000,000, or plus 2 if it
    ///   exceeds 2,000,000.
    pub fn calculate_qoe(&self) -> f64 {
        let mut score = 100.0;
        if self.initial_buffer_time > 2.0 {
            score -= (self.initial_buffer_time - 2.0) * 5.0;
        }
        score -= self.rebuffer_count as f64 * 10.0;
        score -= self.rebuffer_duration * 5.0;
        score -= self.quality_switches.len() as f64 * 2.0;

        let avg_bitrate = self.average_bitrate();
        if avg_bitrate > 5_000_000 {
            score += 5.0;
        } else if avg_bitrate > 2_000_000 {
            score += 2.0;
        }
        score.clamp(0.0, 100.0)
    }

    /// Duration-weighted average of the recorded bitrate samples, or `0`
    /// when there is no usable sample.
    fn average_bitrate(&self) -> u64 {
        let (total_duration, weighted_sum) = self.bitrate_samples.iter().fold(
            (0.0_f64, 0.0_f64),
            |(total, weighted), &(duration, bitrate)| {
                (total + duration, weighted + duration * bitrate as f64)
            },
        );

        // `> 0.0` is false for zero, negative and NaN totals alike.
        if total_duration > 0.0 {
            // Float-to-int `as` saturates and maps NaN to 0.
            (weighted_sum / total_duration) as u64
        } else {
            0
        }
    }

    /// Returns the score together with the measurements it was derived from.
    pub fn breakdown(&self) -> QoeBreakdown {
        QoeBreakdown {
            score: self.calculate_qoe(),
            initial_buffer_time: self.initial_buffer_time,
            rebuffer_count: self.rebuffer_count,
            rebuffer_duration: self.rebuffer_duration,
            // Saturate instead of silently truncating a huge count.
            quality_switches: self.quality_switches.len().min(u32::MAX as usize) as u32,
            average_bitrate: self.average_bitrate(),
        }
    }
}
impl Default for QoeCalculator {
fn default() -> Self {
Self::new()
}
}
/// Snapshot of a QoE score together with the measurements behind it, as
/// returned by `QoeCalculator::breakdown`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QoeBreakdown {
/// Overall score as computed by `QoeCalculator::calculate_qoe`.
pub score: f64,
/// Recorded initial buffering time.
pub initial_buffer_time: f64,
/// Number of recorded rebuffer events.
pub rebuffer_count: u32,
/// Accumulated duration of all recorded rebuffer events.
pub rebuffer_duration: f64,
/// Number of recorded quality switches.
pub quality_switches: u32,
/// Duration-weighted average of the recorded bitrate samples.
pub average_bitrate: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    // A calculator with nothing recorded keeps the full score.
    #[test]
    fn test_qoe_perfect() {
        let untouched = QoeCalculator::new();
        let score = untouched.calculate_qoe();
        assert_eq!(score, 100.0);
    }

    // Two rebuffers (1.0 + 2.0): 100 - 2 * 10 - 3.0 * 5 = 65.
    #[test]
    fn test_qoe_with_rebuffers() {
        let mut qoe = QoeCalculator::new();
        qoe.record_rebuffer(1.0);
        qoe.record_rebuffer(2.0);
        let deviation = (qoe.calculate_qoe() - 65.0).abs();
        assert!(deviation < 0.1);
    }

    // Initial buffering of 5.0 exceeds the 2.0 allowance by 3.0: 100 - 15 = 85.
    #[test]
    fn test_qoe_with_initial_buffer() {
        let mut qoe = QoeCalculator::new();
        qoe.record_initial_buffer(5.0);
        let deviation = (qoe.calculate_qoe() - 85.0).abs();
        assert!(deviation < 0.1);
    }

    // Two emitted events stay in the buffer (the flush threshold is 50).
    #[tokio::test]
    async fn test_analytics_emitter() {
        let emitter = AnalyticsEmitter::new();

        let play = AnalyticsEvent::Play { position: 0.0 };
        emitter.emit(play).await;
        let pause = AnalyticsEvent::Pause { position: 10.0 };
        emitter.emit(pause).await;

        let buffered = emitter.get_events().await;
        assert_eq!(buffered.len(), 2);
    }
}