rumq_client/
lib.rs

1//! A pure rust mqtt client which strives to be robust, efficient and easy to use.
2//! * Eventloop is just an async `Stream` which can be polled by tokio
3//! * Requests to eventloop is also a `Stream`. Solves both bounded an unbounded usecases
4//! * Robustness just a loop away
5//! * Flexible access to the state of eventloop to control its behaviour
6//!
7//! Accepts any stream of Requests
8//! ----------------------------
9//! Build bounded, unbounded, interruptible or any other stream (that fits your need) to
10//! feed the eventloop.
11//!
12//! **Few of our real world use cases**
13//!
14//! - A stream which orchestrates data between disk and memory by detecting backpressure and never (practically) loose data
15//! - A stream which juggles data between several channels based on priority of the data
16//!
17//! ```ignore
18//! #[tokio::main(core_threads = 1)]
19//! async fn main() {
20//!     let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
21//!     let requests = Vec::new::<Request>();
22//!
23//!     let mut eventloop = eventloop(mqttoptions, requests_rx);
24//!     let mut stream = eventloop.connect().await.unwrap();
25//!     while let Some(item) = stream.next().await {
26//!         println!("Received = {:?}", item);
27//!     }
28//! }
29//! ```
30//! Robustness a loop away
31//! ----------------------
32//! Networks are unreliable. But robustness is easy
33//!
34//! - Just create a new stream from the existing eventloop
35//! - Resumes from where it left
36//! - Access the state of the eventloop to customize the behaviour of the next connection
37//!
38//! ```ignore
39//! #[tokio::main(core_threads = 1)]
40//! async fn main() {
41//!     let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
42//!     let requests = Vec::new::<Request>();
43//!
44//!     let mut eventloop = eventloop(mqttoptions, requests_rx);
45//!
46//!     // loop to reconnect and resume
47//!     loop {
48//!         let mut stream = eventloop.connect().await.unwrap();
49//!         while let Some(item) = stream.next().await {
50//!             println!("Received = {:?}", item);
51//!         }
52//!
53//!         time::delay_for(Duration::from_secs(1)).await;
54//!     }
55//! }
56//! ```
57//!
58//! Eventloop is just a stream which can be polled with tokio
59//! ----------------------
60//! - Plug it into `select!` `join!` to interleave with other streams on the the same thread
61//!
62//! ```ignore
63//! #[tokio::main(core_threads = 1)]
64//! async fn main() {
65//!     let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
66//!     let requests = Vec::new::<Request>();
67//!
68//!     let mut eventloop = eventloop(mqttoptions, requests_rx);
69//!
70//!     // plug it into tokio ecosystem
71//!     let mut stream = eventloop.connect().await.unwrap();
72//! }
73//! ```
74//!
75//! Powerful notification system to control the runtime
76//! ----------------------
77//! Eventloop stream yields all the interesting event ranging for data on the network to
78//! disconnections and reconnections. Use it the way you see fit
79//!
80//! - Resubscribe after reconnection
81//! - Stop after receiving Nth puback
82//!
83//! ```ignore
84//! #[tokio::main(core_threads = 1)]
85//! async fn main() {
86//!     let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
87//!     let (requests_tx, requests_rx) = channel(10);
88//!
89//!     let mut eventloop = eventloop(mqttoptions, requests_rx);
90//!
91//!     // loop to reconnect and resume
92//!     loop {
93//!         let mut stream = eventloop.connect().await.unwrap();
94//!         while let Some(notification) = stream.next().await {
95//!             println!("Received = {:?}", item);
96//!             match notification {
97//!                 Notification::Connect => requests_tx.send(subscribe).unwrap(),
98//!             }
99//!         }
100//!
101//!         time::delay_for(Duration::from_secs(1)).await;
102//!     }
103//! }
104//! ```
105
106#![recursion_limit = "512"]
107
108#[macro_use]
109extern crate log;
110
111use rumq_core::mqtt4::{MqttRead, MqttWrite, Packet};
112use std::io::Cursor;
113use std::time::Duration;
114
115mod eventloop;
116mod network;
117mod state;
118
119pub use eventloop::eventloop;
120pub use eventloop::{EventLoopError, MqttEventLoop};
121pub use state::MqttState;
122
123pub use rumq_core::mqtt4::*;
124
125/// Includes incoming packets from the network and other interesting events happening in the eventloop
126#[derive(Debug)]
127pub enum Notification {
128    /// Incoming publish from the broker
129    Publish(Publish),
130    /// Incoming puback from the broker
131    Puback(PacketIdentifier),
132    /// Incoming pubrec from the broker
133    Pubrec(PacketIdentifier),
134    /// Incoming pubcomp from the broker
135    Pubcomp(PacketIdentifier),
136    /// Incoming suback from the broker
137    Suback(Suback),
138    /// Incoming unsuback from the broker
139    Unsuback(PacketIdentifier),
140    /// Eventloop error
141    Abort(EventLoopError),
142}
143
144/// Requests by the client to mqtt event loop. Request are
145/// handle one by one
146#[derive(Debug)]
147pub enum Request {
148    Publish(Publish),
149    Subscribe(Subscribe),
150    Unsubscribe(Unsubscribe),
151    Reconnect(Connect),
152    Disconnect,
153}
154
155impl From<Publish> for Request {
156    fn from(publish: Publish) -> Request {
157        return Request::Publish(publish);
158    }
159}
160
161impl From<Subscribe> for Request {
162    fn from(subscribe: Subscribe) -> Request {
163        return Request::Subscribe(subscribe);
164    }
165}
166
167impl From<Unsubscribe> for Request {
168    fn from(unsubscribe: Unsubscribe) -> Request {
169        return Request::Unsubscribe(unsubscribe);
170    }
171}
172
173/// From implementations for serialized requests
174/// TODO Probably implement for io::Result<Vec<u8>> if possible?
175impl From<Request> for Vec<u8> {
176    fn from(request: Request) -> Vec<u8> {
177        let mut packet = Cursor::new(Vec::new());
178        let o = match request {
179            Request::Reconnect(connect) => packet.mqtt_write(&Packet::Connect(connect)),
180            Request::Publish(publish) => packet.mqtt_write(&Packet::Publish(publish)),
181            Request::Subscribe(subscribe) => packet.mqtt_write(&Packet::Subscribe(subscribe)),
182            _ => unimplemented!(),
183        };
184
185        o.unwrap();
186        packet.into_inner()
187    }
188}
189
190impl From<Vec<u8>> for Request {
191    fn from(payload: Vec<u8>) -> Request {
192        let mut payload = Cursor::new(payload);
193        let packet = payload.mqtt_read().unwrap();
194
195        match packet {
196            Packet::Connect(connect) => Request::Reconnect(connect),
197            Packet::Publish(publish) => Request::Publish(publish),
198            Packet::Subscribe(subscribe) => Request::Subscribe(subscribe),
199            _ => unimplemented!(),
200        }
201    }
202}
203
204/// Commands sent by the client to mqtt event loop. Commands
205/// are of higher priority and will be `select`ed along with
206/// [request]s
207///
208/// request: enum.Request.html
209#[derive(Debug)]
210pub enum Command {
211    Pause,
212    Resume,
213}
214
215/// Client authentication option for mqtt connect packet
216#[derive(Clone, Debug)]
217pub enum SecurityOptions {
218    /// No authentication.
219    None,
220    /// Use the specified `(username, password)` tuple to authenticate.
221    UsernamePassword(String, String),
222}
223
224// TODO: Should all the options be exposed as public? Drawback
225// would be loosing the ability to panic when the user options
226// are wrong (e.g empty client id) or aggressive (keep alive time)
227/// Options to configure the behaviour of mqtt connection
228#[derive(Clone, Debug)]
229pub struct MqttOptions {
230    /// broker address that you want to connect to
231    broker_addr: String,
232    /// broker port
233    port: u16,
234    /// keep alive time to send pingreq to broker when the connection is idle
235    keep_alive: Duration,
236    /// clean (or) persistent session
237    clean_session: bool,
238    /// client identifier
239    client_id: String,
240    /// connection method
241    ca: Option<Vec<u8>>,
242    /// tls client_authentication
243    client_auth: Option<(Vec<u8>, Vec<u8>)>,
244    /// alpn settings
245    alpn: Option<Vec<Vec<u8>>>,
246    /// username and password
247    credentials: Option<(String, String)>,
248    /// maximum packet size
249    max_packet_size: usize,
250    /// request (publish, subscribe) channel capacity
251    request_channel_capacity: usize,
252    /// notification channel capacity
253    notification_channel_capacity: usize,
254    /// Minimum delay time between consecutive outgoing packets
255    throttle: Duration,
256    /// maximum number of outgoing inflight messages
257    inflight: usize,
258    /// Last will that will be issued on unexpected disconnect
259    last_will: Option<LastWill>,
260}
261
262impl MqttOptions {
263    /// New mqtt options
264    pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
265        let id = id.into();
266        if id.starts_with(' ') || id.is_empty() {
267            panic!("Invalid client id")
268        }
269
270        MqttOptions {
271            broker_addr: host.into(),
272            port,
273            keep_alive: Duration::from_secs(60),
274            clean_session: true,
275            client_id: id,
276            ca: None,
277            client_auth: None,
278            alpn: None,
279            credentials: None,
280            max_packet_size: 256 * 1024,
281            request_channel_capacity: 10,
282            notification_channel_capacity: 10,
283            throttle: Duration::from_micros(0),
284            inflight: 100,
285            last_will: None,
286        }
287    }
288
289    /// Broker address
290    pub fn broker_address(&self) -> (String, u16) {
291        (self.broker_addr.clone(), self.port)
292    }
293
294    pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
295        self.last_will = Some(will);
296        self
297    }
298
299    pub fn last_will(&mut self) -> Option<LastWill> {
300        self.last_will.clone()
301    }
302
303    pub fn set_ca(&mut self, ca: Vec<u8>) -> &mut Self {
304        self.ca = Some(ca);
305        self
306    }
307
308    pub fn ca(&self) -> Option<Vec<u8>> {
309        self.ca.clone()
310    }
311
312    pub fn set_client_auth(&mut self, cert: Vec<u8>, key: Vec<u8>) -> &mut Self {
313        self.client_auth = Some((cert, key));
314        self
315    }
316
317    pub fn client_auth(&self) -> Option<(Vec<u8>, Vec<u8>)> {
318        self.client_auth.clone()
319    }
320
321    pub fn set_alpn(&mut self, alpn: Vec<Vec<u8>>) -> &mut Self {
322        self.alpn = Some(alpn);
323        self
324    }
325
326    pub fn alpn(&self) -> Option<Vec<Vec<u8>>> {
327        self.alpn.clone()
328    }
329
330    /// Set number of seconds after which client should ping the broker
331    /// if there is no other data exchange
332    pub fn set_keep_alive(&mut self, secs: u16) -> &mut Self {
333        if secs < 5 {
334            panic!("Keep alives should be >= 5  secs");
335        }
336
337        self.keep_alive = Duration::from_secs(u64::from(secs));
338        self
339    }
340
341    /// Keep alive time
342    pub fn keep_alive(&self) -> Duration {
343        self.keep_alive
344    }
345
346    /// Client identifier
347    pub fn client_id(&self) -> String {
348        self.client_id.clone()
349    }
350
351    /// Set packet size limit (in Kilo Bytes)
352    pub fn set_max_packet_size(&mut self, sz: usize) -> &mut Self {
353        self.max_packet_size = sz * 1024;
354        self
355    }
356
357    /// Maximum packet size
358    pub fn max_packet_size(&self) -> usize {
359        self.max_packet_size
360    }
361
362    /// `clean_session = true` removes all the state from queues & instructs the broker
363    /// to clean all the client state when client disconnects.
364    ///
365    /// When set `false`, broker will hold the client state and performs pending
366    /// operations on the client when reconnection with same `client_id`
367    /// happens. Local queue state is also held to retransmit packets after reconnection.
368    pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self {
369        self.clean_session = clean_session;
370        self
371    }
372
373    /// Clean session
374    pub fn clean_session(&self) -> bool {
375        self.clean_session
376    }
377
378    /// Username and password
379    pub fn set_credentials<S: Into<String>>(&mut self, username: S, password: S) -> &mut Self {
380        self.credentials = Some((username.into(), password.into()));
381        self
382    }
383
384    /// Security options
385    pub fn credentials(&self) -> Option<(String, String)> {
386        self.credentials.clone()
387    }
388
389    /// Set notification channel capacity
390    pub fn set_notification_channel_capacity(&mut self, capacity: usize) -> &mut Self {
391        self.notification_channel_capacity = capacity;
392        self
393    }
394
395    /// Notification channel capacity
396    pub fn notification_channel_capacity(&self) -> usize {
397        self.notification_channel_capacity
398    }
399
400    /// Set request channel capacity
401    pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
402        self.request_channel_capacity = capacity;
403        self
404    }
405
406    /// Request channel capacity
407    pub fn request_channel_capacity(&self) -> usize {
408        self.request_channel_capacity
409    }
410
411    /// Enables throttling and sets outoing message rate to the specified 'rate'
412    pub fn set_throttle(&mut self, duration: Duration) -> &mut Self {
413        self.throttle = duration;
414        self
415    }
416
417    /// Outgoing message rate
418    pub fn throttle(&self) -> Duration {
419        self.throttle
420    }
421
422    /// Set number of concurrent in flight messages
423    pub fn set_inflight(&mut self, inflight: usize) -> &mut Self {
424        if inflight == 0 {
425            panic!("zero in flight is not allowed")
426        }
427
428        self.inflight = inflight;
429        self
430    }
431
432    /// Number of concurrent in flight messages
433    pub fn inflight(&self) -> usize {
434        self.inflight
435    }
436}
437
438#[cfg(test)]
439mod test {
440    use super::MqttOptions;
441
442    #[test]
443    #[should_panic]
444    fn client_id_startswith_space() {
445        let _mqtt_opts = MqttOptions::new(" client_a", "127.0.0.1", 1883).set_clean_session(true);
446    }
447
448    #[test]
449    #[should_panic]
450    fn no_client_id() {
451        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_session(true);
452    }
453}