use super::traits::StanzaHandler;
use crate::client::Client;
use async_trait::async_trait;
use log::warn;
use std::sync::Arc;
use wacore_binary::node::Node;
/// Stateless [`StanzaHandler`] implementation for stanzas tagged `message`.
///
/// The type carries no fields; everything it needs is taken from the
/// `Client` passed to `handle`.
#[derive(Default)]
pub struct MessageHandler;
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl StanzaHandler for MessageHandler {
    /// Returns the stanza tag this handler reports: `"message"`.
    fn tag(&self) -> &'static str {
        "message"
    }

    /// Enqueues an incoming `message` stanza on the queue belonging to its chat.
    ///
    /// The chat is identified by the string form of the stanza's `from` JID.
    /// For each chat key the client holds:
    ///
    /// * an entry in `message_enqueue_locks` — a mutex that is held from before
    ///   the queue lookup until this function returns (presumably so that
    ///   concurrent calls for the same chat enqueue in arrival order — confirm
    ///   against the design notes);
    /// * an entry in `message_queues` — the sending half of a bounded channel.
    ///   When the entry is first created, a detached worker task is spawned on
    ///   `client.runtime`; it receives nodes one at a time and awaits
    ///   `Client::handle_incoming_message` for each before taking the next.
    ///
    /// # Returns
    ///
    /// `false` when no `from` JID could be read from the stanza; `true`
    /// otherwise, including when the send onto the queue fails (that failure is
    /// only logged).
    async fn handle(&self, client: Arc<Client>, node: Arc<Node>, _cancelled: &mut bool) -> bool {
        /// Maximum number of nodes that may wait in a single chat's queue.
        const QUEUE_CAPACITY: usize = 10000;

        // Convert to an owned key inside the same expression so nothing borrowed
        // from the attribute parser outlives this statement.
        let Some(chat_id) = node
            .attrs()
            .optional_jid("from")
            .map(|jid| jid.to_string())
        else {
            warn!("Message stanza missing required 'from' attribute");
            return false;
        };

        // Fetch (or lazily create) the per-chat enqueue mutex and hold it for
        // the remainder of the call.
        let chat_lock = client
            .message_enqueue_locks
            .get_with_by_ref(&chat_id, async { Arc::new(async_lock::Mutex::new(())) })
            .await;
        let _enqueue_guard = chat_lock.lock().await;

        // Fetch the per-chat sender; the initialiser creates the channel and
        // starts the worker that drains it.
        let tx = client
            .message_queues
            .get_with_by_ref(&chat_id, async {
                let (sender, receiver) = async_channel::bounded::<Arc<Node>>(QUEUE_CAPACITY);
                let worker_client = Arc::clone(&client);

                let worker = Box::pin(async move {
                    // `recv` returns `Err` once the channel is closed, which
                    // ends the worker.
                    while let Ok(queued) = receiver.recv().await {
                        let handler_client = Arc::clone(&worker_client);
                        Box::pin(handler_client.handle_incoming_message(queued)).await;
                    }
                });
                client.runtime.spawn(worker).detach();

                sender
            })
            .await;

        if let Err(e) = tx.send(node).await {
            warn!("Failed to enqueue message for processing: {e}");
        }

        true
    }
}