use crate::time::Duration;
use crate::transport::StreamStrategy;
use crate::QoS;
pub use mqtt5_protocol::BridgeDirection;
use serde::{Deserialize, Serialize};
/// Configuration for one bridge connection to a remote broker.
///
/// The struct is (de)serialized with serde. `name`, `remote_address` and
/// `client_id` have no default and must be present in a serialized config;
/// fields carrying `#[serde(default ...)]` and plain `Option` fields may be
/// left out. `Debug` is implemented by hand so the password is never printed.
#[derive(Clone, Serialize, Deserialize)]
// The struct has more than three `bool` fields, which trips this pedantic lint.
#[allow(clippy::struct_excessive_bools)]
pub struct BridgeConfig {
/// Bridge name. `validate` rejects an empty name, and `new` derives the
/// initial client ID from it (`bridge-{name}`).
pub name: String,
/// Address of the remote broker. The tests in this file use `host:port`
/// strings such as `remote.broker:1883`.
pub remote_address: String,
/// Client ID of the bridge. `validate` rejects an empty value.
pub client_id: String,
/// Optional user name (presumably for authenticating with the remote
/// broker — confirm against the connection code).
pub username: Option<String>,
/// Optional password. The `Debug` impl prints `[REDACTED]` instead of the
/// value; serialization still writes it out unchanged.
pub password: Option<String>,
/// Transport protocol; `BridgeProtocol::Tcp` when absent.
#[serde(default)]
pub protocol: BridgeProtocol,
/// TLS switch; `false` when absent.
/// NOTE(review): how this interacts with `protocol` is not visible in this
/// file — confirm.
#[serde(default)]
pub use_tls: bool,
/// Optional TLS server name (presumably used for SNI / certificate
/// checks — confirm). Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub tls_server_name: Option<String>,
/// Optional CA file, stored as a plain string (presumably a path).
/// Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub ca_file: Option<String>,
/// Optional client certificate file, stored as a plain string (presumably
/// a path). Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub client_cert_file: Option<String>,
/// Optional client key file, stored as a plain string (presumably a
/// path). Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub client_key_file: Option<String>,
/// Optional "insecure" flag. Omitted from serialized output when `None`.
/// NOTE(review): presumably relaxes TLS verification — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub insecure: Option<bool>,
/// Optional QUIC stream strategy. Omitted from serialized output when
/// `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quic_stream_strategy: Option<StreamStrategy>,
/// Optional QUIC flow-header switch. Omitted from serialized output when
/// `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quic_flow_headers: Option<bool>,
/// Optional QUIC datagram switch. Omitted from serialized output when
/// `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quic_datagrams: Option<bool>,
/// Optional QUIC stream limit. Omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quic_max_streams: Option<usize>,
/// Fallback protocols; empty when absent and omitted from serialized
/// output when empty. Presumably tried after `protocol` fails — confirm
/// ordering against the connection code.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub fallback_protocols: Vec<BridgeProtocol>,
/// TCP fallback switch; `false` when absent and only serialized when
/// `true` (`Not::not` skips the `false` case).
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub fallback_tcp: bool,
/// Optional ALPN protocol names. Omitted from serialized output when
/// `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub alpn_protocols: Option<Vec<String>>,
/// `true` when absent (`default_try_private`).
/// NOTE(review): the name matches the Mosquitto bridge option
/// `try_private` — confirm the semantics are the same.
#[serde(default = "default_try_private")]
pub try_private: bool,
/// Clean-start flag; `false` when absent (`default_clean_start`).
#[serde(default = "default_clean_start")]
pub clean_start: bool,
/// Keep-alive value; `60` when absent (`default_keepalive`). Presumably
/// seconds, as in the MQTT CONNECT keep-alive — confirm.
#[serde(default = "default_keepalive")]
pub keepalive: u16,
/// MQTT protocol version; `MqttVersion::V50` when absent.
#[serde(default)]
pub protocol_version: MqttVersion,
/// Reconnect delay, (de)serialized through `humantime_serde`; 5 seconds
/// when absent.
#[serde(with = "humantime_serde", default = "default_reconnect_delay")]
pub reconnect_delay: Duration,
/// Initial reconnect delay, (de)serialized through `humantime_serde`.
/// Shares `default_reconnect_delay` with `reconnect_delay`, so it is also
/// 5 seconds when absent.
#[serde(with = "humantime_serde", default = "default_reconnect_delay")]
pub initial_reconnect_delay: Duration,
/// Upper bound for the reconnect delay, (de)serialized through
/// `humantime_serde`; 300 seconds when absent.
#[serde(with = "humantime_serde", default = "default_max_reconnect_delay")]
pub max_reconnect_delay: Duration,
/// Backoff multiplier; `2.0` when absent. Presumably applied to the delay
/// between successive reconnect attempts — confirm.
#[serde(default = "default_backoff_multiplier")]
pub backoff_multiplier: f64,
/// Optional cap on reconnect attempts. Omitted from serialized output
/// when `None`.
/// NOTE(review): `None` presumably means "no limit" — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_reconnect_attempts: Option<u32>,
/// Number of connection retries; `3` when absent.
#[serde(default = "default_connection_retries")]
pub connection_retries: u8,
/// Delay before the first retry, (de)serialized through `humantime_serde`;
/// 1 second when absent.
#[serde(with = "humantime_serde", default = "default_first_retry_delay")]
pub first_retry_delay: Duration,
/// Retry jitter switch; `true` when absent.
#[serde(default = "default_retry_jitter")]
pub retry_jitter: bool,
/// Backup broker addresses; empty when absent. Appended to by
/// `add_backup_broker`.
#[serde(default)]
pub backup_brokers: Vec<String>,
/// Interval of the primary-broker health check, (de)serialized through
/// `humantime_serde`; 30 seconds when absent.
#[serde(
with = "humantime_serde",
default = "default_primary_health_check_interval"
)]
pub primary_health_check_interval: Duration,
/// Failback switch; `true` when absent. Presumably controls returning to
/// the primary broker after a failover — confirm.
#[serde(default = "default_enable_failback")]
pub enable_failback: bool,
/// Topic mappings. May be absent when deserializing (defaults to empty),
/// but `validate` requires at least one mapping.
#[serde(default)]
pub topics: Vec<TopicMapping>,
/// Loop-prevention TTL, (de)serialized through `humantime_serde` (the
/// tests use strings such as `"5m"`); 60 seconds when absent.
#[serde(with = "humantime_serde", default = "default_loop_prevention_ttl")]
pub loop_prevention_ttl: Duration,
/// Loop-prevention cache size; `10000` when absent. Presumably a maximum
/// number of entries — confirm.
#[serde(default = "default_loop_prevention_cache_size")]
pub loop_prevention_cache_size: usize,
}
impl std::fmt::Debug for BridgeConfig {
    /// Writes a partial, secret-free view of the configuration.
    ///
    /// Only the identity, credential, transport and topic fields are listed;
    /// the output is marked non-exhaustive because the remaining fields are
    /// left out. A configured password is shown as `[REDACTED]`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Reveal only whether a password is set, never its value.
        let masked_password = self.password.as_ref().map(|_| "[REDACTED]");

