actix_mqtt_client/
client.rs

1use std::io::{Error as IoError, ErrorKind, Result as IoResult};
2use std::sync::Arc;
3
4use actix::{Actor, Addr, MailboxError, Recipient};
5use mqtt::QualityOfService;
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use crate::actors::actions::dispatch::DispatchActor;
9use crate::actors::actions::recv::RecvActor;
10use crate::actors::actions::send::SendActor;
11use crate::actors::actions::status::{PacketStatusActor, StatusExistenceMessage};
12use crate::actors::actions::stop::{AddStopRecipient, StopActor};
13use crate::actors::packets::connack::ConnackActor;
14use crate::actors::packets::connect::{Connect, ConnectActor};
15use crate::actors::packets::disconnect::{Disconnect, DisconnectActor};
16use crate::actors::packets::pingreq::PingreqActor;
17use crate::actors::packets::pingresp::PingrespActor;
18use crate::actors::packets::puback::PubackActor;
19use crate::actors::packets::pubcomp::PubcompActor;
20use crate::actors::packets::publish::{Publish, RecvPublishActor, SendPublishActor};
21use crate::actors::packets::pubrec::PubrecActor;
22use crate::actors::packets::pubrel::PubrelActor;
23use crate::actors::packets::suback::SubackActor;
24use crate::actors::packets::subscribe::{Subscribe, SubscribeActor};
25use crate::actors::packets::unsuback::UnsubackActor;
26use crate::actors::packets::unsubscribe::{Unsubscribe, UnsubscribeActor};
27use crate::actors::packets::{PublishMessage, PublishPacketStatus};
28use crate::actors::{ErrorMessage, StopMessage};
29use crate::consts::PING_INTERVAL;
30
31#[inline]
32fn map_mailbox_error_to_io_error(e: MailboxError) -> IoError {
33    IoError::new(ErrorKind::Interrupted, format!("{}", e))
34}
35
36#[inline]
37fn address_not_found_error(name: &str) -> IoError {
38    IoError::new(ErrorKind::NotFound, format!("{} address not found", name))
39}
40
41/// The options for setting up MQTT connection
42#[derive(Default, Clone)]
43pub struct MqttOptions {
44    /// User name
45    pub user_name: Option<String>,
46    /// Password
47    pub password: Option<String>,
48    /// Keep alive time(in seconds)
49    pub keep_alive: Option<u16>,
50}
51
52/// The client for connecting to the MQTT server
53#[derive(Clone)]
54pub struct MqttClient {
55    conn_addr: Option<Addr<ConnectActor>>,
56    pub_addr: Option<Addr<SendPublishActor>>,
57    sub_addr: Option<Addr<SubscribeActor>>,
58    unsub_addr: Option<Addr<UnsubscribeActor>>,
59    stop_addr: Option<Addr<StopActor>>,
60    disconnect_addr: Option<Addr<DisconnectActor>>,
61    conn_status_addr: Option<Addr<PacketStatusActor<()>>>,
62    client_name: Arc<String>,
63    options: Option<MqttOptions>,
64}
65
66impl MqttClient {
67    /// Create a new MQTT client
68    pub fn new<
69        TReader: AsyncRead + Send + 'static + Unpin,
70        TWriter: AsyncWrite + Send + 'static + Unpin,
71    >(
72        reader: TReader,
73        writer: TWriter,
74        client_name: String,
75        options: MqttOptions,
76        message_recipient: Recipient<PublishMessage>,
77        error_recipient: Recipient<ErrorMessage>,
78        stop_recipient: Option<Recipient<StopMessage>>,
79    ) -> Self {
80        let mut client = MqttClient {
81            conn_addr: None,
82            pub_addr: None,
83            sub_addr: None,
84            unsub_addr: None,
85            stop_addr: None,
86            disconnect_addr: None,
87            conn_status_addr: None,
88            client_name: Arc::new(client_name),
89            options: Some(options),
90        };
91        client.start_actors(
92            reader,
93            writer,
94            message_recipient,
95            error_recipient,
96            stop_recipient,
97        );
98        client
99    }
100
101    /// Returns the name of the client
102    pub fn name(&self) -> &str {
103        &*self.client_name
104    }
105
106    /// Perform the connect operation to the remote MQTT server
107    ///
108    /// Note: This function can only be called once for each client, calling it the second time will return an error
109    /// Note: The successful return of this function *DOES NOT* mean that the MQTT connection is successful, if anything wrong happens the error actor will receive an error
110    /// Note: Please use is_connected() to check whether the MQTT connection is successful or not
111    pub async fn connect(&mut self) -> Result<(), IoError> {
112        if let (Some(connect_addr), Some(mut options)) =
113            (self.conn_addr.take(), self.options.take())
114        {
115            connect_addr
116                .send(Connect {
117                    user_name: options.user_name.take(),
118                    password: options.password.take(),
119                    keep_alive: options.keep_alive.take(),
120                })
121                .await
122                .map_err(map_mailbox_error_to_io_error)
123        } else {
124            Err(IoError::new(ErrorKind::AlreadyExists, "Already connected"))
125        }
126    }
127
128    /// Check whether the client has connected to the server successfully
129    pub async fn is_connected(&self) -> IoResult<bool> {
130        match self.conn_status_addr {
131            Some(ref addr) => {
132                let connected = addr
133                    .send(StatusExistenceMessage(1))
134                    .await
135                    .map_err(|e| {
136                        log::error!("Failed to get connection status: {}", e);
137                        IoError::new(ErrorKind::NotConnected, "Failed to connect to server")
138                    })?;
139                Ok(connected)
140            }
141            None => Ok(false),
142        }
143    }
144
145    /// Subscribe to the server with a topic and QoS
146    pub async fn subscribe(&self, topic: String, qos: QualityOfService) -> Result<(), IoError> {
147        if let Some(ref sub_addr) = self.sub_addr {
148            sub_addr
149                .send(Subscribe::new(topic, qos))
150                .await
151                .map_err(map_mailbox_error_to_io_error)
152        } else {
153            Err(address_not_found_error("subscribe"))
154        }
155    }
156
157    /// Unsubscribe from the server
158    pub async fn unsubscribe(&self, topic: String) -> Result<(), IoError> {
159        if let Some(ref unsub_addr) = self.unsub_addr {
160            unsub_addr
161                .send(Unsubscribe::new(topic))
162                .await
163                .map_err(map_mailbox_error_to_io_error)
164        } else {
165            Err(address_not_found_error("unsubscribe"))
166        }
167    }
168
169    /// Publish a message
170    pub async fn publish(
171        &self,
172        topic: String,
173        qos: QualityOfService,
174        payload: Vec<u8>,
175    ) -> Result<(), IoError> {
176        if let Some(ref pub_addr) = self.pub_addr {
177            pub_addr
178                .send(Publish::new(topic, qos, payload))
179                .await
180                .map_err(map_mailbox_error_to_io_error)
181        } else {
182            Err(address_not_found_error("publish"))
183        }
184    }
185
186    /// Disconnect from the server
187    pub async fn disconnect(&mut self, force: bool) -> Result<(), IoError> {
188        if let Some(ref disconnect_addr) = self.disconnect_addr {
189            let result = disconnect_addr
190                .send(Disconnect { force })
191                .await
192                .map_err(map_mailbox_error_to_io_error);
193            self.clear_all_addrs(force);
194            result
195        } else {
196            Err(address_not_found_error("disconnect"))
197        }
198    }
199
200    /// Check if the client has been disconnected from the server, useful to check whether disconnection is completed
201    pub fn is_disconnected(&self) -> bool {
202        if let Some(ref disconnect_addr) = self.disconnect_addr {
203            !disconnect_addr.connected()
204        } else {
205            true
206        }
207    }
208
209    /// Set all addrs to None to prevent further operations
210    fn clear_all_addrs(&mut self, include_disconnect: bool) {
211        self.pub_addr = None;
212        self.sub_addr = None;
213        self.unsub_addr = None;
214        self.conn_addr = None;
215        self.conn_status_addr = None;
216
217        if include_disconnect {
218            self.disconnect_addr = None;
219        }
220    }
221
222    fn start_actors<
223        TReader: AsyncRead + Send + 'static + Unpin,
224        TWriter: AsyncWrite + Send + 'static + Unpin,
225    >(
226        &mut self,
227        reader: TReader,
228        writer: TWriter,
229        publish_message_recipient: Recipient<PublishMessage>,
230        error_recipient: Recipient<ErrorMessage>,
231        client_stop_recipient_option: Option<Recipient<StopMessage>>,
232    ) {
233        let stop_addr = StopActor::new().start();
234
235        if let Some(client_stop_recipient) = client_stop_recipient_option {
236            let _ = stop_addr.do_send(AddStopRecipient(client_stop_recipient));
237        }
238
239        let stop_recipient = stop_addr.clone().recipient();
240        let stop_recipient_container = stop_addr.clone().recipient();
241
242        let send_addr =
243            SendActor::new(writer, error_recipient.clone(), stop_recipient.clone()).start();
244        let send_recipient = send_addr.clone().recipient();
245        let _ = stop_addr.do_send(AddStopRecipient(send_addr.recipient()));
246
247        let disconnect_actor_addr = DisconnectActor::new(
248            send_recipient.clone(),
249            error_recipient.clone(),
250            stop_recipient.clone(),
251        )
252        .start();
253
254        macro_rules! start_response_actor {
255            ($addr_name:ident, $actor_name:ident, $status_recipient:ident) => {
256                let $addr_name = $actor_name::new(
257                    $status_recipient.clone(),
258                    error_recipient.clone(),
259                    stop_recipient.clone(),
260                )
261                .start();
262                let _ = stop_recipient_container
263                    .do_send(AddStopRecipient($addr_name.clone().recipient()));
264            };
265        }
266
267        macro_rules! start_send_actor {
268            ($addr_name:ident, $actor_name:ident, $status_recipient:ident) => {
269                let $addr_name = $actor_name::new(
270                    $status_recipient.clone(),
271                    send_recipient.clone(),
272                    error_recipient.clone(),
273                    stop_recipient.clone(),
274                )
275                .start();
276                let _ = stop_recipient_container
277                    .do_send(AddStopRecipient($addr_name.clone().recipient()));
278            };
279        }
280
281        macro_rules! start_status_actor {
282            ($name:ident, $status_name:tt, $payload_type:ty, $send_status_recipient:expr) => {
283                let status_addr =
284                    PacketStatusActor::<$payload_type>::new($status_name, $send_status_recipient)
285                        .start();
286                let $name = status_addr.clone().recipient();
287                let _ = stop_recipient_container.do_send(AddStopRecipient(status_addr.recipient()));
288            };
289        }
290
291        let send_status_recipient = disconnect_actor_addr.clone().recipient();
292        start_status_actor!(
293            publish_status_recipient,
294            "Disconnect",
295            PublishPacketStatus,
296            Some(send_status_recipient)
297        );
298
299        start_send_actor!(
300            send_pub_actor_addr,
301            SendPublishActor,
302            publish_status_recipient
303        );
304        let recv_pub_actor_addr = RecvPublishActor::new(
305            publish_status_recipient.clone(),
306            send_recipient.clone(),
307            error_recipient.clone(),
308            stop_recipient.clone(),
309            publish_message_recipient,
310        )
311        .start();
312        let _ = stop_recipient_container
313            .do_send(AddStopRecipient(recv_pub_actor_addr.clone().recipient()));
314        start_response_actor!(puback_actor_addr, PubackActor, publish_status_recipient);
315        start_send_actor!(pubrec_actor_addr, PubrecActor, publish_status_recipient);
316        start_send_actor!(pubrel_actor_addr, PubrelActor, publish_status_recipient);
317        start_response_actor!(pubcomp_actor_addr, PubcompActor, publish_status_recipient);
318
319        start_status_actor!(subscribe_status_recipient, "Subscribe", (), None);
320        start_send_actor!(
321            subscribe_actor_addr,
322            SubscribeActor,
323            subscribe_status_recipient
324        );
325        start_response_actor!(suback_actor_addr, SubackActor, subscribe_status_recipient);
326
327        start_status_actor!(unsubscribe_status_recipient, "Unsubscribe", (), None);
328        start_send_actor!(
329            unsubscribe_actor_addr,
330            UnsubscribeActor,
331            unsubscribe_status_recipient
332        );
333        start_response_actor!(
334            unsuback_actor_addr,
335            UnsubackActor,
336            unsubscribe_status_recipient
337        );
338
339        let connect_status_actor_addr = PacketStatusActor::new("Connect", None).start();
340        let connect_actor_addr = ConnectActor::new(
341            send_recipient.clone(),
342            connect_status_actor_addr.clone().recipient(),
343            stop_recipient.clone(),
344            error_recipient.clone(),
345            (&*self.client_name).clone(),
346        )
347        .start();
348
349        let connack_actor_addr = ConnackActor::new(
350            connect_status_actor_addr.clone().recipient(),
351            error_recipient.clone(),
352            connect_actor_addr.clone().recipient(),
353            stop_recipient.clone(),
354        )
355        .start();
356
357        start_status_actor!(ping_status_recipient, "Ping", (), None);
358        let send_ping_actor_addr = PingreqActor::new(
359            ping_status_recipient.clone(),
360            connect_status_actor_addr.clone().recipient(),
361            send_recipient.clone(),
362            error_recipient.clone(),
363            stop_recipient.clone(),
364            PING_INTERVAL.clone(),
365        )
366        .start();
367        let _ = stop_recipient_container
368            .do_send(AddStopRecipient(send_ping_actor_addr.clone().recipient()));
369        start_response_actor!(pingresp_actor_addr, PingrespActor, ping_status_recipient);
370
371        let dispatch_actor_addr = DispatchActor::new(
372            error_recipient.clone(),
373            stop_recipient.clone(),
374            connack_actor_addr.recipient(),
375            pingresp_actor_addr.recipient(),
376            recv_pub_actor_addr.recipient(),
377            puback_actor_addr.recipient(),
378            pubrec_actor_addr.recipient(),
379            pubrel_actor_addr.recipient(),
380            pubcomp_actor_addr.recipient(),
381            suback_actor_addr.recipient(),
382            unsuback_actor_addr.recipient(),
383        )
384        .start();
385        let recv_addr = RecvActor::new(
386            reader,
387            dispatch_actor_addr.clone().recipient(),
388            error_recipient,
389            stop_recipient,
390        )
391        .start();
392        let _ = stop_addr.do_send(AddStopRecipient(recv_addr.recipient()));
393
394        self.sub_addr = Some(subscribe_actor_addr);
395        self.unsub_addr = Some(unsubscribe_actor_addr);
396        self.pub_addr = Some(send_pub_actor_addr);
397        self.disconnect_addr = Some(disconnect_actor_addr);
398        self.conn_addr = Some(connect_actor_addr);
399        self.stop_addr = Some(stop_addr);
400        self.conn_status_addr = Some(connect_status_actor_addr);
401    }
402}