use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Kind of coded frame a per-frame metric was recorded for.
///
/// The variants carry no data; they are only distinguished by their label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameType {
    /// Labelled `"I"` (presumably an intra-coded frame).
    I,
    /// Labelled `"P"` (presumably a forward-predicted frame).
    P,
    /// Labelled `"B"` (presumably a bidirectionally predicted frame).
    B,
}

impl FrameType {
    /// Returns the one-letter text label for this frame type.
    ///
    /// The match is exhaustive on purpose: adding a variant must fail to
    /// compile here until a label is chosen for it.
    #[must_use]
    pub fn label(self) -> &'static str {
        match self {
            FrameType::B => "B",
            FrameType::P => "P",
            FrameType::I => "I",
        }
    }
}
/// Measurements recorded for a single encoded frame.
#[derive(Debug, Clone)]
pub struct FrameMetric {
    /// Frame number as supplied by the caller.
    pub frame_number: u64,
    /// Time spent encoding the frame (the `_us` suffix suggests microseconds).
    pub encode_time_us: u32,
    /// PSNR measured for the frame (presumably in dB; the unit is not fixed here).
    pub psnr: f32,
    /// Kind of frame that was produced.
    pub frame_type: FrameType,
    /// Size of the encoded frame in bits.
    pub output_bits: u32,
}

impl FrameMetric {
    /// Builds a metric from its five raw measurements.
    #[must_use]
    pub fn new(
        frame_number: u64,
        encode_time_us: u32,
        psnr: f32,
        frame_type: FrameType,
        output_bits: u32,
    ) -> Self {
        Self {
            frame_number,
            encode_time_us,
            psnr,
            frame_type,
            output_bits,
        }
    }

    /// Returns the encoded size in whole bytes, rounding partial bytes up.
    ///
    /// The ceiling division is written as quotient plus a remainder flag
    /// rather than `(bits + 7) / 8`: the latter overflows `u32` when
    /// `output_bits` is within 7 of `u32::MAX`.
    #[must_use]
    pub fn output_bytes(&self) -> u32 {
        let whole_bytes = self.output_bits / 8;
        let has_partial_byte = self.output_bits % 8 != 0;
        whole_bytes + u32::from(has_partial_byte)
    }

    /// Returns the bitrate, in kilobits per second, that a stream of frames
    /// of this size would have at `fps` frames per second.
    ///
    /// A non-positive `fps` yields `0.0`.
    #[must_use]
    pub fn instant_bitrate_kbps(&self, fps: f32) -> f32 {
        if fps <= 0.0 {
            return 0.0;
        }
        // bits/frame * frames/second = bits/second; / 1000 converts to kbps.
        self.output_bits as f32 * fps / 1000.0
    }
}
/// Atomic counters describing the progress of a transcode.
///
/// Every counter is an `AtomicU64`, so all updates go through `&self`.
#[derive(Debug)]
pub struct TranscodeMetrics {
    /// Number of frames encoded so far.
    pub frames_encoded: AtomicU64,
    /// Number of frames dropped so far.
    pub frames_dropped: AtomicU64,
    /// Number of output bytes produced so far.
    pub bytes_output: AtomicU64,
    /// Number of encoding errors seen so far.
    pub encoding_errors: AtomicU64,
}

impl TranscodeMetrics {
    /// Creates a counter set with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        let zeroed = || AtomicU64::new(0);
        Self {
            frames_encoded: zeroed(),
            frames_dropped: zeroed(),
            bytes_output: zeroed(),
            encoding_errors: zeroed(),
        }
    }

    /// Adds `delta` to the encoded-frame counter.
    pub fn inc_frames_encoded(&self, delta: u64) {
        let _previous = self.frames_encoded.fetch_add(delta, Ordering::Relaxed);
    }

    /// Adds `delta` to the dropped-frame counter.
    pub fn inc_frames_dropped(&self, delta: u64) {
        let _previous = self.frames_dropped.fetch_add(delta, Ordering::Relaxed);
    }

    /// Adds `bytes` to the output-byte counter.
    pub fn add_bytes_output(&self, bytes: u64) {
        let _previous = self.bytes_output.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Adds `delta` to the encoding-error counter.
    pub fn inc_errors(&self, delta: u64) {
        let _previous = self.encoding_errors.fetch_add(delta, Ordering::Relaxed);
    }

    /// Copies the current counter values into a plain [`MetricsSnapshot`].
    ///
    /// The four counters are loaded one after another with relaxed ordering,
    /// so a snapshot taken while other threads are updating is not guaranteed
    /// to be a mutually consistent view.
    #[must_use]
    pub fn snapshot(&self) -> MetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let frames_encoded = read(&self.frames_encoded);
        let frames_dropped = read(&self.frames_dropped);
        let bytes_output = read(&self.bytes_output);
        let encoding_errors = read(&self.encoding_errors);
        MetricsSnapshot {
            frames_encoded,
            frames_dropped,
            bytes_output,
            encoding_errors,
        }
    }
}

