whatsapp_rust/
send.rs

1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use wacore::client::context::SendContextResolver;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore_binary::jid::{Jid, JidExt as _};
8use waproto::whatsapp as wa;
9
10impl Client {
11    pub async fn send_message(
12        &self,
13        to: Jid,
14        message: wa::Message,
15    ) -> Result<String, anyhow::Error> {
16        let request_id = self.generate_message_id().await;
17        self.send_message_impl(to, &message, Some(request_id.clone()), false, false, None)
18            .await?;
19        Ok(request_id)
20    }
21
22    pub(crate) async fn send_message_impl(
23        &self,
24        to: Jid,
25        message: &wa::Message,
26        request_id_override: Option<String>,
27        peer: bool,
28        force_key_distribution: bool,
29        edit: Option<crate::types::message::EditAttribute>,
30    ) -> Result<(), anyhow::Error> {
31        // Generate request ID early (doesn't need lock)
32        let request_id = match request_id_override {
33            Some(id) => id,
34            None => self.generate_message_id().await,
35        };
36
37        let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
38            // Peer messages are only valid for individual users, not groups
39            // Resolve encryption JID and acquire lock ONLY for encryption
40            let encryption_jid = self.resolve_encryption_jid(&to).await;
41            let signal_addr_str = encryption_jid.to_protocol_address().to_string();
42
43            let session_mutex = self
44                .session_locks
45                .get_with(signal_addr_str.clone(), async {
46                    std::sync::Arc::new(tokio::sync::Mutex::new(()))
47                })
48                .await;
49            let _session_guard = session_mutex.lock().await;
50
51            // Lock is held only during encryption
52            let device_store_arc = self.persistence_manager.get_device_arc().await;
53            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
54
55            wacore::send::prepare_peer_stanza(
56                &mut store_adapter.session_store,
57                &mut store_adapter.identity_store,
58                to,
59                message,
60                request_id,
61            )
62            .await?
63            // Lock released here automatically
64        } else if to.is_group() {
65            // Group messages: No client-level lock needed.
66            // Each participant device is encrypted separately with its own per-device lock
67            // inside prepare_group_stanza, so we don't need to serialize entire group sends.
68
69            // Preparation work (no lock needed)
70            let mut group_info = self.groups().query_info(&to).await?;
71
72            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
73            let own_jid = device_snapshot
74                .pn
75                .clone()
76                .ok_or_else(|| anyhow!("Not logged in"))?;
77            let own_lid = device_snapshot
78                .lid
79                .clone()
80                .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
81            let account_info = device_snapshot.account.clone();
82
83            // Store serialized message bytes for retry (lightweight)
84            self.add_recent_message(to.clone(), request_id.clone(), message)
85                .await;
86
87            let device_store_arc = self.persistence_manager.get_device_arc().await;
88
89            let (own_sending_jid, _) = match group_info.addressing_mode {
90                crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
91                crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
92            };
93
94            if !group_info
95                .participants
96                .iter()
97                .any(|participant| participant.is_same_user_as(&own_sending_jid))
98            {
99                group_info.participants.push(own_sending_jid.to_non_ad());
100            }
101
102            let force_skdm = {
103                use wacore::libsignal::protocol::SenderKeyStore;
104                use wacore::libsignal::store::sender_key_name::SenderKeyName;
105                let mut device_guard = device_store_arc.write().await;
106                let sender_address = own_sending_jid.to_protocol_address();
107                let sender_key_name =
108                    SenderKeyName::new(to.to_string(), sender_address.to_string());
109
110                let key_exists = device_guard
111                    .load_sender_key(&sender_key_name)
112                    .await?
113                    .is_some();
114
115                force_key_distribution || !key_exists
116            };
117
118            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc.clone());
119
120            let mut stores = wacore::send::SignalStores {
121                session_store: &mut store_adapter.session_store,
122                identity_store: &mut store_adapter.identity_store,
123                prekey_store: &mut store_adapter.pre_key_store,
124                signed_prekey_store: &store_adapter.signed_pre_key_store,
125                sender_key_store: &mut store_adapter.sender_key_store,
126            };
127
128            // Consume forget marks - these participants need fresh SKDMs (matches WhatsApp Web)
129            // markForgetSenderKey is called during retry handling, this consumes those marks
130            let marked_for_fresh_skdm = self
131                .consume_forget_marks(&to.to_string())
132                .await
133                .unwrap_or_default();
134
135            // Determine which devices need SKDM distribution
136            let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
137                // Forcing full distribution (either first message or explicit request)
138                None // Let prepare_group_stanza resolve all devices
139            } else {
140                // Check which devices already have SKDM and find new ones
141                let known_recipients = self
142                    .persistence_manager
143                    .get_skdm_recipients(&to.to_string())
144                    .await
145                    .unwrap_or_default();
146
147                if known_recipients.is_empty() {
148                    // No known recipients, need full distribution
149                    None
150                } else {
151                    // Get current devices for all participants
152                    let jids_to_resolve: Vec<Jid> = group_info
153                        .participants
154                        .iter()
155                        .map(|jid| jid.to_non_ad())
156                        .collect();
157
158                    match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
159                        Ok(all_devices) => {
160                            // Filter to find devices that don't have SKDM yet
161                            let new_devices: Vec<Jid> = all_devices
162                                .into_iter()
163                                .filter(|device: &Jid| {
164                                    !known_recipients.contains(&device.to_string())
165                                })
166                                .collect();
167
168                            if new_devices.is_empty() {
169                                Some(vec![]) // No new devices, no SKDM needed
170                            } else {
171                                log::debug!(
172                                    "Found {} new devices needing SKDM for group {}",
173                                    new_devices.len(),
174                                    to
175                                );
176                                Some(new_devices)
177                            }
178                        }
179                        Err(e) => {
180                            log::warn!("Failed to resolve devices for SKDM check: {:?}", e);
181                            None // Fall back to full distribution
182                        }
183                    }
184                }
185            };
186
187            // Merge marked_for_fresh_skdm into skdm_target_devices
188            // These are devices that need fresh SKDMs due to retry/error handling
189            let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
190                match skdm_target_devices {
191                    None => None, // Already doing full distribution
192                    Some(mut devices) => {
193                        // Parse marked JID strings and add to target list
194                        for marked_jid_str in &marked_for_fresh_skdm {
195                            if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
196                                && !devices.iter().any(|d| d.to_string() == *marked_jid_str)
197                            {
198                                log::debug!(
199                                    "Adding {} to SKDM targets (marked for fresh key)",
200                                    marked_jid_str
201                                );
202                                devices.push(marked_jid);
203                            }
204                        }
205                        Some(devices)
206                    }
207                }
208            } else {
209                skdm_target_devices
210            };
211
212            // Track devices that will receive SKDM in this message
213            let devices_receiving_skdm: Vec<String> = skdm_target_devices
214                .as_ref()
215                .map(|devices: &Vec<Jid>| devices.iter().map(|d: &Jid| d.to_string()).collect())
216                .unwrap_or_default();
217
218            // Encryption happens here (per-device locking handled internally)
219            match wacore::send::prepare_group_stanza(
220                &mut stores,
221                self,
222                &mut group_info,
223                &own_jid,
224                &own_lid,
225                account_info.as_ref(),
226                to.clone(),
227                message,
228                request_id.clone(),
229                force_skdm,
230                skdm_target_devices.clone(),
231                edit.clone(),
232            )
233            .await
234            {
235                Ok(stanza) => {
236                    // Update SKDM recipients tracking after preparing the stanza
237                    if !devices_receiving_skdm.is_empty() {
238                        if let Err(e) = self
239                            .persistence_manager
240                            .add_skdm_recipients(&to.to_string(), &devices_receiving_skdm)
241                            .await
242                        {
243                            log::warn!("Failed to update SKDM recipients: {:?}", e);
244                        }
245                    } else if force_skdm || skdm_target_devices.is_none() {
246                        // Full distribution happened, query all devices and track them
247                        let jids_to_resolve: Vec<Jid> = group_info
248                            .participants
249                            .iter()
250                            .map(|jid| jid.to_non_ad())
251                            .collect();
252
253                        if let Ok(all_devices) =
254                            SendContextResolver::resolve_devices(self, &jids_to_resolve).await
255                        {
256                            let all_device_strs: Vec<String> =
257                                all_devices.iter().map(|d| d.to_string()).collect();
258                            if let Err(e) = self
259                                .persistence_manager
260                                .add_skdm_recipients(&to.to_string(), &all_device_strs)
261                                .await
262                            {
263                                log::warn!("Failed to update SKDM recipients: {:?}", e);
264                            }
265                        }
266                    }
267                    stanza
268                }
269                Err(e) => {
270                    if let Some(SignalProtocolError::NoSenderKeyState(_)) =
271                        e.downcast_ref::<SignalProtocolError>()
272                    {
273                        log::warn!("No sender key for group {}, forcing distribution.", to);
274
275                        // Clear SKDM recipients since we're rotating the key
276                        if let Err(e) = self
277                            .persistence_manager
278                            .clear_skdm_recipients(&to.to_string())
279                            .await
280                        {
281                            log::warn!("Failed to clear SKDM recipients: {:?}", e);
282                        }
283
284                        let mut store_adapter_retry =
285                            SignalProtocolStoreAdapter::new(device_store_arc.clone());
286                        let mut stores_retry = wacore::send::SignalStores {
287                            session_store: &mut store_adapter_retry.session_store,
288                            identity_store: &mut store_adapter_retry.identity_store,
289                            prekey_store: &mut store_adapter_retry.pre_key_store,
290                            signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
291                            sender_key_store: &mut store_adapter_retry.sender_key_store,
292                        };
293
294                        wacore::send::prepare_group_stanza(
295                            &mut stores_retry,
296                            self,
297                            &mut group_info,
298                            &own_jid,
299                            &own_lid,
300                            account_info.as_ref(),
301                            to,
302                            message,
303                            request_id,
304                            true, // Force distribution on retry
305                            None, // Distribute to all devices
306                            edit.clone(),
307                        )
308                        .await?
309                    } else {
310                        return Err(e);
311                    }
312                }
313            }
314        } else {
315            // Direct message: Acquire lock only during encryption
316
317            // Ensure E2E sessions exist before encryption (matches WhatsApp Web)
318            // This deduplicates concurrent prekey fetches for the same recipient
319            let recipient_devices = self.get_user_devices(std::slice::from_ref(&to)).await?;
320            self.ensure_e2e_sessions(recipient_devices).await?;
321
322            // Resolve encryption JID and prepare lock acquisition
323            let encryption_jid = self.resolve_encryption_jid(&to).await;
324            let signal_addr_str = encryption_jid.to_protocol_address().to_string();
325
326            // Store serialized message bytes for retry (lightweight)
327            self.add_recent_message(to.clone(), request_id.clone(), message)
328                .await;
329
330            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
331            let own_jid = device_snapshot
332                .pn
333                .clone()
334                .ok_or_else(|| anyhow!("Not logged in"))?;
335            let account_info = device_snapshot.account.clone();
336
337            // Acquire lock only for encryption
338            let session_mutex = self
339                .session_locks
340                .get_with(signal_addr_str.clone(), async {
341                    std::sync::Arc::new(tokio::sync::Mutex::new(()))
342                })
343                .await;
344            let _session_guard = session_mutex.lock().await;
345
346            // Lock is held only during encryption
347            let device_store_arc = self.persistence_manager.get_device_arc().await;
348            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
349
350            let mut stores = wacore::send::SignalStores {
351                session_store: &mut store_adapter.session_store,
352                identity_store: &mut store_adapter.identity_store,
353                prekey_store: &mut store_adapter.pre_key_store,
354                signed_prekey_store: &store_adapter.signed_pre_key_store,
355                sender_key_store: &mut store_adapter.sender_key_store,
356            };
357
358            wacore::send::prepare_dm_stanza(
359                &mut stores,
360                self,
361                &own_jid,
362                account_info.as_ref(),
363                to,
364                message,
365                request_id,
366                edit,
367            )
368            .await?
369            // Lock released here automatically
370        };
371        // Network send happens with NO lock held
372        self.send_node(stanza_to_send).await.map_err(|e| e.into())
373    }
374}