1use std::io::{Error as IoError, ErrorKind, Result as IoResult};
2use std::sync::Arc;
3
4use actix::{Actor, Addr, MailboxError, Recipient};
5use mqtt::QualityOfService;
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use crate::actors::actions::dispatch::DispatchActor;
9use crate::actors::actions::recv::RecvActor;
10use crate::actors::actions::send::SendActor;
11use crate::actors::actions::status::{PacketStatusActor, StatusExistenceMessage};
12use crate::actors::actions::stop::{AddStopRecipient, StopActor};
13use crate::actors::packets::connack::ConnackActor;
14use crate::actors::packets::connect::{Connect, ConnectActor};
15use crate::actors::packets::disconnect::{Disconnect, DisconnectActor};
16use crate::actors::packets::pingreq::PingreqActor;
17use crate::actors::packets::pingresp::PingrespActor;
18use crate::actors::packets::puback::PubackActor;
19use crate::actors::packets::pubcomp::PubcompActor;
20use crate::actors::packets::publish::{Publish, RecvPublishActor, SendPublishActor};
21use crate::actors::packets::pubrec::PubrecActor;
22use crate::actors::packets::pubrel::PubrelActor;
23use crate::actors::packets::suback::SubackActor;
24use crate::actors::packets::subscribe::{Subscribe, SubscribeActor};
25use crate::actors::packets::unsuback::UnsubackActor;
26use crate::actors::packets::unsubscribe::{Unsubscribe, UnsubscribeActor};
27use crate::actors::packets::{PublishMessage, PublishPacketStatus};
28use crate::actors::{ErrorMessage, StopMessage};
29use crate::consts::PING_INTERVAL;
30
31#[inline]
32fn map_mailbox_error_to_io_error(e: MailboxError) -> IoError {
33 IoError::new(ErrorKind::Interrupted, format!("{}", e))
34}
35
/// Builds the error returned when an operation is requested but the address
/// of the actor that would serve it is not stored on the client.
///
/// `name` is the operation name; the message reads `"<name> address not found"`
/// and the kind is [`ErrorKind::NotFound`].
#[inline]
fn address_not_found_error(name: &str) -> IoError {
    let message = format!("{} address not found", name);
    IoError::new(ErrorKind::NotFound, message)
}
40
/// Optional settings that `MqttClient::connect` forwards, unchanged, in the
/// `Connect` message.
///
/// Every field defaults to `None`.
#[derive(Clone, Default)]
pub struct MqttOptions {
    /// User name forwarded with the connect request, if any.
    pub user_name: Option<String>,
    /// Password forwarded with the connect request, if any.
    pub password: Option<String>,
    /// Keep-alive value forwarded with the connect request, if any.
    /// NOTE(review): the unit is not visible in this file — presumably seconds,
    /// as in the MQTT CONNECT packet; confirm against the connect actor.
    pub keep_alive: Option<u16>,
}
51
/// Handle to the set of actors that implement one MQTT client session.
///
/// The address fields are filled in by `start_actors` (called from `new`) and
/// are reset to `None` by `clear_all_addrs` after a disconnect request.
/// Cloning copies the stored addresses and options; the client name is shared
/// through the `Arc`.
#[derive(Clone)]
pub struct MqttClient {
    /// Address of the connect actor; consumed by the first `connect` call.
    conn_addr: Option<Addr<ConnectActor>>,
    /// Address used by `publish`.
    pub_addr: Option<Addr<SendPublishActor>>,
    /// Address used by `subscribe`.
    sub_addr: Option<Addr<SubscribeActor>>,
    /// Address used by `unsubscribe`.
    unsub_addr: Option<Addr<UnsubscribeActor>>,
    /// Address of the stop actor. It is stored by `start_actors` but not read
    /// anywhere else in this file, and `clear_all_addrs` leaves it in place.
    stop_addr: Option<Addr<StopActor>>,
    /// Address used by `disconnect` and `is_disconnected`.
    disconnect_addr: Option<Addr<DisconnectActor>>,
    /// Address of the connect status actor, queried by `is_connected`.
    conn_status_addr: Option<Addr<PacketStatusActor<()>>>,
    /// Client name given to `new`; also handed to the connect actor.
    client_name: Arc<String>,
    /// Options given to `new`; consumed by the first `connect` call.
    options: Option<MqttOptions>,
}
65
66impl MqttClient {
67 pub fn new<
69 TReader: AsyncRead + Send + 'static + Unpin,
70 TWriter: AsyncWrite + Send + 'static + Unpin,
71 >(
72 reader: TReader,
73 writer: TWriter,
74 client_name: String,
75 options: MqttOptions,
76 message_recipient: Recipient<PublishMessage>,
77 error_recipient: Recipient<ErrorMessage>,
78 stop_recipient: Option<Recipient<StopMessage>>,
79 ) -> Self {
80 let mut client = MqttClient {
81 conn_addr: None,
82 pub_addr: None,
83 sub_addr: None,
84 unsub_addr: None,
85 stop_addr: None,
86 disconnect_addr: None,
87 conn_status_addr: None,
88 client_name: Arc::new(client_name),
89 options: Some(options),
90 };
91 client.start_actors(
92 reader,
93 writer,
94 message_recipient,
95 error_recipient,
96 stop_recipient,
97 );
98 client
99 }
100
101 pub fn name(&self) -> &str {
103 &*self.client_name
104 }
105
106 pub async fn connect(&mut self) -> Result<(), IoError> {
112 if let (Some(connect_addr), Some(mut options)) =
113 (self.conn_addr.take(), self.options.take())
114 {
115 connect_addr
116 .send(Connect {
117 user_name: options.user_name.take(),
118 password: options.password.take(),
119 keep_alive: options.keep_alive.take(),
120 })
121 .await
122 .map_err(map_mailbox_error_to_io_error)
123 } else {
124 Err(IoError::new(ErrorKind::AlreadyExists, "Already connected"))
125 }
126 }
127
128 pub async fn is_connected(&self) -> IoResult<bool> {
130 match self.conn_status_addr {
131 Some(ref addr) => {
132 let connected = addr
133 .send(StatusExistenceMessage(1))
134 .await
135 .map_err(|e| {
136 log::error!("Failed to get connection status: {}", e);
137 IoError::new(ErrorKind::NotConnected, "Failed to connect to server")
138 })?;
139 Ok(connected)
140 }
141 None => Ok(false),
142 }
143 }
144
145 pub async fn subscribe(&self, topic: String, qos: QualityOfService) -> Result<(), IoError> {
147 if let Some(ref sub_addr) = self.sub_addr {
148 sub_addr
149 .send(Subscribe::new(topic, qos))
150 .await
151 .map_err(map_mailbox_error_to_io_error)
152 } else {
153 Err(address_not_found_error("subscribe"))
154 }
155 }
156
157 pub async fn unsubscribe(&self, topic: String) -> Result<(), IoError> {
159 if let Some(ref unsub_addr) = self.unsub_addr {
160 unsub_addr
161 .send(Unsubscribe::new(topic))
162 .await
163 .map_err(map_mailbox_error_to_io_error)
164 } else {
165 Err(address_not_found_error("unsubscribe"))
166 }
167 }
168
169 pub async fn publish(
171 &self,
172 topic: String,
173 qos: QualityOfService,
174 payload: Vec<u8>,
175 ) -> Result<(), IoError> {
176 if let Some(ref pub_addr) = self.pub_addr {
177 pub_addr
178 .send(Publish::new(topic, qos, payload))
179 .await
180 .map_err(map_mailbox_error_to_io_error)
181 } else {
182 Err(address_not_found_error("publish"))
183 }
184 }
185
186 pub async fn disconnect(&mut self, force: bool) -> Result<(), IoError> {
188 if let Some(ref disconnect_addr) = self.disconnect_addr {
189 let result = disconnect_addr
190 .send(Disconnect { force })
191 .await
192 .map_err(map_mailbox_error_to_io_error);
193 self.clear_all_addrs(force);
194 result
195 } else {
196 Err(address_not_found_error("disconnect"))
197 }
198 }
199
200 pub fn is_disconnected(&self) -> bool {
202 if let Some(ref disconnect_addr) = self.disconnect_addr {
203 !disconnect_addr.connected()
204 } else {
205 true
206 }
207 }
208
209 fn clear_all_addrs(&mut self, include_disconnect: bool) {
211 self.pub_addr = None;
212 self.sub_addr = None;
213 self.unsub_addr = None;
214 self.conn_addr = None;
215 self.conn_status_addr = None;
216
217 if include_disconnect {
218 self.disconnect_addr = None;
219 }
220 }
221
    /// Starts every actor of the client, wires them together and stores the
    /// addresses the public methods need on `self`.
    ///
    /// * `reader` - moved into the `RecvActor`.
    /// * `writer` - moved into the `SendActor`.
    /// * `publish_message_recipient` - handed to the `RecvPublishActor`.
    /// * `error_recipient` - cloned into every actor started here.
    /// * `client_stop_recipient_option` - when present, registered with the
    ///   stop actor through `AddStopRecipient`.
    ///
    /// Most actors are also registered with the stop actor. The disconnect,
    /// connect status, connect, connack and dispatch actors are not registered
    /// here.
    /// NOTE(review): confirm that leaving those five unregistered is intended.
    fn start_actors<
        TReader: AsyncRead + Send + 'static + Unpin,
        TWriter: AsyncWrite + Send + 'static + Unpin,
    >(
        &mut self,
        reader: TReader,
        writer: TWriter,
        publish_message_recipient: Recipient<PublishMessage>,
        error_recipient: Recipient<ErrorMessage>,
        client_stop_recipient_option: Option<Recipient<StopMessage>>,
    ) {
        // The stop actor comes first: every other actor receives a recipient of it.
        let stop_addr = StopActor::new().start();

        if let Some(client_stop_recipient) = client_stop_recipient_option {
            let _ = stop_addr.do_send(AddStopRecipient(client_stop_recipient));
        }

        // Two recipients of the same stop actor: `stop_recipient` is passed to
        // the actors' constructors, `stop_recipient_container` is the one that
        // `AddStopRecipient` registrations are sent to.
        let stop_recipient = stop_addr.clone().recipient();
        let stop_recipient_container = stop_addr.clone().recipient();

        // Outgoing side: the send actor owns the writer.
        let send_addr =
            SendActor::new(writer, error_recipient.clone(), stop_recipient.clone()).start();
        let send_recipient = send_addr.clone().recipient();
        let _ = stop_addr.do_send(AddStopRecipient(send_addr.recipient()));

        let disconnect_actor_addr = DisconnectActor::new(
            send_recipient.clone(),
            error_recipient.clone(),
            stop_recipient.clone(),
        )
        .start();

        // Starts an actor constructed from (status, error, stop) recipients,
        // binds its address to `$addr_name` and registers it with the stop actor.
        macro_rules! start_response_actor {
            ($addr_name:ident, $actor_name:ident, $status_recipient:ident) => {
                let $addr_name = $actor_name::new(
                    $status_recipient.clone(),
                    error_recipient.clone(),
                    stop_recipient.clone(),
                )
                .start();
                let _ = stop_recipient_container
                    .do_send(AddStopRecipient($addr_name.clone().recipient()));
            };
        }

        // Same as `start_response_actor!`, for actors whose constructor also
        // takes the send recipient.
        macro_rules! start_send_actor {
            ($addr_name:ident, $actor_name:ident, $status_recipient:ident) => {
                let $addr_name = $actor_name::new(
                    $status_recipient.clone(),
                    send_recipient.clone(),
                    error_recipient.clone(),
                    stop_recipient.clone(),
                )
                .start();
                let _ = stop_recipient_container
                    .do_send(AddStopRecipient($addr_name.clone().recipient()));
            };
        }

        // Starts a `PacketStatusActor` with the given label and payload type,
        // binds a recipient of it to `$name` and registers it with the stop actor.
        macro_rules! start_status_actor {
            ($name:ident, $status_name:tt, $payload_type:ty, $send_status_recipient:expr) => {
                let status_addr =
                    PacketStatusActor::<$payload_type>::new($status_name, $send_status_recipient)
                        .start();
                let $name = status_addr.clone().recipient();
                let _ = stop_recipient_container.do_send(AddStopRecipient(status_addr.recipient()));
            };
        }

        // Publish flow. Its status actor is the only one that is given a
        // recipient of the disconnect actor.
        // NOTE(review): this status actor holds `PublishPacketStatus` and is
        // shared by the publish-related actors, yet it is labelled
        // "Disconnect" — confirm whether "Publish" was intended.
        let send_status_recipient = disconnect_actor_addr.clone().recipient();
        start_status_actor!(
            publish_status_recipient,
            "Disconnect",
            PublishPacketStatus,
            Some(send_status_recipient)
        );

        start_send_actor!(
            send_pub_actor_addr,
            SendPublishActor,
            publish_status_recipient
        );
        // Started by hand because its constructor takes the extra
        // `publish_message_recipient` argument.
        let recv_pub_actor_addr = RecvPublishActor::new(
            publish_status_recipient.clone(),
            send_recipient.clone(),
            error_recipient.clone(),
            stop_recipient.clone(),
            publish_message_recipient,
        )
        .start();
        let _ = stop_recipient_container
            .do_send(AddStopRecipient(recv_pub_actor_addr.clone().recipient()));
        start_response_actor!(puback_actor_addr, PubackActor, publish_status_recipient);
        start_send_actor!(pubrec_actor_addr, PubrecActor, publish_status_recipient);
        start_send_actor!(pubrel_actor_addr, PubrelActor, publish_status_recipient);
        start_response_actor!(pubcomp_actor_addr, PubcompActor, publish_status_recipient);

        // Subscribe flow.
        start_status_actor!(subscribe_status_recipient, "Subscribe", (), None);
        start_send_actor!(
            subscribe_actor_addr,
            SubscribeActor,
            subscribe_status_recipient
        );
        start_response_actor!(suback_actor_addr, SubackActor, subscribe_status_recipient);

        // Unsubscribe flow.
        start_status_actor!(unsubscribe_status_recipient, "Unsubscribe", (), None);
        start_send_actor!(
            unsubscribe_actor_addr,
            UnsubscribeActor,
            unsubscribe_status_recipient
        );
        start_response_actor!(
            unsuback_actor_addr,
            UnsubackActor,
            unsubscribe_status_recipient
        );

        // Connect flow. The status actor's address is kept (not just a
        // recipient) because it is stored on `self` for `is_connected`.
        let connect_status_actor_addr = PacketStatusActor::new("Connect", None).start();
        let connect_actor_addr = ConnectActor::new(
            send_recipient.clone(),
            connect_status_actor_addr.clone().recipient(),
            stop_recipient.clone(),
            error_recipient.clone(),
            (&*self.client_name).clone(),
        )
        .start();

        let connack_actor_addr = ConnackActor::new(
            connect_status_actor_addr.clone().recipient(),
            error_recipient.clone(),
            connect_actor_addr.clone().recipient(),
            stop_recipient.clone(),
        )
        .start();

        // Ping flow. The ping request actor also receives a recipient of the
        // connect status actor and the `PING_INTERVAL` constant.
        start_status_actor!(ping_status_recipient, "Ping", (), None);
        let send_ping_actor_addr = PingreqActor::new(
            ping_status_recipient.clone(),
            connect_status_actor_addr.clone().recipient(),
            send_recipient.clone(),
            error_recipient.clone(),
            stop_recipient.clone(),
            PING_INTERVAL.clone(),
        )
        .start();
        let _ = stop_recipient_container
            .do_send(AddStopRecipient(send_ping_actor_addr.clone().recipient()));
        start_response_actor!(pingresp_actor_addr, PingrespActor, ping_status_recipient);

        // Incoming side: the dispatch actor is given one recipient per packet
        // handler started above, and the receive actor owns the reader.
        let dispatch_actor_addr = DispatchActor::new(
            error_recipient.clone(),
            stop_recipient.clone(),
            connack_actor_addr.recipient(),
            pingresp_actor_addr.recipient(),
            recv_pub_actor_addr.recipient(),
            puback_actor_addr.recipient(),
            pubrec_actor_addr.recipient(),
            pubrel_actor_addr.recipient(),
            pubcomp_actor_addr.recipient(),
            suback_actor_addr.recipient(),
            unsuback_actor_addr.recipient(),
        )
        .start();
        let recv_addr = RecvActor::new(
            reader,
            dispatch_actor_addr.clone().recipient(),
            error_recipient,
            stop_recipient,
        )
        .start();
        let _ = stop_addr.do_send(AddStopRecipient(recv_addr.recipient()));

        // Keep the addresses the public API needs.
        self.sub_addr = Some(subscribe_actor_addr);
        self.unsub_addr = Some(unsubscribe_actor_addr);
        self.pub_addr = Some(send_pub_actor_addr);
        self.disconnect_addr = Some(disconnect_actor_addr);
        self.conn_addr = Some(connect_actor_addr);
        self.stop_addr = Some(stop_addr);
        self.conn_status_addr = Some(connect_status_actor_addr);
    }
402}