impl Default for TranscodeMetrics {
    /// Same as [`TranscodeMetrics::new`]: all counters start at zero.
    fn default() -> Self {
        Self::new()
    }
}
/// Plain-value copy of the transcode counters at one point in time.
///
/// Derives `PartialEq`/`Eq` so two snapshots can be compared directly (for
/// example in assertions or change detection) and `Default` so an all-zero
/// snapshot can be created without spelling out every field.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MetricsSnapshot {
    /// Number of frames encoded.
    pub frames_encoded: u64,
    /// Number of frames dropped.
    pub frames_dropped: u64,
    /// Number of output bytes produced.
    pub bytes_output: u64,
    /// Number of encoding errors.
    pub encoding_errors: u64,
}
/// Throughput figures for an encoding session.
#[derive(Debug, Clone)]
pub struct EncodingRate {
    /// Encoding speed in frames per second.
    pub fps: f32,
    /// Encoding speed relative to real time; `1.0` is the real-time threshold.
    pub real_time_factor: f32,
    /// Output bitrate in kilobits per second.
    pub instant_bitrate_kbps: u32,
}

impl EncodingRate {
    /// Reports whether encoding keeps up with real time.
    ///
    /// True when `real_time_factor` is at least `1.0`; a NaN factor compares
    /// false and is therefore reported as not real-time.
    #[must_use]
    pub fn is_realtime(&self) -> bool {
        let realtime_threshold = 1.0_f32;
        self.real_time_factor >= realtime_threshold
    }
}
/// Averaged quality scores for a session.
#[derive(Debug, Clone)]
pub struct QualityMetrics {
    /// Average PSNR.
    pub avg_psnr: f32,
    /// Average SSIM.
    pub avg_ssim: f32,
    /// Average VMAF.
    pub avg_vmaf: f32,
}

impl QualityMetrics {
    /// Returns a value with all three averages set to `0.0`.
    #[must_use]
    pub fn zero() -> Self {
        let [avg_psnr, avg_ssim, avg_vmaf] = [0.0_f32; 3];
        Self {
            avg_psnr,
            avg_ssim,
            avg_vmaf,
        }
    }
}
/// State kept for one encoding session.
#[derive(Debug)]
pub struct SessionMetrics {
    /// Identifier of the session.
    pub session_id: u64,
    /// Session start timestamp in milliseconds (time base chosen by the caller).
    pub start_time_ms: i64,
    /// Most recently stored rate figures; all zero after construction.
    pub encoding_rate: EncodingRate,
    /// Most recently stored quality figures; all zero after construction.
    pub quality: QualityMetrics,
    /// Recent per-frame metrics, oldest at the front.
    pub per_frame_metrics: VecDeque<FrameMetric>,
}

/// Number of per-frame entries `push_frame` lets the history grow to.
const PER_FRAME_WINDOW: usize = 100;

impl SessionMetrics {
    /// Creates an empty session with zeroed rate and quality figures.
    #[must_use]
    pub fn new(session_id: u64, start_time_ms: i64) -> Self {
        let idle_rate = EncodingRate {
            fps: 0.0,
            real_time_factor: 0.0,
            instant_bitrate_kbps: 0,
        };
        let history = VecDeque::with_capacity(PER_FRAME_WINDOW);
        Self {
            session_id,
            start_time_ms,
            encoding_rate: idle_rate,
            quality: QualityMetrics::zero(),
            per_frame_metrics: history,
        }
    }

    /// Appends `metric` to the history.
    ///
    /// If the history already holds `PER_FRAME_WINDOW` or more entries, the
    /// single oldest entry is removed first.
    pub fn push_frame(&mut self, metric: FrameMetric) {
        let window_full = self.per_frame_metrics.len() >= PER_FRAME_WINDOW;
        if window_full {
            let _evicted = self.per_frame_metrics.pop_front();
        }
        self.per_frame_metrics.push_back(metric);
    }

