use std::time::{Duration, SystemTime, UNIX_EPOCH};
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS, TlsConfiguration, Transport};
use serde_json::Value;
use tracing::{debug, error, info};
use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;
/// Returns the current wall-clock time as fractional seconds since the Unix epoch.
///
/// If the system clock reports a time earlier than the epoch, the elapsed
/// duration is treated as zero and `0.0` is returned instead of failing.
fn unix_ts() -> f64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => Duration::default(),
    };
    since_epoch.as_secs_f64()
}
/// Splits an MQTT broker URL into `(use_tls, host, port)`.
///
/// * A leading `mqtts://` selects TLS; `mqtt://` or no scheme selects plain TCP.
/// * Everything from the first `/` after the authority onwards is ignored.
/// * A missing or unparsable port falls back to 8883 (TLS) or 1883 (plain).
/// * A bracketed IPv6 literal such as `[::1]` is returned intact, brackets
///   included, whether or not a `:port` suffix follows it.
fn parse_broker_url(url: &str) -> (bool, String, u16) {
    // Strip exactly one scheme prefix; `strip_prefix` does not keep stripping
    // repeated occurrences the way `trim_start_matches` would.
    let (tls, rest) = match url.strip_prefix("mqtts://") {
        Some(rest) => (true, rest),
        None => (false, url.strip_prefix("mqtt://").unwrap_or(url)),
    };
    let default_port = if tls { 8883 } else { 1883 };

    // The authority (host[:port]) ends at the first path separator.
    let authority = rest.split('/').next().unwrap_or(rest);

    let (host, port_str) = match authority.rfind(']') {
        // Bracketed IPv6 literal: the colons inside the brackets belong to the
        // address, so only a ':' directly after the closing bracket introduces
        // a port.
        Some(close) if authority.starts_with('[') => {
            let (host, tail) = authority.split_at(close + 1);
            (host, tail.strip_prefix(':').unwrap_or(""))
        }
        _ => authority.rsplit_once(':').unwrap_or((authority, "")),
    };

    let port = port_str.parse::<u16>().unwrap_or(default_port);
    (tls, host.to_string(), port)
}
/// Configuration and per-connection state for an MQTT ingest loop.
///
/// Built with `MqttConnector::new` and the builder-style setters, then
/// consumed by `run`, which connects to the broker and hands every received
/// publish to `process`.
pub struct MqttConnector {
// Broker address as given by the caller; `run` splits it with `parse_broker_url`.
broker_url: String,
// Client identifier passed to `MqttOptions::new` in `run`.
client_id: String,
// Optional login; `run` only sends credentials when BOTH username and password are set.
username: Option<String>,
password: Option<String>,
// Keep-alive interval in seconds (`run` converts it with `Duration::from_secs`).
keepalive: u64,
// Maps a (topic, payload) pair to the symbol used when encoding and buffering the event.
classifier: TopicClassifier,
// Consulted in `process` before classification; messages it suppresses are dropped.
delta_filter: DeltaFilter,
}
impl MqttConnector {
    /// Creates a connector for `broker_url` with default settings.
    ///
    /// Defaults: client id `"wireband-edge"`, no credentials, a 60 second
    /// keep-alive, a fresh [`TopicClassifier`] and a [`DeltaFilter`] built
    /// from `delta_threshold`.
    pub fn new(broker_url: impl Into<String>, delta_threshold: f64) -> Self {
        Self {
            broker_url: broker_url.into(),
            client_id: "wireband-edge".to_string(),
            username: None,
            password: None,
            keepalive: 60,
            classifier: TopicClassifier::new(),
            delta_filter: DeltaFilter::new(delta_threshold),
        }
    }

    /// Builder-style setter: replaces the MQTT client identifier.
    pub fn client_id(mut self, id: impl Into<String>) -> Self {
        self.client_id = id.into();
        self
    }

    /// Builder-style setter: sets the username and password sent to the broker.
    pub fn credentials(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
        self.username = Some(username.into());
        self.password = Some(password.into());
        self
    }

