// rustybook-messenger 0.2.1
//
// Messenger client for Rustybook
// Documentation
mod legacy;
mod options;

use std::time::Duration;

use rumqttc::{
    AsyncClient,
    Event,
    EventLoop,
    Incoming,
    Packet,
    QoS,
};
use tokio::sync::{
    mpsc,
    oneshot,
};
use tracing::{
    debug,
    info,
    trace,
    warn,
};

use crate::error::MessengerError;
use crate::state::State;

/// A single MQTT publish received from the broker, handed to the consumer
/// through the runtime's incoming-message channel.
///
/// This is plain owned data, so it derives `Clone` and equality in addition to
/// `Debug`; that lets consumers duplicate messages and compare them in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncomingMessage {
    /// Topic the publish packet was received on.
    pub topic: String,
    /// Raw payload bytes of the publish packet, copied out of the packet.
    pub payload: Vec<u8>,
}

/// MQTT runtime: owns the client handle, the incoming-message channel and the
/// background task that polls the event loop.
pub struct Runtime {
    /// Client handle used to subscribe, publish and disconnect.
    client: AsyncClient,
    /// Sending half of the incoming-message channel; cloned into the polling
    /// task when polling starts.
    sender: mpsc::Sender<IncomingMessage>,
    /// Receiving half of the incoming-message channel; `Some` until it is
    /// handed out by `take_receiver`.
    receiver: Option<mpsc::Receiver<IncomingMessage>>,
    /// Shutdown signal for the polling task; `None` until polling has started.
    shutdown: Option<oneshot::Sender<()>>,
    /// Handle of the spawned polling task; `None` until polling has started.
    task: Option<tokio::task::JoinHandle<()>>,
}

impl Runtime {
    /// Creates the MQTT client and its event loop together with the channel
    /// used to deliver incoming publishes.
    ///
    /// A random 64-bit session id is generated and passed, along with `state`
    /// and `online`, to `options::build_connect_options`. The returned
    /// [`EventLoop`] is handed back to the caller, who is expected to pass it
    /// to `start_polling`; no polling task is started here.
    ///
    /// # Errors
    ///
    /// The visible code path always returns `Ok`; the `Result` is part of the
    /// public signature.
    pub async fn connect(state: &State, online: bool) -> Result<(Self, EventLoop), MessengerError> {
        // Capacity handed to `AsyncClient::new`.
        const CLIENT_CAPACITY: usize = 100;
        // Capacity of the channel carrying `IncomingMessage`s to the consumer.
        const INCOMING_CAPACITY: usize = 1024;

        let session_id = rand::random::<u64>();
        let (client, eventloop) = AsyncClient::new(
            options::build_connect_options(state, online, session_id),
            CLIENT_CAPACITY,
        );
        debug!(session_id, online, "mqtt runtime initialized");

        let (sender, receiver) = mpsc::channel(INCOMING_CAPACITY);
        let runtime = Self {
            client,
            sender,
            receiver: Some(receiver),
            shutdown: None,
            task: None,
        };

        Ok((runtime, eventloop))
    }

    /// Subscribes to every default topic and then publishes the legacy sync
    /// queue request.
    ///
    /// `online` is only logged: the lightspeed bootstrap publish is disabled.
    ///
    /// # Errors
    ///
    /// Returns early with the converted error as soon as a subscribe request
    /// or the sync queue publish fails.
    pub async fn initialize_session(
        &self,
        state: &State,
        online: bool,
    ) -> Result<(), MessengerError> {
        for entry in options::default_topics() {
            let topic = *entry;
            debug!(topic, "subscribing mqtt topic");
            self.client.subscribe(topic, QoS::AtLeastOnce).await?;
        }

        debug!(online, "lightspeed bootstrap publish disabled");

        let sync_request = legacy::publish_sync_queue(&self.client, state);
        sync_request.await?;
        debug!("published messenger sync queue request");

        Ok(())
    }

    /// Hands out the receiving half of the incoming-message channel.
    ///
    /// Returns `Some` the first time it is called and `None` afterwards,
    /// because the receiver is moved out of the runtime.
    pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<IncomingMessage>> {
        std::mem::take(&mut self.receiver)
    }

