use std::time::Duration;
#[cfg(feature = "verify-tls")]
use std::{env::temp_dir, fs::File, io::Write};
use futures::stream::StreamExt;
use tokio::sync::broadcast::Sender;
use crate::{command::Command, message::Message, parser::parse_message};
#[derive(Clone)]
pub struct Client {
pub host: String,
pub access_code: String,
pub serial: String,
mqtt: paho_mqtt::AsyncClient,
stream: std::pin::Pin<Box<paho_mqtt::AsyncReceiver<Option<paho_mqtt::Message>>>>,
tx: Sender<Message>,
topic_device_request: String,
topic_device_report: String,
}
impl Client {
pub fn new<S: Into<String>>(ip: S, access_code: S, serial: S, tx: Sender<Message>) -> Self {
let host = format!("mqtts://{}:8883", ip.into());
let access_code = access_code.into();
let serial = serial.into();
let client_id = format!("bambu-api-{}", nanoid::nanoid!(8));
let create_opts = paho_mqtt::CreateOptionsBuilder::new()
.server_uri(&host)
.client_id(client_id)
.max_buffered_messages(25)
.finalize();
let mut mqtt = paho_mqtt::AsyncClient::new(create_opts).expect("Failed to create client");
let stream = Box::pin(mqtt.get_stream(25));
Self {
host,
access_code,
topic_device_request: format!("device/{}/request", &serial),
topic_device_report: format!("device/{}/report", &serial),
serial,
mqtt,
stream,
tx,
}
}
async fn poll(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let msg_opt = self.stream.next().await.flatten();
if let Some(msg) = msg_opt {
self.tx.send(parse_message(&msg))?;
} else {
self.tx.send(Message::Disconnected)?;
while (self.mqtt.reconnect().await).is_err() {
tokio::time::sleep(Duration::from_secs(1)).await;
self.tx.send(Message::Reconnecting)?;
}
self.tx.send(Message::Connected)?;
}
Ok(())
}
#[cfg(feature = "verify-tls")]
fn get_ssl_opts() -> Result<paho_mqtt::SslOptions, Box<dyn std::error::Error>> {
let ca_cert_bytes = include_bytes!("certs/bbl_ca.pem");
let ca_cert_path = temp_dir().join("bbl_ca.pem");
let mut ca_cert_file = File::create(&ca_cert_path)?;
ca_cert_file.write_all(ca_cert_bytes)?;
Ok(paho_mqtt::SslOptionsBuilder::new()
.trust_store(ca_cert_path)?
.finalize())
}
#[cfg(not(feature = "verify-tls"))]
fn get_ssl_opts() -> Result<paho_mqtt::SslOptions, Box<dyn std::error::Error>> {
Ok(paho_mqtt::SslOptionsBuilder::new()
.disable_default_trust_store(true)
.enable_server_cert_auth(false)
.verify(false)
.finalize())
}
async fn connect(&self) -> Result<(), Box<dyn std::error::Error>> {
let ssl_opts = Self::get_ssl_opts()?;
let conn_opts = paho_mqtt::ConnectOptionsBuilder::new()
.ssl_options(ssl_opts)
.keep_alive_interval(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(3))
.user_name("bblp")
.password(self.access_code.as_str())
.finalize();
self.tx.send(Message::Connecting)?;
self.mqtt.connect(conn_opts).await?;
self.tx.send(Message::Connected)?;
Ok(())
}
fn subscibe_to_device_report(&self) {
self.mqtt
.subscribe(&self.topic_device_report, paho_mqtt::QOS_0);
}
pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
self.connect().await?;
self.subscibe_to_device_report();
loop {
Self::poll(self).await?;
}
}
pub async fn publish(&self, command: Command) -> Result<(), Box<dyn std::error::Error>> {
let payload = command.get_payload();
let msg = paho_mqtt::Message::new(&self.topic_device_request, payload, paho_mqtt::QOS_0);
self.mqtt.publish(msg).await?;
Ok(())
}
}