    /// Builder-style setter: sets the keep-alive interval in seconds.
    ///
    /// NOTE(review): the value is handed unchanged to rumqttc's
    /// `set_keep_alive`, which may reject very small intervals in some
    /// versions; confirm against the pinned rumqttc release.
    pub fn keepalive(mut self, secs: u64) -> Self {
        self.keepalive = secs;
        self
    }

    /// Gives mutable access to the topic classifier so it can be adjusted
    /// before the connector is run.
    pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
        &mut self.classifier
    }

    /// Connects to the broker, subscribes to `topics` and processes incoming
    /// publishes until the connection fails.
    ///
    /// Every topic is subscribed with `QoS::AtMostOnce`. Each received publish
    /// is passed to `process`, which forwards it to `client`.
    ///
    /// # Errors
    ///
    /// Returns [`WireBandError::Mqtt`] if a subscribe request cannot be queued
    /// or if the event loop reports an error. The loop has no other exit, so
    /// this function never returns `Ok`.
    pub async fn run(
        mut self,
        client: WireBandClient,
        topics: Vec<String>,
    ) -> Result<()> {
        // Lower bound for the client -> event-loop request queue.
        const MIN_REQUEST_QUEUE: usize = 128;

        let (tls, host, port) = parse_broker_url(&self.broker_url);
        let mut options = MqttOptions::new(&self.client_id, &host, port);
        options.set_keep_alive(Duration::from_secs(self.keepalive));
        if let (Some(u), Some(p)) = (self.username.as_deref(), self.password.as_deref()) {
            options.set_credentials(u, p);
        }
        if tls {
            options.set_transport(Transport::Tls(TlsConfiguration::default()));
        }

        // All subscribe requests are queued before the event loop is polled
        // for the first time, so the queue must hold every one of them;
        // otherwise `subscribe().await` would wait forever for free space.
        let queue_capacity = topics.len().max(MIN_REQUEST_QUEUE);
        let (mqtt_client, mut event_loop) = AsyncClient::new(options, queue_capacity);
        for topic in &topics {
            mqtt_client
                .subscribe(topic, QoS::AtMostOnce)
                .await
                .map_err(|e| WireBandError::Mqtt(e.to_string()))?;
        }

        loop {
            match event_loop.poll().await {
                // The connection is only established by polling, so report it
                // once the broker has actually acknowledged it.
                Ok(Event::Incoming(Packet::ConnAck(_))) => {
                    info!(
                        broker = %self.broker_url,
                        ?topics,
                        "MQTT connected"
                    );
                }
                Ok(Event::Incoming(Packet::Publish(p))) => {
                    // The packet is owned here: move topic and payload instead
                    // of cloning them.
                    self.process(&client, p.topic, p.payload).await;
                }
                Ok(_) => {}
                Err(e) => {
                    error!(err = %e, "MQTT connection error");
                    return Err(WireBandError::Mqtt(e.to_string()));
                }
            }
        }
    }

    /// Decodes one received message and forwards it to `client`.
    ///
    /// The payload is parsed as JSON. If that fails it becomes a JSON string
    /// holding the UTF-8 text, or `"<binary:N>"` (N = byte length) when the
    /// bytes are not valid UTF-8. Messages suppressed by the delta filter are
    /// dropped; all others are classified, encoded with `frame::encode` and
    /// buffered on `client` together with the current Unix timestamp.
    ///
    /// `raw` accepts any byte container (`Vec<u8>`, `&[u8]`, ...) so callers
    /// can pass the payload without copying it first.
    async fn process(&mut self, client: &WireBandClient, topic: String, raw: impl AsRef<[u8]>) {
        let raw = raw.as_ref();
        let payload: Value = serde_json::from_slice(raw).unwrap_or_else(|_| {
            Value::String(match std::str::from_utf8(raw) {
                Ok(text) => text.to_owned(),
                Err(_) => format!("<binary:{}>", raw.len()),
            })
        });

        if self.delta_filter.should_suppress(&topic, &payload) {
            debug!(topic = %topic, "delta-filtered");
            return;
        }

        let symbol = self.classifier.classify(&topic, &payload);
        let encoded = frame::encode(symbol, &topic, &payload);
        let ts = unix_ts();
        client.buffer_event(topic, symbol, encoded, ts).await;
    }
}