    /// Spawns the background task that drives `eventloop` and forwards every
    /// incoming publish packet to the incoming-message channel.
    ///
    /// If a polling task already exists this is a no-op that returns `Ok(())`;
    /// the `eventloop` passed in is dropped in that case.
    ///
    /// The task ends when:
    /// * the shutdown signal fires (or its sender is dropped), or
    /// * the incoming-message receiver has been dropped, so a send fails.
    ///
    /// Poll errors are retried with an exponential back-off that starts at
    /// 250 ms, doubles on each consecutive failure and is capped at 30 s. The
    /// back-off resets after any successfully polled event except an incoming
    /// disconnect packet. The back-off wait is raced against the shutdown
    /// signal so that shutdown is not delayed by a long sleep.
    ///
    /// # Errors
    ///
    /// The visible code path always returns `Ok`; the `Result` is part of the
    /// public signature.
    pub fn start_polling(&mut self, mut eventloop: EventLoop) -> Result<(), MessengerError> {
        if self.task.is_some() {
            return Ok(());
        }

        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
        let tx = self.sender.clone();

        let task = tokio::spawn(async move {
            let mut delay_ms = 250u64;

            loop {
                tokio::select! {
                    _ = &mut shutdown_rx => {
                        debug!("mqtt runtime received shutdown signal");
                        break;
                    }
                    poll_result = eventloop.poll() => {
                        match poll_result {
                            Ok(Event::Incoming(Packet::Publish(packet))) => {
                                trace!(
                                    topic = packet.topic.as_str(),
                                    payload_len = packet.payload.len(),
                                    "received mqtt publish packet"
                                );
                                let message = IncomingMessage {
                                    topic: packet.topic,
                                    payload: packet.payload.to_vec(),
                                };

                                // A failed send means the receiver is gone, so
                                // nobody is left to consume messages.
                                if tx.send(message).await.is_err() {
                                    debug!("incoming message receiver dropped, stopping mqtt polling");
                                    break;
                                }
                                delay_ms = 250;
                            }
                            Ok(Event::Incoming(Incoming::Disconnect)) => {
                                warn!("mqtt disconnect packet received");
                            }
                            Ok(event) => {
                                trace!(?event, "received mqtt event");
                                delay_ms = 250;
                            }
                            Err(error) => {
                                warn!(?error, "mqtt poll failed, retrying");
                                // Wait out the back-off, but leave immediately
                                // if shutdown is requested in the meantime.
                                // The outer select's futures are finished by
                                // the time this handler runs, so
                                // `shutdown_rx` can be borrowed again here.
                                tokio::select! {
                                    _ = &mut shutdown_rx => {
                                        debug!("mqtt runtime received shutdown signal");
                                        break;
                                    }
                                    _ = tokio::time::sleep(Duration::from_millis(delay_ms)) => {}
                                }
                                delay_ms = delay_ms.saturating_mul(2).min(30_000);
                            }
                        }
                    }
                }
            }
        });

        self.shutdown = Some(shutdown_tx);
        self.task = Some(task);

        Ok(())
    }

    /// Publishes `payload` on `topic` with QoS "at least once" and the retain
    /// flag cleared.
    ///
    /// For topics starting with `/ls`, a lossy UTF-8 preview of the payload
    /// (at most 512 characters) is additionally logged at trace level.
    ///
    /// # Errors
    ///
    /// Returns the converted client error when the publish request fails.
    pub async fn publish(&self, topic: &str, payload: Vec<u8>) -> Result<(), MessengerError> {
        let payload_len = payload.len();
        debug!(topic, payload_len, "publishing mqtt payload");

        if topic.starts_with("/ls") {
            // Invalid UTF-8 is replaced rather than rejected; this is only a log preview.
            let preview: String = String::from_utf8_lossy(&payload)
                .chars()
                .take(512)
                .collect();
            trace!(
                topic,
                payload = preview,
                "publishing lightspeed mqtt payload"
            );
        }

        let request = self.client.publish(topic, QoS::AtLeastOnce, false, payload);
        request.await?;
        Ok(())
    }

    /// Shuts the runtime down: requests an MQTT disconnect, signals the polling
    /// task to stop and aborts it.
    ///
    /// The disconnect request is issued *before* the shutdown signal. The
    /// polling task owns the event loop, so signalling it first could let it
    /// exit and drop the event loop before the request is queued.
    /// NOTE(review): rumqttc presumably reports an error for requests sent
    /// after the event loop is gone — confirm against the rumqttc docs.
    ///
    /// The polling task is always cleaned up, even when the disconnect request
    /// fails; the failure is reported only after the cleanup.
    ///
    /// # Errors
    ///
    /// Returns the converted client error when the disconnect request fails.
    pub async fn stop(mut self) -> Result<(), MessengerError> {
        info!("stopping mqtt runtime");

        // Keep the result instead of using `?` here so a failure cannot skip
        // the task cleanup below.
        let disconnect_result = self.client.disconnect().await;

        if let Some(shutdown) = self.shutdown.take() {
            // Sending fails only when the receiving side is already gone,
            // i.e. the polling task has ended; that is fine.
            let _ = shutdown.send(());
        }

        if let Some(task) = self.task.take() {
            // The task may be blocked in a channel send and not see the
            // signal promptly, so abort it as well.
            task.abort();
        }

        disconnect_result?;
        Ok(())
    }
}