Skip to main content

whatsapp_rust/handlers/
message.rs

1use super::traits::StanzaHandler;
2use crate::client::Client;
3use async_trait::async_trait;
4use log::warn;
5use std::sync::Arc;
6use wacore_binary::node::Node;
7
/// Stanza handler responsible for `<message>` nodes.
///
/// Covers every kind of incoming WhatsApp message:
/// - text
/// - media (images, videos, documents, ...)
/// - system messages
/// - group messages
///
/// Each chat gets its own mailbox, so the messages of one chat are handled
/// strictly one after another. Without that ordering, a follow-up message
/// could be handled ahead of the PreKey message that sets up the Signal
/// session it depends on.
#[derive(Default)]
pub struct MessageHandler;
21
22#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
23#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
24impl StanzaHandler for MessageHandler {
25    fn tag(&self) -> &'static str {
26        "message"
27    }
28
29    async fn handle(&self, client: Arc<Client>, node: Arc<Node>, _cancelled: &mut bool) -> bool {
30        // Extract the chat ID to serialize processing for this chat.
31        // This prevents race conditions where a later message is processed before
32        // the PreKey message that establishes the session.
33        let chat_id = match node.attrs().optional_jid("from") {
34            Some(jid) => jid.to_string(),
35            None => {
36                warn!("Message stanza missing required 'from' attribute");
37                return false;
38            }
39        };
40
41        // CRITICAL: Acquire the enqueue lock BEFORE getting/creating the queue.
42        // This ensures that messages are enqueued in the exact order they arrive,
43        // even when multiple messages arrive concurrently and the queue needs
44        // to be created for the first time.
45        //
46        // The key insight is that get_with (for the lock) establishes ordering
47        // based on who calls it first, and then the mutex.lock() preserves that
48        // ordering since we hold the lock for the entire enqueue operation.
49        let enqueue_mutex = client
50            .message_enqueue_locks
51            .get_with_by_ref(&chat_id, async { Arc::new(async_lock::Mutex::new(())) })
52            .await;
53
54        // Acquire the lock - this serializes all enqueue operations for this chat
55        let _enqueue_guard = enqueue_mutex.lock().await;
56
57        // Now get or create the worker queue for this chat
58        let tx = client
59            .message_queues
60            .get_with_by_ref(&chat_id, async {
61                // Create a channel with backpressure
62                // Increased capacity to handle high message rates without blocking
63                let (tx, rx) = async_channel::bounded::<Arc<Node>>(10000);
64
65                let client_for_worker = client.clone();
66
67                // Spawn a worker task that processes messages sequentially for this chat.
68                // The worker exits when all tx senders are dropped (cache TTI expiry drops
69                // the cached tx, and any cloned tx's are short-lived). No explicit
70                // invalidate() here — that would race with new queue entries under the
71                // same key (see bug audit #27).
72                client
73                    .runtime
74                    .spawn(Box::pin(async move {
75                        while let Ok(msg_node) = rx.recv().await {
76                            let client = client_for_worker.clone();
77                            Box::pin(client.handle_incoming_message(msg_node)).await;
78                        }
79                    }))
80                    .detach();
81
82                tx
83            })
84            .await;
85
86        // Send the message to the queue - just clones the Arc, not the Node!
87        if let Err(e) = tx.send(node).await {
88            warn!("Failed to enqueue message for processing: {e}");
89        }
90
91        // Lock is released here when _enqueue_guard is dropped
92
93        true
94    }
95}