    /// Returns the seconds between the session start and `current_time_ms`.
    ///
    /// The millisecond difference saturates instead of overflowing; a
    /// timestamp before the start yields a negative value.
    #[must_use]
    pub fn elapsed_secs(&self, current_time_ms: i64) -> f64 {
        let elapsed_ms = current_time_ms.saturating_sub(self.start_time_ms);
        elapsed_ms as f64 / 1000.0
    }
}
/// Collects per-frame metrics for one session and derives rates and text
/// exports from them.
#[derive(Debug)]
pub struct MetricAggregator {
    /// Session start time and the bounded per-frame history.
    session_metrics: SessionMetrics,
    /// Counters that can be shared through [`MetricAggregator::counters`].
    shared_counters: Arc<TranscodeMetrics>,
    /// Source frame rate; the denominator of the real-time factor.
    source_fps: f32,
}

impl MetricAggregator {
    /// Creates an aggregator with fresh counters and an empty frame history.
    #[must_use]
    pub fn new(session_id: u64, start_time_ms: i64, source_fps: f32) -> Self {
        Self {
            session_metrics: SessionMetrics::new(session_id, start_time_ms),
            shared_counters: Arc::new(TranscodeMetrics::new()),
            source_fps,
        }
    }

    /// Returns another handle to the counter set this aggregator updates.
    #[must_use]
    pub fn counters(&self) -> Arc<TranscodeMetrics> {
        Arc::clone(&self.shared_counters)
    }

    /// Records one encoded frame: bumps the frame and byte counters and
    /// appends the metric to the per-frame history.
    pub fn update_frame(&mut self, metric: FrameMetric) {
        // Lossless widening; `u64::from` documents that no truncation can occur.
        let byte_count = u64::from(metric.output_bytes());
        self.shared_counters.inc_frames_encoded(1);
        self.shared_counters.add_bytes_output(byte_count);
        self.session_metrics.push_frame(metric);
    }

    /// Computes session-wide average rates as of `current_time_ms`.
    ///
    /// All three figures are averages over the whole elapsed session time
    /// (total frames / elapsed, total bytes / elapsed), including the value
    /// stored in `instant_bitrate_kbps`. If no time has elapsed, every figure
    /// is zero. The real-time factor is zero when the source frame rate is
    /// not positive.
    #[must_use]
    pub fn compute_rate(&self, current_time_ms: i64) -> EncodingRate {
        let elapsed = self.session_metrics.elapsed_secs(current_time_ms);
        if elapsed <= 0.0 {
            return EncodingRate {
                fps: 0.0,
                real_time_factor: 0.0,
                instant_bitrate_kbps: 0,
            };
        }
        let frames_encoded = self.shared_counters.frames_encoded.load(Ordering::Relaxed);
        let bytes_output = self.shared_counters.bytes_output.load(Ordering::Relaxed);
        // Divide in f64 and narrow once: converting both operands to f32 first
        // loses precision for long sessions (frame counts above 2^24, or
        // elapsed times where f32 can no longer resolve milliseconds).
        let fps = (frames_encoded as f64 / elapsed) as f32;
        let real_time_factor = if self.source_fps > 0.0 {
            fps / self.source_fps
        } else {
            0.0
        };
        // `elapsed > 0.0` is guaranteed by the early return above. The
        // float-to-int cast saturates at the bounds of `u32`.
        let bitrate_kbps = (bytes_output as f64 * 8.0 / elapsed / 1000.0) as u32;
        EncodingRate {
            fps,
            real_time_factor,
            instant_bitrate_kbps: bitrate_kbps,
        }
    }

    /// Returns the mean PSNR of the most recent `window` frames in the history.
    ///
    /// `window` is clamped to the number of stored frames. Returns `0.0` when
    /// there is nothing to average, i.e. when the history is empty or
    /// `window` is zero (the latter would otherwise divide zero by zero and
    /// yield NaN).
    #[must_use]
    pub fn rolling_avg_psnr(&self, window: usize) -> f32 {
        let buf = &self.session_metrics.per_frame_metrics;
        let effective_window = window.min(buf.len());
        if effective_window == 0 {
            return 0.0;
        }
        let start = buf.len() - effective_window;
        let sum: f32 = buf.iter().skip(start).map(|m| m.psnr).sum();
        sum / effective_window as f32
    }

