gneiss_mqtt/
lib.rs

/*
 * Copyright Bret Ambrose. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

/*!
This crate provides clients for communicating with a message broker using the MQTT protocol.

MQTT is a publish/subscribe protocol commonly chosen in IoT use cases.  This crate supports
both [MQTT5](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html) and
[MQTT311](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html).  We strongly
recommend using MQTT5 over MQTT311 for its significant error handling and communication improvements.
MQTT specification links within crate documentation point to the MQTT5 spec.

Depending on feature selection, the crate can provide either a client based on the [`tokio`](https://crates.io/crates/tokio)
runtime or a client that runs in a background thread.  The interfaces to these two clients
are similar but differ slightly, primarily in how operations complete.
Both clients are asynchronous in the sense that requests to perform MQTT operations are carried out
asynchronously, but only the tokio-based client has an interface that uses Rust's `async` keyword.

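
As a rough illustration of what "asynchronous without the `async` keyword" can mean, the sketch
below hands requests to a background thread and reports completion through a channel.  It is not
this crate's API: `Request`, `spawn_worker`, and `submit` are hypothetical names, and only the
standard library is used.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

// Hypothetical request type: a payload to "publish" plus a channel for the result.
struct Request {
    payload: String,
    completion: Sender<Result<usize, String>>,
}

// Start a background thread that services requests until every sender is dropped.
fn spawn_worker() -> Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        for request in rx {
            let result = if request.payload.is_empty() {
                Err("empty payload".to_string())
            } else {
                Ok(request.payload.len())
            };
            // The caller may have stopped waiting; the work was still carried out.
            let _ = request.completion.send(result);
        }
    });
    tx
}

// Submit a request and return immediately; the caller decides when (or whether) to wait.
fn submit(worker: &Sender<Request>, payload: &str) -> Receiver<Result<usize, String>> {
    let (completion, result) = mpsc::channel();
    worker
        .send(Request { payload: payload.to_string(), completion })
        .expect("worker thread has exited");
    result
}
```

Calling `recv()` on the returned receiver blocks until the worker has finished, which is one way a
non-async interface can expose completion.  An `async` interface would hand back a future to
`.await` instead.
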
# Feature Flags

The crate supports a variety of common transport options, including:
* **TLS** - provided by either *[`rustls`](https://crates.io/crates/rustls)* or *[`native-tls`](https://crates.io/crates/native-tls)*
* **Websockets** - provided by *[`tungstenite`](https://crates.io/crates/tungstenite)*
* **HTTP proxies** - bespoke implementation

It is common to see crates provide transport-agnostic clients -- which is clean and minimal -- but at
the cost of forcing the user to construct the transport connection themselves, which can be daunting.
This crate has been designed with the goal of providing a solution, not a piece of a solution.
While this crate does support bring-your-own-transport, it also provides optional features that
greatly simplify the setup required to use common transport level options.

The crate supports the following feature flags:
* `tokio` - enables the tokio-based async client
* `tokio-rustls` - enables TLS (backed by the rustls crate) support within the tokio-based async client
* `tokio-native-tls` - enables TLS (backed by the native-tls crate) support within the tokio-based async client
* `tokio-websockets` - enables websockets support within the tokio-based async client
* `threaded` - enables the thread-based client
* `threaded-rustls` - enables TLS (backed by the rustls crate) support within the thread-based client
* `threaded-native-tls` - enables TLS (backed by the native-tls crate) support within the thread-based client
* `threaded-websockets` - enables websockets support within the thread-based client

# Usage

To use this crate, you'll first need to add it to your project's Cargo.toml:

```toml
[dependencies]
gneiss-mqtt = { version = "<version>", features = [ ... ] }
```

If using the tokio client and your project does not yet include [`tokio`](https://crates.io/crates/tokio),
you will need to add it too:

```toml
[dependencies]
tokio = { version = "1", features = ["full"] }
```

This crate contains all the building blocks necessary to connect to most MQTT brokers, but
the configuration to do so can be confusing and delicate.  For complex cases, we recommend
using broker-specific crates that implement all the low-level connector details needed to
successfully establish an MQTT connection to a specific broker.  The documentation for these
specialized crates contains samples and information on how to connect in all of the ways
each broker supports.

Currently, these crates include:
* *[gneiss-mqtt-aws](https://crates.io/crates/gneiss-mqtt-aws)* - A crate with a builder that
  supports all connection methods allowed by the AWS MQTT broker implementation,
  [AWS IoT Core](https://docs.aws.amazon.com/iot/latest/developerguide/iot-gs.html).

# Examples

In addition to the in-docs examples, there are a variety of standalone examples that can be found
in the [project repository](https://github.com/gneiss-mqtt/gneiss-mqtt/tree/main/gneiss-mqtt/examples).
*/

#![cfg_attr(feature = "tokio", doc = r##"
## Example: Connect to a local Mosquitto server with the tokio client

Assuming a default Mosquitto installation, you can connect locally by plaintext on port 1883:

```no_run
use gneiss_mqtt::client::AsyncClient;
use gneiss_mqtt::client::TokioClientBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let client =
        TokioClientBuilder::new("127.0.0.1", 1883)
            .build()?;

    // Once started, the client keeps trying to maintain a connection to the endpoint until
    // stop() is invoked
    client.start(None)?;

    // <do stuff with the client>

    Ok(())
}
```"##)]

/*!
## Example: Subscribe to a topic with an async client

In order to receive messages, you must first subscribe to the topics you want to receive messages for.  Subscribing
is straightforward: configure a Subscribe packet and submit it to the client.  The subscribe is
performed whether or not you wait on the result.

A successful subscribe result resolves into the Suback packet that the broker responded with.  You must check the Suback
reason code vector to verify the success/failure result for each subscription in the original subscribe.

```no_run
use gneiss_mqtt::client::{AsyncClient, AsyncClientHandle};
use gneiss_mqtt::mqtt::{QualityOfService, SubscribePacket};

async fn subscribe_to_topic(client: AsyncClientHandle, topic_filter: String) {
    let subscribe = SubscribePacket::builder()
        .with_subscription_simple(topic_filter, QualityOfService::AtLeastOnce)
        .build();

    let subscribe_result = client.subscribe(subscribe, None).await;
    match subscribe_result {
        Ok(suback) => {
            // The subscribe contained a single subscription, so only the first reason code is checked
            let rc = suback.reason_codes()[0];
            if rc.is_success() {
                println!("Subscribe success!");
            } else {
                println!("Subscribe failed with reason code: {}", rc.to_string());
            }
        }
        Err(err) => {
            println!("Subscribe failed with error: {}", err);
        }
    }
}
```
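
When a subscribe carries several subscriptions, the Suback holds one reason code per subscription,
in the same order.  The sketch below pairs them up and collects the failures.  It is self-contained
and works on raw reason code bytes rather than the crate's types: `is_success` and
`failed_subscriptions` are hypothetical helpers, and the only rule assumed is the MQTT5 convention
that reason code values of 0x80 and above indicate failure.

```rust
// In MQTT5, reason code values of 0x80 and above indicate failure.
fn is_success(reason_code: u8) -> bool {
    reason_code < 0x80
}

// Pair each requested topic filter with its reason code and collect the rejected ones.
fn failed_subscriptions<'a>(topic_filters: &[&'a str], reason_codes: &[u8]) -> Vec<(&'a str, u8)> {
    topic_filters
        .iter()
        .zip(reason_codes.iter())
        .filter(|(_, rc)| !is_success(**rc))
        .map(|(filter, rc)| (*filter, *rc))
        .collect()
}
```

With the crate's types, the same idea would presumably be a loop over `suback.reason_codes()` that
calls `is_success()` on each entry, as the example above does for the first one.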

## Example: Unsubscribe from a topic with an async client

```no_run
use gneiss_mqtt::client::{AsyncClient, AsyncClientHandle};
use gneiss_mqtt::mqtt::UnsubscribePacket;

async fn unsubscribe_from_topic(client: AsyncClientHandle, topic_filter: String) {
    let unsubscribe = UnsubscribePacket::builder()
        .with_topic_filter(topic_filter)
        .build();

    let unsubscribe_result = client.unsubscribe(unsubscribe, None).await;
    match unsubscribe_result {
        Ok(unsuback) => {
            // The unsubscribe contained a single topic filter, so only the first reason code is checked
            let rc = unsuback.reason_codes()[0];
            if rc.is_success() {
                println!("Unsubscribe success!");
            } else {
                println!("Unsubscribe failed with reason code: {}", rc.to_string());
            }
        }
        Err(err) => {
            println!("Unsubscribe failed with error: {}", err);
        }
    }
}
```

## Example: Publish to a topic with an async client

```no_run
use gneiss_mqtt::client::{AsyncClient, AsyncClientHandle, PublishResponse};
use gneiss_mqtt::mqtt::{PublishPacket, QualityOfService};

async fn publish_to_topic(client: AsyncClientHandle, topic: String, message: String) {
    let publish = PublishPacket::builder(topic, QualityOfService::AtLeastOnce)
        .with_payload(message.into_bytes())
        .build();

    let publish_result = client.publish(publish, None).await;
    match publish_result {
        Ok(publish_response) => {
            match publish_response {
                // A QoS 1 publish is answered with a Puback
                PublishResponse::Qos1(puback) => {
                    let rc = puback.reason_code();
                    if rc.is_success() {
                        println!("Publish success!");
                    } else {
                        println!("Publish failed with reason code: {}", rc.to_string());
                    }
                }
                _ => { panic!("Illegal publish response to a Qos1 publish!") }
            }
        }
        Err(err) => {
            println!("Publish failed with error: {}", err);
        }
    }
}
```
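
The shape of a publish result depends on the quality of service: QoS 0 publishes are never
acknowledged, while QoS 1 and QoS 2 publishes end with a reason code from the broker.  The sketch
below handles all three cases without panicking.  `PublishOutcome` is a hypothetical stand-in
defined here for illustration, not the crate's `PublishResponse`, and reason codes are modeled as
raw bytes.

```rust
// Hypothetical stand-in for a per-QoS publish response (not the crate's type).
enum PublishOutcome {
    // QoS 0: the broker sends no acknowledgement.
    Qos0,
    // QoS 1: the broker acknowledges with a Puback that carries a reason code.
    Qos1 { reason_code: u8 },
    // QoS 2: modeled here as the final reason code of the multi-packet handshake.
    Qos2 { reason_code: u8 },
}

// Summarize an outcome; MQTT5 reason codes of 0x80 and above indicate failure.
fn describe(outcome: &PublishOutcome) -> String {
    match outcome {
        PublishOutcome::Qos0 => "sent, no acknowledgement expected".to_string(),
        PublishOutcome::Qos1 { reason_code } | PublishOutcome::Qos2 { reason_code } => {
            if *reason_code < 0x80 {
                "acknowledged".to_string()
            } else {
                format!("rejected with reason code 0x{:02X}", reason_code)
            }
        }
    }
}
```

Matching every variant explicitly, rather than falling through to a wildcard arm, lets the compiler
flag any case that is left unhandled.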

*/

#![cfg_attr(feature = "tokio", doc = r##"
## Example: React to client events with an async client

In addition to performing MQTT operations with the client, you can also react to events emitted by the
client.  The client emits events when connectivity changes (successful connection, failed connection, disconnection,
etc.) as well as when publishes are received.

To handle client events, pass in a handler when starting the client.  See the [crate::client::ClientEvent] documentation for
more information on what data each event variant may contain.

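
A handler of this kind is typically a shared, thread-safe closure that receives each event behind
an `Arc`.  The sketch below shows that general shape using only the standard library.  `Event`,
`Listener`, `dispatch`, and `make_publish_counter` are hypothetical names chosen for illustration;
the crate's real event type is [crate::client::ClientEvent].

```rust
use std::sync::{Arc, Mutex};

// Hypothetical event type standing in for the client's event enum.
enum Event {
    ConnectionSuccess,
    PublishReceived(Vec<u8>),
}

// A shareable, thread-safe callback that receives each event behind an Arc.
type Listener = Arc<dyn Fn(Arc<Event>) + Send + Sync>;

// Deliver each event to the listener in order, as an event loop might.
fn dispatch(listener: &Listener, events: Vec<Event>) {
    for event in events {
        listener(Arc::new(event));
    }
}

// Build a listener that counts received publishes and ignores every other event.
fn make_publish_counter(count: Arc<Mutex<usize>>) -> Listener {
    Arc::new(move |event: Arc<Event>| {
        if let Event::PublishReceived(_) = event.as_ref() {
            *count.lock().unwrap() += 1;
        }
    })
}
```
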
The following example captures the client in the event handler closure, which lets you perform additional
operations in reaction to client events (the client's public API does not require mutable access).  In this case,
we send a "Pong" publish every time we receive a "Ping" publish:

```no_run
use gneiss_mqtt::client::{AsyncClient, AsyncClientHandle, ClientEvent, TokioClientBuilder};
use gneiss_mqtt::mqtt::{PublishPacket, QualityOfService};
use std::sync::Arc;
use tokio::runtime::Handle;

pub fn client_event_callback(client: AsyncClientHandle, event: Arc<ClientEvent>) {
    if let ClientEvent::PublishReceived(publish_received_event) = event.as_ref() {
        let publish = &publish_received_event.publish;
        if let Some(payload) = publish.payload() {
            if "Ping".as_bytes() == payload {
                // we received a Ping, let's send a Pong in response
                let pong_publish = PublishPacket::builder(publish.topic().to_string(), QualityOfService::AtMostOnce)
                    .with_payload("Pong".as_bytes().to_vec()).build();

                // we're in a synchronous function, but it's being called from an async task within the runtime, so
                // we can await and check the publish result by getting the current runtime and spawning an async
                // task in it
                let runtime_handle = Handle::current();
                runtime_handle.spawn(async move {
                    if client.publish(pong_publish, None).await.is_ok() {
                        println!("Successfully published Pong!");
                    } else {
                        println!("Failed to publish Pong!");
                    }
                });
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let client =
        TokioClientBuilder::new("127.0.0.1", 1883)
            .build()?;

    // make a client event handler closure
    let closure_client = client.clone();
    let listener_callback = Arc::new(move |event| { client_event_callback(closure_client.clone(), event) });

    // Pass the event handler callback into start()
    client.start(Some(listener_callback))?;

    // <do stuff with the client>

    Ok(())
}
```"##)]

/*!
# Frequently Asked Questions

See the [FAQ](https://github.com/gneiss-mqtt/gneiss-mqtt/blob/main/FAQ.md).

*/

#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
#![cfg_attr(not(any(feature = "tokio", feature = "threaded")), allow(dead_code))]
#![cfg_attr(all(feature = "testing", not(test)), allow(dead_code, unused_imports, unused_macros))]
#![cfg_attr(feature = "strict", deny(warnings))]

pub mod alias;
pub mod client;
mod decode;
mod encode;
pub mod error;
mod logging;
pub mod mqtt;
mod protocol;
#[cfg(feature = "testing")]
pub mod testing;
mod validate;