        let mut view = f.debug_struct("BridgeConfig");
        view.field("name", &self.name);
        view.field("remote_address", &self.remote_address);
        view.field("client_id", &self.client_id);
        view.field("username", &self.username);
        view.field("password", &masked_password);
        view.field("protocol", &self.protocol);
        view.field("use_tls", &self.use_tls);
        view.field("insecure", &self.insecure);
        view.field("topics", &self.topics);
        view.finish_non_exhaustive()
    }
}
/// One topic mapping of a bridge: a topic pattern, the direction in which
/// it is bridged, and optional prefixes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicMapping {
/// Topic pattern (the tests use MQTT filters such as `sensors/+/data` and
/// `test/#`). `BridgeConfig::validate` rejects an empty pattern.
pub pattern: String,
/// Direction in which matching messages are bridged.
pub direction: BridgeDirection,
/// QoS of the mapping; `QoS::AtMostOnce` when absent (`default_qos`).
#[serde(default = "default_qos")]
pub qos: QoS,
/// Optional local prefix. Omitted from serialized output when `None`.
/// NOTE(review): presumably applied to topics on the local side — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub local_prefix: Option<String>,
/// Optional remote prefix. Omitted from serialized output when `None`.
/// NOTE(review): presumably applied to topics on the remote side — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub remote_prefix: Option<String>,
}
impl From<&TopicMapping> for mqtt5_protocol::TopicMappingCore {
fn from(mapping: &TopicMapping) -> Self {
Self {
pattern: mapping.pattern.clone(),
direction: mapping.direction,
qos: mapping.qos,
local_prefix: mapping.local_prefix.clone(),
remote_prefix: mapping.remote_prefix.clone(),
}
}
}
/// Transport protocol of a bridge connection.
///
/// Serialized in lowercase (`tcp`, `tls`, `quic`, `quicsecure`); `Tcp` is
/// the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum BridgeProtocol {
/// Serialized as `tcp`; the `Default` variant.
#[default]
Tcp,
/// Serialized as `tls`.
Tls,
/// Serialized as `quic`.
Quic,
/// Serialized as `quicsecure`.
QuicSecure,
}
/// MQTT protocol version selected for a bridge; `V50` is the default.
///
/// Every variant carries an explicit `rename`, so those names — not the
/// container-level `rename_all` — are what appears in serialized configs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum MqttVersion {
/// Serialized as `mqttv31`.
#[serde(rename = "mqttv31")]
V31,
/// Serialized as `mqttv311`.
#[serde(rename = "mqttv311")]
V311,
/// Serialized as `mqttv50`; the `Default` variant.
#[serde(rename = "mqttv50")]
#[default]
V50,
}
/// Serde default for `BridgeConfig::try_private`: enabled.
fn default_try_private() -> bool {
    const TRY_PRIVATE: bool = true;
    TRY_PRIVATE
}
/// Serde default for `BridgeConfig::clean_start`: disabled.
fn default_clean_start() -> bool {
    const CLEAN_START: bool = false;
    CLEAN_START
}
/// Serde default for `BridgeConfig::keepalive`: `60`.
fn default_keepalive() -> u16 {
    const KEEPALIVE: u16 = 60;
    KEEPALIVE
}
/// Serde default shared by `BridgeConfig::reconnect_delay` and
/// `BridgeConfig::initial_reconnect_delay`: 5 seconds.
fn default_reconnect_delay() -> Duration {
    const RECONNECT_DELAY_SECS: u64 = 5;
    Duration::from_secs(RECONNECT_DELAY_SECS)
}
/// Serde default for `BridgeConfig::max_reconnect_delay`: 300 seconds.
fn default_max_reconnect_delay() -> Duration {
    const MAX_RECONNECT_DELAY_SECS: u64 = 300;
    Duration::from_secs(MAX_RECONNECT_DELAY_SECS)
}
/// Serde default for `BridgeConfig::backoff_multiplier`: `2.0`.
fn default_backoff_multiplier() -> f64 {
    const BACKOFF_MULTIPLIER: f64 = 2.0;
    BACKOFF_MULTIPLIER
}
/// Serde default for `TopicMapping::qos`: `QoS::AtMostOnce`.
fn default_qos() -> QoS {
    const TOPIC_QOS: QoS = QoS::AtMostOnce;
    TOPIC_QOS
}
/// Serde default for `BridgeConfig::primary_health_check_interval`:
/// 30 seconds.
fn default_primary_health_check_interval() -> Duration {
    const HEALTH_CHECK_INTERVAL_SECS: u64 = 30;
    Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS)
}
/// Serde default for `BridgeConfig::enable_failback`: enabled.
fn default_enable_failback() -> bool {
    const ENABLE_FAILBACK: bool = true;
    ENABLE_FAILBACK
}
/// Serde default for `BridgeConfig::connection_retries`: `3`.
fn default_connection_retries() -> u8 {
    const CONNECTION_RETRIES: u8 = 3;
    CONNECTION_RETRIES
}
/// Serde default for `BridgeConfig::first_retry_delay`: 1 second.
fn default_first_retry_delay() -> Duration {
    const FIRST_RETRY_DELAY_SECS: u64 = 1;
    Duration::from_secs(FIRST_RETRY_DELAY_SECS)
}
/// Serde default for `BridgeConfig::retry_jitter`: enabled.
fn default_retry_jitter() -> bool {
    const RETRY_JITTER: bool = true;
    RETRY_JITTER
}
/// Serde default for `BridgeConfig::loop_prevention_ttl`: 60 seconds.
fn default_loop_prevention_ttl() -> Duration {
    const LOOP_PREVENTION_TTL_SECS: u64 = 60;
    Duration::from_secs(LOOP_PREVENTION_TTL_SECS)
}
/// Serde default for `BridgeConfig::loop_prevention_cache_size`: `10000`.
fn default_loop_prevention_cache_size() -> usize {
    const LOOP_PREVENTION_CACHE_SIZE: usize = 10_000;
    LOOP_PREVENTION_CACHE_SIZE
}
impl BridgeConfig {
    /// Creates a bridge configuration called `name` that targets
    /// `remote_address`.
    ///
    /// The client ID starts out as `bridge-{name}`, and there are no topic
    /// mappings or backup brokers yet. Every other field gets the same value
    /// that deserializing a config without that field would produce.
    ///
    /// The literals below duplicate the `default_*` serde helpers in this
    /// file; when one of those defaults changes, change it here as well.
    pub fn new(name: impl Into<String>, remote_address: impl Into<String>) -> Self {
        let name = name.into();
        // Derive the client ID first so `name` can be moved into the struct
        // afterwards instead of being cloned.
        let client_id = format!("bridge-{name}");
        Self {
            name,
            remote_address: remote_address.into(),
            client_id,
            username: None,
            password: None,
            protocol: BridgeProtocol::Tcp,
            use_tls: false,
            tls_server_name: None,
            ca_file: None,
            client_cert_file: None,
            client_key_file: None,
            insecure: None,
            quic_stream_strategy: None,
            quic_flow_headers: None,
            quic_datagrams: None,
            quic_max_streams: None,
            fallback_protocols: Vec::new(),
            fallback_tcp: false,
            alpn_protocols: None,
            try_private: true,
            clean_start: false,
            keepalive: 60,
            protocol_version: MqttVersion::V50,
            reconnect_delay: Duration::from_secs(5),
            initial_reconnect_delay: Duration::from_secs(5),
            max_reconnect_delay: Duration::from_secs(300),
            backoff_multiplier: 2.0,
            max_reconnect_attempts: None,
            connection_retries: 3,
            first_retry_delay: Duration::from_secs(1),
            retry_jitter: true,
            backup_brokers: Vec::new(),
            primary_health_check_interval: Duration::from_secs(30),
            enable_failback: true,
            topics: Vec::new(),
            loop_prevention_ttl: Duration::from_secs(60),
            loop_prevention_cache_size: 10000,
        }
    }

