1use crate::mqttbytes::{self, v4::*, QoS};
4use crate::{ConnectionError, Event, EventLoop, MqttOptions, Request};
5
6use async_channel::{SendError, Sender, TrySendError};
7use bytes::Bytes;
8use std::mem;
9use tokio::runtime;
10use tokio::runtime::Runtime;
11
/// Errors returned by the clients in this module when a request or a
/// cancellation signal cannot be handed over to the event loop.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// Sending the unit cancellation signal on the cancel channel failed.
    #[error("Failed to send cancel request to eventloop")]
    Cancel(#[from] SendError<()>),
    /// An awaited `send` of a [`Request`] on the request channel failed.
    #[error("Failed to send mqtt requests to eventloop")]
    Request(#[from] SendError<Request>),
    /// A non-blocking `try_send` of a [`Request`] on the request channel failed.
    #[error("Failed to send mqtt requests to eventloop")]
    TryRequest(#[from] TrySendError<Request>),
    /// Wraps an `mqttbytes::Error`. There is no `#[from]` conversion for it,
    /// so it must be constructed explicitly.
    #[error("Serialization error")]
    Mqtt4(mqttbytes::Error),
}
24
/// Cloneable handle used from async code to queue MQTT requests for an
/// [`EventLoop`] and to signal its cancellation.
#[derive(Clone, Debug)]
pub struct AsyncClient {
    /// Sending half of the channel that carries [`Request`]s.
    request_tx: Sender<Request>,
    /// Sending half of the channel that carries the unit cancellation signal.
    cancel_tx: Sender<()>,
}
32
33impl AsyncClient {
34 pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
36 let mut eventloop = EventLoop::new(options, cap);
37 let request_tx = eventloop.handle();
38 let cancel_tx = eventloop.cancel_handle();
39
40 let client = AsyncClient {
41 request_tx,
42 cancel_tx,
43 };
44
45 (client, eventloop)
46 }
47
48 pub fn from_senders(request_tx: Sender<Request>, cancel_tx: Sender<()>) -> AsyncClient {
51 AsyncClient {
52 request_tx,
53 cancel_tx,
54 }
55 }
56
57 pub async fn publish<S, V>(
59 &self,
60 topic: S,
61 qos: QoS,
62 retain: bool,
63 payload: V,
64 ) -> Result<(), ClientError>
65 where
66 S: Into<String>,
67 V: Into<Vec<u8>>,
68 {
69 let mut publish = Publish::new(topic, qos, payload);
70 publish.retain = retain;
71 let publish = Request::Publish(publish);
72 self.request_tx.send(publish).await?;
73 Ok(())
74 }
75
76 pub fn try_publish<S, V>(
78 &self,
79 topic: S,
80 qos: QoS,
81 retain: bool,
82 payload: V,
83 ) -> Result<(), ClientError>
84 where
85 S: Into<String>,
86 V: Into<Vec<u8>>,
87 {
88 let mut publish = Publish::new(topic, qos, payload);
89 publish.retain = retain;
90 let publish = Request::Publish(publish);
91 self.request_tx.try_send(publish)?;
92 Ok(())
93 }
94
95 pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
97 let ack = get_ack_req(publish);
98
99 if let Some(ack) = ack {
100 self.request_tx.send(ack).await?;
101 }
102 Ok(())
103 }
104
105 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
107 let ack = get_ack_req(publish);
108 if let Some(ack) = ack {
109 self.request_tx.try_send(ack)?;
110 }
111 Ok(())
112 }
113
114 pub async fn publish_bytes<S>(
116 &self,
117 topic: S,
118 qos: QoS,
119 retain: bool,
120 payload: Bytes,
121 ) -> Result<(), ClientError>
122 where
123 S: Into<String>,
124 {
125 let mut publish = Publish::from_bytes(topic, qos, payload);
126 publish.retain = retain;
127 let publish = Request::Publish(publish);
128 self.request_tx.send(publish).await?;
129 Ok(())
130 }
131
132 pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
134 let subscribe = Subscribe::new(topic.into(), qos);
135 let request = Request::Subscribe(subscribe);
136 self.request_tx.send(request).await?;
137 Ok(())
138 }
139
140 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
142 let subscribe = Subscribe::new(topic.into(), qos);
143 let request = Request::Subscribe(subscribe);
144 self.request_tx.try_send(request)?;
145 Ok(())
146 }
147
148 pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
150 where
151 T: IntoIterator<Item = SubscribeFilter>,
152 {
153 let subscribe = Subscribe::new_many(topics);
154 let request = Request::Subscribe(subscribe);
155 self.request_tx.send(request).await?;
156 Ok(())
157 }
158
159 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
161 where
162 T: IntoIterator<Item = SubscribeFilter>,
163 {
164 let subscribe = Subscribe::new_many(topics);
165 let request = Request::Subscribe(subscribe);
166 self.request_tx.try_send(request)?;
167 Ok(())
168 }
169
170 pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
172 let unsubscribe = Unsubscribe::new(topic.into());
173 let request = Request::Unsubscribe(unsubscribe);
174 self.request_tx.send(request).await?;
175 Ok(())
176 }
177
178 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
180 let unsubscribe = Unsubscribe::new(topic.into());
181 let request = Request::Unsubscribe(unsubscribe);
182 self.request_tx.try_send(request)?;
183 Ok(())
184 }
185
186 pub async fn disconnect(&self) -> Result<(), ClientError> {
188 let request = Request::Disconnect;
189 self.request_tx.send(request).await?;
190 Ok(())
191 }
192
193 pub fn try_disconnect(&self) -> Result<(), ClientError> {
195 let request = Request::Disconnect;
196 self.request_tx.try_send(request)?;
197 Ok(())
198 }
199
200 pub async fn cancel(&self) -> Result<(), ClientError> {
202 self.cancel_tx.send(()).await?;
203 Ok(())
204 }
205}
206
207fn get_ack_req(publish: &Publish) -> Option<Request> {
208 let ack = match publish.qos {
209 QoS::AtMostOnce => return None,
210 QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid)),
211 QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid)),
212 };
213 Some(ack)
214}
215
/// Synchronous client: a thin wrapper that delegates every call to an
/// inner [`AsyncClient`].
#[derive(Clone)]
pub struct Client {
    /// Async client that all methods forward to.
    client: AsyncClient,
}
224
225impl Client {
226 pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
228 let (client, eventloop) = AsyncClient::new(options, cap);
229 let client = Client { client };
230 let runtime = runtime::Builder::new_current_thread()
231 .enable_all()
232 .build()
233 .unwrap();
234
235 let connection = Connection::new(eventloop, runtime);
236 (client, connection)
237 }
238
239 pub fn publish<S, V>(
241 &mut self,
242 topic: S,
243 qos: QoS,
244 retain: bool,
245 payload: V,
246 ) -> Result<(), ClientError>
247 where
248 S: Into<String>,
249 V: Into<Vec<u8>>,
250 {
251 pollster::block_on(self.client.publish(topic, qos, retain, payload))?;
252 Ok(())
253 }
254
255 pub fn try_publish<S, V>(
256 &mut self,
257 topic: S,
258 qos: QoS,
259 retain: bool,
260 payload: V,
261 ) -> Result<(), ClientError>
262 where
263 S: Into<String>,
264 V: Into<Vec<u8>>,
265 {
266 self.client.try_publish(topic, qos, retain, payload)?;
267 Ok(())
268 }
269
270 pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
272 pollster::block_on(self.client.ack(publish))?;
273 Ok(())
274 }
275
276 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
278 self.client.try_ack(publish)?;
279 Ok(())
280 }
281
282 pub fn subscribe<S: Into<String>>(&mut self, topic: S, qos: QoS) -> Result<(), ClientError> {
284 pollster::block_on(self.client.subscribe(topic, qos))?;
285 Ok(())
286 }
287
288 pub fn try_subscribe<S: Into<String>>(
290 &mut self,
291 topic: S,
292 qos: QoS,
293 ) -> Result<(), ClientError> {
294 self.client.try_subscribe(topic, qos)?;
295 Ok(())
296 }
297
298 pub fn subscribe_many<T>(&mut self, topics: T) -> Result<(), ClientError>
300 where
301 T: IntoIterator<Item = SubscribeFilter>,
302 {
303 pollster::block_on(self.client.subscribe_many(topics))
304 }
305
306 pub fn try_subscribe_many<T>(&mut self, topics: T) -> Result<(), ClientError>
307 where
308 T: IntoIterator<Item = SubscribeFilter>,
309 {
310 self.client.try_subscribe_many(topics)
311 }
312
313 pub fn unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<(), ClientError> {
315 pollster::block_on(self.client.unsubscribe(topic))?;
316 Ok(())
317 }
318
319 pub fn try_unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<(), ClientError> {
321 self.client.try_unsubscribe(topic)?;
322 Ok(())
323 }
324
325 pub fn disconnect(&mut self) -> Result<(), ClientError> {
327 pollster::block_on(self.client.disconnect())?;
328 Ok(())
329 }
330
331 pub fn try_disconnect(&mut self) -> Result<(), ClientError> {
333 self.client.try_disconnect()?;
334 Ok(())
335 }
336
337 pub fn cancel(&mut self) -> Result<(), ClientError> {
339 pollster::block_on(self.client.cancel())?;
340 Ok(())
341 }
342}
343
344pub struct Connection {
346 pub eventloop: EventLoop,
347 runtime: Option<Runtime>,
348}
349
350impl Connection {
351 fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
352 Connection {
353 eventloop,
354 runtime: Some(runtime),
355 }
356 }
357
358 #[must_use = "Connection should be iterated over a loop to make progress"]
363 pub fn iter(&mut self) -> Iter {
364 let runtime = self.runtime.take().unwrap();
365 Iter {
366 connection: self,
367 runtime,
368 }
369 }
370}
371
372pub struct Iter<'a> {
374 connection: &'a mut Connection,
375 runtime: runtime::Runtime,
376}
377
378impl<'a> Iterator for Iter<'a> {
379 type Item = Result<Event, ConnectionError>;
380
381 fn next(&mut self) -> Option<Self::Item> {
382 let f = self.connection.eventloop.poll();
383 match self.runtime.block_on(f) {
384 Ok(v) => Some(Ok(v)),
385 Err(ConnectionError::RequestsDone) => {
387 trace!("Done with requests");
388 None
389 }
390 Err(ConnectionError::Cancel) => {
391 trace!("Cancellation request received");
392 None
393 }
394 Err(e) => Some(Err(e)),
395 }
396 }
397}
398
399impl<'a> Drop for Iter<'a> {
400 fn drop(&mut self) {
401 let runtime = runtime::Builder::new_current_thread().build().unwrap();
403 self.connection.runtime = Some(mem::replace(&mut self.runtime, runtime));
404 }
405}