Skip to main content

whatsapp_rust/
send.rs

1use crate::client::Client;
2use crate::types::message::EditAttribute;
3use anyhow::anyhow;
4use log::debug;
5use wacore::client::context::SendContextResolver;
6use wacore::libsignal::protocol::SignalProtocolError;
7use wacore::types::jid::JidExt;
8use wacore::types::message::AddressingMode;
9#[cfg(test)]
10use wacore_binary::DeviceKey;
11use wacore_binary::Node;
12use wacore_binary::builder::NodeBuilder;
13use wacore_binary::{Jid, JidExt as _, Server};
14use waproto::whatsapp as wa;
15
16/// Options for [`Client::send_message_with_options`].
17#[derive(Debug, Clone, Default)]
18pub struct SendOptions {
19    /// Override the auto-generated message ID.
20    /// Useful for resending a failed message with the same ID or idempotency.
21    pub message_id: Option<String>,
22    /// Extra XML child nodes on the message stanza.
23    pub extra_stanza_nodes: Vec<Node>,
24    /// Ephemeral duration in seconds. Sets `contextInfo.expiration` on the
25    /// message (WA Web `EProtoGenerator.js:183` parity).
26    /// Common values: 86400 (24h), 604800 (7d), 7776000 (90d).
27    pub ephemeral_expiration: Option<u32>,
28}
29
30/// Result of a successfully sent message.
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct SendResult {
33    pub message_id: String,
34    pub to: Jid,
35}
36
37impl SendResult {
38    /// `participant` is `None` -- only valid for the sender's own messages.
39    pub fn message_key(&self) -> wa::MessageKey {
40        wa::MessageKey {
41            remote_jid: Some(self.to.to_string()),
42            from_me: Some(true),
43            id: Some(self.message_id.clone()),
44            participant: None,
45        }
46    }
47}
48
49/// Duration for pinned messages. Default is 7 days (matches WA Web).
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
51#[non_exhaustive]
52pub enum PinDuration {
53    Hours24,
54    #[default]
55    Days7,
56    Days30,
57}
58
59impl PinDuration {
60    fn as_secs(self) -> u32 {
61        match self {
62            Self::Hours24 => 86_400,
63            Self::Days7 => 604_800,
64            Self::Days30 => 2_592_000,
65        }
66    }
67}
68
69/// Specifies who is revoking (deleting) the message.
70#[derive(Debug, Clone, PartialEq, Eq, Default)]
71#[non_exhaustive]
72pub enum RevokeType {
73    /// The message sender deleting their own message.
74    #[default]
75    Sender,
76    /// A group admin deleting another user's message.
77    /// `original_sender` is the JID of the user who sent the message being deleted.
78    Admin { original_sender: Jid },
79}
80
81/// Derive stanza-level edit attribute and meta node from message content.
82fn infer_stanza_metadata(msg: &wa::Message) -> (Option<EditAttribute>, Option<Node>) {
83    if msg.pin_in_chat_message.is_some() {
84        return (Some(EditAttribute::PinInChat), None);
85    }
86
87    // Poll messages
88    if msg.poll_creation_message.is_some()
89        || msg.poll_creation_message_v2.is_some()
90        || msg.poll_creation_message_v3.is_some()
91    {
92        return (None, Some(meta_node("polltype", "creation")));
93    }
94    if let Some(ref poll_update) = msg.poll_update_message
95        && poll_update.vote.is_some()
96    {
97        return (None, Some(meta_node("polltype", "vote")));
98    }
99    // TODO: polltype="result_snapshot" for poll_result_snapshot_message (gated behind AB flag)
100
101    // Event messages
102    if msg.event_message.is_some() {
103        return (None, Some(meta_node("event_type", "creation")));
104    }
105    if msg.enc_event_response_message.is_some() {
106        return (None, Some(meta_node("event_type", "response")));
107    }
108    if let Some(ref sec) = msg.secret_encrypted_message
109        && sec.secret_enc_type
110            == Some(wa::message::secret_encrypted_message::SecretEncType::EventEdit as i32)
111    {
112        return (None, Some(meta_node("event_type", "edit")));
113    }
114
115    (None, None)
116}
117
118fn meta_node(key: &'static str, value: &'static str) -> Node {
119    NodeBuilder::new("meta").attr(key, value).build()
120}
121
122/// Derive the `<biz>` stanza child for native-flow interactive messages.
123/// All native flow types use the same nested structure (confirmed via protocol capture).
124fn infer_biz_node(msg: &wa::Message) -> Option<Node> {
125    let interactive = extract_interactive_message(msg)?;
126    let wa::message::interactive_message::InteractiveMessage::NativeFlowMessage(nf) =
127        interactive.interactive_message.as_ref()?
128    else {
129        return None;
130    };
131
132    let first_button_name = nf.buttons.first()?.name.as_deref()?;
133    let flow_name = button_name_to_flow_name(first_button_name);
134
135    Some(
136        NodeBuilder::new("biz")
137            .children([NodeBuilder::new("interactive")
138                .attr("type", "native_flow")
139                .attr("v", "1")
140                .children([NodeBuilder::new("native_flow")
141                    .attr("name", flow_name)
142                    .build()])
143                .build()])
144            .build(),
145    )
146}
147
148fn extract_interactive_message(msg: &wa::Message) -> Option<&wa::message::InteractiveMessage> {
149    // Only checks documentWithCaptionMessage wrapper (for media headers) and direct field.
150    // Does not use unwrap_message() since we need the InteractiveMessage specifically.
151    if let Some(ref doc) = msg.document_with_caption_message
152        && let Some(ref inner) = doc.message
153        && let Some(ref im) = inner.interactive_message
154    {
155        return Some(im);
156    }
157    msg.interactive_message.as_deref()
158}
159
160fn button_name_to_flow_name(button_name: &str) -> &str {
161    match button_name {
162        "review_and_pay" => "order_details",
163        "payment_info" => "payment_info",
164        "review_order" | "order_status" => "order_status",
165        "payment_status" => "payment_status",
166        "payment_method" => "payment_method",
167        "payment_reminder" => "payment_reminder",
168        "open_webview" => "message_with_link",
169        "message_with_link_status" => "message_with_link_status",
170        "cta_url" => "cta_url",
171        "cta_call" => "cta_call",
172        "cta_copy" => "cta_copy",
173        "cta_catalog" => "cta_catalog",
174        "catalog_message" => "catalog_message",
175        "quick_reply" => "quick_reply",
176        "galaxy_message" => "galaxy_message",
177        "booking_confirmation" => "booking_confirmation",
178        "call_permission_request" => "call_permission_request",
179        other => other,
180    }
181}
182
183fn build_revoke_message(
184    remote_jid: &Jid,
185    from_me: bool,
186    message_id: String,
187    participant: Option<String>,
188) -> wa::Message {
189    wa::Message {
190        protocol_message: Some(Box::new(wa::message::ProtocolMessage {
191            key: Some(wa::MessageKey {
192                remote_jid: Some(remote_jid.to_string()),
193                from_me: Some(from_me),
194                id: Some(message_id),
195                participant,
196            }),
197            r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
198            ..Default::default()
199        })),
200        ..Default::default()
201    }
202}
203
204impl Client {
205    /// Send a message to a user, group, or newsletter.
206    ///
207    /// Newsletter messages are sent as plaintext (no E2E encryption).
208    /// For status/story updates use [`Client::status()`] instead.
209    pub async fn send_message(
210        &self,
211        to: Jid,
212        message: wa::Message,
213    ) -> Result<SendResult, anyhow::Error> {
214        self.send_message_with_options(to, message, SendOptions::default())
215            .await
216    }
217
218    /// Send a message with additional options.
219    pub async fn send_message_with_options(
220        &self,
221        to: Jid,
222        mut message: wa::Message,
223        options: SendOptions,
224    ) -> Result<SendResult, anyhow::Error> {
225        if let Some(exp) = options.ephemeral_expiration
226            && exp > 0
227        {
228            use wacore::proto_helpers::MessageExt;
229            if !message.set_ephemeral_expiration(exp) {
230                // Bare `conversation` messages have no contextInfo field.
231                log::warn!("Could not set contextInfo.expiration on this message type");
232            }
233        }
234
235        let request_id = match options.message_id {
236            Some(id) => id,
237            None => self.generate_message_id().await,
238        };
239        // Both paths below consume `to` and `request_id`, so save copies for the result.
240        let result = SendResult {
241            message_id: request_id.clone(),
242            to: to.clone(),
243        };
244
245        // Newsletters are not E2E encrypted — send as plaintext via SMAX stanza.
246        // Matches WA Web's OutMessagePublishNewsletterRequest + ContentType mixins.
247        if to.is_newsletter() {
248            use prost::Message as _;
249            let stanza_type = wacore::send::stanza_type_from_message(&message);
250            let (_, meta_node) = infer_stanza_metadata(&message);
251            let mut plaintext_builder = NodeBuilder::new("plaintext");
252            if let Some(mt) = wacore::send::media_type_from_message(&message) {
253                plaintext_builder = plaintext_builder.attr("mediatype", mt);
254            }
255            let mut children = vec![plaintext_builder.bytes(message.encode_to_vec()).build()];
256            children.extend(meta_node);
257            children.extend(options.extra_stanza_nodes);
258            let stanza = NodeBuilder::new("message")
259                .attr("to", to)
260                .attr("type", stanza_type)
261                .attr("id", &request_id)
262                .children(children)
263                .build();
264            self.send_node(stanza).await?;
265            return Ok(result);
266        }
267
268        let (edit, inferred_meta) = infer_stanza_metadata(&message);
269        let inferred_biz = infer_biz_node(&message);
270
271        let extra_nodes = if inferred_meta.is_none() && inferred_biz.is_none() {
272            options.extra_stanza_nodes
273        } else {
274            let mut nodes = Vec::with_capacity(2 + options.extra_stanza_nodes.len());
275            nodes.extend(inferred_meta);
276            nodes.extend(inferred_biz);
277            nodes.extend(options.extra_stanza_nodes);
278            nodes
279        };
280        self.send_message_impl(
281            to,
282            &message,
283            Some(request_id),
284            false,
285            false,
286            edit,
287            extra_nodes,
288        )
289        .await?;
290        Ok(result)
291    }
292
293    /// Send a status/story update using sender-key encryption.
294    ///
295    /// Status uses LID addressing (matches `WAWebEncryptAndSendStatusMsg`):
296    /// LID recipients pass through, PN recipients are resolved to LID via
297    /// `Client::get_lid_pn_entry` (cache-aside), and unresolvable recipients
298    /// are skipped silently. The resulting `GroupInfo` carries
299    /// `AddressingMode::Lid`; `prepare_group_stanza` signs with `own_lid`
300    /// and emits `addressing_mode="lid"` on the stanza. Errors only if no
301    /// recipient could be resolved.
302    pub(crate) async fn send_status_message(
303        &self,
304        message: wa::Message,
305        recipients: &[Jid],
306        options: crate::features::status::StatusSendOptions,
307    ) -> Result<SendResult, anyhow::Error> {
308        use wacore::client::context::GroupInfo;
309        use wacore_binary::builder::NodeBuilder;
310
311        if recipients.is_empty() {
312            return Err(anyhow!("Cannot send status with no recipients"));
313        }
314
315        let to = Jid::status_broadcast();
316        let request_id = self.generate_message_id().await;
317
318        let mut device_snapshot = self.persistence_manager.get_device_snapshot().await;
319        let account_info = device_snapshot.account.take();
320        let own_jid = device_snapshot
321            .pn
322            .take()
323            .ok_or(crate::client::ClientError::NotLoggedIn)?;
324        // Status is LID-addressed (matches WA Web post-LID-migration). Without
325        // a real device LID we can't sign or fan out correctly; refuse rather
326        // than silently emit `addressing_mode="lid"` with a PN sender.
327        let own_lid = device_snapshot.lid.take().ok_or_else(|| {
328            anyhow!(
329                "Cannot send status: device has no LID yet. Finish pairing / LID \
330                 migration before posting status."
331            )
332        })?;
333
334        // Fail fast for any JID that isn't a user (PN or LID). Mirrors WA
335        // Web's `asUserWidOrThrow` inside `toUserLid`: non-user inputs are a
336        // programming bug, not something to silently drop during resolution.
337        for jid in recipients {
338            if !(jid.is_pn() || jid.is_lid()) {
339                return Err(anyhow!(
340                    "Invalid status recipient {}: must be a user JID (PN or LID), \
341                     not a group/broadcast/newsletter/hosted/etc.",
342                    jid
343                ));
344            }
345        }
346
347        use std::collections::HashMap;
348        let mut resolved: Vec<Option<Jid>> = Vec::with_capacity(recipients.len());
349        let mut lid_to_pn_map: HashMap<wacore_binary::CompactString, Jid> =
350            HashMap::with_capacity(recipients.len() + 1);
351        for jid in recipients {
352            if let Some(lid_jid) = self.resolve_recipient_to_lid(jid).await {
353                if jid.is_pn() {
354                    lid_to_pn_map.insert(lid_jid.user.clone(), jid.to_non_ad());
355                }
356                resolved.push(Some(lid_jid));
357            } else {
358                resolved.push(None);
359            }
360        }
361        lid_to_pn_map.insert(own_lid.user.clone(), own_jid.to_non_ad());
362
363        let participants = wacore::send::assemble_status_participants(resolved, &own_lid)?;
364        let mut group_info =
365            GroupInfo::with_lid_to_pn_map(participants, AddressingMode::Lid, lid_to_pn_map);
366
367        self.add_recent_message(&to, &request_id, &message).await;
368
369        let device_store_arc = self.persistence_manager.get_device_arc().await;
370        let to_str = to.to_string();
371
372        let force_skdm = {
373            use wacore::libsignal::store::sender_key_name::SenderKeyName;
374            // Sender key name tracks the addressing mode of the group stanza.
375            // Since status now uses LID addressing (see send_status_message
376            // header), the key is stored under own_lid, matching the address
377            // prepare_group_stanza derives internally.
378            let sender_address = own_lid.to_protocol_address();
379            let sender_key_name = SenderKeyName::from_parts(&to_str, sender_address.as_str());
380
381            let device_guard = device_store_arc.read().await;
382            let key_exists = self
383                .signal_cache
384                .get_sender_key(&sender_key_name, &*device_guard.backend)
385                .await?
386                .is_some();
387
388            !key_exists
389        };
390
391        let mut store_adapter = self.signal_adapter_from(device_store_arc.clone());
392        let mut stores = store_adapter.as_signal_stores();
393
394        // Determine which devices need SKDM using the unified per-device map
395        let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
396            None
397        } else {
398            self.resolve_skdm_targets(&to_str, &group_info, &own_lid)
399                .await
400        };
401
402        // `<meta status_setting>` describes the POSTER's privacy on their own
403        // status. Reactions go through WA Web's addon path and never visit
404        // `WAWebEncryptAndSendStatusMsg`; attaching the meta on a reaction
405        // gets the stanza NACK'd with 479 (SmaxInvalid). Revokes also skip it.
406        let extra_stanza_nodes = if wacore::send::status_carries_privacy_meta(&message) {
407            vec![
408                NodeBuilder::new("meta")
409                    .attr("status_setting", options.privacy.as_str())
410                    .build(),
411            ]
412        } else {
413            vec![]
414        };
415
416        let prepared = match wacore::send::prepare_group_stanza(
417            &*self.runtime,
418            &mut stores,
419            self,
420            &mut group_info,
421            &own_jid,
422            &own_lid,
423            account_info.as_ref(),
424            to.clone(),
425            &message,
426            request_id.clone(),
427            force_skdm,
428            skdm_target_devices,
429            None,
430            &extra_stanza_nodes,
431        )
432        .await
433        {
434            Ok(prepared) => prepared,
435            Err(e) => {
436                if let Some(SignalProtocolError::NoSenderKeyState(_)) =
437                    e.downcast_ref::<SignalProtocolError>()
438                {
439                    log::warn!("No sender key for status broadcast, forcing distribution.");
440
441                    if let Err(e) = self
442                        .persistence_manager
443                        .clear_sender_key_devices(&to_str)
444                        .await
445                    {
446                        log::warn!(
447                            "Failed to clear status SKDM recipients for {}: {:?}",
448                            to_str,
449                            e
450                        );
451                    }
452                    self.sender_key_device_cache.invalidate(&to_str).await;
453
454                    let mut store_adapter_retry =
455                        self.signal_adapter_from(device_store_arc.clone());
456                    let mut stores_retry = store_adapter_retry.as_signal_stores();
457
458                    wacore::send::prepare_group_stanza(
459                        &*self.runtime,
460                        &mut stores_retry,
461                        self,
462                        &mut group_info,
463                        &own_jid,
464                        &own_lid,
465                        account_info.as_ref(),
466                        to.clone(),
467                        &message,
468                        request_id.clone(),
469                        true,
470                        None,
471                        None,
472                        &extra_stanza_nodes,
473                    )
474                    .await?
475                } else {
476                    return Err(e);
477                }
478            }
479        };
480
481        let stanza = self
482            .ensure_status_participants(prepared.node, &group_info)
483            .await?;
484
485        let ack = if let Some(phash) = stanza
486            .attrs()
487            .optional_string("phash")
488            .map(|s| s.into_owned())
489        {
490            let rx = self.register_ack_waiter(&request_id).await;
491            Some((rx, phash))
492        } else {
493            None
494        };
495
496        if let Err(e) = self.send_node(stanza).await {
497            if ack.is_some() {
498                self.response_waiters.lock().await.remove(&request_id);
499            }
500            return Err(e.into());
501        }
502
503        if let Some((rx, phash)) = ack {
504            self.spawn_phash_validation(rx, phash, to.clone(), true, request_id.clone());
505        }
506
507        self.update_sender_key_devices(&to_str, &prepared.skdm_devices)
508            .await;
509
510        for user in &prepared.stale_device_users {
511            self.invalidate_device_cache(user).await;
512        }
513
514        self.flush_signal_cache_logged("send_status_message", None)
515            .await;
516
517        Ok(SendResult {
518            message_id: request_id,
519            to,
520        })
521    }
522
523    /// Resolve which devices need SKDM. Returns `None` for full distribution
524    /// (no cache data), or `Some(devices)` listing devices that need fresh SKDM.
525    ///
526    /// For LID mode, uses `group_info.phone_jid_for_lid_user` to query devices
527    /// via PN when available (LID usync is unreliable for own JID), then
528    /// converts the result back to LID. Same fallback as `prepare_group_stanza`.
529    async fn resolve_skdm_targets(
530        &self,
531        group_jid: &str,
532        group_info: &wacore::client::context::GroupInfo,
533        own_sending_jid: &Jid,
534    ) -> Option<Vec<Jid>> {
535        use crate::sender_key_device_cache::SenderKeyDeviceMap;
536
537        // Atomic get-or-init: if another task invalidated the cache during our
538        // DB read, get_or_init's single-flight guarantee means the stale data
539        // won't be inserted — the invalidation wins and the next caller re-inits.
540        let pm = self.persistence_manager.clone();
541        let cached_map = self
542            .sender_key_device_cache
543            .get_or_init(group_jid, async {
544                let db_rows = pm
545                    .get_sender_key_devices(group_jid)
546                    .await
547                    .unwrap_or_else(|e| {
548                        log::warn!(
549                            "Failed to read sender key devices for {}: {:?}",
550                            group_jid,
551                            e
552                        );
553                        vec![]
554                    });
555                std::sync::Arc::new(SenderKeyDeviceMap::from_db_rows(&db_rows))
556            })
557            .await;
558
559        // No empty-cache early-exit: WA Web iterates an empty `senderKey` Map
560        // as `false` per participant, so the filter below must run unconditionally.
561        let is_lid_mode = group_info.addressing_mode == wacore::types::message::AddressingMode::Lid;
562        let jids_to_resolve: Vec<Jid> = group_info
563            .participants
564            .iter()
565            .map(|jid| {
566                if is_lid_mode
567                    && jid.is_lid()
568                    && let Some(pn) = group_info.phone_jid_for_lid_user(&jid.user)
569                {
570                    return pn.to_non_ad();
571                }
572                jid.to_non_ad()
573            })
574            .collect();
575
576        match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
577            Ok(all_devices) => {
578                let all_devices: Vec<Jid> = if is_lid_mode {
579                    all_devices
580                        .into_iter()
581                        .map(|d| group_info.phone_device_jid_to_lid(&d))
582                        .collect()
583                } else {
584                    all_devices
585                };
586
587                let needs_skdm: Vec<Jid> = all_devices
588                    .into_iter()
589                    .filter(|device| {
590                        if device.is_hosted() {
591                            return false;
592                        }
593                        if device.user == own_sending_jid.user
594                            && device.device == own_sending_jid.device
595                        {
596                            return false;
597                        }
598                        // O(1) lookups into pre-indexed cache
599                        !cached_map
600                            .device_has_key(&device.user, device.device)
601                            .unwrap_or(false)
602                            || cached_map.is_user_forgotten(&device.user)
603                    })
604                    .collect();
605
606                if needs_skdm.is_empty() {
607                    Some(vec![])
608                } else {
609                    log::debug!(
610                        "Found {} devices needing SKDM for {}",
611                        needs_skdm.len(),
612                        group_jid
613                    );
614                    Some(needs_skdm)
615                }
616            }
617            Err(e) => {
618                log::warn!(
619                    "Failed to resolve devices for SKDM check in {}: {:?}",
620                    group_jid,
621                    e
622                );
623                None
624            }
625        }
626    }
627
628    /// Update sender key device tracking after a successful group/status send.
629    ///
630    /// Called AFTER `send_node()` succeeds (WA Web: `markHasSenderKey` after server ACK).
631    /// On full distribution, clears old state and marks the provided device list.
632    /// On partial, marks only the specific SKDM recipients.
633    ///
634    /// The `all_resolved_devices` parameter carries the exact device list resolved
635    /// for the stanza, avoiding a redundant `resolve_devices` call and preventing
636    /// the clear-then-fail race where a transient resolver failure leaves the map empty.
637    /// Mark devices as `has_key=true` after successful SKDM distribution.
638    async fn update_sender_key_devices(&self, group_jid: &str, devices: &[Jid]) {
639        if devices.is_empty() {
640            return;
641        }
642
643        if let Err(e) = self
644            .set_sender_key_status_for_devices(group_jid, devices, true, false)
645            .await
646        {
647            log::warn!(
648                "Failed to update sender key devices for {}: {:?}",
649                group_jid,
650                e
651            );
652        }
653        self.sender_key_device_cache.invalidate(group_jid).await;
654    }
655
656    /// Spawn a background task to validate phash from server ack.
657    /// On mismatch, invalidates sender key device cache and group info cache.
658    fn spawn_phash_validation(
659        &self,
660        rx: futures::channel::oneshot::Receiver<std::sync::Arc<wacore_binary::OwnedNodeRef>>,
661        our_phash: String,
662        jid: Jid,
663        invalidate_group_cache: bool,
664        message_id: String,
665    ) {
666        let Some(client) = self.self_weak.get().and_then(|w| w.upgrade()) else {
667            return;
668        };
669        self.runtime
670            .spawn(Box::pin(async move {
671                let ack = match wacore::runtime::timeout(
672                    &*client.runtime,
673                    std::time::Duration::from_secs(10),
674                    rx,
675                )
676                .await
677                {
678                    Ok(Ok(node)) => node,
679                    _ => {
680                        // Remove leaked waiter to prevent keepalive suppression
681                        client.response_waiters.lock().await.remove(&message_id);
682                        return;
683                    }
684                };
685                if let Some(server) = ack.get().get_attr("phash").map(|v| v.as_str())
686                    && server != our_phash
687                {
688                    log::warn!(
689                        "Phash mismatch for {jid}: ours={our_phash}, server={server}. Invalidating caches."
690                    );
691                    // DM phash covers both recipient + own devices
692                    // (WA Web: syncDeviceListJob([recipient, me]))
693                    if !jid.is_group() && !jid.is_status_broadcast() {
694                        client.invalidate_device_cache(&jid.user).await;
695                        if let Some(own_pn) =
696                            &client.persistence_manager.get_device_snapshot().await.pn
697                        {
698                            client.invalidate_device_cache(&own_pn.user).await;
699                        }
700                    }
701                    let jid_str = jid.to_string();
702                    // Cache-only invalidation re-reads the same stale rows on
703                    // the next send. Drop the persisted state too so the next
704                    // send takes the full-distribution path. If the clear
705                    // fails, fall back to deleting the bot's own sender key
706                    // for the chat — the next send will see `!key_exists`
707                    // and force_skdm without depending on the tracker.
708                    if jid.is_group() || jid.is_status_broadcast() {
709                        let cleared = client
710                            .persistence_manager
711                            .clear_sender_key_devices(&jid_str)
712                            .await;
713                        if let Err(e) = cleared {
714                            log::warn!(
715                                "phash mismatch: clear_sender_key_devices failed: {e} — \
716                                 deleting own sender key as fallback to force redistribution"
717                            );
718                            use wacore::libsignal::store::sender_key_name::SenderKeyName;
719                            use wacore::types::jid::JidExt;
720                            let snapshot =
721                                client.persistence_manager.get_device_snapshot().await;
722                            for own in snapshot.lid.iter().chain(snapshot.pn.iter()) {
723                                let sk =
724                                    SenderKeyName::from_parts(&jid_str, own.to_protocol_address().as_str());
725                                client.signal_cache.delete_sender_key(sk.cache_key()).await;
726                            }
727                            let _ = client
728                                .flush_signal_cache_logged("phash-mismatch-fallback", None)
729                                .await;
730                        }
731                    }
732                    client.sender_key_device_cache.invalidate(&jid_str).await;
733                    if invalidate_group_cache {
734                        client.get_group_cache().await.invalidate(&jid).await;
735                    }
736                }
737            }))
738            .detach();
739    }
740
741    /// Ensure the status stanza has a <participants> node listing all recipient
742    /// user JIDs. WhatsApp Web's `participantList` uses bare USER JIDs (not
743    /// device JIDs) — `<to jid="user@s.whatsapp.net"/>` — to tell the server
744    /// which users should receive the skmsg. The SKDM distribution list
745    /// (already in <participants>) uses device JIDs with <enc> children.
746    async fn ensure_status_participants(
747        &self,
748        stanza: wacore_binary::Node,
749        group_info: &wacore::client::context::GroupInfo,
750    ) -> Result<wacore_binary::Node, anyhow::Error> {
751        Ok(wacore::send::ensure_status_participants(stanza, group_info))
752    }
753
754    /// Delete a message for everyone in the chat (revoke).
755    ///
756    /// This sends a revoke protocol message that removes the message for all participants.
757    /// The message will show as "This message was deleted" for recipients.
758    ///
759    /// # Arguments
760    /// * `to` - The chat JID (DM or group)
761    /// * `message_id` - The ID of the message to delete
762    /// * `revoke_type` - Use `RevokeType::Sender` to delete your own message,
763    ///   or `RevokeType::Admin { original_sender }` to delete another user's message as group admin
764    pub async fn revoke_message(
765        &self,
766        to: Jid,
767        message_id: impl Into<String>,
768        revoke_type: RevokeType,
769    ) -> Result<(), anyhow::Error> {
770        let message_id = message_id.into();
771        self.require_pn().await?;
772
773        let (from_me, participant, edit_attr) = match &revoke_type {
774            RevokeType::Sender => {
775                // For sender revoke, participant is NOT set (from_me=true identifies it)
776                // This matches whatsmeow's BuildMessageKey behavior
777                (
778                    true,
779                    None,
780                    crate::types::message::EditAttribute::SenderRevoke,
781                )
782            }
783            RevokeType::Admin { original_sender } => {
784                // Admin revoke requires group context
785                if !to.is_group() {
786                    return Err(anyhow!("Admin revoke is only valid for group chats"));
787                }
788                // The protocolMessageKey.participant should match the original message's key exactly
789                // Do NOT convert LID to PN - pass through unchanged like WhatsApp Web does
790                let participant_str = original_sender.to_non_ad().to_string();
791                log::debug!(
792                    "Admin revoke: using participant {} for MessageKey",
793                    participant_str
794                );
795                (
796                    false,
797                    Some(participant_str),
798                    crate::types::message::EditAttribute::AdminRevoke,
799                )
800            }
801        };
802
803        let revoke_message = build_revoke_message(&to, from_me, message_id, participant);
804
805        // The revoke message stanza needs a NEW unique ID, not the message ID being revoked
806        // The message_id being revoked is already in protocolMessage.key.id
807        // Passing None generates a fresh stanza ID
808        //
809        // For admin revokes, force SKDM distribution to get the proper message structure
810        // with phash, <participants>, and <device-identity> that WhatsApp Web uses
811        let force_skdm = matches!(revoke_type, RevokeType::Admin { .. });
812        self.send_message_impl(
813            to,
814            &revoke_message,
815            None,
816            false,
817            force_skdm,
818            Some(edit_attr),
819            vec![],
820        )
821        .await
822    }
823
824    /// Pin a message in a chat for all participants.
825    pub async fn pin_message(
826        &self,
827        chat: Jid,
828        key: wa::MessageKey,
829        duration: PinDuration,
830    ) -> Result<(), anyhow::Error> {
831        self.send_pin(
832            chat,
833            key,
834            wa::message::pin_in_chat_message::Type::PinForAll,
835            duration.as_secs(),
836        )
837        .await
838    }
839
840    /// Unpin a previously pinned message.
841    pub async fn unpin_message(&self, chat: Jid, key: wa::MessageKey) -> Result<(), anyhow::Error> {
842        self.send_pin(
843            chat,
844            key,
845            wa::message::pin_in_chat_message::Type::UnpinForAll,
846            0,
847        )
848        .await
849    }
850
851    async fn send_pin(
852        &self,
853        chat: Jid,
854        key: wa::MessageKey,
855        pin_type: wa::message::pin_in_chat_message::Type,
856        duration_secs: u32,
857    ) -> Result<(), anyhow::Error> {
858        let message = wa::Message {
859            pin_in_chat_message: Some(wa::message::PinInChatMessage {
860                key: Some(key),
861                r#type: Some(pin_type as i32),
862                sender_timestamp_ms: Some(wacore::time::now_millis()),
863            }),
864            message_context_info: Some(wa::MessageContextInfo {
865                message_add_on_duration_in_secs: Some(duration_secs),
866                ..Default::default()
867            }),
868            ..Default::default()
869        };
870
871        self.send_message_impl(
872            chat,
873            &message,
874            None,
875            false,
876            false,
877            Some(crate::types::message::EditAttribute::PinInChat),
878            vec![],
879        )
880        .await
881    }
882
883    #[allow(clippy::too_many_arguments)]
884    pub(crate) async fn send_message_impl(
885        &self,
886        to: Jid,
887        message: &wa::Message,
888        request_id_override: Option<String>,
889        peer: bool,
890        force_key_distribution: bool,
891        edit: Option<crate::types::message::EditAttribute>,
892        extra_stanza_nodes: Vec<Node>,
893    ) -> Result<(), anyhow::Error> {
894        // status@broadcast reactions fan out pairwise to the author's devices;
895        // status posts keep going through send_status_message (owns recipients).
896        let (to, is_status_addon) = if to.is_status_broadcast() {
897            let author = message
898                .reaction_message
899                .as_ref()
900                .and_then(|rm| rm.key.as_ref())
901                .and_then(|k| k.participant.as_ref())
902                .and_then(|p| p.parse::<Jid>().ok())
903                .filter(|jid| jid.is_pn() || jid.is_lid())
904                .ok_or_else(|| {
905                    anyhow!(
906                        "send_message to status@broadcast requires \
907                         reaction_message.key.participant = status author (user JID). \
908                         Use client.status() for posting new statuses."
909                    )
910                })?;
911            (author, true)
912        } else {
913            (to, false)
914        };
915
916        // Generate request ID early (doesn't need lock)
917        let request_id = match request_id_override {
918            Some(id) => id,
919            None => self.generate_message_id().await,
920        };
921
922        // SKDM update data — only populated for group sends, deferred until after send_node().
923        // This matches WhatsApp Web which only calls markHasSenderKey() after server ACK.
924        struct SkdmUpdate {
925            to_str: String,
926            devices: Vec<Jid>,
927            stale_users: Vec<String>,
928        }
929        let mut skdm_update: Option<SkdmUpdate> = None;
930        let mut should_issue_tc_token_after_send = false;
931        let mut used_cached_tc_token_key: Option<String> = None;
932        let tc_issue_target = to.clone();
933
934        let mut dm_phash: Option<String> = None;
935        let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
936            // Peer messages are only valid for individual users, not groups
937            // Resolve encryption JID and acquire lock ONLY for encryption
938            let encryption_jid = self.resolve_encryption_jid(&to).await;
939            let signal_addr = encryption_jid.to_protocol_address();
940
941            let session_mutex = self.session_lock_for(signal_addr.as_str()).await;
942            let _session_guard = session_mutex.lock().await;
943
944            let mut store_adapter = self.signal_adapter().await;
945
946            wacore::send::prepare_peer_stanza(
947                &mut store_adapter.session_store,
948                &mut store_adapter.identity_store,
949                to,
950                &signal_addr,
951                message,
952                request_id,
953            )
954            .await?
955        } else if to.is_group() {
956            // Group messages: no client-level lock needed. The encrypt fan-out
957            // inside prepare_group_stanza touches a different Signal session
958            // per recipient device, so concurrent group sends to the same
959            // chat don't race on shared state.
960            let mut group_info = self.groups().query_info(&to).await?;
961
962            let mut device_snapshot = self.persistence_manager.get_device_snapshot().await;
963            let account_info = device_snapshot.account.take();
964            let own_jid = device_snapshot
965                .pn
966                .take()
967                .ok_or(crate::client::ClientError::NotLoggedIn)?;
968            let own_lid = device_snapshot
969                .lid
970                .take()
971                .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
972
973            // Store serialized message bytes for retry (lightweight)
974            self.add_recent_message(&to, &request_id, message).await;
975
976            let device_store_arc = self.persistence_manager.get_device_arc().await;
977            let to_str = to.to_string();
978
979            let (own_sending_jid, _) = match group_info.addressing_mode {
980                crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
981                crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
982            };
983
984            if !group_info
985                .participants
986                .iter()
987                .any(|participant| participant.is_same_user_as(&own_sending_jid))
988            {
989                group_info.participants.push(own_sending_jid.to_non_ad());
990            }
991
992            let force_skdm = {
993                use wacore::libsignal::store::sender_key_name::SenderKeyName;
994                let sender_address = own_sending_jid.to_protocol_address();
995                let sender_key_name = SenderKeyName::from_parts(&to_str, sender_address.as_str());
996
997                let device_guard = device_store_arc.read().await;
998                let record = self
999                    .signal_cache
1000                    .get_sender_key(&sender_key_name, &*device_guard.backend)
1001                    .await?;
1002                let key_exists = record.is_some();
1003
1004                // WA Web posts SenderKeyExpired with `PERIODIC_ROTATION` after
1005                // a chain advances past a threshold. Captured-js doesn't show
1006                // the value; 1000 mirrors common Signal hygiene defaults.
1007                const SENDER_KEY_ROTATION_THRESHOLD: u32 = 1000;
1008                let needs_rotation = record
1009                    .and_then(|mut r| r.sender_key_state_mut().ok().cloned())
1010                    .and_then(|state| state.sender_chain_key().map(|ck| ck.iteration()))
1011                    .is_some_and(|iter| iter >= SENDER_KEY_ROTATION_THRESHOLD);
1012                drop(device_guard);
1013
1014                if needs_rotation {
1015                    log::info!(
1016                        "Periodic sender-key rotation for {to} (chain iteration ≥ {SENDER_KEY_ROTATION_THRESHOLD})"
1017                    );
1018                    self.signal_cache
1019                        .delete_sender_key(sender_key_name.cache_key())
1020                        .await;
1021                    if let Err(e) = self
1022                        .persistence_manager
1023                        .clear_sender_key_devices(&to_str)
1024                        .await
1025                    {
1026                        log::warn!("periodic rotation: clear_sender_key_devices failed: {e}");
1027                    }
1028                    self.sender_key_device_cache.invalidate(&to_str).await;
1029                }
1030
1031                force_key_distribution || !key_exists || needs_rotation
1032            };
1033
1034            let mut store_adapter = self.signal_adapter_from(device_store_arc.clone());
1035
1036            let mut stores = store_adapter.as_signal_stores();
1037
1038            // Determine which devices need SKDM distribution using the unified
1039            // per-device sender key map (matches WA Web's participant.senderKey Map).
1040            let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
1041                None
1042            } else {
1043                self.resolve_skdm_targets(&to_str, &group_info, &own_sending_jid)
1044                    .await
1045            };
1046
1047            match wacore::send::prepare_group_stanza(
1048                &*self.runtime,
1049                &mut stores,
1050                self,
1051                &mut group_info,
1052                &own_jid,
1053                &own_lid,
1054                account_info.as_ref(),
1055                to.clone(),
1056                message,
1057                request_id.clone(),
1058                force_skdm,
1059                skdm_target_devices,
1060                edit.clone(),
1061                &extra_stanza_nodes,
1062            )
1063            .await
1064            {
1065                Ok(prepared) => {
1066                    skdm_update = Some(SkdmUpdate {
1067                        to_str: to_str.clone(),
1068                        devices: prepared.skdm_devices,
1069                        stale_users: prepared.stale_device_users,
1070                    });
1071                    prepared.node
1072                }
1073                Err(e) => {
1074                    if let Some(SignalProtocolError::NoSenderKeyState(_)) =
1075                        e.downcast_ref::<SignalProtocolError>()
1076                    {
1077                        log::warn!("No sender key for group {}, forcing distribution.", to);
1078
1079                        if let Err(e) = self
1080                            .persistence_manager
1081                            .clear_sender_key_devices(&to_str)
1082                            .await
1083                        {
1084                            log::warn!("Failed to clear SKDM recipients: {:?}", e);
1085                        }
1086                        self.sender_key_device_cache.invalidate(&to_str).await;
1087
1088                        let mut store_adapter_retry =
1089                            self.signal_adapter_from(device_store_arc.clone());
1090                        let mut stores_retry = store_adapter_retry.as_signal_stores();
1091
1092                        let retry_prepared = wacore::send::prepare_group_stanza(
1093                            &*self.runtime,
1094                            &mut stores_retry,
1095                            self,
1096                            &mut group_info,
1097                            &own_jid,
1098                            &own_lid,
1099                            account_info.as_ref(),
1100                            to,
1101                            message,
1102                            request_id,
1103                            true,
1104                            None,
1105                            edit.clone(),
1106                            &extra_stanza_nodes,
1107                        )
1108                        .await?;
1109
1110                        skdm_update = Some(SkdmUpdate {
1111                            to_str,
1112                            devices: retry_prepared.skdm_devices,
1113                            stale_users: retry_prepared.stale_device_users,
1114                        });
1115                        retry_prepared.node
1116                    } else {
1117                        return Err(e);
1118                    }
1119                }
1120            }
1121        } else {
1122            // Per-device locking to match decrypt path (message.rs:684),
1123            // preventing ratchet desync on concurrent send/receive.
1124
1125            // Status reaction retries arrive with `from=status@broadcast`;
1126            // cache under the broadcast chat so take_recent_message hits.
1127            if is_status_addon {
1128                self.add_recent_message(&Jid::status_broadcast(), &request_id, message)
1129                    .await;
1130            } else {
1131                self.add_recent_message(&to, &request_id, message).await;
1132            }
1133
1134            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1135            let own_jid = device_snapshot
1136                .pn
1137                .as_ref()
1138                .ok_or(crate::client::ClientError::NotLoggedIn)?;
1139
1140            // PN→LID mapping (WA Web: ManagePhoneNumberMappingJob)
1141            if to.is_pn() && self.lid_pn_cache.get_current_lid(&to.user).await.is_none() {
1142                let sid = self.generate_request_id();
1143                let spec = wacore::iq::usync::LidQuerySpec::new(vec![to.to_non_ad()], sid);
1144                // Best-effort: WA Web also catches and warns on failure
1145                match self.execute(spec).await {
1146                    Ok(resp) => {
1147                        for mapping in &resp.lid_mappings {
1148                            if let Err(e) = self
1149                                .add_lid_pn_mapping(
1150                                    &mapping.lid,
1151                                    &mapping.phone_number,
1152                                    crate::lid_pn_cache::LearningSource::Usync,
1153                                )
1154                                .await
1155                            {
1156                                log::warn!(
1157                                    "Failed to persist LID mapping {} -> {}: {e:?}",
1158                                    mapping.phone_number,
1159                                    mapping.lid
1160                                );
1161                            }
1162                        }
1163                    }
1164                    Err(e) => {
1165                        log::warn!("LID query failed for {}, falling back to PN: {e:?}", to);
1166                    }
1167                }
1168            }
1169
1170            // DM fanout: all known recipient devices + own companions.
1171            // WAWebSendUserMsgJob reads local device table only on the send
1172            // path; WAWebDBDeviceListFanout excludes hosted devices.
1173            let recipient_bare = self.resolve_encryption_jid(&to).await.to_non_ad();
1174
1175            // Local registry first; network warm only on miss to avoid
1176            // unnecessary LID-migration side effects from get_user_devices
1177            let mut recipient_cached = self.get_devices_from_registry(&recipient_bare).await;
1178            if recipient_cached.is_none() {
1179                let _ = self.get_user_devices(std::slice::from_ref(&to)).await;
1180                recipient_cached = self.get_devices_from_registry(&recipient_bare).await;
1181            }
1182
1183            let mut own_cached = self.get_devices_from_registry(own_jid).await;
1184            if own_cached.is_none() {
1185                let _ = self.get_user_devices(std::slice::from_ref(own_jid)).await;
1186                own_cached = self.get_devices_from_registry(own_jid).await;
1187            }
1188
1189            // Build device list, filter hosted in-place, reuse Vecs
1190            let mut all_dm_jids = match recipient_cached {
1191                Some(mut devices) => {
1192                    devices.retain(|j| !j.is_hosted());
1193                    devices
1194                }
1195                // No record at all — bare JID, server handles fanout
1196                None => vec![recipient_bare],
1197            };
1198
1199            if let Some(mut own_devices) = own_cached {
1200                own_devices.retain(|j| !j.is_hosted());
1201                all_dm_jids.append(&mut own_devices);
1202            }
1203
1204            // Exclude exact sender device (WA Web: isMeDevice in getFanOutList)
1205            // so ensure_e2e_sessions never creates a self-session
1206            let own_lid = device_snapshot.lid.as_ref();
1207            all_dm_jids.retain(|j| {
1208                let is_sender = (j.is_same_user_as(own_jid) && j.device == own_jid.device)
1209                    || own_lid.is_some_and(|lid| j.is_same_user_as(lid) && j.device == lid.device);
1210                !is_sender
1211            });
1212
1213            // Dedup for self-DMs: recipient and own device lists overlap when
1214            // sending to own account. `participant_list_hash` sorts internally,
1215            // so reordering here is safe.
1216            wacore::types::jid::sort_dedup_by_device(&mut all_dm_jids);
1217
1218            self.ensure_e2e_sessions(&all_dm_jids).await?;
1219
1220            let mut extra_stanza_nodes = extra_stanza_nodes;
1221            // tctoken applies to 1:1 chats; status reactions share the fanout
1222            // path but WA Web does not attach tctokens to them.
1223            if !to.is_group() && !to.is_newsletter() && !is_status_addon {
1224                let (should_issue_after_send, cached_token_key) = self
1225                    .maybe_include_tc_token(&to, &mut extra_stanza_nodes)
1226                    .await;
1227                should_issue_tc_token_after_send = should_issue_after_send;
1228                if should_issue_after_send {
1229                    used_cached_tc_token_key = cached_token_key;
1230                }
1231            }
1232            if should_issue_tc_token_after_send {
1233                debug!(target: "Client/TcToken", "Scheduled tc token issuance after send for {}", to);
1234            }
1235
1236            let lock_jids = self.build_session_lock_keys(&all_dm_jids).await;
1237            let _session_mutexes = self.session_mutexes_for(&lock_jids).await;
1238            let mut _session_guards = Vec::with_capacity(_session_mutexes.len());
1239            for mutex in &_session_mutexes {
1240                _session_guards.push(mutex.lock().await);
1241            }
1242
1243            let mut store_adapter = self.signal_adapter().await;
1244
1245            let mut stores = store_adapter.as_signal_stores();
1246
1247            let prepared = wacore::send::prepare_dm_stanza(
1248                &*self.runtime,
1249                &mut stores,
1250                self,
1251                own_jid,
1252                device_snapshot.lid.as_ref(),
1253                device_snapshot.account.as_ref(),
1254                to,
1255                message,
1256                request_id,
1257                edit,
1258                &extra_stanza_nodes,
1259                all_dm_jids,
1260            )
1261            .await?;
1262            dm_phash = prepared.phash;
1263            prepared.node
1264        };
1265
1266        let ack = if let Some(phash) = dm_phash
1267            && let Some(msg_id) = stanza_to_send
1268                .attrs()
1269                .optional_string("id")
1270                .map(|s| s.into_owned())
1271        {
1272            let rx = self.register_ack_waiter(&msg_id).await;
1273            Some((rx, phash, msg_id))
1274        } else {
1275            None
1276        };
1277
1278        // Server expects the outer `to` as the broadcast chat even though
1279        // encryption targeted the author's devices (mirrors incoming `from`).
1280        let mut stanza_to_send = stanza_to_send;
1281        if is_status_addon {
1282            stanza_to_send.attrs.insert("to", Jid::status_broadcast());
1283        }
1284
1285        if let Err(e) = self.send_node(stanza_to_send).await {
1286            if let Some((_, _, ref msg_id)) = ack {
1287                self.response_waiters.lock().await.remove(msg_id);
1288            }
1289            return Err(e.into());
1290        }
1291
1292        if let Some((rx, phash, msg_id)) = ack {
1293            // Group sends also invalidate group cache on mismatch — server's
1294            // participant set diverged, the next send needs a fresh query.
1295            let invalidate_group = tc_issue_target.is_group();
1296            self.spawn_phash_validation(
1297                rx,
1298                phash,
1299                tc_issue_target.clone(),
1300                invalidate_group,
1301                msg_id,
1302            );
1303        }
1304
1305        if let Some(update) = skdm_update {
1306            self.update_sender_key_devices(&update.to_str, &update.devices)
1307                .await;
1308            for user in &update.stale_users {
1309                self.invalidate_device_cache(user).await;
1310            }
1311        }
1312
1313        // Flush cached Signal state to DB after encryption
1314        self.flush_signal_cache_logged("send_message_impl", None)
1315            .await;
1316
1317        // Issue new tc token after send if a bucket boundary was crossed.
1318        // Fire-and-forget so send_message returns without waiting for the IQ
1319        if should_issue_tc_token_after_send {
1320            if let Some(client) = self.self_weak.get().and_then(|w| w.upgrade()) {
1321                let target = tc_issue_target;
1322                let cached_key = used_cached_tc_token_key;
1323                self.runtime
1324                    .spawn(Box::pin(async move {
1325                        let issued_ok = client.issue_tc_token_after_send(&target).await;
1326                        if issued_ok && let Some(token_key) = cached_key {
1327                            client.mark_tc_token_used_after_send(&token_key).await;
1328                        }
1329                    }))
1330                    .detach();
1331            } else {
1332                log::debug!(target: "Client/TcToken", "Skipping fire-and-forget issuance: client dropped");
1333            }
1334        }
1335
1336        Ok(())
1337    }
1338
1339    /// Look up and include a privacy token in outgoing 1:1 message stanza nodes.
1340    ///
1341    /// Follows WA Web's fallback chain (MsgCreateFanoutStanza.js):
1342    ///   1. tctoken — from stored trusted contact token (if valid, non-expired)
1343    ///   2. cstoken — HMAC-SHA256(nct_salt, recipient_lid) fallback for first-contact
1344    ///   3. No token — message sent without token (server may return 463)
1345    ///
1346    /// Returns whether we should issue a new tc token after send, and the cache key
1347    /// of the attached valid tc token when that token should be marked as used.
1348    async fn maybe_include_tc_token(
1349        &self,
1350        to: &Jid,
1351        extra_nodes: &mut Vec<Node>,
1352    ) -> (bool, Option<String>) {
1353        use wacore::iq::props::config_codes;
1354        use wacore::iq::tctoken::{
1355            build_cs_token_node, build_tc_token_node, compute_cs_token, is_tc_token_expired_with,
1356            should_send_new_tc_token_with,
1357        };
1358
1359        // Skip for own JID — no need to send privacy token to ourselves
1360        let snapshot = self.persistence_manager.get_device_snapshot().await;
1361        let is_self = snapshot
1362            .pn
1363            .as_ref()
1364            .is_some_and(|pn| pn.is_same_user_as(to))
1365            || snapshot
1366                .lid
1367                .as_ref()
1368                .is_some_and(|lid| lid.is_same_user_as(to));
1369        if is_self {
1370            return (false, None);
1371        }
1372
1373        // Bots and status broadcast don't participate in the privacy token system
1374        if to.is_bot() || to.is_status_broadcast() {
1375            return (false, None);
1376        }
1377
1378        // Resolve the destination to a LID user string once — reused for
1379        // tctoken lookup, issuance, and cstoken HMAC input.
1380        let cached_lid = if to.is_lid() {
1381            None
1382        } else {
1383            self.lid_pn_cache.get_current_lid(&to.user).await
1384        };
1385        let resolved_lid_user: Option<&str> = if to.is_lid() {
1386            Some(&to.user)
1387        } else {
1388            cached_lid.as_deref()
1389        };
1390        let token_jid: &str = resolved_lid_user.unwrap_or(&to.user);
1391
1392        let backend = self.persistence_manager.backend();
1393        let tc_config = self.tc_token_config().await;
1394
1395        // Look up existing tctoken
1396        let existing = match backend.get_tc_token(token_jid).await {
1397            Ok(entry) => entry,
1398            Err(e) => {
1399                log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1400                None
1401            }
1402        };
1403
1404        // Issuance scheduling is independent of the AB prop — WA Web's sendTcToken
1405        // in MsgJob.js fires regardless of whether a token was attached to the stanza
1406        let should_issue_after_send = should_send_new_tc_token_with(
1407            existing.as_ref().and_then(|entry| entry.sender_timestamp),
1408            &tc_config,
1409        );
1410
1411        // AB prop gates stanza inclusion only (not issuance scheduling)
1412        let token_send_enabled = self
1413            .ab_props
1414            .is_enabled_or(config_codes::PRIVACY_TOKEN_ON_ALL_1_ON_1_MESSAGES, false)
1415            .await;
1416
1417        if token_send_enabled {
1418            match existing {
1419                Some(ref entry)
1420                    if !is_tc_token_expired_with(entry.token_timestamp, &tc_config)
1421                        && !entry.token.is_empty() =>
1422                {
1423                    extra_nodes.push(build_tc_token_node(&entry.token));
1424                    return (should_issue_after_send, Some(token_jid.to_string()));
1425                }
1426                _ => {
1427                    // cstoken fallback — gated by wa_nct_token_send_enabled
1428                    let nct_send_enabled = self
1429                        .ab_props
1430                        .is_enabled_or(config_codes::NCT_TOKEN_SEND_ENABLED, false)
1431                        .await;
1432
1433                    if nct_send_enabled
1434                        && let Some(salt) = &snapshot.nct_salt
1435                        && let Some(lid_user) = &resolved_lid_user
1436                    {
1437                        // HMAC input is "user@lid" (account LID without device suffix),
1438                        // matching WA Web's accountLid.toString()
1439                        let recipient_lid =
1440                            wacore_binary::Jid::new(*lid_user, Server::Lid).to_string();
1441                        let cs_token = compute_cs_token(salt, &recipient_lid);
1442                        extra_nodes.push(build_cs_token_node(&cs_token));
1443                        log::debug!(target: "Client/CsToken", "Attached cstoken for {} (NCT fallback)", to);
1444                    } else {
1445                        log::debug!(target: "Client/CsToken", "No tctoken or NCT salt/LID available for {}", to);
1446                    }
1447                }
1448            }
1449        }
1450
1451        (should_issue_after_send, None)
1452    }
1453
1454    /// Returns `true` if the issuance IQ succeeded.
1455    async fn issue_tc_token_after_send(&self, to: &Jid) -> bool {
1456        use wacore::iq::tctoken::IssuePrivacyTokensSpec;
1457
1458        // Bots and status broadcast don't participate in the privacy token system
1459        if to.is_bot() || to.is_status_broadcast() {
1460            return false;
1461        }
1462
1463        let issuance_jid = self.resolve_issuance_jid(to).await;
1464        let Ok(response) = self
1465            .execute(IssuePrivacyTokensSpec::new(std::slice::from_ref(
1466                &issuance_jid,
1467            )))
1468            .await
1469        else {
1470            log::debug!(target: "Client/TcToken", "Failed to issue tc_token for {}", issuance_jid);
1471            return false;
1472        };
1473
1474        self.store_issued_tc_tokens(&response.tokens).await
1475    }
1476
1477    /// Returns true if at least one token was persisted.
1478    pub(crate) async fn store_issued_tc_tokens(
1479        &self,
1480        tokens: &[wacore::iq::tctoken::ReceivedTcToken],
1481    ) -> bool {
1482        use wacore::store::traits::TcTokenEntry;
1483
1484        if tokens.is_empty() {
1485            return false;
1486        }
1487
1488        let backend = self.persistence_manager.backend();
1489        let now = wacore::time::now_secs();
1490        let mut any_stored = false;
1491        for received in tokens {
1492            if received.token.is_empty() {
1493                log::warn!(target: "Client/TcToken", "Server returned empty tc_token for {}, skipping", received.jid);
1494                continue;
1495            }
1496
1497            let entry = TcTokenEntry {
1498                token: received.token.clone(),
1499                token_timestamp: received.timestamp,
1500                sender_timestamp: Some(now),
1501            };
1502
1503            if let Err(e) = backend.put_tc_token(&received.jid.user, &entry).await {
1504                log::warn!(target: "Client/TcToken", "Failed to store issued tc_token: {e}");
1505            } else {
1506                any_stored = true;
1507            }
1508        }
1509        any_stored
1510    }
1511
1512    /// Variant of [`store_issued_tc_tokens`] that preserves the original
1513    /// sender_timestamp for identity-change re-issuance (bucket continuity).
1514    async fn store_issued_tc_tokens_with_sender_ts(
1515        &self,
1516        tokens: &[wacore::iq::tctoken::ReceivedTcToken],
1517        sender_ts: i64,
1518    ) {
1519        use wacore::store::traits::TcTokenEntry;
1520
1521        let backend = self.persistence_manager.backend();
1522        for received in tokens {
1523            if received.token.is_empty() {
1524                continue;
1525            }
1526            let entry = TcTokenEntry {
1527                token: received.token.clone(),
1528                token_timestamp: received.timestamp,
1529                sender_timestamp: Some(sender_ts),
1530            };
1531            if let Err(e) = backend.put_tc_token(&received.jid.user, &entry).await {
1532                log::warn!(target: "Client/TcToken", "Failed to store re-issued tc_token: {e}");
1533            }
1534        }
1535    }
1536
1537    async fn mark_tc_token_used_after_send(&self, token_key: &str) {
1538        use wacore::store::traits::TcTokenEntry;
1539
1540        let backend = self.persistence_manager.backend();
1541        let existing = match backend.get_tc_token(token_key).await {
1542            Ok(entry) => entry,
1543            Err(e) => {
1544                log::warn!(target: "Client/TcToken", "Failed to reload tc_token for {}: {e}", token_key);
1545                return;
1546            }
1547        };
1548
1549        let Some(entry) = existing else {
1550            return;
1551        };
1552        if entry.token.is_empty() {
1553            return;
1554        }
1555
1556        let updated_entry = TcTokenEntry {
1557            sender_timestamp: Some(wacore::time::now_secs()),
1558            ..entry
1559        };
1560        if let Err(e) = backend.put_tc_token(token_key, &updated_entry).await {
1561            log::warn!(target: "Client/TcToken", "Failed to update sender_timestamp for {}: {e}", token_key);
1562        }
1563    }
1564
1565    /// Re-issue tctoken after a contact's device identity changes.
1566    /// Only re-issues if we previously sent a token (sender_timestamp valid).
1567    /// Uses session_locks to deduplicate concurrent spawns for the same sender.
1568    pub(crate) async fn reissue_tc_token_after_identity_change(&self, sender: &Jid) {
1569        use wacore::iq::tctoken::{IssuePrivacyTokensSpec, is_sender_tc_token_expired};
1570
1571        // Dedup via session_locks — bare JID won't collide with protocol addresses ("user:device")
1572        let bare = sender.to_non_ad().to_string();
1573        let mutex = self.session_lock_for(&bare).await;
1574        let Some(_guard) = mutex.try_lock() else {
1575            return;
1576        };
1577
1578        let resolved_lid = if sender.is_lid() {
1579            None
1580        } else {
1581            self.lid_pn_cache.get_current_lid(&sender.user).await
1582        };
1583        let token_jid: &str = resolved_lid.as_deref().unwrap_or(&sender.user);
1584
1585        let backend = self.persistence_manager.backend();
1586        let entry = match backend.get_tc_token(token_jid).await {
1587            Ok(Some(e)) => e,
1588            _ => return,
1589        };
1590
1591        let Some(sender_ts) = entry.sender_timestamp else {
1592            return;
1593        };
1594
1595        // Sender-side expiration (may use different bucket config than receiver)
1596        let tc_config = self.tc_token_config().await;
1597        if is_sender_tc_token_expired(sender_ts, &tc_config) {
1598            return;
1599        }
1600
1601        // Use stored sender_ts so the bucket window isn't advanced
1602        let issuance_jid = self.resolve_issuance_jid(sender).await;
1603        match self
1604            .execute(IssuePrivacyTokensSpec::with_timestamp(
1605                std::slice::from_ref(&issuance_jid),
1606                sender_ts,
1607            ))
1608            .await
1609        {
1610            Ok(response) => {
1611                // Keep original sender_ts so the bucket window isn't advanced
1612                self.store_issued_tc_tokens_with_sender_ts(&response.tokens, sender_ts)
1613                    .await;
1614                log::debug!(
1615                    target: "Client/TcToken",
1616                    "Re-issued tctoken after identity change for {}",
1617                    sender
1618                );
1619            }
1620            Err(e) => {
1621                log::debug!(
1622                    target: "Client/TcToken",
1623                    "Failed to re-issue tctoken after identity change for {}: {e}",
1624                    sender
1625                );
1626            }
1627        }
1628    }
1629
1630    /// Look up a valid (non-expired) tctoken for a JID. Returns the raw token bytes if found.
1631    ///
1632    /// Used by profile picture, presence subscribe, and other features that need tctoken gating.
1633    pub(crate) async fn lookup_tc_token_for_jid(&self, jid: &Jid) -> Option<Vec<u8>> {
1634        use wacore::iq::tctoken::is_tc_token_expired_with;
1635
1636        let resolved_lid = if jid.is_lid() {
1637            None
1638        } else {
1639            self.lid_pn_cache.get_current_lid(&jid.user).await
1640        };
1641        let token_jid: &str = resolved_lid.as_deref().unwrap_or(&jid.user);
1642
1643        let tc_config = self.tc_token_config().await;
1644        let backend = self.persistence_manager.backend();
1645        match backend.get_tc_token(token_jid).await {
1646            Ok(Some(entry))
1647                if !entry.token.is_empty()
1648                    && !is_tc_token_expired_with(entry.token_timestamp, &tc_config) =>
1649            {
1650                Some(entry.token)
1651            }
1652            Ok(_) => None,
1653            Err(e) => {
1654                log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1655                None
1656            }
1657        }
1658    }
1659
1660    /// Build sorted, deduplicated per-device session lock keys.
1661    /// INVARIANT: Keys are sorted to prevent deadlocks when acquiring multiple
1662    /// session locks (e.g. DM sends that encrypt for recipient + own devices).
1663    /// Resolve encryption JIDs and sort for deadlock-free lock acquisition.
1664    pub(crate) async fn build_session_lock_keys(&self, device_jids: &[Jid]) -> Vec<Jid> {
1665        let mut keys: Vec<Jid> = Vec::with_capacity(device_jids.len());
1666        for jid in device_jids {
1667            keys.push(self.resolve_encryption_jid(jid).await);
1668        }
1669        keys.sort_unstable_by(wacore::types::jid::cmp_for_lock_order);
1670        keys.dedup_by(|a, b| wacore::types::jid::cmp_for_lock_order(a, b).is_eq());
1671        keys
1672    }
1673
1674    /// Fetch per-device session mutexes in deadlock-free order.
1675    pub(crate) async fn session_mutexes_for(
1676        &self,
1677        jids: &[Jid],
1678    ) -> Vec<std::sync::Arc<async_lock::Mutex<()>>> {
1679        let mut mutexes = Vec::with_capacity(jids.len());
1680        let mut buf = wacore::types::jid::make_address_buffer();
1681        for jid in jids {
1682            wacore::types::jid::write_protocol_address_to(jid, &mut buf);
1683            mutexes.push(self.session_lock_for(&buf).await);
1684        }
1685        mutexes
1686    }
1687
1688    /// Build tctoken timing config from AB props, falling back to defaults.
1689    pub(crate) async fn tc_token_config(&self) -> wacore::iq::tctoken::TcTokenConfig {
1690        use wacore::iq::props::config_codes;
1691        use wacore::iq::tctoken::{TC_TOKEN_BUCKET_DURATION, TC_TOKEN_NUM_BUCKETS, TcTokenConfig};
1692
1693        TcTokenConfig {
1694            bucket_duration: self
1695                .ab_props
1696                .get_int(config_codes::TCTOKEN_DURATION, TC_TOKEN_BUCKET_DURATION)
1697                .await,
1698            num_buckets: self
1699                .ab_props
1700                .get_int(config_codes::TCTOKEN_NUM_BUCKETS, TC_TOKEN_NUM_BUCKETS)
1701                .await,
1702            sender_bucket_duration: self
1703                .ab_props
1704                .get_int(
1705                    config_codes::TCTOKEN_DURATION_SENDER,
1706                    TC_TOKEN_BUCKET_DURATION,
1707                )
1708                .await,
1709            sender_num_buckets: self
1710                .ab_props
1711                .get_int(
1712                    config_codes::TCTOKEN_NUM_BUCKETS_SENDER,
1713                    TC_TOKEN_NUM_BUCKETS,
1714                )
1715                .await,
1716        }
1717        .clamped()
1718    }
1719
1720    /// Resolve a JID to its LID form for tc_token storage.
1721    async fn resolve_to_lid_jid(&self, jid: &Jid) -> Jid {
1722        if jid.is_lid() {
1723            return jid.to_non_ad();
1724        }
1725
1726        if let Some(lid_user) = self.lid_pn_cache.get_current_lid(&jid.user).await {
1727            Jid::new(&lid_user, Server::Lid)
1728        } else {
1729            jid.to_non_ad()
1730        }
1731    }
1732
1733    /// Resolve the target JID for privacy token issuance.
1734    /// Gated by `lid_trusted_token_issue_to_lid` — LID when true, PN when false.
1735    async fn resolve_issuance_jid(&self, jid: &Jid) -> Jid {
1736        use wacore::iq::props::config_codes;
1737
1738        // Default true: issue to LID by default (safer — server accepts both)
1739        let issue_to_lid = self
1740            .ab_props
1741            .is_enabled_or(config_codes::LID_TRUSTED_TOKEN_ISSUE_TO_LID, true)
1742            .await;
1743
1744        let resolved = if issue_to_lid {
1745            self.resolve_to_lid_jid(jid).await
1746        } else if jid.is_lid() {
1747            if let Some(pn) = self.lid_pn_cache.get_phone_number(&jid.user).await {
1748                Jid::new(&pn, Server::Pn)
1749            } else {
1750                jid.to_non_ad()
1751            }
1752        } else {
1753            jid.to_non_ad()
1754        };
1755        // Issuance targets bare account JIDs, not device-scoped ones
1756        resolved.to_non_ad()
1757    }
1758}
1759
1760#[cfg(test)]
1761mod tests {
1762    use super::*;
1763    use std::str::FromStr;
1764
1765    #[tokio::test]
1766    async fn send_message_to_status_without_reaction_errors() {
1767        let client = crate::test_utils::create_test_client().await;
1768        let to = Jid::status_broadcast();
1769        let err = client
1770            .send_message(
1771                to,
1772                wa::Message {
1773                    conversation: Some("hi".into()),
1774                    ..Default::default()
1775                },
1776            )
1777            .await
1778            .expect_err("status@broadcast without reaction must error");
1779        let msg = format!("{err}");
1780        assert!(
1781            msg.contains("reaction_message") || msg.contains("status"),
1782            "unexpected error: {msg}"
1783        );
1784    }
1785
1786    #[tokio::test]
1787    async fn send_message_to_status_reaction_rejects_non_user_participant() {
1788        let client = crate::test_utils::create_test_client().await;
1789        let to = Jid::status_broadcast();
1790        let err = client
1791            .send_message(
1792                to,
1793                wa::Message {
1794                    reaction_message: Some(wa::message::ReactionMessage {
1795                        key: Some(wa::MessageKey {
1796                            remote_jid: Some("status@broadcast".into()),
1797                            from_me: Some(false),
1798                            id: Some("ORIGID".into()),
1799                            participant: Some("120363040237990503@g.us".into()),
1800                        }),
1801                        text: Some("❤️".into()),
1802                        sender_timestamp_ms: Some(1),
1803                        ..Default::default()
1804                    }),
1805                    ..Default::default()
1806                },
1807            )
1808            .await
1809            .expect_err("group JID as participant must error");
1810        assert!(
1811            format!("{err}").contains("user JID"),
1812            "expected user-JID error, got: {err}"
1813        );
1814    }
1815
1816    #[tokio::test]
1817    async fn send_message_to_status_reaction_without_participant_errors() {
1818        let client = crate::test_utils::create_test_client().await;
1819        let to = Jid::status_broadcast();
1820        let err = client
1821            .send_message(
1822                to,
1823                wa::Message {
1824                    reaction_message: Some(wa::message::ReactionMessage {
1825                        key: Some(wa::MessageKey {
1826                            remote_jid: Some("status@broadcast".into()),
1827                            from_me: Some(false),
1828                            id: Some("ORIGID".into()),
1829                            participant: None,
1830                        }),
1831                        text: Some("❤️".into()),
1832                        sender_timestamp_ms: Some(1),
1833                        ..Default::default()
1834                    }),
1835                    ..Default::default()
1836                },
1837            )
1838            .await
1839            .expect_err("reaction without key.participant must error");
1840        assert!(
1841            format!("{err}").contains("participant"),
1842            "expected participant error, got: {err}"
1843        );
1844    }
1845
1846    #[test]
1847    fn test_revoke_type_default_is_sender() {
1848        // RevokeType::Sender is the default (for deleting own messages)
1849        let revoke_type = RevokeType::default();
1850        assert_eq!(revoke_type, RevokeType::Sender);
1851    }
1852
1853    #[test]
1854    fn test_force_skdm_only_for_admin_revoke() {
1855        // Admin revokes require force_skdm=true to get proper message structure
1856        // with phash, <participants>, and <device-identity> that WhatsApp Web uses.
1857        // Without this, the server returns error 479.
1858        let sender_jid = Jid::from_str("123456@s.whatsapp.net").unwrap();
1859
1860        let sender_revoke = RevokeType::Sender;
1861        let admin_revoke = RevokeType::Admin {
1862            original_sender: sender_jid,
1863        };
1864
1865        // This matches the logic in revoke_message()
1866        let force_skdm_sender = matches!(sender_revoke, RevokeType::Admin { .. });
1867        let force_skdm_admin = matches!(admin_revoke, RevokeType::Admin { .. });
1868
1869        assert!(!force_skdm_sender, "Sender revoke should NOT force SKDM");
1870        assert!(force_skdm_admin, "Admin revoke MUST force SKDM");
1871    }
1872
1873    #[test]
1874    fn test_sender_revoke_message_key_structure() {
1875        // Sender revoke (edit="7"): from_me=true, participant=None
1876        // The sender is identified by from_me=true, no participant field needed
1877        let to = Jid::from_str("120363040237990503@g.us").unwrap();
1878        let message_id = "3EB0ABC123".to_string();
1879
1880        let (from_me, participant, edit_attr) = match RevokeType::Sender {
1881            RevokeType::Sender => (
1882                true,
1883                None,
1884                crate::types::message::EditAttribute::SenderRevoke,
1885            ),
1886            RevokeType::Admin { original_sender } => (
1887                false,
1888                Some(original_sender.to_non_ad().to_string()),
1889                crate::types::message::EditAttribute::AdminRevoke,
1890            ),
1891        };
1892
1893        assert!(from_me, "Sender revoke must have from_me=true");
1894        assert!(
1895            participant.is_none(),
1896            "Sender revoke must NOT set participant"
1897        );
1898        assert_eq!(edit_attr.to_string_val(), "7");
1899
1900        let revoke_message = build_revoke_message(&to, from_me, message_id.clone(), participant);
1901
1902        let proto_msg = revoke_message.protocol_message.unwrap();
1903        let key = proto_msg.key.unwrap();
1904        assert_eq!(key.from_me, Some(true));
1905        assert_eq!(key.participant, None);
1906        assert_eq!(key.id, Some(message_id));
1907    }
1908
1909    #[test]
1910    fn test_admin_revoke_message_key_structure() {
1911        // Admin revoke (edit="8"): from_me=false, participant=original_sender
1912        // The participant field identifies whose message is being deleted
1913        let to = Jid::from_str("120363040237990503@g.us").unwrap();
1914        let message_id = "3EB0ABC123".to_string();
1915        let original_sender = Jid::from_str("236395184570386:22@lid").unwrap();
1916
1917        let revoke_type = RevokeType::Admin {
1918            original_sender: original_sender.clone(),
1919        };
1920        let (from_me, participant, edit_attr) = match revoke_type {
1921            RevokeType::Sender => (
1922                true,
1923                None,
1924                crate::types::message::EditAttribute::SenderRevoke,
1925            ),
1926            RevokeType::Admin { original_sender } => (
1927                false,
1928                Some(original_sender.to_non_ad().to_string()),
1929                crate::types::message::EditAttribute::AdminRevoke,
1930            ),
1931        };
1932
1933        assert!(!from_me, "Admin revoke must have from_me=false");
1934        assert!(
1935            participant.is_some(),
1936            "Admin revoke MUST set participant to original sender"
1937        );
1938        assert_eq!(edit_attr.to_string_val(), "8");
1939
1940        let revoke_message =
1941            build_revoke_message(&to, from_me, message_id.clone(), participant.clone());
1942
1943        let proto_msg = revoke_message.protocol_message.unwrap();
1944        let key = proto_msg.key.unwrap();
1945        assert_eq!(key.from_me, Some(false));
1946        // Participant should be the original sender with device number stripped
1947        assert_eq!(key.participant, Some("236395184570386@lid".to_string()));
1948        assert_eq!(key.id, Some(message_id));
1949    }
1950
1951    #[test]
1952    fn test_admin_revoke_preserves_lid_format() {
1953        // LID JIDs must NOT be converted to PN (phone number) format.
1954        // This was a bug that caused error 479 - the participant field must
1955        // preserve the original JID format exactly (with device stripped).
1956        let lid_sender = Jid::from_str("236395184570386:22@lid").unwrap();
1957        let participant_str = lid_sender.to_non_ad().to_string();
1958
1959        // Must preserve @lid suffix, device number stripped
1960        assert_eq!(participant_str, "236395184570386@lid");
1961        assert!(
1962            participant_str.ends_with("@lid"),
1963            "LID participant must preserve @lid suffix"
1964        );
1965    }
1966
1967    // SKDM Recipient Filtering Tests - validates DeviceKey-based filtering
1968
1969    #[test]
1970    fn test_skdm_recipient_filtering_basic() {
1971        use std::collections::HashSet;
1972
1973        let known_recipients: Vec<Jid> = [
1974            "1234567890:0@s.whatsapp.net",
1975            "1234567890:5@s.whatsapp.net",
1976            "9876543210:0@s.whatsapp.net",
1977        ]
1978        .into_iter()
1979        .map(|s| Jid::from_str(s).unwrap())
1980        .collect();
1981
1982        let all_devices: Vec<Jid> = [
1983            "1234567890:0@s.whatsapp.net",
1984            "1234567890:5@s.whatsapp.net",
1985            "9876543210:0@s.whatsapp.net",
1986            "5555555555:0@s.whatsapp.net", // new
1987        ]
1988        .into_iter()
1989        .map(|s| Jid::from_str(s).unwrap())
1990        .collect();
1991
1992        let known_set: HashSet<DeviceKey<'_>> =
1993            known_recipients.iter().map(|j| j.device_key()).collect();
1994
1995        let new_devices: Vec<Jid> = all_devices
1996            .into_iter()
1997            .filter(|device| !known_set.contains(&device.device_key()))
1998            .collect();
1999
2000        assert_eq!(new_devices.len(), 1);
2001        assert_eq!(new_devices[0].user, "5555555555");
2002    }
2003
2004    #[test]
2005    fn test_skdm_recipient_filtering_lid_jids() {
2006        use std::collections::HashSet;
2007
2008        let known_recipients: Vec<Jid> = [
2009            "236395184570386:91@lid",
2010            "129171292463295:0@lid",
2011            "45857667830004:14@lid",
2012        ]
2013        .into_iter()
2014        .map(|s| Jid::from_str(s).unwrap())
2015        .collect();
2016
2017        let all_devices: Vec<Jid> = [
2018            "236395184570386:91@lid",
2019            "129171292463295:0@lid",
2020            "45857667830004:14@lid",
2021            "45857667830004:15@lid", // new
2022        ]
2023        .into_iter()
2024        .map(|s| Jid::from_str(s).unwrap())
2025        .collect();
2026
2027        let known_set: HashSet<DeviceKey<'_>> =
2028            known_recipients.iter().map(|j| j.device_key()).collect();
2029
2030        let new_devices: Vec<Jid> = all_devices
2031            .into_iter()
2032            .filter(|device| !known_set.contains(&device.device_key()))
2033            .collect();
2034
2035        assert_eq!(new_devices.len(), 1);
2036        assert_eq!(new_devices[0].user, "45857667830004");
2037        assert_eq!(new_devices[0].device, 15);
2038    }
2039
2040    #[test]
2041    fn test_skdm_recipient_filtering_all_known() {
2042        use std::collections::HashSet;
2043
2044        let known_recipients: Vec<Jid> =
2045            ["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]
2046                .into_iter()
2047                .map(|s| Jid::from_str(s).unwrap())
2048                .collect();
2049
2050        let all_devices: Vec<Jid> = ["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]
2051            .into_iter()
2052            .map(|s| Jid::from_str(s).unwrap())
2053            .collect();
2054
2055        let known_set: HashSet<DeviceKey<'_>> =
2056            known_recipients.iter().map(|j| j.device_key()).collect();
2057
2058        let new_devices: Vec<Jid> = all_devices
2059            .into_iter()
2060            .filter(|device| !known_set.contains(&device.device_key()))
2061            .collect();
2062
2063        assert!(new_devices.is_empty());
2064    }
2065
2066    #[test]
2067    fn test_skdm_recipient_filtering_all_new() {
2068        use std::collections::HashSet;
2069
2070        let known_recipients: Vec<Jid> = vec![];
2071
2072        let all_devices: Vec<Jid> = ["1234567890:0@s.whatsapp.net", "9876543210:0@s.whatsapp.net"]
2073            .into_iter()
2074            .map(|s| Jid::from_str(s).unwrap())
2075            .collect();
2076
2077        let known_set: HashSet<DeviceKey<'_>> =
2078            known_recipients.iter().map(|j| j.device_key()).collect();
2079
2080        let new_devices: Vec<Jid> = all_devices
2081            .clone()
2082            .into_iter()
2083            .filter(|device| !known_set.contains(&device.device_key()))
2084            .collect();
2085
2086        assert_eq!(new_devices.len(), all_devices.len());
2087    }
2088
2089    #[test]
2090    fn test_device_key_comparison() {
2091        // Jid parse/display normalizes :0 (omitted in Display, missing ':N' parses as device 0).
2092        // This test ensures DeviceKey comparisons work correctly under that normalization.
2093        let test_cases = [
2094            (
2095                "1234567890:0@s.whatsapp.net",
2096                "1234567890@s.whatsapp.net",
2097                true,
2098            ),
2099            (
2100                "1234567890:5@s.whatsapp.net",
2101                "1234567890:5@s.whatsapp.net",
2102                true,
2103            ),
2104            (
2105                "1234567890:5@s.whatsapp.net",
2106                "1234567890:6@s.whatsapp.net",
2107                false,
2108            ),
2109            ("236395184570386:91@lid", "236395184570386:91@lid", true),
2110            ("236395184570386:0@lid", "236395184570386@lid", true),
2111            ("user1@s.whatsapp.net", "user2@s.whatsapp.net", false),
2112        ];
2113
2114        for (jid1_str, jid2_str, should_match) in test_cases {
2115            let jid1: Jid = jid1_str.parse().expect("should parse jid1");
2116            let jid2: Jid = jid2_str.parse().expect("should parse jid2");
2117
2118            let key1 = jid1.device_key();
2119            let key2 = jid2.device_key();
2120
2121            assert_eq!(
2122                key1 == key2,
2123                should_match,
2124                "DeviceKey comparison failed for '{}' vs '{}': expected match={}, got match={}",
2125                jid1_str,
2126                jid2_str,
2127                should_match,
2128                key1 == key2
2129            );
2130
2131            assert_eq!(
2132                jid1.device_eq(&jid2),
2133                should_match,
2134                "device_eq failed for '{}' vs '{}'",
2135                jid1_str,
2136                jid2_str
2137            );
2138        }
2139    }
2140
2141    #[test]
2142    fn empty_sender_key_device_map_marks_all_devices_for_skdm() {
2143        use crate::sender_key_device_cache::SenderKeyDeviceMap;
2144
2145        let map = SenderKeyDeviceMap::from_db_rows(&[]);
2146        assert_eq!(map.device_has_key("271060335329480", 0), None);
2147        assert!(!map.is_user_forgotten("271060335329480"));
2148
2149        let all_resolved_devices: Vec<Jid> = [
2150            "271060335329480@lid",
2151            "77610646245392@lid",
2152            "276661023027320:5@lid",
2153        ]
2154        .into_iter()
2155        .map(|s| Jid::from_str(s).unwrap())
2156        .collect();
2157
2158        let needs_skdm: Vec<&Jid> = all_resolved_devices
2159            .iter()
2160            .filter(|device| {
2161                !map.device_has_key(&device.user, device.device)
2162                    .unwrap_or(false)
2163                    || map.is_user_forgotten(&device.user)
2164            })
2165            .collect();
2166
2167        assert_eq!(needs_skdm.len(), all_resolved_devices.len());
2168    }
2169
2170    /// Fails if the empty-cache early-exit is reintroduced.
2171    #[tokio::test]
2172    async fn resolve_skdm_targets_distributes_when_cache_empty_but_devices_known() {
2173        use wacore::client::context::GroupInfo;
2174        use wacore::store::traits::{DeviceInfo, DeviceListRecord};
2175        use wacore::types::message::AddressingMode;
2176
2177        let client = crate::test_utils::create_test_client().await;
2178        let group_jid = "120363161500776365@g.us";
2179        let own_lid = Jid::from_str("193832511623409:13@lid").unwrap();
2180
2181        let participant_users = ["271060335329480", "77610646245392", "276661023027320"];
2182
2183        // Pre-populate so `resolve_devices` succeeds without a transport.
2184        for user in &participant_users {
2185            let record = DeviceListRecord {
2186                user: (*user).into(),
2187                devices: vec![DeviceInfo {
2188                    device_id: 0,
2189                    key_index: None,
2190                }],
2191                timestamp: wacore::time::now_secs(),
2192                phash: None,
2193                raw_id: None,
2194            };
2195            client
2196                .device_registry_cache
2197                .insert((*user).into(), record)
2198                .await;
2199        }
2200
2201        let participants: Vec<Jid> = participant_users
2202            .iter()
2203            .map(|u| Jid::from_str(&format!("{u}@lid")).unwrap())
2204            .collect();
2205
2206        let group_info = GroupInfo::new(participants.clone(), AddressingMode::Lid);
2207
2208        let result = client
2209            .resolve_skdm_targets(group_jid, &group_info, &own_lid)
2210            .await
2211            .expect("None means the empty-cache early-exit is back");
2212
2213        assert_eq!(result.len(), participants.len());
2214        for user in &participant_users {
2215            assert!(result.iter().any(|j| j.user == *user));
2216        }
2217    }
2218
2219    #[test]
2220    fn single_forgotten_row_keeps_full_distribution() {
2221        use crate::sender_key_device_cache::SenderKeyDeviceMap;
2222
2223        let map = SenderKeyDeviceMap::from_db_rows(&[("271060335329480@lid".to_string(), false)]);
2224        assert_eq!(map.device_has_key("271060335329480", 0), Some(false));
2225        assert!(map.is_user_forgotten("271060335329480"));
2226
2227        let all_resolved_devices: Vec<Jid> = [
2228            "271060335329480@lid",
2229            "77610646245392@lid",
2230            "276661023027320:5@lid",
2231        ]
2232        .into_iter()
2233        .map(|s| Jid::from_str(s).unwrap())
2234        .collect();
2235
2236        let needs_skdm: Vec<&Jid> = all_resolved_devices
2237            .iter()
2238            .filter(|device| {
2239                !map.device_has_key(&device.user, device.device)
2240                    .unwrap_or(false)
2241                    || map.is_user_forgotten(&device.user)
2242            })
2243            .collect();
2244
2245        assert_eq!(
2246            needs_skdm.len(),
2247            3,
2248            "after retry inserts one row, ALL devices correctly flagged for SKDM \
2249             (this is what unblocks redistribution on the SECOND message)"
2250        );
2251    }
2252
2253    #[test]
2254    fn test_skdm_filtering_large_group() {
2255        use std::collections::HashSet;
2256
2257        let mut known_recipients: Vec<Jid> = Vec::with_capacity(1000);
2258        let mut all_devices: Vec<Jid> = Vec::with_capacity(1010);
2259
2260        for i in 0..1000i64 {
2261            let jid_str = format!("{}:1@lid", 100000000000000i64 + i);
2262            let jid = Jid::from_str(&jid_str).unwrap();
2263            known_recipients.push(jid.clone());
2264            all_devices.push(jid);
2265        }
2266
2267        for i in 1000i64..1010i64 {
2268            let jid_str = format!("{}:1@lid", 100000000000000i64 + i);
2269            all_devices.push(Jid::from_str(&jid_str).unwrap());
2270        }
2271
2272        let known_set: HashSet<DeviceKey<'_>> =
2273            known_recipients.iter().map(|j| j.device_key()).collect();
2274
2275        let new_devices: Vec<Jid> = all_devices
2276            .into_iter()
2277            .filter(|device| !known_set.contains(&device.device_key()))
2278            .collect();
2279
2280        assert_eq!(new_devices.len(), 10);
2281    }
2282
2283    mod infer_stanza {
2284        use super::*;
2285
2286        #[test]
2287        fn regular_message_returns_none() {
2288            let msg = wa::Message {
2289                conversation: Some("hello".into()),
2290                ..Default::default()
2291            };
2292            let (edit, node) = infer_stanza_metadata(&msg);
2293            assert!(edit.is_none());
2294            assert!(node.is_none());
2295        }
2296
2297        #[test]
2298        fn pin_returns_edit_attribute() {
2299            let msg = wa::Message {
2300                pin_in_chat_message: Some(wa::message::PinInChatMessage::default()),
2301                ..Default::default()
2302            };
2303            let (edit, node) = infer_stanza_metadata(&msg);
2304            assert_eq!(edit, Some(EditAttribute::PinInChat));
2305            assert!(node.is_none());
2306        }
2307
2308        #[test]
2309        fn poll_creation_v3_returns_meta_node() {
2310            let msg = wa::Message {
2311                poll_creation_message_v3: Some(Box::default()),
2312                ..Default::default()
2313            };
2314            let (edit, node) = infer_stanza_metadata(&msg);
2315            assert!(edit.is_none());
2316            let node = node.expect("should have meta node");
2317            assert_eq!(node.tag, "meta");
2318            let mut attrs = node.attrs();
2319            assert_eq!(
2320                attrs.optional_string("polltype").unwrap().as_ref(),
2321                "creation"
2322            );
2323        }
2324
2325        #[test]
2326        fn event_returns_meta_node() {
2327            let msg = wa::Message {
2328                event_message: Some(Box::default()),
2329                ..Default::default()
2330            };
2331            let (edit, node) = infer_stanza_metadata(&msg);
2332            assert!(edit.is_none());
2333            let node = node.expect("should have meta node");
2334            assert_eq!(node.tag, "meta");
2335            let mut attrs = node.attrs();
2336            assert_eq!(
2337                attrs.optional_string("event_type").unwrap().as_ref(),
2338                "creation"
2339            );
2340        }
2341
2342        #[test]
2343        fn empty_message_returns_none() {
2344            let (edit, node) = infer_stanza_metadata(&wa::Message::default());
2345            assert!(edit.is_none());
2346            assert!(node.is_none());
2347        }
2348
2349        #[test]
2350        fn poll_creation_v1_returns_meta_node() {
2351            let msg = wa::Message {
2352                poll_creation_message: Some(Box::default()),
2353                ..Default::default()
2354            };
2355            let (edit, node) = infer_stanza_metadata(&msg);
2356            assert!(edit.is_none());
2357            let node = node.expect("should have meta node");
2358            assert_eq!(node.tag, "meta");
2359            let mut attrs = node.attrs();
2360            assert_eq!(
2361                attrs.optional_string("polltype").unwrap().as_ref(),
2362                "creation"
2363            );
2364        }
2365
2366        #[test]
2367        fn poll_creation_v2_returns_meta_node() {
2368            let msg = wa::Message {
2369                poll_creation_message_v2: Some(Box::default()),
2370                ..Default::default()
2371            };
2372            let (edit, node) = infer_stanza_metadata(&msg);
2373            assert!(edit.is_none());
2374            let node = node.expect("should have meta node");
2375            assert_eq!(node.tag, "meta");
2376            let mut attrs = node.attrs();
2377            assert_eq!(
2378                attrs.optional_string("polltype").unwrap().as_ref(),
2379                "creation"
2380            );
2381        }
2382
2383        #[test]
2384        fn poll_vote_returns_meta_node() {
2385            let msg = wa::Message {
2386                poll_update_message: Some(wa::message::PollUpdateMessage {
2387                    vote: Some(wa::message::PollEncValue::default()),
2388                    ..Default::default()
2389                }),
2390                ..Default::default()
2391            };
2392            let (edit, node) = infer_stanza_metadata(&msg);
2393            assert!(edit.is_none());
2394            let node = node.expect("should have meta node");
2395            assert_eq!(node.tag, "meta");
2396            let mut attrs = node.attrs();
2397            assert_eq!(attrs.optional_string("polltype").unwrap().as_ref(), "vote");
2398        }
2399
2400        #[test]
2401        fn event_response_returns_meta_node() {
2402            let msg = wa::Message {
2403                enc_event_response_message: Some(Default::default()),
2404                ..Default::default()
2405            };
2406            let (edit, node) = infer_stanza_metadata(&msg);
2407            assert!(edit.is_none());
2408            let node = node.expect("should have meta node");
2409            assert_eq!(node.tag, "meta");
2410            let mut attrs = node.attrs();
2411            assert_eq!(
2412                attrs.optional_string("event_type").unwrap().as_ref(),
2413                "response"
2414            );
2415        }
2416
2417        #[test]
2418        fn poll_update_without_vote_returns_none() {
2419            let msg = wa::Message {
2420                poll_update_message: Some(wa::message::PollUpdateMessage {
2421                    vote: None,
2422                    ..Default::default()
2423                }),
2424                ..Default::default()
2425            };
2426            let (edit, node) = infer_stanza_metadata(&msg);
2427            assert!(edit.is_none());
2428            assert!(node.is_none());
2429        }
2430    }
2431
2432    mod infer_biz {
2433        use super::*;
2434        use wa::message::interactive_message::{
2435            self, NativeFlowMessage, native_flow_message::NativeFlowButton,
2436        };
2437
2438        fn msg_with_native_flow(button_name: &str) -> wa::Message {
2439            wa::Message {
2440                document_with_caption_message: Some(Box::new(wa::message::FutureProofMessage {
2441                    message: Some(Box::new(wa::Message {
2442                        interactive_message: Some(Box::new(wa::message::InteractiveMessage {
2443                            interactive_message: Some(
2444                                interactive_message::InteractiveMessage::NativeFlowMessage(
2445                                    NativeFlowMessage {
2446                                        buttons: vec![NativeFlowButton {
2447                                            name: Some(button_name.to_string()),
2448                                            button_params_json: None,
2449                                        }],
2450                                        message_version: Some(1),
2451                                        message_params_json: None,
2452                                    },
2453                                ),
2454                            ),
2455                            ..Default::default()
2456                        })),
2457                        ..Default::default()
2458                    })),
2459                })),
2460                ..Default::default()
2461            }
2462        }
2463
2464        fn assert_biz_node(node: &Node, expected_flow_name: &str) {
2465            assert_eq!(node.tag, "biz");
2466            assert!(
2467                node.attrs().optional_string("native_flow_name").is_none(),
2468                "should NOT use simple attribute form"
2469            );
2470            let interactive = node.get_optional_child("interactive").unwrap();
2471            let mut attrs = interactive.attrs();
2472            assert_eq!(
2473                attrs.optional_string("type").unwrap().as_ref(),
2474                "native_flow"
2475            );
2476            assert_eq!(attrs.optional_string("v").unwrap().as_ref(), "1");
2477            let nf = interactive.get_optional_child("native_flow").unwrap();
2478            let mut nf_attrs = nf.attrs();
2479            assert_eq!(
2480                nf_attrs.optional_string("name").unwrap().as_ref(),
2481                expected_flow_name
2482            );
2483        }
2484
2485        #[test]
2486        fn all_button_types_use_nested_structure() {
2487            for (button, expected_flow) in [
2488                ("cta_url", "cta_url"),
2489                ("payment_info", "payment_info"),
2490                ("review_and_pay", "order_details"),
2491                ("cta_catalog", "cta_catalog"),
2492                ("mpm", "mpm"),
2493                ("quick_reply", "quick_reply"),
2494            ] {
2495                let node = infer_biz_node(&msg_with_native_flow(button))
2496                    .unwrap_or_else(|| panic!("{button} should produce biz node"));
2497                assert_biz_node(&node, expected_flow);
2498            }
2499        }
2500
2501        #[test]
2502        fn no_interactive_returns_none() {
2503            let msg = wa::Message {
2504                conversation: Some("hello".into()),
2505                ..Default::default()
2506            };
2507            assert!(infer_biz_node(&msg).is_none());
2508        }
2509
2510        #[test]
2511        fn interactive_without_native_flow_returns_none() {
2512            let msg = wa::Message {
2513                interactive_message: Some(Box::new(wa::message::InteractiveMessage {
2514                    interactive_message: Some(
2515                        interactive_message::InteractiveMessage::CollectionMessage(
2516                            Default::default(),
2517                        ),
2518                    ),
2519                    ..Default::default()
2520                })),
2521                ..Default::default()
2522            };
2523            assert!(infer_biz_node(&msg).is_none());
2524        }
2525
2526        #[test]
2527        fn native_flow_without_buttons_returns_none() {
2528            let msg = wa::Message {
2529                interactive_message: Some(Box::new(wa::message::InteractiveMessage {
2530                    interactive_message: Some(
2531                        interactive_message::InteractiveMessage::NativeFlowMessage(
2532                            NativeFlowMessage {
2533                                buttons: vec![],
2534                                message_version: Some(1),
2535                                message_params_json: None,
2536                            },
2537                        ),
2538                    ),
2539                    ..Default::default()
2540                })),
2541                ..Default::default()
2542            };
2543            assert!(infer_biz_node(&msg).is_none());
2544        }
2545
2546        #[test]
2547        fn direct_interactive_message_without_wrapper() {
2548            let msg = wa::Message {
2549                interactive_message: Some(Box::new(wa::message::InteractiveMessage {
2550                    interactive_message: Some(
2551                        interactive_message::InteractiveMessage::NativeFlowMessage(
2552                            NativeFlowMessage {
2553                                buttons: vec![NativeFlowButton {
2554                                    name: Some("cta_url".to_string()),
2555                                    button_params_json: None,
2556                                }],
2557                                message_version: Some(1),
2558                                message_params_json: None,
2559                            },
2560                        ),
2561                    ),
2562                    ..Default::default()
2563                })),
2564                ..Default::default()
2565            };
2566            let node = infer_biz_node(&msg).unwrap();
2567            assert_biz_node(&node, "cta_url");
2568        }
2569    }
2570
2571    /// Regression tests for #462: send path session lock keys must match decrypt path.
2572    mod session_lock_regression {
2573        use super::*;
2574
2575        #[tokio::test]
2576        async fn per_device_lock_keys_cover_all_devices() {
2577            let client = crate::test_utils::create_test_client().await;
2578
2579            let devices: Vec<Jid> = [
2580                "100000012345678@lid",
2581                "100000012345678:5@lid",
2582                "100000012345678:33@lid",
2583            ]
2584            .iter()
2585            .map(|s| Jid::from_str(s).unwrap())
2586            .collect();
2587
2588            // Uses the production helper (resolve_encryption_jid + sort + dedup)
2589            let send_lock_keys = client.build_session_lock_keys(&devices).await;
2590
2591            assert_eq!(send_lock_keys.len(), 3);
2592            // Sorted by (server, user, device_numeric): 0, 5, 33
2593            assert_eq!(send_lock_keys[0].device, 0);
2594            assert_eq!(send_lock_keys[1].device, 5);
2595            assert_eq!(send_lock_keys[2].device, 33);
2596
2597            // Send keys must cover every device
2598            for device_jid in &devices {
2599                assert!(
2600                    send_lock_keys.contains(device_jid),
2601                    "device {device_jid} not in send keys: {send_lock_keys:?}"
2602                );
2603            }
2604
2605            // Bare JID key alone wouldn't protect linked devices
2606            let bare_key = devices[0].to_protocol_address_string();
2607            let device5_key = devices[1].to_protocol_address_string();
2608            assert_ne!(bare_key, device5_key);
2609        }
2610
2611        #[tokio::test]
2612        async fn per_device_lock_serializes_concurrent_session_access() {
2613            use std::sync::Arc;
2614            use std::sync::atomic::{AtomicU32, Ordering};
2615
2616            let session_locks: crate::cache::Cache<String, Arc<async_lock::Mutex<()>>> =
2617                crate::cache::Cache::builder().max_capacity(100).build();
2618
2619            let lock_key = "100000012345678:5@lid.0".to_string();
2620            let access_counter = Arc::new(AtomicU32::new(0));
2621            let max_concurrent = Arc::new(AtomicU32::new(0));
2622
2623            let mut handles = Vec::new();
2624            for _ in 0..10 {
2625                let locks = session_locks.clone();
2626                let key = lock_key.clone();
2627                let counter = access_counter.clone();
2628                let max = max_concurrent.clone();
2629
2630                handles.push(tokio::spawn(async move {
2631                    let mutex: Arc<async_lock::Mutex<()>> = locks
2632                        .get_with_by_ref(&key, async { Arc::new(async_lock::Mutex::new(())) })
2633                        .await;
2634                    // lock_arc() needed: guard must own the Arc since mutex is a local
2635                    // (production uses lock() with a separate Vec keeping Arcs alive)
2636                    let _guard = mutex.lock_arc().await;
2637
2638                    let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
2639                    max.fetch_max(active, Ordering::SeqCst);
2640                    tokio::task::yield_now().await;
2641                    counter.fetch_sub(1, Ordering::SeqCst);
2642                }));
2643            }
2644
2645            for handle in handles {
2646                handle.await.unwrap();
2647            }
2648
2649            assert_eq!(max_concurrent.load(Ordering::SeqCst), 1);
2650        }
2651
2652        #[tokio::test]
2653        async fn different_device_locks_are_independent() {
2654            use std::sync::Arc;
2655            use std::sync::atomic::{AtomicU32, Ordering};
2656
2657            let session_locks: crate::cache::Cache<String, Arc<async_lock::Mutex<()>>> =
2658                crate::cache::Cache::builder().max_capacity(100).build();
2659
2660            let max_concurrent = Arc::new(AtomicU32::new(0));
2661            let counter = Arc::new(AtomicU32::new(0));
2662            let barrier = Arc::new(tokio::sync::Barrier::new(2));
2663
2664            let keys = ["100000012345678@lid.0", "100000012345678:5@lid.0"];
2665
2666            let mut handles = Vec::new();
2667            for key in keys {
2668                let locks = session_locks.clone();
2669                let key = key.to_string();
2670                let c = counter.clone();
2671                let m = max_concurrent.clone();
2672                let b = barrier.clone();
2673
2674                handles.push(tokio::spawn(async move {
2675                    let mutex: Arc<async_lock::Mutex<()>> = locks
2676                        .get_with_by_ref(&key, async { Arc::new(async_lock::Mutex::new(())) })
2677                        .await;
2678                    // lock_arc(): same reason as above
2679                    let _guard = mutex.lock_arc().await;
2680
2681                    let active = c.fetch_add(1, Ordering::SeqCst) + 1;
2682                    m.fetch_max(active, Ordering::SeqCst);
2683                    b.wait().await;
2684                    c.fetch_sub(1, Ordering::SeqCst);
2685                }));
2686            }
2687
2688            for handle in handles {
2689                handle.await.unwrap();
2690            }
2691
2692            assert_eq!(max_concurrent.load(Ordering::SeqCst), 2);
2693        }
2694
2695        /// Regression: 1:1 DM recipient must use bare Signal address matching
2696        /// the receive path. Starts from device-specific JID and verifies
2697        /// to_non_ad() normalization produces the correct bare key.
2698        #[tokio::test]
2699        async fn dm_recipient_uses_bare_address() {
2700            let client = crate::test_utils::create_test_client().await;
2701
2702            // Start from device-specific JID, exercise the production path
2703            let recipient_device33 = Jid::from_str("100000012345678:33@lid").unwrap();
2704            let own_device_5 = Jid::from_str("999999999999:5@s.whatsapp.net").unwrap();
2705
2706            // Same normalization as send_message_impl
2707            let recipient_bare = client
2708                .resolve_encryption_jid(&recipient_device33)
2709                .await
2710                .to_non_ad();
2711
2712            let all_dm_jids = vec![recipient_bare.clone(), own_device_5.clone()];
2713            let lock_jids = client.build_session_lock_keys(&all_dm_jids).await;
2714
2715            // Recipient lock key must be BARE (device 0), matching decrypt path
2716            assert_eq!(
2717                recipient_bare.to_protocol_address_string(),
2718                "100000012345678@lid.0"
2719            );
2720            assert!(lock_jids.contains(&recipient_bare));
2721
2722            // Own device lock key must be device-specific
2723            assert!(lock_jids.contains(&own_device_5));
2724
2725            // Device-specific recipient key must NOT be present
2726            assert!(
2727                !lock_jids.contains(&recipient_device33),
2728                "recipient must NOT use device-specific address"
2729            );
2730        }
2731
2732        /// Verify bare normalization deduplicates multiple recipient devices.
2733        #[test]
2734        fn bare_normalization_deduplicates_recipient_devices() {
2735            let devices: Vec<Jid> = [
2736                "100000012345678@lid",
2737                "100000012345678:5@lid",
2738                "100000012345678:33@lid",
2739            ]
2740            .iter()
2741            .map(|s| Jid::from_str(s).unwrap())
2742            .collect();
2743
2744            // All collapse to the same bare JID
2745            let bare: Vec<Jid> = devices.iter().map(|j| j.to_non_ad()).collect();
2746            assert!(bare.windows(2).all(|w| w[0] == w[1]));
2747            assert_eq!(
2748                bare[0].to_protocol_address_string(),
2749                "100000012345678@lid.0"
2750            );
2751        }
2752    }
2753}