use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
use bairelay_mqtt::control::{ControlCommand, IrMode, PtzDirection};
use bairelay_mqtt::{topics, SharedMqttClient, StatusPublisher};
use bairelay_neolink_core::bc_protocol::{Direction, LightState};
use crate::camera::CameraHandle;
pub async fn dispatch_control(
cmd: ControlCommand,
cameras: &HashMap<String, Arc<CameraHandle>>,
mqtt: &SharedMqttClient,
topic_prefix: &str,
) {
let camera_name = cmd.camera_name().to_owned();
let Some(cam) = cameras.get(&camera_name) else {
tracing::warn!(camera = %camera_name, "Control for unknown camera");
return;
};
if let ControlCommand::Wakeup { minutes, .. } = &cmd {
tracing::info!(camera = %camera_name, minutes, "Wakeup requested");
let wl = cam.wake_lock().clone();
let cam_clone = Arc::clone(cam);
let cancel = cam_clone.cancel_token().clone();
let mins = *minutes;
tokio::spawn(async move {
let _guard = wl.acquire();
let connect_timeout = tokio::time::timeout(Duration::from_secs(120), async {
loop {
if cam_clone.state().is_connected() {
return;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
});
tokio::select! {
_ = cancel.cancelled() => return,
result = connect_timeout => {
if result.is_err() {
tracing::warn!(camera = %cam_clone.name(), "Wakeup timed out waiting for camera to connect");
return;
}
}
}
tokio::select! {
_ = cancel.cancelled() => {},
_ = tokio::time::sleep(Duration::from_secs(mins as u64 * 60)) => {},
}
});
return;
}
let _guard = cam.wake_lock().acquire();
let reply_topic = cmd.control_topic(topic_prefix);
tracing::debug!(camera = %camera_name, command = %reply_topic, "Dispatching control command");
let bc = match cam.bc_camera() {
Some(bc) => bc,
None => {
tracing::info!(
camera = %camera_name,
"Camera not connected; waking and waiting before dispatch"
);
match wait_for_bc_camera(cam, Duration::from_secs(15)).await {
Some(bc) => bc,
None => {
tracing::warn!(
camera = %camera_name,
"Timed out waiting for camera to connect; dropping command"
);
return;
}
}
}
};
const CMD_TIMEOUT: Duration = Duration::from_secs(30);
let result = match cmd {
ControlCommand::Floodlight { state, .. } => {
timeout_to_result(timeout(CMD_TIMEOUT, bc.set_floodlight_manual(state, 30)).await)
}
ControlCommand::FloodlightTasks { state, .. } => {
timeout_to_result(timeout(CMD_TIMEOUT, bc.floodlight_tasks_enable(state)).await)
}
ControlCommand::Led { state, .. } => {
timeout_to_result(timeout(CMD_TIMEOUT, bc.led_light_set(state)).await)
}
ControlCommand::Ir { mode, .. } => {
let ls = match mode {
IrMode::On => LightState::On,
IrMode::Off => LightState::Off,
IrMode::Auto => LightState::Auto,
};
timeout_to_result(timeout(CMD_TIMEOUT, bc.irled_light_set(ls)).await)
}
ControlCommand::Pir {
camera: ref cam_name,
state,
} => {
let result = timeout_to_result(timeout(CMD_TIMEOUT, bc.pir_set(state)).await);
if result.is_ok() {
let publisher = StatusPublisher::new(mqtt, topic_prefix, cam_name);
let _ = publisher.publish_pir(state).await;
cam.status_cache().set_pir(state);
}
result
}
ControlCommand::Reboot { .. } => timeout_to_result(timeout(CMD_TIMEOUT, bc.reboot()).await),
ControlCommand::Ptz {
direction, amount, ..
} => {
let dir = match direction {
PtzDirection::Up => Direction::Up,
PtzDirection::Down => Direction::Down,
PtzDirection::Left => Direction::Left,
PtzDirection::Right => Direction::Right,
};
let speed = 32.0_f32;
let seconds = (amount / speed).clamp(0.0, 10.0);
match timeout(CMD_TIMEOUT, bc.send_ptz(dir, speed)).await {
Ok(Err(e)) => {
tracing::warn!(camera = %camera_name, error = %e, "PTZ move failed");
}
Err(_) => {
tracing::warn!(camera = %camera_name, "PTZ move timed out");
}
Ok(Ok(())) => {
tokio::time::sleep(Duration::from_secs_f32(seconds)).await;
}
}
let _ = timeout(CMD_TIMEOUT, bc.send_ptz(Direction::Stop, speed)).await;
Ok(())
}
ControlCommand::PtzPreset {
ref camera,
preset_id,
} => {
let result =
timeout_to_result(timeout(CMD_TIMEOUT, bc.moveto_ptz_preset(preset_id)).await);
if result.is_ok() {
if let Some(name) = cam.preset_name_for_id(preset_id) {
let topic = topics::status_ptz_preset(topic_prefix, camera);
let _ = mqtt.publish_retained(&topic, name.as_bytes()).await;
}
}
result
}
ControlCommand::PtzPresetByName {
ref camera,
ref name,
} => match cam.preset_id_for_name(name) {
Some(preset_id) => {
let result =
timeout_to_result(timeout(CMD_TIMEOUT, bc.moveto_ptz_preset(preset_id)).await);
if result.is_ok() {
let topic = topics::status_ptz_preset(topic_prefix, camera);
let _ = mqtt.publish_retained(&topic, name.as_bytes()).await;
}
result
}
None => {
tracing::warn!(camera = %camera, name = %name, "PTZ preset name not in cache");
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"PTZ preset name not in cache",
))
}
},
ControlCommand::PtzAssign {
preset_id, name, ..
} => timeout_to_result(timeout(CMD_TIMEOUT, bc.set_ptz_preset(preset_id, name)).await),
ControlCommand::Zoom { level, .. } => {
timeout_to_result(timeout(CMD_TIMEOUT, bc.zoom_to((level * 1000.0) as u32)).await)
}
ControlCommand::Siren { state, .. } => {
if state {
timeout_to_result(timeout(CMD_TIMEOUT, bc.siren()).await)
} else {
Ok(())
}
}
ControlCommand::Wakeup { .. } => {
unreachable!()
}
ControlCommand::QueryBattery { ref camera, .. } => {
match timeout(CMD_TIMEOUT, bc.battery_info()).await {
Ok(Ok(info)) => {
let topic = topics::status_battery(topic_prefix, camera);
if let Ok(xml) = serialize_xml(&info) {
let _ = mqtt.publish(&topic, xml.as_bytes()).await;
}
}
Ok(Err(e)) => tracing::warn!(camera = %camera, error = %e, "Battery query failed"),
Err(_) => tracing::warn!(camera = %camera, "Battery query timed out"),
}
Ok(())
}
ControlCommand::QueryPreview { ref camera } => {
match timeout(CMD_TIMEOUT, bc.get_snapshot()).await {
Ok(Ok(jpeg)) => {
let payload = crate::preview_overlay::rendered_preview(
bytes::Bytes::from(jpeg),
cam.config().pause.preview_overlay,
*cam.preview_state_rx().borrow(),
None,
);
let publisher = StatusPublisher::new(mqtt, topic_prefix, camera);
if let Err(e) = publisher.publish_preview(&payload).await {
tracing::warn!(camera = %camera, error = %e, "QueryPreview publish failed");
}
}
Ok(Err(e)) => {
tracing::warn!(camera = %camera, error = %e, "QueryPreview snapshot failed")
}
Err(_) => tracing::warn!(camera = %camera, "QueryPreview snapshot timed out"),
}
Ok(())
}
ControlCommand::QueryPir { ref camera, .. } => {
match timeout(CMD_TIMEOUT, bc.get_pirstate()).await {
Ok(Ok(pir_state)) => {
let topic = topics::status_pir(topic_prefix, camera);
if let Ok(xml) = serialize_xml(&pir_state) {
let _ = mqtt.publish(&topic, xml.as_bytes()).await;
}
}
Ok(Err(e)) => tracing::warn!(camera = %camera, error = %e, "PIR query failed"),
Err(_) => tracing::warn!(camera = %camera, "PIR query timed out"),
}
Ok(())
}
ControlCommand::QueryPtzPreset { ref camera } => {
match timeout(CMD_TIMEOUT, bc.get_ptz_preset()).await {
Ok(Ok(p)) => {
let presets: Vec<(u8, String)> = p
.preset_list
.preset
.into_iter()
.filter_map(|preset| preset.name.map(|n| (preset.id, n)))
.collect();
cam.replace_preset_cache(presets);
if let Err(e) = cam.publish_discovery().await {
tracing::warn!(
camera = %camera,
error = %e,
"QueryPtzPreset: discovery republish failed"
);
}
}
Ok(Err(e)) => {
tracing::warn!(camera = %camera, error = %e, "QueryPtzPreset: get_ptz_preset failed")
}
Err(_) => {
tracing::warn!(camera = %camera, "QueryPtzPreset: get_ptz_preset timed out")
}
}
Ok(())
}
};
let reply = if result.is_ok() { "OK" } else { "FAIL" };
tracing::debug!(camera = %camera_name, command = %reply_topic, result = reply, "Command completed");
let _ = mqtt.publish(&reply_topic, reply.as_bytes()).await;
if let Err(e) = result {
tracing::warn!(camera = %camera_name, command = %reply_topic, error = %e, "Control command failed");
}
}
/// Polls `cam` until a driver handle is available.
///
/// Checks `bc_camera()` every 250 ms. Returns `None` if `deadline` elapses
/// first or the camera's cancel token fires; otherwise returns the driver.
async fn wait_for_bc_camera(
    cam: &Arc<CameraHandle>,
    deadline: Duration,
) -> Option<Arc<dyn bairelay_neolink_core::bc_protocol::CameraDriver>> {
    const POLL_INTERVAL: Duration = Duration::from_millis(250);

    let shutdown = cam.cancel_token().clone();
    let handle = Arc::clone(cam);

    // Resolves only once a driver exists; the surrounding timeout bounds it.
    let poll_until_ready = async move {
        loop {
            match handle.bc_camera() {
                Some(driver) => break driver,
                None => tokio::time::sleep(POLL_INTERVAL).await,
            }
        }
    };

    tokio::select! {
        _ = shutdown.cancelled() => None,
        outcome = tokio::time::timeout(deadline, poll_until_ready) => outcome.ok(),
    }
}
pub(crate) fn timeout_to_result<T>(
outcome: std::result::Result<
std::result::Result<T, bairelay_neolink_core::bc_protocol::Error>,
tokio::time::error::Elapsed,
>,
) -> std::result::Result<T, bairelay_neolink_core::bc_protocol::Error> {
outcome.unwrap_or_else(|_| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"Command timed out",
))
})
}
/// Serialises `value` to an XML string with `quick_xml`.
///
/// Writes straight into a `String`, which avoids an intermediate byte buffer,
/// a copy of it, and a UTF-8 re-validation that could never fail.
///
/// # Errors
/// Returns `"XML serialization failed: …"` when the serializer reports an error.
fn serialize_xml<T: serde::Serialize>(value: &T) -> Result<String, String> {
    let mut xml = String::new();
    quick_xml::se::to_writer(&mut xml, value)
        .map_err(|e| format!("XML serialization failed: {e}"))?;
    Ok(xml)
}
// Unit tests for the control dispatcher. Most tests install a `FakeCamera`
// driver on a `CameraHandle`, run `dispatch_control`, and then inspect the
// calls recorded by the fake and/or the messages captured by the mock MQTT
// client.
#[cfg(test)]
mod tests {
use super::*;
use crate::camera::CameraHandle;
use crate::config::test_helpers::minimal_camera_config;
use bairelay_neolink_core::bc_protocol::{CameraDriver, FakeCamera, FakeCameraBuilder};
use std::sync::Arc as StdArc;
use tokio_util::sync::CancellationToken;
// --- timeout_to_result -------------------------------------------------
// A completed, successful call passes its value through.
#[test]
fn timeout_to_result_forwards_ok_value() {
let r: std::result::Result<u32, bairelay_neolink_core::bc_protocol::Error> =
timeout_to_result(Ok(Ok(42)));
assert_eq!(r.unwrap(), 42);
}
// A completed, failed call passes its own error through unchanged.
#[test]
fn timeout_to_result_forwards_inner_error() {
let r: std::result::Result<u32, bairelay_neolink_core::bc_protocol::Error> =
timeout_to_result(Ok(Err(
bairelay_neolink_core::bc_protocol::Error::AuthFailed,
)));
assert!(matches!(
r.unwrap_err(),
bairelay_neolink_core::bc_protocol::Error::AuthFailed
));
}
// `Elapsed` has no public constructor, so a real one is produced by timing
// out a future that never completes.
#[tokio::test]
async fn timeout_to_result_elapsed_becomes_other_command_timed_out() {
let elapsed = tokio::time::timeout(
Duration::from_millis(1),
std::future::pending::<
std::result::Result<(), bairelay_neolink_core::bc_protocol::Error>,
>(),
)
.await
.unwrap_err();
let r: std::result::Result<(), bairelay_neolink_core::bc_protocol::Error> =
timeout_to_result(Err(elapsed));
let err = r.unwrap_err();
assert!(format!("{err}").contains("Command timed out"));
}
// --- fixtures ----------------------------------------------------------
// Builds a one-entry camera map whose handle has no test driver installed.
fn test_cameras(name: &str) -> HashMap<String, Arc<CameraHandle>> {
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(minimal_camera_config(name), cancel, None));
let mut map = HashMap::new();
map.insert(name.to_string(), handle);
map
}
// Builds a one-entry camera map with `fake` installed as the driver, and
// hands the fake back so the test can inspect its recorded calls.
fn test_cameras_with_fake(
name: &str,
fake: StdArc<FakeCamera>,
) -> (HashMap<String, Arc<CameraHandle>>, StdArc<FakeCamera>) {
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(minimal_camera_config(name), cancel, None));
let driver: Arc<dyn CameraDriver> = fake.clone();
handle.set_driver_for_test(driver);
let mut map = HashMap::new();
map.insert(name.to_string(), handle);
(map, fake)
}
// --- early-exit paths (smoke tests: pass if nothing panics or hangs) ----
// The command names a camera that is not in the map.
#[tokio::test]
async fn dispatch_unknown_camera_returns_early() {
let cameras = test_cameras("cam-known");
let mqtt = SharedMqttClient::for_test_stub("dispatch-unknown");
let cmd = ControlCommand::Floodlight {
camera: "cam-missing".to_string(),
state: true,
};
dispatch_control(cmd, &cameras, &mqtt, "bairelay").await;
}
// No driver is installed and the cancel token is fired before dispatch, so
// the wait for a driver should end via cancellation instead of running the
// full 15 s (presumably `bc_camera()` is `None` without a driver - confirm).
#[tokio::test]
async fn dispatch_disconnected_camera_waits_then_drops_on_cancel() {
let cameras = test_cameras("cam-disc");
let mqtt = SharedMqttClient::for_test_stub("dispatch-disc");
let cmd = ControlCommand::Led {
camera: "cam-disc".to_string(),
state: false,
};
cameras.get("cam-disc").unwrap().cancel_token().cancel();
dispatch_control(cmd, &cameras, &mqtt, "bairelay").await;
}
// Wakeup returns immediately after spawning its task; the token is then
// cancelled and the task is given 20 ms to wind down.
#[tokio::test]
async fn dispatch_wakeup_spawns_task_without_panicking() {
let cameras = test_cameras("cam-wake");
let mqtt = SharedMqttClient::for_test_stub("dispatch-wake");
let cmd = ControlCommand::Wakeup {
camera: "cam-wake".to_string(),
minutes: 1,
};
dispatch_control(cmd, &cameras, &mqtt, "bairelay").await;
cameras.get("cam-wake").unwrap().cancel_token().cancel();
tokio::time::sleep(Duration::from_millis(20)).await;
}
// --- set-style commands --------------------------------------------------
// Reboot calls the driver once and replies OK on the control topic.
#[tokio::test]
async fn dispatch_reboot_invokes_driver_and_replies_ok() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-reboot", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Reboot {
camera: "cam-reboot".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(
*fake.calls().reboot.lock().unwrap(),
1,
"reboot should be invoked exactly once"
);
assert!(
mock.published()
.iter()
.any(|(t, p, _)| { t == "bairelay/cam-reboot/control/reboot" && p == b"OK" }),
"OK reply should be published on the control topic; observed: {:?}",
mock.published_topics()
);
}
// The dispatcher always passes 30 as the second floodlight argument.
#[tokio::test]
async fn dispatch_floodlight_on_invokes_driver_with_30s_duration() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-fl", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Floodlight {
camera: "cam-fl".to_string(),
state: true,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(
*fake.calls().set_floodlight_manual.lock().unwrap(),
vec![(true, 30)],
"Floodlight on should call set_floodlight_manual(true, 30)"
);
}
// Same as above for the "off" state.
#[tokio::test]
async fn dispatch_floodlight_off_invokes_driver_with_30s_duration() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-fl-off", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Floodlight {
camera: "cam-fl-off".to_string(),
state: false,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(
*fake.calls().set_floodlight_manual.lock().unwrap(),
vec![(false, 30)]
);
}
// A successful PIR set must both re-publish the retained status and reply OK.
#[tokio::test]
async fn dispatch_pir_set_invokes_driver_and_republishes_state() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-pir", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Pir {
camera: "cam-pir".to_string(),
state: true,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(*fake.calls().pir_set.lock().unwrap(), vec![true]);
let rows = mock.published();
assert!(
rows.iter()
.any(|(t, p, r)| t == "bairelay/cam-pir/status/pir" && p == b"on" && *r),
"Pir set should re-publish retained status/pir=on; observed: {:?}",
rows
);
assert!(
rows.iter()
.any(|(t, p, _)| t == "bairelay/cam-pir/control/pir" && p == b"OK"),
"Pir set should reply OK on the control topic; observed: {:?}",
rows
);
}
// --- PTZ -----------------------------------------------------------------
// No preset cache is seeded here, so the name lookup misses and the reply
// must be FAIL.
#[tokio::test]
async fn dispatch_ptz_preset_by_name_cache_miss_replies_fail() {
let fake = FakeCameraBuilder::new().build();
let (cameras, _fake) = test_cameras_with_fake("cam-pn", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::PtzPresetByName {
camera: "cam-pn".to_string(),
name: "Doorstep".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
let publishes = mock.published();
let reply = publishes
.iter()
.find(|(t, _, _)| t == "bairelay/cam-pn/control/ptz/preset")
.expect("dispatcher must publish on the reply topic");
assert_eq!(reply.1, b"FAIL", "cache-miss must reply FAIL");
}
// PtzAssign forwards the preset id and name to the driver unchanged.
#[tokio::test]
async fn dispatch_ptz_assign_invokes_driver_with_id_and_name() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-pt", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::PtzAssign {
camera: "cam-pt".to_string(),
preset_id: 1,
name: "Home".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(
*fake.calls().set_ptz_preset.lock().unwrap(),
vec![(1u8, "Home".to_string())]
);
}
// amount = 0.0 gives a zero-length sleep, so no paused clock is needed.
#[tokio::test]
async fn dispatch_ptz_directional_move_sends_direction_then_stop() {
use bairelay_neolink_core::bc_protocol::Direction;
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-pd", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Ptz {
camera: "cam-pd".to_string(),
direction: PtzDirection::Up,
amount: 0.0,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
let calls = fake.calls().send_ptz.lock().unwrap().clone();
assert_eq!(
calls,
vec![(Direction::Up, 32.0_f32), (Direction::Stop, 32.0_f32)],
"directional Ptz should issue the direction then an unconditional Stop"
);
}
// NOTE(review): despite the name, no error is visibly injected here - the
// fake comes from the default builder, exactly as in the test above. Unless
// the default `send_ptz` fails, this only re-checks the success path.
// Confirm against FakeCameraBuilder and inject a failure if it supports one.
#[tokio::test]
async fn dispatch_ptz_directional_still_stops_on_error() {
use bairelay_neolink_core::bc_protocol::Direction;
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-pe", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Ptz {
camera: "cam-pe".to_string(),
direction: PtzDirection::Left,
amount: 0.0,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
let calls = fake.calls().send_ptz.lock().unwrap().clone();
assert_eq!(calls.len(), 2, "must record two send_ptz calls");
assert_eq!(calls[1].0, Direction::Stop, "second call must be Stop");
}
// PtzPreset forwards the preset id to `moveto_ptz_preset`.
#[tokio::test]
async fn dispatch_ptz_preset_invokes_driver_moveto() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-pm", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::PtzPreset {
camera: "cam-pm".to_string(),
preset_id: 3,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(*fake.calls().moveto_ptz_preset.lock().unwrap(), vec![3u8]);
}
// --- lights, siren, zoom -------------------------------------------------
// Led forwards the state and replies OK.
#[tokio::test]
async fn dispatch_led_on_invokes_driver() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-led", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Led {
camera: "cam-led".to_string(),
state: true,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(*fake.calls().led_light_set.lock().unwrap(), vec![true]);
assert!(mock
.published()
.iter()
.any(|(t, p, _)| t == "bairelay/cam-led/control/led" && p == b"OK"));
}
// Table-driven: each IrMode must map to the LightState of the same name.
// A fresh fake is built per case so the call log holds exactly one entry.
#[tokio::test]
async fn dispatch_ir_maps_each_mode_variant() {
use bairelay_neolink_core::bc_protocol::LightState;
let cases = [
(IrMode::On, LightState::On),
(IrMode::Off, LightState::Off),
(IrMode::Auto, LightState::Auto),
];
for (mode, expected) in cases {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-ir", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Ir {
camera: "cam-ir".to_string(),
mode,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
let calls = fake.calls().irled_light_set.lock().unwrap().clone();
assert_eq!(calls.len(), 1, "one irled_light_set call per dispatch");
assert_eq!(calls[0], expected);
}
}
// state = true calls `siren()`; state = false must not touch the driver,
// so the counter stays at 1 after the second dispatch.
#[tokio::test]
async fn dispatch_siren_on_invokes_driver_and_off_is_noop() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-siren", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Siren {
camera: "cam-siren".to_string(),
state: true,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(*fake.calls().siren.lock().unwrap(), 1);
dispatch_control(
ControlCommand::Siren {
camera: "cam-siren".to_string(),
state: false,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(
*fake.calls().siren.lock().unwrap(),
1,
"off-path must not invoke siren()"
);
}
// FloodlightTasks forwards the state to `floodlight_tasks_enable`.
#[tokio::test]
async fn dispatch_floodlight_tasks_invokes_driver() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-ft", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::FloodlightTasks {
camera: "cam-ft".to_string(),
state: true,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(
*fake.calls().floodlight_tasks_enable.lock().unwrap(),
vec![true]
);
}
// The dispatcher multiplies the level by 1000 before calling `zoom_to`
// (0.5 -> 500).
#[tokio::test]
async fn dispatch_zoom_scales_level_to_zoom_to() {
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-zoom", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Zoom {
camera: "cam-zoom".to_string(),
level: 0.5,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(*fake.calls().zoom_to.lock().unwrap(), vec![500u32]);
}
// --- query commands --------------------------------------------------------
// A successful battery query publishes XML containing the reported value.
#[tokio::test]
async fn dispatch_query_battery_publishes_xml() {
use bairelay_neolink_core::bc::xml::BatteryInfo;
let fake = FakeCameraBuilder::new()
.with_battery_info(|| {
Ok(BatteryInfo {
battery_percent: 42,
..Default::default()
})
})
.build();
let (cameras, _fake) = test_cameras_with_fake("cam-qb", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::QueryBattery {
camera: "cam-qb".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert!(mock
.published()
.iter()
.any(|(t, p, _)| t == "bairelay/cam-qb/status/battery"
&& std::str::from_utf8(p).unwrap_or("").contains("42")));
}
// A driver error must not produce a status/battery publish.
#[tokio::test]
async fn dispatch_query_battery_handles_driver_error() {
let fake = FakeCameraBuilder::new()
.with_battery_info(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"battery refused",
))
})
.build();
let (cameras, _fake) = test_cameras_with_fake("cam-qbe", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::QueryBattery {
camera: "cam-qbe".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert!(
!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-qbe/status/battery"),
"error path must not publish status/battery"
);
}
// Feeds a small real JPEG (8x8 black) through the snapshot hook and checks
// only that something is published on status/preview.
#[tokio::test]
async fn dispatch_query_preview_publishes_jpeg() {
let jpeg: Vec<u8> = {
let img = image::RgbImage::from_pixel(8, 8, image::Rgb([0, 0, 0]));
let mut out = Vec::new();
image::DynamicImage::ImageRgb8(img)
.write_to(
&mut std::io::Cursor::new(&mut out),
image::ImageFormat::Jpeg,
)
.expect("encode");
out
};
let jpeg_clone = jpeg.clone();
let fake = FakeCameraBuilder::new()
.with_snapshot(move || Ok(jpeg_clone.clone()))
.build();
let (cameras, _fake) = test_cameras_with_fake("cam-qp", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::QueryPreview {
camera: "cam-qp".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert!(
mock.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-qp/status/preview"),
"QueryPreview should publish status/preview; observed: {:?}",
mock.published_topics()
);
}
// A snapshot error must not produce a status/preview publish.
#[tokio::test]
async fn dispatch_query_preview_handles_driver_error() {
let fake = FakeCameraBuilder::new()
.with_snapshot(|| Err(bairelay_neolink_core::bc_protocol::Error::Other("no snap")))
.build();
let (cameras, _fake) = test_cameras_with_fake("cam-qpe", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::QueryPreview {
camera: "cam-qpe".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert!(!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-qpe/status/preview"));
}
// Same as the publish test but with `pause.preview_overlay` set to false.
// The handle is built by hand because the shared fixture does not expose the
// config. Only the presence of a publish is asserted, not its payload.
#[tokio::test]
async fn dispatch_query_preview_passthrough_when_overlay_disabled() {
let jpeg: Vec<u8> = {
let img = image::RgbImage::from_pixel(8, 8, image::Rgb([0, 0, 0]));
let mut out = Vec::new();
image::DynamicImage::ImageRgb8(img)
.write_to(
&mut std::io::Cursor::new(&mut out),
image::ImageFormat::Jpeg,
)
.expect("encode");
out
};
let jpeg_clone = jpeg.clone();
let fake = FakeCameraBuilder::new()
.with_snapshot(move || Ok(jpeg_clone.clone()))
.build();
let cancel = CancellationToken::new();
let mut cfg = minimal_camera_config("cam-qpov");
cfg.pause.preview_overlay = false;
let handle = Arc::new(CameraHandle::new(cfg, cancel, None));
let driver: Arc<dyn CameraDriver> = fake;
handle.set_driver_for_test(driver);
let mut cameras = HashMap::new();
cameras.insert("cam-qpov".to_string(), handle);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::QueryPreview {
camera: "cam-qpov".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert!(mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-qpov/status/preview"));
}
// A successful PIR query publishes on status/pir (payload not inspected).
#[tokio::test]
async fn dispatch_query_pir_publishes_xml() {
use bairelay_neolink_core::bc::xml::RfAlarmCfg;
let fake = FakeCameraBuilder::new()
.with_pirstate(|| {
Ok(RfAlarmCfg {
enable: 1,
..Default::default()
})
})
.build();
let (cameras, _fake) = test_cameras_with_fake("cam-qpir", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::QueryPir {
camera: "cam-qpir".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert!(mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-qpir/status/pir"));
}
// A PIR query error must not produce a status/pir publish. This reuses the
// camera name "cam-qpe" from the preview error test; each test builds its own
// camera map and mock client.
#[tokio::test]
async fn dispatch_query_pir_handles_driver_error() {
let fake = FakeCameraBuilder::new()
.with_pirstate(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"pir refused",
))
})
.build();
let (cameras, _fake) = test_cameras_with_fake("cam-qpe", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::QueryPir {
camera: "cam-qpe".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert!(!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-qpe/status/pir"));
}
// The cache is pre-seeded with a stale entry to prove the query replaces it
// rather than merging into it.
#[tokio::test]
async fn dispatch_query_ptz_preset_refreshes_cache_and_replies_ok() {
use bairelay_neolink_core::bc::xml::{Preset, PresetList, PtzPreset};
let fake = FakeCameraBuilder::new()
.with_ptz_preset(|| {
Ok(PtzPreset {
preset_list: PresetList {
preset: vec![
Preset {
id: 1,
name: Some("Garden".to_string()),
..Default::default()
},
Preset {
id: 2,
name: Some("Driveway".to_string()),
..Default::default()
},
],
},
..Default::default()
})
})
.build();
let (cameras, _fake) = test_cameras_with_fake("cam-qptz", fake);
cameras
.get("cam-qptz")
.unwrap()
.set_preset_cache_for_test(vec![(99, "stale".to_string())]);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::QueryPtzPreset {
camera: "cam-qptz".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(
cameras.get("cam-qptz").unwrap().preset_cache(),
vec![(1u8, "Garden".to_string()), (2u8, "Driveway".to_string())],
"QueryPtzPreset must replace the cache with fresh camera data"
);
assert!(mock
.published()
.iter()
.any(|(t, p, _)| t == "bairelay/cam-qptz/query/ptz/preset" && p == b"OK"));
}
// On driver error the cache must stay untouched. The reply is still OK
// because query commands log their failures instead of returning them.
#[tokio::test]
async fn dispatch_query_ptz_preset_driver_error_preserves_cache() {
let fake = FakeCameraBuilder::new()
.with_ptz_preset(|| {
Err(bairelay_neolink_core::bc_protocol::Error::Other(
"ptz refused",
))
})
.build();
let (cameras, _fake) = test_cameras_with_fake("cam-qptze", fake);
cameras
.get("cam-qptze")
.unwrap()
.set_preset_cache_for_test(vec![(7, "Front".to_string())]);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::QueryPtzPreset {
camera: "cam-qptze".to_string(),
},
&cameras,
&mqtt,
"bairelay",
)
.await;
assert_eq!(
cameras.get("cam-qptze").unwrap().preset_cache(),
vec![(7u8, "Front".to_string())],
"driver error must not clobber the previously-cached presets"
);
assert!(mock
.published()
.iter()
.any(|(t, p, _)| t == "bairelay/cam-qptze/query/ptz/preset" && p == b"OK"));
}
// --- timeout paths (paused clock) ------------------------------------------
// These run on a paused current-thread runtime. The dispatch future is joined
// with a future that advances the clock past the dispatcher's 30 s command
// timeout, so no real time is spent waiting.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn dispatch_query_ptz_preset_timeout_path() {
let fake = FakeCameraBuilder::new().with_ptz_preset_pending().build();
let (cameras, _fake) = test_cameras_with_fake("cam-qptzt", fake);
cameras
.get("cam-qptzt")
.unwrap()
.set_preset_cache_for_test(vec![(3, "Side".to_string())]);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let dispatch = dispatch_control(
ControlCommand::QueryPtzPreset {
camera: "cam-qptzt".to_string(),
},
&cameras,
&mqtt,
"bairelay",
);
let (_, _) = tokio::join!(dispatch, async {
tokio::time::advance(Duration::from_secs(31)).await;
});
assert_eq!(
cameras.get("cam-qptzt").unwrap().preset_cache(),
vec![(3u8, "Side".to_string())],
"timeout must not clobber the previously-cached presets"
);
}
// amount = 32.0 at speed 32.0 gives a 1 s sleep between move and stop. The
// clock is paused, so the test does not wait that second in real time.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn dispatch_ptz_with_positive_amount_sleeps_before_stop() {
use bairelay_neolink_core::bc_protocol::Direction;
let fake = FakeCameraBuilder::new().build();
let (cameras, fake) = test_cameras_with_fake("cam-pds", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
dispatch_control(
ControlCommand::Ptz {
camera: "cam-pds".to_string(),
direction: PtzDirection::Right,
amount: 32.0,
},
&cameras,
&mqtt,
"bairelay",
)
.await;
let calls = fake.calls().send_ptz.lock().unwrap().clone();
assert_eq!(
calls,
vec![(Direction::Right, 32.0_f32), (Direction::Stop, 32.0_f32)]
);
}
// A battery query that never completes must not publish status/battery.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn dispatch_query_battery_timeout_path() {
let fake = FakeCameraBuilder::new().with_battery_info_pending().build();
let (cameras, _fake) = test_cameras_with_fake("cam-qbt", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let dispatch = dispatch_control(
ControlCommand::QueryBattery {
camera: "cam-qbt".to_string(),
},
&cameras,
&mqtt,
"bairelay",
);
let (_, _) = tokio::join!(dispatch, async {
tokio::time::advance(Duration::from_secs(31)).await;
});
assert!(
!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-qbt/status/battery"),
"timeout path must NOT publish status/battery"
);
}
// A PIR query that never completes must not publish status/pir.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn dispatch_query_pir_timeout_path() {
let fake = FakeCameraBuilder::new().with_pirstate_pending().build();
let (cameras, _fake) = test_cameras_with_fake("cam-qpit", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let dispatch = dispatch_control(
ControlCommand::QueryPir {
camera: "cam-qpit".to_string(),
},
&cameras,
&mqtt,
"bairelay",
);
let (_, _) = tokio::join!(dispatch, async {
tokio::time::advance(Duration::from_secs(31)).await;
});
assert!(!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-qpit/status/pir"));
}
// A snapshot that never completes must not publish status/preview.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn dispatch_query_preview_timeout_path() {
let fake = FakeCameraBuilder::new().with_snapshot_pending().build();
let (cameras, _fake) = test_cameras_with_fake("cam-qpvt", fake);
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let dispatch = dispatch_control(
ControlCommand::QueryPreview {
camera: "cam-qpvt".to_string(),
},
&cameras,
&mqtt,
"bairelay",
);
let (_, _) = tokio::join!(dispatch, async {
tokio::time::advance(Duration::from_secs(31)).await;
});
assert!(!mock
.published()
.iter()
.any(|(t, _, _)| t == "bairelay/cam-qpvt/status/preview"));
}
// Both the directional move and the Stop are separate 30 s-bounded calls, so
// the clock is advanced 70 s to cover two consecutive timeouts.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn dispatch_ptz_directional_timeout_still_stops() {
use bairelay_neolink_core::bc_protocol::Direction;
let fake = FakeCameraBuilder::new().with_send_ptz_pending().build();
let (cameras, fake) = test_cameras_with_fake("cam-ptt", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let dispatch = dispatch_control(
ControlCommand::Ptz {
camera: "cam-ptt".to_string(),
direction: PtzDirection::Down,
amount: 0.0,
},
&cameras,
&mqtt,
"bairelay",
);
let (_, _) = tokio::join!(dispatch, async {
tokio::time::advance(Duration::from_secs(70)).await;
});
let calls = fake.calls().send_ptz.lock().unwrap().clone();
assert_eq!(calls.len(), 2, "both directional and stop calls recorded");
assert_eq!(calls[0].0, Direction::Down);
assert_eq!(calls[1].0, Direction::Stop);
}
// Checks only that the siren call was attempted once. The reply published
// after the timeout is not inspected.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn dispatch_siren_timeout_path() {
let fake = FakeCameraBuilder::new().with_siren_pending().build();
let (cameras, fake) = test_cameras_with_fake("cam-st", fake);
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let dispatch = dispatch_control(
ControlCommand::Siren {
camera: "cam-st".to_string(),
state: true,
},
&cameras,
&mqtt,
"bairelay",
);
let (_, _) = tokio::join!(dispatch, async {
tokio::time::advance(Duration::from_secs(31)).await;
});
assert_eq!(*fake.calls().siren.lock().unwrap(), 1);
}
// --- serialize_xml -----------------------------------------------------------
// Despite the name this only serialises; it checks that the field name and
// value appear in the output, not that the XML parses back.
#[test]
fn serialize_xml_round_trips_a_struct() {
#[derive(serde::Serialize)]
#[serde(rename = "Battery")]
struct Battery {
percent: u8,
}
let s = serialize_xml(&Battery { percent: 42 }).expect("serialize ok");
assert!(s.contains("percent"));
assert!(s.contains("42"));
}
}