use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::{watch, Notify};
use tokio_util::sync::CancellationToken;
use bairelay_neolink_core::bc_protocol::{BcCamera, CameraDriver};
use bairelay_rtsp::buffer::LastFrameBuffer;
use bairelay_rtsp::provider::StreamError;
use bairelay_rtsp::url::StreamKind as RtspStreamKind;
use crate::audio_presence::AudioPresence;
use crate::bcmedia_dump::BcMediaDumpConfig;
use crate::camera_tasks;
use crate::capabilities::CameraCapabilities;
use crate::config::CameraConfig;
use crate::grace_period::GracePeriod;
use crate::preview_state::PreviewState;
use crate::status_cache::StatusCache;
use crate::stream_source::{MutexPoisonRecover as _, RwLockPoisonRecover as _, StreamSource};
use crate::wake_lock::WakeLockCounter;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CameraState {
Disconnected,
Connecting,
Connected,
}
impl CameraState {
pub fn is_disconnected(&self) -> bool {
matches!(self, CameraState::Disconnected)
}
pub fn is_connecting(&self) -> bool {
matches!(self, CameraState::Connecting)
}
pub fn is_connected(&self) -> bool {
matches!(self, CameraState::Connected)
}
}
fn aggregate_preview_state(
current: PreviewState,
any_bridging: bool,
last_own_write: Option<PreviewState>,
) -> PreviewState {
match (current, any_bridging, last_own_write) {
(PreviewState::Sleeping, _, _) => PreviewState::Sleeping,
(PreviewState::Live, true, _) => PreviewState::Connecting,
(PreviewState::Live, false, _) => PreviewState::Live,
(PreviewState::Connecting, false, Some(PreviewState::Connecting)) => PreviewState::Live,
(PreviewState::Connecting, _, _) => PreviewState::Connecting,
}
}
fn post_keepalive_preview_state(idle_disconnect: bool, wake_lock_idle: bool) -> PreviewState {
if idle_disconnect && wake_lock_idle {
PreviewState::Sleeping
} else {
PreviewState::Connecting
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum KeepaliveTickOutcome {
Ok,
Failed,
}
pub(crate) fn classify_keepalive_tick<T, E>(
outcome: std::result::Result<std::result::Result<T, E>, tokio::time::error::Elapsed>,
) -> KeepaliveTickOutcome {
match outcome {
Ok(Ok(_)) => KeepaliveTickOutcome::Ok,
Ok(Err(_)) => KeepaliveTickOutcome::Failed,
Err(_) => KeepaliveTickOutcome::Failed,
}
}
pub(crate) fn advance_keepalive_counter(
consecutive_failures: u32,
outcome: KeepaliveTickOutcome,
max_failures: u32,
) -> (u32, bool) {
match outcome {
KeepaliveTickOutcome::Ok => (0, false),
KeepaliveTickOutcome::Failed => {
let next = consecutive_failures.saturating_add(1);
(next, next >= max_failures)
}
}
}
pub struct CameraHandle {
config: CameraConfig,
cancel: CancellationToken,
wake_lock: WakeLockCounter,
state: Arc<RwLock<CameraState>>,
bc_camera: std::sync::RwLock<Option<Arc<dyn CameraDriver>>>,
bc_camera_concrete: std::sync::RwLock<Option<Arc<BcCamera>>>,
mqtt_client: Option<bairelay_mqtt::SharedMqttClient>,
topic_prefix: String,
disconnect_signal: Arc<Notify>,
stream_sources: std::sync::RwLock<HashMap<RtspStreamKind, Arc<StreamSource>>>,
stream_source_create_lock: tokio::sync::Mutex<()>,
last_frame_main: Arc<LastFrameBuffer>,
bcmedia_dump: Option<Arc<BcMediaDumpConfig>>,
audio_presence: Arc<RwLock<AudioPresence>>,
capabilities: std::sync::RwLock<Option<CameraCapabilities>>,
preset_cache: std::sync::RwLock<Vec<(u8, String)>>,
discovery_publisher: Option<bairelay_mqtt::DiscoveryPublisher>,
preview_state_tx: Arc<watch::Sender<PreviewState>>,
preview_state_rx: watch::Receiver<PreviewState>,
status_cache: Arc<StatusCache>,
prune_grace: Duration,
}
impl CameraHandle {
pub fn new(
config: CameraConfig,
parent_cancel: CancellationToken,
mqtt_client: Option<bairelay_mqtt::SharedMqttClient>,
) -> Self {
Self::with_bcmedia_dump(config, parent_cancel, mqtt_client, None)
}
pub fn with_bcmedia_dump(
config: CameraConfig,
parent_cancel: CancellationToken,
mqtt_client: Option<bairelay_mqtt::SharedMqttClient>,
bcmedia_dump: Option<Arc<BcMediaDumpConfig>>,
) -> Self {
Self::with_bcmedia_dump_and_prefix(
config,
parent_cancel,
mqtt_client,
"bairelay".to_string(),
bcmedia_dump,
)
}
pub fn with_bcmedia_dump_and_prefix(
config: CameraConfig,
parent_cancel: CancellationToken,
mqtt_client: Option<bairelay_mqtt::SharedMqttClient>,
topic_prefix: String,
bcmedia_dump: Option<Arc<BcMediaDumpConfig>>,
) -> Self {
let (preview_state_tx, preview_state_rx) = watch::channel(PreviewState::Sleeping);
let preview_state_tx = Arc::new(preview_state_tx);
Self {
config,
cancel: parent_cancel.child_token(),
wake_lock: WakeLockCounter::new(),
state: Arc::new(RwLock::new(CameraState::Disconnected)),
bc_camera: std::sync::RwLock::new(None),
bc_camera_concrete: std::sync::RwLock::new(None),
mqtt_client,
topic_prefix,
disconnect_signal: Arc::new(Notify::new()),
stream_sources: std::sync::RwLock::new(HashMap::new()),
stream_source_create_lock: tokio::sync::Mutex::new(()),
last_frame_main: Arc::new(LastFrameBuffer::new()),
bcmedia_dump,
audio_presence: Arc::new(RwLock::new(AudioPresence::Unknown)),
capabilities: std::sync::RwLock::new(None),
preset_cache: std::sync::RwLock::new(Vec::new()),
discovery_publisher: None,
preview_state_tx,
preview_state_rx,
status_cache: Arc::new(StatusCache::default()),
prune_grace: Duration::ZERO,
}
}
#[must_use]
pub fn with_prune_grace(mut self, prune_grace: Duration) -> Self {
self.prune_grace = prune_grace;
self
}
#[must_use]
pub fn with_discovery_publisher(
mut self,
publisher: bairelay_mqtt::DiscoveryPublisher,
) -> Self {
self.discovery_publisher = Some(publisher);
self
}
pub fn last_frame_main(&self) -> Arc<LastFrameBuffer> {
Arc::clone(&self.last_frame_main)
}
pub fn audio_presence(&self) -> Arc<RwLock<AudioPresence>> {
Arc::clone(&self.audio_presence)
}
pub fn capabilities(&self) -> Option<CameraCapabilities> {
*self.capabilities.read_recover()
}
pub fn preset_cache(&self) -> Vec<(u8, String)> {
self.preset_cache.read_recover().clone()
}
#[doc(hidden)]
pub fn set_preset_cache_for_test(&self, presets: Vec<(u8, String)>) {
*self.preset_cache.write_recover() = presets;
}
pub fn replace_preset_cache(&self, presets: Vec<(u8, String)>) {
*self.preset_cache.write_recover() = presets;
}
pub fn preset_id_for_name(&self, name: &str) -> Option<u8> {
let cache = self.preset_cache.read_recover();
cache
.iter()
.find(|(_, n)| n.eq_ignore_ascii_case(name))
.map(|(id, _)| *id)
}
pub fn preset_name_for_id(&self, preset_id: u8) -> Option<String> {
let cache = self.preset_cache.read_recover();
cache
.iter()
.find(|(id, _)| *id == preset_id)
.map(|(_, n)| n.clone())
}
pub fn preview_state_rx(&self) -> watch::Receiver<PreviewState> {
self.preview_state_rx.clone()
}
pub(crate) fn set_preview_state(&self, s: PreviewState) {
let _ = self.preview_state_tx.send(s);
}
pub async fn publish_discovery(&self) -> Result<(), bairelay_mqtt::MqttError> {
let Some(publisher) = self.discovery_publisher.as_ref() else {
return Ok(());
};
let Some(caps) = self.capabilities() else {
return Ok(());
};
let flags = bairelay_mqtt::CameraEnableFlags::from(&self.config.mqtt);
let presets = self.preset_cache();
publisher
.publish(
&self.config.name,
self.config.address.as_deref(),
self.config.uid.as_deref(),
caps.into(),
&flags,
&presets,
)
.await
}
#[doc(hidden)]
pub fn set_capabilities_for_test(&self, caps: CameraCapabilities) {
*self.capabilities.write_recover() = Some(caps);
}
pub fn status_cache(&self) -> Arc<StatusCache> {
Arc::clone(&self.status_cache)
}
pub async fn republish_cached_status(&self) -> Result<(), bairelay_mqtt::MqttError> {
let Some(ref mqtt) = self.mqtt_client else {
return Ok(());
};
let publisher =
bairelay_mqtt::StatusPublisher::new(mqtt, &self.topic_prefix, &self.config.name);
if let Some(level) = self.status_cache.battery_level() {
publisher.publish_battery_level(level).await?;
}
if let Some(motion) = self.status_cache.motion() {
publisher.publish_motion(motion).await?;
}
if let Some(on) = self.status_cache.floodlight() {
publisher.publish_floodlight(on).await?;
}
if let Some(enabled) = self.status_cache.floodlight_tasks() {
publisher.publish_floodlight_tasks_enabled(enabled).await?;
}
if let Some(enabled) = self.status_cache.pir() {
publisher.publish_pir(enabled).await?;
}
Ok(())
}
pub async fn unpublish_discovery(&self) -> Result<(), bairelay_mqtt::MqttError> {
let Some(publisher) = self.discovery_publisher.as_ref() else {
return Ok(());
};
let Some(caps) = self.capabilities() else {
return Ok(());
};
let flags = bairelay_mqtt::CameraEnableFlags::from(&self.config.mqtt);
let presets = self.preset_cache();
publisher
.unpublish(
&self.config.name,
self.config.address.as_deref(),
self.config.uid.as_deref(),
caps.into(),
&flags,
&presets,
)
.await
}
pub fn state(&self) -> CameraState {
*self.state.read_recover()
}
pub fn wake_lock(&self) -> &WakeLockCounter {
&self.wake_lock
}
pub fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()
}
pub fn name(&self) -> &str {
&self.config.name
}
pub fn topic_prefix(&self) -> &str {
&self.topic_prefix
}
pub fn config(&self) -> &CameraConfig {
&self.config
}
pub fn cancel_token(&self) -> &CancellationToken {
&self.cancel
}
pub fn bc_camera(&self) -> Option<Arc<dyn CameraDriver>> {
self.bc_camera.read_recover().clone()
}
pub fn request_disconnect(&self) {
self.disconnect_signal.notify_one();
}
#[cfg(test)]
pub(crate) fn disconnect_signal_for_test(&self) -> Arc<Notify> {
Arc::clone(&self.disconnect_signal)
}
fn set_state(&self, new: CameraState) {
*self.state.write_recover() = new;
}
pub async fn stream_source(
&self,
kind: RtspStreamKind,
) -> Result<Arc<StreamSource>, StreamError> {
const POLL_INTERVAL: Duration = Duration::from_millis(100);
const MAX_WAIT: Duration = Duration::from_secs(30);
let deadline = std::time::Instant::now() + MAX_WAIT;
loop {
if self.cancel.is_cancelled() {
return Err(StreamError::Unavailable(
"camera task shutting down".to_string(),
));
}
if self.state() == CameraState::Connected {
break;
}
if std::time::Instant::now() >= deadline {
return Err(StreamError::Unavailable(format!(
"camera '{}' did not reach Connected state within {}s",
self.config.name,
MAX_WAIT.as_secs()
)));
}
tokio::time::sleep(POLL_INTERVAL).await;
}
{
let guard = self.stream_sources.read_recover();
if let Some(source) = guard.get(&kind) {
return Ok(Arc::clone(source));
}
}
let _create_guard = self.stream_source_create_lock.lock().await;
{
let guard = self.stream_sources.read_recover();
if let Some(source) = guard.get(&kind) {
return Ok(Arc::clone(source));
}
}
if self.cancel.is_cancelled() || self.state() != CameraState::Connected {
return Err(StreamError::Unavailable(format!(
"camera '{}' is no longer in Connected state",
self.config.name
)));
}
let camera = self
.bc_camera_concrete
.read_recover()
.clone()
.ok_or_else(|| {
StreamError::Unavailable(format!(
"camera '{}' is not currently connected",
self.config.name
))
})?;
let gap_threshold = if self.config.pause.bridge_gaps {
Duration::from_secs_f64(self.config.pause.gap_threshold_secs)
} else {
Duration::MAX
};
let source = StreamSource::start(
camera,
self.config.name.clone(),
kind,
Arc::clone(&self.last_frame_main),
self.bcmedia_dump.clone(),
self.audio_presence(),
gap_threshold,
);
{
let mut guard = self.stream_sources.write_recover();
guard.insert(kind, Arc::clone(&source));
}
Ok(source)
}
pub(crate) fn prune_idle_stream_sources_at(&self, now: Instant, grace: Duration) {
let mut guard = self.stream_sources.write_recover();
guard.retain(|_kind, source| {
let subs = source.subscribers();
let mut marker = source.last_idle_since.lock_recover();
if subs > 0 {
*marker = None;
return true;
}
match *marker {
None => {
*marker = Some(now);
if grace.is_zero() {
drop(marker);
source.stop();
return false;
}
true
}
Some(since) if now.saturating_duration_since(since) >= grace => {
drop(marker);
source.stop();
false
}
Some(_) => true,
}
});
}
pub(crate) fn any_stream_source_bridging(&self) -> bool {
let guard = self.stream_sources.read_recover();
guard
.values()
.any(|s| s.gap_state() == crate::stream_source::GapState::Bridging)
}
async fn stop_all_stream_sources(&self) {
let drained: Vec<(RtspStreamKind, Arc<StreamSource>)> = {
let mut guard = self.stream_sources.write_recover();
let entries: Vec<_> = guard.drain().collect();
entries
};
for (kind, source) in drained {
if let Err(e) = source.stop_and_wait(Self::STREAM_SOURCE_STOP_TIMEOUT).await {
tracing::warn!(
camera = %self.config.name,
stream = ?kind,
error = %e,
"stream-source reader did not exit within budget; detached (still holds Arc<BcCamera> until it observes cancel)",
);
}
}
}
async fn try_connect(&self) -> anyhow::Result<Arc<BcCamera>> {
let opts = crate::bc_opts::build_bc_opts(&self.config);
let max_enc = crate::bc_opts::max_encryption(&self.config);
let camera = BcCamera::new(&opts).await?;
camera.login_with_maxenc(max_enc).await?;
Ok(Arc::new(camera))
}
async fn keepalive_loop(&self, camera: &dyn CameraDriver, session_cancel: &CancellationToken) {
let mut interval = tokio::time::interval(Self::KEEPALIVE_INTERVAL);
let mut consecutive_failures: u32 = 0;
'outer: loop {
tokio::select! {
_ = self.cancel.cancelled() => break,
_ = session_cancel.cancelled() => {
tracing::info!(camera = %self.config.name, "Session cancelled (grace period or disconnect)");
break;
}
_ = self.disconnect_signal.notified() => {
tracing::info!(camera = %self.config.name, "Watchdog requested disconnect");
break;
}
_ = interval.tick() => {
let raw = tokio::select! {
v = tokio::time::timeout(Self::KEEPALIVE_PROBE_TIMEOUT, camera.keepalive_probe()) => v,
_ = self.cancel.cancelled() => break 'outer,
_ = session_cancel.cancelled() => {
tracing::info!(
camera = %self.config.name,
"Session cancelled mid-probe"
);
break 'outer;
}
_ = self.disconnect_signal.notified() => {
tracing::info!(
camera = %self.config.name,
"Watchdog requested disconnect mid-probe"
);
break 'outer;
}
};
let outcome = classify_keepalive_tick(raw);
let (next, should_break) = advance_keepalive_counter(
consecutive_failures,
outcome,
Self::KEEPALIVE_MAX_FAILURES,
);
consecutive_failures = next;
if outcome == KeepaliveTickOutcome::Failed {
tracing::debug!(
camera = %self.config.name,
failures = consecutive_failures,
"Keepalive probe failed",
);
}
if should_break {
tracing::warn!(
camera = %self.config.name,
"Keepalive failed {} times, disconnecting",
Self::KEEPALIVE_MAX_FAILURES,
);
break;
}
}
}
}
}
pub(crate) const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5);
pub(crate) const KEEPALIVE_PROBE_TIMEOUT: Duration = Duration::from_secs(3);
const LOGOUT_TIMEOUT: Duration = Duration::from_secs(5);
const STREAM_SOURCE_STOP_TIMEOUT: Duration = Duration::from_secs(7);
pub(crate) const KEEPALIVE_MAX_FAILURES: u32 = 5;
async fn run_connected_session(
self: &Arc<Self>,
driver: Arc<dyn CameraDriver>,
concrete: Option<Arc<BcCamera>>,
) {
self.set_state(CameraState::Connected);
self.set_preview_state(PreviewState::Live);
tracing::info!(camera = %self.config.name, "Connected");
if let Some(ref mqtt) = self.mqtt_client {
let publisher =
bairelay_mqtt::StatusPublisher::new(mqtt, &self.topic_prefix, &self.config.name);
if let Err(e) = publisher.publish_connection(true).await {
tracing::warn!(camera = %self.config.name, error = %e, "Failed to publish connected status");
}
}
*self.bc_camera.write_recover() = Some(Arc::clone(&driver));
*self.bc_camera_concrete.write_recover() = concrete.clone();
match driver.get_support().await {
Ok(support) => {
let has_ptz = crate::capabilities::ptz_mode_indicates_ptz(
support.ptz_mode.as_deref(),
support.ptz_cfg,
);
*self.capabilities.write_recover() = Some(CameraCapabilities { has_ptz });
if has_ptz {
match driver.get_ptz_preset().await {
Ok(p) => {
let presets: Vec<(u8, String)> = p
.preset_list
.preset
.into_iter()
.filter_map(|preset| preset.name.map(|n| (preset.id, n)))
.collect();
*self.preset_cache.write_recover() = presets;
}
Err(e) => {
tracing::debug!(
camera = %self.config.name,
error = %e,
"get_ptz_preset failed; leaving preset cache empty"
);
}
}
}
if let Err(e) = self.publish_discovery().await {
tracing::warn!(
camera = %self.config.name,
error = %e,
"Failed to publish HA discovery payloads post-connect; will retry on next ConnAck"
);
}
}
Err(e) => {
tracing::warn!(
camera = %self.config.name,
error = %e,
"Failed to query camera capabilities via get_support(); leaving cache empty for re-probe on next reconnect"
);
}
}
let session_cancel = CancellationToken::new();
let tasks = self.spawn_session_tasks(&driver, &session_cancel).await;
self.keepalive_loop(&*driver, &session_cancel).await;
self.set_preview_state(post_keepalive_preview_state(
self.config.idle_disconnect,
self.wake_lock.is_idle(),
));
session_cancel.cancel();
self.teardown_session_tasks(tasks, concrete).await;
}
async fn spawn_session_tasks(
self: &Arc<Self>,
driver: &Arc<dyn CameraDriver>,
session_cancel: &CancellationToken,
) -> tokio::task::JoinSet<()> {
let mut tasks = tokio::task::JoinSet::new();
{
let handle = Arc::clone(self);
let cancel = session_cancel.clone();
tasks.spawn(async move {
aggregate_preview_state_loop(handle, cancel).await;
});
}
if let Some(ref mqtt) = self.mqtt_client {
if self.config.mqtt.enable_motion {
tasks.spawn(camera_tasks::motion_listener(
self.config.name.clone(),
Arc::clone(driver),
mqtt.clone(),
self.topic_prefix.clone(),
self.wake_lock.clone(),
session_cancel.clone(),
Duration::from_secs_f64(self.config.motion_wake_hold_secs),
self.status_cache(),
));
}
if self.config.mqtt.enable_battery {
tasks.spawn(camera_tasks::battery_poller(
self.config.name.clone(),
Arc::clone(driver),
mqtt.clone(),
self.topic_prefix.clone(),
self.config.mqtt.battery_update,
session_cancel.clone(),
self.status_cache(),
));
}
if self.config.mqtt.enable_floodlight {
tasks.spawn(camera_tasks::floodlight_poller(
self.config.name.clone(),
Arc::clone(driver),
mqtt.clone(),
self.topic_prefix.clone(),
self.config.mqtt.floodlight_update,
session_cancel.clone(),
self.status_cache(),
));
tasks.spawn(camera_tasks::floodlight_listener(
self.config.name.clone(),
Arc::clone(driver),
mqtt.clone(),
self.topic_prefix.clone(),
session_cancel.clone(),
self.status_cache(),
));
}
if self.config.mqtt.enable_pir {
camera_tasks::publish_pir_state(
self.config.name.clone(),
Arc::clone(driver),
mqtt.clone(),
self.topic_prefix.clone(),
self.status_cache(),
)
.await;
}
}
if self.config.idle_disconnect {
let grace =
crate::config::resolve_idle_disconnect_timeout(&self.config, self.prune_grace);
let gp = GracePeriod::new(self.wake_lock.clone(), grace);
let sc = session_cancel.clone();
let cam_name = self.config.name.clone();
tasks.spawn(async move {
gp.run().await;
tracing::info!(camera = %cam_name, "Grace period expired, disconnecting");
sc.cancel();
});
}
tasks
}
async fn teardown_session_tasks(
self: &Arc<Self>,
mut tasks: tokio::task::JoinSet<()>,
concrete: Option<Arc<BcCamera>>,
) {
let drain_deadline = tokio::time::sleep(Duration::from_secs(2));
tokio::pin!(drain_deadline);
loop {
tokio::select! {
result = tasks.join_next() => {
if result.is_none() { break; } }
_ = &mut drain_deadline => {
tracing::debug!(camera = %self.config.name, "Aborting remaining session tasks");
tasks.abort_all();
while tasks.join_next().await.is_some() {}
break;
}
}
}
self.stop_all_stream_sources().await;
if let Some(ref cam) = concrete {
match tokio::time::timeout(Self::LOGOUT_TIMEOUT, cam.logout()).await {
Ok(Ok(())) => tracing::debug!(camera = %self.config.name, "Logged out"),
Ok(Err(e)) => {
tracing::debug!(camera = %self.config.name, error = %e, "Logout failed")
}
Err(_) => tracing::debug!(camera = %self.config.name, "Logout timed out, dropping"),
}
}
*self.bc_camera.write_recover() = None;
*self.bc_camera_concrete.write_recover() = None;
drop(concrete);
self.set_state(CameraState::Disconnected);
tracing::info!(camera = %self.config.name, "Disconnected");
if let Some(ref mqtt) = self.mqtt_client {
let publisher =
bairelay_mqtt::StatusPublisher::new(mqtt, &self.topic_prefix, &self.config.name);
if let Err(e) = publisher.publish_connection(false).await {
tracing::warn!(camera = %self.config.name, error = %e, "Failed to publish disconnected status");
}
}
}
pub async fn run(self: &Arc<Self>) {
if let Some(ref mqtt) = self.mqtt_client {
let publisher =
bairelay_mqtt::StatusPublisher::new(mqtt, &self.topic_prefix, &self.config.name);
let _ = publisher.publish_connection(false).await;
let _ = publisher.publish_motion_unknown().await;
}
let preview_task = if let Some(ref mqtt) = self.mqtt_client {
if self.config.mqtt.enable_preview {
Some(tokio::spawn(camera_tasks::preview_poller(
self.config.name.clone(),
Arc::clone(self),
Arc::clone(&self.last_frame_main),
mqtt.clone(),
self.topic_prefix.clone(),
self.config.mqtt.preview_update,
self.preview_state_rx(),
self.config.pause.preview_overlay,
self.cancel.clone(),
)))
} else {
None
}
} else {
None
};
let mut backoff = ReconnectBackoff::new(Duration::from_secs(2), Duration::from_secs(60));
loop {
if self.cancel.is_cancelled() {
break;
}
if self.config.idle_disconnect && self.wake_lock.is_idle() {
self.set_preview_state(PreviewState::Sleeping);
tracing::info!(camera = %self.config.name, "Idle disconnect enabled, waiting for wake lock...");
tokio::select! {
_ = self.cancel.cancelled() => break,
_ = self.wake_lock.wait_for_acquire() => {
tracing::info!(camera = %self.config.name, "Wake lock acquired, connecting...");
}
}
}
self.set_state(CameraState::Connecting);
self.set_preview_state(PreviewState::Connecting);
tracing::info!(camera = %self.config.name, "Connecting...");
let connect_result = tokio::select! {
_ = self.cancel.cancelled() => break,
result = tokio::time::timeout(Duration::from_secs(30), self.try_connect()) => {
match result {
Ok(r) => r,
Err(_) => {
tracing::warn!(camera = %self.config.name, "Connection timed out (30s)");
Err(anyhow::anyhow!("Connection timed out"))
}
}
}
};
match connect_result {
Ok(camera) => {
let driver: Arc<dyn CameraDriver> =
Arc::clone(&camera) as Arc<dyn CameraDriver>;
backoff.reset();
self.run_connected_session(driver, Some(Arc::clone(&camera)))
.await;
}
Err(e) => {
if is_login_failure(&e) {
tracing::error!(camera = %self.config.name, "Authentication failed — check credentials. Stopping retries.");
self.set_state(CameraState::Disconnected);
break;
}
tracing::warn!(camera = %self.config.name, error = %e, "Connection failed");
self.set_state(CameraState::Disconnected);
}
}
if !backoff.sleep_with_cancel(&self.cancel).await {
break;
}
}
self.stop_all_stream_sources().await;
if let Some(task) = preview_task {
let _ = tokio::time::timeout(Duration::from_secs(2), task).await;
}
}
}
async fn aggregate_preview_state_loop(
handle: Arc<CameraHandle>,
session_cancel: CancellationToken,
) {
let mut ticker = tokio::time::interval(Duration::from_millis(200));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
let mut last_write: Option<PreviewState> = None;
loop {
tokio::select! {
_ = session_cancel.cancelled() => break,
_ = ticker.tick() => {
let current = *handle.preview_state_rx.borrow();
let any_bridging = handle.any_stream_source_bridging();
let next = aggregate_preview_state(current, any_bridging, last_write);
if next != current {
handle.set_preview_state(next);
last_write = Some(next);
} else if matches!(current, PreviewState::Sleeping)
|| (matches!(current, PreviewState::Connecting)
&& last_write != Some(PreviewState::Connecting))
{
last_write = None;
}
}
}
}
}
fn is_login_failure(err: &anyhow::Error) -> bool {
use bairelay_neolink_core::Error as CoreError;
for cause in err.chain() {
if let Some(core) = cause.downcast_ref::<CoreError>() {
if matches!(core, CoreError::AuthFailed | CoreError::CameraLoginFail) {
return true;
}
}
}
let msg = format!("{:?}", err);
msg.contains("AuthFailed")
|| msg.contains("CameraLoginFail")
|| msg.contains("Credential error")
}
pub struct ReconnectBackoff {
initial: Duration,
max: Duration,
current: Duration,
}
impl ReconnectBackoff {
pub fn new(initial: Duration, max: Duration) -> Self {
Self {
initial,
max,
current: initial,
}
}
pub fn next_delay(&mut self) -> Duration {
let delay = self.current;
self.current = (self.current * 2).min(self.max);
delay
}
pub fn reset(&mut self) {
self.current = self.initial;
}
pub async fn sleep_with_cancel(&mut self, cancel: &CancellationToken) -> bool {
let delay = self.next_delay();
crate::run_support::sleep_or_cancel(delay, cancel).await
}
}
#[cfg(test)]
#[derive(Debug)]
#[allow(dead_code)] pub(crate) enum ReconnectOutcome<T> {
Connected(T),
Bailed(anyhow::Error),
Cancelled,
}
#[cfg(test)]
pub(crate) async fn drive_reconnect_with_backoff<T, F, Fut>(
mut backoff: ReconnectBackoff,
cancel: CancellationToken,
mut connect: F,
should_bail: impl Fn(&anyhow::Error) -> bool,
) -> ReconnectOutcome<T>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = anyhow::Result<T>>,
{
loop {
if cancel.is_cancelled() {
return ReconnectOutcome::Cancelled;
}
let attempt = tokio::select! {
_ = cancel.cancelled() => return ReconnectOutcome::Cancelled,
r = connect() => r,
};
match attempt {
Ok(value) => return ReconnectOutcome::Connected(value),
Err(e) => {
if should_bail(&e) {
return ReconnectOutcome::Bailed(e);
}
if !backoff.sleep_with_cancel(&cancel).await {
return ReconnectOutcome::Cancelled;
}
}
}
}
}
#[cfg(test)]
impl CameraHandle {
pub(crate) fn insert_stream_source_for_test(
&self,
kind: RtspStreamKind,
source: Arc<StreamSource>,
) {
self.stream_sources.write_recover().insert(kind, source);
}
pub(crate) fn has_stream_source_for_test(&self, kind: RtspStreamKind) -> bool {
self.stream_sources.read_recover().contains_key(&kind)
}
pub(crate) fn subscribe_stream_for_test(
&self,
kind: RtspStreamKind,
) -> tokio::sync::broadcast::Receiver<bairelay_rtsp::provider::Frame> {
let guard = self.stream_sources.read_recover();
guard
.get(&kind)
.expect("stream source present for test")
.subscribe_for_test()
}
#[allow(dead_code)] pub(crate) fn set_driver_for_test(&self, driver: Arc<dyn CameraDriver>) {
*self.bc_camera.write_recover() = Some(driver);
self.set_state(CameraState::Connected);
}
pub(crate) async fn run_connected_session_for_test(
self: &Arc<Self>,
driver: Arc<dyn CameraDriver>,
) {
self.run_connected_session(driver, None).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn new_camera_handle_has_audio_presence_unknown() {
use crate::audio_presence::AudioPresence;
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let config = minimal_camera_config("cam");
let cancel = CancellationToken::new();
let handle = CameraHandle::new(config, cancel, None);
let presence = *handle.audio_presence().read().expect("presence lock");
assert_eq!(presence, AudioPresence::Unknown);
}
#[test]
fn capabilities_defaults_to_none() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let config = minimal_camera_config("cam");
let cancel = CancellationToken::new();
let handle = CameraHandle::new(config, cancel, None);
assert!(handle.capabilities().is_none());
}
#[test]
fn capabilities_cached_after_set() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let config = minimal_camera_config("cam");
let cancel = CancellationToken::new();
let handle = CameraHandle::new(config, cancel, None);
handle.set_capabilities_for_test(CameraCapabilities { has_ptz: true });
let caps = handle.capabilities().expect("caps populated");
assert!(caps.has_ptz);
}
#[test]
fn camera_handle_initial_preview_state_is_sleeping() {
use crate::config::test_helpers::minimal_camera_config;
use crate::preview_state::PreviewState;
use tokio_util::sync::CancellationToken;
let config = minimal_camera_config("cam");
let cancel = CancellationToken::new();
let handle = CameraHandle::new(config, cancel, None);
let rx = handle.preview_state_rx();
assert_eq!(*rx.borrow(), PreviewState::Sleeping);
}
#[tokio::test]
async fn preview_state_transitions_to_connecting_during_connect() {
use crate::config::test_helpers::minimal_camera_config;
use crate::preview_state::PreviewState;
use tokio_util::sync::CancellationToken;
let mut config = minimal_camera_config("cam");
config.address = Some("127.0.0.1:1".to_string());
config.idle_disconnect = false;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(config, cancel.clone(), None));
let mut rx = handle.preview_state_rx();
assert_eq!(*rx.borrow_and_update(), PreviewState::Sleeping);
let run_handle = {
let h = Arc::clone(&handle);
tokio::spawn(async move { h.run().await })
};
let saw_connecting = tokio::time::timeout(Duration::from_secs(2), async {
loop {
if *rx.borrow_and_update() == PreviewState::Connecting {
return true;
}
if rx.changed().await.is_err() {
return false;
}
}
})
.await
.unwrap_or(false);
cancel.cancel();
let _ = tokio::time::timeout(Duration::from_secs(5), run_handle).await;
assert!(
saw_connecting,
"preview state never transitioned to Connecting"
);
}
#[test]
fn aggregate_preview_state_live_downgrades_on_bridging() {
assert_eq!(
aggregate_preview_state(PreviewState::Live, true, None),
PreviewState::Connecting
);
assert_eq!(
aggregate_preview_state(PreviewState::Live, false, None),
PreviewState::Live
);
}
#[test]
fn aggregate_preview_state_sleeping_never_changes() {
assert_eq!(
aggregate_preview_state(PreviewState::Sleeping, true, None),
PreviewState::Sleeping
);
assert_eq!(
aggregate_preview_state(PreviewState::Sleeping, false, None),
PreviewState::Sleeping
);
assert_eq!(
aggregate_preview_state(PreviewState::Sleeping, true, Some(PreviewState::Connecting)),
PreviewState::Sleeping
);
}
#[test]
fn aggregate_upgrades_from_own_downgrade_when_bridging_clears() {
assert_eq!(
aggregate_preview_state(
PreviewState::Connecting,
false,
Some(PreviewState::Connecting)
),
PreviewState::Live,
);
}
#[test]
fn aggregate_stays_connecting_on_connect_loop_owned() {
assert_eq!(
aggregate_preview_state(PreviewState::Connecting, false, None),
PreviewState::Connecting,
);
assert_eq!(
aggregate_preview_state(PreviewState::Connecting, true, None),
PreviewState::Connecting,
);
}
#[test]
fn aggregate_stays_connecting_when_bridging_still_active() {
assert_eq!(
aggregate_preview_state(
PreviewState::Connecting,
true,
Some(PreviewState::Connecting)
),
PreviewState::Connecting,
);
}
#[test]
fn post_keepalive_sleeping_only_when_idle_disconnect_and_wake_lock_idle() {
assert_eq!(
post_keepalive_preview_state(true, true),
PreviewState::Sleeping,
);
}
#[test]
fn post_keepalive_connecting_when_wake_lock_held() {
assert_eq!(
post_keepalive_preview_state(true, false),
PreviewState::Connecting,
);
}
#[test]
fn post_keepalive_connecting_when_idle_disconnect_disabled() {
assert_eq!(
post_keepalive_preview_state(false, true),
PreviewState::Connecting,
);
assert_eq!(
post_keepalive_preview_state(false, false),
PreviewState::Connecting,
);
}
#[derive(Debug)]
#[allow(dead_code)] struct FakeErr(&'static str);
#[test]
fn classify_ok_reply_is_ok() {
let raw: std::result::Result<
std::result::Result<(), FakeErr>,
tokio::time::error::Elapsed,
> = Ok(Ok(()));
assert_eq!(classify_keepalive_tick(raw), KeepaliveTickOutcome::Ok);
}
#[test]
fn classify_other_err_is_failed() {
let raw: std::result::Result<
std::result::Result<(), FakeErr>,
tokio::time::error::Elapsed,
> = Ok(Err(FakeErr("ConnectionReset")));
assert_eq!(classify_keepalive_tick(raw), KeepaliveTickOutcome::Failed);
}
#[test]
fn classify_timeout_is_failed() {
let raw: std::result::Result<std::result::Result<(), FakeErr>, _> =
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
tokio::time::timeout(
Duration::from_millis(0),
futures::future::pending::<std::result::Result<(), FakeErr>>(),
)
.await
});
assert_eq!(classify_keepalive_tick(raw), KeepaliveTickOutcome::Failed);
}
#[test]
fn advance_ok_resets_counter() {
assert_eq!(
advance_keepalive_counter(4, KeepaliveTickOutcome::Ok, 5),
(0, false),
);
}
#[test]
fn advance_failed_increments_and_does_not_break_below_max() {
assert_eq!(
advance_keepalive_counter(3, KeepaliveTickOutcome::Failed, 5),
(4, false),
);
}
#[test]
fn advance_failed_breaks_at_max() {
assert_eq!(
advance_keepalive_counter(4, KeepaliveTickOutcome::Failed, 5),
(5, true),
);
}
#[test]
fn advance_saturates_on_overflow() {
assert_eq!(
advance_keepalive_counter(u32::MAX, KeepaliveTickOutcome::Failed, 5),
(u32::MAX, true),
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn keepalive_cadence_drives_one_probe_per_interval() {
let mut interval = tokio::time::interval(CameraHandle::KEEPALIVE_INTERVAL);
let t0 = tokio::time::Instant::now();
interval.tick().await;
let t1 = tokio::time::Instant::now();
tokio::time::advance(CameraHandle::KEEPALIVE_INTERVAL).await;
interval.tick().await;
let t2 = tokio::time::Instant::now();
assert_eq!(t1.duration_since(t0), Duration::ZERO);
assert_eq!(t2.duration_since(t1), CameraHandle::KEEPALIVE_INTERVAL);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn keepalive_counter_breaks_after_max_consecutive_failures() {
let mut failures: u32 = 0;
let mut broke_on: Option<u32> = None;
for i in 1..=(CameraHandle::KEEPALIVE_MAX_FAILURES + 2) {
let (next, should_break) = advance_keepalive_counter(
failures,
KeepaliveTickOutcome::Failed,
CameraHandle::KEEPALIVE_MAX_FAILURES,
);
failures = next;
if should_break {
broke_on = Some(i);
break;
}
}
assert_eq!(broke_on, Some(CameraHandle::KEEPALIVE_MAX_FAILURES));
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn keepalive_counter_resets_after_single_ok_tick() {
let mut failures: u32 = 0;
for _ in 0..4 {
let (next, should_break) = advance_keepalive_counter(
failures,
KeepaliveTickOutcome::Failed,
CameraHandle::KEEPALIVE_MAX_FAILURES,
);
assert!(!should_break);
failures = next;
}
assert_eq!(failures, 4);
let (next, should_break) = advance_keepalive_counter(
failures,
KeepaliveTickOutcome::Ok,
CameraHandle::KEEPALIVE_MAX_FAILURES,
);
assert_eq!(next, 0);
assert!(!should_break);
}
#[test]
fn is_login_failure_recognises_typed_auth_failed() {
let e: anyhow::Error = bairelay_neolink_core::Error::AuthFailed.into();
assert!(is_login_failure(&e));
}
#[test]
fn is_login_failure_recognises_typed_camera_login_fail() {
let e: anyhow::Error = bairelay_neolink_core::Error::CameraLoginFail.into();
assert!(is_login_failure(&e));
}
#[test]
fn is_login_failure_recognises_typed_auth_failed_through_context() {
use anyhow::Context;
let e: anyhow::Result<()> = Err(bairelay_neolink_core::Error::AuthFailed.into());
let wrapped = e.context("connect_with_retry").unwrap_err();
assert!(is_login_failure(&wrapped));
}
#[test]
fn is_login_failure_recognises_auth_fail() {
let e: anyhow::Error = anyhow::anyhow!("remote reported AuthFailed for cam-alpha");
assert!(is_login_failure(&e));
}
#[test]
fn is_login_failure_recognises_camera_login_fail() {
let e: anyhow::Error = anyhow::anyhow!("CameraLoginFail on port 9000");
assert!(is_login_failure(&e));
}
#[test]
fn is_login_failure_recognises_credential_error() {
let e: anyhow::Error = anyhow::anyhow!("Credential error: invalid password hash");
assert!(is_login_failure(&e));
}
#[test]
fn is_login_failure_rejects_generic_io() {
let e: anyhow::Error = anyhow::anyhow!("connection reset by peer during initial handshake");
assert!(!is_login_failure(&e));
}
#[test]
fn reconnect_backoff_starts_at_initial_and_doubles() {
let mut b = ReconnectBackoff::new(Duration::from_millis(100), Duration::from_millis(400));
assert_eq!(b.next_delay(), Duration::from_millis(100));
assert_eq!(b.next_delay(), Duration::from_millis(200));
assert_eq!(b.next_delay(), Duration::from_millis(400));
assert_eq!(b.next_delay(), Duration::from_millis(400));
}
#[tokio::test(start_paused = true)]
async fn drive_reconnect_retries_until_success_with_doubling_backoff() {
use std::sync::atomic::{AtomicU32, Ordering};
let calls = Arc::new(AtomicU32::new(0));
let calls_c = Arc::clone(&calls);
let backoff = ReconnectBackoff::new(Duration::from_secs(2), Duration::from_secs(60));
let cancel = CancellationToken::new();
let start = tokio::time::Instant::now();
let outcome = drive_reconnect_with_backoff::<_, _, _>(
backoff,
cancel,
move || {
let calls_c = Arc::clone(&calls_c);
async move {
let n = calls_c.fetch_add(1, Ordering::AcqRel);
if n < 3 {
Err(anyhow::anyhow!("transient failure #{n}"))
} else {
Ok::<u32, anyhow::Error>(n)
}
}
},
|_| false,
)
.await;
match outcome {
ReconnectOutcome::Connected(n) => assert_eq!(n, 3),
other => panic!("expected Connected(3), got {:?}", other),
}
assert_eq!(
calls.load(Ordering::Acquire),
4,
"connect should be called four times (three Err, one Ok)"
);
let elapsed = start.elapsed();
assert!(
elapsed >= Duration::from_secs(14) && elapsed < Duration::from_secs(15),
"elapsed {:?} should be ~14s (2+4+8 backoff)",
elapsed
);
}
#[tokio::test(start_paused = true)]
async fn drive_reconnect_bails_on_should_bail() {
use std::sync::atomic::{AtomicU32, Ordering};
let calls = Arc::new(AtomicU32::new(0));
let calls_c = Arc::clone(&calls);
let backoff = ReconnectBackoff::new(Duration::from_secs(1), Duration::from_secs(60));
let cancel = CancellationToken::new();
let outcome = drive_reconnect_with_backoff::<(), _, _>(
backoff,
cancel,
move || {
calls_c.fetch_add(1, Ordering::AcqRel);
async move { Err(anyhow::anyhow!("AuthFailed")) }
},
|e| format!("{e:?}").contains("AuthFailed"),
)
.await;
assert!(
matches!(outcome, ReconnectOutcome::Bailed(_)),
"expected Bailed, got {:?}",
outcome
);
assert_eq!(
calls.load(Ordering::Acquire),
1,
"bail-on-auth must not retry"
);
}
#[tokio::test(start_paused = true)]
async fn drive_reconnect_cancels_during_backoff_sleep() {
let backoff = ReconnectBackoff::new(Duration::from_secs(60), Duration::from_secs(60));
let cancel = CancellationToken::new();
let cancel_c = cancel.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
cancel_c.cancel();
});
let outcome = drive_reconnect_with_backoff::<(), _, _>(
backoff,
cancel,
|| async { Err(anyhow::anyhow!("transient")) },
|_| false,
)
.await;
assert!(
matches!(outcome, ReconnectOutcome::Cancelled),
"expected Cancelled, got {:?}",
outcome
);
}
#[test]
fn reconnect_backoff_resets_to_initial() {
let mut b = ReconnectBackoff::new(Duration::from_millis(50), Duration::from_secs(1));
let _ = b.next_delay();
let _ = b.next_delay();
b.reset();
assert_eq!(b.next_delay(), Duration::from_millis(50));
}
#[tokio::test]
async fn aggregator_upgrades_back_to_live_when_gap_clears() {
use crate::config::test_helpers::minimal_camera_config;
use crate::preview_state::PreviewState;
use crate::stream_source::{GapState, StreamSource};
let config = minimal_camera_config("cam");
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(config, cancel, None));
handle.set_preview_state(PreviewState::Live);
let mut rx = handle.preview_state_rx();
assert_eq!(*rx.borrow_and_update(), PreviewState::Live);
let source = StreamSource::start_inert_for_test();
handle.insert_stream_source_for_test(RtspStreamKind::Main, Arc::clone(&source));
source.set_gap_state_for_test(GapState::Bridging);
let session_cancel = CancellationToken::new();
let loop_handle = {
let h = Arc::clone(&handle);
let c = session_cancel.clone();
tokio::spawn(async move { aggregate_preview_state_loop(h, c).await })
};
let saw_downgrade = tokio::time::timeout(Duration::from_secs(2), async {
loop {
if *rx.borrow_and_update() == PreviewState::Connecting {
return true;
}
if rx.changed().await.is_err() {
return false;
}
}
})
.await
.unwrap_or(false);
assert!(
saw_downgrade,
"aggregator never downgraded Live → Connecting while Bridging"
);
source.set_gap_state_for_test(GapState::Live);
let saw_upgrade = tokio::time::timeout(Duration::from_secs(2), async {
loop {
if *rx.borrow_and_update() == PreviewState::Live {
return true;
}
if rx.changed().await.is_err() {
return false;
}
}
})
.await
.unwrap_or(false);
session_cancel.cancel();
let _ = tokio::time::timeout(Duration::from_secs(5), loop_handle).await;
assert!(
saw_upgrade,
"aggregator never upgraded Connecting → Live after gap cleared"
);
}
#[tokio::test]
async fn run_parks_in_sleeping_when_idle_disconnect_and_no_wake_lock() {
use crate::config::test_helpers::minimal_camera_config;
use crate::preview_state::PreviewState;
use tokio_util::sync::CancellationToken;
let mut config = minimal_camera_config("cam-park");
config.idle_disconnect = true;
config.address = Some("127.0.0.1:1".to_string());
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(config, cancel.clone(), None));
let mut rx = handle.preview_state_rx();
let run_handle = {
let h = Arc::clone(&handle);
tokio::spawn(async move { h.run().await })
};
let saw_sleeping = tokio::time::timeout(Duration::from_secs(2), async {
loop {
if *rx.borrow_and_update() == PreviewState::Sleeping {
return true;
}
if rx.changed().await.is_err() {
return false;
}
}
})
.await
.unwrap_or(false);
cancel.cancel();
let _ = tokio::time::timeout(Duration::from_secs(5), run_handle).await;
assert!(saw_sleeping, "idle_disconnect loop must flip to Sleeping");
}
#[tokio::test]
async fn run_publishes_initial_disconnected_status_when_mqtt_present() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let mut config = minimal_camera_config("cam-init");
config.idle_disconnect = true;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(config, cancel.clone(), Some(mqtt)));
let run_handle = {
let h = Arc::clone(&handle);
tokio::spawn(async move { h.run().await })
};
let saw_init = tokio::time::timeout(Duration::from_secs(2), async {
loop {
let rows = mock.published();
let has_conn = rows
.iter()
.any(|(t, p, _)| t == "bairelay/cam-init/status" && p == b"disconnected");
if has_conn {
return true;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap_or(false);
cancel.cancel();
let _ = tokio::time::timeout(Duration::from_secs(5), run_handle).await;
assert!(
saw_init,
"initial disconnected publish must land on connection topic; observed: {:?}",
mock.published_topics()
);
}
#[tokio::test]
async fn run_backoffs_after_connect_failure_then_exits_on_cancel() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let mut config = minimal_camera_config("cam-fail");
config.idle_disconnect = false;
config.address = Some("127.0.0.1:1".to_string());
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(config, cancel.clone(), None));
let run_handle = {
let h = Arc::clone(&handle);
tokio::spawn(async move { h.run().await })
};
tokio::time::sleep(Duration::from_millis(500)).await;
cancel.cancel();
let join_outcome = tokio::time::timeout(Duration::from_secs(10), run_handle).await;
let join_result = join_outcome.expect("run() did not exit within 10 s after cancel");
join_result.expect("run() task panicked");
assert_eq!(handle.state(), CameraState::Disconnected);
}
#[test]
fn simple_accessors_reflect_initial_state() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let config = minimal_camera_config("cam-acc");
let cancel = CancellationToken::new();
let handle = CameraHandle::new(config, cancel, None);
assert_eq!(handle.name(), "cam-acc");
assert_eq!(handle.topic_prefix(), "bairelay");
assert_eq!(handle.state(), CameraState::Disconnected);
assert!(!handle.is_cancelled());
assert!(handle.bc_camera().is_none());
assert_eq!(handle.config().name, "cam-acc");
assert!(!handle.cancel_token().is_cancelled());
}
#[test]
fn request_disconnect_fires_notify_without_panicking() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let cancel = CancellationToken::new();
let handle = CameraHandle::new(minimal_camera_config("cam-rd"), cancel, None);
handle.request_disconnect();
}
#[test]
fn with_bcmedia_dump_and_prefix_preserves_prefix() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let cancel = CancellationToken::new();
let handle = CameraHandle::with_bcmedia_dump_and_prefix(
minimal_camera_config("cam-x"),
cancel,
None,
"neolink".to_string(),
None,
);
assert_eq!(handle.topic_prefix(), "neolink");
}
#[test]
fn last_frame_main_returns_clonable_arc() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let cancel = CancellationToken::new();
let handle = CameraHandle::new(minimal_camera_config("cam-lfm"), cancel, None);
let a = handle.last_frame_main();
let b = handle.last_frame_main();
assert!(Arc::ptr_eq(&a, &b));
}
#[test]
fn camera_state_variant_predicates_are_exclusive() {
assert!(CameraState::Disconnected.is_disconnected());
assert!(!CameraState::Disconnected.is_connecting());
assert!(!CameraState::Disconnected.is_connected());
assert!(CameraState::Connecting.is_connecting());
assert!(!CameraState::Connecting.is_disconnected());
assert!(CameraState::Connected.is_connected());
assert!(!CameraState::Connected.is_connecting());
}
#[tokio::test]
async fn discovery_no_publisher_noop_ok() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let cancel = CancellationToken::new();
let handle = CameraHandle::new(minimal_camera_config("cam-np"), cancel, None);
handle.publish_discovery().await.expect("ok noop");
handle.unpublish_discovery().await.expect("ok noop");
}
#[tokio::test]
async fn discovery_caps_none_noop_ok() {
use crate::config::test_helpers::minimal_camera_config;
use std::collections::HashSet;
use tokio_util::sync::CancellationToken;
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let publisher = bairelay_mqtt::DiscoveryPublisher::new(
mqtt,
"bairelay".to_string(),
"homeassistant".to_string(),
HashSet::new(),
"test".to_string(),
);
let cancel = CancellationToken::new();
let handle = CameraHandle::new(minimal_camera_config("cam-nc"), cancel, None)
.with_discovery_publisher(publisher);
handle
.publish_discovery()
.await
.expect("ok noop on None caps");
handle
.unpublish_discovery()
.await
.expect("ok noop on None caps");
}
#[tokio::test]
async fn discovery_publishes_and_unpublishes_with_caps() {
use crate::config::test_helpers::minimal_camera_config;
use std::collections::HashSet;
use tokio_util::sync::CancellationToken;
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let mut features = HashSet::new();
features.insert(bairelay_mqtt::discovery::Feature::Camera);
features.insert(bairelay_mqtt::discovery::Feature::Motion);
let publisher = bairelay_mqtt::DiscoveryPublisher::new(
mqtt,
"bairelay".to_string(),
"homeassistant".to_string(),
features,
"test".to_string(),
);
let cancel = CancellationToken::new();
let handle = CameraHandle::new(minimal_camera_config("cam-dp"), cancel, None)
.with_discovery_publisher(publisher);
handle.set_capabilities_for_test(CameraCapabilities { has_ptz: true });
let before = mock.published().len();
handle.publish_discovery().await.expect("publish ok");
let after_pub = mock.published().len();
assert!(
after_pub > before,
"publish_discovery must emit retained payloads; before={before} after={after_pub}"
);
handle.unpublish_discovery().await.expect("unpublish ok");
let after_unpub = mock.published().len();
assert!(
after_unpub > after_pub,
"unpublish_discovery must emit retained-empty payloads; after_pub={after_pub} after_unpub={after_unpub}"
);
}
#[tokio::test]
async fn stream_source_returns_unavailable_when_cancelled() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let parent = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-sc"),
parent.clone(),
None,
));
parent.cancel();
let err = handle
.stream_source(RtspStreamKind::Main)
.await
.err()
.expect("cancelled → Unavailable");
assert!(matches!(err, StreamError::Unavailable(_)));
}
#[tokio::test]
async fn stream_source_returns_unavailable_without_concrete_bc() {
use crate::config::test_helpers::minimal_camera_config;
use bairelay_neolink_core::bc_protocol::FakeCameraBuilder;
use tokio_util::sync::CancellationToken;
let fake = FakeCameraBuilder::new().build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-nc2"),
cancel,
None,
));
handle.set_driver_for_test(driver);
let err = handle
.stream_source(RtspStreamKind::Main)
.await
.err()
.expect("no concrete → Unavailable");
assert!(
matches!(err, StreamError::Unavailable(ref msg) if msg.contains("not currently connected")),
"got {err:?}"
);
}
#[tokio::test]
async fn stop_all_stream_sources_clears_registry() {
use crate::config::test_helpers::minimal_camera_config;
use crate::stream_source::StreamSource;
use bairelay_rtsp::url::StreamKind;
use tokio_util::sync::CancellationToken;
let cancel = CancellationToken::new();
let handle = CameraHandle::new(minimal_camera_config("cam-stop"), cancel, None);
handle
.insert_stream_source_for_test(StreamKind::Main, StreamSource::start_inert_for_test());
handle.insert_stream_source_for_test(StreamKind::Sub, StreamSource::start_inert_for_test());
assert!(handle.has_stream_source_for_test(StreamKind::Main));
assert!(handle.has_stream_source_for_test(StreamKind::Sub));
handle.stop_all_stream_sources().await;
assert!(!handle.has_stream_source_for_test(StreamKind::Main));
assert!(!handle.has_stream_source_for_test(StreamKind::Sub));
}
#[tokio::test]
async fn stream_source_fast_path_returns_preregistered_source() {
use crate::config::test_helpers::minimal_camera_config;
use crate::stream_source::StreamSource;
use bairelay_neolink_core::bc_protocol::FakeCameraBuilder;
use tokio_util::sync::CancellationToken;
let fake = FakeCameraBuilder::new().build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-fast"),
cancel,
None,
));
handle.set_driver_for_test(driver);
let source = StreamSource::start_inert_for_test();
handle.insert_stream_source_for_test(RtspStreamKind::Main, Arc::clone(&source));
let got = handle
.stream_source(RtspStreamKind::Main)
.await
.expect("preregistered source returned");
assert!(Arc::ptr_eq(&got, &source));
}
#[test]
fn is_login_failure_rejects_timeout_style_error() {
let e: anyhow::Error =
anyhow::anyhow!("deadline elapsed while waiting for handshake reply");
assert!(!is_login_failure(&e));
}
#[test]
fn aggregate_connecting_with_own_downgrade_but_still_bridging_holds() {
assert_eq!(
aggregate_preview_state(
PreviewState::Connecting,
true,
Some(PreviewState::Connecting)
),
PreviewState::Connecting,
);
}
#[test]
fn aggregate_live_with_own_downgrade_bookkeeping_irrelevant() {
assert_eq!(
aggregate_preview_state(PreviewState::Live, false, Some(PreviewState::Live)),
PreviewState::Live,
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn run_connected_session_covers_full_lifecycle() {
use crate::config::test_helpers::minimal_camera_config;
use bairelay_neolink_core::bc::xml::{FloodlightStatusList, RfAlarmCfg, Support};
use bairelay_neolink_core::bc_protocol::FakeCameraBuilder;
let mut config = minimal_camera_config("cam-sess");
config.mqtt.enable_motion = true;
config.mqtt.enable_battery = true;
config.mqtt.enable_floodlight = true;
config.mqtt.enable_pir = true;
config.mqtt.battery_update = 60_000;
config.mqtt.floodlight_update = 60_000;
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
type MotionItem = std::result::Result<
bairelay_neolink_core::bc_protocol::MotionStatus,
bairelay_neolink_core::bc_protocol::Error,
>;
let (_motion_tx, motion_rx) = tokio::sync::mpsc::channel::<MotionItem>(1);
let motion_data = bairelay_neolink_core::bc_protocol::MotionData::test_new(motion_rx);
let (fl_tx, fl_rx) = tokio::sync::mpsc::channel::<FloodlightStatusList>(1);
drop(fl_tx);
let fake = FakeCameraBuilder::new()
.with_motion_stream(motion_data)
.with_floodlight_stream(fl_rx)
.with_battery_info(|| {
Ok(bairelay_neolink_core::bc::xml::BatteryInfo {
battery_percent: 42,
..Default::default()
})
})
.with_is_floodlight_tasks_enabled(|| Ok(false))
.with_pirstate(|| {
Ok(RfAlarmCfg {
enable: 1,
..Default::default()
})
})
.with_support(|| {
Ok(Support {
ptz_mode: Some("pt".to_string()),
..Default::default()
})
})
.with_ptz_preset(|| Ok(bairelay_neolink_core::bc::xml::PtzPreset::default()))
.with_linktype(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"scripted link fail",
))
})
.build();
let driver: Arc<dyn CameraDriver> = fake.clone();
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::with_bcmedia_dump_and_prefix(
config,
cancel,
Some(mqtt),
"bairelay".to_string(),
None,
));
tokio::time::timeout(
Duration::from_secs(30),
handle.run_connected_session_for_test(driver),
)
.await
.expect("session must exit after MAX_FAILURES consecutive keepalive errors");
assert_eq!(handle.state(), CameraState::Disconnected);
assert!(handle.bc_camera().is_none());
let caps = handle.capabilities().expect("caps populated from fake");
assert!(caps.has_ptz);
let pubs = mock.published();
let conn_on = pubs
.iter()
.any(|(t, p, _)| t == "bairelay/cam-sess/status" && p == b"connected");
let conn_off = pubs
.iter()
.any(|(t, p, _)| t == "bairelay/cam-sess/status" && p == b"disconnected");
assert!(
conn_on,
"publish_connection(true) must land; topics: {:?}",
mock.published_topics()
);
assert!(conn_off, "publish_connection(false) must land at teardown");
let pir_published = pubs
.iter()
.any(|(t, _, _)| t == "bairelay/cam-sess/status/pir");
assert!(
pir_published,
"enable_pir = true must trigger a status/pir publish"
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn run_connected_session_tolerates_get_support_error() {
use crate::config::test_helpers::minimal_camera_config;
use bairelay_neolink_core::bc_protocol::FakeCameraBuilder;
let config = minimal_camera_config("cam-no-caps");
let fake = FakeCameraBuilder::new()
.with_support(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"support probe refused",
))
})
.with_linktype(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"link fail",
))
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(config, cancel, None));
tokio::time::timeout(
Duration::from_secs(30),
handle.run_connected_session_for_test(driver),
)
.await
.expect("session must exit");
assert!(
handle.capabilities().is_none(),
"get_support Err must leave caps cache empty",
);
assert_eq!(handle.state(), CameraState::Disconnected);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn run_connected_session_idle_disconnect_grace_path() {
use crate::config::test_helpers::minimal_camera_config;
use bairelay_neolink_core::bc::xml::Support;
use bairelay_neolink_core::bc_protocol::FakeCameraBuilder;
let mut config = minimal_camera_config("cam-idle");
config.idle_disconnect = true;
config.idle_disconnect_timeout_secs = Some(1.0);
let fake = FakeCameraBuilder::new()
.with_support(|| Ok(Support::default()))
.with_linktype(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"link fail",
))
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(config, cancel, None));
tokio::time::timeout(
Duration::from_secs(30),
handle.run_connected_session_for_test(driver),
)
.await
.expect("grace period path must exit");
assert_eq!(handle.state(), CameraState::Disconnected);
}
#[tokio::test]
async fn any_stream_source_bridging_reflects_registry() {
use crate::config::test_helpers::minimal_camera_config;
use crate::stream_source::StreamSource;
let config = minimal_camera_config("cam");
let cancel = CancellationToken::new();
let handle = CameraHandle::new(config, cancel, None);
assert!(!handle.any_stream_source_bridging());
let source = StreamSource::start_inert_for_test();
handle.insert_stream_source_for_test(RtspStreamKind::Main, Arc::clone(&source));
assert!(!handle.any_stream_source_bridging());
source.set_gap_state_for_test(crate::stream_source::GapState::Bridging);
assert!(handle.any_stream_source_bridging());
source.set_gap_state_for_test(crate::stream_source::GapState::Live);
assert!(!handle.any_stream_source_bridging());
}
}
#[cfg(test)]
mod prune_grace_tests {
use super::*;
use crate::config::test_helpers::minimal_camera_config;
use crate::stream_source::StreamSource;
use bairelay_rtsp::url::StreamKind;
use std::time::{Duration, Instant};
const GRACE: Duration = Duration::from_secs(60);
fn new_handle() -> Arc<CameraHandle> {
let config = minimal_camera_config("prune-test-cam");
let cancel = CancellationToken::new();
Arc::new(CameraHandle::new(config, cancel, None))
}
fn insert_inert_source(handle: &CameraHandle, kind: StreamKind) -> Arc<StreamSource> {
let source = StreamSource::start_inert_for_test();
handle.insert_stream_source_for_test(kind, Arc::clone(&source));
source
}
#[tokio::test]
async fn prune_with_zero_grace_drops_idle_source_on_first_sweep() {
let handle = new_handle();
let source = insert_inert_source(&handle, StreamKind::Main);
let rx = source.subscribe_for_test();
drop(rx);
handle.prune_idle_stream_sources_at(Instant::now(), Duration::ZERO);
assert!(!handle.has_stream_source_for_test(StreamKind::Main));
}
#[tokio::test]
async fn prune_retains_source_within_grace_window() {
let handle = new_handle();
let source = insert_inert_source(&handle, StreamKind::Main);
let rx = source.subscribe_for_test();
drop(rx);
let t0 = Instant::now();
handle.prune_idle_stream_sources_at(t0, GRACE);
assert!(handle.has_stream_source_for_test(StreamKind::Main));
handle.prune_idle_stream_sources_at(t0 + Duration::from_secs(30), GRACE);
assert!(handle.has_stream_source_for_test(StreamKind::Main));
}
#[tokio::test]
async fn prune_drops_source_after_grace_expires() {
let handle = new_handle();
let source = insert_inert_source(&handle, StreamKind::Main);
let rx = source.subscribe_for_test();
drop(rx);
let t0 = Instant::now();
handle.prune_idle_stream_sources_at(t0, GRACE);
handle.prune_idle_stream_sources_at(t0 + Duration::from_secs(61), GRACE);
assert!(!handle.has_stream_source_for_test(StreamKind::Main));
}
#[tokio::test]
async fn prune_resets_idle_marker_on_new_subscriber() {
let handle = new_handle();
let source = insert_inert_source(&handle, StreamKind::Main);
let rx = source.subscribe_for_test();
drop(rx);
let t0 = Instant::now();
handle.prune_idle_stream_sources_at(t0, GRACE);
let rx2 = handle.subscribe_stream_for_test(StreamKind::Main);
handle.prune_idle_stream_sources_at(t0 + Duration::from_secs(30), GRACE);
assert!(handle.has_stream_source_for_test(StreamKind::Main));
drop(rx2);
let t1 = t0 + Duration::from_secs(30);
handle.prune_idle_stream_sources_at(t1, GRACE);
handle.prune_idle_stream_sources_at(t1 + Duration::from_secs(59), GRACE);
assert!(handle.has_stream_source_for_test(StreamKind::Main));
handle.prune_idle_stream_sources_at(t1 + Duration::from_secs(61), GRACE);
assert!(!handle.has_stream_source_for_test(StreamKind::Main));
}
}