use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc, watch};
use tracing::{debug, info, warn};
use crate::h264::{self, NalUnit};
#[derive(Debug, Clone)]
pub struct ChangeEvent {
pub timestamp: Instant,
pub changed_fraction: f64,
}
pub async fn run(
mut nal_rx: broadcast::Receiver<NalUnit>,
change_tx: mpsc::Sender<ChangeEvent>,
check_interval: Duration,
sensitivity: f64,
mut shutdown: watch::Receiver<bool>,
) {
let mut last_check = Instant::now();
let mut ref_size: Option<usize> = None;
let mut max_p_size: usize = 0;
let mut nal_count: u64 = 0;
debug!("change detector initialized (frame-size mode, sensitivity={sensitivity})");
loop {
tokio::select! {
_ = shutdown.changed() => {
info!("shutdown signal received, detector exiting");
break;
}
recv_result = nal_rx.recv() => match recv_result {
Ok(nal) => {
nal_count += 1;
if let Some(nal_type) = nal.nal_type()
&& nal_type == h264::NAL_TYPE_IDR
{
let size = nal.data.len();
if ref_size.is_none() {
debug!("IDR reference size: {size} bytes");
}
ref_size = Some(size);
continue;
}
if nal.is_keyframe {
continue;
}
max_p_size = max_p_size.max(nal.data.len());
if last_check.elapsed() < check_interval {
continue;
}
last_check = Instant::now();
if let Some(ref_sz) = ref_size {
let threshold = (sensitivity * ref_sz as f64) as usize;
debug!(
max_p_frame = max_p_size,
ref_size = ref_sz,
threshold,
"frame size check"
);
if max_p_size > threshold {
let fraction = (max_p_size as f64 / ref_sz as f64).min(1.0);
info!(
max_p_frame = max_p_size,
threshold,
changed = format!("{:.2}%", fraction * 100.0),
"change detected (frame size)"
);
let _ = change_tx.try_send(ChangeEvent {
timestamp: Instant::now(),
changed_fraction: fraction,
});
}
}
max_p_size = 0;
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("detector lagged, missed {n} NAL units");
}
Err(broadcast::error::RecvError::Closed) => {
debug!("NAL broadcast closed, detector exiting");
break;
}
}
}
}
debug!("detector exiting (nal_count={nal_count})");
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::{Duration, timeout};
#[tokio::test]
async fn test_detector_exits_on_shutdown_signal() {
let (_nal_tx, nal_rx) = broadcast::channel(8);
let (change_tx, mut change_rx) = mpsc::channel(8);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(async move {
run(
nal_rx,
change_tx,
Duration::from_millis(10),
0.02,
shutdown_rx,
)
.await;
});
shutdown_tx
.send(true)
.expect("failed to send shutdown signal");
timeout(Duration::from_secs(1), handle)
.await
.expect("detector task did not exit on shutdown")
.expect("detector task panicked");
assert!(matches!(
change_rx.try_recv(),
Err(mpsc::error::TryRecvError::Empty) | Err(mpsc::error::TryRecvError::Disconnected)
));
}
}