    /// Renders the stored per-frame history as CSV.
    ///
    /// The first line is the header
    /// `frame_number,frame_type,encode_time_us,output_bits,psnr`; each frame
    /// in the history follows as one line, with PSNR printed to four decimals.
    #[must_use]
    pub fn export_csv(&self) -> String {
        let mut out = String::from("frame_number,frame_type,encode_time_us,output_bits,psnr\n");
        for m in &self.session_metrics.per_frame_metrics {
            out.push_str(&format!(
                "{},{},{},{},{:.4}\n",
                m.frame_number,
                m.frame_type.label(),
                m.encode_time_us,
                m.output_bits,
                m.psnr,
            ));
        }
        out
    }

    /// Renders the metrics in Prometheus text format, labelled with `session_id`.
    ///
    /// This method has no clock, so the rate gauges (`fps`,
    /// `real_time_factor`, `bitrate_kbps`) are evaluated at the session start
    /// time and are therefore always reported as zero. Use
    /// [`MetricAggregator::to_prometheus_at`] to export live rates.
    #[must_use]
    pub fn to_prometheus(&self, session_id: u64) -> String {
        self.to_prometheus_at(session_id, self.session_metrics.start_time_ms)
    }

    /// Renders the metrics in Prometheus text format with the rate gauges
    /// evaluated at `current_time_ms`.
    ///
    /// `current_time_ms` must use the same time base as the start time given
    /// to [`MetricAggregator::new`]. `session_id` is used verbatim as the
    /// value of the `session` label on every sample.
    #[must_use]
    pub fn to_prometheus_at(&self, session_id: u64, current_time_ms: i64) -> String {
        let snapshot = self.shared_counters.snapshot();
        let rate = self.compute_rate(current_time_ms);
        let avg_psnr = self.rolling_avg_psnr(PER_FRAME_WINDOW);
        let mut buf = String::new();
        Self::push_prometheus_metric(
            &mut buf,
            "frames_encoded",
            "counter",
            "Total frames successfully encoded",
            session_id,
            snapshot.frames_encoded,
        );
        Self::push_prometheus_metric(
            &mut buf,
            "frames_dropped",
            "counter",
            "Total frames dropped",
            session_id,
            snapshot.frames_dropped,
        );
        Self::push_prometheus_metric(
            &mut buf,
            "bytes_output",
            "counter",
            "Total compressed bytes written",
            session_id,
            snapshot.bytes_output,
        );
        Self::push_prometheus_metric(
            &mut buf,
            "encoding_errors",
            "counter",
            "Total encoding errors",
            session_id,
            snapshot.encoding_errors,
        );
        Self::push_prometheus_metric(
            &mut buf,
            "fps",
            "gauge",
            "Current encoding frames per second",
            session_id,
            format_args!("{:.3}", rate.fps),
        );
        Self::push_prometheus_metric(
            &mut buf,
            "real_time_factor",
            "gauge",
            "Encoding speed relative to real-time",
            session_id,
            format_args!("{:.4}", rate.real_time_factor),
        );
        Self::push_prometheus_metric(
            &mut buf,
            "bitrate_kbps",
            "gauge",
            "Instantaneous output bitrate in kbps",
            session_id,
            rate.instant_bitrate_kbps,
        );
        Self::push_prometheus_metric(
            &mut buf,
            "avg_psnr",
            "gauge",
            "Rolling average PSNR over last 100 frames",
            session_id,
            format_args!("{avg_psnr:.4}"),
        );
        buf
    }

    /// Appends the `# HELP` line, the `# TYPE` line and one labelled sample
    /// for the metric `oximedia_transcode_<name>` to `buf`.
    fn push_prometheus_metric(
        buf: &mut String,
        name: &str,
        kind: &str,
        help: &str,
        session_id: u64,
        value: impl std::fmt::Display,
    ) {
        buf.push_str(&format!(
            "# HELP oximedia_transcode_{name} {help}\n\
             # TYPE oximedia_transcode_{name} {kind}\n\
             oximedia_transcode_{name}{{session=\"{session_id}\"}} {value}\n"
        ));
    }

