#[cfg(feature = "cache")]
use std::sync::Arc;
use tokio::sync::{
broadcast,
mpsc,
};
use tokio::task::JoinHandle;
use tracing::{
debug,
trace,
warn,
};
#[cfg(feature = "cache")]
use crate::cache::Cache;
use crate::gateway::events::{
Event,
EventError,
EventKind,
};
use crate::gateway::mqtt::IncomingMessage;
use crate::http::send::PendingRequests;
use crate::parser::parse_events;
pub(super) fn spawn(
mut receiver: mpsc::Receiver<IncomingMessage>,
pending: PendingRequests,
events: broadcast::Sender<Event>,
#[cfg(feature = "cache")] cache: Arc<Cache>,
) -> JoinHandle<()> {
tokio::spawn(async move {
debug!("event dispatcher task started");
let _ = events.send(Event::Listening);
while let Some(message) = receiver.recv().await {
trace!(
topic = message.topic.as_str(),
payload_len = message.payload.len(),
"dispatcher received mqtt message"
);
let raw_payload = String::from_utf8_lossy(&message.payload);
trace!(
topic = message.topic.as_str(),
payload = %raw_payload,
"received raw mqtt payload"
);
if message.topic == "/ls_resp"
&& let Err(error) = pending.resolve_payload(&message.payload).await
{
warn!(?error, "failed to resolve request response payload");
}
match parse_events(&message.topic, &message.payload) {
Ok(parsed) => {
trace!(
count = parsed.len(),
topic = message.topic.as_str(),
"parsed events"
);
if parsed.is_empty() && message.topic == "/t_ms" {
let preview = String::from_utf8_lossy(&message.payload);
let preview = preview.chars().take(1024).collect::<String>();
trace!(
payload = preview,
"received /t_ms payload with no parsed events"
);
}
if parsed.is_empty() && message.topic == "/ls_resp" {
let preview = String::from_utf8_lossy(&message.payload);
let preview = preview.chars().take(1024).collect::<String>();
trace!(
payload = preview,
"received /ls_resp payload with no parsed events"
);
}
#[cfg(feature = "cache")]
for mut event in parsed {
cache.update(&mut event);
let _ = events.send(event);
}
#[cfg(not(feature = "cache"))]
for event in parsed {
let _ = events.send(event);
}
}
Err(error) => {
let _ = events.send(Event::Error(EventError {
kind: EventKind::Parse,
message: error.to_string(),
}));
}
}
}
let _ = events.send(Event::Disconnect);
})
}