lmc/lib.rs
1#![doc(html_logo_url = "https://raw.githubusercontent.com/montoyo/lmc-rs/master/logo.svg", html_favicon_url = "https://raw.githubusercontent.com/montoyo/lmc-rs/master/favicon.ico")]
2//! This crate provides a basic MQTT client implementation with a high-level, asynchronous interface.
3//!
4//! # Connecting
5//!
6//! Connecting to a broker can be achieved using the [`Client::connect()`] function, by passing a
7//! hostname (or IP address) together with [`Options`](options::OptionsT):
8//!
9#![cfg_attr(feature = "tls", doc = r##"```
10# tokio_test::block_on(async {
11use lmc::{Options, Client, QoS};
12
13let mut opts = Options::new("client_id")
14 .enable_tls()
15 .expect("Failed to load native system TLS certificates");
16
17opts.set_username("username")
18 .set_password(b"password");
19
20# return; //We can't really test this in doctests
21let (client, shutdown_handle) = Client::connect("localhost", opts)
22 .await
23 .expect("Failed to connect to broker!");
24
25let (subscription, sub_qos) = client.subscribe_unbounded("my_topic", QoS::AtLeastOnce)
26 .await
27 .expect("Failed to subscribe to 'my_topic'");
28
29println!("Subscribed to topic with QoS {:?}", sub_qos);
30
31client.publish_qos_1("my_topic", b"it works!", false, true)
32 .await
33 .expect("Failed to publish message in 'my_topic'");
34
35let msg = subscription.recv().await.expect("Failed to await message");
36println!("Received {}", msg.payload_as_utf8().unwrap());
37
38shutdown_handle.disconnect().await.expect("Could not disconnect gracefully");
39# });
40```"##)]
41//!
42//! # Publishing messages
43//!
44//! The basic ways to publish messages are:
45//! - [`Client::publish_qos_0()`] to publish a message with a Quality of Service of "AtMostOnce". Unfortunately,
46//! there is no way to know if and when this message has reached the broker safely.
47//! - [`Client::publish_qos_1()`] to publish a message with a Quality of Service of "AtLeastOnce". By setting
48//! `await_ack` to true, it is possible to wait for the acknowledgment packet and make sure that the broker
49//! has received the message.
50//! - [`Client::publish_qos_2()`] to publish a message with a Quality of Service of "ExactlyOnce". By specifying
51//! a value other than [`PublishEvent::None`] to `await_event`, it is possible to wait for the acknowledgment
52//! packet and make sure that the broker has received the message.
53//!
54//! If it is not necessary to wait for the publish packet to be sent, [`Client::publish_no_wait()`] can also be used.
55//! Finally, [`Client::try_publish()`] also be used to attempt to publish a packet with no need for any `await`.
56//!
57//! # Subscribing to topics
58//!
59//! There are multiple ways to subscribe to topics, each of them have their own limitations:
60//! - [`Client::subscribe_lossy()`] will create a subscription based on a bounded queue. If the queue is full when
61//! the message is received, it will not be pushed onto that queue and may be lost.
62//! - [`Client::subscribe_unbounded()`] will create a subscription based on an unbounded queue, bypassing the
63//! limitations of a bounded subscription at a slightly increased performance cost.
64//! - [`Client::subscribe_fast_callback()`] will cause the passed function to be called every time a message is
65//! received. However, the function must be thread-safe and cannot block.
66//!
67//! There is also [`Client::subscribe_void()`], which can be used to only establish an MQTT subscription with the
68//! broker. However, none of the messages can be obtained until a normal LMC subscription is created (including
69//! retained messages). This might be used if you wish to "pre-subscribe" to a topic, but only occasionally care
70//! about the messages.
71//!
72//! Note that LMC subscriptions and MQTT subscriptions are not the same thing. LMC subscriptions are created using
73//! any of the `subscribe` methods. MQTT subscriptions are created by the implementation, as a result of the creation
74//! of an LMC subscription to a **new** topic. If an LMC subscription exists for a given topic, that means that an MQTT
75//! subscription already exists and there is thus no need to create a new one. This does mean that **only the first
76//! LMC subscription will receive the retained messages** of a particular topic (if there are any). So, if the first
77//! subscription is a "void" subscription, retained messages will be lost.
78//!
79//! MQTT subscriptions can only be cancelled (unsubscribed) by calling [`Client::unsubscribe()`] directly. Since v0.2,
80//! LMC will not automatically unsubscribe from topics anymore. This means that even after dropping the MPSC channels
81//! of lossy & unlossy subscriptions, and removing fast callback subscriptions, the MQTT subscription with the broker
82//! is maintained but the messages will be ignored (effectively becoming a "void" subscription) until
83//! [`Client::unsubscribe()`] is called.
84
85#![feature(new_uninit)]
86#![feature(get_mut_unchecked)]
87#![feature(const_trait_impl)]
88
89use std::future::Future;
90use std::sync::Arc;
91use std::sync::atomic::Ordering;
92use std::time::Duration;
93
94use tokio::net::{lookup_host, TcpSocket};
95use tokio::sync::{mpsc, oneshot};
96use tokio::task::{JoinHandle, JoinError};
97use tokio::time;
98
99#[cfg(feature = "tls")]
100pub mod tls;
101
102pub mod subs;
103pub mod options;
104mod transport;
105mod transceiver;
106mod futures;
107mod errors;
108mod shared;
109mod wrappers;
110
111#[cfg(test)]
112mod tests;
113
114pub use errors::{ConnectError, PublishError, TryPublishError, SubscribeError, TimeoutKind, ServerConnectError};
115pub use options::{Options, LastWill};
116pub use transceiver::ShutdownStatus;
117pub use transceiver::packets::{IncomingPublishPacket as Message, PublishFlags, PublishPacketInfo as MessageInfo};
118
119use options::{ConnectionConfig, OptionsT};
120use transceiver::{TransceiverBuildData, Transceiver};
121use transceiver::commands::{Command, PublishCommand, SubscribeCommand, UnsubCommand, UnsubKind, SubscriptionKind, FastCallback};
122use transceiver::packets::{ConnectPacket, Encode, OutgoingPublishPacket};
123use futures::*;
124use shared::*;
125
126#[cfg(feature = "tls")]
127pub use tls::OptionsWithTls;
128
129/// The **maximum** Quality of Service used to transmit and receive messages.
130///
131/// If the value used to publish a message is different than the value used
132/// to subscribe to its corresponding topic, the **effective** quality of
133/// service will be the lowest of these.
134#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
135pub enum QoS
136{
137 /// Messages will be transmitted or received at most once.
138 ///
139 /// This is the lowest (but fastest) quality of service,
140 /// involving a single packet. However, it is possible that
141 /// the message may never be transmitted or received.
142 AtMostOnce = 0,
143
144 /// Messages will be transmitted or received at least once.
145 ///
146 /// This quality of service involves at least 2 packets.
147 /// Using this value as the **effective** quality of service
148 /// means that messages will definitely be received by the
149 /// recipient(s), but they may receive multiple copies of it.
150 AtLeastOnce = 1,
151
152 /// Messages will be transmitted or received exactly once.
153 ///
154 /// This is the highest (but slowest) quality of service,
155 /// involving at least 4 packets. However, if used as the
156 /// **effective** quality of service, it ensures that
157 /// the packet **will** be received by recipient(s) without
158 /// any duplicates.
159 ExactlyOnce = 2
160}
161
162/// A clonable, thread-safe handle to an MQTT client that can be used to
163/// publish messages, and subscribe/unsubscribe to/from topics.
164///
165/// It can also be used to disconnect the client from the broker, however
166/// it is recommended to disconnect using the [`ClientShutdownHandle`]
167/// linked to this client.
168#[derive(Clone)]
169pub struct Client
170{
171 was_session_present: bool,
172 cmd_queue: mpsc::Sender<Command>,
173 shared: Arc<ClientShared>
174}
175
176/// An owned handle that can be used to disconnect and shutdown an MQTT client's
177/// transceiver task and obtain its [`ShutdownStatus`].
178///
179/// Should this value be dropped without calling [`ClientShutdownHandle::disconnect()`]
180/// beforehand, the client's transceiver task will be _detached_.
181pub struct ClientShutdownHandle
182{
183 cmd_queue: mpsc::Sender<Command>,
184 join_handle: JoinHandle<ShutdownStatus>
185}
186
187/// Enumerates the different stages of publishing a message with [`QoS::ExactlyOnce`]
188/// that can be awaited on.
189#[derive(Debug, Clone, Copy)]
190pub enum PublishEvent
191{
192 /// Submit the publish packet to the transceiver task and return, do not wait
193 /// for completion. Shutting down the transceiver task after this may result
194 /// in the message not being transmitted at all.
195 None,
196
197 /// Submit the publish packet to the transceiver task and await its corresponding
198 /// `PUBREC` packet, indicated that the server has received the message.
199 ///
200 /// Shutting down the transceiver task at this point may be safe.
201 Received,
202
203 /// Submit the publish packet to the transceiver task and await its corresponding
204 /// `PUBCOMP` packet, marking the end the transmission of the message.
205 ///
206 /// Shutting down the transceiver task after this is safe.
207 Complete
208}
209
210/// Utility enumeration to store the future corresponding to a [`PublishEvent`].
211enum PublishEventFuture
212{
213 None,
214 Received(PublishFuture<RecNotifierMapAccessor>),
215 Complete(PublishFuture<CompNotifierMapAccessor>)
216}
217
218/// Utility enumeration specifying when to wait for a subscription future.
219enum SubWait
220{
221 /// The subscription already exists and has the specified quality of service.
222 /// No need to await the subscription's completion.
223 DontWait(QoS),
224
225 /// The subscription is pending. Wait for completion before sending the
226 /// subscribe command to the transceiver task.
227 Before,
228
229 /// The subscription does not exist and has to be created. Send the subscribe
230 /// command to the transceiver task right away and await completion after.
231 After
232}
233
234/// The status of a topic subscription. Can be queried using
235/// [`Client::get_subscription_status()`].
236#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
237pub enum SubscriptionStatus
238{
239 /// The subscription does not exist.
240 Absent,
241
242 /// The subscription is in the process of being established.
243 Pending,
244
245 /// THe subscription has already been established, and messages
246 /// are ready to be received.
247 Live
248}
249
250impl Client
251{
252 /// Attempts to connect to the MQTT broker at the specified `host` using the specified [`Options`].
253 ///
254 /// The `host` parameter should only contain a hostname or an IP address, optionally followed by a port
255 /// number, separated by colons (e.g. `example.com` or `127.0.0.1:1234`). If the port is missing, the
256 /// default port (specified in `options`) will be used to establish the connection.
257 ///
258 /// If the connection succeeds and is accepted by the broker, two handles will be returned:
259 /// - A [`Client`] handle that can be used to publish messages and (un)susbcribe to/from topics
260 /// - A [`ClientShutdownHandle`] that can be used to disconnect the client's transceiver task
261 ///
262 /// An example can be found in this module's documentation.
263 pub async fn connect<'a, C, CC>(host: &str, options: OptionsT<'a, CC>) -> Result<(Client, ClientShutdownHandle), ConnectError>
264 where CC: ConnectionConfig<C>
265 {
266 let (options, conn_cfg) = options.separate_connection_cfg();
267 let tmp;
268
269 let (host_and_port, port_pos) = match host.find(':') {
270 Some(x) => (host, x),
271 None => {
272 tmp = format!("{}:{}", host, options.default_port());
273 (tmp.as_str(), host.len())
274 }
275 };
276
277 let host_only = &host_and_port[..port_pos];
278 let conn = conn_cfg.create_connection(host_only)?;
279
280 let addr = time::timeout(options.dns_timeout, lookup_host(host_and_port)).await
281 .map_err(|_| ConnectError::Timeout(TimeoutKind::DnsLookup))?
282 .map_err(ConnectError::LookupHostError)?
283 .filter(|addr| options.enabled_ip_versions.supports(addr))
284 .next()
285 .ok_or(ConnectError::HostnameNotFound)?;
286
287 log::debug!("Connecting to {:?}", addr);
288 let socket = (if addr.is_ipv6() { TcpSocket::new_v6() } else { TcpSocket::new_v4() }).map_err(ConnectError::IoError)?;
289
290 let stream = time::timeout(options.tcp_connect_timeout, socket.connect(addr)).await
291 .map_err(|_| ConnectError::Timeout(TimeoutKind::TcpConnect))?
292 .map_err(ConnectError::IoError)?;
293
294 if options.no_delay {
295 stream.set_nodelay(true).map_err(ConnectError::IoError)?;
296 }
297
298 let (cmd_tx, cmd_rx) = mpsc::channel(1024);
299 let (conn_tx, conn_rx) = oneshot::channel();
300 let transport = CC::create_transport(stream, conn);
301
302 let connect_packet = ConnectPacket {
303 clean_session: options.clean_session,
304 keep_alive: options.keep_alive,
305 client_id: options.client_id,
306 will: options.last_will,
307 username: options.username,
308 password: options.password
309 }.make_arc_packet();
310
311 let shared = Arc::new(ClientShared::new());
312
313 let build_data = TransceiverBuildData {
314 transport,
315 cmd_queue: cmd_rx,
316 connect_sig: conn_tx,
317 connect_packet,
318 ping_interval: Duration::from_secs(options.keep_alive as u64),
319 shared: shared.clone(),
320 packet_resend_delay: options.packets_resend_delay.max(Duration::from_secs(1))
321 };
322
323 let join_handle = Transceiver::spawn(build_data);
324 let conn_result = time::timeout(options.mqtt_connect_timeout, conn_rx).await;
325
326 let conn_result = match conn_result {
327 Ok(Ok(Ok(x))) => Ok(x),
328 Ok(Ok(Err(x))) => Err(ConnectError::ServerError(x)),
329 Ok(Err(_)) => Err(ConnectError::OneshotRecvError),
330 Err(_) => {
331 let _ = cmd_tx.send(Command::Disconnect).await;
332 return Err(ConnectError::Timeout(TimeoutKind::MqttConnect));
333 }
334 };
335
336 match conn_result {
337 Ok(was_session_present) => {
338 let shutdown_handle = ClientShutdownHandle {
339 cmd_queue: cmd_tx.clone(),
340 join_handle
341 };
342
343 let client = Client {
344 was_session_present,
345 cmd_queue: cmd_tx,
346 shared
347 };
348
349 Ok((client, shutdown_handle))
350 },
351 Err(err) => {
352 drop(cmd_tx);
353 let _ = join_handle.await;
354 Err(err)
355 }
356 }
357 }
358
359 /// Returns the `session_present` flags returned by the broker when connecting.
360 /// Indicates if a session corresponding to this client's ID was already existing and has been resumed.
361 pub fn was_session_present(&self) -> bool
362 {
363 self.was_session_present
364 }
365
366 #[inline]
367 fn make_publish_cmd(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> PublishCommand
368 {
369 let packet_id = if qos == QoS::AtMostOnce { 0 } else { self.shared.next_packet_id.fetch_add(1, Ordering::Relaxed) };
370
371 let packet = OutgoingPublishPacket {
372 flags: PublishFlags::new(false, qos, retain),
373 topic, packet_id, payload
374 };
375
376 PublishCommand {
377 packet_id,
378 qos,
379 packet: packet.make_arc_packet()
380 }
381 }
382
383 /// Attempts to publish a message. This flavour is non-blocking and does not need to be awaited on, but
384 /// can fail if the client's command queue is full.
385 ///
386 /// There is no guarantee that the message will be transmitted to the broker as soon as this call returns,
387 /// so the developer should wait a bit before shutting the client down. If this is not the expected
388 /// behaviour, check the other `publish` methods.
389 ///
390 /// For details regarding this method's parameters, check the MQTT protocol.
391 pub fn try_publish(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), TryPublishError>
392 {
393 use mpsc::error::*;
394 let cmd = self.make_publish_cmd(topic, payload, qos, retain);
395
396 match self.cmd_queue.try_send(cmd.into()) {
397 Ok(x) => Ok(x),
398 Err(TrySendError::Closed(_)) => Err(TryPublishError::TransceiverTaskTerminated),
399 Err(TrySendError::Full(_)) => Err(TryPublishError::QueueFull)
400 }
401 }
402
403 /// Waits for a free spot in the client's command queue and enqueues a publish command.
404 ///
405 /// There is no guarantee that the message will be transmitted to the broker as soon as this call returns,
406 /// so the developer should wait a bit before shutting the client down. If this is not the expected
407 /// behaviour, check the other `publish` methods.
408 ///
409 /// For details regarding this method's parameters, check the MQTT protocol.
410 pub async fn publish_no_wait(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), PublishError>
411 {
412 let cmd = self.make_publish_cmd(topic, payload, qos, retain);
413 self.cmd_queue.send(cmd.into()).await.map_err(|_| PublishError::TransceiverTaskTerminated)
414 }
415
416 /// Waits for a free spot in the client's command queue and enqueues a publish command with the lowest
417 /// possible Quality of Service ([`QoS::AtMostOnce`]). This is exactly the same as calling
418 /// [`Self::publish_no_wait()`] with [`QoS::AtMostOnce`].
419 ///
420 /// There is no guarantee that the message will be transmitted to the broker as soon as this call returns,
421 /// so the developer should wait a bit before shutting the client down. Reception by the broker will also
422 /// never be guaranteed for a message with [`QoS::AtMostOnce`]; it is thus recommended to use
423 /// [`Self::publish_qos_1()`] or [`Self::publish_qos_2()`] if the developer needs to ensure that the message
424 /// has been successfully received by the broker.
425 ///
426 /// For details regarding this method's parameters, check the MQTT protocol.
427 pub async fn publish_qos_0(&self, topic: &str, payload: &[u8], retain: bool) -> Result<(), PublishError>
428 {
429 let cmd = self.make_publish_cmd(topic, payload, QoS::AtMostOnce, retain);
430 self.cmd_queue.send(cmd.into()).await.map_err(|_| PublishError::TransceiverTaskTerminated)
431 }
432
433 /// Waits for a free spot in the client's command queue, enqueues a publish command with [`QoS::AtLeastOnce`],
434 /// and optionally waits for the corresponding `PUBACK` packet, indicating that the message was successfully
435 /// received by the broker (based on the value of `await_ack`).
436 ///
437 /// If `await_ack` is `false`, then this is exactly the same as calling [`Self::publish_no_wait()`] with
438 /// [`QoS::AtLeastOnce`], and there will be no guarantee that the message will be transmitted to the broker
439 /// as soon as this call returns.
440 ///
441 /// Note that the broker is free to not relay the message to any clients if, for instance, this client does
442 /// not have the permission to publish to the specified topic. On top of that, clients subscribed to the
443 /// topic may have chosen a different Quality of Service. So, even if this method succeeds with `await_ack`
444 /// set to true `true`, there is no guarantee that the message was received by any client.
445 ///
446 /// For details regarding this method's parameters, check the MQTT protocol.
447 pub async fn publish_qos_1(&self, topic: &str, payload: &[u8], retain: bool, await_ack: bool) -> Result<(), PublishError>
448 {
449 let cmd = self.make_publish_cmd(topic, payload, QoS::AtLeastOnce, retain);
450
451 let opt_fut = match await_ack {
452 true => Some(PublishFuture::new(cmd.packet_id, AckNotifierMapAccessor(self.shared.clone()))),
453 false => None
454 };
455
456 self.cmd_queue.send(cmd.into()).await.map_err(|_| PublishError::TransceiverTaskTerminated)?;
457
458 match opt_fut {
459 Some(x) => if x.await { Err(PublishError::TransceiverTaskTerminated) } else { Ok(()) },
460 None => Ok(())
461 }
462 }
463
464 /// Waits for a free spot in the client's command queue, enqueues a publish command with [`QoS::ExactlyOnce`],
465 /// and optionally waits for (based on the value of `await_event`):
466 /// - The corresponding `PUBREC` packet, indicating that the message was successfully received by the broker
467 /// - The corresponding `PUBCOMP` packet, indicating the end of the QoS 2 message transmission
468 ///
469 /// If `await_event` is [`PublishEvent::None`], then this is exactly the same as calling [`Self::publish_no_wait()`]
470 /// with [`QoS::ExactlyOnce`], and there will be no guarantee that the message will be transmitted to the broker
471 /// as soon as this call returns.
472 ///
473 /// Note that the broker is free to not relay the message to any clients if, for instance, this client does
474 /// not have the permission to publish to the specified topic. On top of that, clients subscribed to the
475 /// topic may have chosen a different Quality of Service. So, even if this method succeeds with `await_event`
476 /// different from [`PublishEvent::None`], there is no guarantee that the message was received by any client.
477 ///
478 /// For details regarding this method's parameters, check the MQTT protocol.
479 pub async fn publish_qos_2(&self, topic: &str, payload: &[u8], retain: bool, await_event: PublishEvent) -> Result<(), PublishError>
480 {
481 let cmd = self.make_publish_cmd(topic, payload, QoS::ExactlyOnce, retain);
482
483 let opt_fut = match await_event {
484 PublishEvent::None => PublishEventFuture::None,
485 PublishEvent::Received => PublishEventFuture::Received(PublishFuture::new(cmd.packet_id, RecNotifierMapAccessor(self.shared.clone()))),
486 PublishEvent::Complete => PublishEventFuture::Complete(PublishFuture::new(cmd.packet_id, CompNotifierMapAccessor(self.shared.clone())))
487 };
488
489 self.cmd_queue.send(cmd.into()).await.map_err(|_| PublishError::TransceiverTaskTerminated)?;
490
491 let ttt = match opt_fut {
492 PublishEventFuture::None => false,
493 PublishEventFuture::Received(x) => x.await,
494 PublishEventFuture::Complete(x) => x.await
495 };
496
497 if ttt { Err(PublishError::TransceiverTaskTerminated) } else { Ok(()) }
498 }
499
500 async fn subscribe<R, F>(&self, topic: &str, qos_hint: QoS, func: F) -> Result<(R, QoS), SubscribeError>
501 where F: FnOnce() -> (R, SubscriptionKind)
502 {
503 let mut sub_map = self.shared.subs.lock();
504
505 let actual_qos = match sub_map.get_mut(topic) {
506 Some(SubscriptionState::Existing(qos)) => SubWait::DontWait(*qos), //Add ref, then send command
507 Some(SubscriptionState::Pending(_)) => SubWait::Before, //Add ref, await result, and only then send command
508 None => {
509 //Create the map entry, then send command, then await result
510 sub_map.insert(topic.into(), SubscriptionState::Pending(Vec::new()));
511 SubWait::After
512 }
513 };
514
515 drop(sub_map);
516
517 let actual_qos = match actual_qos {
518 SubWait::DontWait(x) => Some(x),
519 SubWait::Before => Some(SubscribeFuture::new(&self.shared, topic).await.map_err(|_| SubscribeError::RefusedByBroker)?),
520 SubWait::After => None
521 };
522
523 let (ret, kind) = func();
524
525 let cmd = SubscribeCommand {
526 topic: topic.into(),
527 qos: qos_hint,
528 kind
529 };
530
531 //Note: we could skip this safely for hold subscriptions if the subscription already exists...
532 self.cmd_queue.send(cmd.into()).await.map_err(|_| SubscribeError::TransceiverTaskTerminated)?;
533
534 let actual_qos = match actual_qos {
535 Some(x) => x,
536 None => SubscribeFuture::new(&self.shared, topic).await.map_err(|_| SubscribeError::RefusedByBroker)?
537 };
538
539 Ok((ret, actual_qos))
540 }
541
542 /// Creates a "void" subscription to the specified topic. This will only send a subscription request to the
543 /// broker, and the caller will not have any way to retrieve the messages. If you wish to read messages, use
544 /// any other `subscribe` methods.
545 ///
546 /// If the client is already subscribed to the topic, then this function will only wait for a free spot on the
547 /// command queue to submit the subscribe command, and will completely ignore `qos_hint`. Otherwise, it will
548 /// send a `SUBSCRIBE` packet to the broker with the specified `qos_hint` and wait for a response. Because this
549 /// is "void" subscription will be the sole subscription for this topic, all incoming messages (including the
550 /// retained messages, if any) will be dropped, until a new subscription is created using any of the other
551 /// `subscribe` methods.
552 ///
553 /// The returned value is the actual [`QoS`] of this subscription as specified by the broker.
554 pub async fn subscribe_void(&self, topic: String, qos_hint: QoS) -> Result<QoS, SubscribeError>
555 {
556 self.subscribe(&topic, qos_hint, move || ((), SubscriptionKind::Void)).await.map(|(_, qos)| qos)
557 }
558
559 /// Creates a "lossy" subscription to the specified topic, with the specified capacity. "Lossy" subscriptions
560 /// use a bounded MPSC channel to read incoming messages. Because the transceiver task cannot afford to wait,
561 /// messages on the queue are sent using [`mpsc::Sender::try_send()`] and are dropped if the queue is full,
562 /// meaning that messages can be lost if they are not dequeued quickly enough or if the queue capacity
563 /// is too low. If this behaviour is not wanted, use [`Self::subscribe_unbounded()`] which relies on a
564 /// [`mpsc::UnboundedSender`] instead, for a slightly higher cost in performance.
565 ///
566 /// If the client is already subscribed to the topic, then this function will only wait for a free spot on the
567 /// command queue to submit the subscribe command, and will completely ignore `qos_hint`. Otherwise, it will
568 /// send a `SUBSCRIBE` packet to the broker with the specified `qos_hint` and wait for a response. In that case,
569 /// retained packets (if there are any) are guarateed to be pushed onto the returned queue, unless the queue
570 /// is full.
571 ///
572 /// The returned value contains the [`mpsc::Receiver`] end of the queue as well as the actual [`QoS`] of this
573 /// subscription as specified by the broker. Since v0.2, closing the receiving end of the queue **will not**
574 /// cause the client to unsubscribe. To unsubscribe from a topic, use [`Client::unsubscribe()`].
575 pub fn subscribe_lossy<'a>(&'a self, topic: &'a str, qos_hint: QoS, queue_cap: usize) -> impl Future<Output = Result<(mpsc::Receiver<Message>, QoS), SubscribeError>> + 'a
576 {
577 self.subscribe(topic, qos_hint, move || {
578 let (tx, rx) = mpsc::channel(queue_cap);
579 (rx, SubscriptionKind::Lossy(tx))
580 })
581 }
582
583 /// Creates an "unbounded" subscription to the specified topic. "Unbounded" subscriptions use an unbounded
584 /// channel to read incoming messages. This is the safest way to receive messages, but may come at a slighly
585 /// increased performance cost. If performance is an issue, use [`Self::subscribe_lossy()`].
586 ///
587 /// If the client is already subscribed to the topic, then this function will only wait for a free spot on the
588 /// command queue to submit the subscribe command, and will completely ignore `qos_hint`. Otherwise, it will
589 /// send a `SUBSCRIBE` packet to the broker with the specified `qos_hint` and wait for a response. In that case,
590 /// retained packets (if there are any) are guarateed to be pushed onto the returned queue.
591 ///
592 /// The returned value contains the [`mpsc::UnboundedReceiver`] end of the queue as well as the actual [`QoS`]
593 /// of this subscription as specified by the broker. Since v0.2, closing the receiving end of the queue **will
594 /// not** cause the client to unsubscribe. To unsubscribe from a topic, use [`Client::unsubscribe()`].
595 pub fn subscribe_unbounded<'a>(&'a self, topic: &'a str, qos_hint: QoS) -> impl Future<Output = Result<(mpsc::UnboundedReceiver<Message>, QoS), SubscribeError>> + 'a
596 {
597 self.subscribe(topic, qos_hint, move || {
598 let (tx, rx) = mpsc::unbounded_channel();
599 (rx, SubscriptionKind::Unbounded(tx))
600 })
601 }
602
603 /// Creates a [`subs::Callback`] subscription to the specified topic. Callback subscriptions simply call
604 /// the specified function whenever a message is received. This is the lightest form of subscription, however
605 /// it does come with some constraints:
606 /// - `callback` must be thread-safe (implement [`Send`] and [`Sync`])
607 /// - `callback` must never block
608 ///
609 /// If the client is already subscribed to the topic, then this function will only wait for a free spot on the
610 /// command queue to submit the subscribe command, and will completely ignore `qos_hint`. Otherwise, it will
611 /// send a `SUBSCRIBE` packet to the broker with the specified `qos_hint` and wait for a response. In that case,
612 /// `callback` is guaranteed to be called for each retained messages (should there be any).
613 ///
614 /// The returned value contains the subscription object (which can be used to cancel the subscription),
615 /// as well as the actual [`QoS`] of this subscription as specified by the broker.
616 pub async fn subscribe_fast_callback<C>(&self, topic: String, qos_hint: QoS, callback: C) -> Result<(subs::Callback, QoS), SubscribeError>
617 where C: FnMut(Message) + Send + Sync + 'static
618 {
619 let (id, qos) = self.subscribe(&topic, qos_hint, move || {
620 let id = self.shared.next_callback_id.fetch_add(1, Ordering::Relaxed);
621 (id, SubscriptionKind::FastCallback(FastCallback { id, f: Box::new(callback) }))
622 }).await?;
623
624 let ret = subs::Callback::new(self.cmd_queue.clone(), topic, id);
625 Ok((ret, qos))
626 }
627
628 /// Queries the status of a subscription to the specified topic. See [`SubscriptionStatus`] for
629 /// details regarding the return value.
630 pub fn get_subscription_status(&self, topic: &str) -> SubscriptionStatus
631 {
632 match self.shared.subs.lock().get(topic) {
633 Some(SubscriptionState::Existing(_)) => SubscriptionStatus::Live,
634 Some(SubscriptionState::Pending(_)) => SubscriptionStatus::Pending,
635 None => SubscriptionStatus::Absent
636 }
637 }
638
639 /// Unsubscribes from the specified topic, regardless of any existing subscription to that topic. If there
640 /// are valid MPSC channels for that topic, they will be closed. If the client is not subscribed to this
641 /// topic, this call will have no effect.
642 pub async fn unsubscribe(&self, topic: String)
643 {
644 let cmd = UnsubCommand { topic, kind: UnsubKind::Immediate };
645 let _ = self.cmd_queue.send(cmd.into()).await;
646 }
647
648 /// Returns `true` if the transceiver task is still running, meaning the MQTT client is probably still
649 /// connected.
650 pub fn is_transceiver_task_running(&self) -> bool
651 {
652 !self.cmd_queue.is_closed()
653 }
654
655 /// Returns `true` if the transceiver task stopped, which may occur if [`Client::disconnect()`] or
656 /// [`ClientShutdownHandle::disconnect()`] are closed, but also if the connection is closed
657 /// unexpectedly.
658 pub fn did_transceiver_task_stop(&self) -> bool
659 {
660 self.cmd_queue.is_closed()
661 }
662
663 /// Enqueues a disconnect command, causing the transceiver task to attempt to disconnect gracefully
664 /// and shutdown. This will take some time to complete and the task might still be alive when this
665 /// function returns. [`Self::is_transceiver_task_running()`] or [`Self::did_transceiver_task_stop()`]
666 /// can be used to check if the task is still running. Ideally, [`ClientShutdownHandle::disconnect()`]
667 /// should be used instead as it waits for the task's completion.
668 ///
669 /// Pending publish packets may not be sent, and existing subscriptions will be closed.
670 pub async fn disconnect(self)
671 {
672 let _ = self.cmd_queue.send(Command::Disconnect).await;
673 }
674}
675
676impl ClientShutdownHandle
677{
678 /// Enqueues a disconnect command, causing the transceiver task to attempt to disconnect gracefully
679 /// and shutdown. Unlike [`Client::disconnect()`], this function will wait for the transceiver task
680 /// to complete before returning, and also returns a [`ShutdownStatus`], which can be used to know
681 /// whether the client managed to disconnect gracefully or not.
682 ///
683 /// Pending publish packets may not be sent, and existing subscriptions will be closed.
684 pub async fn disconnect(self) -> Result<ShutdownStatus, JoinError>
685 {
686 let _ = self.cmd_queue.send(Command::Disconnect).await;
687 self.join_handle.await
688 }
689}