rustybook-messenger 0.2.1

Messenger client for Rustybook
Documentation
#[cfg(feature = "cache")]
use std::sync::Arc;

use tokio::sync::{
    broadcast,
    mpsc,
};
use tokio::task::JoinHandle;
use tracing::{
    debug,
    trace,
    warn,
};

#[cfg(feature = "cache")]
use crate::cache::Cache;
use crate::gateway::events::{
    Event,
    EventError,
    EventKind,
};
use crate::gateway::mqtt::IncomingMessage;
use crate::http::send::PendingRequests;
use crate::parser::parse_events;

/// Spawns the event dispatcher task and returns its join handle.
///
/// The task first broadcasts [`Event::Listening`] and then handles every
/// message taken from `receiver` until `recv` yields `None`:
///
/// 1. A message on the `/ls_resp` topic is handed to
///    `pending.resolve_payload`; a failure is logged as a warning and does
///    not stop further handling of that message.
/// 2. The message is passed to [`parse_events`]. Each parsed event is
///    broadcast on `events` (after `cache.update` when the `cache` feature is
///    enabled). A parse failure is broadcast as an [`Event::Error`] with kind
///    [`EventKind::Parse`].
///
/// Once the loop ends, [`Event::Disconnect`] is broadcast and the task
/// finishes.
///
/// The result of every `events.send` call is deliberately discarded: tokio's
/// broadcast `send` reports an error when there are no active receivers, and
/// the dispatcher keeps running regardless.
pub(super) fn spawn(
    mut receiver: mpsc::Receiver<IncomingMessage>,
    pending: PendingRequests,
    events: broadcast::Sender<Event>,
    #[cfg(feature = "cache")] cache: Arc<Cache>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        debug!("event dispatcher task started");
        let _ = events.send(Event::Listening);

        while let Some(message) = receiver.recv().await {
            trace!(
                topic = message.topic.as_str(),
                payload_len = message.payload.len(),
                "dispatcher received mqtt message"
            );
            // Decode inside the macro: field expressions are only evaluated
            // when the callsite is enabled, so the UTF-8 scan (and possible
            // allocation) is skipped entirely when trace logging is off.
            trace!(
                topic = message.topic.as_str(),
                payload = %String::from_utf8_lossy(&message.payload),
                "received raw mqtt payload"
            );

            if message.topic == "/ls_resp"
                && let Err(error) = pending.resolve_payload(&message.payload).await
            {
                warn!(?error, "failed to resolve request response payload");
            }

            match parse_events(&message.topic, &message.payload) {
                Ok(parsed) => {
                    trace!(
                        count = parsed.len(),
                        topic = message.topic.as_str(),
                        "parsed events"
                    );

                    // For these two topics, log a truncated view of payloads
                    // that parsed successfully but produced no events.
                    if parsed.is_empty() {
                        match message.topic.as_str() {
                            "/t_ms" => {
                                let preview = payload_preview(&message.payload);
                                trace!(
                                    payload = preview,
                                    "received /t_ms payload with no parsed events"
                                );
                            }
                            "/ls_resp" => {
                                let preview = payload_preview(&message.payload);
                                trace!(
                                    payload = preview,
                                    "received /ls_resp payload with no parsed events"
                                );
                            }
                            _ => {}
                        }
                    }

                    #[cfg(feature = "cache")]
                    for mut event in parsed {
                        // Let the cache see the event before it is published.
                        cache.update(&mut event);
                        let _ = events.send(event);
                    }

                    #[cfg(not(feature = "cache"))]
                    for event in parsed {
                        let _ = events.send(event);
                    }
                }
                Err(error) => {
                    let _ = events.send(Event::Error(EventError {
                        kind: EventKind::Parse,
                        message: error.to_string(),
                    }));
                }
            }
        }

        let _ = events.send(Event::Disconnect);
    })
}

/// Maximum number of characters of a payload included in a log preview.
const PAYLOAD_PREVIEW_CHARS: usize = 1024;

/// Lossily decodes `payload` as UTF-8 and keeps at most
/// [`PAYLOAD_PREVIEW_CHARS`] characters of the result.
fn payload_preview(payload: &[u8]) -> String {
    String::from_utf8_lossy(payload)
        .chars()
        .take(PAYLOAD_PREVIEW_CHARS)
        .collect()
}