use crate::buffer::buffer::{AtomicBuffer, Stats};
use anyhow::Result;
use prometheus::{Counter, Gauge, Opts, Registry};
use prometheus::{Histogram, HistogramOpts};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
/// Bundle of Prometheus collectors built by `PrometheusHandler::new`.
///
/// The first five collectors are created under the `rtp` subsystem and the
/// four gauges under the `sfu` subsystem.
#[allow(dead_code)]
pub struct PrometheusHandler {
/// Histogram named `drift_millis`; `Stream::calc_stats` observes the stream drift into it.
drift: Histogram,
/// Counter named `expected`.
expected_count: Counter,
/// Counter named `received`.
received_count: Counter,
/// Counter named `packets`.
packet_count: Counter,
/// Counter named `bytes`.
total_bytes: Counter,
/// Gauge named `sessions` (help text: "Current number of sessions").
sessions: Gauge,
/// Gauge named `peers` (help text: "Current number of peers connected").
peers: Gauge,
/// Gauge named `audio_tracks` (help text: "Current number of audio tracks").
audio_tracks: Gauge,
/// Gauge named `video_tracks` (help text: "Current number of video tracks").
video_tracks: Gauge,
}
impl PrometheusHandler {
fn new() -> Self {
let arift_buckets = vec![5.0, 10.0, 20.0, 40.0, 80.0, 160.0, f64::MAX];
let drift = Histogram::with_opts(
HistogramOpts::new("drift_millis", "drift_millis")
.subsystem("rtp")
.buckets(arift_buckets),
)
.unwrap();
let expected_count =
Counter::with_opts(Opts::new("expected", "expected").subsystem("rtp")).unwrap();
let received_count =
Counter::with_opts(Opts::new("received", "received").subsystem("rtp")).unwrap();
let packet_count =
Counter::with_opts(Opts::new("packets", "packets").subsystem("rtp")).unwrap();
let total_bytes = Counter::with_opts(Opts::new("bytes", "bytes").subsystem("rtp")).unwrap();
let sessions =
Gauge::with_opts(Opts::new("sessions", "Current number of sessions").subsystem("sfu"))
.unwrap();
let peers = Gauge::with_opts(
Opts::new("peers", "Current number of peers connected").subsystem("sfu"),
)
.unwrap();
let audio_tracks = Gauge::with_opts(
Opts::new("audio_tracks", "Current number of audio tracks").subsystem("sfu"),
)
.unwrap();
let video_tracks = Gauge::with_opts(
Opts::new("video_tracks", "Current number of video tracks").subsystem("sfu"),
)
.unwrap();
Self {
drift,
expected_count,
received_count,
packet_count,
total_bytes,
sessions,
peers,
audio_tracks,
video_tracks,
}
}
#[allow(dead_code)]
fn register(&self) -> Result<()> {
let r = Registry::new();
r.register(Box::new(self.drift.clone()))?;
r.register(Box::new(self.expected_count.clone()))?;
r.register(Box::new(self.received_count.clone()))?;
r.register(Box::new(self.packet_count.clone()))?;
r.register(Box::new(self.total_bytes.clone()))?;
r.register(Box::new(self.sessions.clone()))?;
r.register(Box::new(self.peers.clone()))?;
r.register(Box::new(self.audio_tracks.clone()))?;
r.register(Box::new(self.video_tracks.clone()))?;
Ok(())
}
}
/// Bookkeeping used by `Stream::update_stats` to turn successive `Stats`
/// snapshots into differences.
#[allow(dead_code)]
#[derive(Default)]
pub struct StreamStats {
/// Set to `true` once `Stream::update_stats` has stored a first snapshot.
/// NOTE(review): the name looks like a typo for `has_stats`.
has_stas: bool,
/// The snapshot most recently passed to `Stream::update_stats`.
last_stats: Stats,
/// Differences written by `Stream::update_stats` from the incoming snapshot
/// and `last_stats`; left at its default until a second snapshot arrives.
diff_stats: Stats,
}
/// Per-stream state: the buffer whose statistics are sampled, plus the
/// CNAME, drift value, snapshot bookkeeping and metric handles.
#[allow(dead_code)]
pub struct Stream {
/// Shared buffer; `calc_stats` reads its statistics via `get_status`.
buffer: Arc<AtomicBuffer>,
/// CNAME string guarded by an async mutex; starts out empty.
cname: Arc<Mutex<String>>,
/// Drift in milliseconds, loaded and stored with `Ordering::Relaxed`.
drift_in_millis: AtomicU64,
/// Snapshot bookkeeping updated by `update_stats`.
stats: Arc<Mutex<StreamStats>>,
/// Metric handles created in `Stream::new`.
prometheus_handler: PrometheusHandler,
}
impl Stream {
    /// Creates a stream that samples statistics from `buffer`.
    ///
    /// The CNAME starts empty, the drift starts at zero, no snapshot has been
    /// recorded yet, and a new [`PrometheusHandler`] is created for the stream.
    pub fn new(buffer: Arc<AtomicBuffer>) -> Self {
        let prometheus_handler = PrometheusHandler::new();
        Self {
            buffer,
            cname: Arc::new(Mutex::new(String::default())),
            drift_in_millis: AtomicU64::default(),
            stats: Arc::new(Mutex::new(StreamStats::default())),
            prometheus_handler,
        }
    }

