use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use bairelay_neolink_core::bc_protocol::{CameraDriver, MotionStatus};
use bairelay_mqtt::{SharedMqttClient, StatusPublisher};
use bairelay_rtsp::buffer::LastFrameBuffer;
use crate::camera::ReconnectBackoff;
use crate::preview_overlay::OverlayCache;
use crate::preview_state::PreviewState;
use crate::status_cache::StatusCache;
use crate::wake_lock::{WakeLockCounter, WakeLockGuard};
#[allow(clippy::too_many_arguments)]
pub async fn motion_listener(
camera_name: String,
bc_camera: Arc<dyn CameraDriver>,
mqtt: SharedMqttClient,
topic_prefix: String,
wake_lock: WakeLockCounter,
cancel: CancellationToken,
motion_wake_hold: Duration,
status_cache: Arc<StatusCache>,
) {
let publisher = StatusPublisher::new(&mqtt, &topic_prefix, &camera_name);
let mut backoff = ReconnectBackoff::new(Duration::from_secs(1), Duration::from_secs(60));
loop {
let mut motion_data = match bc_camera.listen_on_motion().await {
Ok(md) => md,
Err(e) => {
tracing::warn!(camera = %camera_name, error = %e, "Failed to start motion listener");
if !backoff.sleep_with_cancel(&cancel).await {
break;
}
continue;
}
};
backoff.reset();
let mut wake_guard: Option<WakeLockGuard> = None;
let mut release_at: Option<tokio::time::Instant> = None;
loop {
let release_deadline = release_at
.unwrap_or_else(|| tokio::time::Instant::now() + Duration::from_secs(86_400));
tokio::select! {
_ = cancel.cancelled() => return,
_ = tokio::time::sleep_until(release_deadline), if release_at.is_some() => {
wake_guard = None;
release_at = None;
}
result = motion_data.next_motion() => {
match result {
Ok(MotionStatus::Start(_)) => {
tracing::info!(camera = %camera_name, "Motion detected");
release_at = None;
if wake_guard.is_none() {
wake_guard = Some(wake_lock.acquire());
}
let _ = publisher.publish_motion(true).await;
status_cache.set_motion(true);
}
Ok(MotionStatus::Stop(_)) => {
tracing::info!(camera = %camera_name, "Motion stopped");
let _ = publisher.publish_motion(false).await;
status_cache.set_motion(false);
if wake_guard.is_some() {
release_at = Some(
tokio::time::Instant::now() + motion_wake_hold,
);
}
}
Ok(MotionStatus::NoChange(_)) => {}
Err(e) => {
tracing::warn!(camera = %camera_name, error = %e, "Motion listener error");
break;
}
}
}
}
}
if !backoff.sleep_with_cancel(&cancel).await {
break;
}
}
}
#[allow(clippy::too_many_arguments)]
pub async fn battery_poller(
camera_name: String,
bc_camera: Arc<dyn CameraDriver>,
mqtt: SharedMqttClient,
topic_prefix: String,
interval_ms: u64,
cancel: CancellationToken,
status_cache: Arc<StatusCache>,
) {
let publisher = StatusPublisher::new(&mqtt, &topic_prefix, &camera_name);
let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = ticker.tick() => {
match tokio::time::timeout(Duration::from_secs(10), bc_camera.battery_info()).await {
Ok(Ok(info)) => {
let level = info.battery_percent.min(100) as u8;
tracing::debug!(camera = %camera_name, battery = level, "Battery level");
let _ = publisher.publish_battery_level(level).await;
status_cache.set_battery_level(level);
}
Ok(Err(e)) => {
tracing::debug!(camera = %camera_name, error = %e, "Battery poll failed");
}
Err(_) => {
tracing::debug!(camera = %camera_name, "Battery poll timed out");
}
}
}
}
}
}
#[allow(clippy::too_many_arguments)]
pub async fn preview_poller(
camera_name: String,
camera: Arc<crate::camera::CameraHandle>,
last_frame: Arc<LastFrameBuffer>,
mqtt: SharedMqttClient,
topic_prefix: String,
interval_ms: u64,
mut preview_state_rx: watch::Receiver<PreviewState>,
preview_overlay_enabled: bool,
cancel: CancellationToken,
) {
const SNAPSHOT_TIMEOUT: Duration = Duration::from_secs(10);
let publisher = StatusPublisher::new(&mqtt, &topic_prefix, &camera_name);
let overlay_cache = OverlayCache::new();
let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = ticker.tick() => {
if camera.state().is_connected() {
if let Some(bc) = camera.bc_camera() {
match tokio::time::timeout(SNAPSHOT_TIMEOUT, bc.get_snapshot()).await {
Ok(Ok(bytes)) => {
last_frame.set_jpeg(bytes::Bytes::from(bytes));
}
Ok(Err(e)) => {
tracing::debug!(camera = %camera_name, error = %e, "preview snapshot failed");
}
Err(_) => {
tracing::debug!(camera = %camera_name, "preview snapshot timed out");
}
}
}
}
let Some(jpeg_bytes) = last_frame.jpeg() else {
continue;
};
let payload = crate::preview_overlay::rendered_preview(
jpeg_bytes,
preview_overlay_enabled,
*preview_state_rx.borrow_and_update(),
Some(&overlay_cache),
);
if let Err(e) = publisher.publish_preview(&payload).await {
tracing::warn!(
camera = %camera_name,
bytes = payload.len(),
error = %e,
"preview publish failed"
);
}
}
}
}
}
#[allow(clippy::too_many_arguments)]
pub async fn floodlight_poller(
camera_name: String,
bc_camera: Arc<dyn CameraDriver>,
mqtt: SharedMqttClient,
topic_prefix: String,
interval_ms: u64,
cancel: CancellationToken,
status_cache: Arc<StatusCache>,
) {
let publisher = StatusPublisher::new(&mqtt, &topic_prefix, &camera_name);
let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = ticker.tick() => {
match tokio::time::timeout(Duration::from_secs(10), bc_camera.is_floodlight_tasks_enabled()).await {
Ok(Ok(enabled)) => {
tracing::debug!(camera = %camera_name, enabled, "Floodlight tasks");
let _ = publisher.publish_floodlight_tasks_enabled(enabled).await;
status_cache.set_floodlight_tasks(enabled);
}
Ok(Err(e)) => {
tracing::debug!(camera = %camera_name, error = %e, "Floodlight poll failed");
}
Err(_) => {
tracing::debug!(camera = %camera_name, "Floodlight poll timed out");
}
}
}
}
}
}
pub async fn floodlight_listener(
camera_name: String,
bc_camera: Arc<dyn CameraDriver>,
mqtt: SharedMqttClient,
topic_prefix: String,
cancel: CancellationToken,
status_cache: Arc<StatusCache>,
) {
let publisher = StatusPublisher::new(&mqtt, &topic_prefix, &camera_name);
let mut rx = match bc_camera.listen_on_floodlight().await {
Ok(rx) => rx,
Err(e) => {
tracing::debug!(camera = %camera_name, error = %e, "Floodlight listener not supported");
return;
}
};
loop {
tokio::select! {
_ = cancel.cancelled() => break,
result = rx.recv() => {
match result {
Some(status_list) => {
for flight in status_list.floodlight_status_list.iter() {
let on = flight.status != 0;
tracing::debug!(camera = %camera_name, on, "Floodlight state changed");
let _ = publisher.publish_floodlight(on).await;
status_cache.set_floodlight(on);
}
}
None => break, }
}
}
}
}
pub async fn publish_pir_state(
camera_name: String,
bc_camera: Arc<dyn CameraDriver>,
mqtt: SharedMqttClient,
topic_prefix: String,
status_cache: Arc<StatusCache>,
) {
let publisher = StatusPublisher::new(&mqtt, &topic_prefix, &camera_name);
match tokio::time::timeout(Duration::from_secs(10), bc_camera.get_pirstate()).await {
Ok(Ok(pir_state)) => {
let enabled = pir_state.enable == 1;
tracing::debug!(camera = %camera_name, enabled, "PIR state");
let _ = publisher.publish_pir(enabled).await;
status_cache.set_pir(enabled);
}
Ok(Err(e)) => {
tracing::debug!(camera = %camera_name, error = %e, "PIR state query failed");
}
Err(_) => {
tracing::debug!(camera = %camera_name, "PIR state query timed out");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bairelay_mqtt::test_support::MockHandle;
use bairelay_neolink_core::bc_protocol::{CameraDriver, FakeCameraBuilder};
use bytes::Bytes;
fn empty_cache() -> Arc<StatusCache> {
Arc::new(StatusCache::default())
}
async fn await_publish_matching<F>(mock: &MockHandle, budget: Duration, pred: F) -> bool
where
F: Fn(&(String, Vec<u8>, bool)) -> bool,
{
tokio::time::timeout(budget, async {
loop {
if mock.published().iter().any(&pred) {
return true;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap_or(false)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn motion_listener_publishes_motion_on_start_event() {
type MotionItem = std::result::Result<
bairelay_neolink_core::bc_protocol::MotionStatus,
bairelay_neolink_core::bc_protocol::Error,
>;
let (motion_tx, motion_rx) = tokio::sync::mpsc::channel::<MotionItem>(8);
let motion_data = bairelay_neolink_core::bc_protocol::MotionData::test_new(motion_rx);
let fake = FakeCameraBuilder::new()
.with_motion_stream(motion_data)
.build();
let driver: Arc<dyn CameraDriver> = fake.clone();
let cancel = CancellationToken::new();
let wl = crate::wake_lock::WakeLockCounter::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(motion_listener(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
wl,
cancel.clone(),
Duration::from_secs(30),
empty_cache(),
));
motion_tx
.send(Ok(bairelay_neolink_core::bc_protocol::MotionStatus::Start(
std::time::Instant::now(),
)))
.await
.unwrap();
let saw_motion_on = await_publish_matching(&mock, Duration::from_secs(2), |(t, p, r)| {
t == "bairelay/cam1/status/motion" && p == b"on" && *r
})
.await;
cancel.cancel();
let _ = task.await;
assert!(
saw_motion_on,
"motion_listener should publish status/motion=on after MotionStatus::Start; observed: {:?}",
mock.published_topics()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn battery_poller_publishes_level_on_success() {
use bairelay_neolink_core::bc::xml::BatteryInfo;
let fake = FakeCameraBuilder::new()
.with_battery_info(|| {
Ok(BatteryInfo {
battery_percent: 77,
..Default::default()
})
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(battery_poller(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
20,
cancel.clone(),
empty_cache(),
));
let saw_level = await_publish_matching(&mock, Duration::from_secs(2), |(t, p, r)| {
t == "bairelay/cam1/status/battery_level" && p == b"77" && *r
})
.await;
cancel.cancel();
let _ = task.await;
assert!(
saw_level,
"battery_poller should publish 77 on status/battery_level; observed: {:?}",
mock.published()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn battery_poller_tolerates_transient_error_and_recovers() {
use bairelay_neolink_core::bc::xml::BatteryInfo;
use std::sync::atomic::{AtomicU32, Ordering};
let call = Arc::new(AtomicU32::new(0));
let call_c = Arc::clone(&call);
let fake = FakeCameraBuilder::new()
.with_battery_info(move || {
let n = call_c.fetch_add(1, Ordering::AcqRel);
if n == 0 {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"transient test failure",
))
} else {
Ok(BatteryInfo {
battery_percent: 33,
..Default::default()
})
}
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(battery_poller(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
20,
cancel.clone(),
empty_cache(),
));
let saw_level = await_publish_matching(&mock, Duration::from_secs(3), |(t, p, _)| {
t == "bairelay/cam1/status/battery_level" && p == b"33"
})
.await;
cancel.cancel();
let _ = task.await;
assert!(
saw_level,
"battery_poller should recover after a transient error; calls={} publishes={:?}",
call.load(Ordering::Acquire),
mock.published_topics()
);
assert!(
call.load(Ordering::Acquire) >= 2,
"closure should have been invoked at least twice (one Err, one Ok)"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn floodlight_listener_publishes_on_event() {
use bairelay_neolink_core::bc::xml::{FloodlightStatus, FloodlightStatusList};
let (tx, rx) = tokio::sync::mpsc::channel::<FloodlightStatusList>(4);
let fake = FakeCameraBuilder::new().with_floodlight_stream(rx).build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(floodlight_listener(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
cancel.clone(),
empty_cache(),
));
tx.send(FloodlightStatusList {
version: "1.1".to_string(),
floodlight_status_list: vec![FloodlightStatus {
channel_id: 0,
status: 1,
}],
})
.await
.unwrap();
let saw_on = await_publish_matching(&mock, Duration::from_secs(2), |(t, p, r)| {
t == "bairelay/cam1/status/floodlight" && p.as_slice() == br#"{"state":"on"}"# && *r
})
.await;
cancel.cancel();
let _ = task.await;
assert!(
saw_on,
"floodlight_listener should publish status/floodlight on event; observed: {:?}",
mock.published()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn floodlight_poller_publishes_tasks_enabled_state() {
let fake = FakeCameraBuilder::new()
.with_is_floodlight_tasks_enabled(|| Ok(true))
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(floodlight_poller(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
20,
cancel.clone(),
empty_cache(),
));
let saw = await_publish_matching(&mock, Duration::from_secs(2), |(t, p, r)| {
t == "bairelay/cam1/status/floodlight_tasks" && p == b"on" && *r
})
.await;
cancel.cancel();
let _ = task.await;
assert!(
saw,
"floodlight_poller should publish status/floodlight_tasks=on; observed: {:?}",
mock.published()
);
}
#[tokio::test]
async fn publish_pir_state_publishes_enabled() {
use bairelay_neolink_core::bc::xml::RfAlarmCfg;
let fake = FakeCameraBuilder::new()
.with_pirstate(|| {
Ok(RfAlarmCfg {
enable: 1,
..Default::default()
})
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
publish_pir_state(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
empty_cache(),
)
.await;
let pub_rows = mock.published();
assert!(
pub_rows
.iter()
.any(|(t, p, r)| t == "bairelay/cam1/status/pir" && p == b"on" && *r),
"publish_pir_state should publish status/pir=on; observed: {:?}",
pub_rows
);
}
fn fake_jpeg() -> Bytes {
let img = image::RgbImage::from_pixel(32, 32, image::Rgb([64, 64, 64]));
let mut out = Vec::new();
image::DynamicImage::ImageRgb8(img)
.write_to(
&mut std::io::Cursor::new(&mut out),
image::ImageFormat::Jpeg,
)
.expect("encode fake jpeg");
Bytes::from(out)
}
#[test]
fn overlay_applies_when_enabled_and_not_live() {
let jpeg = fake_jpeg();
let rendered = crate::preview_overlay::render(&jpeg, PreviewState::Connecting);
assert_ne!(rendered, jpeg, "non-Live state must change the bytes");
}
#[test]
fn overlay_skipped_when_disabled() {
let jpeg = fake_jpeg();
let preview_overlay_enabled = false;
let rendered = if preview_overlay_enabled {
crate::preview_overlay::render(&jpeg, PreviewState::Connecting)
} else {
jpeg.clone()
};
assert_eq!(rendered, jpeg, "disabled flag must bypass the renderer");
}
#[test]
fn overlay_passthrough_on_live_state() {
let jpeg = fake_jpeg();
let rendered = crate::preview_overlay::render(&jpeg, PreviewState::Live);
assert_eq!(rendered, jpeg, "Live state must be passthrough");
}
#[tokio::test]
async fn preview_poller_publishes_cached_jpeg_while_disconnected() {
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
let last_frame = Arc::new(LastFrameBuffer::new());
last_frame.set_jpeg(fake_jpeg());
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-preview-test"),
cancel.clone(),
None,
));
let (mqtt, mqtt_handle) = bairelay_mqtt::test_support::mock_client();
let rx = handle.preview_state_rx();
let cancel_task = cancel.clone();
let task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
cancel_task.cancel();
});
preview_poller(
"cam-preview-test".to_string(),
Arc::clone(&handle),
Arc::clone(&last_frame),
mqtt,
"bairelay".to_string(),
10, rx,
true,
cancel,
)
.await;
let _ = task.await;
let topics = mqtt_handle.published_topics();
assert!(
topics
.iter()
.any(|t| t == "bairelay/cam-preview-test/status/preview"),
"preview poller did not publish to status/preview; topics seen: {:?}",
topics
);
}
#[tokio::test]
async fn preview_poller_honours_overlay_disabled_flag() {
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
let last_frame = Arc::new(LastFrameBuffer::new());
last_frame.set_jpeg(fake_jpeg());
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-preview-noovl"),
cancel.clone(),
None,
));
let mqtt = SharedMqttClient::for_test_stub("preview-poller-noovl");
let rx = handle.preview_state_rx();
let cancel_task = cancel.clone();
let task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
cancel_task.cancel();
});
preview_poller(
"cam-preview-noovl".to_string(),
Arc::clone(&handle),
Arc::clone(&last_frame),
mqtt,
"bairelay".to_string(),
10,
rx,
false, cancel,
)
.await;
let _ = task.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn motion_listener_publishes_motion_off_on_stop_event() {
type MotionItem = std::result::Result<
bairelay_neolink_core::bc_protocol::MotionStatus,
bairelay_neolink_core::bc_protocol::Error,
>;
let (motion_tx, motion_rx) = tokio::sync::mpsc::channel::<MotionItem>(8);
let motion_data = bairelay_neolink_core::bc_protocol::MotionData::test_new(motion_rx);
let fake = FakeCameraBuilder::new()
.with_motion_stream(motion_data)
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let wl = crate::wake_lock::WakeLockCounter::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(motion_listener(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
wl,
cancel.clone(),
Duration::from_secs(30),
empty_cache(),
));
motion_tx
.send(Ok(bairelay_neolink_core::bc_protocol::MotionStatus::Start(
std::time::Instant::now(),
)))
.await
.unwrap();
motion_tx
.send(Ok(bairelay_neolink_core::bc_protocol::MotionStatus::Stop(
std::time::Instant::now(),
)))
.await
.unwrap();
let saw_off = await_publish_matching(&mock, Duration::from_secs(2), |(t, p, _)| {
t == "bairelay/cam1/status/motion" && p == b"off"
})
.await;
cancel.cancel();
let _ = task.await;
assert!(saw_off, "Stop event must publish status/motion=off");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn motion_listener_holds_wake_lock_through_hold_down() {
type MotionItem = std::result::Result<
bairelay_neolink_core::bc_protocol::MotionStatus,
bairelay_neolink_core::bc_protocol::Error,
>;
let (motion_tx, motion_rx) = tokio::sync::mpsc::channel::<MotionItem>(8);
let motion_data = bairelay_neolink_core::bc_protocol::MotionData::test_new(motion_rx);
let fake = FakeCameraBuilder::new()
.with_motion_stream(motion_data)
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let wl = crate::wake_lock::WakeLockCounter::new();
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let hold = Duration::from_millis(500);
let task = tokio::spawn(motion_listener(
"cam-hold".to_string(),
driver,
mqtt,
"bairelay".to_string(),
wl.clone(),
cancel.clone(),
hold,
empty_cache(),
));
motion_tx
.send(Ok(bairelay_neolink_core::bc_protocol::MotionStatus::Start(
std::time::Instant::now(),
)))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(wl.count(), 1, "Start must acquire the wake lock");
motion_tx
.send(Ok(bairelay_neolink_core::bc_protocol::MotionStatus::Stop(
std::time::Instant::now(),
)))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(
wl.count(),
1,
"wake lock must remain held during the post-stop hold-down",
);
tokio::time::sleep(Duration::from_millis(700)).await;
assert_eq!(
wl.count(),
0,
"wake lock must be released after the hold-down expires",
);
cancel.cancel();
let _ = task.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn motion_listener_start_during_hold_down_cancels_release() {
type MotionItem = std::result::Result<
bairelay_neolink_core::bc_protocol::MotionStatus,
bairelay_neolink_core::bc_protocol::Error,
>;
let (motion_tx, motion_rx) = tokio::sync::mpsc::channel::<MotionItem>(8);
let motion_data = bairelay_neolink_core::bc_protocol::MotionData::test_new(motion_rx);
let fake = FakeCameraBuilder::new()
.with_motion_stream(motion_data)
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let wl = crate::wake_lock::WakeLockCounter::new();
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let hold = Duration::from_millis(500);
let task = tokio::spawn(motion_listener(
"cam-flap".to_string(),
driver,
mqtt,
"bairelay".to_string(),
wl.clone(),
cancel.clone(),
hold,
empty_cache(),
));
motion_tx
.send(Ok(bairelay_neolink_core::bc_protocol::MotionStatus::Start(
std::time::Instant::now(),
)))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
motion_tx
.send(Ok(bairelay_neolink_core::bc_protocol::MotionStatus::Stop(
std::time::Instant::now(),
)))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
motion_tx
.send(Ok(bairelay_neolink_core::bc_protocol::MotionStatus::Start(
std::time::Instant::now(),
)))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(700)).await;
assert_eq!(
wl.count(),
1,
"a Start during the hold-down window must cancel the pending release",
);
cancel.cancel();
let _ = task.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn motion_listener_ignores_nochange_and_continues() {
type MotionItem = std::result::Result<
bairelay_neolink_core::bc_protocol::MotionStatus,
bairelay_neolink_core::bc_protocol::Error,
>;
let (motion_tx, motion_rx) = tokio::sync::mpsc::channel::<MotionItem>(8);
let motion_data = bairelay_neolink_core::bc_protocol::MotionData::test_new(motion_rx);
let fake = FakeCameraBuilder::new()
.with_motion_stream(motion_data)
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let wl = crate::wake_lock::WakeLockCounter::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(motion_listener(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
wl,
cancel.clone(),
Duration::from_secs(30),
empty_cache(),
));
motion_tx
.send(Ok(
bairelay_neolink_core::bc_protocol::MotionStatus::NoChange(
std::time::Instant::now(),
),
))
.await
.unwrap();
motion_tx
.send(Ok(bairelay_neolink_core::bc_protocol::MotionStatus::Start(
std::time::Instant::now(),
)))
.await
.unwrap();
let saw_on = await_publish_matching(&mock, Duration::from_secs(2), |(t, p, _)| {
t == "bairelay/cam1/status/motion" && p == b"on"
})
.await;
cancel.cancel();
let _ = task.await;
assert!(saw_on, "NoChange must not block subsequent Start publish");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn motion_listener_breaks_inner_loop_on_next_motion_error() {
type MotionItem = std::result::Result<
bairelay_neolink_core::bc_protocol::MotionStatus,
bairelay_neolink_core::bc_protocol::Error,
>;
let (motion_tx, motion_rx) = tokio::sync::mpsc::channel::<MotionItem>(8);
let motion_data = bairelay_neolink_core::bc_protocol::MotionData::test_new(motion_rx);
let fake = FakeCameraBuilder::new()
.with_motion_stream(motion_data)
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let wl = crate::wake_lock::WakeLockCounter::new();
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(motion_listener(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
wl,
cancel.clone(),
Duration::from_secs(30),
empty_cache(),
));
motion_tx
.send(Err(bairelay_neolink_core::bc_protocol::Error::Other(
"scripted next_motion failure",
)))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
cancel.cancel();
let _ = tokio::time::timeout(Duration::from_millis(500), task).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn floodlight_listener_exits_cleanly_on_subscribe_error() {
let (tx, rx) =
tokio::sync::mpsc::channel::<bairelay_neolink_core::bc::xml::FloodlightStatusList>(1);
drop(tx); let fake = FakeCameraBuilder::new().with_floodlight_stream(rx).build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
floodlight_listener(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
cancel,
empty_cache(),
)
.await;
assert!(
mock.published().is_empty(),
"closed channel should publish nothing; got {:?}",
mock.published_topics()
);
}
#[tokio::test]
async fn publish_pir_state_handles_driver_error() {
let fake = FakeCameraBuilder::new()
.with_pirstate(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"pir refused",
))
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
publish_pir_state(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
empty_cache(),
)
.await;
assert!(
!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam1/status/pir"),
"error path must not publish status/pir"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn floodlight_poller_tolerates_err_without_publishing() {
let fake = FakeCameraBuilder::new()
.with_is_floodlight_tasks_enabled(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"tasks refused",
))
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(floodlight_poller(
"cam1".to_string(),
driver,
mqtt,
"bairelay".to_string(),
20,
cancel.clone(),
empty_cache(),
));
tokio::time::sleep(Duration::from_millis(80)).await;
cancel.cancel();
let _ = task.await;
assert!(
!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam1/status/floodlight_tasks"),
"error branch must not publish floodlight_tasks"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn preview_poller_refreshes_jpeg_while_connected() {
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
let fresh_jpeg = fake_jpeg();
let fresh_clone = fresh_jpeg.clone();
let fake = bairelay_neolink_core::bc_protocol::FakeCameraBuilder::new()
.with_snapshot(move || Ok(fresh_clone.to_vec()))
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-conn"),
cancel.clone(),
None,
));
handle.set_driver_for_test(driver);
let last_frame = Arc::new(LastFrameBuffer::new());
last_frame.set_jpeg(Bytes::from_static(&[0u8; 4]));
let rx = handle.preview_state_rx();
let mqtt = SharedMqttClient::for_test_stub("preview-poller-connected");
let cancel_task = cancel.clone();
let canceller = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(80)).await;
cancel_task.cancel();
});
preview_poller(
"cam-conn".to_string(),
Arc::clone(&handle),
Arc::clone(&last_frame),
mqtt,
"bairelay".to_string(),
10,
rx,
false,
cancel,
)
.await;
let _ = canceller.await;
let got = last_frame.jpeg().expect("buffer populated");
assert_eq!(
got.as_ref(),
fresh_jpeg.as_ref(),
"connected path must refresh the cached JPEG from the driver"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn preview_poller_tolerates_snapshot_error_while_connected() {
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
let fake = bairelay_neolink_core::bc_protocol::FakeCameraBuilder::new()
.with_snapshot(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"snap denied",
))
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-snap-err"),
cancel.clone(),
None,
));
handle.set_driver_for_test(driver);
let last_frame = Arc::new(LastFrameBuffer::new());
last_frame.set_jpeg(fake_jpeg());
let rx = handle.preview_state_rx();
let mqtt = SharedMqttClient::for_test_stub("preview-snap-err");
let cancel_task = cancel.clone();
let canceller = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(80)).await;
cancel_task.cancel();
});
preview_poller(
"cam-snap-err".to_string(),
Arc::clone(&handle),
Arc::clone(&last_frame),
mqtt,
"bairelay".to_string(),
10,
rx,
false,
cancel,
)
.await;
let _ = canceller.await;
}
#[tokio::test]
async fn preview_poller_noops_when_buffer_is_empty() {
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
let last_frame = Arc::new(LastFrameBuffer::new());
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-preview-empty"),
cancel.clone(),
None,
));
let mqtt = SharedMqttClient::for_test_stub("preview-poller-empty");
let rx = handle.preview_state_rx();
let cancel_task = cancel.clone();
let task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(80)).await;
cancel_task.cancel();
});
preview_poller(
"cam-preview-empty".to_string(),
Arc::clone(&handle),
Arc::clone(&last_frame),
mqtt,
"bairelay".to_string(),
10,
rx,
true,
cancel,
)
.await;
let _ = task.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn motion_listener_retries_after_subscribe_error() {
let fake = FakeCameraBuilder::new()
.with_motion_stream_error(|| {
bairelay_neolink_core::bc_protocol::Error::Other("scripted subscribe failure")
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let wl = crate::wake_lock::WakeLockCounter::new();
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(motion_listener(
"cam-retry".to_string(),
driver,
mqtt,
"bairelay".to_string(),
wl,
cancel.clone(),
Duration::from_secs(30),
empty_cache(),
));
tokio::time::sleep(Duration::from_millis(50)).await;
cancel.cancel();
let _ = tokio::time::timeout(Duration::from_millis(500), task).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn floodlight_listener_returns_on_subscribe_error() {
let fake = FakeCameraBuilder::new()
.with_floodlight_stream_error(|| {
bairelay_neolink_core::bc_protocol::Error::Other("floodlight unsupported")
})
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
tokio::time::timeout(
Duration::from_millis(500),
floodlight_listener(
"cam-err".to_string(),
driver,
mqtt,
"bairelay".to_string(),
cancel,
empty_cache(),
),
)
.await
.expect("listener must return on subscribe error");
assert!(
mock.published().is_empty(),
"subscribe error must not publish; got {:?}",
mock.published_topics()
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn battery_poller_timeout_branch_logs_and_continues() {
let fake = FakeCameraBuilder::new().with_battery_info_pending().build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(battery_poller(
"cam-bt".to_string(),
driver,
mqtt,
"bairelay".to_string(),
20,
cancel.clone(),
empty_cache(),
));
tokio::time::advance(Duration::from_secs(11)).await;
tokio::task::yield_now().await;
cancel.cancel();
let _ = task.await;
assert!(!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-bt/status/battery_level"));
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn floodlight_poller_timeout_branch_logs_and_continues() {
let fake = FakeCameraBuilder::new()
.with_is_floodlight_tasks_enabled_pending()
.build();
let driver: Arc<dyn CameraDriver> = fake;
let cancel = CancellationToken::new();
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(floodlight_poller(
"cam-ft".to_string(),
driver,
mqtt,
"bairelay".to_string(),
20,
cancel.clone(),
empty_cache(),
));
tokio::time::advance(Duration::from_secs(11)).await;
tokio::task::yield_now().await;
cancel.cancel();
let _ = task.await;
assert!(!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-ft/status/floodlight_tasks"));
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn publish_pir_state_timeout_branch() {
let fake = FakeCameraBuilder::new().with_pirstate_pending().build();
let driver: Arc<dyn CameraDriver> = fake;
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let task = tokio::spawn(publish_pir_state(
"cam-pst".to_string(),
driver,
mqtt,
"bairelay".to_string(),
empty_cache(),
));
tokio::time::advance(Duration::from_secs(11)).await;
let _ = task.await;
assert!(!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-pst/status/pir"));
}
}