aws_iot_device_sdk_rust/
async_client.rs1use crate::error;
2use crate::settings::{get_mqtt_options_async, AWSIoTSettings};
3use rumqttc::{
4 self, AsyncClient, ClientError, ConnectionError, Event, EventLoop, Incoming, NetworkOptions,
5 QoS,
6};
7use tokio::sync::broadcast::{self, Receiver, Sender};
8
9pub async fn async_event_loop_listener(
10 (mut eventloop, incoming_event_sender): (EventLoop, Sender<Incoming>),
11) -> Result<(), ConnectionError> {
12 loop {
13 match eventloop.poll().await {
14 Ok(event) => {
15 if let Event::Incoming(i) = event {
16 if let Err(e) = incoming_event_sender.send(i) {
17 println!("Error sending incoming event: {:?}", e);
18 }
19 }
20 }
21 Err(e) => {
22 println!("AWS IoT client error: {:?}", e);
23 }
24 }
25 }
26}
27
28pub struct AWSIoTAsyncClient {
29 client: AsyncClient,
30 incoming_event_sender: Sender<Incoming>,
31}
32
33impl AWSIoTAsyncClient {
34 pub async fn new(
38 settings: AWSIoTSettings,
39 ) -> Result<(AWSIoTAsyncClient, (EventLoop, Sender<Incoming>)), error::AWSIoTError> {
40 let timeout = settings
41 .mqtt_options_overrides
42 .as_ref()
43 .and_then(|opts| opts.conn_timeout);
44 let mqtt_options = get_mqtt_options_async(settings).await?;
45
46 let (client, mut eventloop) = AsyncClient::new(mqtt_options, 10);
47 if let Some(timeout) = timeout {
48 let mut network_options = NetworkOptions::new();
49 network_options.set_connection_timeout(timeout);
50 eventloop.set_network_options(network_options);
51 }
52
53 let (request_tx, _) = broadcast::channel(50);
54 Ok((
55 AWSIoTAsyncClient {
56 client,
57 incoming_event_sender: request_tx.clone(),
58 },
59 (eventloop, request_tx),
60 ))
61 }
62
63 pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
65 self.client.subscribe(topic, qos).await
66 }
67
68 pub async fn publish<S, V>(&self, topic: S, qos: QoS, payload: V) -> Result<(), ClientError>
70 where
71 S: Into<String>,
72 V: Into<Vec<u8>>,
73 {
74 self.client.publish(topic, qos, false, payload).await
75 }
76
77 pub async fn get_receiver(&self) -> Receiver<Incoming> {
80 self.incoming_event_sender.subscribe()
81 }
82
83 pub async fn get_client(self) -> AsyncClient {
86 self.client
87 }
88}