// agentlink_core/mqtt/mod.rs

//! MQTT client abstraction.
//!
//! This module defines the shared MQTT types (connection state, QoS, message,
//! event, configuration) and the client traits that platform-specific backends
//! (native `rumqttc`, WASM WebSocket, etc.) must implement.
5
6use async_trait::async_trait;
7
8use crate::error::SdkResult;
9
/// Connection lifecycle state reported by an MQTT client.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MqttConnectionState {
    /// Not connected; a new connection attempt is allowed (see [`Self::can_connect`]).
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// Connected; the only state for which [`Self::is_connected`] returns `true`.
    Connected,
    /// A reconnection attempt is in progress.
    Reconnecting,
    /// A disconnect is in progress.
    Disconnecting,
    /// Failure state; a new connection attempt is allowed (see [`Self::can_connect`]).
    Failed,
}

impl MqttConnectionState {
    /// Returns `true` when a new connection attempt may be started, i.e. the
    /// state is [`Self::Disconnected`] or [`Self::Failed`].
    ///
    /// Every in-progress or established state returns `false`.
    #[must_use]
    pub const fn can_connect(&self) -> bool {
        matches!(self, Self::Disconnected | Self::Failed)
    }

    /// Returns `true` only for [`Self::Connected`].
    #[must_use]
    pub const fn is_connected(&self) -> bool {
        matches!(self, Self::Connected)
    }
}
30
/// MQTT quality-of-service level.
///
/// Each discriminant is spelled out and equals the numeric QoS level, so
/// `qos as u8` yields that level. The values are the same ones the compiler
/// would assign implicitly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MqttQoS {
    /// QoS 0.
    AtMostOnce = 0,
    /// QoS 1.
    AtLeastOnce = 1,
    /// QoS 2.
    ExactlyOnce = 2,
}
38
39/// MQTT message
40#[derive(Debug, Clone)]
41pub struct MqttMessage {
42    pub topic: String,
43    pub payload: Vec<u8>,
44    pub qos: MqttQoS,
45}
46
47impl MqttMessage {
48    pub fn new(topic: impl Into<String>, payload: Vec<u8>) -> Self {
49        Self {
50            topic: topic.into(),
51            payload,
52            qos: MqttQoS::AtLeastOnce,
53        }
54    }
55
56    pub fn with_qos(mut self, qos: MqttQoS) -> Self {
57        self.qos = qos;
58        self
59    }
60
61    pub fn payload_string(&self) -> Result<String, std::string::FromUtf8Error> {
62        String::from_utf8(self.payload.clone())
63    }
64}
65
66/// MQTT event
67#[derive(Debug, Clone)]
68pub enum MqttEvent {
69    Connected,
70    Disconnected,
71    Reconnecting { attempt: u32, delay_ms: u64 },
72    Reconnected,
73    ConnectionFailed { error: String },
74    MessageReceived(MqttMessage),
75    Subscribed { topic: String },
76    Unsubscribed { topic: String },
77    Published { topic: String },
78    Error { error: String },
79}
80
/// MQTT client configuration.
///
/// `Debug` is implemented by hand so that the password is never written to
/// logs or panic messages; all other fields are printed as usual.
#[derive(Clone)]
pub struct MqttConfig {
    /// Broker URL, e.g. `mqtts://localhost:8883` (the default).
    pub broker_url: String,
    /// Client identifier. Defaults to the empty string
    /// (NOTE(review): presumably callers or backends must supply a real id; confirm).
    pub client_id: String,
    /// Optional username.
    pub username: Option<String>,
    /// Optional password. Redacted in `Debug` output.
    pub password: Option<String>,
    /// Keep-alive interval in seconds (per the field name). Defaults to 30.
    pub keep_alive_secs: u64,
    /// MQTT clean-session flag. Defaults to `true`.
    pub clean_session: bool,
}

impl std::fmt::Debug for MqttConfig {
    /// Formats the configuration like the derived `Debug` would, except that a
    /// present password is shown as `Some("<redacted>")`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction visible without exposing the secret.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("MqttConfig")
            .field("broker_url", &self.broker_url)
            .field("client_id", &self.client_id)
            .field("username", &self.username)
            .field("password", &password)
            .field("keep_alive_secs", &self.keep_alive_secs)
            .field("clean_session", &self.clean_session)
            .finish()
    }
}