    /// Returns the session state, including the per-frame history.
    #[must_use]
    pub fn session(&self) -> &SessionMetrics {
        &self.session_metrics
    }
}
/// Per-frame measurement with byte-based sizes and an optional PSNR.
#[derive(Debug, Clone)]
pub struct LegacyFrameMetric {
    /// Index of the frame as supplied by the caller.
    pub frame_index: u64,
    /// Time spent encoding the frame (the `_us` suffix suggests microseconds).
    pub encode_us: u64,
    /// Size of the encoded frame in bytes.
    pub compressed_bytes: u64,
    /// PSNR in dB, if one was measured for this frame.
    pub psnr_db: Option<f64>,
}

impl LegacyFrameMetric {
    /// Creates a metric without a PSNR value.
    #[must_use]
    pub fn new(frame_index: u64, encode_us: u64, compressed_bytes: u64) -> Self {
        Self {
            frame_index,
            encode_us,
            compressed_bytes,
            psnr_db: None,
        }
    }

    /// Returns the metric with its PSNR set to `psnr_db`.
    #[must_use]
    pub fn with_psnr(mut self, psnr_db: f64) -> Self {
        self.psnr_db = Some(psnr_db);
        self
    }

    /// Returns the bitrate, in bits per second, that a stream of frames of
    /// this size would have at `fps` frames per second.
    ///
    /// A non-positive `fps` yields `0.0`, matching the other bitrate helpers
    /// in this file, instead of a negative bitrate.
    #[must_use]
    pub fn instantaneous_bitrate_bps(&self, fps: f64) -> f64 {
        if fps <= 0.0 {
            return 0.0;
        }
        // bytes/frame * 8 bits/byte * frames/second = bits/second.
        self.compressed_bytes as f64 * 8.0 * fps
    }
}
/// Aggregate statistics over a set of recorded frames.
#[derive(Debug, Clone)]
pub struct MetricsSummary {
    /// Number of frames summarised.
    pub frame_count: u64,
    /// Mean per-frame encode time.
    pub mean_encode_us: f64,
    /// Largest per-frame encode time.
    pub peak_encode_us: u64,
    /// Total encoded size in bytes.
    pub total_bytes: u64,
    /// Mean PSNR in dB over the frames that had one, if any did.
    pub mean_psnr_db: Option<f64>,
    /// Lowest PSNR in dB over the frames that had one, if any did.
    pub min_psnr_db: Option<f64>,
}

impl MetricsSummary {
    /// Returns the mean bitrate in bits per second when the summarised
    /// frames are played back at `fps` frames per second.
    ///
    /// Yields `0.0` for an empty summary or a non-positive `fps`.
    #[must_use]
    pub fn mean_bitrate_bps(&self, fps: f64) -> f64 {
        if fps <= 0.0 || self.frame_count == 0 {
            return 0.0;
        }
        // Playback duration of `frame_count` frames at `fps`.
        let playback_secs = self.frame_count as f64 / fps;
        (self.total_bytes as f64 * 8.0) / playback_secs
    }

    /// Returns the encode speed in frames per second implied by the mean
    /// encode time, treating that time as microseconds.
    ///
    /// Yields `0.0` when the mean encode time is not positive.
    #[must_use]
    pub fn encode_fps(&self) -> f64 {
        const MICROS_PER_SEC: f64 = 1_000_000.0;
        if self.mean_encode_us <= 0.0 {
            0.0
        } else {
            MICROS_PER_SEC / self.mean_encode_us
        }
    }
}
/// Accumulates [`LegacyFrameMetric`] records and summarises them on demand.
///
/// Records are kept in insertion order and are never evicted; call
/// [`TranscodeMetricsCollector::clear`] to drop them.
#[derive(Debug, Default)]
pub struct TranscodeMetricsCollector {
    /// Every recorded frame, in the order it was recorded.
    metrics: Vec<LegacyFrameMetric>,
}

