use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use bairelay_mqtt::{Event, Packet, SharedMqttClient, StatusPublisher};
use crate::camera::CameraHandle;
use crate::config::Config;
/// Returns the MQTT topic prefix to use for `config`.
///
/// When the `mqtt` section is present, its `topic_prefix` is cloned and
/// returned; when it is absent, the literal `"bairelay"` is returned.
pub fn resolve_topic_prefix(config: &Config) -> String {
    match &config.mqtt {
        Some(mqtt) => mqtt.topic_prefix.clone(),
        None => String::from("bairelay"),
    }
}
/// Converts every entry of `config.users` into an RTSP `UserCred`.
///
/// The output preserves the order of `config.users`; each entry's `name` and
/// `pass` are cloned into the credential's `name` and `password` fields.
/// An empty user list yields an empty vector.
pub fn build_rtsp_users(config: &Config) -> Vec<bairelay_rtsp::rtsp::auth::UserCred> {
    let mut creds = Vec::with_capacity(config.users.len());
    for user in &config.users {
        creds.push(bairelay_rtsp::rtsp::auth::UserCred {
            name: user.name.clone(),
            password: user.pass.clone(),
        });
    }
    creds
}
/// Builds the broker connection settings from the `mqtt` section of `config`.
///
/// Returns `None` when `config.mqtt` is absent. Otherwise `broker_addr`,
/// `port`, `credentials` and `ca` are cloned/copied across unchanged.
///
/// `client_auth` is always set to `None`.
/// NOTE(review): the `mqtt` config section also carries a `client_auth` field
/// that is not forwarded here — confirm this omission is intentional.
pub fn build_broker_config(config: &Config) -> Option<bairelay_mqtt::MqttConfig> {
    config.mqtt.as_ref().map(|server| bairelay_mqtt::MqttConfig {
        broker_addr: server.broker_addr.clone(),
        port: server.port,
        credentials: server.credentials.clone(),
        ca: server.ca.clone(),
        client_auth: None,
    })
}
/// Best-effort shutdown announcement for every camera.
///
/// Runs two sequential passes:
///
/// 1. For each entry of `camera_names`, builds a [`StatusPublisher`] on
///    `mqtt` / `topic_prefix` and publishes a connection status of `false`.
/// 2. For each handle in `cameras`, asks the camera to unpublish its Home
///    Assistant discovery data.
///
/// Failures never abort the fan-out and are never returned to the caller;
/// they are only logged. Empty inputs make the corresponding pass a no-op.
pub async fn publish_shutdown_fanout(
    camera_names: &[String],
    cameras: &HashMap<String, Arc<CameraHandle>>,
    mqtt: &SharedMqttClient,
    topic_prefix: &str,
) {
    for name in camera_names {
        let publisher = StatusPublisher::new(mqtt, topic_prefix, name);
        // Still best-effort, but no longer silent: a failed status publish is
        // recorded at debug level so shutdown problems remain diagnosable
        // without adding noise to default log output.
        if let Err(e) = publisher.publish_connection(false).await {
            tracing::debug!(
                camera = %name,
                error = %e,
                "Failed to publish disconnected status on shutdown"
            );
        }
    }

    for (name, cam) in cameras {
        if let Err(e) = cam.unpublish_discovery().await {
            tracing::warn!(
                camera = %name,
                error = %e,
                "Failed to unpublish HA discovery on shutdown"
            );
        }
    }
}
/// Two-second time budget associated with the shutdown fan-out.
///
/// NOTE(review): nothing visible in this file reads this constant; presumably
/// the caller uses it to bound how long it awaits `publish_shutdown_fanout` —
/// confirm against the call site.
pub const SHUTDOWN_FANOUT_TIMEOUT: Duration = Duration::from_secs(2);
/// What the MQTT event loop should do in response to one polled event.
///
/// Produced by `classify_event`. The `Publish` variant borrows from the event
/// it was derived from, so the value cannot outlive that event.
///
/// The type is cheap to copy (it only holds borrows) and comparable, which
/// lets callers and tests use `==` / `assert_eq!` instead of `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventAction<'a> {
    /// An incoming publish packet; carries the borrowed topic and raw payload.
    Publish {
        /// Topic the message was published on.
        topic: &'a str,
        /// Message payload bytes, unmodified.
        payload: &'a [u8],
    },
    /// The broker acknowledged a connection.
    ConnAck,
    /// Polling the event loop returned an error.
    LogError,
    /// Any other event; nothing to do.
    Ignore,
}
/// Maps one polled MQTT event (or poll error) to the action the caller takes.
///
/// * `Err(_)` becomes [`EventAction::LogError`]; the error value itself is not
///   inspected.
/// * An incoming publish packet becomes [`EventAction::Publish`], borrowing
///   the topic and payload from `event`.
/// * An incoming ConnAck packet becomes [`EventAction::ConnAck`].
/// * Every other `Ok` event becomes [`EventAction::Ignore`].
pub fn classify_event<'a, E>(event: &'a Result<Event, E>) -> EventAction<'a> {
    // First peel off the cases that are decided without looking at a packet.
    let packet = match event {
        Err(_) => return EventAction::LogError,
        Ok(Event::Incoming(packet)) => packet,
        Ok(_) => return EventAction::Ignore,
    };

    // Only two incoming packet kinds are acted upon.
    match packet {
        Packet::Publish(msg) => EventAction::Publish {
            topic: &msg.topic,
            payload: &msg.payload,
        },
        Packet::ConnAck(_) => EventAction::ConnAck,
        _ => EventAction::Ignore,
    }
}
/// Reconnect back-off and log-throttling state for the MQTT event loop.
///
/// Each recorded error yields a retry delay that doubles from one second and
/// is capped at thirty seconds, plus a flag telling the caller whether this
/// error should be logged or suppressed.
#[derive(Debug)]
pub struct MqttBackoff {
    /// Number of errors recorded since construction or the last `reset`.
    consecutive: u32,
    /// When an error was last reported as "should log"; `None` if never.
    last_logged_at: Option<std::time::Instant>,
    /// Minimum time between two "should log" results.
    relog_period: Duration,
}

