Skip to main content

whatsapp_rust/
send.rs

1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use wacore::client::context::SendContextResolver;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore::types::message::AddressingMode;
8use wacore_binary::jid::{DeviceKey, Jid, JidExt as _};
9use wacore_binary::node::Node;
10use waproto::whatsapp as wa;
11
12/// Options for sending messages with additional customization.
13#[derive(Debug, Clone, Default)]
14pub struct SendOptions {
15    /// Extra XML nodes to add to the message stanza.
16    pub extra_stanza_nodes: Vec<Node>,
17}
18
19/// Specifies who is revoking (deleting) the message.
20#[derive(Debug, Clone, PartialEq, Eq, Default)]
21pub enum RevokeType {
22    /// The message sender deleting their own message.
23    #[default]
24    Sender,
25    /// A group admin deleting another user's message.
26    /// `original_sender` is the JID of the user who sent the message being deleted.
27    Admin { original_sender: Jid },
28}
29
30impl Client {
31    /// Send an end-to-end encrypted message to a user or group.
32    ///
33    /// Returns the message ID on success. For status/story updates use
34    /// [`Client::status()`] instead.
35    pub async fn send_message(
36        &self,
37        to: Jid,
38        message: wa::Message,
39    ) -> Result<String, anyhow::Error> {
40        self.send_message_with_options(to, message, SendOptions::default())
41            .await
42    }
43
44    /// Send a message with additional options.
45    pub async fn send_message_with_options(
46        &self,
47        to: Jid,
48        message: wa::Message,
49        options: SendOptions,
50    ) -> Result<String, anyhow::Error> {
51        let request_id = self.generate_message_id().await;
52        self.send_message_impl(
53            to,
54            &message,
55            Some(request_id.clone()),
56            false,
57            false,
58            None,
59            options.extra_stanza_nodes,
60        )
61        .await?;
62        Ok(request_id)
63    }
64
65    /// Send a status/story update to the given recipients using sender key encryption.
66    ///
67    /// This builds a `GroupInfo` from the provided recipients (always PN addressing mode),
68    /// then reuses the group encryption pipeline with `to = status@broadcast`.
69    pub(crate) async fn send_status_message(
70        &self,
71        message: wa::Message,
72        recipients: Vec<Jid>,
73        options: crate::features::status::StatusSendOptions,
74    ) -> Result<String, anyhow::Error> {
75        use wacore::client::context::GroupInfo;
76        use wacore_binary::builder::NodeBuilder;
77
78        if recipients.is_empty() {
79            return Err(anyhow!("Cannot send status with no recipients"));
80        }
81
82        let to = Jid::status_broadcast();
83        let request_id = self.generate_message_id().await;
84
85        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
86        let own_jid = device_snapshot
87            .pn
88            .clone()
89            .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
90        // Status always uses PN addressing, so own_lid is only needed as a
91        // fallback parameter for prepare_group_stanza (unused in PN mode).
92        let own_lid = device_snapshot
93            .lid
94            .clone()
95            .unwrap_or_else(|| own_jid.clone());
96        let account_info = device_snapshot.account.clone();
97
98        // Status always uses PN addressing. Resolve any LID recipients to their
99        // phone numbers so we don't end up with duplicate PN+LID entries for the
100        // same user (which causes server error 400).
101        // Reject non-user JIDs (groups, broadcasts, etc.) to prevent invalid
102        // <participants> entries that cause server errors.
103        let mut resolved_recipients = Vec::with_capacity(recipients.len());
104        for jid in recipients {
105            if jid.is_group() || jid.is_status_broadcast() || jid.is_broadcast_list() {
106                return Err(anyhow!(
107                    "Invalid status recipient {}: must be a user JID, not a group/broadcast",
108                    jid
109                ));
110            }
111            if jid.is_lid() {
112                if let Some(pn) = self.lid_pn_cache.get_phone_number(&jid.user).await {
113                    resolved_recipients
114                        .push(Jid::new(&pn, wacore_binary::jid::DEFAULT_USER_SERVER));
115                } else {
116                    return Err(anyhow!(
117                        "No PN mapping for LID {}. Ensure the recipient has been \
118                         contacted previously.",
119                        jid
120                    ));
121                }
122            } else {
123                resolved_recipients.push(jid);
124            }
125        }
126
127        if resolved_recipients.is_empty() {
128            return Err(anyhow!("No valid PN recipients after LID resolution"));
129        }
130
131        // Deduplicate by user (in case both LID and PN were provided for the same user)
132        let mut seen_users = std::collections::HashSet::new();
133        resolved_recipients.retain(|jid| seen_users.insert(jid.user.clone()));
134
135        let mut group_info = GroupInfo::new(resolved_recipients, AddressingMode::Pn);
136
137        // Ensure we're in the participant list
138        let own_base = own_jid.to_non_ad();
139        if !group_info
140            .participants
141            .iter()
142            .any(|p| p.is_same_user_as(&own_base))
143        {
144            group_info.participants.push(own_base);
145        }
146
147        self.add_recent_message(to.clone(), request_id.clone(), &message)
148            .await;
149
150        let device_store_arc = self.persistence_manager.get_device_arc().await;
151        let to_str = to.to_string();
152
153        let force_skdm = {
154            use wacore::libsignal::store::sender_key_name::SenderKeyName;
155            let sender_address = own_jid.to_protocol_address();
156            let sender_key_name = SenderKeyName::new(to_str.clone(), sender_address.to_string());
157            let cache_key = format!(
158                "{}:{}",
159                sender_key_name.group_id(),
160                sender_key_name.sender_id()
161            );
162
163            let device_guard = device_store_arc.read().await;
164            let key_exists = self
165                .signal_cache
166                .get_sender_key(&cache_key, &*device_guard.backend)
167                .await?
168                .is_some();
169
170            !key_exists
171        };
172
173        let mut store_adapter =
174            SignalProtocolStoreAdapter::new(device_store_arc.clone(), self.signal_cache.clone());
175        let mut stores = wacore::send::SignalStores {
176            session_store: &mut store_adapter.session_store,
177            identity_store: &mut store_adapter.identity_store,
178            prekey_store: &mut store_adapter.pre_key_store,
179            signed_prekey_store: &store_adapter.signed_pre_key_store,
180            sender_key_store: &mut store_adapter.sender_key_store,
181        };
182
183        let marked_for_fresh_skdm = self.consume_forget_marks(&to_str).await.unwrap_or_default();
184
185        let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
186            None
187        } else {
188            let known_recipients = self
189                .persistence_manager
190                .get_skdm_recipients(&to_str)
191                .await
192                .unwrap_or_default();
193
194            if known_recipients.is_empty() {
195                None
196            } else {
197                let jids_to_resolve: Vec<Jid> = group_info
198                    .participants
199                    .iter()
200                    .map(|jid| jid.to_non_ad())
201                    .collect();
202
203                match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
204                    Ok(all_devices) => {
205                        let known_set: std::collections::HashSet<DeviceKey<'_>> =
206                            known_recipients.iter().map(|j| j.device_key()).collect();
207                        let new_devices: Vec<Jid> = all_devices
208                            .into_iter()
209                            .filter(|device| !known_set.contains(&device.device_key()))
210                            .collect();
211                        if new_devices.is_empty() {
212                            Some(vec![])
213                        } else {
214                            Some(new_devices)
215                        }
216                    }
217                    Err(e) => {
218                        log::warn!("Failed to resolve devices for status SKDM check: {:?}", e);
219                        None
220                    }
221                }
222            }
223        };
224
225        let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
226            match skdm_target_devices {
227                None => None,
228                Some(mut devices) => {
229                    for marked_jid_str in &marked_for_fresh_skdm {
230                        if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
231                            && !devices.iter().any(|d| d.device_eq(&marked_jid))
232                        {
233                            devices.push(marked_jid);
234                        }
235                    }
236                    Some(devices)
237                }
238            }
239        } else {
240            skdm_target_devices
241        };
242
243        let is_full_distribution = force_skdm || skdm_target_devices.is_none();
244        let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
245        #[allow(clippy::needless_late_init)]
246        let skdm_is_full: bool;
247
248        // WhatsApp Web includes <meta status_setting="..."/> on non-revoke status messages.
249        // Revoke messages omit this node.
250        let is_revoke = message.protocol_message.as_ref().is_some_and(|pm| {
251            pm.r#type == Some(wa::message::protocol_message::Type::Revoke as i32)
252        });
253        let extra_stanza_nodes = if is_revoke {
254            vec![]
255        } else {
256            vec![
257                NodeBuilder::new("meta")
258                    .attr("status_setting", options.privacy.as_str())
259                    .build(),
260            ]
261        };
262
263        let stanza = match wacore::send::prepare_group_stanza(
264            &mut stores,
265            self,
266            &mut group_info,
267            &own_jid,
268            &own_lid,
269            account_info.as_ref(),
270            to.clone(),
271            &message,
272            request_id.clone(),
273            force_skdm,
274            skdm_target_devices,
275            None,
276            &extra_stanza_nodes,
277        )
278        .await
279        {
280            Ok(stanza) => {
281                skdm_is_full = is_full_distribution;
282                stanza
283            }
284            Err(e) => {
285                if let Some(SignalProtocolError::NoSenderKeyState(_)) =
286                    e.downcast_ref::<SignalProtocolError>()
287                {
288                    log::warn!("No sender key for status broadcast, forcing distribution.");
289
290                    if let Err(e) = self
291                        .persistence_manager
292                        .clear_skdm_recipients(&to_str)
293                        .await
294                    {
295                        log::warn!("Failed to clear status SKDM recipients: {:?}", e);
296                    }
297
298                    let mut store_adapter_retry = SignalProtocolStoreAdapter::new(
299                        device_store_arc.clone(),
300                        self.signal_cache.clone(),
301                    );
302                    let mut stores_retry = wacore::send::SignalStores {
303                        session_store: &mut store_adapter_retry.session_store,
304                        identity_store: &mut store_adapter_retry.identity_store,
305                        prekey_store: &mut store_adapter_retry.pre_key_store,
306                        signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
307                        sender_key_store: &mut store_adapter_retry.sender_key_store,
308                    };
309
310                    let retry_stanza = wacore::send::prepare_group_stanza(
311                        &mut stores_retry,
312                        self,
313                        &mut group_info,
314                        &own_jid,
315                        &own_lid,
316                        account_info.as_ref(),
317                        to.clone(),
318                        &message,
319                        request_id.clone(),
320                        true,
321                        None,
322                        None,
323                        &extra_stanza_nodes,
324                    )
325                    .await?;
326
327                    // Retry is always full distribution (rotated sender key)
328                    skdm_is_full = true;
329                    retry_stanza
330                } else {
331                    return Err(e);
332                }
333            }
334        };
335
336        // For status broadcasts, the server doesn't know the recipient list
337        // (unlike groups where the server has the member list). We must always
338        // include a <participants> node so the server knows who to deliver to.
339        // If prepare_group_stanza already added one (SKDM distribution), we
340        // extend it with bare <to> entries for devices that already have the
341        // sender key. If there's no <participants> node yet, we create one.
342        let stanza = self.ensure_status_participants(stanza, &group_info).await?;
343
344        self.send_node(stanza).await?;
345
346        // Update SKDM recipient cache AFTER server ACK (matches WhatsApp Web behavior).
347        // WA Web only calls markHasSenderKey() after the server confirms receipt.
348        self.update_skdm_recipients(
349            &to_str,
350            &devices_receiving_skdm,
351            skdm_is_full,
352            &group_info.participants,
353        )
354        .await;
355
356        // Flush cached Signal state to DB after encryption
357        if let Err(e) = self.flush_signal_cache().await {
358            log::error!("Failed to flush signal cache after send_status_message: {e:?}");
359        }
360
361        Ok(request_id)
362    }
363
364    /// Update SKDM recipient bookkeeping after a successful group/status send.
365    ///
366    /// Called AFTER `send_node()` succeeds to match WhatsApp Web behavior, which
367    /// only marks devices as having the sender key after the server ACK.
368    /// If specific devices received SKDM, record them. If this was a full distribution
369    /// (all participants), resolve all devices and record them instead.
370    async fn update_skdm_recipients(
371        &self,
372        to_str: &str,
373        devices_receiving_skdm: &[Jid],
374        is_full_distribution: bool,
375        participants: &[Jid],
376    ) {
377        if !devices_receiving_skdm.is_empty() {
378            if let Err(e) = self
379                .persistence_manager
380                .add_skdm_recipients(to_str, devices_receiving_skdm)
381                .await
382            {
383                log::warn!("Failed to update SKDM recipients: {:?}", e);
384            }
385        } else if is_full_distribution {
386            let jids_to_resolve: Vec<Jid> =
387                participants.iter().map(|jid| jid.to_non_ad()).collect();
388            match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
389                Ok(all_devices) => {
390                    if let Err(e) = self
391                        .persistence_manager
392                        .add_skdm_recipients(to_str, &all_devices)
393                        .await
394                    {
395                        log::warn!("Failed to persist SKDM recipients: {:?}", e);
396                    }
397                }
398                Err(e) => {
399                    log::warn!("Failed to resolve devices for SKDM recipients: {:?}", e);
400                }
401            }
402        }
403    }
404
405    /// Ensure the status stanza has a <participants> node listing all recipient
406    /// user JIDs. WhatsApp Web's `participantList` uses bare USER JIDs (not
407    /// device JIDs) — `<to jid="user@s.whatsapp.net"/>` — to tell the server
408    /// which users should receive the skmsg. The SKDM distribution list
409    /// (already in <participants>) uses device JIDs with <enc> children.
410    async fn ensure_status_participants(
411        &self,
412        stanza: wacore_binary::Node,
413        group_info: &wacore::client::context::GroupInfo,
414    ) -> Result<wacore_binary::Node, anyhow::Error> {
415        Ok(wacore::send::ensure_status_participants(stanza, group_info))
416    }
417
418    /// Delete a message for everyone in the chat (revoke).
419    ///
420    /// This sends a revoke protocol message that removes the message for all participants.
421    /// The message will show as "This message was deleted" for recipients.
422    ///
423    /// # Arguments
424    /// * `to` - The chat JID (DM or group)
425    /// * `message_id` - The ID of the message to delete
426    /// * `revoke_type` - Use `RevokeType::Sender` to delete your own message,
427    ///   or `RevokeType::Admin { original_sender }` to delete another user's message as group admin
428    pub async fn revoke_message(
429        &self,
430        to: Jid,
431        message_id: impl Into<String>,
432        revoke_type: RevokeType,
433    ) -> Result<(), anyhow::Error> {
434        let message_id = message_id.into();
435        // Verify we're logged in
436        self.get_pn()
437            .await
438            .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
439
440        let (from_me, participant, edit_attr) = match &revoke_type {
441            RevokeType::Sender => {
442                // For sender revoke, participant is NOT set (from_me=true identifies it)
443                // This matches whatsmeow's BuildMessageKey behavior
444                (
445                    true,
446                    None,
447                    crate::types::message::EditAttribute::SenderRevoke,
448                )
449            }
450            RevokeType::Admin { original_sender } => {
451                // Admin revoke requires group context
452                if !to.is_group() {
453                    return Err(anyhow!("Admin revoke is only valid for group chats"));
454                }
455                // The protocolMessageKey.participant should match the original message's key exactly
456                // Do NOT convert LID to PN - pass through unchanged like WhatsApp Web does
457                let participant_str = original_sender.to_non_ad().to_string();
458                log::debug!(
459                    "Admin revoke: using participant {} for MessageKey",
460                    participant_str
461                );
462                (
463                    false,
464                    Some(participant_str),
465                    crate::types::message::EditAttribute::AdminRevoke,
466                )
467            }
468        };
469
470        let revoke_message = wa::Message {
471            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
472                key: Some(wa::MessageKey {
473                    remote_jid: Some(to.to_string()),
474                    from_me: Some(from_me),
475                    id: Some(message_id.clone()),
476                    participant,
477                }),
478                r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
479                ..Default::default()
480            })),
481            ..Default::default()
482        };
483
484        // The revoke message stanza needs a NEW unique ID, not the message ID being revoked
485        // The message_id being revoked is already in protocolMessage.key.id
486        // Passing None generates a fresh stanza ID
487        //
488        // For admin revokes, force SKDM distribution to get the proper message structure
489        // with phash, <participants>, and <device-identity> that WhatsApp Web uses
490        let force_skdm = matches!(revoke_type, RevokeType::Admin { .. });
491        self.send_message_impl(
492            to,
493            &revoke_message,
494            None,
495            false,
496            force_skdm,
497            Some(edit_attr),
498            vec![],
499        )
500        .await
501    }
502
503    #[allow(clippy::too_many_arguments)]
504    pub(crate) async fn send_message_impl(
505        &self,
506        to: Jid,
507        message: &wa::Message,
508        request_id_override: Option<String>,
509        peer: bool,
510        force_key_distribution: bool,
511        edit: Option<crate::types::message::EditAttribute>,
512        extra_stanza_nodes: Vec<Node>,
513    ) -> Result<(), anyhow::Error> {
514        // Status broadcasts must go through send_status_message() which provides recipients
515        if to.is_status_broadcast() {
516            return Err(anyhow!(
517                "Use send_status_message() or client.status() API for status@broadcast"
518            ));
519        }
520
521        // Generate request ID early (doesn't need lock)
522        let request_id = match request_id_override {
523            Some(id) => id,
524            None => self.generate_message_id().await,
525        };
526
527        // SKDM update data — only populated for group sends, deferred until after send_node().
528        // This matches WhatsApp Web which only calls markHasSenderKey() after server ACK.
529        struct SkdmUpdate {
530            to_str: String,
531            devices: Vec<Jid>,
532            is_full_distribution: bool,
533            participants: Vec<Jid>,
534        }
535        let mut skdm_update: Option<SkdmUpdate> = None;
536
537        let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
538            // Peer messages are only valid for individual users, not groups
539            // Resolve encryption JID and acquire lock ONLY for encryption
540            let encryption_jid = self.resolve_encryption_jid(&to).await;
541            let signal_addr_str = encryption_jid.to_protocol_address_string();
542
543            let session_mutex = self
544                .session_locks
545                .get_with(signal_addr_str.clone(), async {
546                    std::sync::Arc::new(async_lock::Mutex::new(()))
547                })
548                .await;
549            let _session_guard = session_mutex.lock().await;
550
551            let device_store_arc = self.persistence_manager.get_device_arc().await;
552            let mut store_adapter =
553                SignalProtocolStoreAdapter::new(device_store_arc, self.signal_cache.clone());
554
555            wacore::send::prepare_peer_stanza(
556                &mut store_adapter.session_store,
557                &mut store_adapter.identity_store,
558                to,
559                encryption_jid,
560                message,
561                request_id,
562            )
563            .await?
564        } else if to.is_group() {
565            // Group messages: No client-level lock needed.
566            // Each participant device is encrypted separately with its own per-device lock
567            // inside prepare_group_stanza, so we don't need to serialize entire group sends.
568
569            // Preparation work (no lock needed)
570            let mut group_info = self.groups().query_info(&to).await?;
571
572            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
573            let own_jid = device_snapshot
574                .pn
575                .clone()
576                .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
577            let own_lid = device_snapshot
578                .lid
579                .clone()
580                .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
581            let account_info = device_snapshot.account.clone();
582
583            // Store serialized message bytes for retry (lightweight)
584            self.add_recent_message(to.clone(), request_id.clone(), message)
585                .await;
586
587            let device_store_arc = self.persistence_manager.get_device_arc().await;
588            let to_str = to.to_string();
589
590            let (own_sending_jid, _) = match group_info.addressing_mode {
591                crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
592                crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
593            };
594
595            if !group_info
596                .participants
597                .iter()
598                .any(|participant| participant.is_same_user_as(&own_sending_jid))
599            {
600                group_info.participants.push(own_sending_jid.to_non_ad());
601            }
602
603            let force_skdm = {
604                use wacore::libsignal::protocol::SenderKeyStore;
605                use wacore::libsignal::store::sender_key_name::SenderKeyName;
606                let mut device_guard = device_store_arc.write().await;
607                let sender_address = own_sending_jid.to_protocol_address();
608                let sender_key_name =
609                    SenderKeyName::new(to_str.clone(), sender_address.to_string());
610
611                let key_exists = device_guard
612                    .load_sender_key(&sender_key_name)
613                    .await?
614                    .is_some();
615
616                force_key_distribution || !key_exists
617            };
618
619            let mut store_adapter = SignalProtocolStoreAdapter::new(
620                device_store_arc.clone(),
621                self.signal_cache.clone(),
622            );
623
624            let mut stores = wacore::send::SignalStores {
625                session_store: &mut store_adapter.session_store,
626                identity_store: &mut store_adapter.identity_store,
627                prekey_store: &mut store_adapter.pre_key_store,
628                signed_prekey_store: &store_adapter.signed_pre_key_store,
629                sender_key_store: &mut store_adapter.sender_key_store,
630            };
631
632            // Consume forget marks - these participants need fresh SKDMs (matches WhatsApp Web)
633            // markForgetSenderKey is called during retry handling, this consumes those marks
634            let marked_for_fresh_skdm =
635                self.consume_forget_marks(&to_str).await.unwrap_or_default();
636
637            // Determine which devices need SKDM distribution
638            let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
639                None
640            } else {
641                let known_recipients = self
642                    .persistence_manager
643                    .get_skdm_recipients(&to_str)
644                    .await
645                    .unwrap_or_default();
646
647                if known_recipients.is_empty() {
648                    None
649                } else {
650                    let jids_to_resolve: Vec<Jid> = group_info
651                        .participants
652                        .iter()
653                        .map(|jid| jid.to_non_ad())
654                        .collect();
655
656                    match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
657                        Ok(all_devices) => {
658                            use std::collections::HashSet;
659
660                            let known_set: HashSet<DeviceKey<'_>> =
661                                known_recipients.iter().map(|j| j.device_key()).collect();
662
663                            let new_devices: Vec<Jid> = all_devices
664                                .into_iter()
665                                .filter(|device| !known_set.contains(&device.device_key()))
666                                .collect();
667
668                            if new_devices.is_empty() {
669                                Some(vec![])
670                            } else {
671                                log::debug!(
672                                    "Found {} new devices needing SKDM for group {}",
673                                    new_devices.len(),
674                                    to
675                                );
676                                Some(new_devices)
677                            }
678                        }
679                        Err(e) => {
680                            log::warn!("Failed to resolve devices for SKDM check: {:?}", e);
681                            None
682                        }
683                    }
684                }
685            };
686
687            // Merge devices marked for fresh SKDM (from retry/error handling)
688            let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
689                match skdm_target_devices {
690                    None => None,
691                    Some(mut devices) => {
692                        for marked_jid_str in &marked_for_fresh_skdm {
693                            if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
694                                && !devices.iter().any(|d| d.device_eq(&marked_jid))
695                            {
696                                log::debug!(
697                                    "Adding {} to SKDM targets (marked for fresh key)",
698                                    marked_jid_str
699                                );
700                                devices.push(marked_jid);
701                            }
702                        }
703                        Some(devices)
704                    }
705                }
706            } else {
707                skdm_target_devices
708            };
709
710            let is_full_distribution = force_skdm || skdm_target_devices.is_none();
711            let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
712
713            match wacore::send::prepare_group_stanza(
714                &mut stores,
715                self,
716                &mut group_info,
717                &own_jid,
718                &own_lid,
719                account_info.as_ref(),
720                to.clone(),
721                message,
722                request_id.clone(),
723                force_skdm,
724                skdm_target_devices,
725                edit.clone(),
726                &extra_stanza_nodes,
727            )
728            .await
729            {
730                Ok(stanza) => {
731                    skdm_update = Some(SkdmUpdate {
732                        to_str: to_str.clone(),
733                        devices: devices_receiving_skdm,
734                        is_full_distribution,
735                        participants: group_info.participants.clone(),
736                    });
737                    stanza
738                }
739                Err(e) => {
740                    if let Some(SignalProtocolError::NoSenderKeyState(_)) =
741                        e.downcast_ref::<SignalProtocolError>()
742                    {
743                        log::warn!("No sender key for group {}, forcing distribution.", to);
744
745                        // Clear SKDM recipients since we're rotating the key
746                        if let Err(e) = self
747                            .persistence_manager
748                            .clear_skdm_recipients(&to_str)
749                            .await
750                        {
751                            log::warn!("Failed to clear SKDM recipients: {:?}", e);
752                        }
753
754                        let mut store_adapter_retry = SignalProtocolStoreAdapter::new(
755                            device_store_arc.clone(),
756                            self.signal_cache.clone(),
757                        );
758                        let mut stores_retry = wacore::send::SignalStores {
759                            session_store: &mut store_adapter_retry.session_store,
760                            identity_store: &mut store_adapter_retry.identity_store,
761                            prekey_store: &mut store_adapter_retry.pre_key_store,
762                            signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
763                            sender_key_store: &mut store_adapter_retry.sender_key_store,
764                        };
765
766                        let retry_stanza = wacore::send::prepare_group_stanza(
767                            &mut stores_retry,
768                            self,
769                            &mut group_info,
770                            &own_jid,
771                            &own_lid,
772                            account_info.as_ref(),
773                            to,
774                            message,
775                            request_id,
776                            true, // Force distribution on retry
777                            None, // Distribute to all devices
778                            edit.clone(),
779                            &extra_stanza_nodes,
780                        )
781                        .await?;
782
783                        // Retry is always full distribution (rotated sender key)
784                        skdm_update = Some(SkdmUpdate {
785                            to_str,
786                            devices: vec![],
787                            is_full_distribution: true,
788                            participants: group_info.participants.clone(),
789                        });
790                        retry_stanza
791                    } else {
792                        return Err(e);
793                    }
794                }
795            }
796        } else {
797            // Direct message: Acquire lock only during encryption
798
799            // Ensure E2E sessions exist before encryption (matches WhatsApp Web)
800            // This deduplicates concurrent prekey fetches for the same recipient
801            let recipient_devices = self.get_user_devices(std::slice::from_ref(&to)).await?;
802            self.ensure_e2e_sessions(recipient_devices).await?;
803
804            // Resolve encryption JID and prepare lock acquisition
805            let encryption_jid = self.resolve_encryption_jid(&to).await;
806            let signal_addr_str = encryption_jid.to_protocol_address_string();
807
808            // Store serialized message bytes for retry (lightweight)
809            self.add_recent_message(to.clone(), request_id.clone(), message)
810                .await;
811
812            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
813            let own_jid = device_snapshot
814                .pn
815                .clone()
816                .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
817            let account_info = device_snapshot.account.clone();
818
819            // Include tctoken in 1:1 messages (matches WhatsApp Web behavior).
820            // Skip for newsletters, groups, and own JID.
821            let mut extra_stanza_nodes = extra_stanza_nodes;
822            if !to.is_group() && !to.is_newsletter() {
823                self.maybe_include_tc_token(&to, &mut extra_stanza_nodes)
824                    .await;
825            }
826
827            // Acquire lock only for encryption
828            let session_mutex = self
829                .session_locks
830                .get_with(signal_addr_str.clone(), async {
831                    std::sync::Arc::new(async_lock::Mutex::new(()))
832                })
833                .await;
834            let _session_guard = session_mutex.lock().await;
835
836            let device_store_arc = self.persistence_manager.get_device_arc().await;
837            let mut store_adapter =
838                SignalProtocolStoreAdapter::new(device_store_arc, self.signal_cache.clone());
839
840            let mut stores = wacore::send::SignalStores {
841                session_store: &mut store_adapter.session_store,
842                identity_store: &mut store_adapter.identity_store,
843                prekey_store: &mut store_adapter.pre_key_store,
844                signed_prekey_store: &store_adapter.signed_pre_key_store,
845                sender_key_store: &mut store_adapter.sender_key_store,
846            };
847
848            wacore::send::prepare_dm_stanza(
849                &mut stores,
850                self,
851                &own_jid,
852                account_info.as_ref(),
853                to,
854                message,
855                request_id,
856                edit,
857                &extra_stanza_nodes,
858            )
859            .await?
860        };
861
862        self.send_node(stanza_to_send).await?;
863
864        // Update SKDM recipient cache AFTER server ACK (matches WhatsApp Web behavior).
865        // WA Web only calls markHasSenderKey() after the server confirms receipt.
866        if let Some(update) = skdm_update {
867            self.update_skdm_recipients(
868                &update.to_str,
869                &update.devices,
870                update.is_full_distribution,
871                &update.participants,
872            )
873            .await;
874        }
875
876        // Flush cached Signal state to DB after encryption
877        if let Err(e) = self.flush_signal_cache().await {
878            log::error!("Failed to flush signal cache after send_message_impl: {e:?}");
879        }
880
881        Ok(())
882    }
883
884    /// Look up and include a tctoken in outgoing 1:1 message stanza nodes.
885    ///
886    /// If a valid (non-expired) token exists, adds a `<tctoken>` child node.
887    /// If the token is missing or expired, attempts to issue new tokens via IQ.
888    async fn maybe_include_tc_token(&self, to: &Jid, extra_nodes: &mut Vec<Node>) {
889        use wacore::iq::tctoken::{
890            IssuePrivacyTokensSpec, build_tc_token_node, is_tc_token_expired,
891            should_send_new_tc_token,
892        };
893        use wacore::store::traits::TcTokenEntry;
894
895        // Skip for own JID — no need to send privacy token to ourselves
896        let snapshot = self.persistence_manager.get_device_snapshot().await;
897        let is_self = snapshot
898            .pn
899            .as_ref()
900            .is_some_and(|pn| pn.is_same_user_as(to))
901            || snapshot
902                .lid
903                .as_ref()
904                .is_some_and(|lid| lid.is_same_user_as(to));
905        if is_self {
906            return;
907        }
908
909        // Resolve the destination to a LID for token lookup
910        let token_jid = if to.is_lid() {
911            to.user.clone()
912        } else {
913            match self.lid_pn_cache.get_current_lid(&to.user).await {
914                Some(lid) => lid,
915                None => to.user.clone(),
916            }
917        };
918
919        let backend = self.persistence_manager.backend();
920
921        // Look up existing token
922        let existing = match backend.get_tc_token(&token_jid).await {
923            Ok(entry) => entry,
924            Err(e) => {
925                log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
926                return;
927            }
928        };
929
930        match existing {
931            Some(entry) if !is_tc_token_expired(entry.token_timestamp) => {
932                // Valid token — include it in the stanza
933                extra_nodes.push(build_tc_token_node(&entry.token));
934
935                // Check if we should re-issue (bucket boundary crossed).
936                // Update sender_timestamp to mark we've sent our token in this bucket.
937                if should_send_new_tc_token(entry.sender_timestamp) {
938                    let now = wacore::time::now_secs();
939                    let updated_entry = TcTokenEntry {
940                        sender_timestamp: Some(now),
941                        ..entry
942                    };
943                    if let Err(e) = backend.put_tc_token(&token_jid, &updated_entry).await {
944                        log::warn!(target: "Client/TcToken", "Failed to update sender_timestamp: {e}");
945                    }
946                }
947            }
948            _ => {
949                // Token missing or expired — try to issue
950                let to_lid = self.resolve_to_lid_jid(to).await;
951                match self
952                    .execute(IssuePrivacyTokensSpec::new(std::slice::from_ref(&to_lid)))
953                    .await
954                {
955                    Ok(response) => {
956                        let now = wacore::time::now_secs();
957                        for received in &response.tokens {
958                            let entry = TcTokenEntry {
959                                token: received.token.clone(),
960                                token_timestamp: received.timestamp,
961                                sender_timestamp: Some(now),
962                            };
963
964                            // Store the received token
965                            let store_jid = received.jid.user.clone();
966                            if let Err(e) = backend.put_tc_token(&store_jid, &entry).await {
967                                log::warn!(target: "Client/TcToken", "Failed to store issued tc_token: {e}");
968                            }
969
970                            // Include in message stanza
971                            if !received.token.is_empty() {
972                                extra_nodes.push(build_tc_token_node(&received.token));
973                            }
974                        }
975                    }
976                    Err(e) => {
977                        log::debug!(target: "Client/TcToken", "Failed to issue tc_token for {}: {e}", to_lid);
978                        // Don't fail the message send — tctoken is optional
979                    }
980                }
981            }
982        }
983    }
984
985    /// Look up a valid (non-expired) tctoken for a JID. Returns the raw token bytes if found.
986    ///
987    /// Used by profile picture, presence subscribe, and other features that need tctoken gating.
988    pub(crate) async fn lookup_tc_token_for_jid(&self, jid: &Jid) -> Option<Vec<u8>> {
989        use wacore::iq::tctoken::is_tc_token_expired;
990
991        let token_jid = if jid.is_lid() {
992            jid.user.clone()
993        } else {
994            match self.lid_pn_cache.get_current_lid(&jid.user).await {
995                Some(lid) => lid,
996                None => jid.user.clone(),
997            }
998        };
999
1000        let backend = self.persistence_manager.backend();
1001        match backend.get_tc_token(&token_jid).await {
1002            Ok(Some(entry)) if !is_tc_token_expired(entry.token_timestamp) => Some(entry.token),
1003            Ok(_) => None,
1004            Err(e) => {
1005                log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1006                None
1007            }
1008        }
1009    }
1010
1011    /// Resolve a JID to its LID form for tc_token storage.
1012    async fn resolve_to_lid_jid(&self, jid: &Jid) -> Jid {
1013        if jid.is_lid() {
1014            return jid.clone();
1015        }
1016
1017        if let Some(lid_user) = self.lid_pn_cache.get_current_lid(&jid.user).await {
1018            Jid::new(&lid_user, "lid")
1019        } else {
1020            jid.clone()
1021        }
1022    }
1023}
1024
1025#[cfg(test)]
1026mod tests {
1027    use super::*;
1028    use std::str::FromStr;
1029
1030    #[test]
1031    fn test_revoke_type_default_is_sender() {
1032        // RevokeType::Sender is the default (for deleting own messages)
1033        let revoke_type = RevokeType::default();
1034        assert_eq!(revoke_type, RevokeType::Sender);
1035    }
1036
1037    #[test]
1038    fn test_force_skdm_only_for_admin_revoke() {
1039        // Admin revokes require force_skdm=true to get proper message structure
1040        // with phash, <participants>, and <device-identity> that WhatsApp Web uses.
1041        // Without this, the server returns error 479.
1042        let sender_jid = Jid::from_str("123456@s.whatsapp.net").unwrap();
1043
1044        let sender_revoke = RevokeType::Sender;
1045        let admin_revoke = RevokeType::Admin {
1046            original_sender: sender_jid,
1047        };
1048
1049        // This matches the logic in revoke_message()
1050        let force_skdm_sender = matches!(sender_revoke, RevokeType::Admin { .. });
1051        let force_skdm_admin = matches!(admin_revoke, RevokeType::Admin { .. });
1052
1053        assert!(!force_skdm_sender, "Sender revoke should NOT force SKDM");
1054        assert!(force_skdm_admin, "Admin revoke MUST force SKDM");
1055    }
1056
1057    #[test]
1058    fn test_sender_revoke_message_key_structure() {
1059        // Sender revoke (edit="7"): from_me=true, participant=None
1060        // The sender is identified by from_me=true, no participant field needed
1061        let to = Jid::from_str("120363040237990503@g.us").unwrap();
1062        let message_id = "3EB0ABC123".to_string();
1063
1064        let (from_me, participant, edit_attr) = match RevokeType::Sender {
1065            RevokeType::Sender => (
1066                true,
1067                None,
1068                crate::types::message::EditAttribute::SenderRevoke,
1069            ),
1070            RevokeType::Admin { original_sender } => (
1071                false,
1072                Some(original_sender.to_non_ad().to_string()),
1073                crate::types::message::EditAttribute::AdminRevoke,
1074            ),
1075        };
1076
1077        assert!(from_me, "Sender revoke must have from_me=true");
1078        assert!(
1079            participant.is_none(),
1080            "Sender revoke must NOT set participant"
1081        );
1082        assert_eq!(edit_attr.to_string_val(), "7");
1083
1084        let revoke_message = wa::Message {
1085            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1086                key: Some(wa::MessageKey {
1087                    remote_jid: Some(to.to_string()),
1088                    from_me: Some(from_me),
1089                    id: Some(message_id.clone()),
1090                    participant,
1091                }),
1092                r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
1093                ..Default::default()
1094            })),
1095            ..Default::default()
1096        };
1097
1098        let proto_msg = revoke_message.protocol_message.unwrap();
1099        let key = proto_msg.key.unwrap();
1100        assert_eq!(key.from_me, Some(true));
1101        assert_eq!(key.participant, None);
1102        assert_eq!(key.id, Some(message_id));
1103    }
1104
1105    #[test]
1106    fn test_admin_revoke_message_key_structure() {
1107        // Admin revoke (edit="8"): from_me=false, participant=original_sender
1108        // The participant field identifies whose message is being deleted
1109        let to = Jid::from_str("120363040237990503@g.us").unwrap();
1110        let message_id = "3EB0ABC123".to_string();
1111        let original_sender = Jid::from_str("236395184570386:22@lid").unwrap();
1112
1113        let revoke_type = RevokeType::Admin {
1114            original_sender: original_sender.clone(),
1115        };
1116        let (from_me, participant, edit_attr) = match revoke_type {
1117            RevokeType::Sender => (
1118                true,
1119                None,
1120                crate::types::message::EditAttribute::SenderRevoke,
1121            ),
1122            RevokeType::Admin { original_sender } => (
1123                false,
1124                Some(original_sender.to_non_ad().to_string()),
1125                crate::types::message::EditAttribute::AdminRevoke,
1126            ),
1127        };
1128
1129        assert!(!from_me, "Admin revoke must have from_me=false");
1130        assert!(
1131            participant.is_some(),
1132            "Admin revoke MUST set participant to original sender"
1133        );
1134        assert_eq!(edit_attr.to_string_val(), "8");
1135
1136        let revoke_message = wa::Message {
1137            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1138                key: Some(wa::MessageKey {
1139                    remote_jid: Some(to.to_string()),
1140                    from_me: Some(from_me),
1141                    id: Some(message_id.clone()),
1142                    participant: participant.clone(),
1143                }),
1144                r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
1145                ..Default::default()
1146            })),
1147            ..Default::default()
1148        };
1149
1150        let proto_msg = revoke_message.protocol_message.unwrap();
1151        let key = proto_msg.key.unwrap();
1152        assert_eq!(key.from_me, Some(false));
1153        // Participant should be the original sender with device number stripped
1154        assert_eq!(key.participant, Some("236395184570386@lid".to_string()));
1155        assert_eq!(key.id, Some(message_id));
1156    }
1157
1158    #[test]
1159    fn test_admin_revoke_preserves_lid_format() {
1160        // LID JIDs must NOT be converted to PN (phone number) format.
1161        // This was a bug that caused error 479 - the participant field must
1162        // preserve the original JID format exactly (with device stripped).
1163        let lid_sender = Jid::from_str("236395184570386:22@lid").unwrap();
1164        let participant_str = lid_sender.to_non_ad().to_string();
1165
1166        // Must preserve @lid suffix, device number stripped
1167        assert_eq!(participant_str, "236395184570386@lid");
1168        assert!(
1169            participant_str.ends_with("@lid"),
1170            "LID participant must preserve @lid suffix"
1171        );
1172    }
1173
1174    // SKDM Recipient Filtering Tests - validates DeviceKey-based filtering
1175
1176    #[test]
1177    fn test_skdm_recipient_filtering_basic() {
1178        use std::collections::HashSet;
1179
1180        let known_recipients: Vec<Jid> = [
1181            "1234567890:0@s.whatsapp.net",
1182            "1234567890:5@s.whatsapp.net",
1183            "9876543210:0@s.whatsapp.net",
1184        ]
1185        .into_iter()
1186        .map(|s| Jid::from_str(s).unwrap())
1187        .collect();
1188
1189        let all_devices: Vec<Jid> = [
1190            "1234567890:0@s.whatsapp.net",
1191            "1234567890:5@s.whatsapp.net",
1192            "9876543210:0@s.whatsapp.net",
1193            "5555555555:0@s.whatsapp.net", // new
1194        ]
1195        .into_iter()
1196        .map(|s| Jid::from_str(s).unwrap())
1197        .collect();
1198
1199        let known_set: HashSet<DeviceKey<'_>> =
1200            known_recipients.iter().map(|j| j.device_key()).collect();
1201
1202        let new_devices: Vec<Jid> = all_devices
1203            .into_iter()
1204            .filter(|device| !known_set.contains(&device.device_key()))
1205            .collect();
1206
1207        assert_eq!(new_devices.len(), 1);
1208        assert_eq!(new_devices[0].user, "5555555555");
1209    }
1210
1211    #[test]
1212    fn test_skdm_recipient_filtering_lid_jids() {
1213        use std::collections::HashSet;
1214
1215        let known_recipients: Vec<Jid> = [
1216            "236395184570386:91@lid",
1217            "129171292463295:0@lid",
1218            "45857667830004:14@lid",
1219        ]
1220        .into_iter()
1221        .map(|s| Jid::from_str(s).unwrap())
1222        .collect();
1223
1224        let all_devices: Vec<Jid> = [
1225            "236395184570386:91@lid",
1226            "129171292463295:0@lid",
1227            "45857667830004:14@lid",
1228            "45857667830004:15@lid", // new
1229        ]
1230        .into_iter()
1231        .map(|s| Jid::from_str(s).unwrap())
1232        .collect();
1233
1234        let known_set: HashSet<DeviceKey<'_>> =
1235            known_recipients.iter().map(|j| j.device_key()).collect();
1236
1237        let new_devices: Vec<Jid> = all_devices
1238            .into_iter()
1239            .filter(|device| !known_set.contains(&device.device_key()))
1240            .collect();
1241
1242        assert_eq!(new_devices.len(), 1);
1243        assert_eq!(new_devices[0].user, "45857667830004");
1244        assert_eq!(new_devices[0].device, 15);
1245    }
1246
1247    #[test]
1248    fn test_skdm_recipient_filtering_all_known() {
1249        use std::collections::HashSet;
1250
1251        let known_recipients: Vec<Jid> =
1252            ["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]
1253                .into_iter()
1254                .map(|s| Jid::from_str(s).unwrap())
1255                .collect();
1256
1257        let all_devices: Vec<Jid> = ["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]
1258            .into_iter()
1259            .map(|s| Jid::from_str(s).unwrap())
1260            .collect();
1261
1262        let known_set: HashSet<DeviceKey<'_>> =
1263            known_recipients.iter().map(|j| j.device_key()).collect();
1264
1265        let new_devices: Vec<Jid> = all_devices
1266            .into_iter()
1267            .filter(|device| !known_set.contains(&device.device_key()))
1268            .collect();
1269
1270        assert!(new_devices.is_empty());
1271    }
1272
1273    #[test]
1274    fn test_skdm_recipient_filtering_all_new() {
1275        use std::collections::HashSet;
1276
1277        let known_recipients: Vec<Jid> = vec![];
1278
1279        let all_devices: Vec<Jid> = ["1234567890:0@s.whatsapp.net", "9876543210:0@s.whatsapp.net"]
1280            .into_iter()
1281            .map(|s| Jid::from_str(s).unwrap())
1282            .collect();
1283
1284        let known_set: HashSet<DeviceKey<'_>> =
1285            known_recipients.iter().map(|j| j.device_key()).collect();
1286
1287        let new_devices: Vec<Jid> = all_devices
1288            .clone()
1289            .into_iter()
1290            .filter(|device| !known_set.contains(&device.device_key()))
1291            .collect();
1292
1293        assert_eq!(new_devices.len(), all_devices.len());
1294    }
1295
1296    #[test]
1297    fn test_device_key_comparison() {
1298        // Jid parse/display normalizes :0 (omitted in Display, missing ':N' parses as device 0).
1299        // This test ensures DeviceKey comparisons work correctly under that normalization.
1300        let test_cases = [
1301            (
1302                "1234567890:0@s.whatsapp.net",
1303                "1234567890@s.whatsapp.net",
1304                true,
1305            ),
1306            (
1307                "1234567890:5@s.whatsapp.net",
1308                "1234567890:5@s.whatsapp.net",
1309                true,
1310            ),
1311            (
1312                "1234567890:5@s.whatsapp.net",
1313                "1234567890:6@s.whatsapp.net",
1314                false,
1315            ),
1316            ("236395184570386:91@lid", "236395184570386:91@lid", true),
1317            ("236395184570386:0@lid", "236395184570386@lid", true),
1318            ("user1@s.whatsapp.net", "user2@s.whatsapp.net", false),
1319        ];
1320
1321        for (jid1_str, jid2_str, should_match) in test_cases {
1322            let jid1: Jid = jid1_str.parse().expect("should parse jid1");
1323            let jid2: Jid = jid2_str.parse().expect("should parse jid2");
1324
1325            let key1 = jid1.device_key();
1326            let key2 = jid2.device_key();
1327
1328            assert_eq!(
1329                key1 == key2,
1330                should_match,
1331                "DeviceKey comparison failed for '{}' vs '{}': expected match={}, got match={}",
1332                jid1_str,
1333                jid2_str,
1334                should_match,
1335                key1 == key2
1336            );
1337
1338            assert_eq!(
1339                jid1.device_eq(&jid2),
1340                should_match,
1341                "device_eq failed for '{}' vs '{}'",
1342                jid1_str,
1343                jid2_str
1344            );
1345        }
1346    }
1347
1348    #[test]
1349    fn test_skdm_filtering_large_group() {
1350        use std::collections::HashSet;
1351
1352        let mut known_recipients: Vec<Jid> = Vec::with_capacity(1000);
1353        let mut all_devices: Vec<Jid> = Vec::with_capacity(1010);
1354
1355        for i in 0..1000i64 {
1356            let jid_str = format!("{}:1@lid", 100000000000000i64 + i);
1357            let jid = Jid::from_str(&jid_str).unwrap();
1358            known_recipients.push(jid.clone());
1359            all_devices.push(jid);
1360        }
1361
1362        for i in 1000i64..1010i64 {
1363            let jid_str = format!("{}:1@lid", 100000000000000i64 + i);
1364            all_devices.push(Jid::from_str(&jid_str).unwrap());
1365        }
1366
1367        let known_set: HashSet<DeviceKey<'_>> =
1368            known_recipients.iter().map(|j| j.device_key()).collect();
1369
1370        let new_devices: Vec<Jid> = all_devices
1371            .into_iter()
1372            .filter(|device| !known_set.contains(&device.device_key()))
1373            .collect();
1374
1375        assert_eq!(new_devices.len(), 10);
1376    }
1377}