use crate::error::AWSIoTError;
use crate::settings::{get_mqtt_options_async, AWSIoTSettings};
use log::warn;
use rumqttc::{self, AsyncClient, Event, EventLoop, Incoming, NetworkOptions, QoS};
use tokio::sync::broadcast::{self, Receiver, Sender};
pub async fn async_event_loop_listener(
(mut eventloop, incoming_event_sender): (EventLoop, Sender<Incoming>),
) -> Result<(), AWSIoTError> {
loop {
match eventloop.poll().await {
Ok(event) => {
if let Event::Incoming(i) = event {
if let Err(e) = incoming_event_sender.send(i) {
warn!("Error sending incoming event: {:?}", e);
}
}
}
Err(e) => return Err(e.into()),
}
}
}
/// Asynchronous AWS IoT client wrapping a `rumqttc` [`AsyncClient`] together
/// with the sending half of a broadcast channel for incoming MQTT events.
pub struct AWSIoTAsyncClient {
    /// Underlying MQTT client; `subscribe` and `publish` delegate to it.
    client: AsyncClient,
    /// Sending half of the incoming-event broadcast channel; `get_receiver`
    /// creates new receivers from it.
    incoming_event_sender: Sender<Incoming>,
}
impl AWSIoTAsyncClient {
    /// Capacity argument handed to [`AsyncClient::new`].
    const CLIENT_CHANNEL_CAPACITY: usize = 10;
    /// Capacity of the broadcast channel that carries incoming events.
    const INCOMING_CHANNEL_CAPACITY: usize = 50;

    /// Creates the client together with the pieces needed to drive it.
    ///
    /// The MQTT options are built from `settings` via
    /// `get_mqtt_options_async`. When `settings.mqtt_options_overrides`
    /// carries a `conn_timeout`, it is applied to the event loop as the
    /// connection timeout.
    ///
    /// On success the result holds the client and an
    /// `(EventLoop, Sender<Incoming>)` pair; that pair has exactly the shape
    /// `async_event_loop_listener` takes as its argument.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `get_mqtt_options_async`.
    pub async fn new(
        settings: AWSIoTSettings,
    ) -> Result<(AWSIoTAsyncClient, (EventLoop, Sender<Incoming>)), AWSIoTError> {
        // Must be read first: `settings` is moved into the options builder below.
        let conn_timeout = settings
            .mqtt_options_overrides
            .as_ref()
            .and_then(|overrides| overrides.conn_timeout);

        let mqtt_options = get_mqtt_options_async(settings).await?;
        let (client, mut eventloop) =
            AsyncClient::new(mqtt_options, Self::CLIENT_CHANNEL_CAPACITY);

        if let Some(limit) = conn_timeout {
            let mut network_options = NetworkOptions::new();
            network_options.set_connection_timeout(limit);
            eventloop.set_network_options(network_options);
        }

        // The initial receiver is dropped right away; consumers obtain their
        // own through `get_receiver`.
        let (incoming_event_sender, _) = broadcast::channel(Self::INCOMING_CHANNEL_CAPACITY);
        let handle = Self {
            client,
            incoming_event_sender: incoming_event_sender.clone(),
        };

        Ok((handle, (eventloop, incoming_event_sender)))
    }

    /// Subscribes to `topic` with the requested quality of service.
    ///
    /// # Errors
    ///
    /// Returns the underlying client's error converted into [`AWSIoTError`].
    pub async fn subscribe<S: Into<String>>(
        &self,
        topic: S,
        qos: QoS,
    ) -> Result<(), AWSIoTError> {
        self.client.subscribe(topic, qos).await?;
        Ok(())
    }

    /// Publishes `payload` on `topic` with the requested quality of service.
    ///
    /// The message is always sent with the retain flag set to `false`.
    ///
    /// # Errors
    ///
    /// Returns the underlying client's error converted into [`AWSIoTError`].
    pub async fn publish<S, V>(
        &self,
        topic: S,
        qos: QoS,
        payload: V,
    ) -> Result<(), AWSIoTError>
    where
        S: Into<String>,
        V: Into<Vec<u8>>,
    {
        let retain = false;
        self.client.publish(topic, qos, retain, payload).await?;
        Ok(())
    }

    /// Returns a fresh receiver subscribed to the incoming-event broadcast
    /// channel.
    pub async fn get_receiver(&self) -> Receiver<Incoming> {
        let sender = &self.incoming_event_sender;
        sender.subscribe()
    }

    /// Consumes this wrapper and hands back the underlying [`AsyncClient`].
    pub async fn get_client(self) -> AsyncClient {
        let Self { client, .. } = self;
        client
    }
}