1pub mod message;
2pub mod availability;
3
4#[cfg(feature = "validation")]
5pub mod validation;
6
7#[cfg(feature = "mock_client")]
8pub mod mock;
9
10use async_trait::async_trait;
11pub use message::MqttMessage;
12use std::fmt;
13use tokio::sync::{broadcast, watch, oneshot};
14
15#[derive(Debug, Clone)]
17pub enum Mqtt5PubSubError {
18 SubscriptionError(String),
20 UnsubscribeError(String),
22 PublishError(String),
24 InvalidTopic(String),
26 InvalidQoS(String),
28 TimeoutError(String),
30 Other(String),
32}
33
34impl fmt::Display for Mqtt5PubSubError {
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 match self {
37 Mqtt5PubSubError::SubscriptionError(msg) => write!(f, "Subscription error: {}", msg),
38 Mqtt5PubSubError::UnsubscribeError(msg) => write!(f, "Unsubscribe error: {}", msg),
39 Mqtt5PubSubError::PublishError(msg) => write!(f, "Publish error: {}", msg),
40 Mqtt5PubSubError::InvalidTopic(msg) => write!(f, "Invalid topic: {}", msg),
41 Mqtt5PubSubError::InvalidQoS(msg) => write!(f, "Invalid QoS: {}", msg),
42 Mqtt5PubSubError::TimeoutError(msg) => write!(f, "Timeout error: {}", msg),
43 Mqtt5PubSubError::Other(msg) => write!(f, "Error: {}", msg),
44 }
45 }
46}
47
48impl std::error::Error for Mqtt5PubSubError {}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum MqttPublishSuccess {
53 Sent,
55 Acknowledged,
57 Completed,
59 Queued,
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum MqttConnectionState {
66 Disconnected,
68 Connecting,
70 Connected,
72 Disconnecting,
74}
75
76#[async_trait]
85pub trait Mqtt5PubSub {
86 fn get_client_id(&self) -> String;
88
89 fn get_state(&self) -> watch::Receiver<MqttConnectionState>;
95
96 async fn subscribe(&mut self, topic: String, qos: message::QoS, tx: broadcast::Sender<MqttMessage>) -> Result<u32, Mqtt5PubSubError>;
103
104 async fn unsubscribe(&mut self, topic: String) -> Result<(), Mqtt5PubSubError>;
109
110 async fn publish(&mut self, message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError>;
117
118 async fn publish_noblock(&mut self, message: MqttMessage) -> oneshot::Receiver<Result<MqttPublishSuccess, Mqtt5PubSubError>>;
125
126 fn publish_nowait(&mut self, message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError>;
131
132 fn get_availability_helper(&mut self) -> availability::AvailabilityHelper;
134}
135
136pub use message::{MqttMessageBuilder, QoS};
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142
143 #[test]
144 fn test_mqtt_pubsub_error_display() {
145 let err = Mqtt5PubSubError::SubscriptionError("invalid topic".to_string());
146 assert_eq!(err.to_string(), "Subscription error: invalid topic");
147
148 let err = Mqtt5PubSubError::UnsubscribeError("not subscribed".to_string());
149 assert_eq!(err.to_string(), "Unsubscribe error: not subscribed");
150
151 let err = Mqtt5PubSubError::PublishError("publish failed".to_string());
152 assert_eq!(err.to_string(), "Publish error: publish failed");
153
154 let err = Mqtt5PubSubError::InvalidTopic("bad topic".to_string());
155 assert_eq!(err.to_string(), "Invalid topic: bad topic");
156
157 let err = Mqtt5PubSubError::InvalidQoS("bad qos".to_string());
158 assert_eq!(err.to_string(), "Invalid QoS: bad qos");
159
160 let err = Mqtt5PubSubError::Other("something went wrong".to_string());
161 assert_eq!(err.to_string(), "Error: something went wrong");
162 }
163
164 #[test]
165 fn test_mqtt_pubsub_error_is_error_trait() {
166 let err = Mqtt5PubSubError::PublishError("test".to_string());
167 let _: &dyn std::error::Error = &err;
169 }
170
171 #[test]
172 fn test_mqtt_pubsub_error_clone() {
173 let err1 = Mqtt5PubSubError::PublishError("test error".to_string());
174 let err2 = err1.clone();
175 assert_eq!(err1.to_string(), err2.to_string());
176 }
177
178 struct MockPubSubClient {
180 client_id: String,
181 state_rx: watch::Receiver<MqttConnectionState>,
182 }
183
184 impl MockPubSubClient {
185 fn new() -> Self {
186 let (tx, rx) = watch::channel(MqttConnectionState::Connected);
187 drop(tx); Self {
189 client_id: "test-client".to_string(),
190 state_rx: rx,
191 }
192 }
193 }
194
195 #[async_trait]
196 impl Mqtt5PubSub for MockPubSubClient {
197 fn get_client_id(&self) -> String {
198 self.client_id.clone()
199 }
200
201 fn get_state(&self) -> watch::Receiver<MqttConnectionState> {
202 self.state_rx.clone()
203 }
204
205 async fn subscribe(&mut self, _topic: String, _qos: message::QoS, _tx: broadcast::Sender<MqttMessage>) -> Result<u32, Mqtt5PubSubError> {
206 Ok(1)
207 }
208
209 async fn unsubscribe(&mut self, _topic: String) -> Result<(), Mqtt5PubSubError> {
210 Ok(())
211 }
212
213 async fn publish(&mut self, _message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError> {
214 Ok(MqttPublishSuccess::Sent)
215 }
216
217 async fn publish_noblock(&mut self, _message: MqttMessage) -> oneshot::Receiver<Result<MqttPublishSuccess, Mqtt5PubSubError>> {
218 let (tx, rx) = oneshot::channel();
219 let _ = tx.send(Ok(MqttPublishSuccess::Sent));
220 rx
221 }
222
223 fn publish_nowait(&mut self, _message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError> {
224 Ok(MqttPublishSuccess::Queued)
225 }
226
227 fn get_availability_helper(&mut self) -> availability::AvailabilityHelper {
228 availability::AvailabilityHelper::system_availability(self.client_id.clone())
229 }
230 }
231
232 #[tokio::test]
233 async fn test_mock_pubsub_subscribe() {
234 let (tx, _rx) = broadcast::channel(10);
235 let mut client = MockPubSubClient::new();
236 let result = client.subscribe("test/topic".to_string(), message::QoS::AtLeastOnce, tx).await;
237 assert!(result.is_ok());
238 assert_eq!(result.unwrap(), 1);
239 }
240
241 #[tokio::test]
242 async fn test_mock_pubsub_publish() {
243 let mut client = MockPubSubClient::new();
244 let msg = MqttMessage::simple(
245 "test/topic".to_string(),
246 message::QoS::AtMostOnce,
247 false,
248 bytes::Bytes::from("test"),
249 );
250 let result = client.publish(msg).await;
251 assert!(result.is_ok());
252 }
253
254 #[test]
255 fn test_mock_pubsub_publish_nowait() {
256 let mut client = MockPubSubClient::new();
257 let msg = MqttMessage::simple(
258 "test/topic".to_string(),
259 message::QoS::AtMostOnce,
260 false,
261 bytes::Bytes::from("test"),
262 );
263 let result = client.publish_nowait(msg);
264 assert!(result.is_ok());
265 }
266}