Skip to main content

whatsapp_rust/
send.rs

1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use wacore::client::context::SendContextResolver;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore::types::message::AddressingMode;
8use wacore_binary::jid::{DeviceKey, Jid, JidExt as _};
9use wacore_binary::node::Node;
10use waproto::whatsapp as wa;
11
12/// Options for sending messages with additional customization.
13#[derive(Debug, Clone, Default)]
14pub struct SendOptions {
15    /// Extra XML nodes to add to the message stanza.
16    pub extra_stanza_nodes: Vec<Node>,
17}
18
19/// Specifies who is revoking (deleting) the message.
20#[derive(Debug, Clone, PartialEq, Eq, Default)]
21pub enum RevokeType {
22    /// The message sender deleting their own message.
23    #[default]
24    Sender,
25    /// A group admin deleting another user's message.
26    /// `original_sender` is the JID of the user who sent the message being deleted.
27    Admin { original_sender: Jid },
28}
29
30impl Client {
31    /// Send an end-to-end encrypted message to a user or group.
32    ///
33    /// Returns the message ID on success. For status/story updates use
34    /// [`Client::status()`] instead.
35    pub async fn send_message(
36        &self,
37        to: Jid,
38        message: wa::Message,
39    ) -> Result<String, anyhow::Error> {
40        self.send_message_with_options(to, message, SendOptions::default())
41            .await
42    }
43
44    /// Send a message with additional options.
45    pub async fn send_message_with_options(
46        &self,
47        to: Jid,
48        message: wa::Message,
49        options: SendOptions,
50    ) -> Result<String, anyhow::Error> {
51        let request_id = self.generate_message_id().await;
52        self.send_message_impl(
53            to,
54            &message,
55            Some(request_id.clone()),
56            false,
57            false,
58            None,
59            options.extra_stanza_nodes,
60        )
61        .await?;
62        Ok(request_id)
63    }
64
65    /// Send a status/story update to the given recipients using sender key encryption.
66    ///
67    /// This builds a `GroupInfo` from the provided recipients (always PN addressing mode),
68    /// then reuses the group encryption pipeline with `to = status@broadcast`.
69    pub(crate) async fn send_status_message(
70        &self,
71        message: wa::Message,
72        recipients: Vec<Jid>,
73        options: crate::features::status::StatusSendOptions,
74    ) -> Result<String, anyhow::Error> {
75        use wacore::client::context::GroupInfo;
76        use wacore_binary::builder::NodeBuilder;
77
78        if recipients.is_empty() {
79            return Err(anyhow!("Cannot send status with no recipients"));
80        }
81
82        let to = Jid::status_broadcast();
83        let request_id = self.generate_message_id().await;
84
85        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
86        let own_jid = device_snapshot
87            .pn
88            .clone()
89            .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
90        // Status always uses PN addressing, so own_lid is only needed as a
91        // fallback parameter for prepare_group_stanza (unused in PN mode).
92        let own_lid = device_snapshot
93            .lid
94            .clone()
95            .unwrap_or_else(|| own_jid.clone());
96        let account_info = device_snapshot.account.clone();
97
98        // Status always uses PN addressing. Resolve any LID recipients to their
99        // phone numbers so we don't end up with duplicate PN+LID entries for the
100        // same user (which causes server error 400).
101        // Reject non-user JIDs (groups, broadcasts, etc.) to prevent invalid
102        // <participants> entries that cause server errors.
103        let mut resolved_recipients = Vec::with_capacity(recipients.len());
104        for jid in recipients {
105            if jid.is_group() || jid.is_status_broadcast() || jid.is_broadcast_list() {
106                return Err(anyhow!(
107                    "Invalid status recipient {}: must be a user JID, not a group/broadcast",
108                    jid
109                ));
110            }
111            if jid.is_lid() {
112                if let Some(pn) = self.lid_pn_cache.get_phone_number(&jid.user).await {
113                    resolved_recipients
114                        .push(Jid::new(&pn, wacore_binary::jid::DEFAULT_USER_SERVER));
115                } else {
116                    return Err(anyhow!(
117                        "No PN mapping for LID {}. Ensure the recipient has been \
118                         contacted previously.",
119                        jid
120                    ));
121                }
122            } else {
123                resolved_recipients.push(jid);
124            }
125        }
126
127        if resolved_recipients.is_empty() {
128            return Err(anyhow!("No valid PN recipients after LID resolution"));
129        }
130
131        // Deduplicate by user (in case both LID and PN were provided for the same user)
132        let mut seen_users = std::collections::HashSet::new();
133        resolved_recipients.retain(|jid| seen_users.insert(jid.user.clone()));
134
135        let mut group_info = GroupInfo::new(resolved_recipients, AddressingMode::Pn);
136
137        // Ensure we're in the participant list
138        let own_base = own_jid.to_non_ad();
139        if !group_info
140            .participants
141            .iter()
142            .any(|p| p.is_same_user_as(&own_base))
143        {
144            group_info.participants.push(own_base);
145        }
146
147        self.add_recent_message(to.clone(), request_id.clone(), &message)
148            .await;
149
150        let device_store_arc = self.persistence_manager.get_device_arc().await;
151        let to_str = to.to_string();
152
153        let force_skdm = {
154            use wacore::libsignal::store::sender_key_name::SenderKeyName;
155            let sender_address = own_jid.to_protocol_address();
156            let sender_key_name = SenderKeyName::new(to_str.clone(), sender_address.to_string());
157
158            let device_guard = device_store_arc.read().await;
159            let key_exists = self
160                .signal_cache
161                .get_sender_key(&sender_key_name, &*device_guard.backend)
162                .await?
163                .is_some();
164
165            !key_exists
166        };
167
168        let mut store_adapter =
169            SignalProtocolStoreAdapter::new(device_store_arc.clone(), self.signal_cache.clone());
170        let mut stores = wacore::send::SignalStores {
171            session_store: &mut store_adapter.session_store,
172            identity_store: &mut store_adapter.identity_store,
173            prekey_store: &mut store_adapter.pre_key_store,
174            signed_prekey_store: &store_adapter.signed_pre_key_store,
175            sender_key_store: &mut store_adapter.sender_key_store,
176        };
177
178        let marked_for_fresh_skdm = self.consume_forget_marks(&to_str).await.unwrap_or_default();
179
180        let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
181            None
182        } else {
183            let known_recipients = self
184                .persistence_manager
185                .get_skdm_recipients(&to_str)
186                .await
187                .unwrap_or_default();
188
189            if known_recipients.is_empty() {
190                None
191            } else {
192                let jids_to_resolve: Vec<Jid> = group_info
193                    .participants
194                    .iter()
195                    .map(|jid| jid.to_non_ad())
196                    .collect();
197
198                match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
199                    Ok(all_devices) => {
200                        let known_set: std::collections::HashSet<DeviceKey<'_>> =
201                            known_recipients.iter().map(|j| j.device_key()).collect();
202                        let new_devices: Vec<Jid> = all_devices
203                            .into_iter()
204                            .filter(|device| !known_set.contains(&device.device_key()))
205                            .collect();
206                        if new_devices.is_empty() {
207                            Some(vec![])
208                        } else {
209                            Some(new_devices)
210                        }
211                    }
212                    Err(e) => {
213                        log::warn!("Failed to resolve devices for status SKDM check: {:?}", e);
214                        None
215                    }
216                }
217            }
218        };
219
220        let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
221            match skdm_target_devices {
222                None => None,
223                Some(mut devices) => {
224                    for marked_jid_str in &marked_for_fresh_skdm {
225                        if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
226                            && !devices.iter().any(|d| d.device_eq(&marked_jid))
227                        {
228                            devices.push(marked_jid);
229                        }
230                    }
231                    Some(devices)
232                }
233            }
234        } else {
235            skdm_target_devices
236        };
237
238        let is_full_distribution = force_skdm || skdm_target_devices.is_none();
239        let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
240        #[allow(clippy::needless_late_init)]
241        let skdm_is_full: bool;
242
243        // WhatsApp Web includes <meta status_setting="..."/> on non-revoke status messages.
244        // Revoke messages omit this node.
245        let is_revoke = message.protocol_message.as_ref().is_some_and(|pm| {
246            pm.r#type == Some(wa::message::protocol_message::Type::Revoke as i32)
247        });
248        let extra_stanza_nodes = if is_revoke {
249            vec![]
250        } else {
251            vec![
252                NodeBuilder::new("meta")
253                    .attr("status_setting", options.privacy.as_str())
254                    .build(),
255            ]
256        };
257
258        let stanza = match wacore::send::prepare_group_stanza(
259            &mut stores,
260            self,
261            &mut group_info,
262            &own_jid,
263            &own_lid,
264            account_info.as_ref(),
265            to.clone(),
266            &message,
267            request_id.clone(),
268            force_skdm,
269            skdm_target_devices,
270            None,
271            &extra_stanza_nodes,
272        )
273        .await
274        {
275            Ok(stanza) => {
276                skdm_is_full = is_full_distribution;
277                stanza
278            }
279            Err(e) => {
280                if let Some(SignalProtocolError::NoSenderKeyState(_)) =
281                    e.downcast_ref::<SignalProtocolError>()
282                {
283                    log::warn!("No sender key for status broadcast, forcing distribution.");
284
285                    if let Err(e) = self
286                        .persistence_manager
287                        .clear_skdm_recipients(&to_str)
288                        .await
289                    {
290                        log::warn!("Failed to clear status SKDM recipients: {:?}", e);
291                    }
292
293                    let mut store_adapter_retry = SignalProtocolStoreAdapter::new(
294                        device_store_arc.clone(),
295                        self.signal_cache.clone(),
296                    );
297                    let mut stores_retry = wacore::send::SignalStores {
298                        session_store: &mut store_adapter_retry.session_store,
299                        identity_store: &mut store_adapter_retry.identity_store,
300                        prekey_store: &mut store_adapter_retry.pre_key_store,
301                        signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
302                        sender_key_store: &mut store_adapter_retry.sender_key_store,
303                    };
304
305                    let retry_stanza = wacore::send::prepare_group_stanza(
306                        &mut stores_retry,
307                        self,
308                        &mut group_info,
309                        &own_jid,
310                        &own_lid,
311                        account_info.as_ref(),
312                        to.clone(),
313                        &message,
314                        request_id.clone(),
315                        true,
316                        None,
317                        None,
318                        &extra_stanza_nodes,
319                    )
320                    .await?;
321
322                    // Retry is always full distribution (rotated sender key)
323                    skdm_is_full = true;
324                    retry_stanza
325                } else {
326                    return Err(e);
327                }
328            }
329        };
330
331        // For status broadcasts, the server doesn't know the recipient list
332        // (unlike groups where the server has the member list). We must always
333        // include a <participants> node so the server knows who to deliver to.
334        // If prepare_group_stanza already added one (SKDM distribution), we
335        // extend it with bare <to> entries for devices that already have the
336        // sender key. If there's no <participants> node yet, we create one.
337        let stanza = self.ensure_status_participants(stanza, &group_info).await?;
338
339        self.send_node(stanza).await?;
340
341        // Update SKDM recipient cache AFTER server ACK (matches WhatsApp Web behavior).
342        // WA Web only calls markHasSenderKey() after the server confirms receipt.
343        self.update_skdm_recipients(
344            &to_str,
345            &devices_receiving_skdm,
346            skdm_is_full,
347            &group_info.participants,
348        )
349        .await;
350
351        // Flush cached Signal state to DB after encryption
352        if let Err(e) = self.flush_signal_cache().await {
353            log::error!("Failed to flush signal cache after send_status_message: {e:?}");
354        }
355
356        Ok(request_id)
357    }
358
359    /// Update SKDM recipient bookkeeping after a successful group/status send.
360    ///
361    /// Called AFTER `send_node()` succeeds to match WhatsApp Web behavior, which
362    /// only marks devices as having the sender key after the server ACK.
363    /// If specific devices received SKDM, record them. If this was a full distribution
364    /// (all participants), resolve all devices and record them instead.
365    async fn update_skdm_recipients(
366        &self,
367        to_str: &str,
368        devices_receiving_skdm: &[Jid],
369        is_full_distribution: bool,
370        participants: &[Jid],
371    ) {
372        if !devices_receiving_skdm.is_empty() {
373            if let Err(e) = self
374                .persistence_manager
375                .add_skdm_recipients(to_str, devices_receiving_skdm)
376                .await
377            {
378                log::warn!("Failed to update SKDM recipients: {:?}", e);
379            }
380        } else if is_full_distribution {
381            let jids_to_resolve: Vec<Jid> =
382                participants.iter().map(|jid| jid.to_non_ad()).collect();
383            match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
384                Ok(all_devices) => {
385                    if let Err(e) = self
386                        .persistence_manager
387                        .add_skdm_recipients(to_str, &all_devices)
388                        .await
389                    {
390                        log::warn!("Failed to persist SKDM recipients: {:?}", e);
391                    }
392                }
393                Err(e) => {
394                    log::warn!("Failed to resolve devices for SKDM recipients: {:?}", e);
395                }
396            }
397        }
398    }
399
400    /// Ensure the status stanza has a <participants> node listing all recipient
401    /// user JIDs. WhatsApp Web's `participantList` uses bare USER JIDs (not
402    /// device JIDs) — `<to jid="user@s.whatsapp.net"/>` — to tell the server
403    /// which users should receive the skmsg. The SKDM distribution list
404    /// (already in <participants>) uses device JIDs with <enc> children.
405    async fn ensure_status_participants(
406        &self,
407        stanza: wacore_binary::Node,
408        group_info: &wacore::client::context::GroupInfo,
409    ) -> Result<wacore_binary::Node, anyhow::Error> {
410        Ok(wacore::send::ensure_status_participants(stanza, group_info))
411    }
412
413    /// Delete a message for everyone in the chat (revoke).
414    ///
415    /// This sends a revoke protocol message that removes the message for all participants.
416    /// The message will show as "This message was deleted" for recipients.
417    ///
418    /// # Arguments
419    /// * `to` - The chat JID (DM or group)
420    /// * `message_id` - The ID of the message to delete
421    /// * `revoke_type` - Use `RevokeType::Sender` to delete your own message,
422    ///   or `RevokeType::Admin { original_sender }` to delete another user's message as group admin
423    pub async fn revoke_message(
424        &self,
425        to: Jid,
426        message_id: impl Into<String>,
427        revoke_type: RevokeType,
428    ) -> Result<(), anyhow::Error> {
429        let message_id = message_id.into();
430        // Verify we're logged in
431        self.get_pn()
432            .await
433            .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
434
435        let (from_me, participant, edit_attr) = match &revoke_type {
436            RevokeType::Sender => {
437                // For sender revoke, participant is NOT set (from_me=true identifies it)
438                // This matches whatsmeow's BuildMessageKey behavior
439                (
440                    true,
441                    None,
442                    crate::types::message::EditAttribute::SenderRevoke,
443                )
444            }
445            RevokeType::Admin { original_sender } => {
446                // Admin revoke requires group context
447                if !to.is_group() {
448                    return Err(anyhow!("Admin revoke is only valid for group chats"));
449                }
450                // The protocolMessageKey.participant should match the original message's key exactly
451                // Do NOT convert LID to PN - pass through unchanged like WhatsApp Web does
452                let participant_str = original_sender.to_non_ad().to_string();
453                log::debug!(
454                    "Admin revoke: using participant {} for MessageKey",
455                    participant_str
456                );
457                (
458                    false,
459                    Some(participant_str),
460                    crate::types::message::EditAttribute::AdminRevoke,
461                )
462            }
463        };
464
465        let revoke_message = wa::Message {
466            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
467                key: Some(wa::MessageKey {
468                    remote_jid: Some(to.to_string()),
469                    from_me: Some(from_me),
470                    id: Some(message_id.clone()),
471                    participant,
472                }),
473                r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
474                ..Default::default()
475            })),
476            ..Default::default()
477        };
478
479        // The revoke message stanza needs a NEW unique ID, not the message ID being revoked
480        // The message_id being revoked is already in protocolMessage.key.id
481        // Passing None generates a fresh stanza ID
482        //
483        // For admin revokes, force SKDM distribution to get the proper message structure
484        // with phash, <participants>, and <device-identity> that WhatsApp Web uses
485        let force_skdm = matches!(revoke_type, RevokeType::Admin { .. });
486        self.send_message_impl(
487            to,
488            &revoke_message,
489            None,
490            false,
491            force_skdm,
492            Some(edit_attr),
493            vec![],
494        )
495        .await
496    }
497
498    #[allow(clippy::too_many_arguments)]
499    pub(crate) async fn send_message_impl(
500        &self,
501        to: Jid,
502        message: &wa::Message,
503        request_id_override: Option<String>,
504        peer: bool,
505        force_key_distribution: bool,
506        edit: Option<crate::types::message::EditAttribute>,
507        extra_stanza_nodes: Vec<Node>,
508    ) -> Result<(), anyhow::Error> {
509        // Status broadcasts must go through send_status_message() which provides recipients
510        if to.is_status_broadcast() {
511            return Err(anyhow!(
512                "Use send_status_message() or client.status() API for status@broadcast"
513            ));
514        }
515
516        // Generate request ID early (doesn't need lock)
517        let request_id = match request_id_override {
518            Some(id) => id,
519            None => self.generate_message_id().await,
520        };
521
522        // SKDM update data — only populated for group sends, deferred until after send_node().
523        // This matches WhatsApp Web which only calls markHasSenderKey() after server ACK.
524        struct SkdmUpdate {
525            to_str: String,
526            devices: Vec<Jid>,
527            is_full_distribution: bool,
528            participants: Vec<Jid>,
529        }
530        let mut skdm_update: Option<SkdmUpdate> = None;
531
532        let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
533            // Peer messages are only valid for individual users, not groups
534            // Resolve encryption JID and acquire lock ONLY for encryption
535            let encryption_jid = self.resolve_encryption_jid(&to).await;
536            let signal_addr_str = encryption_jid.to_protocol_address_string();
537
538            let session_mutex = self
539                .session_locks
540                .get_with_by_ref(&signal_addr_str, async {
541                    std::sync::Arc::new(async_lock::Mutex::new(()))
542                })
543                .await;
544            let _session_guard = session_mutex.lock().await;
545
546            let device_store_arc = self.persistence_manager.get_device_arc().await;
547            let mut store_adapter =
548                SignalProtocolStoreAdapter::new(device_store_arc, self.signal_cache.clone());
549
550            wacore::send::prepare_peer_stanza(
551                &mut store_adapter.session_store,
552                &mut store_adapter.identity_store,
553                to,
554                encryption_jid,
555                message,
556                request_id,
557            )
558            .await?
559        } else if to.is_group() {
560            // Group messages: No client-level lock needed.
561            // Each participant device is encrypted separately with its own per-device lock
562            // inside prepare_group_stanza, so we don't need to serialize entire group sends.
563
564            // Preparation work (no lock needed)
565            let mut group_info = self.groups().query_info(&to).await?;
566
567            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
568            let own_jid = device_snapshot
569                .pn
570                .clone()
571                .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
572            let own_lid = device_snapshot
573                .lid
574                .clone()
575                .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
576            let account_info = device_snapshot.account.clone();
577
578            // Store serialized message bytes for retry (lightweight)
579            self.add_recent_message(to.clone(), request_id.clone(), message)
580                .await;
581
582            let device_store_arc = self.persistence_manager.get_device_arc().await;
583            let to_str = to.to_string();
584
585            let (own_sending_jid, _) = match group_info.addressing_mode {
586                crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
587                crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
588            };
589
590            if !group_info
591                .participants
592                .iter()
593                .any(|participant| participant.is_same_user_as(&own_sending_jid))
594            {
595                group_info.participants.push(own_sending_jid.to_non_ad());
596            }
597
598            let force_skdm = {
599                use wacore::libsignal::protocol::SenderKeyStore;
600                use wacore::libsignal::store::sender_key_name::SenderKeyName;
601                let mut device_guard = device_store_arc.write().await;
602                let sender_address = own_sending_jid.to_protocol_address();
603                let sender_key_name =
604                    SenderKeyName::new(to_str.clone(), sender_address.to_string());
605
606                let key_exists = device_guard
607                    .load_sender_key(&sender_key_name)
608                    .await?
609                    .is_some();
610
611                force_key_distribution || !key_exists
612            };
613
614            let mut store_adapter = SignalProtocolStoreAdapter::new(
615                device_store_arc.clone(),
616                self.signal_cache.clone(),
617            );
618
619            let mut stores = wacore::send::SignalStores {
620                session_store: &mut store_adapter.session_store,
621                identity_store: &mut store_adapter.identity_store,
622                prekey_store: &mut store_adapter.pre_key_store,
623                signed_prekey_store: &store_adapter.signed_pre_key_store,
624                sender_key_store: &mut store_adapter.sender_key_store,
625            };
626
627            // Consume forget marks - these participants need fresh SKDMs (matches WhatsApp Web)
628            // markForgetSenderKey is called during retry handling, this consumes those marks
629            let marked_for_fresh_skdm =
630                self.consume_forget_marks(&to_str).await.unwrap_or_default();
631
632            // Determine which devices need SKDM distribution
633            let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
634                None
635            } else {
636                let known_recipients = self
637                    .persistence_manager
638                    .get_skdm_recipients(&to_str)
639                    .await
640                    .unwrap_or_default();
641
642                if known_recipients.is_empty() {
643                    None
644                } else {
645                    let jids_to_resolve: Vec<Jid> = group_info
646                        .participants
647                        .iter()
648                        .map(|jid| jid.to_non_ad())
649                        .collect();
650
651                    match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
652                        Ok(all_devices) => {
653                            use std::collections::HashSet;
654
655                            let known_set: HashSet<DeviceKey<'_>> =
656                                known_recipients.iter().map(|j| j.device_key()).collect();
657
658                            let new_devices: Vec<Jid> = all_devices
659                                .into_iter()
660                                .filter(|device| !known_set.contains(&device.device_key()))
661                                .collect();
662
663                            if new_devices.is_empty() {
664                                Some(vec![])
665                            } else {
666                                log::debug!(
667                                    "Found {} new devices needing SKDM for group {}",
668                                    new_devices.len(),
669                                    to
670                                );
671                                Some(new_devices)
672                            }
673                        }
674                        Err(e) => {
675                            log::warn!("Failed to resolve devices for SKDM check: {:?}", e);
676                            None
677                        }
678                    }
679                }
680            };
681
682            // Merge devices marked for fresh SKDM (from retry/error handling)
683            let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
684                match skdm_target_devices {
685                    None => None,
686                    Some(mut devices) => {
687                        for marked_jid_str in &marked_for_fresh_skdm {
688                            if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
689                                && !devices.iter().any(|d| d.device_eq(&marked_jid))
690                            {
691                                log::debug!(
692                                    "Adding {} to SKDM targets (marked for fresh key)",
693                                    marked_jid_str
694                                );
695                                devices.push(marked_jid);
696                            }
697                        }
698                        Some(devices)
699                    }
700                }
701            } else {
702                skdm_target_devices
703            };
704
705            let is_full_distribution = force_skdm || skdm_target_devices.is_none();
706            let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
707
708            match wacore::send::prepare_group_stanza(
709                &mut stores,
710                self,
711                &mut group_info,
712                &own_jid,
713                &own_lid,
714                account_info.as_ref(),
715                to.clone(),
716                message,
717                request_id.clone(),
718                force_skdm,
719                skdm_target_devices,
720                edit.clone(),
721                &extra_stanza_nodes,
722            )
723            .await
724            {
725                Ok(stanza) => {
726                    skdm_update = Some(SkdmUpdate {
727                        to_str: to_str.clone(),
728                        devices: devices_receiving_skdm,
729                        is_full_distribution,
730                        participants: group_info.participants.clone(),
731                    });
732                    stanza
733                }
734                Err(e) => {
735                    if let Some(SignalProtocolError::NoSenderKeyState(_)) =
736                        e.downcast_ref::<SignalProtocolError>()
737                    {
738                        log::warn!("No sender key for group {}, forcing distribution.", to);
739
740                        // Clear SKDM recipients since we're rotating the key
741                        if let Err(e) = self
742                            .persistence_manager
743                            .clear_skdm_recipients(&to_str)
744                            .await
745                        {
746                            log::warn!("Failed to clear SKDM recipients: {:?}", e);
747                        }
748
749                        let mut store_adapter_retry = SignalProtocolStoreAdapter::new(
750                            device_store_arc.clone(),
751                            self.signal_cache.clone(),
752                        );
753                        let mut stores_retry = wacore::send::SignalStores {
754                            session_store: &mut store_adapter_retry.session_store,
755                            identity_store: &mut store_adapter_retry.identity_store,
756                            prekey_store: &mut store_adapter_retry.pre_key_store,
757                            signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
758                            sender_key_store: &mut store_adapter_retry.sender_key_store,
759                        };
760
761                        let retry_stanza = wacore::send::prepare_group_stanza(
762                            &mut stores_retry,
763                            self,
764                            &mut group_info,
765                            &own_jid,
766                            &own_lid,
767                            account_info.as_ref(),
768                            to,
769                            message,
770                            request_id,
771                            true, // Force distribution on retry
772                            None, // Distribute to all devices
773                            edit.clone(),
774                            &extra_stanza_nodes,
775                        )
776                        .await?;
777
778                        // Retry is always full distribution (rotated sender key)
779                        skdm_update = Some(SkdmUpdate {
780                            to_str,
781                            devices: vec![],
782                            is_full_distribution: true,
783                            participants: group_info.participants.clone(),
784                        });
785                        retry_stanza
786                    } else {
787                        return Err(e);
788                    }
789                }
790            }
791        } else {
792            // Direct message: Acquire lock only during encryption
793
794            // Ensure E2E sessions exist before encryption (matches WhatsApp Web)
795            // This deduplicates concurrent prekey fetches for the same recipient
796            let recipient_devices = self.get_user_devices(std::slice::from_ref(&to)).await?;
797            self.ensure_e2e_sessions(recipient_devices).await?;
798
799            // Resolve encryption JID and prepare lock acquisition
800            let encryption_jid = self.resolve_encryption_jid(&to).await;
801            let signal_addr_str = encryption_jid.to_protocol_address_string();
802
803            // Store serialized message bytes for retry (lightweight)
804            self.add_recent_message(to.clone(), request_id.clone(), message)
805                .await;
806
807            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
808            let own_jid = device_snapshot
809                .pn
810                .clone()
811                .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
812            let account_info = device_snapshot.account.clone();
813
814            // Include tctoken in 1:1 messages (matches WhatsApp Web behavior).
815            // Skip for newsletters, groups, and own JID.
816            let mut extra_stanza_nodes = extra_stanza_nodes;
817            if !to.is_group() && !to.is_newsletter() {
818                self.maybe_include_tc_token(&to, &mut extra_stanza_nodes)
819                    .await;
820            }
821
822            // Acquire lock only for encryption
823            let session_mutex = self
824                .session_locks
825                .get_with_by_ref(&signal_addr_str, async {
826                    std::sync::Arc::new(async_lock::Mutex::new(()))
827                })
828                .await;
829            let _session_guard = session_mutex.lock().await;
830
831            let device_store_arc = self.persistence_manager.get_device_arc().await;
832            let mut store_adapter =
833                SignalProtocolStoreAdapter::new(device_store_arc, self.signal_cache.clone());
834
835            let mut stores = wacore::send::SignalStores {
836                session_store: &mut store_adapter.session_store,
837                identity_store: &mut store_adapter.identity_store,
838                prekey_store: &mut store_adapter.pre_key_store,
839                signed_prekey_store: &store_adapter.signed_pre_key_store,
840                sender_key_store: &mut store_adapter.sender_key_store,
841            };
842
843            wacore::send::prepare_dm_stanza(
844                &mut stores,
845                self,
846                &own_jid,
847                account_info.as_ref(),
848                to,
849                message,
850                request_id,
851                edit,
852                &extra_stanza_nodes,
853            )
854            .await?
855        };
856
857        self.send_node(stanza_to_send).await?;
858
859        // Update SKDM recipient cache AFTER server ACK (matches WhatsApp Web behavior).
860        // WA Web only calls markHasSenderKey() after the server confirms receipt.
861        if let Some(update) = skdm_update {
862            self.update_skdm_recipients(
863                &update.to_str,
864                &update.devices,
865                update.is_full_distribution,
866                &update.participants,
867            )
868            .await;
869        }
870
871        // Flush cached Signal state to DB after encryption
872        if let Err(e) = self.flush_signal_cache().await {
873            log::error!("Failed to flush signal cache after send_message_impl: {e:?}");
874        }
875
876        Ok(())
877    }
878
879    /// Look up and include a tctoken in outgoing 1:1 message stanza nodes.
880    ///
881    /// If a valid (non-expired) token exists, adds a `<tctoken>` child node.
882    /// If the token is missing or expired, attempts to issue new tokens via IQ.
883    async fn maybe_include_tc_token(&self, to: &Jid, extra_nodes: &mut Vec<Node>) {
884        use wacore::iq::tctoken::{
885            IssuePrivacyTokensSpec, build_tc_token_node, is_tc_token_expired,
886            should_send_new_tc_token,
887        };
888        use wacore::store::traits::TcTokenEntry;
889
890        // Skip for own JID — no need to send privacy token to ourselves
891        let snapshot = self.persistence_manager.get_device_snapshot().await;
892        let is_self = snapshot
893            .pn
894            .as_ref()
895            .is_some_and(|pn| pn.is_same_user_as(to))
896            || snapshot
897                .lid
898                .as_ref()
899                .is_some_and(|lid| lid.is_same_user_as(to));
900        if is_self {
901            return;
902        }
903
904        // Resolve the destination to a LID for token lookup
905        let token_jid = if to.is_lid() {
906            to.user.clone()
907        } else {
908            match self.lid_pn_cache.get_current_lid(&to.user).await {
909                Some(lid) => lid,
910                None => to.user.clone(),
911            }
912        };
913
914        let backend = self.persistence_manager.backend();
915
916        // Look up existing token
917        let existing = match backend.get_tc_token(&token_jid).await {
918            Ok(entry) => entry,
919            Err(e) => {
920                log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
921                return;
922            }
923        };
924
925        match existing {
926            Some(entry) if !is_tc_token_expired(entry.token_timestamp) => {
927                // Valid token — include it in the stanza
928                extra_nodes.push(build_tc_token_node(&entry.token));
929
930                // Check if we should re-issue (bucket boundary crossed).
931                // Update sender_timestamp to mark we've sent our token in this bucket.
932                if should_send_new_tc_token(entry.sender_timestamp) {
933                    let now = wacore::time::now_secs();
934                    let updated_entry = TcTokenEntry {
935                        sender_timestamp: Some(now),
936                        ..entry
937                    };
938                    if let Err(e) = backend.put_tc_token(&token_jid, &updated_entry).await {
939                        log::warn!(target: "Client/TcToken", "Failed to update sender_timestamp: {e}");
940                    }
941                }
942            }
943            _ => {
944                // Token missing or expired — try to issue
945                let to_lid = self.resolve_to_lid_jid(to).await;
946                match self
947                    .execute(IssuePrivacyTokensSpec::new(std::slice::from_ref(&to_lid)))
948                    .await
949                {
950                    Ok(response) => {
951                        let now = wacore::time::now_secs();
952                        for received in &response.tokens {
953                            let entry = TcTokenEntry {
954                                token: received.token.clone(),
955                                token_timestamp: received.timestamp,
956                                sender_timestamp: Some(now),
957                            };
958
959                            // Store the received token
960                            let store_jid = received.jid.user.clone();
961                            if let Err(e) = backend.put_tc_token(&store_jid, &entry).await {
962                                log::warn!(target: "Client/TcToken", "Failed to store issued tc_token: {e}");
963                            }
964
965                            // Include in message stanza
966                            if !received.token.is_empty() {
967                                extra_nodes.push(build_tc_token_node(&received.token));
968                            }
969                        }
970                    }
971                    Err(e) => {
972                        log::debug!(target: "Client/TcToken", "Failed to issue tc_token for {}: {e}", to_lid);
973                        // Don't fail the message send — tctoken is optional
974                    }
975                }
976            }
977        }
978    }
979
980    /// Look up a valid (non-expired) tctoken for a JID. Returns the raw token bytes if found.
981    ///
982    /// Used by profile picture, presence subscribe, and other features that need tctoken gating.
983    pub(crate) async fn lookup_tc_token_for_jid(&self, jid: &Jid) -> Option<Vec<u8>> {
984        use wacore::iq::tctoken::is_tc_token_expired;
985
986        let token_jid = if jid.is_lid() {
987            jid.user.clone()
988        } else {
989            match self.lid_pn_cache.get_current_lid(&jid.user).await {
990                Some(lid) => lid,
991                None => jid.user.clone(),
992            }
993        };
994
995        let backend = self.persistence_manager.backend();
996        match backend.get_tc_token(&token_jid).await {
997            Ok(Some(entry)) if !is_tc_token_expired(entry.token_timestamp) => Some(entry.token),
998            Ok(_) => None,
999            Err(e) => {
1000                log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1001                None
1002            }
1003        }
1004    }
1005
1006    /// Resolve a JID to its LID form for tc_token storage.
1007    async fn resolve_to_lid_jid(&self, jid: &Jid) -> Jid {
1008        if jid.is_lid() {
1009            return jid.clone();
1010        }
1011
1012        if let Some(lid_user) = self.lid_pn_cache.get_current_lid(&jid.user).await {
1013            Jid::new(&lid_user, "lid")
1014        } else {
1015            jid.clone()
1016        }
1017    }
1018}
1019
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::str::FromStr;

    /// Parse every string in `raw` into a [`Jid`], panicking on malformed input.
    fn parse_jids(raw: &[&str]) -> Vec<Jid> {
        raw.iter()
            .copied()
            .map(|s| Jid::from_str(s).unwrap())
            .collect()
    }

    /// Keep only the devices whose `DeviceKey` is absent from `known_recipients`.
    ///
    /// This is the DeviceKey-based filter the SKDM recipient tests below validate.
    fn new_skdm_targets(known_recipients: &[Jid], all_devices: Vec<Jid>) -> Vec<Jid> {
        let known_set: HashSet<DeviceKey<'_>> =
            known_recipients.iter().map(|j| j.device_key()).collect();

        all_devices
            .into_iter()
            .filter(|device| !known_set.contains(&device.device_key()))
            .collect()
    }

    /// Pick `(from_me, participant, edit attribute)` for a revoke of the given type.
    ///
    /// Sender revokes are flagged `from_me` with no participant; admin revokes carry the
    /// original sender (device part stripped) as participant.
    fn revoke_key_parts(
        revoke_type: RevokeType,
    ) -> (bool, Option<String>, crate::types::message::EditAttribute) {
        use crate::types::message::EditAttribute;

        match revoke_type {
            RevokeType::Sender => (true, None, EditAttribute::SenderRevoke),
            RevokeType::Admin { original_sender } => (
                false,
                Some(original_sender.to_non_ad().to_string()),
                EditAttribute::AdminRevoke,
            ),
        }
    }

    /// Build the protocol message that revokes `message_id` in chat `to`.
    fn build_revoke_message(
        to: &Jid,
        message_id: &str,
        from_me: bool,
        participant: Option<String>,
    ) -> wa::Message {
        wa::Message {
            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
                key: Some(wa::MessageKey {
                    remote_jid: Some(to.to_string()),
                    from_me: Some(from_me),
                    id: Some(message_id.to_string()),
                    participant,
                }),
                r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
                ..Default::default()
            })),
            ..Default::default()
        }
    }

    #[test]
    fn test_revoke_type_default_is_sender() {
        // RevokeType::Sender is the default (for deleting own messages)
        assert_eq!(RevokeType::default(), RevokeType::Sender);
    }

    #[test]
    fn test_force_skdm_only_for_admin_revoke() {
        // Admin revokes require force_skdm=true to get proper message structure
        // with phash, <participants>, and <device-identity> that WhatsApp Web uses.
        // Without this, the server returns error 479.
        let sender_revoke = RevokeType::Sender;
        let admin_revoke = RevokeType::Admin {
            original_sender: Jid::from_str("123456@s.whatsapp.net").unwrap(),
        };

        // This matches the logic in revoke_message()
        let force_skdm_sender = matches!(sender_revoke, RevokeType::Admin { .. });
        let force_skdm_admin = matches!(admin_revoke, RevokeType::Admin { .. });

        assert!(!force_skdm_sender, "Sender revoke should NOT force SKDM");
        assert!(force_skdm_admin, "Admin revoke MUST force SKDM");
    }

    #[test]
    fn test_sender_revoke_message_key_structure() {
        // Sender revoke (edit="7"): from_me=true, participant=None
        // The sender is identified by from_me=true, no participant field needed
        let to = Jid::from_str("120363040237990503@g.us").unwrap();
        let message_id = "3EB0ABC123".to_string();

        let (from_me, participant, edit_attr) = revoke_key_parts(RevokeType::Sender);

        assert!(from_me, "Sender revoke must have from_me=true");
        assert!(
            participant.is_none(),
            "Sender revoke must NOT set participant"
        );
        assert_eq!(edit_attr.to_string_val(), "7");

        let revoke_message = build_revoke_message(&to, &message_id, from_me, participant);

        let proto_msg = revoke_message.protocol_message.unwrap();
        let key = proto_msg.key.unwrap();
        assert_eq!(key.from_me, Some(true));
        assert_eq!(key.participant, None);
        assert_eq!(key.id, Some(message_id));
    }

    #[test]
    fn test_admin_revoke_message_key_structure() {
        // Admin revoke (edit="8"): from_me=false, participant=original_sender
        // The participant field identifies whose message is being deleted
        let to = Jid::from_str("120363040237990503@g.us").unwrap();
        let message_id = "3EB0ABC123".to_string();
        let original_sender = Jid::from_str("236395184570386:22@lid").unwrap();

        let (from_me, participant, edit_attr) =
            revoke_key_parts(RevokeType::Admin { original_sender });

        assert!(!from_me, "Admin revoke must have from_me=false");
        assert!(
            participant.is_some(),
            "Admin revoke MUST set participant to original sender"
        );
        assert_eq!(edit_attr.to_string_val(), "8");

        let revoke_message = build_revoke_message(&to, &message_id, from_me, participant);

        let proto_msg = revoke_message.protocol_message.unwrap();
        let key = proto_msg.key.unwrap();
        assert_eq!(key.from_me, Some(false));
        // Participant should be the original sender with device number stripped
        assert_eq!(key.participant, Some("236395184570386@lid".to_string()));
        assert_eq!(key.id, Some(message_id));
    }

    #[test]
    fn test_admin_revoke_preserves_lid_format() {
        // LID JIDs must NOT be converted to PN (phone number) format.
        // This was a bug that caused error 479 - the participant field must
        // preserve the original JID format exactly (with device stripped).
        let lid_sender = Jid::from_str("236395184570386:22@lid").unwrap();
        let participant_str = lid_sender.to_non_ad().to_string();

        // Must preserve @lid suffix, device number stripped
        assert_eq!(participant_str, "236395184570386@lid");
        assert!(
            participant_str.ends_with("@lid"),
            "LID participant must preserve @lid suffix"
        );
    }

    // SKDM Recipient Filtering Tests - validates DeviceKey-based filtering

    #[test]
    fn test_skdm_recipient_filtering_basic() {
        let known_recipients = parse_jids(&[
            "1234567890:0@s.whatsapp.net",
            "1234567890:5@s.whatsapp.net",
            "9876543210:0@s.whatsapp.net",
        ]);

        let all_devices = parse_jids(&[
            "1234567890:0@s.whatsapp.net",
            "1234567890:5@s.whatsapp.net",
            "9876543210:0@s.whatsapp.net",
            "5555555555:0@s.whatsapp.net", // new
        ]);

        let new_devices = new_skdm_targets(&known_recipients, all_devices);

        assert_eq!(new_devices.len(), 1);
        assert_eq!(new_devices[0].user, "5555555555");
    }

    #[test]
    fn test_skdm_recipient_filtering_lid_jids() {
        let known_recipients = parse_jids(&[
            "236395184570386:91@lid",
            "129171292463295:0@lid",
            "45857667830004:14@lid",
        ]);

        let all_devices = parse_jids(&[
            "236395184570386:91@lid",
            "129171292463295:0@lid",
            "45857667830004:14@lid",
            "45857667830004:15@lid", // new
        ]);

        let new_devices = new_skdm_targets(&known_recipients, all_devices);

        assert_eq!(new_devices.len(), 1);
        assert_eq!(new_devices[0].user, "45857667830004");
        assert_eq!(new_devices[0].device, 15);
    }

    #[test]
    fn test_skdm_recipient_filtering_all_known() {
        let known_recipients =
            parse_jids(&["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]);
        let all_devices =
            parse_jids(&["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]);

        let new_devices = new_skdm_targets(&known_recipients, all_devices);

        assert!(new_devices.is_empty());
    }

    #[test]
    fn test_skdm_recipient_filtering_all_new() {
        let known_recipients: Vec<Jid> = vec![];
        let all_devices =
            parse_jids(&["1234567890:0@s.whatsapp.net", "9876543210:0@s.whatsapp.net"]);

        // Remember the size up front so the device list can be moved into the filter.
        let total_devices = all_devices.len();
        let new_devices = new_skdm_targets(&known_recipients, all_devices);

        assert_eq!(new_devices.len(), total_devices);
    }

    #[test]
    fn test_device_key_comparison() {
        // Jid parse/display normalizes :0 (omitted in Display, missing ':N' parses as device 0).
        // This test ensures DeviceKey comparisons work correctly under that normalization.
        let test_cases = [
            (
                "1234567890:0@s.whatsapp.net",
                "1234567890@s.whatsapp.net",
                true,
            ),
            (
                "1234567890:5@s.whatsapp.net",
                "1234567890:5@s.whatsapp.net",
                true,
            ),
            (
                "1234567890:5@s.whatsapp.net",
                "1234567890:6@s.whatsapp.net",
                false,
            ),
            ("236395184570386:91@lid", "236395184570386:91@lid", true),
            ("236395184570386:0@lid", "236395184570386@lid", true),
            ("user1@s.whatsapp.net", "user2@s.whatsapp.net", false),
        ];

        for (jid1_str, jid2_str, should_match) in test_cases {
            let jid1: Jid = jid1_str.parse().expect("should parse jid1");
            let jid2: Jid = jid2_str.parse().expect("should parse jid2");

            let keys_match = jid1.device_key() == jid2.device_key();

            assert_eq!(
                keys_match, should_match,
                "DeviceKey comparison failed for '{}' vs '{}': expected match={}, got match={}",
                jid1_str, jid2_str, should_match, keys_match
            );

            assert_eq!(
                jid1.device_eq(&jid2),
                should_match,
                "device_eq failed for '{}' vs '{}'",
                jid1_str,
                jid2_str
            );
        }
    }

    #[test]
    fn test_skdm_filtering_large_group() {
        /// Device 1 of the `offset`-th synthetic LID user.
        fn lid_device(offset: i64) -> Jid {
            Jid::from_str(&format!("{}:1@lid", 100000000000000i64 + offset)).unwrap()
        }

        // 1000 devices already hold the sender key; the full roster adds 10 more.
        let known_recipients: Vec<Jid> = (0..1000i64).map(lid_device).collect();
        let all_devices: Vec<Jid> = (0..1010i64).map(lid_device).collect();

        let new_devices = new_skdm_targets(&known_recipients, all_devices);

        assert_eq!(new_devices.len(), 10);
    }
}