use embassy_time::Duration;
use heapless::String;
use crate::at::processor::AtProcessor;
use crate::bus::SpiTransport;
use crate::error::{Error, Result};
use crate::sync::TmMutex;
use crate::types::MqttQos;
/// Capacity, in bytes, of the fixed-size buffer backing [`MqttTopic`].
pub const MAX_MQTT_TOPIC_LEN: usize = 128;
/// Capacity, in bytes, of the fixed-size buffer backing [`MqttPayload`].
pub const MAX_MQTT_PAYLOAD_LEN: usize = 512;
/// Fixed-capacity string holding a topic of at most [`MAX_MQTT_TOPIC_LEN`] bytes.
pub type MqttTopic = String<MAX_MQTT_TOPIC_LEN>;
/// Fixed-capacity string holding a payload of at most [`MAX_MQTT_PAYLOAD_LEN`] bytes.
pub type MqttPayload = String<MAX_MQTT_PAYLOAD_LEN>;
/// Session parameters supplied by the caller when connecting to a broker.
///
/// All strings are fixed-capacity (`heapless`) and hold at most 32 bytes.
#[derive(Debug, Clone)]
pub struct MqttConfig {
/// Client identifier handed to the user-configuration AT command.
pub client_id: String<32>,
/// Optional user name; `MqttClient::connect` sends an empty string when `None`.
pub username: Option<String<32>>,
/// Optional password; `MqttClient::connect` sends an empty string when `None`.
pub password: Option<String<32>>,
/// Keep-alive interval (presumably in seconds — TODO confirm the unit).
pub keep_alive: u16,
/// Clean-session flag (presumably the MQTT CONNECT "clean session" bit — TODO confirm).
pub clean_session: bool,
}
impl Default for MqttConfig {
fn default() -> Self {
Self {
client_id: String::new(),
username: None,
password: None,
keep_alive: 60,
clean_session: true,
}
}
}
/// An MQTT message bundled with its delivery options.
///
/// Implements `defmt::Format` when the `defmt` feature is enabled.
/// NOTE(review): nothing in this part of the file constructs or consumes this
/// type — presumably it is used by a receive path elsewhere; confirm.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct MqttMessage {
/// Topic of the message (at most `MAX_MQTT_TOPIC_LEN` bytes).
pub topic: MqttTopic,
/// Payload of the message as text (at most `MAX_MQTT_PAYLOAD_LEN` bytes).
pub payload: MqttPayload,
/// Quality-of-service level associated with the message.
pub qos: MqttQos,
/// Retain flag associated with the message.
pub retain: bool,
}
/// MQTT client that drives a modem through AT commands.
///
/// The client does not own the SPI transport; every operation receives it as
/// an argument and forwards it to the AT processor.
pub struct MqttClient {
// Link identifier passed as the first argument to every MQTT command builder.
link_id: u8,
// AT command processor used to send each command and obtain its response.
processor: &'static AtProcessor,
// Response timeout for every command except the broker connect,
// which uses a fixed 30 s timeout.
timeout: Duration,
// Locally tracked connection flag: set to `true` after a successful
// `connect`, to `false` after a successful `disconnect`.
connected: TmMutex<bool>,
}
impl MqttClient {
    /// Timeout applied to the broker-connect command issued by [`MqttClient::connect`].
    ///
    /// Independent of the per-client `timeout`, which is used for every other command.
    const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

    /// Creates a client for `link_id` that sends its commands through `processor`.
    ///
    /// `timeout` is the response timeout used for every command except the
    /// broker connect. The client starts in the disconnected state.
    pub const fn new(link_id: u8, processor: &'static AtProcessor, timeout: Duration) -> Self {
        Self {
            link_id,
            processor,
            timeout,
            connected: TmMutex::new(false),
        }
    }

    /// Sends one already-encoded AT command and requires an `Ok` response.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the AT processor and returns
    /// [`Error::MqttError`] when the response is anything other than
    /// `AtResponse::Ok`.
    async fn execute<SPI, CS>(
        &self,
        spi: &'static TmMutex<SpiTransport<SPI, CS>>,
        cmd: &[u8],
        timeout: Duration,
    ) -> Result<()>
    where
        SPI: embedded_hal_async::spi::SpiDevice,
        CS: embedded_hal::digital::OutputPin,
    {
        let response = self.processor.send_command(spi, cmd, timeout).await?;
        if response != crate::at::AtResponse::Ok {
            return Err(Error::MqttError);
        }
        Ok(())
    }

    /// Returns `Ok(())` when the client is marked connected, otherwise
    /// [`Error::NotConnected`].
    ///
    /// The flag's lock is released before this function returns, so it is
    /// never held while a command is in flight.
    async fn ensure_connected(&self) -> Result<()> {
        if self.is_connected().await {
            Ok(())
        } else {
            Err(Error::NotConnected)
        }
    }

    /// Overwrites the locally tracked connection flag.
    async fn set_connected(&self, value: bool) {
        *self.connected.lock().await = value;
    }

