use super::traits::StanzaHandler;
use crate::client::Client;
use async_trait::async_trait;
use log::warn;
use std::sync::Arc;
use tokio::sync::mpsc;
use wacore_binary_ng::node::Node;
/// Stanza handler for `message` nodes.
///
/// The handler itself is stateless; the per-chat locks and queues it uses
/// are reached through the `Client` passed to `handle`.
#[derive(Default)]
pub struct MessageHandler;
#[async_trait]
impl StanzaHandler for MessageHandler {
    /// Returns the stanza tag this handler is responsible for.
    fn tag(&self) -> &'static str {
        "message"
    }

    /// Enqueues an incoming `message` stanza onto the queue of its chat.
    ///
    /// The chat is identified by the stanza's `from` attribute. Each chat has
    /// its own bounded channel and a worker task that drains it by calling
    /// `Client::handle_incoming_message` on one node at a time; the channel and
    /// worker are created the first time a chat is seen.
    ///
    /// Returns `false` when the `from` attribute is missing, and `true`
    /// otherwise, including when the node could not be enqueued (that failure
    /// is only logged).
    async fn handle(&self, client: Arc<Client>, node: Arc<Node>, _cancelled: &mut bool) -> bool {
        let chat_id = match node.attrs().optional_jid("from") {
            Some(jid) => jid.to_string(),
            None => {
                warn!("Message stanza missing required 'from' attribute");
                return false;
            }
        };

        // Hold a per-chat lock while looking up the queue and sending, so that
        // concurrent calls for the same chat enqueue one at a time.
        let enqueue_mutex = client
            .message_enqueue_locks
            .get_with_by_ref(&chat_id, async { Arc::new(tokio::sync::Mutex::new(())) })
            .await;
        let _enqueue_guard = enqueue_mutex.lock().await;

        // Fetch the chat's sender, creating the channel and spawning its
        // worker task if no entry exists yet.
        let tx = client
            .message_queues
            .get_with_by_ref(&chat_id, async {
                let (tx, mut rx) = mpsc::channel::<Arc<Node>>(10000);
                let client_for_worker = client.clone();
                tokio::spawn(async move {
                    // The loop ends once every sender for this channel is dropped.
                    while let Some(msg_node) = rx.recv().await {
                        let client = client_for_worker.clone();
                        Box::pin(client.handle_incoming_message(msg_node)).await;
                    }
                });
                tx
            })
            .await;

        if let Err(e) = tx.send(node).await {
            warn!("Failed to enqueue message for processing: {e}");
            // A send error means the receiving half is gone, i.e. the worker
            // task has terminated. Drop the stale sender so the next stanza for
            // this chat builds a fresh channel and worker instead of failing
            // forever. The enqueue lock is still held, so no other enqueuer for
            // this chat can observe the entry mid-removal.
            // NOTE(review): assumes `message_queues` exposes an async
            // `invalidate(&key)` (as moka's future cache does) — confirm.
            client.message_queues.invalidate(&chat_id).await;
        }
        true
    }
}