use crate::metrics::{Counter, Gauge, Histogram, RateCalculator};
use crate::topics::*;
use crate::types::*;
use mecha10_core::prelude::*;
use mecha10_core::topics::Topic;
use std::sync::Mutex;
use std::time::Instant;
pub struct StreamingCollector {
source: String,
frames_received: Counter,
frames_encoded: Counter,
frames_sent: Counter,
frames_dropped: Counter,
encoder_queue_depth: Gauge,
bytes_sent: Counter,
fps_calculator: RateCalculator,
bandwidth_calculator: RateCalculator,
encoding_histogram: Mutex<Option<Histogram>>,
slow_frames: Counter,
last_publish: Mutex<Instant>,
}
impl StreamingCollector {
pub fn new(source: impl Into<String>) -> Self {
Self {
source: source.into(),
frames_received: Counter::new(),
frames_encoded: Counter::new(),
frames_sent: Counter::new(),
frames_dropped: Counter::new(),
encoder_queue_depth: Gauge::new(),
bytes_sent: Counter::new(),
fps_calculator: RateCalculator::new(),
bandwidth_calculator: RateCalculator::new(),
encoding_histogram: Mutex::new(Histogram::for_duration().ok()),
slow_frames: Counter::new(),
last_publish: Mutex::new(Instant::now()),
}
}
#[inline]
pub fn record_frame_received(&self) {
self.frames_received.inc();
}
#[inline]
pub fn record_frame_encoded(&self, encode_time_us: u64) {
self.frames_encoded.inc();
if let Ok(mut hist) = self.encoding_histogram.try_lock() {
if let Some(h) = hist.as_mut() {
h.record(encode_time_us);
}
}
if encode_time_us > 50_000 {
self.slow_frames.inc();
}
}
#[inline]
pub fn record_frame_sent(&self, frame_size_bytes: u64) {
self.frames_sent.inc();
self.bytes_sent.add(frame_size_bytes);
}
#[inline]
pub fn record_frame_dropped(&self) {
self.frames_dropped.inc();
}
#[inline]
pub fn set_encoder_queue_depth(&self, depth: usize) {
self.encoder_queue_depth.set(depth as u64);
}
pub async fn publish_pipeline_metrics(&self, ctx: &Context) -> Result<()> {
let frames_received = self.frames_received.get();
let frames_encoded = self.frames_encoded.get();
let frames_sent = self.frames_sent.get();
let frames_dropped = self.frames_dropped.get();
let bytes_sent = self.bytes_sent.get();
let fps = self.fps_calculator.calculate(frames_received).unwrap_or(0.0);
let bytes_per_second = self.bandwidth_calculator.calculate(bytes_sent).unwrap_or(0.0) as u64;
let metrics = StreamingPipelineMetrics {
frames_received,
frames_encoded,
frames_sent,
frames_dropped,
bytes_per_second,
fps,
};
let msg = DiagnosticMessage::new(&self.source, metrics);
ctx.publish_to(
Topic::<DiagnosticMessage<StreamingPipelineMetrics>>::new(TOPIC_DIAGNOSTICS_STREAMING_PIPELINE),
&msg,
)
.await?;
Ok(())
}
pub async fn publish_encoding_metrics(&self, ctx: &Context) -> Result<()> {
let total_frames = self.frames_encoded.get();
let slow_frames = self.slow_frames.get();
let queue_depth = self.encoder_queue_depth.get() as usize;
let (avg, p50, p95, p99, max) = {
let hist_lock = self.encoding_histogram.lock().unwrap();
if let Some(hist) = hist_lock.as_ref() {
if hist.count() > 0 {
(
hist.mean() / 1000.0, hist.p50() as f64 / 1000.0,
hist.p95() as f64 / 1000.0,
hist.p99() as f64 / 1000.0,
hist.max() as f64 / 1000.0,
)
} else {
(0.0, 0.0, 0.0, 0.0, 0.0)
}
} else {
(0.0, 0.0, 0.0, 0.0, 0.0)
}
};
let metrics = EncodingMetrics {
total_frames,
slow_frames,
avg_encode_time_ms: avg,
p50_encode_time_ms: p50,
p95_encode_time_ms: p95,
p99_encode_time_ms: p99,
max_encode_time_ms: max,
queue_depth,
};
let msg = DiagnosticMessage::new(&self.source, metrics);
ctx.publish_to(
Topic::<DiagnosticMessage<EncodingMetrics>>::new(TOPIC_DIAGNOSTICS_STREAMING_ENCODING),
&msg,
)
.await?;
Ok(())
}
pub async fn publish_bandwidth_metrics(&self, ctx: &Context, target_bitrate_bps: u64) -> Result<()> {
let bytes_sent = self.bytes_sent.get();
let frames_sent = self.frames_sent.get();
let bytes_per_second = self.bandwidth_calculator.calculate(bytes_sent).unwrap_or(0.0) as u64;
let bitrate_bps = bytes_per_second * 8;
let avg_frame_size = if frames_sent > 0 { bytes_sent / frames_sent } else { 0 };
let utilization = if target_bitrate_bps > 0 {
(bitrate_bps as f64 / target_bitrate_bps as f64).min(1.0)
} else {
0.0
};
let metrics = BandwidthMetrics {
bitrate_bps,
target_bitrate_bps,
avg_frame_size_bytes: avg_frame_size,
total_bytes_sent: bytes_sent,
utilization,
};
let msg = DiagnosticMessage::new(&self.source, metrics);
ctx.publish_to(
Topic::<DiagnosticMessage<BandwidthMetrics>>::new(TOPIC_DIAGNOSTICS_STREAMING_BANDWIDTH),
&msg,
)
.await?;
Ok(())
}
pub async fn publish_all(&self, ctx: &Context, target_bitrate_bps: u64) -> Result<()> {
let should_publish = {
let mut last = self.last_publish.lock().unwrap();
let elapsed = last.elapsed();
if elapsed.as_secs() >= 1 {
*last = Instant::now();
true
} else {
false
}
};
if !should_publish {
return Ok(());
}
self.publish_pipeline_metrics(ctx).await?;
self.publish_encoding_metrics(ctx).await?;
self.publish_bandwidth_metrics(ctx, target_bitrate_bps).await?;
Ok(())
}
pub fn reset(&self) {
self.frames_received.reset();
self.frames_encoded.reset();
self.frames_sent.reset();
self.frames_dropped.reset();
self.bytes_sent.reset();
self.slow_frames.reset();
self.encoder_queue_depth.set(0);
self.fps_calculator.reset();
self.bandwidth_calculator.reset();
if let Ok(mut hist) = self.encoding_histogram.lock() {
if let Some(h) = hist.as_mut() {
h.reset();
}
}
}
}