use crate::error::{MqttError, Result};
use rumqttd::local::{LinkRx, LinkTx};
use rumqttd::{Broker, Config, ConnectionSettings, RouterConfig, ServerSettings};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::thread;
#[derive(Debug, Clone)]
pub struct EmbeddedBrokerConfig {
pub tcp_listener: Option<TcpListenerConfig>,
pub max_payload_size: usize,
pub max_inflight_count: usize,
pub max_connections: usize,
}
#[derive(Debug, Clone)]
pub struct TcpListenerConfig {
pub bind_addr: SocketAddr,
}
impl Default for EmbeddedBrokerConfig {
fn default() -> Self {
Self {
tcp_listener: None,
max_payload_size: 256 * 1024,
max_inflight_count: 100,
max_connections: 64,
}
}
}
pub struct EmbeddedBroker {
_thread: thread::JoinHandle<()>,
}
pub struct EmbeddedBrokerBuilder {
config: EmbeddedBrokerConfig,
link_ids: Vec<String>,
}
impl EmbeddedBrokerBuilder {
pub fn new(config: EmbeddedBrokerConfig) -> Self {
Self { config, link_ids: Vec::new() }
}
pub fn add_link(mut self, client_id: impl Into<String>) -> Self {
self.link_ids.push(client_id.into());
self
}
pub fn start(self) -> Result<(EmbeddedBroker, Vec<BrokerLink>)> {
let rumqttd_config = build_config(&self.config);
let broker = Broker::new(rumqttd_config);
let mut links = Vec::with_capacity(self.link_ids.len());
for id in &self.link_ids {
let (tx, rx) = broker
.link(id)
.map_err(|e| MqttError::Broker(format!("link `{id}` failed: {e}")))?;
links.push(BrokerLink { tx, rx });
}
let handle = thread::Builder::new()
.name("aether-broker".into())
.spawn(move || {
let mut broker = broker;
if let Err(e) = broker.start() {
tracing::error!("embedded MQTT broker stopped: {e}");
}
})
.map_err(|e| MqttError::Broker(format!("failed to spawn broker thread: {e}")))?;
Ok((EmbeddedBroker { _thread: handle }, links))
}
}
pub struct BrokerLink {
pub tx: LinkTx,
pub rx: LinkRx,
}
fn build_config(cfg: &EmbeddedBrokerConfig) -> Config {
let connection = ConnectionSettings {
connection_timeout_ms: 5000,
max_payload_size: cfg.max_payload_size,
max_inflight_count: cfg.max_inflight_count,
auth: None,
external_auth: None,
dynamic_filters: false,
};
let router = RouterConfig {
max_connections: cfg.max_connections,
max_outgoing_packet_count: 200,
max_segment_size: 100 * 1024,
max_segment_count: 10,
custom_segment: None,
initialized_filters: None,
shared_subscriptions_strategy: Default::default(),
};
let mut v5 = HashMap::new();
if let Some(tcp) = &cfg.tcp_listener {
v5.insert(
"v5-tcp".into(),
ServerSettings {
name: "v5-tcp".into(),
listen: tcp.bind_addr,
tls: None,
next_connection_delay_ms: 0,
connections: connection.clone(),
},
);
}
Config {
id: 0,
router,
v4: None,
v5: if v5.is_empty() { None } else { Some(v5) },
ws: None,
cluster: None,
console: None,
bridge: None,
prometheus: None,
metrics: None,
}
}