impl TranscodeMetricsCollector {
    /// Creates an empty collector.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty collector with room for `cap` records pre-allocated.
    #[must_use]
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            metrics: Vec::with_capacity(cap),
        }
    }

    /// Appends one frame record.
    pub fn record(&mut self, metric: LegacyFrameMetric) {
        self.metrics.push(metric);
    }

    /// Returns the number of recorded frames.
    #[must_use]
    pub fn frame_count(&self) -> usize {
        self.metrics.len()
    }

    /// Returns `true` when no frame has been recorded.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.metrics.is_empty()
    }

    /// Computes aggregate statistics over all recorded frames.
    ///
    /// An empty collector yields an all-zero summary with no PSNR figures.
    /// PSNR statistics only consider frames that carry a PSNR value. The
    /// minimum is folded with `f64::min`, which skips NaN when the other
    /// operand is a number.
    #[must_use]
    pub fn summarise(&self) -> MetricsSummary {
        if self.metrics.is_empty() {
            return MetricsSummary {
                frame_count: 0,
                mean_encode_us: 0.0,
                peak_encode_us: 0,
                total_bytes: 0,
                mean_psnr_db: None,
                min_psnr_db: None,
            };
        }

        // One pass over the records instead of one pass per statistic, and
        // no temporary vector of PSNR values.
        let mut total_encode_us: u64 = 0;
        let mut peak_encode_us: u64 = 0;
        let mut total_bytes: u64 = 0;
        let mut psnr_sum = 0.0_f64;
        let mut psnr_count = 0_usize;
        let mut min_psnr_db: Option<f64> = None;
        for m in &self.metrics {
            total_encode_us += m.encode_us;
            peak_encode_us = peak_encode_us.max(m.encode_us);
            total_bytes += m.compressed_bytes;
            if let Some(psnr) = m.psnr_db {
                psnr_sum += psnr;
                psnr_count += 1;
                min_psnr_db = Some(min_psnr_db.map_or(psnr, |lowest| lowest.min(psnr)));
            }
        }

        let count = self.metrics.len() as u64;
        let mean_psnr_db = (psnr_count > 0).then(|| psnr_sum / psnr_count as f64);
        MetricsSummary {
            frame_count: count,
            mean_encode_us: total_encode_us as f64 / count as f64,
            peak_encode_us,
            total_bytes,
            mean_psnr_db,
            min_psnr_db,
        }
    }

    /// Returns the recorded frame with the lowest PSNR, if any frame has one.
    ///
    /// Frames without a PSNR, or whose PSNR is NaN, are ignored. Treating NaN
    /// as "equal" in the comparison would make the result depend on record
    /// order. On ties the earliest recorded frame wins.
    #[must_use]
    pub fn worst_psnr_frame(&self) -> Option<&LegacyFrameMetric> {
        self.metrics
            .iter()
            .filter_map(|m| match m.psnr_db {
                Some(psnr) if !psnr.is_nan() => Some((psnr, m)),
                _ => None,
            })
            // NaN was filtered out above, so `partial_cmp` is always `Some`;
            // the fallback only exists to avoid a panic path.
            .min_by(|(a, _), (b, _)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(_, m)| m)
    }

    /// Returns the recorded frame with the largest encode time, if any.
    #[must_use]
    pub fn slowest_frame(&self) -> Option<&LegacyFrameMetric> {
        self.metrics.iter().max_by_key(|m| m.encode_us)
    }

    /// Removes all recorded frames.
    pub fn clear(&mut self) {
        self.metrics.clear();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for `FrameMetric::new` that keeps the test bodies compact.
    fn make_frame(n: u64, enc_us: u32, psnr: f32, ftype: FrameType, bits: u32) -> FrameMetric {
        FrameMetric::new(n, enc_us, psnr, ftype, bits)
    }

    /// Every frame type maps to its one-letter label.
    #[test]
    fn test_frame_type_labels() {
        assert_eq!(FrameType::I.label(), "I");
        assert_eq!(FrameType::P.label(), "P");
        assert_eq!(FrameType::B.label(), "B");
    }

    /// A bit count that is a multiple of eight converts exactly.
    #[test]
    fn test_frame_metric_output_bytes() {
        let metric = make_frame(0, 1000, 42.0, FrameType::I, 800);
        assert_eq!(metric.output_bytes(), 100);
    }

    /// A trailing partial byte is rounded up.
    #[test]
    fn test_frame_metric_output_bytes_partial() {
        let metric = make_frame(0, 1000, 42.0, FrameType::P, 801);
        assert_eq!(metric.output_bytes(), 101);
    }

    /// 8000 bits per frame at 30 fps is 240 kbps.
    #[test]
    fn test_frame_metric_instant_bitrate() {
        let metric = make_frame(0, 5000, 40.0, FrameType::I, 8000);
        let kbps = metric.instant_bitrate_kbps(30.0);
        assert!((kbps - 240.0).abs() < 0.01, "kbps={kbps}");
    }

    /// A zero frame rate yields a zero bitrate.
    #[test]
    fn test_frame_metric_zero_fps() {
        let metric = make_frame(0, 5000, 40.0, FrameType::I, 8000);
        assert_eq!(metric.instant_bitrate_kbps(0.0), 0.0);
    }

    /// Freshly created counters all read zero.
    #[test]
    fn test_atomic_counters_start_at_zero() {
        let counters = TranscodeMetrics::new();
        let snap = counters.snapshot();
        assert_eq!(snap.frames_encoded, 0);
        assert_eq!(snap.frames_dropped, 0);
        assert_eq!(snap.bytes_output, 0);
        assert_eq!(snap.encoding_errors, 0);
    }

    /// Each increment method updates its own counter only.
    #[test]
    fn test_atomic_counters_increment() {
        let counters = TranscodeMetrics::new();
        counters.inc_frames_encoded(5);
        counters.inc_frames_dropped(2);
        counters.add_bytes_output(1024);
        counters.inc_errors(1);
        let snap = counters.snapshot();
        assert_eq!(snap.frames_encoded, 5);
        assert_eq!(snap.frames_dropped, 2);
        assert_eq!(snap.bytes_output, 1024);
        assert_eq!(snap.encoding_errors, 1);
    }

    /// Recording frames bumps the frame and byte counters.
    #[test]
    fn test_aggregator_update_frame_increments_counters() {
        let mut aggregator = MetricAggregator::new(1, 0, 30.0);
        aggregator.update_frame(make_frame(0, 5000, 42.0, FrameType::I, 8000));
        aggregator.update_frame(make_frame(1, 4000, 41.0, FrameType::P, 4000));
        let snap = aggregator.counters().snapshot();
        assert_eq!(snap.frames_encoded, 2);
        assert_eq!(snap.bytes_output, 1000 + 500);
    }

    /// With no frames recorded the rolling average is zero.
    #[test]
    fn test_aggregator_rolling_avg_psnr_empty() {
        let aggregator = MetricAggregator::new(1, 0, 30.0);
        assert_eq!(aggregator.rolling_avg_psnr(10), 0.0);
    }

    /// Two frames average to their midpoint.
    #[test]
    fn test_aggregator_rolling_avg_psnr_basic() {
        let mut aggregator = MetricAggregator::new(1, 0, 30.0);
        aggregator.update_frame(make_frame(0, 1000, 40.0, FrameType::I, 8000));
        aggregator.update_frame(make_frame(1, 1000, 44.0, FrameType::P, 4000));
        let avg = aggregator.rolling_avg_psnr(2);
        assert!((avg - 42.0).abs() < 0.01, "avg={avg}");
    }

    /// A window larger than the history is clamped to the history.
    #[test]
    fn test_aggregator_rolling_avg_psnr_window_clamp() {
        let mut aggregator = MetricAggregator::new(1, 0, 30.0);
        aggregator.update_frame(make_frame(0, 1000, 50.0, FrameType::I, 8000));
        let avg = aggregator.rolling_avg_psnr(100);
        assert!((avg - 50.0).abs() < 0.01);
    }

    /// Pushing 101 frames leaves only the 100 most recent in the history.
    #[test]
    fn test_aggregator_ring_buffer_eviction() {
        let mut aggregator = MetricAggregator::new(1, 0, 30.0);
        for frame_no in 0..=100_u64 {
            aggregator.update_frame(make_frame(
                frame_no,
                1000,
                frame_no as f32,
                FrameType::P,
                1000,
            ));
        }
        assert_eq!(aggregator.session().per_frame_metrics.len(), 100);
    }

    /// Asking for the rate at the start time reports zero fps.
    #[test]
    fn test_aggregator_compute_rate_zero_elapsed() {
        let aggregator = MetricAggregator::new(1, 1000, 30.0);
        let rate = aggregator.compute_rate(1000);
        assert_eq!(rate.fps, 0.0);
    }

    /// Thirty frames in one second at a 30 fps source is exactly real time.
    #[test]
    fn test_aggregator_compute_rate_basic() {
        let mut aggregator = MetricAggregator::new(1, 0, 30.0);
        for frame_no in 0..30 {
            aggregator.update_frame(make_frame(frame_no, 1000, 42.0, FrameType::P, 8000));
        }
        let rate = aggregator.compute_rate(1000);
        assert!((rate.fps - 30.0).abs() < 0.01, "fps={}", rate.fps);
        assert!((rate.real_time_factor - 1.0).abs() < 0.01);
    }

    /// The CSV export always starts with the column header.
    #[test]
    fn test_export_csv_header() {
        let aggregator = MetricAggregator::new(1, 0, 30.0);
        let csv = aggregator.export_csv();
        assert!(csv.starts_with("frame_number,frame_type,encode_time_us,output_bits,psnr"));
    }

    /// Each recorded frame becomes one CSV row after the header.
    #[test]
    fn test_export_csv_rows() {
        let mut aggregator = MetricAggregator::new(1, 0, 30.0);
        aggregator.update_frame(make_frame(0, 5000, 42.5, FrameType::I, 8000));
        aggregator.update_frame(make_frame(1, 3000, 40.0, FrameType::B, 2000));
        let csv = aggregator.export_csv();
        let rows: Vec<&str> = csv.lines().collect();
        assert_eq!(rows.len(), 3);
        assert!(rows[1].starts_with("0,I,"));
        assert!(rows[2].starts_with("1,B,"));
    }

    /// The Prometheus export mentions every metric family.
    #[test]
    fn test_prometheus_export_contains_required_metrics() {
        let mut aggregator = MetricAggregator::new(42, 0, 30.0);
        aggregator.update_frame(make_frame(0, 5000, 42.0, FrameType::I, 8000));
        let prom = aggregator.to_prometheus(42);
        assert!(prom.contains("oximedia_transcode_frames_encoded"));
        assert!(prom.contains("oximedia_transcode_frames_dropped"));
        assert!(prom.contains("oximedia_transcode_bytes_output"));
        assert!(prom.contains("oximedia_transcode_encoding_errors"));
        assert!(prom.contains("oximedia_transcode_fps"));
        assert!(prom.contains("oximedia_transcode_real_time_factor"));
        assert!(prom.contains("oximedia_transcode_bitrate_kbps"));
        assert!(prom.contains("oximedia_transcode_avg_psnr"));
    }

    /// Samples carry the session id as a label.
    #[test]
    fn test_prometheus_export_session_label() {
        let aggregator = MetricAggregator::new(99, 0, 30.0);
        let prom = aggregator.to_prometheus(99);
        assert!(prom.contains("session=\"99\""));
    }

    /// A factor of at least 1.0 counts as real time; below does not.
    #[test]
    fn test_encoding_rate_is_realtime() {
        let fast = EncodingRate {
            fps: 60.0,
            real_time_factor: 2.0,
            instant_bitrate_kbps: 5000,
        };
        let slow = EncodingRate {
            fps: 10.0,
            real_time_factor: 0.5,
            instant_bitrate_kbps: 1000,
        };
        assert!(fast.is_realtime());
        assert!(!slow.is_realtime());
    }

    /// `QualityMetrics::zero` zeroes all three averages.
    #[test]
    fn test_quality_metrics_zero() {
        let quality = QualityMetrics::zero();
        assert_eq!(quality.avg_psnr, 0.0);
        assert_eq!(quality.avg_ssim, 0.0);
        assert_eq!(quality.avg_vmaf, 0.0);
    }

    /// The legacy collector sums bytes and averages encode times.
    #[test]
    fn test_legacy_collector_record_and_summarise() {
        let mut collector = TranscodeMetricsCollector::new();
        collector.record(LegacyFrameMetric::new(0, 1000, 400));
        collector.record(LegacyFrameMetric::new(1, 3000, 600));
        let summary = collector.summarise();
        assert_eq!(summary.frame_count, 2);
        assert_eq!(summary.total_bytes, 1000);
        assert!((summary.mean_encode_us - 2000.0).abs() < 1e-6);
    }

    /// The frame with the lowest PSNR is reported as the worst.
    #[test]
    fn test_legacy_worst_psnr() {
        let mut collector = TranscodeMetricsCollector::new();
        collector.record(LegacyFrameMetric::new(0, 100, 100).with_psnr(45.0));
        collector.record(LegacyFrameMetric::new(1, 100, 100).with_psnr(35.0));
        let worst = collector.worst_psnr_frame().expect("should exist");
        assert_eq!(worst.frame_index, 1);
    }

    /// Clearing the collector removes every record.
    #[test]
    fn test_legacy_clear() {
        let mut collector = TranscodeMetricsCollector::new();
        collector.record(LegacyFrameMetric::new(0, 100, 100));
        collector.clear();
        assert!(collector.is_empty());
    }
}