aws_iot_device_sdk_rust/
async_client.rs

1use crate::error;
2use crate::settings::{get_mqtt_options_async, AWSIoTSettings};
3use rumqttc::{
4    self, AsyncClient, ClientError, ConnectionError, Event, EventLoop, Incoming, NetworkOptions,
5    QoS,
6};
7use tokio::sync::broadcast::{self, Receiver, Sender};
8
9pub async fn async_event_loop_listener(
10    (mut eventloop, incoming_event_sender): (EventLoop, Sender<Incoming>),
11) -> Result<(), ConnectionError> {
12    loop {
13        match eventloop.poll().await {
14            Ok(event) => {
15                if let Event::Incoming(i) = event {
16                    if let Err(e) = incoming_event_sender.send(i) {
17                        println!("Error sending incoming event: {:?}", e);
18                    }
19                }
20            }
21            Err(e) => {
22                println!("AWS IoT client error: {:?}", e);
23            }
24        }
25    }
26}
27
28pub struct AWSIoTAsyncClient {
29    client: AsyncClient,
30    incoming_event_sender: Sender<Incoming>,
31}
32
33impl AWSIoTAsyncClient {
34    /// Create new AWSIoTAsyncClient. Input argument should be the AWSIoTSettings. Returns a tuple where the first element is the
35    /// AWSIoTAsyncClient, and the second element is a new tuple with the eventloop and incoming
36    /// event sender. This tuple should be sent as an argument to the async_event_loop_listener.
37    pub async fn new(
38        settings: AWSIoTSettings,
39    ) -> Result<(AWSIoTAsyncClient, (EventLoop, Sender<Incoming>)), error::AWSIoTError> {
40        let timeout = settings
41            .mqtt_options_overrides
42            .as_ref()
43            .and_then(|opts| opts.conn_timeout);
44        let mqtt_options = get_mqtt_options_async(settings).await?;
45
46        let (client, mut eventloop) = AsyncClient::new(mqtt_options, 10);
47        if let Some(timeout) = timeout {
48            let mut network_options = NetworkOptions::new();
49            network_options.set_connection_timeout(timeout);
50            eventloop.set_network_options(network_options);
51        }
52
53        let (request_tx, _) = broadcast::channel(50);
54        Ok((
55            AWSIoTAsyncClient {
56                client,
57                incoming_event_sender: request_tx.clone(),
58            },
59            (eventloop, request_tx),
60        ))
61    }
62
63    /// Subscribe to a topic.
64    pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
65        self.client.subscribe(topic, qos).await
66    }
67
68    /// Publish to topic.
69    pub async fn publish<S, V>(&self, topic: S, qos: QoS, payload: V) -> Result<(), ClientError>
70    where
71        S: Into<String>,
72        V: Into<Vec<u8>>,
73    {
74        self.client.publish(topic, qos, false, payload).await
75    }
76
77    /// Get a receiver of the incoming messages. Send this to any function that wants to read the
78    /// incoming messages from IoT Core.
79    pub async fn get_receiver(&self) -> Receiver<Incoming> {
80        self.incoming_event_sender.subscribe()
81    }
82
83    /// If you want to use the Rumqttc AsyncClient and EventLoop manually, this method can be used
84    /// to get the AsyncClient.
85    pub async fn get_client(self) -> AsyncClient {
86        self.client
87    }
88}