    /// Appends a topic mapping without prefixes and returns the updated
    /// configuration (builder style).
    #[must_use]
    pub fn add_topic(
        mut self,
        pattern: impl Into<String>,
        direction: BridgeDirection,
        qos: QoS,
    ) -> Self {
        self.topics.push(TopicMapping {
            pattern: pattern.into(),
            direction,
            qos,
            local_prefix: None,
            remote_prefix: None,
        });
        self
    }

    /// Appends a backup broker address and returns the updated configuration
    /// (builder style).
    #[must_use]
    pub fn add_backup_broker(mut self, address: impl Into<String>) -> Self {
        self.backup_brokers.push(address.into());
        self
    }

    /// Checks that the configuration is complete enough to be used.
    ///
    /// # Errors
    ///
    /// Returns `MqttError::Configuration` for the first failing check, in
    /// this order: the bridge name is empty, the client ID is empty, there is
    /// no topic mapping, a topic pattern is empty, the remote address is
    /// empty.
    pub fn validate(&self) -> crate::Result<()> {
        use crate::error::MqttError;

        if self.name.is_empty() {
            return Err(MqttError::Configuration(
                "Bridge name cannot be empty".into(),
            ));
        }
        if self.client_id.is_empty() {
            return Err(MqttError::Configuration("Client ID cannot be empty".into()));
        }
        if self.topics.is_empty() {
            return Err(MqttError::Configuration(
                "Bridge must have at least one topic mapping".into(),
            ));
        }
        if self.topics.iter().any(|topic| topic.pattern.is_empty()) {
            return Err(MqttError::Configuration(
                "Topic pattern cannot be empty".into(),
            ));
        }
        // A bridge without a remote address has nothing to connect to. This
        // check comes last on purpose: every configuration that was already
        // rejected by the checks above keeps reporting the same error.
        if self.remote_address.is_empty() {
            return Err(MqttError::Configuration(
                "Remote address cannot be empty".into(),
            ));
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture shared by the serde-focused tests: a bridge called
    /// `test-bridge` with a single `test/#` mapping in both directions at
    /// `QoS::AtMostOnce`.
    fn bidirectional_bridge() -> BridgeConfig {
        BridgeConfig::new("test-bridge", "remote.broker:1883").add_topic(
            "test/#",
            BridgeDirection::Both,
            QoS::AtMostOnce,
        )
    }

    /// The builder records the name, the derived client ID, the topic and
    /// the backup broker.
    #[test]
    fn test_bridge_config_creation() {
        let bridge = BridgeConfig::new("test-bridge", "remote.broker:1883")
            .add_topic("sensors/+/data", BridgeDirection::Out, QoS::AtLeastOnce)
            .add_backup_broker("backup.broker:1883");

        assert_eq!(bridge.name, "test-bridge");
        assert_eq!(bridge.client_id, "bridge-test-bridge");
        assert_eq!(bridge.topics.len(), 1);
        assert_eq!(bridge.backup_brokers.len(), 1);
    }

    /// A bridge without topics is rejected; adding one makes it valid.
    #[test]
    fn test_bridge_config_validation() {
        let without_topics = BridgeConfig::new("test", "broker:1883");
        assert!(without_topics.validate().is_err());

        let with_topic =
            without_topics.add_topic("test/#", BridgeDirection::Both, QoS::AtMostOnce);
        assert!(with_topic.validate().is_ok());
    }

    /// `try_private = true` survives a JSON round trip.
    #[test]
    fn test_try_private_serialization() {
        let bridge = BridgeConfig {
            try_private: true,
            ..bidirectional_bridge()
        };

        let encoded = serde_json::to_string(&bridge).unwrap();
        assert!(encoded.contains("\"try_private\":true"));

        let decoded: BridgeConfig = serde_json::from_str(&encoded).unwrap();
        assert!(decoded.try_private);
        assert_eq!(decoded.name, "test-bridge");
    }

    /// `try_private` is on both for `new` and for JSON that omits the field.
    #[test]
    fn test_try_private_default_value() {
        let built = bidirectional_bridge();
        assert!(built.try_private);

        let minimal_json = r#"{
"name": "test-bridge",
"client_id": "bridge-test-bridge",
"remote_address": "remote.broker:1883",
"topics": [{
"pattern": "test/#",
"direction": "both",
"qos": "AtMostOnce"
}]
}"#;
        let parsed: BridgeConfig = serde_json::from_str(minimal_json).unwrap();
        assert!(parsed.try_private);
    }

    /// `try_private = false` survives a JSON round trip.
    #[test]
    fn test_try_private_false_serialization() {
        let bridge = BridgeConfig {
            try_private: false,
            ..bidirectional_bridge()
        };

        let encoded = serde_json::to_string(&bridge).unwrap();
        assert!(encoded.contains("\"try_private\":false"));

        let decoded: BridgeConfig = serde_json::from_str(&encoded).unwrap();
        assert!(!decoded.try_private);
    }

    /// `new` sets the loop-prevention TTL to 60 s and the cache size to 10000.
    #[test]
    fn test_loop_prevention_defaults() {
        let bridge = bidirectional_bridge();

        assert_eq!(bridge.loop_prevention_ttl, Duration::from_secs(60));
        assert_eq!(bridge.loop_prevention_cache_size, 10000);
    }

    /// Custom loop-prevention settings round-trip through JSON, with the TTL
    /// written as a humantime string.
    #[test]
    fn test_loop_prevention_serialization() {
        let bridge = BridgeConfig {
            loop_prevention_ttl: Duration::from_secs(120),
            loop_prevention_cache_size: 5000,
            ..bidirectional_bridge()
        };

        let encoded = serde_json::to_string(&bridge).unwrap();
        assert!(encoded.contains("\"loop_prevention_ttl\":\"2m\""));
        assert!(encoded.contains("\"loop_prevention_cache_size\":5000"));

        let decoded: BridgeConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.loop_prevention_ttl, Duration::from_secs(120));
        assert_eq!(decoded.loop_prevention_cache_size, 5000);
    }

    /// Loop-prevention settings are read from hand-written JSON; `"5m"`
    /// parses to 300 seconds.
    #[test]
    fn test_loop_prevention_from_json() {
        let document = r#"{
"name": "test-bridge",
"client_id": "bridge-test",
"remote_address": "broker:1883",
"loop_prevention_ttl": "5m",
"loop_prevention_cache_size": 20000,
"topics": [{
"pattern": "test/#",
"direction": "both",
"qos": "AtMostOnce"
}]
}"#;
        let parsed: BridgeConfig = serde_json::from_str(document).unwrap();

        assert_eq!(parsed.loop_prevention_ttl, Duration::from_secs(300));
        assert_eq!(parsed.loop_prevention_cache_size, 20000);
    }
}