impl Default for MqttBackoff {
    /// Same as [`MqttBackoff::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl MqttBackoff {
    /// Minimum spacing between two logged errors.
    pub const RELOG_PERIOD: Duration = Duration::from_secs(60);

    /// Creates a fresh back-off: no errors recorded, nothing logged yet.
    pub fn new() -> Self {
        Self {
            consecutive: 0,
            last_logged_at: None,
            relog_period: Self::RELOG_PERIOD,
        }
    }

    /// Records an error observed "now"; see [`MqttBackoff::record_error_at`].
    pub fn record_error(&mut self, _msg: &str) -> (Duration, bool) {
        self.record_error_at(_msg, std::time::Instant::now())
    }

    /// Records an error observed at `now`.
    ///
    /// Returns `(delay, should_log)`:
    ///
    /// * `delay` follows the ladder 1, 2, 4, 8, 16, 30, 30, ... seconds as
    ///   consecutive errors accumulate.
    /// * `should_log` is `true` for the first error after construction or
    ///   `reset`, and afterwards only once at least the relog period has
    ///   elapsed since the last logged error.
    ///
    /// The message text is deliberately ignored: throttling is purely
    /// time-based, so alternating error messages do not defeat suppression.
    pub fn record_error_at(&mut self, _msg: &str, now: std::time::Instant) -> (Duration, bool) {
        // `now` is caller-supplied and may predate the last logged instant.
        // The saturating form is documented never to panic; an out-of-order
        // timestamp is treated as zero elapsed time and stays suppressed.
        let should_log = match self.last_logged_at {
            None => true,
            Some(last) => now.saturating_duration_since(last) >= self.relog_period,
        };
        if should_log {
            self.last_logged_at = Some(now);
        }

        // Clamp the exponent before shifting so the shift can never overflow;
        // 2^5 = 32 is then capped to the 30-second ceiling.
        let exp = self.consecutive.min(5);
        let delay = Duration::from_secs((1u64 << exp).min(30));
        self.consecutive = self.consecutive.saturating_add(1);

        (delay, should_log)
    }