impl Default for MqttConfig {
    /// Returns a configuration pointing at `mqtts://localhost:8883` with an
    /// empty client id, no credentials, a 30-second keep-alive and
    /// `clean_session` enabled.
    fn default() -> Self {
        Self {
            broker_url: "mqtts://localhost:8883".to_string(),
            client_id: String::new(),
            username: None,
            password: None,
            keep_alive_secs: 30,
            clean_session: true,
        }
    }
}
104
105/// MQTT client trait
106///
107/// Platform-specific implementations must implement this trait.
108#[async_trait]
109#[cfg(not(target_arch = "wasm32"))]
110pub trait MqttClient: Send + Sync {
111    /// Connect to MQTT broker
112    async fn connect(&self, config: MqttConfig) -> SdkResult<()>;
113
114    /// Disconnect from broker
115    async fn disconnect(&self) -> SdkResult<()>;
116
117    /// Subscribe to a topic
118    async fn subscribe(&self, topic: &str, qos: MqttQoS) -> SdkResult<()>;
119
120    /// Unsubscribe from a topic
121    async fn unsubscribe(&self, topic: &str) -> SdkResult<()>;
122
123    /// Publish a message
124    async fn publish(&self, message: MqttMessage) -> SdkResult<()>;
125
126    /// Get current connection state
127    fn connection_state(&self) -> MqttConnectionState;
128
129    /// Check if connected
130    fn is_connected(&self) -> bool {
131        self.connection_state().is_connected()
132    }
133}
134
/// MQTT client trait (WASM version).
///
/// Platform-specific backends implement this trait. On `wasm32` there are no
/// `Send + Sync` supertraits and `#[async_trait(?Send)]` is used, so the
/// generated futures are not required to be `Send`.
///
/// All fallible operations report failure through [`SdkResult`]; the concrete
/// error conditions are defined by the implementing backend.
#[async_trait(?Send)]
#[cfg(target_arch = "wasm32")]
pub trait MqttClient {
    /// Connects to the MQTT broker described by `config`.
    async fn connect(&self, config: MqttConfig) -> SdkResult<()>;

    /// Disconnects from the broker.
    async fn disconnect(&self) -> SdkResult<()>;

    /// Subscribes to `topic` with the requested `qos`.
    async fn subscribe(&self, topic: &str, qos: MqttQoS) -> SdkResult<()>;

    /// Unsubscribes from `topic`.
    async fn unsubscribe(&self, topic: &str) -> SdkResult<()>;

    /// Publishes `message`; its topic, payload and QoS travel inside the value.
    async fn publish(&self, message: MqttMessage) -> SdkResult<()>;

    /// Returns the current connection state.
    #[must_use]
    fn connection_state(&self) -> MqttConnectionState;

    /// Returns `true` when [`Self::connection_state`] reports a connected state.
    ///
    /// Provided method; implementors normally do not need to override it.
    #[must_use]
    fn is_connected(&self) -> bool {
        self.connection_state().is_connected()
    }
}
164
165/// Trait for MQTT clients that support event callbacks
166#[cfg(not(target_arch = "wasm32"))]
167pub trait MqttClientWithEvents: MqttClient {
168    /// Set event callback
169    ///
170    /// The callback will be called when MQTT events occur.
171    fn on_event<F>(&self, callback: F)
172    where
173        F: Fn(MqttEvent) + Send + Sync + 'static;
174}
175
/// MQTT clients that can deliver [`MqttEvent`]s to a callback (WASM version).
///
/// Because `on_event` is generic over the callback type, this trait cannot be
/// used as a trait object (`dyn MqttClientWithEvents`); use it through
/// generics or concrete types.
#[cfg(target_arch = "wasm32")]
pub trait MqttClientWithEvents: MqttClient {
    /// Sets the event callback.
    ///
    /// The callback is called when MQTT events occur. On `wasm32` it only has
    /// to be `'static`; `Send`/`Sync` are not required.
    /// NOTE(review): whether a later call replaces an earlier callback is up
    /// to the implementation; confirm against the backends.
    fn on_event<F>(&self, callback: F)
    where
        F: Fn(MqttEvent) + 'static;
}
186
187/// MQTT client factory trait
188#[cfg(not(target_arch = "wasm32"))]
189pub trait MqttClientFactory: Send + Sync {
190    fn create_client(&self) -> Box<dyn MqttClient>;
191}
192
/// Factory for boxed [`MqttClient`] instances (WASM version).
///
/// On `wasm32` neither the factory nor the returned client is required to be
/// `Send` or `Sync`.
#[cfg(target_arch = "wasm32")]
pub trait MqttClientFactory {
    /// Creates a client and returns it as a boxed trait object.
    fn create_client(&self) -> Box<dyn MqttClient>;
}
197}