spotflow_rumqttc_fork/
client.rs

1//! This module offers a high level synchronous and asynchronous abstraction to
2//! async eventloop.
3use crate::mqttbytes::{self, v4::*, QoS};
4use crate::{ConnectionError, Event, EventLoop, MqttOptions, Request};
5
6use async_channel::{SendError, Sender, TrySendError};
7use bytes::Bytes;
8use std::mem;
9use tokio::runtime;
10use tokio::runtime::Runtime;
11
12/// Client Error
13#[derive(Debug, thiserror::Error)]
14pub enum ClientError {
15    #[error("Failed to send cancel request to eventloop")]
16    Cancel(#[from] SendError<()>),
17    #[error("Failed to send mqtt requests to eventloop")]
18    Request(#[from] SendError<Request>),
19    #[error("Failed to send mqtt requests to eventloop")]
20    TryRequest(#[from] TrySendError<Request>),
21    #[error("Serialization error")]
22    Mqtt4(mqttbytes::Error),
23}
24
25/// `AsyncClient` to communicate with MQTT `Eventloop`
26/// This is cloneable and can be used to asynchronously Publish, Subscribe.
27#[derive(Clone, Debug)]
28pub struct AsyncClient {
29    request_tx: Sender<Request>,
30    cancel_tx: Sender<()>,
31}
32
33impl AsyncClient {
34    /// Create a new `AsyncClient`
35    pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
36        let mut eventloop = EventLoop::new(options, cap);
37        let request_tx = eventloop.handle();
38        let cancel_tx = eventloop.cancel_handle();
39
40        let client = AsyncClient {
41            request_tx,
42            cancel_tx,
43        };
44
45        (client, eventloop)
46    }
47
48    /// Create a new `AsyncClient` from a pair of async channel `Sender`s. This is mostly useful for
49    /// creating a test instance.
50    pub fn from_senders(request_tx: Sender<Request>, cancel_tx: Sender<()>) -> AsyncClient {
51        AsyncClient {
52            request_tx,
53            cancel_tx,
54        }
55    }
56
57    /// Sends a MQTT Publish to the eventloop
58    pub async fn publish<S, V>(
59        &self,
60        topic: S,
61        qos: QoS,
62        retain: bool,
63        payload: V,
64    ) -> Result<(), ClientError>
65    where
66        S: Into<String>,
67        V: Into<Vec<u8>>,
68    {
69        let mut publish = Publish::new(topic, qos, payload);
70        publish.retain = retain;
71        let publish = Request::Publish(publish);
72        self.request_tx.send(publish).await?;
73        Ok(())
74    }
75
76    /// Sends a MQTT Publish to the eventloop
77    pub fn try_publish<S, V>(
78        &self,
79        topic: S,
80        qos: QoS,
81        retain: bool,
82        payload: V,
83    ) -> Result<(), ClientError>
84    where
85        S: Into<String>,
86        V: Into<Vec<u8>>,
87    {
88        let mut publish = Publish::new(topic, qos, payload);
89        publish.retain = retain;
90        let publish = Request::Publish(publish);
91        self.request_tx.try_send(publish)?;
92        Ok(())
93    }
94
95    /// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
96    pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
97        let ack = get_ack_req(publish);
98
99        if let Some(ack) = ack {
100            self.request_tx.send(ack).await?;
101        }
102        Ok(())
103    }
104
105    /// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
106    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
107        let ack = get_ack_req(publish);
108        if let Some(ack) = ack {
109            self.request_tx.try_send(ack)?;
110        }
111        Ok(())
112    }
113
114    /// Sends a MQTT Publish to the eventloop
115    pub async fn publish_bytes<S>(
116        &self,
117        topic: S,
118        qos: QoS,
119        retain: bool,
120        payload: Bytes,
121    ) -> Result<(), ClientError>
122    where
123        S: Into<String>,
124    {
125        let mut publish = Publish::from_bytes(topic, qos, payload);
126        publish.retain = retain;
127        let publish = Request::Publish(publish);
128        self.request_tx.send(publish).await?;
129        Ok(())
130    }
131
132    /// Sends a MQTT Subscribe to the eventloop
133    pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
134        let subscribe = Subscribe::new(topic.into(), qos);
135        let request = Request::Subscribe(subscribe);
136        self.request_tx.send(request).await?;
137        Ok(())
138    }
139
140    /// Sends a MQTT Subscribe to the eventloop
141    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
142        let subscribe = Subscribe::new(topic.into(), qos);
143        let request = Request::Subscribe(subscribe);
144        self.request_tx.try_send(request)?;
145        Ok(())
146    }
147
148    /// Sends a MQTT Subscribe for multiple topics to the eventloop
149    pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
150    where
151        T: IntoIterator<Item = SubscribeFilter>,
152    {
153        let subscribe = Subscribe::new_many(topics);
154        let request = Request::Subscribe(subscribe);
155        self.request_tx.send(request).await?;
156        Ok(())
157    }
158
159    /// Sends a MQTT Subscribe for multiple topics to the eventloop
160    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
161    where
162        T: IntoIterator<Item = SubscribeFilter>,
163    {
164        let subscribe = Subscribe::new_many(topics);
165        let request = Request::Subscribe(subscribe);
166        self.request_tx.try_send(request)?;
167        Ok(())
168    }
169
170    /// Sends a MQTT Unsubscribe to the eventloop
171    pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
172        let unsubscribe = Unsubscribe::new(topic.into());
173        let request = Request::Unsubscribe(unsubscribe);
174        self.request_tx.send(request).await?;
175        Ok(())
176    }
177
178    /// Sends a MQTT Unsubscribe to the eventloop
179    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
180        let unsubscribe = Unsubscribe::new(topic.into());
181        let request = Request::Unsubscribe(unsubscribe);
182        self.request_tx.try_send(request)?;
183        Ok(())
184    }
185
186    /// Sends a MQTT disconnect to the eventloop
187    pub async fn disconnect(&self) -> Result<(), ClientError> {
188        let request = Request::Disconnect;
189        self.request_tx.send(request).await?;
190        Ok(())
191    }
192
193    /// Sends a MQTT disconnect to the eventloop
194    pub fn try_disconnect(&self) -> Result<(), ClientError> {
195        let request = Request::Disconnect;
196        self.request_tx.try_send(request)?;
197        Ok(())
198    }
199
200    /// Stops the eventloop right away
201    pub async fn cancel(&self) -> Result<(), ClientError> {
202        self.cancel_tx.send(()).await?;
203        Ok(())
204    }
205}
206
207fn get_ack_req(publish: &Publish) -> Option<Request> {
208    let ack = match publish.qos {
209        QoS::AtMostOnce => return None,
210        QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid)),
211        QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid)),
212    };
213    Some(ack)
214}
215
216/// `Client` to communicate with MQTT eventloop `Connection`.
217///
218/// Client is cloneable and can be used to synchronously Publish, Subscribe.
219/// Asynchronous channel handle can also be extracted if necessary
220#[derive(Clone)]
221pub struct Client {
222    client: AsyncClient,
223}
224
225impl Client {
226    /// Create a new `Client`
227    pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
228        let (client, eventloop) = AsyncClient::new(options, cap);
229        let client = Client { client };
230        let runtime = runtime::Builder::new_current_thread()
231            .enable_all()
232            .build()
233            .unwrap();
234
235        let connection = Connection::new(eventloop, runtime);
236        (client, connection)
237    }
238
239    /// Sends a MQTT Publish to the eventloop
240    pub fn publish<S, V>(
241        &mut self,
242        topic: S,
243        qos: QoS,
244        retain: bool,
245        payload: V,
246    ) -> Result<(), ClientError>
247    where
248        S: Into<String>,
249        V: Into<Vec<u8>>,
250    {
251        pollster::block_on(self.client.publish(topic, qos, retain, payload))?;
252        Ok(())
253    }
254
255    pub fn try_publish<S, V>(
256        &mut self,
257        topic: S,
258        qos: QoS,
259        retain: bool,
260        payload: V,
261    ) -> Result<(), ClientError>
262    where
263        S: Into<String>,
264        V: Into<Vec<u8>>,
265    {
266        self.client.try_publish(topic, qos, retain, payload)?;
267        Ok(())
268    }
269
270    /// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
271    pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
272        pollster::block_on(self.client.ack(publish))?;
273        Ok(())
274    }
275
276    /// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
277    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
278        self.client.try_ack(publish)?;
279        Ok(())
280    }
281
282    /// Sends a MQTT Subscribe to the eventloop
283    pub fn subscribe<S: Into<String>>(&mut self, topic: S, qos: QoS) -> Result<(), ClientError> {
284        pollster::block_on(self.client.subscribe(topic, qos))?;
285        Ok(())
286    }
287
288    /// Sends a MQTT Subscribe to the eventloop
289    pub fn try_subscribe<S: Into<String>>(
290        &mut self,
291        topic: S,
292        qos: QoS,
293    ) -> Result<(), ClientError> {
294        self.client.try_subscribe(topic, qos)?;
295        Ok(())
296    }
297
298    /// Sends a MQTT Subscribe for multiple topics to the eventloop
299    pub fn subscribe_many<T>(&mut self, topics: T) -> Result<(), ClientError>
300    where
301        T: IntoIterator<Item = SubscribeFilter>,
302    {
303        pollster::block_on(self.client.subscribe_many(topics))
304    }
305
306    pub fn try_subscribe_many<T>(&mut self, topics: T) -> Result<(), ClientError>
307    where
308        T: IntoIterator<Item = SubscribeFilter>,
309    {
310        self.client.try_subscribe_many(topics)
311    }
312
313    /// Sends a MQTT Unsubscribe to the eventloop
314    pub fn unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<(), ClientError> {
315        pollster::block_on(self.client.unsubscribe(topic))?;
316        Ok(())
317    }
318
319    /// Sends a MQTT Unsubscribe to the eventloop
320    pub fn try_unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<(), ClientError> {
321        self.client.try_unsubscribe(topic)?;
322        Ok(())
323    }
324
325    /// Sends a MQTT disconnect to the eventloop
326    pub fn disconnect(&mut self) -> Result<(), ClientError> {
327        pollster::block_on(self.client.disconnect())?;
328        Ok(())
329    }
330
331    /// Sends a MQTT disconnect to the eventloop
332    pub fn try_disconnect(&mut self) -> Result<(), ClientError> {
333        self.client.try_disconnect()?;
334        Ok(())
335    }
336
337    /// Stops the eventloop right away
338    pub fn cancel(&mut self) -> Result<(), ClientError> {
339        pollster::block_on(self.client.cancel())?;
340        Ok(())
341    }
342}
343
344///  MQTT connection. Maintains all the necessary state
345pub struct Connection {
346    pub eventloop: EventLoop,
347    runtime: Option<Runtime>,
348}
349
350impl Connection {
351    fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
352        Connection {
353            eventloop,
354            runtime: Some(runtime),
355        }
356    }
357
358    /// Returns an iterator over this connection. Iterating over this is all that's
359    /// necessary to make connection progress and maintain a robust connection.
360    /// Just continuing to loop will reconnect
361    /// **NOTE** Don't block this while iterating
362    #[must_use = "Connection should be iterated over a loop to make progress"]
363    pub fn iter(&mut self) -> Iter {
364        let runtime = self.runtime.take().unwrap();
365        Iter {
366            connection: self,
367            runtime,
368        }
369    }
370}
371
372/// Iterator which polls the eventloop for connection progress
373pub struct Iter<'a> {
374    connection: &'a mut Connection,
375    runtime: runtime::Runtime,
376}
377
378impl<'a> Iterator for Iter<'a> {
379    type Item = Result<Event, ConnectionError>;
380
381    fn next(&mut self) -> Option<Self::Item> {
382        let f = self.connection.eventloop.poll();
383        match self.runtime.block_on(f) {
384            Ok(v) => Some(Ok(v)),
385            // closing of request channel should stop the iterator
386            Err(ConnectionError::RequestsDone) => {
387                trace!("Done with requests");
388                None
389            }
390            Err(ConnectionError::Cancel) => {
391                trace!("Cancellation request received");
392                None
393            }
394            Err(e) => Some(Err(e)),
395        }
396    }
397}
398
399impl<'a> Drop for Iter<'a> {
400    fn drop(&mut self) {
401        // TODO: Don't create new runtime in drop
402        let runtime = runtime::Builder::new_current_thread().build().unwrap();
403        self.connection.runtime = Some(mem::replace(&mut self.runtime, runtime));
404    }
405}