stinger_mqtt_trait/
lib.rs

1pub mod message;
2pub mod availability;
3
4#[cfg(feature = "validation")]
5pub mod validation;
6
7#[cfg(feature = "mock_client")]
8pub mod mock;
9
10use async_trait::async_trait;
11pub use message::MqttMessage;
12use std::fmt;
13use tokio::sync::{broadcast, watch, oneshot};
14
15/// Custom error type for MQTT pub/sub operations
16#[derive(Debug, Clone)]
17pub enum Mqtt5PubSubError {
18    /// Subscription error
19    SubscriptionError(String),
20    /// Unsubscribe error
21    UnsubscribeError(String),
22    /// Publish error
23    PublishError(String),
24    /// Invalid topic
25    InvalidTopic(String),
26    /// Invalid QoS
27    InvalidQoS(String),
28    /// Timeout occurred
29    TimeoutError(String),
30    /// Other errors
31    Other(String),
32}
33
34impl fmt::Display for Mqtt5PubSubError {
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        match self {
37            Mqtt5PubSubError::SubscriptionError(msg) => write!(f, "Subscription error: {}", msg),
38            Mqtt5PubSubError::UnsubscribeError(msg) => write!(f, "Unsubscribe error: {}", msg),
39            Mqtt5PubSubError::PublishError(msg) => write!(f, "Publish error: {}", msg),
40            Mqtt5PubSubError::InvalidTopic(msg) => write!(f, "Invalid topic: {}", msg),
41            Mqtt5PubSubError::InvalidQoS(msg) => write!(f, "Invalid QoS: {}", msg),
42            Mqtt5PubSubError::TimeoutError(msg) => write!(f, "Timeout error: {}", msg),
43            Mqtt5PubSubError::Other(msg) => write!(f, "Error: {}", msg),
44        }
45    }
46}
47
48impl std::error::Error for Mqtt5PubSubError {}
49
50/// Represents the success result of an MQTT publish operation
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum MqttPublishSuccess {
53    /// Message was sent (QoS 0)
54    Sent,
55    /// Message was acknowledged by broker (QoS 1, PUBACK received)
56    Acknowledged,
57    /// Message was fully completed (QoS 2, PUBCOMP received)
58    Completed,
59    /// Message was queued for sending (publish_nowait)
60    Queued,
61}
62
63/// Represents the connection state of an MQTT client
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum MqttConnectionState {
66    /// Client is disconnected from the broker
67    Disconnected,
68    /// Client is in the process of connecting to the broker
69    Connecting,
70    /// Client is connected to the broker
71    Connected,
72    /// Client is in the process of disconnecting from the broker
73    Disconnecting,
74}
75
76/// Trait defining the interface for MQTT pub/sub operations
77/// 
78/// This trait provides methods for publishing and subscribing to MQTT topics on an
79/// already-connected MQTT client. Application code is responsible for managing the
80/// client's connection lifecycle (connecting, disconnecting, reconnecting, etc.).
81/// 
82/// This trait is intended for use by libraries that need to publish or subscribe
83/// to MQTT topics without needing to manage the underlying client connection.
84#[async_trait]
85pub trait Mqtt5PubSub {
86    /// Get the client ID
87    fn get_client_id(&self) -> String;
88
89    /// Get a receiver for monitoring the client's connection state
90    /// 
91    /// The implementation must send a new state value to the watch channel whenever the
92    /// connection state changes (e.g., from `Connecting` to `Connected`, or from `Connected`
93    /// to `Disconnected`).
94    fn get_state(&self) -> watch::Receiver<MqttConnectionState>;
95
96    /// Subscribe to a topic with the specified QoS level
97    /// 
98    /// This function awaits until a SUBACK is received from the broker before returning.
99    /// Messages received on this subscription will be sent to the provided channel.
100    /// Returns a subscription identifier that will be set in the `subscription_id` field
101    /// of all `MqttMessage`s received on this subscription.
102    async fn subscribe(&mut self, topic: String, qos: message::QoS, tx: broadcast::Sender<MqttMessage>) -> Result<u32, Mqtt5PubSubError>;
103
104    /// Unsubscribe from a topic
105    /// 
106    /// This function awaits until an UNSUBACK is received from the broker before returning.
107    /// The topic must be identical to the one used with the `subscribe()` method.
108    async fn unsubscribe(&mut self, topic: String) -> Result<(), Mqtt5PubSubError>;
109
110    /// Publish a message to the broker (awaits completion)
111    /// 
112    /// The function blocks according to the QoS level set in the message:
113    /// - QoS 0: Blocks until the message is sent
114    /// - QoS 1: Blocks until a PUBACK is received from the broker
115    /// - QoS 2: Blocks until a PUBCOMP is received from the broker
116    async fn publish(&mut self, message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError>;
117
118    /// Publish a message to the broker and returns a oneshot channel that receives when done.
119    /// 
120    /// The oneshot channel will receive when the publish is complete according to the QoS level set in the message:
121    /// - QoS 0: Blocks until the message is sent
122    /// - QoS 1: Blocks until a PUBACK is received from the broker
123    /// - QoS 2: Blocks until a PUBCOMP is received from the broker
124    async fn publish_noblock(&mut self, message: MqttMessage) -> oneshot::Receiver<Result<MqttPublishSuccess, Mqtt5PubSubError>>;
125
126    /// Publish a message without waiting for completion (fire and forget)
127    /// 
128    /// This function returns as soon as the message is queued to be sent, without waiting
129    /// for any acknowledgment from the broker.
130    fn publish_nowait(&mut self, message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError>;
131
132    /// Get an AvailabilityHelper for publishing availability messages.  
133    fn get_availability_helper(&mut self) -> availability::AvailabilityHelper;
134}
135
136// Re-export commonly used types
137pub use message::{MqttMessageBuilder, QoS};
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142
143    #[test]
144    fn test_mqtt_pubsub_error_display() {
145        let err = Mqtt5PubSubError::SubscriptionError("invalid topic".to_string());
146        assert_eq!(err.to_string(), "Subscription error: invalid topic");
147
148        let err = Mqtt5PubSubError::UnsubscribeError("not subscribed".to_string());
149        assert_eq!(err.to_string(), "Unsubscribe error: not subscribed");
150
151        let err = Mqtt5PubSubError::PublishError("publish failed".to_string());
152        assert_eq!(err.to_string(), "Publish error: publish failed");
153
154        let err = Mqtt5PubSubError::InvalidTopic("bad topic".to_string());
155        assert_eq!(err.to_string(), "Invalid topic: bad topic");
156
157        let err = Mqtt5PubSubError::InvalidQoS("bad qos".to_string());
158        assert_eq!(err.to_string(), "Invalid QoS: bad qos");
159
160        let err = Mqtt5PubSubError::Other("something went wrong".to_string());
161        assert_eq!(err.to_string(), "Error: something went wrong");
162    }
163
164    #[test]
165    fn test_mqtt_pubsub_error_is_error_trait() {
166        let err = Mqtt5PubSubError::PublishError("test".to_string());
167        // Verify it implements std::error::Error
168        let _: &dyn std::error::Error = &err;
169    }
170
171    #[test]
172    fn test_mqtt_pubsub_error_clone() {
173        let err1 = Mqtt5PubSubError::PublishError("test error".to_string());
174        let err2 = err1.clone();
175        assert_eq!(err1.to_string(), err2.to_string());
176    }
177
178    // Mock implementation for testing trait signature
179    struct MockPubSubClient {
180        client_id: String,
181        state_rx: watch::Receiver<MqttConnectionState>,
182    }
183
184    impl MockPubSubClient {
185        fn new() -> Self {
186            let (tx, rx) = watch::channel(MqttConnectionState::Connected);
187            drop(tx); // Drop the sender as we don't need it in the mock
188            Self {
189                client_id: "test-client".to_string(),
190                state_rx: rx,
191            }
192        }
193    }
194
195    #[async_trait]
196    impl Mqtt5PubSub for MockPubSubClient {
197        fn get_client_id(&self) -> String {
198            self.client_id.clone()
199        }
200
201        fn get_state(&self) -> watch::Receiver<MqttConnectionState> {
202            self.state_rx.clone()
203        }
204
205        async fn subscribe(&mut self, _topic: String, _qos: message::QoS, _tx: broadcast::Sender<MqttMessage>) -> Result<u32, Mqtt5PubSubError> {
206            Ok(1)
207        }
208
209        async fn unsubscribe(&mut self, _topic: String) -> Result<(), Mqtt5PubSubError> {
210            Ok(())
211        }
212
213        async fn publish(&mut self, _message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError> {
214            Ok(MqttPublishSuccess::Sent)
215        }
216
217        async fn publish_noblock(&mut self, _message: MqttMessage) -> oneshot::Receiver<Result<MqttPublishSuccess, Mqtt5PubSubError>> {
218            let (tx, rx) = oneshot::channel();
219            let _ = tx.send(Ok(MqttPublishSuccess::Sent));
220            rx
221        }
222
223        fn publish_nowait(&mut self, _message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError> {
224            Ok(MqttPublishSuccess::Queued)
225        }
226
227        fn get_availability_helper(&mut self) -> availability::AvailabilityHelper {
228            availability::AvailabilityHelper::system_availability(self.client_id.clone())
229        }
230    }
231
232    #[tokio::test]
233    async fn test_mock_pubsub_subscribe() {
234        let (tx, _rx) = broadcast::channel(10);
235        let mut client = MockPubSubClient::new();
236        let result = client.subscribe("test/topic".to_string(), message::QoS::AtLeastOnce, tx).await;
237        assert!(result.is_ok());
238        assert_eq!(result.unwrap(), 1);
239    }
240
241    #[tokio::test]
242    async fn test_mock_pubsub_publish() {
243        let mut client = MockPubSubClient::new();
244        let msg = MqttMessage::simple(
245            "test/topic".to_string(),
246            message::QoS::AtMostOnce,
247            false,
248            bytes::Bytes::from("test"),
249        );
250        let result = client.publish(msg).await;
251        assert!(result.is_ok());
252    }
253
254    #[test]
255    fn test_mock_pubsub_publish_nowait() {
256        let mut client = MockPubSubClient::new();
257        let msg = MqttMessage::simple(
258            "test/topic".to_string(),
259            message::QoS::AtMostOnce,
260            false,
261            bytes::Bytes::from("test"),
262        );
263        let result = client.publish_nowait(msg);
264        assert!(result.is_ok());
265    }
266}