pub mod ack;
#[cfg(test)]
mod tests;
use async_trait::async_trait;
pub use rumqttc::QoS;
use rumqttc::{AsyncClient, MqttOptions}; use thiserror::Error; use tokio::sync::mpsc;
#[cfg(feature = "logging")]
use tracing::{debug, error, info, warn};
use crate::{Publisher, Subscriber};
#[derive(Error, Debug)]
pub enum MQTTError {
#[error("MQTT connection error: {0}")]
ConnectionError(String),
#[error("MQTT publish error: {0}")]
PublishError(String),
#[error("MQTT subscribe error: {0}")]
SubscribeError(String),
#[error("MQTT receive error: {0}")]
ReceiveError(String),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("Deserialization error: {0}")]
DeserializationError(String),
#[error("Other MQTT error: {0}")]
Other(String),
}
pub struct MQTTPublisher {
client: AsyncClient,
topic: String,
}
impl MQTTPublisher {
pub fn new(broker_url: &str, topic: &str) -> Result<Self, MQTTError> {
let client_id = format!("kincir-mqtt-publisher-{}", uuid::Uuid::new_v4());
let mut mqtt_options = MqttOptions::new(client_id, broker_url, 1883);
mqtt_options.set_keep_alive(std::time::Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(mqtt_options, 10);
tokio::spawn(async move {
loop {
match eventloop.poll().await {
Ok(notification) => {
#[cfg(feature = "logging")]
debug!("Publisher EventLoop notification: {:?}", notification);
}
Err(e) => {
#[cfg(feature = "logging")]
error!("Publisher EventLoop error: {}", e);
break;
}
}
}
});
#[cfg(feature = "logging")]
info!(
"MQTTPublisher: Initialized for broker_url: {}, topic: {}",
broker_url, topic
);
Ok(MQTTPublisher {
client,
topic: topic.to_string(),
})
}
}
#[async_trait]
impl Publisher for MQTTPublisher {
type Error = Box<dyn std::error::Error + Send + Sync>;
async fn publish(
&self,
_topic: &str,
messages: Vec<crate::Message>,
) -> Result<(), Self::Error> {
for message in messages {
match self
.client
.publish(
&self.topic,
QoS::AtLeastOnce,
false,
message.payload.as_slice(),
)
.await
{
Ok(_) => {
#[cfg(feature = "logging")]
debug!(
"Successfully published message (UUID: {}) to topic: {}",
message.uuid, self.topic
);
}
Err(e) => {
#[cfg(feature = "logging")]
error!(
"Failed to publish message (UUID: {}) to topic {}: {}",
message.uuid, self.topic, e
);
return Err(Box::new(MQTTError::PublishError(e.to_string())));
}
}
}
Ok(())
}
}
use uuid::Uuid;
pub struct MQTTSubscriber {
client: AsyncClient, topic: String,
message_rx: mpsc::Receiver<Result<crate::Message, MQTTError>>,
qos: QoS, }
impl MQTTSubscriber {
pub fn new(broker_url: &str, topic_str: &str, qos: QoS) -> Result<Self, MQTTError> {
let client_id = format!("kincir-mqtt-subscriber-{}", Uuid::new_v4());
let mut mqtt_options = MqttOptions::new(client_id, broker_url, 1883);
mqtt_options.set_keep_alive(std::time::Duration::from_secs(5));
let (client, mut event_loop) = AsyncClient::new(mqtt_options, 10);
let (message_tx, message_rx) = mpsc::channel(100);
tokio::spawn(async move {
#[cfg(feature = "logging")]
info!("MQTT EventLoop Task: Started for topic processing.");
loop {
match event_loop.poll().await {
Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish))) => {
let k_message = crate::Message {
uuid: Uuid::new_v4().to_string(),
payload: publish.payload.to_vec(),
metadata: std::collections::HashMap::new(),
};
if message_tx.send(Ok(k_message)).await.is_err() {
#[cfg(feature = "logging")]
error!("MQTT EventLoop Task: Failed to send message to channel. Receiver dropped.");
break; }
}
Ok(rumqttc::Event::Incoming(rumqttc::Packet::Disconnect)) => {
#[cfg(feature = "logging")]
warn!("MQTT EventLoop Task: MQTT Disconnected. Sending error and exiting.");
let _ = message_tx
.send(Err(MQTTError::ConnectionError(
"MQTT Disconnected".to_string(),
)))
.await;
break; }
Ok(event) => {
#[cfg(feature = "logging")]
debug!("MQTT EventLoop Task: Received event: {:?}", event);
}
Err(e) => {
#[cfg(feature = "logging")]
error!("MQTT EventLoop Task: MQTT EventLoop error: {}. Sending error and exiting.", e);
let _ = message_tx
.send(Err(MQTTError::ReceiveError(e.to_string())))
.await;
break; }
}
}
#[cfg(feature = "logging")]
info!("MQTT EventLoop Task: Exiting.");
});
#[cfg(feature = "logging")]
info!(
"MQTTSubscriber: Created for broker_url: {}, topic: {}",
broker_url, topic_str
);
Ok(MQTTSubscriber {
client,
topic: topic_str.to_string(),
message_rx,
qos, })
}
}
#[async_trait]
impl Subscriber for MQTTSubscriber {
type Error = Box<dyn std::error::Error + Send + Sync>;
async fn subscribe(&self, topic: &str) -> Result<(), Self::Error> {
if topic != self.topic {
let err_msg = format!(
"Subscription topic mismatch: expected '{}', got '{}'. Current subscriber is for topic '{}'.",
self.topic, topic, self.topic
);
#[cfg(feature = "logging")]
error!("{}", err_msg);
return Err(Box::new(MQTTError::SubscribeError(err_msg)));
}
let client_clone = self.client.clone();
let topic_to_subscribe = self.topic.clone();
#[cfg(feature = "logging")]
info!(
"MQTTSubscriber::subscribe - Attempting to subscribe client to topic: {}",
topic_to_subscribe
);
client_clone
.subscribe(&topic_to_subscribe, self.qos) .await
.map_err(|e| {
#[cfg(feature = "logging")]
error!(
"MQTTSubscriber::subscribe - Failed to subscribe to MQTT topic {}: {}",
topic_to_subscribe, e
);
Box::new(MQTTError::SubscribeError(e.to_string())) as Self::Error
})?;
#[cfg(feature = "logging")]
info!(
"MQTTSubscriber::subscribe - Successfully sent SUBSCRIBE packet for topic: {}",
topic_to_subscribe
);
Ok(())
}
async fn receive(&mut self) -> Result<crate::Message, Self::Error> {
match self.message_rx.recv().await {
Some(Ok(message)) => {
#[cfg(feature = "logging")]
debug!(
"MQTTSubscriber::receive - Received message from channel: {}",
message.uuid
);
Ok(message)
}
Some(Err(mqtt_error)) => {
#[cfg(feature = "logging")]
error!(
"MQTTSubscriber::receive - Received error from event loop task: {:?}",
mqtt_error
);
Err(Box::new(mqtt_error) as Self::Error)
}
None => {
#[cfg(feature = "logging")]
warn!("MQTTSubscriber::receive - Message channel closed. Event loop task likely terminated.");
Err(Box::new(MQTTError::ReceiveError(
"Message channel closed. Event loop task terminated.".to_string(),
)) as Self::Error)
}
}
}
}
pub use ack::{MQTTAckHandle, MQTTAckSubscriber};