mod legacy;
mod options;
use std::time::Duration;
use rumqttc::{
AsyncClient,
Event,
EventLoop,
Incoming,
Packet,
QoS,
};
use tokio::sync::{
mpsc,
oneshot,
};
use tracing::{
debug,
info,
trace,
warn,
};
use crate::error::MessengerError;
use crate::state::State;
/// A publish packet received from the broker, reduced to its topic and payload.
///
/// Values of this type are produced by the polling task and delivered through
/// the channel whose receiving half is handed out by `Runtime::take_receiver`.
#[derive(Debug)]
pub struct IncomingMessage {
/// Topic the packet was published on.
pub topic: String,
/// Payload bytes copied out of the received packet.
pub payload: Vec<u8>,
}
/// Owns the MQTT client handle plus the plumbing around the background poll task.
pub struct Runtime {
/// Client handle used to subscribe, publish and disconnect.
client: AsyncClient,
/// Sending half of the incoming-message channel; cloned into the poll task.
sender: mpsc::Sender<IncomingMessage>,
/// Receiving half of the incoming-message channel; `Some` until it is taken
/// via `take_receiver`.
receiver: Option<mpsc::Receiver<IncomingMessage>>,
/// Signal used to ask the poll task to exit; set by `start_polling` and
/// consumed by `stop`.
shutdown: Option<oneshot::Sender<()>>,
/// Handle of the spawned poll task; `None` until `start_polling` has run.
task: Option<tokio::task::JoinHandle<()>>,
}
impl Runtime {
/// Creates the MQTT client, its event loop and the incoming-message channel.
///
/// A random 64-bit session id is generated and passed, together with `state`
/// and `online`, to `options::build_connect_options`. The event loop is
/// returned to the caller instead of being polled here; hand it to
/// `start_polling` to begin receiving packets.
///
/// # Errors
///
/// This implementation has no failing path and always returns `Ok`.
pub async fn connect(state: &State, online: bool) -> Result<(Self, EventLoop), MessengerError> {
    // Capacity argument passed to `AsyncClient::new`.
    const CLIENT_CAPACITY: usize = 100;
    // Number of incoming messages buffered between the poll task and the consumer.
    const INCOMING_CAPACITY: usize = 1024;

    let session_id = rand::random::<u64>();
    let (client, eventloop) = AsyncClient::new(
        options::build_connect_options(state, online, session_id),
        CLIENT_CAPACITY,
    );
    debug!(session_id, online, "mqtt runtime initialized");

    let (sender, receiver) = mpsc::channel(INCOMING_CAPACITY);
    let runtime = Self {
        client,
        sender,
        receiver: Some(receiver),
        shutdown: None,
        task: None,
    };
    Ok((runtime, eventloop))
}
/// Subscribes to the default topics and publishes the sync queue request.
///
/// Every topic yielded by `options::default_topics()` is subscribed with
/// `QoS::AtLeastOnce`, after which `legacy::publish_sync_queue` is invoked
/// with the client and `state`. `online` is only reported in a debug log line.
///
/// # Errors
///
/// Propagates the first subscribe or publish failure, converted into
/// `MessengerError`; topics after a failed subscription are not attempted.
pub async fn initialize_session(
    &self,
    state: &State,
    online: bool,
) -> Result<(), MessengerError> {
    let client = &self.client;

    for entry in options::default_topics() {
        let topic = *entry;
        debug!(topic, "subscribing mqtt topic");
        client.subscribe(topic, QoS::AtLeastOnce).await?;
    }

    debug!(online, "lightspeed bootstrap publish disabled");

    legacy::publish_sync_queue(client, state).await?;
    debug!("published messenger sync queue request");

    Ok(())
}
/// Hands out the receiving half of the incoming-message channel.
///
/// Returns `Some` the first time it is called and `None` afterwards, because
/// the stored receiver is replaced by `None` when it is taken.
pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<IncomingMessage>> {
    std::mem::take(&mut self.receiver)
}
pub fn start_polling(&mut self, mut eventloop: EventLoop) -> Result<(), MessengerError> {
if self.task.is_some() {
return Ok(());
}
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let tx = self.sender.clone();
let task = tokio::spawn(async move {
let mut delay_ms = 250u64;
loop {
tokio::select! {
_ = &mut shutdown_rx => {
debug!("mqtt runtime received shutdown signal");
break;
}
poll_result = eventloop.poll() => {
match poll_result {
Ok(Event::Incoming(Packet::Publish(packet))) => {
trace!(
topic = packet.topic.as_str(),
payload_len = packet.payload.len(),
"received mqtt publish packet"
);
let message = IncomingMessage {
topic: packet.topic,
payload: packet.payload.to_vec(),
};
if tx.send(message).await.is_err() {
break;
}
delay_ms = 250;
}
Ok(Event::Incoming(Incoming::Disconnect)) => {
warn!("mqtt disconnect packet received");
}
Ok(event) => {
trace!(?event, "received mqtt event");
delay_ms = 250;
}
Err(error) => {
warn!(?error, "mqtt poll failed, retrying");
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
delay_ms = (delay_ms.saturating_mul(2)).min(30_000);
}
}
}
}
}
});
self.shutdown = Some(shutdown_tx);
self.task = Some(task);
Ok(())
}
/// Publishes `payload` on `topic` with `QoS::AtLeastOnce` and the retain flag unset.
///
/// The topic and payload length are logged at debug level. For topics that
/// start with `/ls`, a lossy UTF-8 preview of at most 512 characters of the
/// payload is additionally logged at trace level.
///
/// # Errors
///
/// Returns the client's publish error, converted into `MessengerError`.
pub async fn publish(&self, topic: &str, payload: Vec<u8>) -> Result<(), MessengerError> {
    // Upper bound on the number of characters included in the trace preview.
    const PREVIEW_CHARS: usize = 512;

    debug!(
        topic,
        payload_len = payload.len(),
        "publishing mqtt payload"
    );
    if topic.starts_with("/ls") {
        // The preview is built inside the macro's field expression so the
        // decode and allocation only happen when this trace event is enabled.
        trace!(
            topic,
            payload = String::from_utf8_lossy(&payload)
                .chars()
                .take(PREVIEW_CHARS)
                .collect::<String>(),
            "publishing lightspeed mqtt payload"
        );
    }
    self.client
        .publish(topic, QoS::AtLeastOnce, false, payload)
        .await?;
    Ok(())
}
/// Requests an MQTT disconnect and tears down the background poll task.
///
/// The disconnect is requested first, while the poll task (which owns the
/// event loop) is still running, so the request is not issued after the event
/// loop may already have been dropped. The shutdown signal and the task abort
/// are then performed unconditionally, so the task is cleaned up even when
/// the disconnect request fails.
///
/// Delivery of the disconnect to the broker remains best-effort, because the
/// poll task is cancelled right after the request has been queued.
///
/// # Errors
///
/// Returns the client's disconnect error, converted into `MessengerError`,
/// after the poll task has been signalled and aborted.
pub async fn stop(mut self) -> Result<(), MessengerError> {
    info!("stopping mqtt runtime");

    // Keep the outcome for later: cleanup below must run regardless of it.
    let disconnect_result = self.client.disconnect().await;

    if let Some(shutdown) = self.shutdown.take() {
        // The task may already have exited; a failed send is not an error.
        let _ = shutdown.send(());
    }
    if let Some(task) = self.task.take() {
        task.abort();
    }

    disconnect_result?;
    Ok(())
}
}