    /// Restores first-error behaviour: the delay ladder restarts at one second
    /// and the next recorded error is logged.
    pub fn reset(&mut self) {
        self.consecutive = 0;
        self.last_logged_at = None;
    }
}
/// Re-establishes per-camera MQTT state after the broker acknowledges a
/// connection.
///
/// Three sequential passes over `cameras`:
///
/// 1. Subscribe each camera's topics via `mqtt.subscribe_all`.
/// 2. Re-publish Home Assistant discovery for every camera whose
///    `capabilities()` is currently `Some`; cameras without capabilities are
///    skipped.
/// 3. Re-publish each camera's cached status.
///
/// Nothing is returned and no failure aborts the remaining work; every error
/// is logged as a warning. Subscription failures are throttled: the first one
/// is logged with its error, and any further failed cameras are reported
/// together in a single summary line.
pub async fn handle_connack(
    cameras: &HashMap<String, Arc<CameraHandle>>,
    mqtt: &SharedMqttClient,
    topic_prefix: &str,
) {
    tracing::info!("MQTT broker connected");

    let mut subscribe_failures: Vec<String> = Vec::new();
    for name in cameras.keys() {
        tracing::debug!(camera = %name, "Subscribing to MQTT topics");
        if let Err(e) = mqtt.subscribe_all(topic_prefix, name).await {
            // Only the first failure is logged in full; the rest are summarised.
            if subscribe_failures.is_empty() {
                tracing::warn!(camera = %name, error = %e, "Failed to subscribe");
            }
            subscribe_failures.push(name.clone());
        }
    }
    if subscribe_failures.len() > 1 {
        // The first entry was already reported above, so the summary covers
        // only the failures after it; otherwise `count` and `cameras` would
        // contradict the "additional ... after the first failure" wording.
        let additional = &subscribe_failures[1..];
        tracing::warn!(
            count = additional.len(),
            cameras = ?additional,
            "Failed to subscribe additional cameras after the first failure (suppressed for log brevity)"
        );
    }

    for (name, cam) in cameras {
        if cam.capabilities().is_none() {
            continue;
        }
        if let Err(e) = cam.publish_discovery().await {
            tracing::warn!(
                camera = %name,
                error = %e,
                "Failed to re-publish HA discovery on ConnAck"
            );
        }
    }

    for (name, cam) in cameras {
        if let Err(e) = cam.republish_cached_status().await {
            tracing::warn!(
                camera = %name,
                error = %e,
                "Failed to re-publish cached MQTT status on ConnAck"
            );
        }
    }
}
// Unit tests for the helpers in this file: event classification, the MQTT
// back-off ladder and log throttling, config-derived builders, and the async
// ConnAck / shutdown fan-out paths (driven through the mock MQTT client).
#[cfg(test)]
mod tests {
use super::*;
use bairelay_mqtt::{ConnAck, ConnectReturnCode, Publish, QoS};
// --- Event fixtures -------------------------------------------------------
// Each fixture uses `&'static str` as the error type; `classify_event` is
// generic over the error and never inspects it.
fn connack_event() -> Result<Event, &'static str> {
Ok(Event::Incoming(Packet::ConnAck(ConnAck {
session_present: false,
code: ConnectReturnCode::Success,
})))
}
fn publish_event() -> Result<Event, &'static str> {
Ok(Event::Incoming(Packet::Publish(Publish::new(
"bairelay/cam/control/reboot",
QoS::AtLeastOnce,
"1",
))))
}
fn pingresp_event() -> Result<Event, &'static str> {
Ok(Event::Incoming(Packet::PingResp))
}
// --- classify_event -------------------------------------------------------
// An incoming publish must surface its topic and payload unchanged.
#[test]
fn classify_publish_returns_publish_action() {
let ev = publish_event();
match classify_event(&ev) {
EventAction::Publish { topic, payload } => {
assert_eq!(topic, "bairelay/cam/control/reboot");
assert_eq!(payload, b"1");
}
_ => panic!("expected Publish action"),
}
}
#[test]
fn classify_connack_returns_connack_action() {
let ev = connack_event();
assert!(matches!(classify_event(&ev), EventAction::ConnAck));
}
#[test]
fn classify_error_returns_log_error() {
let ev: Result<Event, &'static str> = Err("broker disconnected");
assert!(matches!(classify_event(&ev), EventAction::LogError));
}
// Covers both "ignored" shapes: an unhandled incoming packet and an
// outgoing event.
#[test]
fn classify_other_packet_returns_ignore() {
let ev = pingresp_event();
assert!(matches!(classify_event(&ev), EventAction::Ignore));
let ev: Result<Event, &'static str> = Ok(Event::Outgoing(bairelay_mqtt::Outgoing::PingReq));
assert!(matches!(classify_event(&ev), EventAction::Ignore));
}
// --- MqttBackoff ----------------------------------------------------------
// Tests that depend on elapsed time use `record_error_at` with explicit
// instants derived from a single `t0`, so they never sleep.
#[test]
fn mqtt_backoff_first_error_logs_and_starts_at_one_second() {
let mut bo = MqttBackoff::new();
let (delay, log) = bo.record_error("Connection refused");
assert_eq!(delay, Duration::from_secs(1));
assert!(log, "first occurrence must log");
}
#[test]
fn mqtt_backoff_consecutive_same_error_suppresses_log() {
let mut bo = MqttBackoff::new();
let t0 = std::time::Instant::now();
bo.record_error_at("Connection refused", t0);
let (_, log) = bo.record_error_at("Connection refused", t0 + Duration::from_secs(1));
assert!(!log, "identical repeat within relog window must suppress");
}
// Pins the full delay ladder, including the 30-second cap.
#[test]
fn mqtt_backoff_delay_is_exponential_then_capped() {
let mut bo = MqttBackoff::new();
let t0 = std::time::Instant::now();
let mut delays = Vec::new();
for _ in 0..8 {
let (d, _) = bo.record_error_at("err", t0);
delays.push(d.as_secs());
}
assert_eq!(delays, vec![1, 2, 4, 8, 16, 30, 30, 30]);
}
// Throttling is time-based only: changing the message text must not cause
// an extra log line inside the relog window.
#[test]
fn mqtt_backoff_alternating_messages_stay_suppressed() {
let mut bo = MqttBackoff::new();
let t0 = std::time::Instant::now();
bo.record_error_at("Connection refused", t0);
let (_, log_a) = bo.record_error_at("Broken pipe", t0 + Duration::from_secs(1));
let (_, log_b) = bo.record_error_at("Connection refused", t0 + Duration::from_secs(2));
let (_, log_c) = bo.record_error_at("Broken pipe", t0 + Duration::from_secs(3));
assert!(
!log_a && !log_b && !log_c,
"alternating messages within the relog window must stay quiet (got {log_a},{log_b},{log_c})"
);
}
#[test]
fn mqtt_backoff_relog_after_period_even_for_same_message() {
let mut bo = MqttBackoff::new();
let t0 = std::time::Instant::now();
bo.record_error_at("Connection refused", t0);
let after = t0 + MqttBackoff::RELOG_PERIOD + Duration::from_secs(1);
let (_, log) = bo.record_error_at("Connection refused", after);
assert!(log, "stale window must relog even when message identical");
}
// After `reset`, both the delay ladder and the log suppression start over.
#[test]
fn mqtt_backoff_reset_restores_first_error_behaviour() {
let mut bo = MqttBackoff::new();
let t0 = std::time::Instant::now();
for _ in 0..3 {
bo.record_error_at("Connection refused", t0);
}
bo.reset();
let (delay, log) = bo.record_error_at("Connection refused", t0);
assert_eq!(
delay,
Duration::from_secs(1),
"reset must restart the ladder"
);
assert!(log, "reset must clear the suppression cache");
}
// --- handle_connack (empty map) -------------------------------------------
// Smoke test: there are no assertions; it passes if the call completes
// without panicking.
#[tokio::test]
async fn handle_connack_with_empty_cameras_is_noop() {
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let cameras: HashMap<String, Arc<CameraHandle>> = HashMap::new();
handle_connack(&cameras, &mqtt, "bairelay").await;
}
// --- Config-derived builders ----------------------------------------------
use crate::config::{Config, MqttServerConfig, UserConfig};
#[test]
fn resolve_topic_prefix_defaults_when_mqtt_absent() {
let cfg = Config::default();
assert_eq!(resolve_topic_prefix(&cfg), "bairelay");
}
#[test]
fn resolve_topic_prefix_honours_configured_value() {
let cfg = Config {
mqtt: Some(MqttServerConfig {
broker_addr: "localhost".into(),
port: 1883,
credentials: None,
ca: None,
client_auth: None,
topic_prefix: "neolink".into(),
discovery: None,
}),
..Config::default()
};
assert_eq!(resolve_topic_prefix(&cfg), "neolink");
}
// Checks field mapping (`pass` -> `password`) and that input order is kept.
#[test]
fn build_rtsp_users_maps_each_entry() {
let cfg = Config {
users: vec![
UserConfig {
name: "alice".into(),
pass: "apw".into(),
},
UserConfig {
name: "bob".into(),
pass: "bpw".into(),
},
],
..Config::default()
};
let out = build_rtsp_users(&cfg);
assert_eq!(out.len(), 2);
assert_eq!(out[0].name, "alice");
assert_eq!(out[0].password, "apw");
assert_eq!(out[1].name, "bob");
assert_eq!(out[1].password, "bpw");
}
#[test]
fn build_rtsp_users_empty_when_config_has_no_users() {
let cfg = Config::default();
assert!(build_rtsp_users(&cfg).is_empty());
}
#[test]
fn build_broker_config_returns_none_when_mqtt_absent() {
assert!(build_broker_config(&Config::default()).is_none());
}
// NOTE(review): the input sets `client_auth: None`, so the final assertion
// cannot tell "forwarded" apart from "always None" — as written,
// `build_broker_config` hard-codes `None`.
#[test]
fn build_broker_config_maps_each_field() {
let cfg = Config {
mqtt: Some(MqttServerConfig {
broker_addr: "mqtt.example.com".into(),
port: 8883,
credentials: Some(("u".into(), "p".into())),
ca: Some("/etc/ca.pem".into()),
client_auth: None,
topic_prefix: "bairelay".into(),
discovery: None,
}),
..Config::default()
};
let b = build_broker_config(&cfg).expect("Some");
assert_eq!(b.broker_addr, "mqtt.example.com");
assert_eq!(b.port, 8883);
assert_eq!(b.credentials.as_ref().unwrap().0, "u");
assert_eq!(b.credentials.as_ref().unwrap().1, "p");
assert_eq!(b.ca.as_deref(), Some("/etc/ca.pem"));
assert!(b.client_auth.is_none());
}
// --- Shutdown fan-out -----------------------------------------------------
// Pins the constant's value so a change to it is a deliberate, visible edit.
#[test]
fn shutdown_fanout_timeout_is_two_seconds() {
assert_eq!(SHUTDOWN_FANOUT_TIMEOUT, Duration::from_secs(2));
}
#[tokio::test]
async fn publish_shutdown_fanout_is_noop_on_empty_input() {
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let cameras: HashMap<String, Arc<CameraHandle>> = HashMap::new();
publish_shutdown_fanout(&[], &cameras, &mqtt, "bairelay").await;
assert_eq!(mock.count(), 0);
}
// The camera map is empty, so only the status-publish pass has work to do.
// The assertion is a lower bound (`>= 2`), not an exact count.
#[tokio::test]
async fn publish_shutdown_fanout_publishes_one_disconnect_per_camera_name() {
let (mqtt, mock) = bairelay_mqtt::test_support::mock_client();
let cameras: HashMap<String, Arc<CameraHandle>> = HashMap::new();
publish_shutdown_fanout(
&["cam-a".to_string(), "cam-b".to_string()],
&cameras,
&mqtt,
"bairelay",
)
.await;
assert!(mock.count() >= 2, "expected at least 2 publishes");
}
// --- handle_connack (one camera) ------------------------------------------
// NOTE(review): despite the name, this test makes no assertions and never
// reads `_mock`; it only verifies that the call completes without panicking
// for a single freshly constructed camera handle.
#[tokio::test]
async fn handle_connack_subscribes_each_camera_and_skips_nocaps_discovery() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let (mqtt, _mock) = bairelay_mqtt::test_support::mock_client();
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("cam-a"),
cancel,
None,
));
let mut cameras = HashMap::new();
cameras.insert("cam-a".to_string(), handle);
handle_connack(&cameras, &mqtt, "bairelay").await;
}
}