use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::camera::CameraHandle;
const PER_CAMERA_TIMEOUT: Duration = Duration::from_secs(30);
const FIRST_FRAME_TIMEOUT: Duration = Duration::from_secs(15);
const SNAPSHOT_TIMEOUT: Duration = Duration::from_secs(10);
const FRAME_POLL_INTERVAL: Duration = Duration::from_millis(200);
const AUDIO_OBSERVE_WINDOW: Duration = Duration::from_secs(2);
pub async fn warm_last_frame_buffers(
cameras: &Arc<HashMap<String, Arc<CameraHandle>>>,
cancel: CancellationToken,
) {
if cameras.is_empty() {
return;
}
tracing::info!(cameras = cameras.len(), "Starting startup wake cycle");
let mut set = tokio::task::JoinSet::new();
for (name, handle) in cameras.iter() {
let handle = Arc::clone(handle);
let name = name.clone();
let cancel_task = cancel.clone();
set.spawn(async move {
let _guard = handle.wake_lock().acquire();
tokio::select! {
_ = cancel_task.cancelled() => {
tracing::debug!(camera = %name, "startup wake cancelled");
}
result = tokio::time::timeout(
PER_CAMERA_TIMEOUT,
warm_one(&handle, &name, cancel_task.clone()),
) => match result {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(camera = %name, error = %e, "startup wake failed");
}
Err(_) => {
tracing::warn!(
camera = %name,
timeout_s = PER_CAMERA_TIMEOUT.as_secs(),
"startup wake timed out"
);
}
},
}
});
}
while set.join_next().await.is_some() {}
tracing::info!("Startup wake cycle complete");
}
async fn warm_one(
handle: &CameraHandle,
name: &str,
cancel: CancellationToken,
) -> anyhow::Result<()> {
use bairelay_rtsp::url::StreamKind;
tracing::info!(camera = %name, "Warming last-frame buffer");
let source = tokio::select! {
_ = cancel.cancelled() => anyhow::bail!("cancelled"),
r = handle.stream_source(StreamKind::Main) => {
r.map_err(|e| anyhow::anyhow!("stream_source failed: {e}"))?
}
};
let last_frame = source.last_frame();
let start = std::time::Instant::now();
while !last_frame.has_video() {
if start.elapsed() > FIRST_FRAME_TIMEOUT {
anyhow::bail!("no video in {}s", FIRST_FRAME_TIMEOUT.as_secs());
}
tokio::select! {
_ = cancel.cancelled() => anyhow::bail!("cancelled"),
_ = tokio::time::sleep(FRAME_POLL_INTERVAL) => {}
}
}
let sdp_handle = source.sdp_params_handle();
let observed = tokio::select! {
_ = cancel.cancelled() => None,
c = observe_audio_presence(&sdp_handle, AUDIO_OBSERVE_WINDOW) => c,
};
let new_presence = match observed {
Some(c) => crate::audio_presence::AudioPresence::Present { codec: c },
None => crate::audio_presence::AudioPresence::Absent,
};
*handle
.audio_presence()
.write()
.expect("presence lock poisoned") = new_presence;
tracing::info!(
camera = %name,
presence = ?new_presence,
"audio presence observed at startup"
);
if let Some(camera) = handle.bc_camera() {
tokio::select! {
_ = cancel.cancelled() => return Ok(()),
_ = capture_snapshot_into_buffer(&camera, name, &last_frame) => {}
}
} else {
tracing::warn!(camera = %name, "bc_camera unavailable for snapshot");
}
drop(source);
Ok(())
}
pub(crate) async fn capture_snapshot_into_buffer(
camera: &std::sync::Arc<dyn bairelay_neolink_core::bc_protocol::CameraDriver>,
name: &str,
last_frame: &bairelay_rtsp::buffer::LastFrameBuffer,
) {
match tokio::time::timeout(SNAPSHOT_TIMEOUT, camera.get_snapshot()).await {
Ok(Ok(bytes)) => {
last_frame.set_jpeg(bytes::Bytes::from(bytes));
tracing::info!(camera = %name, "Captured startup JPEG snapshot");
}
Ok(Err(e)) => {
tracing::warn!(camera = %name, error = %e, "snapshot request failed");
}
Err(_) => {
tracing::warn!(
camera = %name,
timeout_s = SNAPSHOT_TIMEOUT.as_secs(),
"snapshot request timed out"
);
}
}
}
pub(crate) async fn observe_audio_presence(
sdp: &std::sync::Arc<std::sync::RwLock<bairelay_rtsp::sdp::SdpParams>>,
deadline: std::time::Duration,
) -> Option<bairelay_rtsp::codec::AudioCodec> {
let start = std::time::Instant::now();
loop {
if let Some(a) = sdp.read().expect("sdp lock poisoned").audio.as_ref() {
return Some(a.codec);
}
if start.elapsed() > deadline {
return None;
}
tokio::time::sleep(crate::stream_source::SDP_POLL_INTERVAL).await;
}
}
#[cfg(test)]
mod tests {
use super::*;
use bairelay_rtsp::codec::AudioCodec;
use bairelay_rtsp::sdp::{AudioParams, SdpParams};
use std::sync::{Arc, RwLock};
use std::time::Duration;
fn empty_sdp() -> Arc<RwLock<SdpParams>> {
Arc::new(RwLock::new(SdpParams {
server_ip: "0".into(),
session_id: "0".into(),
session_name: "u".into(),
video: None,
audio: None,
}))
}
#[tokio::test(flavor = "current_thread")]
async fn observe_audio_returns_codec_when_arrives() {
let sdp = empty_sdp();
let sdp2 = Arc::clone(&sdp);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
sdp2.write().unwrap().audio = Some(AudioParams {
codec: AudioCodec::Aac,
payload_type: 97,
sample_rate: 16_000,
channels: 1,
asc_hex: Some("1408".into()),
});
});
let r = observe_audio_presence(&sdp, Duration::from_secs(1)).await;
assert_eq!(r, Some(AudioCodec::Aac));
}
#[tokio::test(flavor = "current_thread")]
async fn observe_audio_returns_none_on_timeout() {
let sdp = empty_sdp();
let r = observe_audio_presence(&sdp, Duration::from_millis(200)).await;
assert_eq!(r, None);
}
#[tokio::test]
async fn capture_snapshot_into_buffer_warms_last_frame() {
use bairelay_neolink_core::bc_protocol::{CameraDriver, FakeCameraBuilder};
use bairelay_rtsp::buffer::LastFrameBuffer;
let jpeg_bytes: Vec<u8> = vec![0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10, 0xAA, 0xBB];
let expected = jpeg_bytes.clone();
let fake = FakeCameraBuilder::new()
.with_snapshot(move || Ok(jpeg_bytes.clone()))
.build();
let driver: Arc<dyn CameraDriver> = fake;
let lfb = LastFrameBuffer::new();
capture_snapshot_into_buffer(&driver, "cam1", &lfb).await;
let got = lfb
.jpeg()
.expect("buffer should be populated after snapshot");
assert_eq!(got.as_ref(), expected.as_slice());
}
#[tokio::test]
async fn capture_snapshot_into_buffer_leaves_buffer_empty_on_error() {
use bairelay_neolink_core::bc_protocol::{CameraDriver, FakeCameraBuilder};
use bairelay_rtsp::buffer::LastFrameBuffer;
let fake = FakeCameraBuilder::new()
.with_snapshot(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"camera declined snapshot",
))
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let lfb = LastFrameBuffer::new();
capture_snapshot_into_buffer(&driver, "cam1", &lfb).await;
assert!(
lfb.jpeg().is_none(),
"snapshot error must not populate last_frame_buffer"
);
}
#[tokio::test]
async fn warm_last_frame_buffers_empty_map_returns_instantly() {
use std::collections::HashMap;
let cameras: Arc<HashMap<String, Arc<crate::camera::CameraHandle>>> =
Arc::new(HashMap::new());
let cancel = CancellationToken::new();
tokio::time::timeout(
Duration::from_millis(200),
warm_last_frame_buffers(&cameras, cancel),
)
.await
.expect("warm on empty map should return well under 200ms");
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn warm_last_frame_buffers_propagates_cancel() {
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
use std::collections::HashMap;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-nc"),
cancel.clone(),
None,
));
let mut map = HashMap::new();
map.insert("cam-nc".to_string(), handle);
let cameras = Arc::new(map);
let cancel_task = cancel.clone();
let canceller = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
cancel_task.cancel();
});
warm_last_frame_buffers(&cameras, cancel).await;
let _ = canceller.await;
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn warm_last_frame_buffers_times_out_per_camera() {
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
use std::collections::HashMap;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-to"),
cancel.clone(),
None,
));
let mut map = HashMap::new();
map.insert("cam-to".to_string(), handle);
let cameras = Arc::new(map);
warm_last_frame_buffers(&cameras, cancel).await;
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn warm_last_frame_buffers_completes_with_inert_source() {
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
use crate::stream_source::StreamSource;
use bairelay_neolink_core::bc_protocol::FakeCameraBuilder;
use bairelay_rtsp::url::StreamKind;
use std::collections::HashMap;
use std::sync::Arc as StdArc;
let jpeg: Vec<u8> = vec![0xFF, 0xD8, 0xFF, 0xE0, 0xAA, 0xBB];
let jpeg_clone = jpeg.clone();
let fake = FakeCameraBuilder::new()
.with_snapshot(move || Ok(jpeg_clone.clone()))
.build();
let _: StdArc<dyn bairelay_neolink_core::bc_protocol::CameraDriver> = fake.clone();
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-inert"),
cancel.clone(),
None,
));
handle
.insert_stream_source_for_test(StreamKind::Main, StreamSource::start_inert_for_test());
handle.set_driver_for_test(fake);
let mut map = HashMap::new();
map.insert("cam-inert".to_string(), handle);
let cameras = Arc::new(map);
warm_last_frame_buffers(&cameras, cancel).await;
}
#[tokio::test]
async fn warm_last_frame_buffers_exercises_full_warm_one_happy_path() {
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
use crate::stream_source::StreamSource;
use bairelay_neolink_core::bc_protocol::FakeCameraBuilder;
use bairelay_rtsp::buffer::VideoBurst;
use bairelay_rtsp::codec::VideoCodec;
use bairelay_rtsp::url::StreamKind;
use std::collections::HashMap;
use std::time::Instant as StdInstant;
let jpeg_bytes: Vec<u8> = vec![0xFF, 0xD8, 0xFF, 0xE0, 0x12, 0x34];
let expected_jpeg = jpeg_bytes.clone();
let fake = FakeCameraBuilder::new()
.with_snapshot(move || Ok(jpeg_bytes.clone()))
.build();
let (source, last_frame) =
StreamSource::start_inert_for_test_with_gap_and_last_frame(Duration::from_secs(1));
last_frame.replace_video(VideoBurst {
codec: VideoCodec::H264,
parameter_sets: vec![vec![0x67, 0x42, 0x00, 0x1f]],
iframe_nals: vec![vec![0x65, 0xaa]],
pframe_nals: vec![],
captured_at: StdInstant::now(),
captured_pts_90khz: 0,
});
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-happy"),
cancel.clone(),
None,
));
handle.insert_stream_source_for_test(StreamKind::Main, Arc::clone(&source));
handle.set_driver_for_test(fake);
let mut map = HashMap::new();
map.insert("cam-happy".to_string(), Arc::clone(&handle));
let cameras = Arc::new(map);
warm_last_frame_buffers(&cameras, cancel).await;
let got = source
.last_frame()
.jpeg()
.expect("source buffer populated with snapshot");
assert_eq!(got.as_ref(), expected_jpeg.as_slice());
let presence = *handle.audio_presence().read().unwrap();
assert_eq!(presence, crate::audio_presence::AudioPresence::Absent);
}
#[tokio::test(flavor = "current_thread")]
async fn observe_audio_short_circuits_when_sdp_already_has_audio() {
let sdp = Arc::new(RwLock::new(SdpParams {
server_ip: "0".into(),
session_id: "0".into(),
session_name: "u".into(),
video: None,
audio: Some(AudioParams {
codec: AudioCodec::Aac,
payload_type: 97,
sample_rate: 16_000,
channels: 1,
asc_hex: None,
}),
}));
let r = observe_audio_presence(&sdp, Duration::from_secs(5)).await;
assert_eq!(r, Some(AudioCodec::Aac));
}
#[tokio::test]
async fn capture_snapshot_error_does_not_clobber_prior_frame() {
use bairelay_neolink_core::bc_protocol::{CameraDriver, FakeCameraBuilder};
use bairelay_rtsp::buffer::LastFrameBuffer;
use bytes::Bytes;
let good: Vec<u8> = vec![0xFF, 0xD8, 0xFF, 0xE0, 0xCA, 0xFE];
let lfb = LastFrameBuffer::new();
lfb.set_jpeg(Bytes::from(good.clone()));
let fake = FakeCameraBuilder::new()
.with_snapshot(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"snapshot declined",
))
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
capture_snapshot_into_buffer(&driver, "cam1", &lfb).await;
let after = lfb.jpeg().expect("prior frame must still be present");
assert_eq!(
after.as_ref(),
good.as_slice(),
"snapshot error must not replace cached frame bytes"
);
}
}