rumq_client/lib.rs
1//! A pure rust mqtt client which strives to be robust, efficient and easy to use.
2//! * Eventloop is just an async `Stream` which can be polled by tokio
//! * Requests to the eventloop are also a `Stream`, which covers both bounded and unbounded use cases
4//! * Robustness just a loop away
5//! * Flexible access to the state of eventloop to control its behaviour
6//!
7//! Accepts any stream of Requests
8//! ----------------------------
9//! Build bounded, unbounded, interruptible or any other stream (that fits your need) to
10//! feed the eventloop.
11//!
12//! **Few of our real world use cases**
13//!
//! - A stream which orchestrates data between disk and memory by detecting backpressure, so that it (practically) never loses data
15//! - A stream which juggles data between several channels based on priority of the data
16//!
17//! ```ignore
18//! #[tokio::main(core_threads = 1)]
19//! async fn main() {
20//! let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
21//! let requests = Vec::new::<Request>();
22//!
23//! let mut eventloop = eventloop(mqttoptions, requests_rx);
24//! let mut stream = eventloop.connect().await.unwrap();
25//! while let Some(item) = stream.next().await {
26//! println!("Received = {:?}", item);
27//! }
28//! }
29//! ```
30//! Robustness a loop away
31//! ----------------------
32//! Networks are unreliable. But robustness is easy
33//!
34//! - Just create a new stream from the existing eventloop
35//! - Resumes from where it left
36//! - Access the state of the eventloop to customize the behaviour of the next connection
37//!
38//! ```ignore
39//! #[tokio::main(core_threads = 1)]
40//! async fn main() {
41//! let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
42//! let requests = Vec::new::<Request>();
43//!
44//! let mut eventloop = eventloop(mqttoptions, requests_rx);
45//!
46//! // loop to reconnect and resume
47//! loop {
48//! let mut stream = eventloop.connect().await.unwrap();
49//! while let Some(item) = stream.next().await {
50//! println!("Received = {:?}", item);
51//! }
52//!
53//! time::delay_for(Duration::from_secs(1)).await;
54//! }
55//! }
56//! ```
57//!
58//! Eventloop is just a stream which can be polled with tokio
59//! ----------------------
//! - Plug it into `select!` or `join!` to interleave with other streams on the same thread
61//!
62//! ```ignore
63//! #[tokio::main(core_threads = 1)]
64//! async fn main() {
65//! let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
66//! let requests = Vec::new::<Request>();
67//!
68//! let mut eventloop = eventloop(mqttoptions, requests_rx);
69//!
70//! // plug it into tokio ecosystem
71//! let mut stream = eventloop.connect().await.unwrap();
72//! }
73//! ```
74//!
75//! Powerful notification system to control the runtime
76//! ----------------------
//! The eventloop stream yields all the interesting events, ranging from data on the network to
//! disconnections and reconnections. Use it the way you see fit
79//!
80//! - Resubscribe after reconnection
81//! - Stop after receiving Nth puback
82//!
83//! ```ignore
84//! #[tokio::main(core_threads = 1)]
85//! async fn main() {
86//! let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
87//! let (requests_tx, requests_rx) = channel(10);
88//!
89//! let mut eventloop = eventloop(mqttoptions, requests_rx);
90//!
91//! // loop to reconnect and resume
92//! loop {
93//! let mut stream = eventloop.connect().await.unwrap();
94//! while let Some(notification) = stream.next().await {
//!             println!("Received = {:?}", notification);
96//! match notification {
97//! Notification::Connect => requests_tx.send(subscribe).unwrap(),
98//! }
99//! }
100//!
101//! time::delay_for(Duration::from_secs(1)).await;
102//! }
103//! }
104//! ```
105
106#![recursion_limit = "512"]
107
108#[macro_use]
109extern crate log;
110
111use rumq_core::mqtt4::{MqttRead, MqttWrite, Packet};
112use std::io::Cursor;
113use std::time::Duration;
114
115mod eventloop;
116mod network;
117mod state;
118
119pub use eventloop::eventloop;
120pub use eventloop::{EventLoopError, MqttEventLoop};
121pub use state::MqttState;
122
123pub use rumq_core::mqtt4::*;
124
125/// Includes incoming packets from the network and other interesting events happening in the eventloop
126#[derive(Debug)]
127pub enum Notification {
128 /// Incoming publish from the broker
129 Publish(Publish),
130 /// Incoming puback from the broker
131 Puback(PacketIdentifier),
132 /// Incoming pubrec from the broker
133 Pubrec(PacketIdentifier),
134 /// Incoming pubcomp from the broker
135 Pubcomp(PacketIdentifier),
136 /// Incoming suback from the broker
137 Suback(Suback),
138 /// Incoming unsuback from the broker
139 Unsuback(PacketIdentifier),
140 /// Eventloop error
141 Abort(EventLoopError),
142}
143
144/// Requests by the client to mqtt event loop. Request are
145/// handle one by one
146#[derive(Debug)]
147pub enum Request {
148 Publish(Publish),
149 Subscribe(Subscribe),
150 Unsubscribe(Unsubscribe),
151 Reconnect(Connect),
152 Disconnect,
153}
154
155impl From<Publish> for Request {
156 fn from(publish: Publish) -> Request {
157 return Request::Publish(publish);
158 }
159}
160
161impl From<Subscribe> for Request {
162 fn from(subscribe: Subscribe) -> Request {
163 return Request::Subscribe(subscribe);
164 }
165}
166
167impl From<Unsubscribe> for Request {
168 fn from(unsubscribe: Unsubscribe) -> Request {
169 return Request::Unsubscribe(unsubscribe);
170 }
171}
172
173/// From implementations for serialized requests
174/// TODO Probably implement for io::Result<Vec<u8>> if possible?
175impl From<Request> for Vec<u8> {
176 fn from(request: Request) -> Vec<u8> {
177 let mut packet = Cursor::new(Vec::new());
178 let o = match request {
179 Request::Reconnect(connect) => packet.mqtt_write(&Packet::Connect(connect)),
180 Request::Publish(publish) => packet.mqtt_write(&Packet::Publish(publish)),
181 Request::Subscribe(subscribe) => packet.mqtt_write(&Packet::Subscribe(subscribe)),
182 _ => unimplemented!(),
183 };
184
185 o.unwrap();
186 packet.into_inner()
187 }
188}
189
190impl From<Vec<u8>> for Request {
191 fn from(payload: Vec<u8>) -> Request {
192 let mut payload = Cursor::new(payload);
193 let packet = payload.mqtt_read().unwrap();
194
195 match packet {
196 Packet::Connect(connect) => Request::Reconnect(connect),
197 Packet::Publish(publish) => Request::Publish(publish),
198 Packet::Subscribe(subscribe) => Request::Subscribe(subscribe),
199 _ => unimplemented!(),
200 }
201 }
202}
203
/// Commands sent by the client to the mqtt eventloop. Commands have a
/// higher priority than [requests] and are `select`ed along with them
///
/// [requests]: enum.Request.html
#[derive(Debug)]
pub enum Command {
    /// NOTE(review): presumably pauses the eventloop; confirm against the eventloop module
    Pause,
    /// NOTE(review): presumably resumes a paused eventloop; confirm against the eventloop module
    Resume,
}
214
/// How the client authenticates itself in the mqtt connect packet
#[derive(Clone, Debug)]
pub enum SecurityOptions {
    /// Connect without authentication.
    None,
    /// Authenticate with the given `(username, password)` pair.
    UsernamePassword(String, String),
}
223
224// TODO: Should all the options be exposed as public? Drawback
225// would be loosing the ability to panic when the user options
226// are wrong (e.g empty client id) or aggressive (keep alive time)
227/// Options to configure the behaviour of mqtt connection
228#[derive(Clone, Debug)]
229pub struct MqttOptions {
230 /// broker address that you want to connect to
231 broker_addr: String,
232 /// broker port
233 port: u16,
234 /// keep alive time to send pingreq to broker when the connection is idle
235 keep_alive: Duration,
236 /// clean (or) persistent session
237 clean_session: bool,
238 /// client identifier
239 client_id: String,
240 /// connection method
241 ca: Option<Vec<u8>>,
242 /// tls client_authentication
243 client_auth: Option<(Vec<u8>, Vec<u8>)>,
244 /// alpn settings
245 alpn: Option<Vec<Vec<u8>>>,
246 /// username and password
247 credentials: Option<(String, String)>,
248 /// maximum packet size
249 max_packet_size: usize,
250 /// request (publish, subscribe) channel capacity
251 request_channel_capacity: usize,
252 /// notification channel capacity
253 notification_channel_capacity: usize,
254 /// Minimum delay time between consecutive outgoing packets
255 throttle: Duration,
256 /// maximum number of outgoing inflight messages
257 inflight: usize,
258 /// Last will that will be issued on unexpected disconnect
259 last_will: Option<LastWill>,
260}
261
262impl MqttOptions {
263 /// New mqtt options
264 pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
265 let id = id.into();
266 if id.starts_with(' ') || id.is_empty() {
267 panic!("Invalid client id")
268 }
269
270 MqttOptions {
271 broker_addr: host.into(),
272 port,
273 keep_alive: Duration::from_secs(60),
274 clean_session: true,
275 client_id: id,
276 ca: None,
277 client_auth: None,
278 alpn: None,
279 credentials: None,
280 max_packet_size: 256 * 1024,
281 request_channel_capacity: 10,
282 notification_channel_capacity: 10,
283 throttle: Duration::from_micros(0),
284 inflight: 100,
285 last_will: None,
286 }
287 }
288
289 /// Broker address
290 pub fn broker_address(&self) -> (String, u16) {
291 (self.broker_addr.clone(), self.port)
292 }
293
294 pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
295 self.last_will = Some(will);
296 self
297 }
298
299 pub fn last_will(&mut self) -> Option<LastWill> {
300 self.last_will.clone()
301 }
302
303 pub fn set_ca(&mut self, ca: Vec<u8>) -> &mut Self {
304 self.ca = Some(ca);
305 self
306 }
307
308 pub fn ca(&self) -> Option<Vec<u8>> {
309 self.ca.clone()
310 }
311
312 pub fn set_client_auth(&mut self, cert: Vec<u8>, key: Vec<u8>) -> &mut Self {
313 self.client_auth = Some((cert, key));
314 self
315 }
316
317 pub fn client_auth(&self) -> Option<(Vec<u8>, Vec<u8>)> {
318 self.client_auth.clone()
319 }
320
321 pub fn set_alpn(&mut self, alpn: Vec<Vec<u8>>) -> &mut Self {
322 self.alpn = Some(alpn);
323 self
324 }
325
326 pub fn alpn(&self) -> Option<Vec<Vec<u8>>> {
327 self.alpn.clone()
328 }
329
330 /// Set number of seconds after which client should ping the broker
331 /// if there is no other data exchange
332 pub fn set_keep_alive(&mut self, secs: u16) -> &mut Self {
333 if secs < 5 {
334 panic!("Keep alives should be >= 5 secs");
335 }
336
337 self.keep_alive = Duration::from_secs(u64::from(secs));
338 self
339 }
340
341 /// Keep alive time
342 pub fn keep_alive(&self) -> Duration {
343 self.keep_alive
344 }
345
346 /// Client identifier
347 pub fn client_id(&self) -> String {
348 self.client_id.clone()
349 }
350
351 /// Set packet size limit (in Kilo Bytes)
352 pub fn set_max_packet_size(&mut self, sz: usize) -> &mut Self {
353 self.max_packet_size = sz * 1024;
354 self
355 }
356
357 /// Maximum packet size
358 pub fn max_packet_size(&self) -> usize {
359 self.max_packet_size
360 }
361
362 /// `clean_session = true` removes all the state from queues & instructs the broker
363 /// to clean all the client state when client disconnects.
364 ///
365 /// When set `false`, broker will hold the client state and performs pending
366 /// operations on the client when reconnection with same `client_id`
367 /// happens. Local queue state is also held to retransmit packets after reconnection.
368 pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self {
369 self.clean_session = clean_session;
370 self
371 }
372
373 /// Clean session
374 pub fn clean_session(&self) -> bool {
375 self.clean_session
376 }
377
378 /// Username and password
379 pub fn set_credentials<S: Into<String>>(&mut self, username: S, password: S) -> &mut Self {
380 self.credentials = Some((username.into(), password.into()));
381 self
382 }
383
384 /// Security options
385 pub fn credentials(&self) -> Option<(String, String)> {
386 self.credentials.clone()
387 }
388
389 /// Set notification channel capacity
390 pub fn set_notification_channel_capacity(&mut self, capacity: usize) -> &mut Self {
391 self.notification_channel_capacity = capacity;
392 self
393 }
394
395 /// Notification channel capacity
396 pub fn notification_channel_capacity(&self) -> usize {
397 self.notification_channel_capacity
398 }
399
400 /// Set request channel capacity
401 pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
402 self.request_channel_capacity = capacity;
403 self
404 }
405
406 /// Request channel capacity
407 pub fn request_channel_capacity(&self) -> usize {
408 self.request_channel_capacity
409 }
410
411 /// Enables throttling and sets outoing message rate to the specified 'rate'
412 pub fn set_throttle(&mut self, duration: Duration) -> &mut Self {
413 self.throttle = duration;
414 self
415 }
416
417 /// Outgoing message rate
418 pub fn throttle(&self) -> Duration {
419 self.throttle
420 }
421
422 /// Set number of concurrent in flight messages
423 pub fn set_inflight(&mut self, inflight: usize) -> &mut Self {
424 if inflight == 0 {
425 panic!("zero in flight is not allowed")
426 }
427
428 self.inflight = inflight;
429 self
430 }
431
432 /// Number of concurrent in flight messages
433 pub fn inflight(&self) -> usize {
434 self.inflight
435 }
436}
437
#[cfg(test)]
mod test {
    use super::MqttOptions;

    /// A client id that starts with a space is rejected by `MqttOptions::new`.
    // `expected` pins the panic to the client id check, so an unrelated
    // panic cannot make this test pass.
    #[test]
    #[should_panic(expected = "Invalid client id")]
    fn client_id_startswith_space() {
        let _mqtt_opts = MqttOptions::new(" client_a", "127.0.0.1", 1883).set_clean_session(true);
    }

    /// An empty client id is rejected by `MqttOptions::new`.
    #[test]
    #[should_panic(expected = "Invalid client id")]
    fn no_client_id() {
        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_session(true);
    }
}