use super::MediaPacket;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::time::Duration;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamMetrics {
pub stream_id: Uuid,
pub video_bitrate: u64,
pub audio_bitrate: u64,
pub framerate: f64,
pub dropped_frames: u64,
pub keyframe_interval: u64,
pub total_packets: u64,
pub total_bytes: u64,
pub avg_latency: u64,
pub packet_loss_rate: f64,
}
impl StreamMetrics {
#[must_use]
pub fn new(stream_id: Uuid) -> Self {
Self {
stream_id,
video_bitrate: 0,
audio_bitrate: 0,
framerate: 0.0,
dropped_frames: 0,
keyframe_interval: 0,
total_packets: 0,
total_bytes: 0,
avg_latency: 0,
packet_loss_rate: 0.0,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ViewerMetrics {
pub viewer_id: String,
pub stream_id: Uuid,
pub join_time: DateTime<Utc>,
pub leave_time: Option<DateTime<Utc>>,
pub watch_time: Duration,
pub rebuffer_count: u64,
pub rebuffer_duration: Duration,
pub avg_bitrate: u64,
pub quality_switches: u64,
pub current_quality: Option<String>,
pub location: Option<String>,
pub user_agent: Option<String>,
}
impl ViewerMetrics {
#[must_use]
pub fn new(viewer_id: impl Into<String>, stream_id: Uuid) -> Self {
Self {
viewer_id: viewer_id.into(),
stream_id,
join_time: Utc::now(),
leave_time: None,
watch_time: Duration::ZERO,
rebuffer_count: 0,
rebuffer_duration: Duration::ZERO,
avg_bitrate: 0,
quality_switches: 0,
current_quality: None,
location: None,
user_agent: None,
}
}
pub fn leave(&mut self) {
self.leave_time = Some(Utc::now());
if let Ok(duration) = (Utc::now() - self.join_time).to_std() {
self.watch_time = duration;
}
}
pub fn record_rebuffer(&mut self, duration: Duration) {
self.rebuffer_count += 1;
self.rebuffer_duration += duration;
}
pub fn record_quality_switch(&mut self, new_quality: impl Into<String>) {
self.quality_switches += 1;
self.current_quality = Some(new_quality.into());
}
}
#[derive(Debug, Clone)]
struct DataPoint {
timestamp: DateTime<Utc>,
value: f64,
}
struct TimeSeries {
data: VecDeque<DataPoint>,
max_points: usize,
window: Duration,
}
impl TimeSeries {
fn new(max_points: usize, window: Duration) -> Self {
Self {
data: VecDeque::new(),
max_points,
window,
}
}
fn add(&mut self, value: f64) {
let now = Utc::now();
self.data.push_back(DataPoint {
timestamp: now,
value,
});
let cutoff = now - chrono::Duration::from_std(self.window).unwrap_or_default();
while let Some(first) = self.data.front() {
if first.timestamp < cutoff {
self.data.pop_front();
} else {
break;
}
}
if self.data.len() > self.max_points {
self.data.pop_front();
}
}
fn average(&self) -> f64 {
if self.data.is_empty() {
return 0.0;
}
let sum: f64 = self.data.iter().map(|p| p.value).sum();
sum / self.data.len() as f64
}
fn max(&self) -> f64 {
self.data
.iter()
.map(|p| p.value)
.max_by(|a, b| a.total_cmp(b))
.unwrap_or(0.0)
}
fn min(&self) -> f64 {
self.data
.iter()
.map(|p| p.value)
.min_by(|a, b| a.total_cmp(b))
.unwrap_or(0.0)
}
}
pub struct Analytics {
stream_id: Uuid,
stream_metrics: RwLock<StreamMetrics>,
viewers: RwLock<HashMap<String, ViewerMetrics>>,
bitrate_series: RwLock<TimeSeries>,
framerate_series: RwLock<TimeSeries>,
viewer_count_series: RwLock<TimeSeries>,
latency_series: RwLock<TimeSeries>,
total_watch_time: RwLock<Duration>,
}
impl Analytics {
#[must_use]
pub fn new(stream_id: Uuid) -> Self {
Self {
stream_id,
stream_metrics: RwLock::new(StreamMetrics::new(stream_id)),
viewers: RwLock::new(HashMap::new()),
bitrate_series: RwLock::new(TimeSeries::new(1000, Duration::from_secs(300))),
framerate_series: RwLock::new(TimeSeries::new(1000, Duration::from_secs(300))),
viewer_count_series: RwLock::new(TimeSeries::new(1000, Duration::from_secs(3600))),
latency_series: RwLock::new(TimeSeries::new(1000, Duration::from_secs(300))),
total_watch_time: RwLock::new(Duration::ZERO),
}
}
pub fn record_packet(&self, packet: &MediaPacket) {
let mut metrics = self.stream_metrics.write();
metrics.total_packets += 1;
metrics.total_bytes += packet.data.len() as u64;
let bitrate = (packet.data.len() * 8) as f64 / packet.duration as f64 * 1000.0;
self.bitrate_series.write().add(bitrate);
match packet.media_type {
super::MediaType::Video => {
if packet.keyframe {
self.framerate_series.write().add(30.0); }
}
super::MediaType::Audio => {}
super::MediaType::Metadata => {}
}
}
pub fn add_viewer(&self, viewer_id: impl Into<String>) -> ViewerMetrics {
let viewer_id = viewer_id.into();
let metrics = ViewerMetrics::new(&viewer_id, self.stream_id);
let mut viewers = self.viewers.write();
viewers.insert(viewer_id.clone(), metrics.clone());
self.viewer_count_series.write().add(viewers.len() as f64);
metrics
}
pub fn remove_viewer(&self, viewer_id: &str) {
let mut viewers = self.viewers.write();
if let Some(mut viewer) = viewers.remove(viewer_id) {
viewer.leave();
*self.total_watch_time.write() += viewer.watch_time;
}
self.viewer_count_series.write().add(viewers.len() as f64);
}
pub fn record_viewer_rebuffer(&self, viewer_id: &str, duration: Duration) {
let mut viewers = self.viewers.write();
if let Some(viewer) = viewers.get_mut(viewer_id) {
viewer.record_rebuffer(duration);
}
}
pub fn record_viewer_quality_switch(&self, viewer_id: &str, quality: impl Into<String>) {
let mut viewers = self.viewers.write();
if let Some(viewer) = viewers.get_mut(viewer_id) {
viewer.record_quality_switch(quality);
}
}
#[must_use]
pub fn stream_metrics(&self) -> StreamMetrics {
let mut metrics = self.stream_metrics.read().clone();
metrics.video_bitrate = self.bitrate_series.read().average() as u64;
metrics.framerate = self.framerate_series.read().average();
metrics.avg_latency = self.latency_series.read().average() as u64;
metrics
}
#[must_use]
pub fn viewer_metrics(&self, viewer_id: &str) -> Option<ViewerMetrics> {
let viewers = self.viewers.read();
viewers.get(viewer_id).cloned()
}
#[must_use]
pub fn all_viewer_metrics(&self) -> Vec<ViewerMetrics> {
let viewers = self.viewers.read();
viewers.values().cloned().collect()
}
#[must_use]
pub fn viewer_count(&self) -> usize {
let viewers = self.viewers.read();
viewers.len()
}
#[must_use]
pub fn peak_viewer_count(&self) -> usize {
self.viewer_count_series.read().max() as usize
}
#[must_use]
pub fn avg_viewer_count(&self) -> f64 {
self.viewer_count_series.read().average()
}
#[must_use]
pub fn total_watch_time(&self) -> Duration {
*self.total_watch_time.read()
}
pub fn record_latency(&self, latency_ms: u64) {
self.latency_series.write().add(latency_ms as f64);
}
#[must_use]
pub fn summary(&self) -> AnalyticsSummary {
AnalyticsSummary {
stream_id: self.stream_id,
stream_metrics: self.stream_metrics(),
current_viewers: self.viewer_count(),
peak_viewers: self.peak_viewer_count(),
avg_viewers: self.avg_viewer_count(),
total_watch_time: self.total_watch_time(),
avg_bitrate: self.bitrate_series.read().average() as u64,
avg_framerate: self.framerate_series.read().average(),
avg_latency: self.latency_series.read().average() as u64,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsSummary {
pub stream_id: Uuid,
pub stream_metrics: StreamMetrics,
pub current_viewers: usize,
pub peak_viewers: usize,
pub avg_viewers: f64,
pub total_watch_time: Duration,
pub avg_bitrate: u64,
pub avg_framerate: f64,
pub avg_latency: u64,
}