    /// Configures the session credentials and connects to the broker at `host:port`.
    ///
    /// Two commands are sent in order: the user configuration (client id, user
    /// name, password) using the client's regular timeout, then the broker
    /// connect using a fixed 30 s timeout. The client is marked connected only
    /// after both succeed.
    ///
    /// # Errors
    ///
    /// Propagates errors from the command builders and the AT processor, and
    /// returns [`Error::MqttError`] if either command is not answered with `Ok`.
    pub async fn connect<SPI, CS>(
        &self,
        spi: &'static TmMutex<SpiTransport<SPI, CS>>,
        host: &str,
        port: u16,
        config: &MqttConfig,
    ) -> Result<()>
    where
        SPI: embedded_hal_async::spi::SpiDevice,
        CS: embedded_hal::digital::OutputPin,
    {
        // Absent credentials are sent as empty strings.
        let username = config.username.as_deref().unwrap_or("");
        let password = config.password.as_deref().unwrap_or("");

        // NOTE(review): the literal `1` is presumably the connection-scheme
        // selector (plain TCP) — confirm against `set_user_config`.
        // NOTE(review): `config.keep_alive` and `config.clean_session` are not
        // forwarded to the modem by this method — confirm whether a
        // connection-configuration command should be issued here.
        let cmd = crate::at::command::mqtt::set_user_config(
            self.link_id,
            1,
            &config.client_id,
            username,
            password,
        )?;
        self.execute(spi, cmd.as_bytes(), self.timeout).await?;

        // NOTE(review): the trailing `false` is presumably the auto-reconnect
        // flag — confirm against `command::mqtt::connect`.
        let cmd = crate::at::command::mqtt::connect(self.link_id, host, port, false)?;
        self.execute(spi, cmd.as_bytes(), Self::CONNECT_TIMEOUT)
            .await?;

        self.set_connected(true).await;
        Ok(())
    }

    /// Publishes `payload` on `topic` with the given QoS and retain flag.
    ///
    /// # Errors
    ///
    /// Returns [`Error::NotConnected`] if the client is not marked connected,
    /// propagates errors from the command builder and the AT processor, and
    /// returns [`Error::MqttError`] if the command is not answered with `Ok`.
    pub async fn publish<SPI, CS>(
        &self,
        spi: &'static TmMutex<SpiTransport<SPI, CS>>,
        topic: &str,
        payload: &str,
        qos: MqttQos,
        retain: bool,
    ) -> Result<()>
    where
        SPI: embedded_hal_async::spi::SpiDevice,
        CS: embedded_hal::digital::OutputPin,
    {
        self.ensure_connected().await?;
        let cmd = crate::at::command::mqtt::publish(self.link_id, topic, payload, qos, retain)?;
        self.execute(spi, cmd.as_bytes(), self.timeout).await
    }

    /// Subscribes to `topic` with the requested QoS.
    ///
    /// # Errors
    ///
    /// Returns [`Error::NotConnected`] if the client is not marked connected,
    /// propagates errors from the command builder and the AT processor, and
    /// returns [`Error::MqttError`] if the command is not answered with `Ok`.
    pub async fn subscribe<SPI, CS>(
        &self,
        spi: &'static TmMutex<SpiTransport<SPI, CS>>,
        topic: &str,
        qos: MqttQos,
    ) -> Result<()>
    where
        SPI: embedded_hal_async::spi::SpiDevice,
        CS: embedded_hal::digital::OutputPin,
    {
        self.ensure_connected().await?;
        let cmd = crate::at::command::mqtt::subscribe(self.link_id, topic, qos)?;
        self.execute(spi, cmd.as_bytes(), self.timeout).await
    }

    /// Cancels the subscription to `topic`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::NotConnected`] if the client is not marked connected,
    /// propagates errors from the command builder and the AT processor, and
    /// returns [`Error::MqttError`] if the command is not answered with `Ok`.
    pub async fn unsubscribe<SPI, CS>(
        &self,
        spi: &'static TmMutex<SpiTransport<SPI, CS>>,
        topic: &str,
    ) -> Result<()>
    where
        SPI: embedded_hal_async::spi::SpiDevice,
        CS: embedded_hal::digital::OutputPin,
    {
        self.ensure_connected().await?;
        let cmd = crate::at::command::mqtt::unsubscribe(self.link_id, topic)?;
        self.execute(spi, cmd.as_bytes(), self.timeout).await
    }

    /// Sends the disconnect command and, on success, marks the client disconnected.
    ///
    /// The command is sent regardless of the locally tracked state. If it
    /// fails, the connection flag is left unchanged.
    ///
    /// # Errors
    ///
    /// Propagates errors from the command builder and the AT processor, and
    /// returns [`Error::MqttError`] if the command is not answered with `Ok`.
    pub async fn disconnect<SPI, CS>(
        &self,
        spi: &'static TmMutex<SpiTransport<SPI, CS>>,
    ) -> Result<()>
    where
        SPI: embedded_hal_async::spi::SpiDevice,
        CS: embedded_hal::digital::OutputPin,
    {
        let cmd = crate::at::command::mqtt::disconnect(self.link_id)?;
        self.execute(spi, cmd.as_bytes(), self.timeout).await?;
        self.set_connected(false).await;
        Ok(())
    }

    /// Returns the locally tracked connection flag.
    ///
    /// This reflects the outcome of the last successful `connect` or
    /// `disconnect` call made through this client; the modem is not queried.
    pub async fn is_connected(&self) -> bool {
        *self.connected.lock().await
    }
}