use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use bytes::Bytes;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use bairelay_rtsp::buffer::{LastFrameBuffer, VideoBurst};
use bairelay_rtsp::codec::nal::{
detect_codec, is_decodable_nal, split_annex_b, H264NalType, H265NalType,
};
use bairelay_rtsp::codec::VideoCodec;
use bairelay_rtsp::provider::Frame;
use bairelay_rtsp::sdp::{SdpParams, VideoParams};
use bairelay_rtsp::url::StreamKind as RtspStreamKind;
use bairelay_neolink_core::bc_protocol::{BcCamera, StreamKind as CoreStreamKind};
use bairelay_neolink_core::bcmedia::model::{BcMedia, BcMediaIframe, BcMediaPframe};
use crate::bcmedia_dump::{BcMediaDumpConfig, FrameDumper};
const BROADCAST_CAPACITY: usize = 64;
const START_VIDEO_TIMEOUT: Duration = Duration::from_secs(30);
const STOP_VIDEO_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) const SDP_POLL_INTERVAL: Duration = Duration::from_millis(100);
const GAP_DETECTION_TICK: Duration = Duration::from_millis(200);
pub(crate) fn lock_recover<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
m.lock().unwrap_or_else(|e| e.into_inner())
}
pub(crate) fn rlock_recover<T>(m: &RwLock<T>) -> std::sync::RwLockReadGuard<'_, T> {
m.read().unwrap_or_else(|e| e.into_inner())
}
pub(crate) fn wlock_recover<T>(m: &RwLock<T>) -> std::sync::RwLockWriteGuard<'_, T> {
m.write().unwrap_or_else(|e| e.into_inner())
}
pub(crate) trait RwLockPoisonRecover<T> {
fn read_recover(&self) -> std::sync::RwLockReadGuard<'_, T>;
fn write_recover(&self) -> std::sync::RwLockWriteGuard<'_, T>;
}
impl<T> RwLockPoisonRecover<T> for RwLock<T> {
fn read_recover(&self) -> std::sync::RwLockReadGuard<'_, T> {
rlock_recover(self)
}
fn write_recover(&self) -> std::sync::RwLockWriteGuard<'_, T> {
wlock_recover(self)
}
}
pub(crate) trait MutexPoisonRecover<T> {
fn lock_recover(&self) -> std::sync::MutexGuard<'_, T>;
}
impl<T> MutexPoisonRecover<T> for Mutex<T> {
fn lock_recover(&self) -> std::sync::MutexGuard<'_, T> {
lock_recover(self)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GapState {
Live,
Bridging,
}
pub struct StreamSource {
tx: broadcast::Sender<Frame>,
last_frame: Arc<LastFrameBuffer>,
sdp_params: Arc<RwLock<SdpParams>>,
#[allow(dead_code)]
translator_state: Arc<Mutex<StreamTranslatorState>>,
cancel: CancellationToken,
task: Mutex<Option<JoinHandle<()>>>,
pub(crate) last_idle_since: Mutex<Option<Instant>>,
gap_threshold: Duration,
gap_state: Arc<Mutex<GapState>>,
#[allow(dead_code)]
last_live_frame_at: Arc<Mutex<tokio::time::Instant>>,
#[allow(dead_code)]
last_emitted_pts_90khz: Arc<Mutex<Option<u32>>>,
#[allow(dead_code)]
last_emit_wallclock_at: Arc<Mutex<Option<tokio::time::Instant>>>,
}
struct StreamSourceParts {
tx: broadcast::Sender<Frame>,
last_frame: Arc<LastFrameBuffer>,
sdp_params: Arc<RwLock<SdpParams>>,
translator_state: Arc<Mutex<StreamTranslatorState>>,
cancel: CancellationToken,
gap_threshold: Duration,
gap_state: Arc<Mutex<GapState>>,
last_live_frame_at: Arc<Mutex<tokio::time::Instant>>,
last_emitted_pts_90khz: Arc<Mutex<Option<u32>>>,
last_emit_wallclock_at: Arc<Mutex<Option<tokio::time::Instant>>>,
}
impl StreamSourceParts {
fn new(
camera_name: &str,
kind: RtspStreamKind,
last_frame: Arc<LastFrameBuffer>,
gap_threshold: Duration,
) -> Self {
let (tx, _) = broadcast::channel::<Frame>(BROADCAST_CAPACITY);
let sdp_params: Arc<RwLock<SdpParams>> = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: format!("{camera_name}/{kind}"),
video: None,
audio: None,
}));
Self {
tx,
last_frame,
sdp_params,
translator_state: Arc::new(Mutex::new(StreamTranslatorState::default())),
cancel: CancellationToken::new(),
gap_threshold,
gap_state: Arc::new(Mutex::new(GapState::Live)),
last_live_frame_at: Arc::new(Mutex::new(tokio::time::Instant::now())),
last_emitted_pts_90khz: Arc::new(Mutex::new(None)),
last_emit_wallclock_at: Arc::new(Mutex::new(None)),
}
}
fn into_source(self, task: JoinHandle<()>) -> Arc<StreamSource> {
Arc::new(StreamSource {
tx: self.tx,
last_frame: self.last_frame,
sdp_params: self.sdp_params,
translator_state: self.translator_state,
cancel: self.cancel,
task: Mutex::new(Some(task)),
last_idle_since: Mutex::new(None),
gap_threshold: self.gap_threshold,
gap_state: self.gap_state,
last_live_frame_at: self.last_live_frame_at,
last_emitted_pts_90khz: self.last_emitted_pts_90khz,
last_emit_wallclock_at: self.last_emit_wallclock_at,
})
}
}
struct ReaderTaskArgs {
camera: Arc<BcCamera>,
camera_name: String,
rtsp_kind: RtspStreamKind,
core_kind: CoreStreamKind,
tx: broadcast::Sender<Frame>,
audio_pace_tx: Option<mpsc::Sender<PacedFrame>>,
video_pace_tx: Option<mpsc::Sender<PacedFrame>>,
last_frame: Arc<LastFrameBuffer>,
sdp_params: Arc<RwLock<SdpParams>>,
cancel: CancellationToken,
bcmedia_dump: Option<Arc<BcMediaDumpConfig>>,
audio_presence: Arc<RwLock<crate::audio_presence::AudioPresence>>,
translator_state: Arc<Mutex<StreamTranslatorState>>,
gap_threshold: Duration,
gap_state: Arc<Mutex<GapState>>,
last_live_frame_at: Arc<Mutex<tokio::time::Instant>>,
last_emitted_pts_90khz: Arc<Mutex<Option<u32>>>,
last_emit_wallclock_at: Arc<Mutex<Option<tokio::time::Instant>>>,
}
#[derive(Debug, Default)]
pub struct StreamTranslatorState {
pub detected_codec: Option<VideoCodec>,
pub aac_pts_next: u32,
pub g711_pts_next: u32,
pub aac_aot: Option<u8>,
pub last_video_pts_90khz: Option<u32>,
}
impl StreamSource {
pub fn start(
camera: Arc<BcCamera>,
camera_name: String,
kind: RtspStreamKind,
last_frame: Arc<LastFrameBuffer>,
bcmedia_dump: Option<Arc<BcMediaDumpConfig>>,
audio_presence: Arc<RwLock<crate::audio_presence::AudioPresence>>,
gap_threshold: Duration,
) -> Arc<Self> {
let parts =
StreamSourceParts::new(&camera_name, kind, Arc::clone(&last_frame), gap_threshold);
let core_kind = map_stream_kind(kind);
let (audio_pace_tx, audio_pace_rx) = mpsc::channel::<PacedFrame>(AUDIO_PACER_QUEUE);
let audio_broadcast = parts.tx.clone();
let audio_cancel = parts.cancel.clone();
tokio::spawn(audio_pacer_task(
audio_pace_rx,
audio_broadcast,
audio_cancel,
));
let (video_pace_tx, video_pace_rx) = mpsc::channel::<PacedFrame>(VIDEO_PACER_QUEUE);
let video_broadcast = parts.tx.clone();
let video_cancel = parts.cancel.clone();
tokio::spawn(video_pacer_task(
video_pace_rx,
video_broadcast,
video_cancel,
));
let reader_args = ReaderTaskArgs {
camera: Arc::clone(&camera),
camera_name: camera_name.clone(),
rtsp_kind: kind,
core_kind,
tx: parts.tx.clone(),
audio_pace_tx: Some(audio_pace_tx),
video_pace_tx: Some(video_pace_tx),
last_frame: Arc::clone(&parts.last_frame),
sdp_params: Arc::clone(&parts.sdp_params),
cancel: parts.cancel.clone(),
bcmedia_dump: bcmedia_dump.clone(),
audio_presence: Arc::clone(&audio_presence),
translator_state: Arc::clone(&parts.translator_state),
gap_threshold,
gap_state: Arc::clone(&parts.gap_state),
last_live_frame_at: Arc::clone(&parts.last_live_frame_at),
last_emitted_pts_90khz: Arc::clone(&parts.last_emitted_pts_90khz),
last_emit_wallclock_at: Arc::clone(&parts.last_emit_wallclock_at),
};
let task = tokio::spawn(async move {
reader_task(reader_args).await;
});
parts.into_source(task)
}
#[allow(dead_code, private_bounds)]
pub(crate) fn start_with_packet_source<S>(
camera_name: String,
kind: RtspStreamKind,
last_frame: Arc<LastFrameBuffer>,
gap_threshold: Duration,
source: S,
) -> Arc<Self>
where
S: PacketSource + 'static,
{
let parts =
StreamSourceParts::new(&camera_name, kind, Arc::clone(&last_frame), gap_threshold);
let core_kind = map_stream_kind(kind);
let args = TranslatorLoopArgs {
camera_name: camera_name.clone(),
rtsp_kind: kind,
core_kind,
tx: parts.tx.clone(),
audio_pace_tx: None,
video_pace_tx: None,
last_frame: Arc::clone(&parts.last_frame),
sdp_params: Arc::clone(&parts.sdp_params),
cancel: parts.cancel.clone(),
bcmedia_dump: None,
audio_presence: Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown)),
translator_state: Arc::clone(&parts.translator_state),
gap_threshold,
gap_state: Arc::clone(&parts.gap_state),
last_live_frame_at: Arc::clone(&parts.last_live_frame_at),
last_emitted_pts_90khz: Arc::clone(&parts.last_emitted_pts_90khz),
last_emit_wallclock_at: Arc::clone(&parts.last_emit_wallclock_at),
};
let task = tokio::spawn(async move {
let mut s = source;
drive_translator_loop(args, &mut s).await;
});
parts.into_source(task)
}
pub fn subscribe(&self) -> broadcast::Receiver<Frame> {
self.tx.subscribe()
}
pub fn sdp_params(&self) -> SdpParams {
rlock_recover(&self.sdp_params).clone()
}
pub(crate) fn sdp_params_handle(&self) -> Arc<RwLock<SdpParams>> {
Arc::clone(&self.sdp_params)
}
pub async fn await_sdp_ready(
&self,
timeout: std::time::Duration,
) -> Result<SdpParams, &'static str> {
let start = std::time::Instant::now();
loop {
{
let params = rlock_recover(&self.sdp_params).clone();
if params.video.is_some() {
return Ok(params);
}
}
if start.elapsed() > timeout {
return Err("SDP video parameters not ready");
}
tokio::time::sleep(SDP_POLL_INTERVAL).await;
}
}
pub async fn await_sdp_both_ready(
&self,
timeout: std::time::Duration,
) -> Result<SdpParams, &'static str> {
await_sdp_both(&self.sdp_params, timeout).await
}
pub async fn await_audio(&self, timeout: std::time::Duration) -> Result<(), &'static str> {
await_audio_or_deadline(&self.sdp_params, timeout).await
}
pub fn last_frame(&self) -> Arc<LastFrameBuffer> {
Arc::clone(&self.last_frame)
}
pub fn subscribers(&self) -> usize {
self.tx.receiver_count()
}
pub fn gap_threshold(&self) -> Duration {
self.gap_threshold
}
pub fn gap_state(&self) -> GapState {
*lock_recover(&self.gap_state)
}
pub fn stop(&self) {
self.cancel.cancel();
}
pub async fn stop_and_wait(
&self,
timeout: Duration,
) -> Result<(), tokio::time::error::Elapsed> {
self.cancel.cancel();
let handle = lock_recover(&self.task).take();
if let Some(h) = handle {
match tokio::time::timeout(timeout, h).await {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
} else {
Ok(())
}
}
}
impl Drop for StreamSource {
fn drop(&mut self) {
self.cancel.cancel();
}
}
#[allow(dead_code)]
impl StreamSource {
pub(crate) fn start_inert_for_test() -> Arc<Self> {
Self::start_inert_for_test_with_gap(Duration::from_secs(1))
}
pub(crate) fn start_inert_for_test_with_gap(gap_threshold: Duration) -> Arc<Self> {
Self::start_inert_for_test_with_gap_and_injector(gap_threshold).0
}
pub(crate) fn start_inert_for_test_with_gap_and_injector(
gap_threshold: Duration,
) -> (Arc<Self>, FakeFrameInjector) {
Self::start_inert_for_test_with_gap_and_last_frame_and_injector(
gap_threshold,
Arc::new(LastFrameBuffer::new()),
)
}
pub(crate) fn start_inert_for_test_with_gap_and_last_frame(
gap_threshold: Duration,
) -> (Arc<Self>, Arc<LastFrameBuffer>) {
let last_frame = Arc::new(LastFrameBuffer::new());
let (src, _inject) = Self::start_inert_for_test_with_gap_and_last_frame_and_injector(
gap_threshold,
Arc::clone(&last_frame),
);
(src, last_frame)
}
pub fn start_inert_for_test_with_gap_and_last_frame_and_injector(
gap_threshold: Duration,
last_frame: Arc<LastFrameBuffer>,
) -> (Arc<Self>, FakeFrameInjector) {
let parts = StreamSourceParts::new("test", RtspStreamKind::Main, last_frame, gap_threshold);
let gap_state_task = Arc::clone(&parts.gap_state);
let last_live_frame_at_task = Arc::clone(&parts.last_live_frame_at);
let last_emitted_pts_task = Arc::clone(&parts.last_emitted_pts_90khz);
let last_emit_wallclock_task = Arc::clone(&parts.last_emit_wallclock_at);
let last_frame_task = Arc::clone(&parts.last_frame);
let tx_task = parts.tx.clone();
let cancel_task = parts.cancel.clone();
let injector = FakeFrameInjector {
gap_state: Arc::clone(&parts.gap_state),
last_live_frame_at: Arc::clone(&parts.last_live_frame_at),
tx: parts.tx.clone(),
};
let task = tokio::spawn(async move {
let mut ticker = tokio::time::interval(GAP_DETECTION_TICK);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel_task.cancelled() => break,
_ = ticker.tick() => {
check_gap_and_update_state(
&last_live_frame_at_task,
&gap_state_task,
gap_threshold,
);
emit_replay_frame_if_bridging(
&tx_task,
&last_frame_task,
&gap_state_task,
&last_emitted_pts_task,
&last_emit_wallclock_task,
);
}
}
}
});
(parts.into_source(task), injector)
}
pub(crate) fn subscribe_for_test(&self) -> broadcast::Receiver<Frame> {
self.tx.subscribe()
}
pub(crate) fn set_gap_state_for_test(&self, state: GapState) {
*lock_recover(&self.gap_state) = state;
}
pub fn set_sdp_params_for_test(&self, params: SdpParams) {
*wlock_recover(&self.sdp_params) = params;
}
}
pub struct FakeFrameInjector {
gap_state: Arc<Mutex<GapState>>,
last_live_frame_at: Arc<Mutex<tokio::time::Instant>>,
tx: broadcast::Sender<Frame>,
}
impl FakeFrameInjector {
pub fn inject_fake_video_frame(&self) {
*lock_recover(&self.last_live_frame_at) = tokio::time::Instant::now();
*lock_recover(&self.gap_state) = GapState::Live;
}
pub fn broadcast_live_video_frame(&self, frame: Frame) {
let _ = self.tx.send(frame);
self.inject_fake_video_frame();
}
}
pub async fn await_sdp_both(
sdp: &Arc<RwLock<SdpParams>>,
timeout: std::time::Duration,
) -> Result<SdpParams, &'static str> {
let start = std::time::Instant::now();
loop {
{
let params = rlock_recover(sdp).clone();
if params.video.is_some() && params.audio.is_some() {
return Ok(params);
}
}
if start.elapsed() > timeout {
return Err("SDP video+audio parameters not ready");
}
tokio::time::sleep(SDP_POLL_INTERVAL).await;
}
}
pub async fn await_audio_or_deadline(
sdp: &Arc<RwLock<SdpParams>>,
timeout: std::time::Duration,
) -> Result<(), &'static str> {
let start = std::time::Instant::now();
loop {
if rlock_recover(sdp).audio.is_some() {
return Ok(());
}
if start.elapsed() > timeout {
return Err("audio SDP not observed before deadline");
}
tokio::time::sleep(SDP_POLL_INTERVAL).await;
}
}
fn map_stream_kind(kind: RtspStreamKind) -> CoreStreamKind {
match kind {
RtspStreamKind::Main => CoreStreamKind::Main,
RtspStreamKind::Sub => CoreStreamKind::Sub,
RtspStreamKind::Extern => CoreStreamKind::Extern,
}
}
#[derive(Debug)]
pub struct PacedFrame {
pub frame: Frame,
pub duration: Duration,
}
const AUDIO_PACER_MAX_LEAD: Duration = Duration::from_millis(2000);
const VIDEO_PACER_MAX_LEAD: Duration = Duration::from_millis(3000);
const AUDIO_PACER_INITIAL_LATENCY: Duration = Duration::from_millis(500);
const VIDEO_PACER_INITIAL_LATENCY: Duration = Duration::from_millis(1500);
async fn audio_pacer_task(
rx: mpsc::Receiver<PacedFrame>,
broadcast: broadcast::Sender<Frame>,
cancel: CancellationToken,
) {
media_pacer_task(
rx,
broadcast,
cancel,
AUDIO_PACER_MAX_LEAD,
AUDIO_PACER_INITIAL_LATENCY,
true,
)
.await
}
async fn video_pacer_task(
rx: mpsc::Receiver<PacedFrame>,
broadcast: broadcast::Sender<Frame>,
cancel: CancellationToken,
) {
media_pacer_task(
rx,
broadcast,
cancel,
VIDEO_PACER_MAX_LEAD,
VIDEO_PACER_INITIAL_LATENCY,
false,
)
.await
}
async fn media_pacer_task(
mut rx: mpsc::Receiver<PacedFrame>,
broadcast: broadcast::Sender<Frame>,
cancel: CancellationToken,
max_lead: Duration,
initial_latency: Duration,
snap_on_past: bool,
) {
let mut next_emit_at: Option<tokio::time::Instant> = None;
loop {
let item = tokio::select! {
biased;
_ = cancel.cancelled() => return,
it = rx.recv() => match it {
Some(it) => it,
None => return, },
};
let now = tokio::time::Instant::now();
let target = match next_emit_at {
Some(t) if t > now + max_lead => now,
Some(t) if snap_on_past && t < now => now,
Some(t) => t,
None => now + initial_latency,
};
if target > now {
tokio::select! {
biased;
_ = cancel.cancelled() => return,
_ = tokio::time::sleep_until(target) => {}
}
}
let _ = broadcast.send(item.frame);
next_emit_at = Some(target.checked_add(item.duration).unwrap_or(target));
}
}
const AUDIO_PACER_QUEUE: usize = 200;
const VIDEO_PACER_QUEUE: usize = 300;
fn micros_to_90khz(micros: u32) -> u32 {
((micros as u64).wrapping_mul(9) / 100) as u32
}
fn check_gap_and_update_state(
last_live_frame_at: &Mutex<tokio::time::Instant>,
gap_state: &Mutex<GapState>,
gap_threshold: Duration,
) {
let last = *lock_recover(last_live_frame_at);
if last.elapsed() > gap_threshold {
*lock_recover(gap_state) = GapState::Bridging;
}
}
fn emit_replay_frame_if_bridging(
tx: &broadcast::Sender<Frame>,
last_frame: &Arc<LastFrameBuffer>,
gap_state: &Mutex<GapState>,
last_emitted_pts_90khz: &Mutex<Option<u32>>,
last_emit_wallclock_at: &Mutex<Option<tokio::time::Instant>>,
) {
if *lock_recover(gap_state) != GapState::Bridging {
return;
}
let Some(burst) = last_frame.video_snapshot() else {
return;
};
let nals: Vec<Bytes> = burst
.iframe_nals
.iter()
.filter(|n| !is_parameter_set_nal(n, burst.codec) && is_decodable_nal(n, burst.codec))
.map(|n| Bytes::copy_from_slice(n))
.collect();
if nals.is_empty() {
return;
}
let now = tokio::time::Instant::now();
let delta_90khz = {
let last_at_opt = *lock_recover(last_emit_wallclock_at);
match last_at_opt {
Some(last_at) => {
let delta = now.saturating_duration_since(last_at);
(delta.as_nanos().saturating_mul(90_000) / 1_000_000_000) as u32
}
None => 0,
}
};
let synth_pts = {
let mut pts_guard = lock_recover(last_emitted_pts_90khz);
let base = pts_guard.unwrap_or(burst.captured_pts_90khz);
let synth = base.wrapping_add(delta_90khz);
*pts_guard = Some(synth);
synth
};
*lock_recover(last_emit_wallclock_at) = Some(now);
let frame = Frame::Video {
codec: burst.codec,
nals,
pts_90khz: synth_pts,
keyframe: true,
access_unit_end: true,
};
let _ = tx.send(frame);
}
async fn reader_task(args: ReaderTaskArgs) {
let ReaderTaskArgs {
camera,
camera_name,
rtsp_kind,
core_kind,
tx,
audio_pace_tx,
video_pace_tx,
last_frame,
sdp_params,
cancel,
bcmedia_dump,
audio_presence,
translator_state,
gap_threshold,
gap_state,
last_live_frame_at,
last_emitted_pts_90khz,
last_emit_wallclock_at,
} = args;
let start_video_future =
tokio::time::timeout(START_VIDEO_TIMEOUT, camera.start_video(core_kind, 0, false));
let stream_data = tokio::select! {
_ = cancel.cancelled() => {
tracing::debug!(camera = %camera_name, stream = ?core_kind,
"stream reader cancelled before start_video completed");
return;
}
result = start_video_future => match result {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
tracing::error!(camera = %camera_name, stream = ?core_kind, error = %e,
"start_video failed");
return;
}
Err(_) => {
tracing::error!(camera = %camera_name, stream = ?core_kind,
"start_video timed out; camera-side preview may still be active (battery drain risk)");
return;
}
},
};
let translator = TranslatorLoopArgs {
camera_name: camera_name.clone(),
rtsp_kind,
core_kind,
tx,
audio_pace_tx,
video_pace_tx,
last_frame,
sdp_params,
cancel: cancel.clone(),
bcmedia_dump,
audio_presence,
translator_state,
gap_threshold,
gap_state,
last_live_frame_at,
last_emitted_pts_90khz,
last_emit_wallclock_at,
};
let translator_camera = camera_name.clone();
let translator_kind = core_kind;
let translator_handle = tokio::spawn(async move {
let mut source = StreamDataSource(stream_data);
drive_translator_loop(translator, &mut source).await;
});
match translator_handle.await {
Ok(()) => {}
Err(e) if e.is_panic() => {
tracing::error!(camera = %translator_camera, stream = ?translator_kind,
"translator task panicked; calling stop_video to release camera battery path");
}
Err(e) => {
tracing::warn!(camera = %translator_camera, stream = ?translator_kind, error = %e,
"translator task ended unexpectedly");
}
}
match tokio::time::timeout(STOP_VIDEO_TIMEOUT, camera.stop_video(core_kind)).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::debug!(camera = %camera_name, error = %e,
"stop_video returned error (camera may already be off)");
}
Err(_) => {
tracing::warn!(camera = %camera_name, "stop_video timed out");
}
}
}
struct TranslatorLoopArgs {
camera_name: String,
rtsp_kind: RtspStreamKind,
core_kind: CoreStreamKind,
tx: broadcast::Sender<Frame>,
audio_pace_tx: Option<mpsc::Sender<PacedFrame>>,
video_pace_tx: Option<mpsc::Sender<PacedFrame>>,
last_frame: Arc<LastFrameBuffer>,
sdp_params: Arc<RwLock<SdpParams>>,
cancel: CancellationToken,
bcmedia_dump: Option<Arc<BcMediaDumpConfig>>,
audio_presence: Arc<RwLock<crate::audio_presence::AudioPresence>>,
translator_state: Arc<Mutex<StreamTranslatorState>>,
gap_threshold: Duration,
gap_state: Arc<Mutex<GapState>>,
last_live_frame_at: Arc<Mutex<tokio::time::Instant>>,
last_emitted_pts_90khz: Arc<Mutex<Option<u32>>>,
last_emit_wallclock_at: Arc<Mutex<Option<tokio::time::Instant>>>,
}
#[async_trait::async_trait]
trait PacketSource: Send {
async fn get_data(
&mut self,
) -> Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
>;
}
struct StreamDataSource(bairelay_neolink_core::bc_protocol::StreamData);
#[async_trait::async_trait]
impl PacketSource for StreamDataSource {
async fn get_data(
&mut self,
) -> Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
> {
self.0.get_data().await
}
}
async fn drive_translator_loop<S: PacketSource>(args: TranslatorLoopArgs, source: &mut S) {
let TranslatorLoopArgs {
camera_name,
rtsp_kind,
core_kind,
tx,
audio_pace_tx,
video_pace_tx,
last_frame,
sdp_params,
cancel,
bcmedia_dump,
audio_presence,
translator_state,
gap_threshold,
gap_state,
last_live_frame_at,
last_emitted_pts_90khz,
last_emit_wallclock_at,
} = args;
let mut dumper: Option<FrameDumper> = None;
let mut dumper_init_failed = false;
let mut gap_ticker = tokio::time::interval(GAP_DETECTION_TICK);
gap_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::debug!(camera = %camera_name, stream = ?core_kind,
"stream reader cancelled");
break;
}
_ = gap_ticker.tick() => {
check_gap_and_update_state(&last_live_frame_at, &gap_state, gap_threshold);
emit_replay_frame_if_bridging(
&tx,
&last_frame,
&gap_state,
&last_emitted_pts_90khz,
&last_emit_wallclock_at,
);
}
result = source.get_data() => {
if !process_stream_result(
result,
&camera_name,
rtsp_kind,
&tx,
audio_pace_tx.as_ref(),
video_pace_tx.as_ref(),
&last_frame,
&sdp_params,
&audio_presence,
&translator_state,
&gap_state,
&last_live_frame_at,
&last_emitted_pts_90khz,
&last_emit_wallclock_at,
bcmedia_dump.as_ref(),
&mut dumper,
&mut dumper_init_failed,
&cancel,
) {
break;
}
}
}
}
if let Some(ref mut d) = dumper {
d.flush();
}
}
#[allow(clippy::too_many_arguments)]
fn process_stream_result(
result: Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
>,
camera_name: &str,
rtsp_kind: RtspStreamKind,
tx: &broadcast::Sender<Frame>,
audio_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
video_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
last_frame: &Arc<LastFrameBuffer>,
sdp_params: &Arc<RwLock<SdpParams>>,
audio_presence: &Arc<RwLock<crate::audio_presence::AudioPresence>>,
translator_state: &Arc<Mutex<StreamTranslatorState>>,
gap_state: &Arc<Mutex<GapState>>,
last_live_frame_at: &Arc<Mutex<tokio::time::Instant>>,
last_emitted_pts_90khz: &Arc<Mutex<Option<u32>>>,
last_emit_wallclock_at: &Arc<Mutex<Option<tokio::time::Instant>>>,
bcmedia_dump: Option<&Arc<BcMediaDumpConfig>>,
dumper: &mut Option<FrameDumper>,
dumper_init_failed: &mut bool,
cancel: &CancellationToken,
) -> bool {
match result {
Ok(Ok(packet)) => {
if let Some(dump_cfg) = bcmedia_dump {
maybe_capture_packet(
dump_cfg,
camera_name,
rtsp_kind,
&packet,
dumper,
dumper_init_failed,
);
}
if matches!(packet, BcMedia::Iframe(_) | BcMedia::Pframe(_)) {
*lock_recover(last_live_frame_at) = tokio::time::Instant::now();
}
let broadcast_pts = {
let mut s = lock_recover(translator_state);
apply_bcmedia_packet(
&packet,
tx,
audio_pace_tx,
video_pace_tx,
last_frame,
sdp_params,
audio_presence,
&mut s,
gap_state,
)
};
if let Some(pts_90khz) = broadcast_pts {
let now = tokio::time::Instant::now();
let mut g = lock_recover(gap_state);
*g = GapState::Live;
drop(g);
*lock_recover(last_emitted_pts_90khz) = Some(pts_90khz);
*lock_recover(last_emit_wallclock_at) = Some(now);
}
true
}
Ok(Err(e)) => {
tracing::warn!(camera = %camera_name, error = %e,
"decode error, skipping packet");
true
}
Err(e) => {
if cancel.is_cancelled() {
tracing::debug!(camera = %camera_name, error = %e,
"stream finished (cancel)");
} else {
tracing::warn!(camera = %camera_name, error = %e,
"stream finished unexpectedly");
}
false
}
}
}
fn maybe_capture_packet(
config: &Arc<BcMediaDumpConfig>,
camera_name: &str,
kind: RtspStreamKind,
packet: &BcMedia,
dumper: &mut Option<FrameDumper>,
init_failed: &mut bool,
) {
if dumper.is_none() && !*init_failed {
match FrameDumper::create(config, camera_name, kind) {
Ok(d) => *dumper = Some(d),
Err(e) => {
tracing::warn!(
camera = %camera_name,
stream = ?kind,
error = %e,
root = %config.root.display(),
"failed to initialise BcMedia capture; disabling for this source"
);
*init_failed = true;
}
}
}
if let Some(ref mut d) = dumper {
d.write_frame(packet);
}
}
#[allow(clippy::too_many_arguments)]
pub fn apply_bcmedia_packet(
packet: &BcMedia,
tx: &broadcast::Sender<Frame>,
audio_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
video_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
last_frame: &Arc<LastFrameBuffer>,
sdp_params: &Arc<RwLock<SdpParams>>,
audio_presence: &Arc<RwLock<crate::audio_presence::AudioPresence>>,
state: &mut StreamTranslatorState,
gap_state: &Mutex<GapState>,
) -> Option<u32> {
match packet {
BcMedia::Iframe(iframe) => {
handle_iframe(iframe, tx, video_pace_tx, last_frame, sdp_params, state)
}
BcMedia::Pframe(pframe) => handle_pframe(pframe, tx, video_pace_tx, last_frame, state),
BcMedia::Aac(aac) => {
handle_aac(
aac,
tx,
audio_pace_tx,
sdp_params,
audio_presence,
state,
gap_state,
);
None
}
BcMedia::Adpcm(adpcm) => {
handle_adpcm(
adpcm,
tx,
audio_pace_tx,
sdp_params,
audio_presence,
state,
gap_state,
);
None
}
BcMedia::InfoV1(_) | BcMedia::InfoV2(_) => None,
}
}
fn handle_iframe(
iframe: &BcMediaIframe,
tx: &broadcast::Sender<Frame>,
video_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
last_frame: &Arc<LastFrameBuffer>,
sdp_params: &Arc<RwLock<SdpParams>>,
state: &mut StreamTranslatorState,
) -> Option<u32> {
let nals = split_annex_b(&iframe.data);
if nals.is_empty() {
return None;
}
if state.detected_codec.is_none() {
for nal in &nals {
if let Some(c) = detect_codec(nal) {
state.detected_codec = Some(c);
break;
}
}
}
let codec = match state.detected_codec {
Some(c) => c,
None => {
tracing::warn!("I-frame with no detectable codec; dropping");
return None;
}
};
let nals: Vec<&[u8]> = nals
.into_iter()
.filter(|n| is_decodable_nal(n, codec))
.collect();
if nals.is_empty() {
return None;
}
let (mut non_slice, mut slice): (Vec<&[u8]>, Vec<&[u8]>) =
nals.iter().partition(|n| !is_slice_nal(n, codec));
non_slice.append(&mut slice);
let reordered: Vec<&[u8]> = non_slice;
let (parameter_sets, iframe_nals, sps_bytes, pps_bytes, vps_bytes) =
extract_iframe_parts(codec, &reordered);
if let (Some(sps), Some(pps)) = (sps_bytes.as_ref(), pps_bytes.as_ref()) {
let profile_level_id = if sps.len() >= 4 {
[sps[1], sps[2], sps[3]]
} else {
[0u8; 3]
};
let video_params = VideoParams {
codec,
payload_type: 96,
sps: sps.clone(),
pps: pps.clone(),
vps: vps_bytes.clone(),
profile_level_id,
};
if let Ok(mut guard) = sdp_params.write() {
guard.video = Some(video_params);
}
}
let burst_pts = micros_to_90khz(iframe.microseconds);
let burst = VideoBurst {
codec,
parameter_sets: parameter_sets.clone(),
iframe_nals: iframe_nals.clone(),
pframe_nals: Vec::new(),
captured_at: Instant::now(),
captured_pts_90khz: burst_pts,
};
last_frame.replace_video(burst);
let nals_bytes: Vec<Bytes> = reordered
.iter()
.filter(|n| !is_parameter_set_nal(n, codec))
.map(|n| Bytes::copy_from_slice(n))
.collect();
if nals_bytes.is_empty() {
tracing::debug!(
"I-frame access unit had no slice NALs after parameter-set strip; dropping"
);
return None;
}
let pts_90khz = micros_to_90khz(iframe.microseconds);
let frame = Frame::Video {
codec,
nals: nals_bytes,
pts_90khz,
keyframe: true,
access_unit_end: true,
};
let duration = video_frame_duration(state, pts_90khz);
dispatch_paced_video(video_pace_tx, tx, frame, duration);
Some(pts_90khz)
}
fn is_parameter_set_nal(nal: &[u8], codec: VideoCodec) -> bool {
if nal.is_empty() {
return false;
}
match codec {
VideoCodec::H264 => {
let ty = H264NalType::from_header_byte(nal[0]);
matches!(ty, H264NalType::SPS | H264NalType::PPS)
}
VideoCodec::H265 => {
let ty = H265NalType::from_header_byte(nal[0]);
matches!(ty, H265NalType::VPS | H265NalType::SPS | H265NalType::PPS)
}
}
}
fn is_slice_nal(nal: &[u8], codec: VideoCodec) -> bool {
if nal.is_empty() {
return false;
}
match codec {
VideoCodec::H264 => {
let ty = H264NalType::from_header_byte(nal[0]);
matches!(ty, 1..=5)
}
VideoCodec::H265 => {
let ty = H265NalType::from_header_byte(nal[0]);
matches!(ty, 0..=9 | 16..=21)
}
}
}
#[allow(clippy::type_complexity)]
fn extract_iframe_parts(
codec: VideoCodec,
nals: &[&[u8]],
) -> (
Vec<Vec<u8>>, // parameter_sets
Vec<Vec<u8>>, // iframe_nals
Option<Vec<u8>>, // sps
Option<Vec<u8>>, // pps
Option<Vec<u8>>, // vps (H.265 only)
) {
let mut parameter_sets: Vec<Vec<u8>> = Vec::new();
let mut iframe_nals: Vec<Vec<u8>> = Vec::new();
let mut sps: Option<Vec<u8>> = None;
let mut pps: Option<Vec<u8>> = None;
let mut vps: Option<Vec<u8>> = None;
for nal in nals {
if nal.is_empty() {
continue;
}
let owned: Vec<u8> = (*nal).to_vec();
match codec {
VideoCodec::H264 => {
let ty = H264NalType::from_header_byte(owned[0]);
match ty {
H264NalType::SPS => {
sps = Some(owned.clone());
parameter_sets.push(owned);
}
H264NalType::PPS => {
pps = Some(owned.clone());
parameter_sets.push(owned);
}
H264NalType::IDR_SLICE => {
iframe_nals.push(owned);
}
_ => {
}
}
}
VideoCodec::H265 => {
if owned.is_empty() {
continue;
}
let ty = H265NalType::from_header_byte(owned[0]);
match ty {
H265NalType::VPS => {
vps = Some(owned.clone());
parameter_sets.push(owned);
}
H265NalType::SPS => {
sps = Some(owned.clone());
parameter_sets.push(owned);
}
H265NalType::PPS => {
pps = Some(owned.clone());
parameter_sets.push(owned);
}
H265NalType::IDR_W_RADL
| H265NalType::IDR_N_LP
| H265NalType::CRA
| H265NalType::BLA_W_LP => {
iframe_nals.push(owned);
}
_ => {}
}
}
}
}
(parameter_sets, iframe_nals, sps, pps, vps)
}
fn handle_pframe(
pframe: &BcMediaPframe,
tx: &broadcast::Sender<Frame>,
video_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
last_frame: &Arc<LastFrameBuffer>,
state: &mut StreamTranslatorState,
) -> Option<u32> {
let codec = match state.detected_codec {
Some(c) => c,
None => {
return None;
}
};
let nals = split_annex_b(&pframe.data);
if nals.is_empty() {
return None;
}
let nals: Vec<&[u8]> = nals
.into_iter()
.filter(|n| is_decodable_nal(n, codec))
.collect();
if nals.is_empty() {
return None;
}
let (mut non_slice, mut slice): (Vec<&[u8]>, Vec<&[u8]>) =
nals.iter().partition(|n| !is_slice_nal(n, codec));
non_slice.append(&mut slice);
let reordered: Vec<&[u8]> = non_slice;
let nals_owned: Vec<Vec<u8>> = reordered.iter().map(|n| (*n).to_vec()).collect();
last_frame.append_pframe(nals_owned);
let nals_bytes: Vec<Bytes> = reordered
.iter()
.map(|n| Bytes::copy_from_slice(n))
.collect();
let pts_90khz = micros_to_90khz(pframe.microseconds);
let frame = Frame::Video {
codec,
nals: nals_bytes,
pts_90khz,
keyframe: false,
access_unit_end: true,
};
let duration = video_frame_duration(state, pts_90khz);
dispatch_paced_video(video_pace_tx, tx, frame, duration);
Some(pts_90khz)
}
const PACER_ANOMALY_CAP_TICKS: u32 = 90_000 * 5;
fn video_frame_duration(state: &mut StreamTranslatorState, pts_90khz: u32) -> Duration {
let dur = match state.last_video_pts_90khz {
Some(prev) => {
let delta = pts_90khz.wrapping_sub(prev);
let ticks = if delta > PACER_ANOMALY_CAP_TICKS {
0
} else {
delta
};
Duration::from_micros((ticks as u64 * 1_000_000) / 90_000)
}
None => Duration::ZERO,
};
state.last_video_pts_90khz = Some(pts_90khz);
dur
}
fn record_pacer_overflow(kind: &'static str, duration: Duration) {
use std::sync::atomic::{AtomicU64, Ordering};
const WINDOW_NS: u64 = 60 * 1_000_000_000;
static ANCHOR: std::sync::OnceLock<std::time::Instant> = std::sync::OnceLock::new();
static LAST_LOGGED_NS: AtomicU64 = AtomicU64::new(0);
static SUPPRESSED: AtomicU64 = AtomicU64::new(0);
let anchor = *ANCHOR.get_or_init(std::time::Instant::now);
let now_ns = anchor.elapsed().as_nanos() as u64;
let prev = LAST_LOGGED_NS.load(Ordering::Relaxed);
let should_log = prev == 0 || now_ns.saturating_sub(prev) >= WINDOW_NS;
if should_log
&& LAST_LOGGED_NS
.compare_exchange(prev, now_ns, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
let suppressed = SUPPRESSED.swap(0, Ordering::Relaxed);
tracing::warn!(
kind = kind,
duration_us = duration.as_micros() as u64,
suppressed_since_last = suppressed,
"pacer queue full; dropping frame"
);
} else {
SUPPRESSED.fetch_add(1, Ordering::Relaxed);
}
}
fn dispatch_paced_video(
video_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
tx: &broadcast::Sender<Frame>,
frame: Frame,
duration: Duration,
) {
if let Some(pace_tx) = video_pace_tx {
match pace_tx.try_send(PacedFrame { frame, duration }) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(item)) => {
record_pacer_overflow("video", item.duration);
}
Err(mpsc::error::TrySendError::Closed(item)) => {
let _ = item;
}
}
return;
}
let _ = tx.send(frame);
}
pub(crate) fn aac_samples_per_au(aot: u8) -> Option<u32> {
match aot {
2 => Some(1024),
5 | 29 => Some(2048),
_ => None,
}
}
fn handle_aac(
aac: &bairelay_neolink_core::bcmedia::model::BcMediaAac,
tx: &broadcast::Sender<Frame>,
audio_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
sdp_params: &Arc<RwLock<SdpParams>>,
audio_presence: &Arc<RwLock<crate::audio_presence::AudioPresence>>,
state: &mut StreamTranslatorState,
gap_state: &Mutex<GapState>,
) {
use bairelay_rtsp::codec::aac::{
build_audio_specific_config_hex, parse_adts, AAC_PAYLOAD_TYPE, ADTS_HEADER_LEN,
};
use bairelay_rtsp::codec::AudioCodec;
use bairelay_rtsp::provider::AudioPayload;
use bairelay_rtsp::sdp::AudioParams;
let Some(header) = parse_adts(&aac.data) else {
tracing::debug!("ADTS header parse failed; dropping AAC packet");
return;
};
if header.channels == 0 {
if state.aac_aot != Some(header.aot) {
tracing::warn!(
aot = header.aot,
"handle_aac: PCE-specified channel config (channels=0); dropping AAC packet"
);
state.aac_aot = Some(header.aot);
}
return;
}
let read_sdp = || rlock_recover(sdp_params);
let write_sdp = || wlock_recover(sdp_params);
let needs_sdp_write = read_sdp().audio.is_none();
if needs_sdp_write {
if let Some(asc) =
build_audio_specific_config_hex(header.aot, header.sample_rate, header.channels)
{
let mut w = write_sdp();
if w.audio.is_none() {
w.audio = Some(AudioParams {
codec: AudioCodec::Aac,
payload_type: AAC_PAYLOAD_TYPE,
sample_rate: header.sample_rate,
channels: header.channels,
asc_hex: Some(asc),
});
}
} else {
tracing::warn!(
sample_rate = header.sample_rate,
channels = header.channels,
"AAC sample_rate/channels unsupported for AudioSpecificConfig"
);
}
}
if aac.data.len() < ADTS_HEADER_LEN || header.frame_length < ADTS_HEADER_LEN {
tracing::debug!(
frame_length = header.frame_length,
data_len = aac.data.len(),
"AAC frame_length/data too small for ADTS header; dropping"
);
return;
}
let payload = &aac.data[ADTS_HEADER_LEN..];
let au_bytes_len = header
.frame_length
.saturating_sub(ADTS_HEADER_LEN)
.min(payload.len());
if au_bytes_len == 0 {
tracing::debug!("AAC packet with empty body; dropping");
return;
}
let au_data = bytes::Bytes::copy_from_slice(&payload[..au_bytes_len]);
let samples_per_au = match aac_samples_per_au(header.aot) {
Some(n) => n,
None => {
if state.aac_aot != Some(header.aot) {
tracing::warn!(
aot = header.aot,
"handle_aac: unsupported AudioObjectType; dropping AAC packet"
);
state.aac_aot = Some(header.aot);
}
return;
}
};
if state.aac_aot != Some(header.aot) {
tracing::debug!(
aot = header.aot,
sample_rate = header.sample_rate,
channels = header.channels,
samples_per_au,
aac_frames = header.aac_frames,
"AAC stream parameters"
);
state.aac_aot = Some(header.aot);
}
let pts_step = samples_per_au.saturating_mul(header.aac_frames as u32);
let pts = state.aac_pts_next;
state.aac_pts_next = state.aac_pts_next.wrapping_add(pts_step);
if *lock_recover(gap_state) == GapState::Bridging {
return;
}
let frame = Frame::Audio {
payload: AudioPayload::Aac {
au_data,
sample_rate: header.sample_rate,
channels: header.channels,
},
pts,
};
let duration = paced_audio_duration(pts_step, header.sample_rate);
dispatch_paced_audio(audio_pace_tx, tx, frame, duration);
let mut p = wlock_recover(audio_presence);
*p = p.observed(AudioCodec::Aac);
}
fn paced_audio_duration(samples: u32, sample_rate: u32) -> Duration {
if sample_rate == 0 {
return Duration::ZERO;
}
let micros = (samples as u64).saturating_mul(1_000_000) / sample_rate as u64;
Duration::from_micros(micros)
}
fn dispatch_paced_audio(
audio_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
tx: &broadcast::Sender<Frame>,
frame: Frame,
duration: Duration,
) {
if let Some(pace_tx) = audio_pace_tx {
match pace_tx.try_send(PacedFrame { frame, duration }) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(item)) => {
record_pacer_overflow("audio", item.duration);
}
Err(mpsc::error::TrySendError::Closed(item)) => {
let _ = item;
}
}
return;
}
let _ = tx.send(frame);
}
fn handle_adpcm(
adpcm: &bairelay_neolink_core::bcmedia::model::BcMediaAdpcm,
tx: &broadcast::Sender<Frame>,
audio_pace_tx: Option<&mpsc::Sender<PacedFrame>>,
sdp_params: &Arc<RwLock<SdpParams>>,
audio_presence: &Arc<RwLock<crate::audio_presence::AudioPresence>>,
state: &mut StreamTranslatorState,
gap_state: &Mutex<GapState>,
) {
use bairelay_rtsp::codec::g711::{encode as g711_encode, G711_PAYLOAD_TYPE};
use bairelay_rtsp::codec::AudioCodec;
use bairelay_rtsp::provider::AudioPayload;
use bairelay_rtsp::sdp::AudioParams;
use bairelay_rtsp::transcode::{adpcm::AdpcmDecoder, resample::decimate_16_to_8};
let mut dec = AdpcmDecoder::new();
let pcm_16k = match dec.decode_block(&adpcm.data) {
Ok(p) => p,
Err(e) => {
tracing::debug!(error = ?e, "ADPCM decode failed; dropping packet");
return;
}
};
if pcm_16k.is_empty() {
tracing::debug!("ADPCM block decoded to zero samples; dropping");
return;
}
let pcm_8k = decimate_16_to_8(&pcm_16k);
if pcm_8k.is_empty() {
tracing::debug!("ADPCM block too short after decimation; dropping");
return;
}
let ulaw = bytes::Bytes::from(g711_encode(&pcm_8k));
let read_sdp = || rlock_recover(sdp_params);
let write_sdp = || wlock_recover(sdp_params);
if read_sdp().audio.is_none() {
let mut w = write_sdp();
if w.audio.is_none() {
w.audio = Some(AudioParams {
codec: AudioCodec::G711Ulaw,
payload_type: G711_PAYLOAD_TYPE,
sample_rate: 8_000,
channels: 1,
asc_hex: None,
});
}
}
let sample_count = ulaw.len() as u32;
let pts = state.g711_pts_next;
state.g711_pts_next = state.g711_pts_next.wrapping_add(sample_count);
if *lock_recover(gap_state) == GapState::Bridging {
return;
}
let frame = Frame::Audio {
payload: AudioPayload::G711Ulaw { samples: ulaw },
pts,
};
let duration = paced_audio_duration(sample_count, 8_000);
dispatch_paced_audio(audio_pace_tx, tx, frame, duration);
let mut p = wlock_recover(audio_presence);
*p = p.observed(AudioCodec::G711Ulaw);
}
#[cfg(test)]
mod tests {
use super::*;
use bairelay_neolink_core::bcmedia::model::{BcMediaIframe, BcMediaPframe, VideoType};
fn live_gap_state() -> Mutex<GapState> {
Mutex::new(GapState::Live)
}
#[test]
fn arc_stream_source_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Arc<StreamSource>>();
}
#[test]
fn paced_audio_duration_aac_lc_at_16khz_is_64ms() {
assert_eq!(
paced_audio_duration(1024, 16_000),
Duration::from_micros(64_000)
);
}
#[test]
fn paced_audio_duration_g711_at_8khz_per_byte_is_125us() {
assert_eq!(
paced_audio_duration(160, 8_000),
Duration::from_micros(20_000)
);
}
#[test]
fn paced_audio_duration_zero_sample_rate_returns_zero() {
assert_eq!(paced_audio_duration(1024, 0), Duration::ZERO);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn audio_pacer_drains_at_codec_rate() {
use bairelay_rtsp::provider::AudioPayload;
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<Frame>(64);
let (pace_tx, pace_rx) = mpsc::channel::<PacedFrame>(16);
let cancel = CancellationToken::new();
let pacer = tokio::spawn(audio_pacer_task(pace_rx, broadcast_tx, cancel.clone()));
for i in 0..4u32 {
pace_tx
.send(PacedFrame {
frame: Frame::Audio {
payload: AudioPayload::Aac {
au_data: bytes::Bytes::new(),
sample_rate: 16_000,
channels: 1,
},
pts: i.wrapping_mul(1024),
},
duration: Duration::from_millis(64),
})
.await
.expect("send to pacer");
}
tokio::task::yield_now().await;
tokio::task::yield_now().await;
assert!(broadcast_rx.try_recv().is_err());
tokio::time::advance(AUDIO_PACER_INITIAL_LATENCY).await;
tokio::task::yield_now().await;
tokio::task::yield_now().await;
assert!(matches!(broadcast_rx.try_recv(), Ok(Frame::Audio { .. })));
assert!(broadcast_rx.try_recv().is_err());
tokio::time::advance(Duration::from_millis(64)).await;
tokio::task::yield_now().await;
tokio::task::yield_now().await;
assert!(matches!(broadcast_rx.try_recv(), Ok(Frame::Audio { .. })));
assert!(broadcast_rx.try_recv().is_err());
tokio::time::advance(Duration::from_millis(64)).await;
tokio::task::yield_now().await;
tokio::task::yield_now().await;
assert!(matches!(broadcast_rx.try_recv(), Ok(Frame::Audio { .. })));
cancel.cancel();
drop(pace_tx);
let _ = tokio::time::timeout(Duration::from_millis(100), pacer).await;
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn audio_pacer_exits_on_cancel() {
let (broadcast_tx, _broadcast_rx) = broadcast::channel::<Frame>(4);
let (pace_tx, pace_rx) = mpsc::channel::<PacedFrame>(4);
let cancel = CancellationToken::new();
let pacer = tokio::spawn(audio_pacer_task(pace_rx, broadcast_tx, cancel.clone()));
cancel.cancel();
drop(pace_tx);
tokio::time::timeout(Duration::from_secs(1), pacer)
.await
.expect("pacer must exit on cancel")
.expect("pacer task panicked");
}
#[tokio::test(flavor = "current_thread")]
async fn stream_source_start_stores_gap_threshold() {
let src = StreamSource::start_inert_for_test_with_gap(Duration::from_millis(2500));
assert_eq!(src.gap_threshold(), Duration::from_millis(2500));
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn reader_task_marks_bridging_after_gap_threshold() {
let src = StreamSource::start_inert_for_test_with_gap(Duration::from_millis(200));
assert_eq!(src.gap_state(), GapState::Live);
tokio::time::advance(Duration::from_millis(500)).await;
tokio::task::yield_now().await;
assert_eq!(src.gap_state(), GapState::Bridging);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn reader_task_returns_to_live_on_new_frame() {
let (src, inject) =
StreamSource::start_inert_for_test_with_gap_and_injector(Duration::from_millis(200));
tokio::time::advance(Duration::from_millis(500)).await;
tokio::task::yield_now().await;
assert_eq!(src.gap_state(), GapState::Bridging);
inject.inject_fake_video_frame();
tokio::time::advance(Duration::from_millis(50)).await;
tokio::task::yield_now().await;
assert_eq!(src.gap_state(), GapState::Live);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn reader_task_never_bridges_when_gap_threshold_is_max() {
let src = StreamSource::start_inert_for_test_with_gap(Duration::MAX);
tokio::time::advance(Duration::from_secs(3600)).await;
tokio::task::yield_now().await;
assert_eq!(src.gap_state(), GapState::Live);
}
fn fake_burst_with_pts_90khz(pts: u32) -> VideoBurst {
VideoBurst {
codec: VideoCodec::H265,
parameter_sets: vec![vec![0x40, 0x01], vec![0x42, 0x01], vec![0x44, 0x01]],
iframe_nals: vec![vec![0x26, 0x01, 0xaf, 0x08, 0x46]],
pframe_nals: Vec::new(),
captured_at: Instant::now(),
captured_pts_90khz: pts,
}
}
async fn must_recv_video(rx: &mut broadcast::Receiver<Frame>) -> u32 {
let frame = tokio::time::timeout(Duration::from_millis(50), rx.recv())
.await
.expect("timed out waiting for Frame::Video")
.expect("broadcast closed before Frame::Video arrived");
match frame {
Frame::Video {
pts_90khz,
keyframe,
..
} => {
assert!(keyframe, "replay frames must be flagged as keyframes");
pts_90khz
}
other => panic!("expected Frame::Video, got {other:?}"),
}
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn bridging_emits_replay_frames_on_broadcast() {
let (src, last_frame) =
StreamSource::start_inert_for_test_with_gap_and_last_frame(Duration::from_millis(200));
last_frame.replace_video(fake_burst_with_pts_90khz(90_000));
let mut rx = src.subscribe_for_test();
tokio::time::advance(Duration::from_millis(500)).await;
tokio::task::yield_now().await;
let pts = must_recv_video(&mut rx).await;
assert!(
pts > 0,
"replay PTS must advance from its zero anchor, got {pts}"
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn bridging_first_replay_seeds_from_burst_captured_pts_when_no_live_frame() {
let (src, last_frame) =
StreamSource::start_inert_for_test_with_gap_and_last_frame(Duration::from_millis(200));
last_frame.replace_video(fake_burst_with_pts_90khz(7_500_000));
let mut rx = src.subscribe_for_test();
tokio::time::advance(Duration::from_millis(500)).await;
tokio::task::yield_now().await;
let first = must_recv_video(&mut rx).await;
assert_eq!(
first, 7_500_000,
"first replay PTS must equal the burst anchor exactly (no wall-clock drift)",
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn bridging_first_replay_pts_independent_of_source_age() {
let (src, last_frame) =
StreamSource::start_inert_for_test_with_gap_and_last_frame(Duration::from_millis(200));
let mut rx = src.subscribe_for_test();
tokio::time::advance(Duration::from_secs(30)).await;
tokio::task::yield_now().await;
while rx.try_recv().is_ok() {}
last_frame.replace_video(fake_burst_with_pts_90khz(1_000_000));
tokio::time::advance(Duration::from_millis(250)).await;
tokio::task::yield_now().await;
let first = must_recv_video(&mut rx).await;
assert_eq!(
first, 1_000_000,
"first replay PTS must equal the burst anchor regardless of how long the source lived before the burst arrived",
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn bridging_replay_pts_advances_monotonically() {
let (src, last_frame) =
StreamSource::start_inert_for_test_with_gap_and_last_frame(Duration::from_millis(200));
last_frame.replace_video(fake_burst_with_pts_90khz(0));
let mut rx = src.subscribe_for_test();
tokio::time::advance(Duration::from_millis(500)).await;
tokio::task::yield_now().await;
let a = must_recv_video(&mut rx).await;
while rx.try_recv().is_ok() {}
tokio::time::advance(Duration::from_millis(300)).await;
tokio::task::yield_now().await;
let b = must_recv_video(&mut rx).await;
assert!(b > a, "PTS must advance: {a} -> {b}");
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn bridging_skips_emit_when_burst_has_no_iframe_nals() {
let (src, last_frame) =
StreamSource::start_inert_for_test_with_gap_and_last_frame(Duration::from_millis(200));
last_frame.replace_video(VideoBurst {
codec: VideoCodec::H265,
parameter_sets: vec![],
iframe_nals: Vec::new(),
pframe_nals: Vec::new(),
captured_at: Instant::now(),
captured_pts_90khz: 0,
});
let mut rx = src.subscribe_for_test();
tokio::time::advance(Duration::from_millis(500)).await;
tokio::task::yield_now().await;
assert!(
matches!(
rx.try_recv(),
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
),
"empty-NAL burst must not produce a Frame::Video"
);
}
#[test]
fn handle_aac_drops_frame_during_bridging() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let mut data = vec![0xFF, 0xF9, 0x60, 0x40, 0x02, 0x00, 0xFC];
data.extend_from_slice(&[0xAA; 9]);
let packet = BcMedia::Aac(BcMediaAac { data });
let gap_state = Mutex::new(GapState::Bridging);
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&gap_state,
);
assert!(
rx.try_recv().is_err(),
"Bridging gate must drop the AAC frame",
);
assert_eq!(
state.aac_pts_next, 1024,
"PTS must advance through Bridging to keep A/V in sync on resume",
);
assert_eq!(*presence.read().unwrap(), AudioPresence::Unknown);
}
#[test]
fn handle_adpcm_drops_frame_during_bridging() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAdpcm;
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let data = vec![0u8; 4 + 16];
let packet = BcMedia::Adpcm(BcMediaAdpcm { data });
let gap_state = Mutex::new(GapState::Bridging);
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&gap_state,
);
assert!(
rx.try_recv().is_err(),
"Bridging gate must drop the ADPCM frame",
);
assert!(
state.g711_pts_next > 0,
"PTS must advance through Bridging to keep A/V in sync on resume; got {}",
state.g711_pts_next,
);
assert_eq!(*presence.read().unwrap(), AudioPresence::Unknown);
}
#[test]
fn handle_aac_pts_advances_through_bridging_and_resumes_in_live() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, mut rx) = broadcast::channel::<Frame>(16);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let build_packet = || {
let mut data = vec![0xFF, 0xF9, 0x60, 0x40, 0x02, 0x00, 0xFC];
data.extend_from_slice(&[0xAA; 9]);
BcMedia::Aac(BcMediaAac { data })
};
let live = Mutex::new(GapState::Live);
apply_bcmedia_packet(
&build_packet(),
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live,
);
apply_bcmedia_packet(
&build_packet(),
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live,
);
assert_eq!(state.aac_pts_next, 2 * 1024);
for _ in 0..2 {
assert!(matches!(rx.try_recv(), Ok(Frame::Audio { .. })));
}
let bridging = Mutex::new(GapState::Bridging);
for _ in 0..3 {
apply_bcmedia_packet(
&build_packet(),
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&bridging,
);
}
assert_eq!(
state.aac_pts_next,
5 * 1024,
"PTS advances through Bridging even though no audio reaches the wire",
);
assert!(rx.try_recv().is_err(), "no audio during Bridging");
apply_bcmedia_packet(
&build_packet(),
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live,
);
match rx.try_recv() {
Ok(Frame::Audio { pts, .. }) => {
assert_eq!(pts, 5 * 1024, "post-resume PTS reflects the gap");
}
other => panic!("expected Frame::Audio with resumed PTS; got {other:?}"),
}
assert_eq!(state.aac_pts_next, 6 * 1024);
}
#[test]
fn micros_to_90khz_matches_reference() {
assert_eq!(micros_to_90khz(1_000_000), 90_000);
assert_eq!(micros_to_90khz(0), 0);
}
fn synthetic_h265_iframe_bytes() -> Vec<u8> {
let mut out = Vec::new();
out.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]);
out.extend_from_slice(&[0x40, 0x01, 0x0c, 0x01, 0xff]);
out.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]);
out.extend_from_slice(&[0x42, 0x01, 0x01, 0x60, 0x00]);
out.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]);
out.extend_from_slice(&[0x44, 0x01, 0xc1, 0x72]);
out.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]);
out.extend_from_slice(&[0x26, 0x01, 0xaf, 0x08, 0x46]);
out
}
fn synthetic_h265_pframe_bytes() -> Vec<u8> {
let mut out = Vec::new();
out.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]);
out.extend_from_slice(&[0x02, 0x01, 0xd0, 0x21, 0x3c]);
out
}
#[test]
fn apply_bcmedia_packet_pframe_before_iframe_returns_false() {
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let pframe = BcMedia::Pframe(BcMediaPframe {
video_type: VideoType::H265,
microseconds: 0,
data: synthetic_h265_pframe_bytes(),
});
let broadcast_pts = apply_bcmedia_packet(
&pframe,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
assert!(
broadcast_pts.is_none(),
"P-frame before any I-frame must not be counted as a live broadcast"
);
assert!(
matches!(
rx.try_recv(),
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
),
"no Frame::Video should reach the broadcast"
);
assert_eq!(state.detected_codec, None);
}
#[test]
fn apply_bcmedia_packet_iframe_with_empty_nals_returns_false() {
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let iframe = BcMedia::Iframe(BcMediaIframe {
video_type: VideoType::H265,
microseconds: 0,
time: Some(1_700_000_000),
data: Vec::new(),
});
let broadcast_pts = apply_bcmedia_packet(
&iframe,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
assert!(
broadcast_pts.is_none(),
"I-frame with no NALs must not be counted as a live broadcast"
);
assert!(
matches!(
rx.try_recv(),
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
),
"no Frame::Video should reach the broadcast"
);
}
#[test]
fn apply_bcmedia_packet_translates_iframe_then_pframe() {
let (tx, mut rx) = broadcast::channel::<Frame>(16);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let iframe = BcMedia::Iframe(BcMediaIframe {
video_type: VideoType::H265,
microseconds: 0,
time: Some(1_700_000_000),
data: synthetic_h265_iframe_bytes(),
});
let pframe = BcMedia::Pframe(BcMediaPframe {
video_type: VideoType::H265,
microseconds: 33_333,
data: synthetic_h265_pframe_bytes(),
});
apply_bcmedia_packet(
&iframe,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
apply_bcmedia_packet(
&pframe,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
assert_eq!(state.detected_codec, Some(VideoCodec::H265));
let v = sdp_params
.read()
.expect("sdp lock poisoned")
.video
.clone()
.expect("video params must be populated after the I-frame");
assert_eq!(v.codec, VideoCodec::H265);
assert!(v.vps.is_some(), "VPS must be captured for H.265");
assert!(!v.sps.is_empty(), "SPS must be captured");
assert!(!v.pps.is_empty(), "PPS must be captured");
let f1 = rx.try_recv().expect("I-frame broadcast");
match f1 {
Frame::Video {
codec,
keyframe,
pts_90khz,
..
} => {
assert_eq!(codec, VideoCodec::H265);
assert!(keyframe, "first frame must be a keyframe");
assert_eq!(pts_90khz, 0);
}
_ => panic!("expected Frame::Video for the I-frame"),
}
let f2 = rx.try_recv().expect("P-frame broadcast");
match f2 {
Frame::Video {
codec, keyframe, ..
} => {
assert_eq!(codec, VideoCodec::H265);
assert!(!keyframe, "second frame must be a non-keyframe");
}
_ => panic!("expected Frame::Video for the P-frame"),
}
}
#[test]
fn apply_bcmedia_packet_ignores_info_variants() {
use bairelay_neolink_core::bcmedia::model::{BcMediaInfoV1, BcMediaInfoV2};
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let info_v1 = BcMedia::InfoV1(BcMediaInfoV1 {
video_width: 1280,
video_height: 720,
fps: 20,
start_year: 124,
start_month: 4,
start_day: 19,
start_hour: 9,
start_min: 15,
start_seconds: 30,
end_year: 124,
end_month: 4,
end_day: 19,
end_hour: 9,
end_min: 15,
end_seconds: 45,
});
let info_v2 = BcMedia::InfoV2(BcMediaInfoV2 {
video_width: 1920,
video_height: 1080,
fps: 30,
start_year: 0,
start_month: 0,
start_day: 0,
start_hour: 0,
start_min: 0,
start_seconds: 0,
end_year: 0,
end_month: 0,
end_day: 0,
end_hour: 0,
end_min: 0,
end_seconds: 0,
});
for packet in [&info_v1, &info_v2] {
apply_bcmedia_packet(
packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
}
assert_eq!(state.aac_pts_next, 0, "info packets must not touch AAC PTS");
assert_eq!(
state.g711_pts_next, 0,
"info packets must not touch G.711 PTS"
);
assert_eq!(state.detected_codec, None);
assert!(sdp_params.read().expect("sdp lock").video.is_none());
assert!(sdp_params.read().expect("sdp lock").audio.is_none());
assert!(!last_frame.has_video());
assert!(rx.try_recv().is_err(), "no frames should have broadcast");
assert_eq!(
*presence.read().unwrap(),
crate::audio_presence::AudioPresence::Unknown,
"info packets must not touch audio presence"
);
}
#[test]
fn apply_bcmedia_packet_emits_aac_frame_and_updates_sdp_and_presence() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
use bairelay_rtsp::codec::AudioCodec;
use bairelay_rtsp::provider::AudioPayload;
let (tx, mut rx) = broadcast::channel::<Frame>(8);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let mut data = vec![0xFF, 0xF9, 0x60, 0x40, 0x02, 0x00, 0xFC];
data.extend_from_slice(&[0xAA; 9]);
let aac = BcMedia::Aac(BcMediaAac { data });
apply_bcmedia_packet(
&aac,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
let sdp = sdp_params.read().unwrap();
let audio = sdp.audio.as_ref().expect("audio SDP populated");
assert_eq!(audio.codec, AudioCodec::Aac);
assert_eq!(audio.sample_rate, 16_000);
assert_eq!(audio.channels, 1);
assert_eq!(audio.payload_type, 97);
assert_eq!(audio.asc_hex.as_deref(), Some("1408"));
match rx.try_recv().expect("audio frame broadcast") {
Frame::Audio {
payload: AudioPayload::Aac {
au_data,
sample_rate,
channels,
},
..
} => {
assert_eq!(au_data.len(), 9);
assert_eq!(sample_rate, 16_000);
assert_eq!(channels, 1);
}
other => panic!("expected AAC audio frame, got {other:?}"),
}
assert_eq!(
*presence.read().unwrap(),
AudioPresence::Present {
codec: AudioCodec::Aac
},
);
}
#[test]
fn apply_bcmedia_packet_transcodes_adpcm_to_g711_and_updates_presence() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAdpcm;
use bairelay_rtsp::codec::AudioCodec;
use bairelay_rtsp::provider::AudioPayload;
let (tx, mut rx) = broadcast::channel::<Frame>(8);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let data = vec![0u8; 4 + 16];
let pkt = BcMedia::Adpcm(BcMediaAdpcm { data });
apply_bcmedia_packet(
&pkt,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
let sdp = sdp_params.read().unwrap();
let audio = sdp.audio.as_ref().expect("audio SDP populated");
assert_eq!(audio.codec, AudioCodec::G711Ulaw);
assert_eq!(audio.sample_rate, 8_000);
assert_eq!(audio.channels, 1);
assert_eq!(audio.payload_type, 0);
assert!(audio.asc_hex.is_none());
match rx.try_recv().expect("audio frame broadcast") {
Frame::Audio {
payload: AudioPayload::G711Ulaw { samples },
..
} => {
assert!(!samples.is_empty(), "at least one sample");
for (i, &b) in samples.iter().enumerate() {
assert_eq!(
b, 0xFFu8,
"sample {i} should be µ-law silence 0xFF, got {b:#x}"
);
}
}
other => panic!("expected G.711 audio frame, got {other:?}"),
}
assert_eq!(
*presence.read().unwrap(),
AudioPresence::Present {
codec: AudioCodec::G711Ulaw
},
);
}
#[test]
fn apply_bcmedia_packet_drops_empty_aac_body_without_upgrading_presence() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let data = vec![0xFF, 0xF9, 0x60, 0x40, 0x00, 0xE0, 0xFC];
assert_eq!(data.len(), 7, "test presumes header-only packet");
let aac = BcMedia::Aac(BcMediaAac { data });
apply_bcmedia_packet(
&aac,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
assert!(
rx.try_recv().is_err(),
"empty-body AAC packet must not broadcast a Frame::Audio",
);
assert!(
sdp_params.read().expect("sdp lock").audio.is_some(),
"SDP audio must be populated from header even on empty body",
);
assert_eq!(
*presence.read().unwrap(),
AudioPresence::Unknown,
"dropped empty-body AAC must not upgrade audio presence",
);
assert_eq!(
state.aac_pts_next, 0,
"dropped empty-body AAC must not advance the PTS counter",
);
}
#[test]
fn apply_bcmedia_packet_assigns_monotonic_aac_pts() {
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, mut rx) = broadcast::channel::<Frame>(8);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0".into(),
session_id: "0".into(),
session_name: "u".into(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let mut data = vec![0xFF, 0xF9, 0x60, 0x40, 0x02, 0x00, 0xFC];
data.extend_from_slice(&[0xAA; 9]);
let frame = BcMedia::Aac(BcMediaAac { data });
let observed_pts: Vec<u32> = (0..3)
.map(|_| {
apply_bcmedia_packet(
&frame,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
match rx.try_recv().expect("audio frame") {
Frame::Audio { pts, .. } => pts,
other => panic!("expected audio frame, got {other:?}"),
}
})
.collect();
assert_eq!(observed_pts, vec![0, 1024, 2048]);
assert_eq!(state.aac_pts_next, 3 * 1024);
assert_eq!(
state.g711_pts_next, 0,
"G.711 counter should not advance on AAC"
);
}
#[test]
fn apply_bcmedia_packet_assigns_monotonic_g711_pts() {
use bairelay_neolink_core::bcmedia::model::BcMediaAdpcm;
let (tx, mut rx) = broadcast::channel::<Frame>(8);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0".into(),
session_id: "0".into(),
session_name: "u".into(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let data = vec![0u8; 4 + 16];
let frame = BcMedia::Adpcm(BcMediaAdpcm { data });
let mut observed_pts = Vec::new();
let mut observed_sample_counts = Vec::new();
for _ in 0..3 {
apply_bcmedia_packet(
&frame,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
match rx.try_recv().expect("audio frame") {
Frame::Audio {
payload: bairelay_rtsp::provider::AudioPayload::G711Ulaw { samples },
pts,
} => {
observed_pts.push(pts);
observed_sample_counts.push(samples.len() as u32);
}
other => panic!("expected G.711 audio frame, got {other:?}"),
}
}
let mut expected: u32 = 0;
for (i, pts) in observed_pts.iter().enumerate() {
assert_eq!(*pts, expected, "frame {i} pts");
expected = expected.wrapping_add(observed_sample_counts[i]);
}
assert_eq!(state.g711_pts_next, expected);
assert_eq!(
state.aac_pts_next, 0,
"AAC counter should not advance on ADPCM"
);
}
#[tokio::test(flavor = "current_thread")]
async fn await_sdp_both_returns_when_audio_arrives() {
use bairelay_rtsp::codec::AudioCodec;
use bairelay_rtsp::sdp::{AudioParams, VideoParams};
use std::time::Duration;
let sdp = Arc::new(RwLock::new(SdpParams {
server_ip: "0".into(),
session_id: "0".into(),
session_name: "u".into(),
video: None,
audio: None,
}));
let sdp2 = Arc::clone(&sdp);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
let mut w = sdp2.write().unwrap();
w.video = Some(VideoParams {
codec: VideoCodec::H264,
payload_type: 96,
sps: vec![0x67, 0x42, 0, 0x1f],
pps: vec![0x68, 0xce, 0x38, 0x80],
vps: None,
profile_level_id: [0x42, 0, 0x1f],
});
w.audio = Some(AudioParams {
codec: AudioCodec::G711Ulaw,
payload_type: 0,
sample_rate: 8_000,
channels: 1,
asc_hex: None,
});
});
let params = await_sdp_both(&sdp, Duration::from_secs(1))
.await
.expect("both sides populated in time");
assert!(params.video.is_some());
assert!(params.audio.is_some());
}
#[tokio::test(flavor = "current_thread")]
async fn await_sdp_both_times_out_when_audio_never_arrives() {
use bairelay_rtsp::sdp::VideoParams;
use std::time::Duration;
let sdp = Arc::new(RwLock::new(SdpParams {
server_ip: "0".into(),
session_id: "0".into(),
session_name: "u".into(),
video: Some(VideoParams {
codec: VideoCodec::H264,
payload_type: 96,
sps: vec![0x67, 0x42, 0, 0x1f],
pps: vec![0x68, 0xce, 0x38, 0x80],
vps: None,
profile_level_id: [0x42, 0, 0x1f],
}),
audio: None,
}));
let r = await_sdp_both(&sdp, Duration::from_millis(250)).await;
assert!(r.is_err(), "must time out when audio never arrives");
}
#[tokio::test(flavor = "current_thread")]
async fn await_audio_or_deadline_times_out_without_audio() {
use std::time::Duration;
let sdp = Arc::new(RwLock::new(SdpParams {
server_ip: "0".into(),
session_id: "0".into(),
session_name: "u".into(),
video: None,
audio: None,
}));
let r = await_audio_or_deadline(&sdp, Duration::from_millis(200)).await;
assert!(r.is_err(), "must time out when audio never arrives");
}
#[test]
fn apply_bcmedia_packet_takes_state_struct() {
fn _assert_signature(
packet: &BcMedia,
tx: &broadcast::Sender<Frame>,
last_frame: &Arc<LastFrameBuffer>,
sdp_params: &Arc<RwLock<SdpParams>>,
audio_presence: &Arc<RwLock<crate::audio_presence::AudioPresence>>,
state: &mut StreamTranslatorState,
gap_state: &Mutex<GapState>,
) {
apply_bcmedia_packet(
packet,
tx,
None,
None,
last_frame,
sdp_params,
audio_presence,
state,
gap_state,
);
}
}
#[test]
fn translator_state_pts_survives_simulated_reader_respawn() {
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, _rx) = broadcast::channel::<Frame>(16);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "test".to_string(),
video: None,
audio: None,
}));
let audio_presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let state_arc = Arc::new(std::sync::Mutex::new(StreamTranslatorState::default()));
let aac_packets: Vec<BcMedia> = (0..3)
.map(|_| {
let mut data = vec![0xFF, 0xF9, 0x60, 0x40, 0x02, 0x00, 0xFC];
data.extend_from_slice(&[0xAA; 9]);
BcMedia::Aac(BcMediaAac { data })
})
.collect();
{
let mut s = state_arc.lock().unwrap();
for p in &aac_packets {
apply_bcmedia_packet(
p,
&tx,
None,
None,
&last_frame,
&sdp_params,
&audio_presence,
&mut s,
&live_gap_state(),
);
}
assert_eq!(s.aac_pts_next, 3 * 1024, "three AAC-LC frames → 3072 ticks");
}
let state_arc_after = Arc::clone(&state_arc);
{
let mut s = state_arc_after.lock().unwrap();
for p in &aac_packets[..2] {
apply_bcmedia_packet(
p,
&tx,
None,
None,
&last_frame,
&sdp_params,
&audio_presence,
&mut s,
&live_gap_state(),
);
}
assert_eq!(
s.aac_pts_next,
5 * 1024,
"PTS must continue from 3072 after simulated respawn, not reset to 0"
);
}
}
#[test]
fn aac_samples_per_au_branches_on_aot() {
assert_eq!(aac_samples_per_au(2), Some(1024));
assert_eq!(aac_samples_per_au(5), Some(2048));
assert_eq!(aac_samples_per_au(29), Some(2048));
assert_eq!(aac_samples_per_au(1), None);
assert_eq!(aac_samples_per_au(3), None);
assert_eq!(aac_samples_per_au(4), None);
assert_eq!(aac_samples_per_au(0), None);
assert_eq!(aac_samples_per_au(255), None);
}
#[test]
fn handle_aac_drops_unsupported_aot_and_leaves_pts() {
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "test".to_string(),
video: None,
audio: None,
}));
let audio_presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let mut data = vec![0xFF, 0xF9, 0x20, 0x40, 0x02, 0x00, 0xFC];
data.extend_from_slice(&[0xAA; 9]);
let packet = BcMedia::Aac(BcMediaAac { data });
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&audio_presence,
&mut state,
&live_gap_state(),
);
assert_eq!(
state.aac_pts_next, 0,
"unsupported AOT must not advance PTS"
);
assert_eq!(
state.aac_aot,
Some(1),
"state.aac_aot must latch the unsupported AOT for one-shot warn gating"
);
assert_eq!(
*audio_presence.read().unwrap(),
crate::audio_presence::AudioPresence::Unknown,
"unsupported AOT must not upgrade AudioPresence"
);
assert!(
matches!(
rx.try_recv(),
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
),
"unsupported AOT must not broadcast a Frame::Audio"
);
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&audio_presence,
&mut state,
&live_gap_state(),
);
assert_eq!(state.aac_pts_next, 0);
assert_eq!(state.aac_aot, Some(1));
assert_eq!(
*audio_presence.read().unwrap(),
crate::audio_presence::AudioPresence::Unknown,
);
}
#[test]
fn handle_aac_drops_on_malformed_adts_header() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let packet = BcMedia::Aac(BcMediaAac {
data: vec![0x00, 0x00, 0x00],
});
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
assert!(rx.try_recv().is_err(), "no frame emitted on bad ADTS");
assert_eq!(state.aac_pts_next, 0);
assert_eq!(*presence.read().unwrap(), AudioPresence::Unknown);
}
#[test]
fn handle_adpcm_drops_on_empty_data() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAdpcm;
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let packet = BcMedia::Adpcm(BcMediaAdpcm { data: vec![] });
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
assert!(rx.try_recv().is_err());
assert_eq!(state.g711_pts_next, 0);
}
#[test]
fn map_stream_kind_maps_all_variants() {
assert!(matches!(
map_stream_kind(RtspStreamKind::Main),
CoreStreamKind::Main
));
assert!(matches!(
map_stream_kind(RtspStreamKind::Sub),
CoreStreamKind::Sub
));
assert!(matches!(
map_stream_kind(RtspStreamKind::Extern),
CoreStreamKind::Extern
));
}
#[test]
fn micros_to_90khz_edge_cases() {
assert_eq!(micros_to_90khz(0), 0);
assert_eq!(micros_to_90khz(1_000_000), 90_000);
assert_eq!(micros_to_90khz(100), 9);
let big = u32::MAX / 10;
let _ = micros_to_90khz(big);
}
#[tokio::test]
async fn await_audio_or_deadline_returns_on_populated_sdp() {
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: Some(bairelay_rtsp::sdp::AudioParams {
codec: bairelay_rtsp::codec::AudioCodec::G711Ulaw,
payload_type: 0,
sample_rate: 8000,
channels: 1,
asc_hex: None,
}),
}));
await_audio_or_deadline(&sdp_params, Duration::from_millis(10))
.await
.expect("populated audio");
}
#[tokio::test]
async fn await_audio_or_deadline_times_out_when_never_populated() {
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let result = await_audio_or_deadline(&sdp_params, Duration::from_millis(50)).await;
assert!(result.is_err());
}
#[tokio::test]
async fn stream_source_await_sdp_ready_returns_when_video_set() {
let src = StreamSource::start_inert_for_test();
src.set_sdp_params_for_test(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: Some(bairelay_rtsp::sdp::VideoParams {
codec: bairelay_rtsp::codec::VideoCodec::H264,
payload_type: 96,
sps: vec![],
pps: vec![],
vps: None,
profile_level_id: [0x42, 0x00, 0x1F],
}),
audio: None,
});
src.await_sdp_ready(Duration::from_millis(100))
.await
.expect("video sdp ready");
}
#[tokio::test]
async fn stream_source_await_sdp_ready_times_out_without_video() {
let src = StreamSource::start_inert_for_test();
assert!(src
.await_sdp_ready(Duration::from_millis(50))
.await
.is_err());
}
#[tokio::test]
async fn stream_source_await_sdp_both_ready_times_out_without_audio() {
let src = StreamSource::start_inert_for_test();
src.set_sdp_params_for_test(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: Some(bairelay_rtsp::sdp::VideoParams {
codec: bairelay_rtsp::codec::VideoCodec::H265,
payload_type: 97,
sps: vec![],
pps: vec![],
vps: Some(vec![]),
profile_level_id: [0x01, 0x60, 0x00],
}),
audio: None,
});
assert!(src
.await_sdp_both_ready(Duration::from_millis(50))
.await
.is_err());
}
#[tokio::test]
async fn stream_source_await_audio_forwards_to_free_fn() {
let src = StreamSource::start_inert_for_test();
assert!(src.await_audio(Duration::from_millis(30)).await.is_err());
}
#[tokio::test]
async fn stream_source_accessors_are_wired_correctly() {
let src = StreamSource::start_inert_for_test();
let _ = src.sdp_params();
let _ = src.sdp_params_handle();
let _ = src.last_frame();
let _ = src.subscribe();
let _ = src.subscribe_for_test();
assert_eq!(src.subscribers(), 0, "no subscribers at rest");
assert_eq!(src.gap_state(), GapState::Live);
}
#[test]
fn is_parameter_set_nal_empty_returns_false() {
assert!(!is_parameter_set_nal(&[], VideoCodec::H264));
assert!(!is_parameter_set_nal(&[], VideoCodec::H265));
}
#[test]
fn is_parameter_set_nal_h264_sps_and_pps_match() {
assert!(is_parameter_set_nal(&[0x67, 0x00], VideoCodec::H264));
assert!(is_parameter_set_nal(&[0x68, 0x00], VideoCodec::H264));
assert!(!is_parameter_set_nal(&[0x65, 0x00], VideoCodec::H264));
}
#[test]
fn is_parameter_set_nal_h265_vps_sps_pps_match() {
assert!(is_parameter_set_nal(&[0x40, 0x01], VideoCodec::H265));
assert!(is_parameter_set_nal(&[0x42, 0x01], VideoCodec::H265));
assert!(is_parameter_set_nal(&[0x44, 0x01], VideoCodec::H265));
assert!(!is_parameter_set_nal(&[0x26, 0x01], VideoCodec::H265));
}
#[test]
fn is_slice_nal_empty_returns_false() {
assert!(!is_slice_nal(&[], VideoCodec::H264));
assert!(!is_slice_nal(&[], VideoCodec::H265));
}
#[test]
fn is_slice_nal_recognises_h264_vcl_types() {
assert!(is_slice_nal(&[0x41, 0x00], VideoCodec::H264));
assert!(is_slice_nal(&[0x65, 0x00], VideoCodec::H264));
assert!(!is_slice_nal(&[0x67, 0x00], VideoCodec::H264));
assert!(!is_slice_nal(&[0x68, 0x00], VideoCodec::H264));
}
#[test]
fn is_slice_nal_recognises_h265_vcl_types() {
assert!(is_slice_nal(&[0x02, 0x01], VideoCodec::H265));
assert!(is_slice_nal(&[0x26, 0x01], VideoCodec::H265));
assert!(!is_slice_nal(&[0x40, 0x01], VideoCodec::H265));
}
#[test]
fn extract_iframe_parts_h264_splits_sps_pps_idr_and_skips_sei() {
let sps = [0x67u8, 0x42, 0x00, 0x1F];
let pps = [0x68u8, 0xCE, 0x3C, 0x80];
let sei = [0x06u8, 0x00];
let idr = [0x65u8, 0xAA, 0xBB];
let nals: Vec<&[u8]> = vec![&sps, &pps, &sei, &idr];
let (params, iframes, out_sps, out_pps, out_vps) =
extract_iframe_parts(VideoCodec::H264, &nals);
assert_eq!(params.len(), 2);
assert_eq!(iframes.len(), 1);
assert!(out_sps.is_some() && out_pps.is_some() && out_vps.is_none());
}
#[test]
fn extract_iframe_parts_h265_collects_vps_sps_pps_and_idr() {
let vps = [0x40u8, 0x01, 0x0C, 0x01];
let sps = [0x42u8, 0x01, 0x02];
let pps = [0x44u8, 0x01, 0xC0];
let idr = [0x26u8, 0x01, 0xAF];
let nals: Vec<&[u8]> = vec![&vps, &sps, &pps, &idr];
let (params, iframes, out_sps, out_pps, out_vps) =
extract_iframe_parts(VideoCodec::H265, &nals);
assert_eq!(params.len(), 3);
assert_eq!(iframes.len(), 1);
assert!(out_vps.is_some() && out_sps.is_some() && out_pps.is_some());
}
#[test]
fn extract_iframe_parts_skips_empty_nals() {
let empty: &[u8] = &[];
let sps = [0x67u8, 0x42];
let idr = [0x65u8, 0xAA];
let (_, iframes, _, _, _) = extract_iframe_parts(VideoCodec::H264, &[empty, &sps, &idr]);
assert_eq!(iframes.len(), 1);
}
#[test]
fn handle_pframe_returns_none_before_first_iframe() {
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let pframe = bairelay_neolink_core::bcmedia::model::BcMediaPframe {
video_type: bairelay_neolink_core::bcmedia::model::VideoType::H264,
microseconds: 0,
data: vec![0x00, 0x00, 0x01, 0x41, 0xAA],
};
let mut s = StreamTranslatorState::default();
let result = handle_pframe(&pframe, &tx, None, &last_frame, &mut s);
assert_eq!(result, None);
}
#[test]
fn handle_pframe_returns_none_on_empty_nal_split() {
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let pframe = bairelay_neolink_core::bcmedia::model::BcMediaPframe {
video_type: bairelay_neolink_core::bcmedia::model::VideoType::H264,
microseconds: 0,
data: vec![],
};
let mut s = StreamTranslatorState {
detected_codec: Some(VideoCodec::H264),
..Default::default()
};
let result = handle_pframe(&pframe, &tx, None, &last_frame, &mut s);
assert_eq!(result, None);
}
#[test]
fn handle_iframe_returns_none_on_empty_nal_split() {
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let iframe = bairelay_neolink_core::bcmedia::model::BcMediaIframe {
video_type: bairelay_neolink_core::bcmedia::model::VideoType::H264,
microseconds: 0,
data: vec![],
time: None,
};
let mut s = StreamTranslatorState::default();
let result = handle_iframe(&iframe, &tx, None, &last_frame, &sdp_params, &mut s);
assert_eq!(result, None);
}
#[test]
fn handle_iframe_returns_none_when_codec_undetectable() {
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let iframe = bairelay_neolink_core::bcmedia::model::BcMediaIframe {
video_type: bairelay_neolink_core::bcmedia::model::VideoType::H264,
microseconds: 0,
data: vec![0x00, 0x00, 0x01, 0x80, 0x00],
time: None,
};
let mut s = StreamTranslatorState::default();
let result = handle_iframe(&iframe, &tx, None, &last_frame, &sdp_params, &mut s);
assert_eq!(result, None);
}
#[test]
fn handle_iframe_drops_h265_unspec62_and_multilayer_nals() {
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let iframe = bairelay_neolink_core::bcmedia::model::BcMediaIframe {
video_type: bairelay_neolink_core::bcmedia::model::VideoType::H265,
microseconds: 0,
data: vec![
0x00, 0x00, 0x01, 0x40, 0x01, 0x0C, 0x01, 0x00, 0x00, 0x01, 0x42, 0x01, 0x02, 0x03, 0x04,
0x00, 0x00, 0x01, 0x44, 0x01, 0xC0,
0x00, 0x00, 0x01, 0x7C, 0x01, 0xDE, 0xAD, 0xBE, 0xEF,
0x00, 0x00, 0x01, 0x27, 0x09, 0xCA, 0xFE,
0x00, 0x00, 0x01, 0x26, 0x01, 0xAA, 0xBB,
],
time: None,
};
let mut s = StreamTranslatorState::default();
let pts =
handle_iframe(&iframe, &tx, None, &last_frame, &sdp_params, &mut s).expect("Some");
assert_eq!(s.detected_codec, Some(VideoCodec::H265));
let frame = rx.try_recv().expect("frame broadcast");
match frame {
Frame::Video {
codec,
nals,
keyframe,
pts_90khz,
..
} => {
assert_eq!(codec, VideoCodec::H265);
assert!(keyframe);
assert_eq!(pts_90khz, pts);
assert_eq!(
nals.len(),
1,
"expected single IDR after filter, got {nals:?}"
);
let only = &nals[0];
assert_eq!(
only[0], 0x26,
"first byte should be standard IDR_W_RADL header"
);
assert_eq!(only[1], 0x01, "second byte should be layer_id=0, tid+1=1");
}
Frame::Audio { .. } => panic!("expected video frame, got audio"),
}
}
#[test]
fn handle_pframe_drops_h265_unspec62_nals() {
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let pframe = bairelay_neolink_core::bcmedia::model::BcMediaPframe {
video_type: bairelay_neolink_core::bcmedia::model::VideoType::H265,
microseconds: 0,
data: vec![
0x00, 0x00, 0x01, 0x7C, 0x01, 0xDE, 0xAD,
0x00, 0x00, 0x01, 0x02, 0x01, 0x11, 0x22,
],
};
let mut s = StreamTranslatorState {
detected_codec: Some(VideoCodec::H265),
..Default::default()
};
let pts = handle_pframe(&pframe, &tx, None, &last_frame, &mut s).expect("Some");
let frame = rx.try_recv().expect("frame broadcast");
match frame {
Frame::Video {
codec,
nals,
keyframe,
pts_90khz,
..
} => {
assert_eq!(codec, VideoCodec::H265);
assert!(!keyframe);
assert_eq!(pts_90khz, pts);
assert_eq!(nals.len(), 1, "only standard slice should remain");
assert_eq!(nals[0][0], 0x02);
}
Frame::Audio { .. } => panic!("expected video frame"),
}
}
#[test]
fn handle_pframe_returns_none_when_only_nonstandard_nals() {
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let pframe = bairelay_neolink_core::bcmedia::model::BcMediaPframe {
video_type: bairelay_neolink_core::bcmedia::model::VideoType::H265,
microseconds: 0,
data: vec![0x00, 0x00, 0x01, 0x7C, 0x01, 0xAB, 0xCD],
};
let mut s = StreamTranslatorState {
detected_codec: Some(VideoCodec::H265),
..Default::default()
};
let result = handle_pframe(&pframe, &tx, None, &last_frame, &mut s);
assert_eq!(result, None);
assert!(rx.try_recv().is_err(), "no frame should have broadcast");
}
#[test]
fn handle_iframe_short_sps_populates_zero_profile_level_id() {
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let iframe = bairelay_neolink_core::bcmedia::model::BcMediaIframe {
video_type: bairelay_neolink_core::bcmedia::model::VideoType::H264,
microseconds: 0,
data: vec![
0x00, 0x00, 0x01, 0x67, 0x42, 0x00, 0x00, 0x01, 0x68, 0xce, 0x00, 0x00, 0x01, 0x65, 0xaa, ],
time: None,
};
let mut s = StreamTranslatorState::default();
handle_iframe(&iframe, &tx, None, &last_frame, &sdp_params, &mut s).unwrap();
let guard = sdp_params.read().unwrap();
let v = guard.video.as_ref().expect("video populated");
assert_eq!(v.profile_level_id, [0u8; 3]);
}
#[test]
fn extract_iframe_parts_h265_skips_empty_nal_and_non_parameter() {
let empty: &[u8] = &[];
let trail = [0x02u8, 0x01, 0x00]; let vps = [0x40u8, 0x01, 0xaa];
let idr = [0x26u8, 0x01, 0xbb]; let (params, iframes, _, _, out_vps) =
extract_iframe_parts(VideoCodec::H265, &[empty, &trail, &vps, &idr]);
assert_eq!(params.len(), 1);
assert_eq!(iframes.len(), 1);
assert!(out_vps.is_some());
}
#[test]
fn handle_aac_warns_on_unsupported_channel_config() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let mut data = vec![0xFF, 0xF9, 0x44, 0xC0, 0x02, 0x00, 0xFC];
data.extend_from_slice(&[0xAA; 9]);
let packet = BcMedia::Aac(BcMediaAac { data });
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
}
#[test]
fn handle_aac_warns_on_unsupported_sample_rate_but_still_drops_body() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let mut data = vec![0xFF, 0xF9, 0x74, 0x40, 0x02, 0x80, 0xFC];
data.extend_from_slice(&[0xAA; 5]);
let packet = BcMedia::Aac(BcMediaAac { data });
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
}
#[test]
fn handle_aac_drops_when_frame_length_below_adts_header() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAac;
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let mut data = vec![0xFF, 0xF9, 0x4c, 0x40, 0x00, 0xA0, 0xFC];
data.extend_from_slice(&[0xAA; 10]);
let packet = BcMedia::Aac(BcMediaAac { data });
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
assert!(rx.try_recv().is_err());
}
#[test]
fn process_stream_result_ok_video_updates_live_markers() {
use bairelay_neolink_core::bcmedia::model::{BcMediaIframe, VideoType};
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let translator_state = Arc::new(Mutex::new(StreamTranslatorState::default()));
let gap_state = Arc::new(Mutex::new(GapState::Bridging)); let last_live_frame_at = Arc::new(Mutex::new(tokio::time::Instant::now()));
let last_emitted_pts: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
let last_emit_wall: Arc<Mutex<Option<tokio::time::Instant>>> = Arc::new(Mutex::new(None));
let mut dumper: Option<FrameDumper> = None;
let mut dumper_init_failed = false;
let cancel = CancellationToken::new();
let packet = BcMedia::Iframe(BcMediaIframe {
video_type: VideoType::H264,
microseconds: 1_000_000,
time: None,
data: vec![
0x00, 0x00, 0x01, 0x67, 0x42, 0x00, 0x1f, 0xff, 0x00, 0x00, 0x01, 0x68, 0xce, 0x38, 0x80, 0x00, 0x00, 0x01, 0x65, 0xaa, 0xbb, 0xcc, ],
});
let result: Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
> = Ok(Ok(packet));
let keep_going = process_stream_result(
result,
"cam1",
RtspStreamKind::Main,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&translator_state,
&gap_state,
&last_live_frame_at,
&last_emitted_pts,
&last_emit_wall,
None,
&mut dumper,
&mut dumper_init_failed,
&cancel,
);
assert!(keep_going);
assert!(rx.try_recv().is_ok());
assert_eq!(*gap_state.lock().unwrap(), GapState::Live);
assert_eq!(*last_emitted_pts.lock().unwrap(), Some(90_000));
}
#[test]
fn process_stream_result_ok_audio_does_not_update_live_markers() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAdpcm;
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let translator_state = Arc::new(Mutex::new(StreamTranslatorState::default()));
let gap_state = Arc::new(Mutex::new(GapState::Bridging));
let last_live_frame_at = Arc::new(Mutex::new(tokio::time::Instant::now()));
let last_emitted_pts: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(Some(12345)));
let last_emit_wall: Arc<Mutex<Option<tokio::time::Instant>>> =
Arc::new(Mutex::new(Some(tokio::time::Instant::now())));
let mut dumper: Option<FrameDumper> = None;
let mut dumper_init_failed = false;
let cancel = CancellationToken::new();
let packet = BcMedia::Adpcm(BcMediaAdpcm { data: vec![] });
let result: Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
> = Ok(Ok(packet));
let keep = process_stream_result(
result,
"cam1",
RtspStreamKind::Main,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&translator_state,
&gap_state,
&last_live_frame_at,
&last_emitted_pts,
&last_emit_wall,
None,
&mut dumper,
&mut dumper_init_failed,
&cancel,
);
assert!(keep);
assert_eq!(*gap_state.lock().unwrap(), GapState::Bridging);
assert_eq!(*last_emitted_pts.lock().unwrap(), Some(12345));
}
#[test]
fn process_stream_result_decode_error_continues() {
use bairelay_neolink_core::Error;
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let translator_state = Arc::new(Mutex::new(StreamTranslatorState::default()));
let gap_state = Arc::new(Mutex::new(GapState::Live));
let last_live_frame_at = Arc::new(Mutex::new(tokio::time::Instant::now()));
let last_emitted_pts: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
let last_emit_wall: Arc<Mutex<Option<tokio::time::Instant>>> = Arc::new(Mutex::new(None));
let mut dumper: Option<FrameDumper> = None;
let mut dumper_init_failed = false;
let cancel = CancellationToken::new();
let result: Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
> = Ok(Err(Error::Other("decode fail")));
let keep = process_stream_result(
result,
"cam1",
RtspStreamKind::Main,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&translator_state,
&gap_state,
&last_live_frame_at,
&last_emitted_pts,
&last_emit_wall,
None,
&mut dumper,
&mut dumper_init_failed,
&cancel,
);
assert!(keep, "decode errors must not terminate the loop");
}
#[test]
fn process_stream_result_outer_error_breaks_loop() {
use bairelay_neolink_core::Error;
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let translator_state = Arc::new(Mutex::new(StreamTranslatorState::default()));
let gap_state = Arc::new(Mutex::new(GapState::Live));
let last_live_frame_at = Arc::new(Mutex::new(tokio::time::Instant::now()));
let last_emitted_pts: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
let last_emit_wall: Arc<Mutex<Option<tokio::time::Instant>>> = Arc::new(Mutex::new(None));
let mut dumper: Option<FrameDumper> = None;
let mut dumper_init_failed = false;
let cancel = CancellationToken::new();
let result: Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
> = Err(Error::StreamFinished);
let keep = process_stream_result(
result,
"cam1",
RtspStreamKind::Main,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&translator_state,
&gap_state,
&last_live_frame_at,
&last_emitted_pts,
&last_emit_wall,
None,
&mut dumper,
&mut dumper_init_failed,
&cancel,
);
assert!(!keep, "outer error must terminate the loop");
}
#[test]
fn process_stream_result_outer_error_on_cancel_is_quiet() {
use bairelay_neolink_core::Error;
let (tx, _rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown));
let translator_state = Arc::new(Mutex::new(StreamTranslatorState::default()));
let gap_state = Arc::new(Mutex::new(GapState::Live));
let last_live_frame_at = Arc::new(Mutex::new(tokio::time::Instant::now()));
let last_emitted_pts: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
let last_emit_wall: Arc<Mutex<Option<tokio::time::Instant>>> = Arc::new(Mutex::new(None));
let mut dumper: Option<FrameDumper> = None;
let mut dumper_init_failed = false;
let cancel = CancellationToken::new();
cancel.cancel();
let result: Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
> = Err(Error::StreamFinished);
let keep = process_stream_result(
result,
"cam1",
RtspStreamKind::Main,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&translator_state,
&gap_state,
&last_live_frame_at,
&last_emitted_pts,
&last_emit_wall,
None,
&mut dumper,
&mut dumper_init_failed,
&cancel,
);
assert!(!keep);
}
#[test]
fn handle_adpcm_drops_on_short_block_after_decimation() {
use crate::audio_presence::AudioPresence;
use bairelay_neolink_core::bcmedia::model::BcMediaAdpcm;
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let last_frame = Arc::new(LastFrameBuffer::new());
let sdp_params = Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
}));
let presence = Arc::new(RwLock::new(AudioPresence::Unknown));
let mut state = StreamTranslatorState::default();
let packet = BcMedia::Adpcm(BcMediaAdpcm {
data: vec![0x00, 0x01, 0x00, 0x00],
});
apply_bcmedia_packet(
&packet,
&tx,
None,
None,
&last_frame,
&sdp_params,
&presence,
&mut state,
&live_gap_state(),
);
assert!(rx.try_recv().is_err());
}
struct ScriptedSource {
queue: std::collections::VecDeque<
Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
>,
>,
}
#[async_trait::async_trait]
impl PacketSource for ScriptedSource {
async fn get_data(
&mut self,
) -> Result<
std::result::Result<BcMedia, bairelay_neolink_core::Error>,
bairelay_neolink_core::Error,
> {
match self.queue.pop_front() {
Some(r) => r,
None => std::future::pending().await,
}
}
}
fn translator_args(
tx: broadcast::Sender<Frame>,
last_frame: Arc<LastFrameBuffer>,
gap_threshold: Duration,
cancel: CancellationToken,
) -> TranslatorLoopArgs {
TranslatorLoopArgs {
camera_name: "cam1".to_string(),
rtsp_kind: RtspStreamKind::Main,
core_kind: CoreStreamKind::Main,
tx,
audio_pace_tx: None,
video_pace_tx: None,
last_frame,
sdp_params: Arc::new(RwLock::new(SdpParams {
server_ip: "0.0.0.0".to_string(),
session_id: "0".to_string(),
session_name: "unit".to_string(),
video: None,
audio: None,
})),
cancel,
bcmedia_dump: None,
audio_presence: Arc::new(RwLock::new(crate::audio_presence::AudioPresence::Unknown)),
translator_state: Arc::new(Mutex::new(StreamTranslatorState::default())),
gap_threshold,
gap_state: Arc::new(Mutex::new(GapState::Live)),
last_live_frame_at: Arc::new(Mutex::new(tokio::time::Instant::now())),
last_emitted_pts_90khz: Arc::new(Mutex::new(None)),
last_emit_wallclock_at: Arc::new(Mutex::new(None)),
}
}
#[tokio::test]
async fn drive_translator_loop_exits_on_cancel() {
let (tx, _rx) = broadcast::channel::<Frame>(4);
let cancel = CancellationToken::new();
let args = translator_args(
tx,
Arc::new(LastFrameBuffer::new()),
Duration::from_secs(5),
cancel.clone(),
);
let mut source = ScriptedSource {
queue: std::collections::VecDeque::new(),
};
cancel.cancel();
tokio::time::timeout(
Duration::from_millis(500),
drive_translator_loop(args, &mut source),
)
.await
.expect("cancel must exit loop");
}
#[tokio::test]
async fn drive_translator_loop_exits_on_outer_error() {
let (tx, _rx) = broadcast::channel::<Frame>(4);
let cancel = CancellationToken::new();
let args = translator_args(
tx,
Arc::new(LastFrameBuffer::new()),
Duration::from_secs(5),
cancel,
);
let mut queue = std::collections::VecDeque::new();
queue.push_back(Err(bairelay_neolink_core::Error::StreamFinished));
let mut source = ScriptedSource { queue };
tokio::time::timeout(
Duration::from_millis(500),
drive_translator_loop(args, &mut source),
)
.await
.expect("outer error must exit loop");
}
#[tokio::test]
async fn drive_translator_loop_continues_on_inner_error_then_cancel() {
let (tx, _rx) = broadcast::channel::<Frame>(4);
let cancel = CancellationToken::new();
let args = translator_args(
tx,
Arc::new(LastFrameBuffer::new()),
Duration::from_secs(5),
cancel.clone(),
);
let mut queue = std::collections::VecDeque::new();
queue.push_back(Ok(Err(bairelay_neolink_core::Error::Other("decode"))));
let mut source = ScriptedSource { queue };
let cancel_cp = cancel.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
cancel_cp.cancel();
});
tokio::time::timeout(
Duration::from_millis(500),
drive_translator_loop(args, &mut source),
)
.await
.expect("cancel must exit loop after inner error");
}
#[tokio::test]
async fn start_with_packet_source_end_to_end() {
use bairelay_neolink_core::bcmedia::model::{BcMediaIframe, VideoType};
let last_frame = Arc::new(LastFrameBuffer::new());
let packet = BcMedia::Iframe(BcMediaIframe {
video_type: VideoType::H264,
microseconds: 1_000_000,
time: None,
data: vec![
0x00, 0x00, 0x01, 0x67, 0x42, 0x00, 0x1f, 0xff, 0x00, 0x00, 0x01, 0x68, 0xce, 0x38,
0x80, 0x00, 0x00, 0x01, 0x65, 0xaa, 0xbb, 0xcc,
],
});
let mut queue = std::collections::VecDeque::new();
queue.push_back(Ok(Ok(packet)));
let source = ScriptedSource { queue };
let src = StreamSource::start_with_packet_source(
"cam-x".to_string(),
RtspStreamKind::Main,
Arc::clone(&last_frame),
Duration::from_secs(5),
source,
);
let mut rx = src.subscribe_for_test();
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(matches!(rx.try_recv(), Ok(Frame::Video { .. })));
assert!(last_frame.has_video());
let sdp = src.sdp_params();
assert!(sdp.video.is_some());
}
#[tokio::test]
async fn drive_translator_loop_forwards_video_frame_and_updates_markers() {
use bairelay_neolink_core::bcmedia::model::{BcMediaIframe, VideoType};
let (tx, mut rx) = broadcast::channel::<Frame>(4);
let cancel = CancellationToken::new();
let args = translator_args(
tx.clone(),
Arc::new(LastFrameBuffer::new()),
Duration::from_secs(5),
cancel.clone(),
);
let gap_state = Arc::clone(&args.gap_state);
let last_emitted_pts = Arc::clone(&args.last_emitted_pts_90khz);
*gap_state.lock().unwrap() = GapState::Bridging;
let packet = BcMedia::Iframe(BcMediaIframe {
video_type: VideoType::H264,
microseconds: 1_000_000,
time: None,
data: vec![
0x00, 0x00, 0x01, 0x67, 0x42, 0x00, 0x1f, 0xff, 0x00, 0x00, 0x01, 0x68, 0xce, 0x38,
0x80, 0x00, 0x00, 0x01, 0x65, 0xaa, 0xbb, 0xcc,
],
});
let mut queue = std::collections::VecDeque::new();
queue.push_back(Ok(Ok(packet)));
let mut source = ScriptedSource { queue };
let cancel_cp = cancel.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
cancel_cp.cancel();
});
tokio::time::timeout(
Duration::from_millis(500),
drive_translator_loop(args, &mut source),
)
.await
.expect("loop must exit on cancel after frame");
assert!(matches!(rx.try_recv(), Ok(Frame::Video { .. })));
assert_eq!(*gap_state.lock().unwrap(), GapState::Live);
assert_eq!(*last_emitted_pts.lock().unwrap(), Some(90_000));
}
fn dummy_video_frame() -> Frame {
Frame::Video {
codec: bairelay_rtsp::codec::VideoCodec::H264,
nals: vec![],
pts_90khz: 0,
keyframe: false,
access_unit_end: true,
}
}
fn dummy_audio_frame() -> Frame {
Frame::Audio {
payload: bairelay_rtsp::provider::AudioPayload::G711Ulaw {
samples: bytes::Bytes::new(),
},
pts: 0,
}
}
#[tokio::test]
async fn dispatch_paced_video_full_queue_logs_and_drops() {
let (pace_tx, _pace_rx) = mpsc::channel::<PacedFrame>(1);
let (broadcast_tx, _broadcast_rx) = broadcast::channel::<Frame>(1);
pace_tx
.try_send(PacedFrame {
frame: dummy_video_frame(),
duration: Duration::ZERO,
})
.unwrap();
dispatch_paced_video(
Some(&pace_tx),
&broadcast_tx,
dummy_video_frame(),
Duration::ZERO,
);
}
#[tokio::test]
async fn dispatch_paced_video_closed_queue_drops_silently() {
let (pace_tx, pace_rx) = mpsc::channel::<PacedFrame>(1);
drop(pace_rx);
let (broadcast_tx, _broadcast_rx) = broadcast::channel::<Frame>(1);
dispatch_paced_video(
Some(&pace_tx),
&broadcast_tx,
dummy_video_frame(),
Duration::ZERO,
);
}
#[tokio::test]
async fn dispatch_paced_video_no_pacer_falls_through_to_broadcast() {
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<Frame>(4);
dispatch_paced_video(None, &broadcast_tx, dummy_video_frame(), Duration::ZERO);
assert!(matches!(broadcast_rx.try_recv(), Ok(Frame::Video { .. })));
}
#[tokio::test]
async fn dispatch_paced_audio_full_queue_logs_and_drops() {
let (pace_tx, _pace_rx) = mpsc::channel::<PacedFrame>(1);
let (broadcast_tx, _broadcast_rx) = broadcast::channel::<Frame>(1);
pace_tx
.try_send(PacedFrame {
frame: dummy_audio_frame(),
duration: Duration::ZERO,
})
.unwrap();
dispatch_paced_audio(
Some(&pace_tx),
&broadcast_tx,
dummy_audio_frame(),
Duration::ZERO,
);
}
#[tokio::test]
async fn dispatch_paced_audio_closed_queue_drops_silently() {
let (pace_tx, pace_rx) = mpsc::channel::<PacedFrame>(1);
drop(pace_rx);
let (broadcast_tx, _broadcast_rx) = broadcast::channel::<Frame>(1);
dispatch_paced_audio(
Some(&pace_tx),
&broadcast_tx,
dummy_audio_frame(),
Duration::ZERO,
);
}
#[tokio::test]
async fn dispatch_paced_audio_no_pacer_falls_through_to_broadcast() {
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<Frame>(4);
dispatch_paced_audio(None, &broadcast_tx, dummy_audio_frame(), Duration::ZERO);
assert!(matches!(broadcast_rx.try_recv(), Ok(Frame::Audio { .. })));
}
}