    /// Returns a copy of the current CNAME.
    #[allow(dead_code)]
    async fn get_cname(&mut self) -> String {
        let cname = self.cname.lock().await;
        cname.clone()
    }

    /// Replaces the CNAME with `val`.
    pub async fn set_cname(&mut self, val: String) {
        let mut cname = self.cname.lock().await;
        *cname = val;
    }

    /// Stores the drift, in milliseconds, with relaxed ordering.
    #[allow(dead_code)]
    fn set_drift_in_millis(&mut self, val: u64) {
        self.drift_in_millis.store(val, Ordering::Relaxed);
    }

    /// Loads the drift, in milliseconds, with relaxed ordering.
    #[allow(dead_code)]
    fn get_drift_in_millis(&self) -> u64 {
        self.drift_in_millis.load(Ordering::Relaxed)
    }

    /// Records `stats` as the latest snapshot and returns the difference to
    /// the previous one.
    ///
    /// Returns `(has_diff, diff)`. On the first call there is no previous
    /// snapshot, so `has_diff` is `false` and `diff` is whatever `diff_stats`
    /// currently holds (its default value). On later calls `has_diff` is
    /// `true` and `diff` carries the per-field deltas of `last_expected`,
    /// `last_received`, `packet_count` and `total_byte`.
    ///
    /// NOTE(review): the deltas use plain subtraction; if a new snapshot can
    /// be smaller than the previous one (e.g. after a reset) and the fields
    /// are unsigned, this would overflow — confirm against `Stats`.
    #[allow(dead_code)]
    async fn update_stats(&mut self, stats: Stats) -> (bool, Stats) {
        let mut guard = self.stats.lock().await;
        // Reborrow once so the previous snapshot and the diff can be
        // borrowed as disjoint fields.
        let state = &mut *guard;

        let has_diff = state.has_stas;
        if has_diff {
            let previous = &state.last_stats;
            let diff = &mut state.diff_stats;
            diff.last_expected = stats.last_expected - previous.last_expected;
            // Previously this delta was written to `last_expected`, clobbering
            // the line above and leaving `last_received` untouched.
            diff.last_received = stats.last_received - previous.last_received;
            diff.packet_count = stats.packet_count - previous.packet_count;
            diff.total_byte = stats.total_byte - previous.total_byte;
        }

        state.last_stats = stats;
        state.has_stas = true;
        (has_diff, state.diff_stats.clone())
    }

    /// Samples the buffer statistics and updates the stream's metrics.
    ///
    /// The current drift is always observed into the drift histogram. Once a
    /// previous snapshot exists, the deltas since that snapshot are added to
    /// the expected/received/packet/byte counters.
    #[allow(dead_code)]
    async fn calc_stats(&mut self) {
        let buffer_stats = self.buffer.get_status().await;
        let drift_in_millis = self.get_drift_in_millis();
        let (has_stats, diff) = self.update_stats(buffer_stats).await;

        let metrics = &self.prometheus_handler;
        metrics.drift.observe(drift_in_millis as f64);

        if has_stats {
            // Counters take `f64`; the casts may lose precision for very
            // large values, which is acceptable for metric reporting.
            metrics.expected_count.inc_by(diff.last_expected as f64);
            metrics.received_count.inc_by(diff.last_received as f64);
            metrics.packet_count.inc_by(diff.packet_count as f64);
            metrics.total_bytes.inc_by(diff.total_byte as f64);
        }
    }
}