// whatsapp_rust/handlers/message.rs
1use super::traits::StanzaHandler;
2use crate::client::Client;
3use async_trait::async_trait;
4use log::warn;
5use std::sync::Arc;
6use wacore_binary::node::Node;
7
/// Handler for `<message>` stanzas.
///
/// Processes incoming WhatsApp messages, including:
/// - Text messages
/// - Media messages (images, videos, documents, etc.)
/// - System messages
/// - Group messages
///
/// Messages are processed sequentially per-chat using a mailbox pattern to prevent
/// race conditions where a later message could be processed before the PreKey
/// message that establishes the Signal session.
///
/// This is a stateless unit struct: all per-chat state it relies on (the
/// enqueue locks and the worker queues) lives on the shared `Client`, so a
/// single `MessageHandler` instance can serve every incoming stanza.
#[derive(Default)]
pub struct MessageHandler;
21
22#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
23#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
24impl StanzaHandler for MessageHandler {
25 fn tag(&self) -> &'static str {
26 "message"
27 }
28
29 async fn handle(&self, client: Arc<Client>, node: Arc<Node>, _cancelled: &mut bool) -> bool {
30 // Extract the chat ID to serialize processing for this chat.
31 // This prevents race conditions where a later message is processed before
32 // the PreKey message that establishes the session.
33 let chat_id = match node.attrs().optional_jid("from") {
34 Some(jid) => jid.to_string(),
35 None => {
36 warn!("Message stanza missing required 'from' attribute");
37 return false;
38 }
39 };
40
41 // CRITICAL: Acquire the enqueue lock BEFORE getting/creating the queue.
42 // This ensures that messages are enqueued in the exact order they arrive,
43 // even when multiple messages arrive concurrently and the queue needs
44 // to be created for the first time.
45 //
46 // The key insight is that get_with (for the lock) establishes ordering
47 // based on who calls it first, and then the mutex.lock() preserves that
48 // ordering since we hold the lock for the entire enqueue operation.
49 let enqueue_mutex = client
50 .message_enqueue_locks
51 .get_with_by_ref(&chat_id, async { Arc::new(async_lock::Mutex::new(())) })
52 .await;
53
54 // Acquire the lock - this serializes all enqueue operations for this chat
55 let _enqueue_guard = enqueue_mutex.lock().await;
56
57 // Now get or create the worker queue for this chat
58 let tx = client
59 .message_queues
60 .get_with_by_ref(&chat_id, async {
61 // Create a channel with backpressure
62 // Increased capacity to handle high message rates without blocking
63 let (tx, rx) = async_channel::bounded::<Arc<Node>>(10000);
64
65 let client_for_worker = client.clone();
66
67 // Spawn a worker task that processes messages sequentially for this chat.
68 // The worker exits when all tx senders are dropped (cache TTI expiry drops
69 // the cached tx, and any cloned tx's are short-lived). No explicit
70 // invalidate() here — that would race with new queue entries under the
71 // same key (see bug audit #27).
72 client
73 .runtime
74 .spawn(Box::pin(async move {
75 while let Ok(msg_node) = rx.recv().await {
76 let client = client_for_worker.clone();
77 Box::pin(client.handle_incoming_message(msg_node)).await;
78 }
79 }))
80 .detach();
81
82 tx
83 })
84 .await;
85
86 // Send the message to the queue - just clones the Arc, not the Node!
87 if let Err(e) = tx.send(node).await {
88 warn!("Failed to enqueue message for processing: {e}");
89 }
90
91 // Lock is released here when _enqueue_guard is dropped
92
93 true
94 }
95}