Skip to main content

whatsapp_rust/
send.rs

1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use wacore::client::context::SendContextResolver;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore::types::message::AddressingMode;
8use wacore_binary::jid::{DeviceKey, Jid, JidExt as _};
9use wacore_binary::node::Node;
10use waproto::whatsapp as wa;
11
12/// Options for sending messages with additional customization.
13#[derive(Debug, Clone, Default)]
14pub struct SendOptions {
15    /// Extra XML nodes to add to the message stanza.
16    pub extra_stanza_nodes: Vec<Node>,
17}
18
19/// Specifies who is revoking (deleting) the message.
20#[derive(Debug, Clone, PartialEq, Eq, Default)]
21pub enum RevokeType {
22    /// The message sender deleting their own message.
23    #[default]
24    Sender,
25    /// A group admin deleting another user's message.
26    /// `original_sender` is the JID of the user who sent the message being deleted.
27    Admin { original_sender: Jid },
28}
29
30impl Client {
31    /// Send an end-to-end encrypted message to a user or group.
32    ///
33    /// Returns the message ID on success. For status/story updates use
34    /// [`Client::status()`] instead.
35    pub async fn send_message(
36        &self,
37        to: Jid,
38        message: wa::Message,
39    ) -> Result<String, anyhow::Error> {
40        self.send_message_with_options(to, message, SendOptions::default())
41            .await
42    }
43
44    /// Send a message with additional options.
45    pub async fn send_message_with_options(
46        &self,
47        to: Jid,
48        message: wa::Message,
49        options: SendOptions,
50    ) -> Result<String, anyhow::Error> {
51        let request_id = self.generate_message_id().await;
52        self.send_message_impl(
53            to,
54            &message,
55            Some(request_id.clone()),
56            false,
57            false,
58            None,
59            options.extra_stanza_nodes,
60        )
61        .await?;
62        Ok(request_id)
63    }
64
65    /// Send a status/story update to the given recipients using sender key encryption.
66    ///
67    /// This builds a `GroupInfo` from the provided recipients (always PN addressing mode),
68    /// then reuses the group encryption pipeline with `to = status@broadcast`.
69    pub(crate) async fn send_status_message(
70        &self,
71        message: wa::Message,
72        recipients: Vec<Jid>,
73        options: crate::features::status::StatusSendOptions,
74    ) -> Result<String, anyhow::Error> {
75        use wacore::client::context::GroupInfo;
76        use wacore_binary::builder::NodeBuilder;
77
78        if recipients.is_empty() {
79            return Err(anyhow!("Cannot send status with no recipients"));
80        }
81
82        let to = Jid::status_broadcast();
83        let request_id = self.generate_message_id().await;
84
85        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
86        let own_jid = device_snapshot
87            .pn
88            .clone()
89            .ok_or_else(|| anyhow!("Not logged in"))?;
90        // Status always uses PN addressing, so own_lid is only needed as a
91        // fallback parameter for prepare_group_stanza (unused in PN mode).
92        let own_lid = device_snapshot
93            .lid
94            .clone()
95            .unwrap_or_else(|| own_jid.clone());
96        let account_info = device_snapshot.account.clone();
97
98        // Status always uses PN addressing. Resolve any LID recipients to their
99        // phone numbers so we don't end up with duplicate PN+LID entries for the
100        // same user (which causes server error 400).
101        // Reject non-user JIDs (groups, broadcasts, etc.) to prevent invalid
102        // <participants> entries that cause server errors.
103        let mut resolved_recipients = Vec::with_capacity(recipients.len());
104        for jid in recipients {
105            if jid.is_group() || jid.is_status_broadcast() || jid.is_broadcast_list() {
106                return Err(anyhow!(
107                    "Invalid status recipient {}: must be a user JID, not a group/broadcast",
108                    jid
109                ));
110            }
111            if jid.is_lid() {
112                if let Some(pn) = self.lid_pn_cache.get_phone_number(&jid.user).await {
113                    resolved_recipients
114                        .push(Jid::new(&pn, wacore_binary::jid::DEFAULT_USER_SERVER));
115                } else {
116                    return Err(anyhow!(
117                        "No PN mapping for LID {}. Ensure the recipient has been \
118                         contacted previously.",
119                        jid
120                    ));
121                }
122            } else {
123                resolved_recipients.push(jid);
124            }
125        }
126
127        if resolved_recipients.is_empty() {
128            return Err(anyhow!("No valid PN recipients after LID resolution"));
129        }
130
131        // Deduplicate by user (in case both LID and PN were provided for the same user)
132        let mut seen_users = std::collections::HashSet::new();
133        resolved_recipients.retain(|jid| seen_users.insert(jid.user.clone()));
134
135        let mut group_info = GroupInfo::new(resolved_recipients, AddressingMode::Pn);
136
137        // Ensure we're in the participant list
138        let own_base = own_jid.to_non_ad();
139        if !group_info
140            .participants
141            .iter()
142            .any(|p| p.is_same_user_as(&own_base))
143        {
144            group_info.participants.push(own_base);
145        }
146
147        self.add_recent_message(to.clone(), request_id.clone(), &message)
148            .await;
149
150        let device_store_arc = self.persistence_manager.get_device_arc().await;
151
152        let force_skdm = {
153            use wacore::libsignal::protocol::SenderKeyStore;
154            use wacore::libsignal::store::sender_key_name::SenderKeyName;
155            let mut device_guard = device_store_arc.write().await;
156            let sender_address = own_jid.to_protocol_address();
157            let sender_key_name = SenderKeyName::new(to.to_string(), sender_address.to_string());
158
159            let key_exists = device_guard
160                .load_sender_key(&sender_key_name)
161                .await?
162                .is_some();
163
164            !key_exists
165        };
166
167        let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc.clone());
168        let mut stores = wacore::send::SignalStores {
169            session_store: &mut store_adapter.session_store,
170            identity_store: &mut store_adapter.identity_store,
171            prekey_store: &mut store_adapter.pre_key_store,
172            signed_prekey_store: &store_adapter.signed_pre_key_store,
173            sender_key_store: &mut store_adapter.sender_key_store,
174        };
175
176        let marked_for_fresh_skdm = self
177            .consume_forget_marks(&to.to_string())
178            .await
179            .unwrap_or_default();
180
181        let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
182            None
183        } else {
184            let known_recipients = self
185                .persistence_manager
186                .get_skdm_recipients(&to.to_string())
187                .await
188                .unwrap_or_default();
189
190            if known_recipients.is_empty() {
191                None
192            } else {
193                let jids_to_resolve: Vec<Jid> = group_info
194                    .participants
195                    .iter()
196                    .map(|jid| jid.to_non_ad())
197                    .collect();
198
199                match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
200                    Ok(all_devices) => {
201                        let known_set: std::collections::HashSet<DeviceKey<'_>> =
202                            known_recipients.iter().map(|j| j.device_key()).collect();
203                        let new_devices: Vec<Jid> = all_devices
204                            .into_iter()
205                            .filter(|device| !known_set.contains(&device.device_key()))
206                            .collect();
207                        if new_devices.is_empty() {
208                            Some(vec![])
209                        } else {
210                            Some(new_devices)
211                        }
212                    }
213                    Err(e) => {
214                        log::warn!("Failed to resolve devices for status SKDM check: {:?}", e);
215                        None
216                    }
217                }
218            }
219        };
220
221        let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
222            match skdm_target_devices {
223                None => None,
224                Some(mut devices) => {
225                    for marked_jid_str in &marked_for_fresh_skdm {
226                        if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
227                            && !devices.iter().any(|d| d.device_eq(&marked_jid))
228                        {
229                            devices.push(marked_jid);
230                        }
231                    }
232                    Some(devices)
233                }
234            }
235        } else {
236            skdm_target_devices
237        };
238
239        let is_full_distribution = force_skdm || skdm_target_devices.is_none();
240        let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
241
242        // WhatsApp Web includes <meta status_setting="..."/> on non-revoke status messages.
243        // Revoke messages omit this node.
244        let is_revoke = message.protocol_message.as_ref().is_some_and(|pm| {
245            pm.r#type == Some(wa::message::protocol_message::Type::Revoke as i32)
246        });
247        let extra_stanza_nodes = if is_revoke {
248            vec![]
249        } else {
250            vec![
251                NodeBuilder::new("meta")
252                    .attr("status_setting", options.privacy.as_str())
253                    .build(),
254            ]
255        };
256
257        let stanza = match wacore::send::prepare_group_stanza(
258            &mut stores,
259            self,
260            &mut group_info,
261            &own_jid,
262            &own_lid,
263            account_info.as_ref(),
264            to.clone(),
265            &message,
266            request_id.clone(),
267            force_skdm,
268            skdm_target_devices,
269            None,
270            extra_stanza_nodes.clone(),
271        )
272        .await
273        {
274            Ok(stanza) => {
275                if !devices_receiving_skdm.is_empty() {
276                    if let Err(e) = self
277                        .persistence_manager
278                        .add_skdm_recipients(&to.to_string(), &devices_receiving_skdm)
279                        .await
280                    {
281                        log::warn!("Failed to update status SKDM recipients: {:?}", e);
282                    }
283                } else if is_full_distribution {
284                    let jids_to_resolve: Vec<Jid> = group_info
285                        .participants
286                        .iter()
287                        .map(|jid| jid.to_non_ad())
288                        .collect();
289
290                    if let Ok(all_devices) =
291                        SendContextResolver::resolve_devices(self, &jids_to_resolve).await
292                        && let Err(e) = self
293                            .persistence_manager
294                            .add_skdm_recipients(&to.to_string(), &all_devices)
295                            .await
296                    {
297                        log::warn!("Failed to update status SKDM recipients: {:?}", e);
298                    }
299                }
300                stanza
301            }
302            Err(e) => {
303                if let Some(SignalProtocolError::NoSenderKeyState(_)) =
304                    e.downcast_ref::<SignalProtocolError>()
305                {
306                    log::warn!("No sender key for status broadcast, forcing distribution.");
307
308                    if let Err(e) = self
309                        .persistence_manager
310                        .clear_skdm_recipients(&to.to_string())
311                        .await
312                    {
313                        log::warn!("Failed to clear status SKDM recipients: {:?}", e);
314                    }
315
316                    let mut store_adapter_retry =
317                        SignalProtocolStoreAdapter::new(device_store_arc.clone());
318                    let mut stores_retry = wacore::send::SignalStores {
319                        session_store: &mut store_adapter_retry.session_store,
320                        identity_store: &mut store_adapter_retry.identity_store,
321                        prekey_store: &mut store_adapter_retry.pre_key_store,
322                        signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
323                        sender_key_store: &mut store_adapter_retry.sender_key_store,
324                    };
325
326                    let retry_stanza = wacore::send::prepare_group_stanza(
327                        &mut stores_retry,
328                        self,
329                        &mut group_info,
330                        &own_jid,
331                        &own_lid,
332                        account_info.as_ref(),
333                        to.clone(),
334                        &message,
335                        request_id.clone(),
336                        true,
337                        None,
338                        None,
339                        extra_stanza_nodes,
340                    )
341                    .await?;
342
343                    // Re-populate SKDM recipients after successful full distribution
344                    let jids_to_resolve: Vec<Jid> = group_info
345                        .participants
346                        .iter()
347                        .map(|jid| jid.to_non_ad())
348                        .collect();
349                    if let Ok(all_devices) =
350                        SendContextResolver::resolve_devices(self, &jids_to_resolve).await
351                        && let Err(e) = self
352                            .persistence_manager
353                            .add_skdm_recipients(&to.to_string(), &all_devices)
354                            .await
355                    {
356                        log::warn!(
357                            "Failed to update status SKDM recipients after retry: {:?}",
358                            e
359                        );
360                    }
361
362                    retry_stanza
363                } else {
364                    return Err(e);
365                }
366            }
367        };
368
369        // For status broadcasts, the server doesn't know the recipient list
370        // (unlike groups where the server has the member list). We must always
371        // include a <participants> node so the server knows who to deliver to.
372        // If prepare_group_stanza already added one (SKDM distribution), we
373        // extend it with bare <to> entries for devices that already have the
374        // sender key. If there's no <participants> node yet, we create one.
375        let stanza = self.ensure_status_participants(stanza, &group_info).await?;
376
377        self.send_node(stanza).await?;
378        Ok(request_id)
379    }
380
381    /// Ensure the status stanza has a <participants> node listing all recipient
382    /// user JIDs. WhatsApp Web's `participantList` uses bare USER JIDs (not
383    /// device JIDs) — `<to jid="user@s.whatsapp.net"/>` — to tell the server
384    /// which users should receive the skmsg. The SKDM distribution list
385    /// (already in <participants>) uses device JIDs with <enc> children.
386    async fn ensure_status_participants(
387        &self,
388        mut stanza: wacore_binary::Node,
389        group_info: &wacore::client::context::GroupInfo,
390    ) -> Result<wacore_binary::Node, anyhow::Error> {
391        use wacore_binary::builder::NodeBuilder;
392        use wacore_binary::node::NodeContent;
393
394        // Build bare <to jid="USER_JID"/> entries for each participant.
395        // WhatsApp Web uses USER_JID (not DEVICE_JID) for the participantList.
396        let bare_to_nodes: Vec<wacore_binary::Node> = group_info
397            .participants
398            .iter()
399            .map(|jid| {
400                NodeBuilder::new("to")
401                    .attr("jid", jid.to_non_ad().to_string())
402                    .build()
403            })
404            .collect();
405
406        // Check if <participants> already exists in the stanza children
407        let children = match &mut stanza.content {
408            Some(NodeContent::Nodes(nodes)) => nodes,
409            _ => {
410                stanza.content = Some(NodeContent::Nodes(vec![]));
411                match &mut stanza.content {
412                    Some(NodeContent::Nodes(nodes)) => nodes,
413                    _ => unreachable!(),
414                }
415            }
416        };
417
418        if let Some(participants_node) = children.iter_mut().find(|n| n.tag == "participants") {
419            // <participants> already exists (from SKDM distribution).
420            // Add bare <to> user JID entries for users whose devices are NOT
421            // already represented by SKDM device-level entries.
422            let existing_users: std::collections::HashSet<String> = participants_node
423                .children()
424                .unwrap_or_default()
425                .iter()
426                .filter_map(|n| {
427                    n.attrs
428                        .get("jid")
429                        .and_then(|v| v.to_string().parse::<Jid>().ok().map(|j| j.user.clone()))
430                })
431                .collect();
432
433            let new_to_nodes: Vec<wacore_binary::Node> = bare_to_nodes
434                .into_iter()
435                .filter(|n| {
436                    n.attrs
437                        .get("jid")
438                        .and_then(|v| v.to_string().parse::<Jid>().ok())
439                        .map(|j| !existing_users.contains(&j.user))
440                        .unwrap_or(false)
441                })
442                .collect();
443
444            if !new_to_nodes.is_empty() {
445                match &mut participants_node.content {
446                    Some(NodeContent::Nodes(nodes)) => nodes.extend(new_to_nodes),
447                    _ => {
448                        participants_node.content = Some(NodeContent::Nodes(new_to_nodes));
449                    }
450                }
451            }
452        } else {
453            // No <participants> node — create one with bare <to> entries.
454            let participants_node = NodeBuilder::new("participants")
455                .children(bare_to_nodes)
456                .build();
457            children.insert(0, participants_node);
458        }
459
460        Ok(stanza)
461    }
462
463    /// Delete a message for everyone in the chat (revoke).
464    ///
465    /// This sends a revoke protocol message that removes the message for all participants.
466    /// The message will show as "This message was deleted" for recipients.
467    ///
468    /// # Arguments
469    /// * `to` - The chat JID (DM or group)
470    /// * `message_id` - The ID of the message to delete
471    /// * `revoke_type` - Use `RevokeType::Sender` to delete your own message,
472    ///   or `RevokeType::Admin { original_sender }` to delete another user's message as group admin
473    pub async fn revoke_message(
474        &self,
475        to: Jid,
476        message_id: impl Into<String>,
477        revoke_type: RevokeType,
478    ) -> Result<(), anyhow::Error> {
479        let message_id = message_id.into();
480        // Verify we're logged in
481        self.get_pn()
482            .await
483            .ok_or_else(|| anyhow!("Not logged in"))?;
484
485        let (from_me, participant, edit_attr) = match &revoke_type {
486            RevokeType::Sender => {
487                // For sender revoke, participant is NOT set (from_me=true identifies it)
488                // This matches whatsmeow's BuildMessageKey behavior
489                (
490                    true,
491                    None,
492                    crate::types::message::EditAttribute::SenderRevoke,
493                )
494            }
495            RevokeType::Admin { original_sender } => {
496                // Admin revoke requires group context
497                if !to.is_group() {
498                    return Err(anyhow!("Admin revoke is only valid for group chats"));
499                }
500                // The protocolMessageKey.participant should match the original message's key exactly
501                // Do NOT convert LID to PN - pass through unchanged like WhatsApp Web does
502                let participant_str = original_sender.to_non_ad().to_string();
503                log::debug!(
504                    "Admin revoke: using participant {} for MessageKey",
505                    participant_str
506                );
507                (
508                    false,
509                    Some(participant_str),
510                    crate::types::message::EditAttribute::AdminRevoke,
511                )
512            }
513        };
514
515        let revoke_message = wa::Message {
516            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
517                key: Some(wa::MessageKey {
518                    remote_jid: Some(to.to_string()),
519                    from_me: Some(from_me),
520                    id: Some(message_id.clone()),
521                    participant,
522                }),
523                r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
524                ..Default::default()
525            })),
526            ..Default::default()
527        };
528
529        // The revoke message stanza needs a NEW unique ID, not the message ID being revoked
530        // The message_id being revoked is already in protocolMessage.key.id
531        // Passing None generates a fresh stanza ID
532        //
533        // For admin revokes, force SKDM distribution to get the proper message structure
534        // with phash, <participants>, and <device-identity> that WhatsApp Web uses
535        let force_skdm = matches!(revoke_type, RevokeType::Admin { .. });
536        self.send_message_impl(
537            to,
538            &revoke_message,
539            None,
540            false,
541            force_skdm,
542            Some(edit_attr),
543            vec![],
544        )
545        .await
546    }
547
548    #[allow(clippy::too_many_arguments)]
549    pub(crate) async fn send_message_impl(
550        &self,
551        to: Jid,
552        message: &wa::Message,
553        request_id_override: Option<String>,
554        peer: bool,
555        force_key_distribution: bool,
556        edit: Option<crate::types::message::EditAttribute>,
557        extra_stanza_nodes: Vec<Node>,
558    ) -> Result<(), anyhow::Error> {
559        // Status broadcasts must go through send_status_message() which provides recipients
560        if to.is_status_broadcast() {
561            return Err(anyhow!(
562                "Use send_status_message() or client.status() API for status@broadcast"
563            ));
564        }
565
566        // Generate request ID early (doesn't need lock)
567        let request_id = match request_id_override {
568            Some(id) => id,
569            None => self.generate_message_id().await,
570        };
571
572        let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
573            // Peer messages are only valid for individual users, not groups
574            // Resolve encryption JID and acquire lock ONLY for encryption
575            let encryption_jid = self.resolve_encryption_jid(&to).await;
576            let signal_addr_str = encryption_jid.to_protocol_address().to_string();
577
578            let session_mutex = self
579                .session_locks
580                .get_with(signal_addr_str.clone(), async {
581                    std::sync::Arc::new(tokio::sync::Mutex::new(()))
582                })
583                .await;
584            let _session_guard = session_mutex.lock().await;
585
586            let device_store_arc = self.persistence_manager.get_device_arc().await;
587            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
588
589            wacore::send::prepare_peer_stanza(
590                &mut store_adapter.session_store,
591                &mut store_adapter.identity_store,
592                to,
593                encryption_jid,
594                message,
595                request_id,
596            )
597            .await?
598        } else if to.is_group() {
599            // Group messages: No client-level lock needed.
600            // Each participant device is encrypted separately with its own per-device lock
601            // inside prepare_group_stanza, so we don't need to serialize entire group sends.
602
603            // Preparation work (no lock needed)
604            let mut group_info = self.groups().query_info(&to).await?;
605
606            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
607            let own_jid = device_snapshot
608                .pn
609                .clone()
610                .ok_or_else(|| anyhow!("Not logged in"))?;
611            let own_lid = device_snapshot
612                .lid
613                .clone()
614                .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
615            let account_info = device_snapshot.account.clone();
616
617            // Store serialized message bytes for retry (lightweight)
618            self.add_recent_message(to.clone(), request_id.clone(), message)
619                .await;
620
621            let device_store_arc = self.persistence_manager.get_device_arc().await;
622
623            let (own_sending_jid, _) = match group_info.addressing_mode {
624                crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
625                crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
626            };
627
628            if !group_info
629                .participants
630                .iter()
631                .any(|participant| participant.is_same_user_as(&own_sending_jid))
632            {
633                group_info.participants.push(own_sending_jid.to_non_ad());
634            }
635
636            let force_skdm = {
637                use wacore::libsignal::protocol::SenderKeyStore;
638                use wacore::libsignal::store::sender_key_name::SenderKeyName;
639                let mut device_guard = device_store_arc.write().await;
640                let sender_address = own_sending_jid.to_protocol_address();
641                let sender_key_name =
642                    SenderKeyName::new(to.to_string(), sender_address.to_string());
643
644                let key_exists = device_guard
645                    .load_sender_key(&sender_key_name)
646                    .await?
647                    .is_some();
648
649                force_key_distribution || !key_exists
650            };
651
652            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc.clone());
653
654            let mut stores = wacore::send::SignalStores {
655                session_store: &mut store_adapter.session_store,
656                identity_store: &mut store_adapter.identity_store,
657                prekey_store: &mut store_adapter.pre_key_store,
658                signed_prekey_store: &store_adapter.signed_pre_key_store,
659                sender_key_store: &mut store_adapter.sender_key_store,
660            };
661
662            // Consume forget marks - these participants need fresh SKDMs (matches WhatsApp Web)
663            // markForgetSenderKey is called during retry handling, this consumes those marks
664            let marked_for_fresh_skdm = self
665                .consume_forget_marks(&to.to_string())
666                .await
667                .unwrap_or_default();
668
669            // Determine which devices need SKDM distribution
670            let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
671                None
672            } else {
673                let known_recipients = self
674                    .persistence_manager
675                    .get_skdm_recipients(&to.to_string())
676                    .await
677                    .unwrap_or_default();
678
679                if known_recipients.is_empty() {
680                    None
681                } else {
682                    let jids_to_resolve: Vec<Jid> = group_info
683                        .participants
684                        .iter()
685                        .map(|jid| jid.to_non_ad())
686                        .collect();
687
688                    match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
689                        Ok(all_devices) => {
690                            use std::collections::HashSet;
691
692                            let known_set: HashSet<DeviceKey<'_>> =
693                                known_recipients.iter().map(|j| j.device_key()).collect();
694
695                            let new_devices: Vec<Jid> = all_devices
696                                .into_iter()
697                                .filter(|device| !known_set.contains(&device.device_key()))
698                                .collect();
699
700                            if new_devices.is_empty() {
701                                Some(vec![])
702                            } else {
703                                log::debug!(
704                                    "Found {} new devices needing SKDM for group {}",
705                                    new_devices.len(),
706                                    to
707                                );
708                                Some(new_devices)
709                            }
710                        }
711                        Err(e) => {
712                            log::warn!("Failed to resolve devices for SKDM check: {:?}", e);
713                            None
714                        }
715                    }
716                }
717            };
718
719            // Merge devices marked for fresh SKDM (from retry/error handling)
720            let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
721                match skdm_target_devices {
722                    None => None,
723                    Some(mut devices) => {
724                        for marked_jid_str in &marked_for_fresh_skdm {
725                            if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
726                                && !devices.iter().any(|d| d.device_eq(&marked_jid))
727                            {
728                                log::debug!(
729                                    "Adding {} to SKDM targets (marked for fresh key)",
730                                    marked_jid_str
731                                );
732                                devices.push(marked_jid);
733                            }
734                        }
735                        Some(devices)
736                    }
737                }
738            } else {
739                skdm_target_devices
740            };
741
742            let is_full_distribution = force_skdm || skdm_target_devices.is_none();
743            let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
744
745            match wacore::send::prepare_group_stanza(
746                &mut stores,
747                self,
748                &mut group_info,
749                &own_jid,
750                &own_lid,
751                account_info.as_ref(),
752                to.clone(),
753                message,
754                request_id.clone(),
755                force_skdm,
756                skdm_target_devices,
757                edit.clone(),
758                extra_stanza_nodes.clone(),
759            )
760            .await
761            {
762                Ok(stanza) => {
763                    if !devices_receiving_skdm.is_empty() {
764                        if let Err(e) = self
765                            .persistence_manager
766                            .add_skdm_recipients(&to.to_string(), &devices_receiving_skdm)
767                            .await
768                        {
769                            log::warn!("Failed to update SKDM recipients: {:?}", e);
770                        }
771                    } else if is_full_distribution {
772                        let jids_to_resolve: Vec<Jid> = group_info
773                            .participants
774                            .iter()
775                            .map(|jid| jid.to_non_ad())
776                            .collect();
777
778                        if let Ok(all_devices) =
779                            SendContextResolver::resolve_devices(self, &jids_to_resolve).await
780                            && let Err(e) = self
781                                .persistence_manager
782                                .add_skdm_recipients(&to.to_string(), &all_devices)
783                                .await
784                        {
785                            log::warn!("Failed to update SKDM recipients: {:?}", e);
786                        }
787                    }
788                    stanza
789                }
790                Err(e) => {
791                    if let Some(SignalProtocolError::NoSenderKeyState(_)) =
792                        e.downcast_ref::<SignalProtocolError>()
793                    {
794                        log::warn!("No sender key for group {}, forcing distribution.", to);
795
796                        // Clear SKDM recipients since we're rotating the key
797                        if let Err(e) = self
798                            .persistence_manager
799                            .clear_skdm_recipients(&to.to_string())
800                            .await
801                        {
802                            log::warn!("Failed to clear SKDM recipients: {:?}", e);
803                        }
804
805                        let mut store_adapter_retry =
806                            SignalProtocolStoreAdapter::new(device_store_arc.clone());
807                        let mut stores_retry = wacore::send::SignalStores {
808                            session_store: &mut store_adapter_retry.session_store,
809                            identity_store: &mut store_adapter_retry.identity_store,
810                            prekey_store: &mut store_adapter_retry.pre_key_store,
811                            signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
812                            sender_key_store: &mut store_adapter_retry.sender_key_store,
813                        };
814
815                        let to_str = to.to_string();
816                        let retry_stanza = wacore::send::prepare_group_stanza(
817                            &mut stores_retry,
818                            self,
819                            &mut group_info,
820                            &own_jid,
821                            &own_lid,
822                            account_info.as_ref(),
823                            to,
824                            message,
825                            request_id,
826                            true, // Force distribution on retry
827                            None, // Distribute to all devices
828                            edit.clone(),
829                            extra_stanza_nodes.clone(),
830                        )
831                        .await?;
832
833                        // Re-populate SKDM recipients after successful full distribution
834                        let jids_to_resolve: Vec<Jid> = group_info
835                            .participants
836                            .iter()
837                            .map(|jid| jid.to_non_ad())
838                            .collect();
839                        if let Ok(all_devices) =
840                            SendContextResolver::resolve_devices(self, &jids_to_resolve).await
841                            && let Err(e) = self
842                                .persistence_manager
843                                .add_skdm_recipients(&to_str, &all_devices)
844                                .await
845                        {
846                            log::warn!("Failed to update SKDM recipients after retry: {:?}", e);
847                        }
848
849                        retry_stanza
850                    } else {
851                        return Err(e);
852                    }
853                }
854            }
855        } else {
856            // Direct message: Acquire lock only during encryption
857
858            // Ensure E2E sessions exist before encryption (matches WhatsApp Web)
859            // This deduplicates concurrent prekey fetches for the same recipient
860            let recipient_devices = self.get_user_devices(std::slice::from_ref(&to)).await?;
861            self.ensure_e2e_sessions(recipient_devices).await?;
862
863            // Resolve encryption JID and prepare lock acquisition
864            let encryption_jid = self.resolve_encryption_jid(&to).await;
865            let signal_addr_str = encryption_jid.to_protocol_address().to_string();
866
867            // Store serialized message bytes for retry (lightweight)
868            self.add_recent_message(to.clone(), request_id.clone(), message)
869                .await;
870
871            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
872            let own_jid = device_snapshot
873                .pn
874                .clone()
875                .ok_or_else(|| anyhow!("Not logged in"))?;
876            let account_info = device_snapshot.account.clone();
877
878            // Include tctoken in 1:1 messages (matches WhatsApp Web behavior).
879            // Skip for newsletters, groups, and own JID.
880            let mut extra_stanza_nodes = extra_stanza_nodes;
881            if !to.is_group() && !to.is_newsletter() {
882                self.maybe_include_tc_token(&to, &mut extra_stanza_nodes)
883                    .await;
884            }
885
886            // Acquire lock only for encryption
887            let session_mutex = self
888                .session_locks
889                .get_with(signal_addr_str.clone(), async {
890                    std::sync::Arc::new(tokio::sync::Mutex::new(()))
891                })
892                .await;
893            let _session_guard = session_mutex.lock().await;
894
895            let device_store_arc = self.persistence_manager.get_device_arc().await;
896            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
897
898            let mut stores = wacore::send::SignalStores {
899                session_store: &mut store_adapter.session_store,
900                identity_store: &mut store_adapter.identity_store,
901                prekey_store: &mut store_adapter.pre_key_store,
902                signed_prekey_store: &store_adapter.signed_pre_key_store,
903                sender_key_store: &mut store_adapter.sender_key_store,
904            };
905
906            wacore::send::prepare_dm_stanza(
907                &mut stores,
908                self,
909                &own_jid,
910                account_info.as_ref(),
911                to,
912                message,
913                request_id,
914                edit,
915                extra_stanza_nodes,
916            )
917            .await?
918        };
919
920        self.send_node(stanza_to_send).await.map_err(|e| e.into())
921    }
922
923    /// Look up and include a tctoken in outgoing 1:1 message stanza nodes.
924    ///
925    /// If a valid (non-expired) token exists, adds a `<tctoken>` child node.
926    /// If the token is missing or expired, attempts to issue new tokens via IQ.
927    async fn maybe_include_tc_token(&self, to: &Jid, extra_nodes: &mut Vec<Node>) {
928        use wacore::iq::tctoken::{
929            IssuePrivacyTokensSpec, build_tc_token_node, is_tc_token_expired,
930            should_send_new_tc_token,
931        };
932        use wacore::store::traits::TcTokenEntry;
933
934        // Skip for own JID — no need to send privacy token to ourselves
935        let snapshot = self.persistence_manager.get_device_snapshot().await;
936        let is_self = snapshot
937            .pn
938            .as_ref()
939            .is_some_and(|pn| pn.is_same_user_as(to))
940            || snapshot
941                .lid
942                .as_ref()
943                .is_some_and(|lid| lid.is_same_user_as(to));
944        if is_self {
945            return;
946        }
947
948        // Resolve the destination to a LID for token lookup
949        let token_jid = if to.is_lid() {
950            to.user.clone()
951        } else {
952            match self.lid_pn_cache.get_current_lid(&to.user).await {
953                Some(lid) => lid,
954                None => to.user.clone(),
955            }
956        };
957
958        let backend = self.persistence_manager.backend();
959
960        // Look up existing token
961        let existing = match backend.get_tc_token(&token_jid).await {
962            Ok(entry) => entry,
963            Err(e) => {
964                log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
965                return;
966            }
967        };
968
969        match existing {
970            Some(entry) if !is_tc_token_expired(entry.token_timestamp) => {
971                // Valid token — include it in the stanza
972                extra_nodes.push(build_tc_token_node(&entry.token));
973
974                // Check if we should re-issue (bucket boundary crossed).
975                // Update sender_timestamp to mark we've sent our token in this bucket.
976                if should_send_new_tc_token(entry.sender_timestamp) {
977                    let now = std::time::SystemTime::now()
978                        .duration_since(std::time::UNIX_EPOCH)
979                        .unwrap_or_default()
980                        .as_secs() as i64;
981                    let updated_entry = TcTokenEntry {
982                        sender_timestamp: Some(now),
983                        ..entry
984                    };
985                    if let Err(e) = backend.put_tc_token(&token_jid, &updated_entry).await {
986                        log::warn!(target: "Client/TcToken", "Failed to update sender_timestamp: {e}");
987                    }
988                }
989            }
990            _ => {
991                // Token missing or expired — try to issue
992                let to_lid = self.resolve_to_lid_jid(to).await;
993                match self
994                    .execute(IssuePrivacyTokensSpec::new(std::slice::from_ref(&to_lid)))
995                    .await
996                {
997                    Ok(response) => {
998                        let now = std::time::SystemTime::now()
999                            .duration_since(std::time::UNIX_EPOCH)
1000                            .unwrap_or_default()
1001                            .as_secs() as i64;
1002                        for received in &response.tokens {
1003                            let entry = TcTokenEntry {
1004                                token: received.token.clone(),
1005                                token_timestamp: received.timestamp,
1006                                sender_timestamp: Some(now),
1007                            };
1008
1009                            // Store the received token
1010                            let store_jid = received.jid.user.clone();
1011                            if let Err(e) = backend.put_tc_token(&store_jid, &entry).await {
1012                                log::warn!(target: "Client/TcToken", "Failed to store issued tc_token: {e}");
1013                            }
1014
1015                            // Include in message stanza
1016                            if !received.token.is_empty() {
1017                                extra_nodes.push(build_tc_token_node(&received.token));
1018                            }
1019                        }
1020                    }
1021                    Err(e) => {
1022                        log::debug!(target: "Client/TcToken", "Failed to issue tc_token for {}: {e}", to_lid);
1023                        // Don't fail the message send — tctoken is optional
1024                    }
1025                }
1026            }
1027        }
1028    }
1029
1030    /// Look up a valid (non-expired) tctoken for a JID. Returns the raw token bytes if found.
1031    ///
1032    /// Used by profile picture, presence subscribe, and other features that need tctoken gating.
1033    pub(crate) async fn lookup_tc_token_for_jid(&self, jid: &Jid) -> Option<Vec<u8>> {
1034        use wacore::iq::tctoken::is_tc_token_expired;
1035
1036        let token_jid = if jid.is_lid() {
1037            jid.user.clone()
1038        } else {
1039            match self.lid_pn_cache.get_current_lid(&jid.user).await {
1040                Some(lid) => lid,
1041                None => jid.user.clone(),
1042            }
1043        };
1044
1045        let backend = self.persistence_manager.backend();
1046        match backend.get_tc_token(&token_jid).await {
1047            Ok(Some(entry)) if !is_tc_token_expired(entry.token_timestamp) => Some(entry.token),
1048            Ok(_) => None,
1049            Err(e) => {
1050                log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1051                None
1052            }
1053        }
1054    }
1055
1056    /// Resolve a JID to its LID form for tc_token storage.
1057    async fn resolve_to_lid_jid(&self, jid: &Jid) -> Jid {
1058        if jid.is_lid() {
1059            return jid.clone();
1060        }
1061
1062        if let Some(lid_user) = self.lid_pn_cache.get_current_lid(&jid.user).await {
1063            Jid::new(&lid_user, "lid")
1064        } else {
1065            jid.clone()
1066        }
1067    }
1068}
1069
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::str::FromStr;

    /// Group JID used as the destination in the revoke tests.
    const GROUP_JID: &str = "120363040237990503@g.us";
    /// ID of the message being revoked in the revoke tests.
    const MESSAGE_ID: &str = "3EB0ABC123";

    /// Parse every string into a `Jid`, panicking on the first invalid one.
    fn parse_jids(raw: &[&str]) -> Vec<Jid> {
        raw.iter()
            .copied()
            .map(|s| Jid::from_str(s).unwrap())
            .collect()
    }

    /// Same DeviceKey-based filter the group send path applies: keep only the
    /// entries of `all_devices` whose device key is absent from `known_recipients`.
    fn devices_needing_skdm(known_recipients: &[Jid], all_devices: Vec<Jid>) -> Vec<Jid> {
        let known_set: HashSet<DeviceKey<'_>> =
            known_recipients.iter().map(|j| j.device_key()).collect();

        all_devices
            .into_iter()
            .filter(|device| !known_set.contains(&device.device_key()))
            .collect()
    }

    /// Map a revoke type to `(from_me, participant, edit attribute)` for the
    /// revoke message key.
    fn revoke_key_parts(
        revoke_type: RevokeType,
    ) -> (bool, Option<String>, crate::types::message::EditAttribute) {
        use crate::types::message::EditAttribute;

        match revoke_type {
            RevokeType::Sender => (true, None, EditAttribute::SenderRevoke),
            RevokeType::Admin { original_sender } => (
                false,
                Some(original_sender.to_non_ad().to_string()),
                EditAttribute::AdminRevoke,
            ),
        }
    }

    /// Build the protocol-level revoke message for the given key fields.
    fn build_revoke_message(
        to: &Jid,
        message_id: &str,
        from_me: bool,
        participant: Option<String>,
    ) -> wa::Message {
        wa::Message {
            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
                key: Some(wa::MessageKey {
                    remote_jid: Some(to.to_string()),
                    from_me: Some(from_me),
                    id: Some(message_id.to_string()),
                    participant,
                }),
                r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
                ..Default::default()
            })),
            ..Default::default()
        }
    }

    #[test]
    fn test_revoke_type_default_is_sender() {
        // RevokeType::Sender is the default (for deleting own messages)
        assert_eq!(RevokeType::default(), RevokeType::Sender);
    }

    #[test]
    fn test_force_skdm_only_for_admin_revoke() {
        // Admin revokes require force_skdm=true to get proper message structure
        // with phash, <participants>, and <device-identity> that WhatsApp Web uses.
        // Without this, the server returns error 479.
        let sender_revoke = RevokeType::Sender;
        let admin_revoke = RevokeType::Admin {
            original_sender: Jid::from_str("123456@s.whatsapp.net").unwrap(),
        };

        // This matches the logic in revoke_message()
        let force_skdm_sender = matches!(sender_revoke, RevokeType::Admin { .. });
        let force_skdm_admin = matches!(admin_revoke, RevokeType::Admin { .. });

        assert!(!force_skdm_sender, "Sender revoke should NOT force SKDM");
        assert!(force_skdm_admin, "Admin revoke MUST force SKDM");
    }

    #[test]
    fn test_sender_revoke_message_key_structure() {
        // Sender revoke (edit="7"): from_me=true, participant=None
        // The sender is identified by from_me=true, no participant field needed
        let to = Jid::from_str(GROUP_JID).unwrap();

        let (from_me, participant, edit_attr) = revoke_key_parts(RevokeType::Sender);

        assert!(from_me, "Sender revoke must have from_me=true");
        assert!(
            participant.is_none(),
            "Sender revoke must NOT set participant"
        );
        assert_eq!(edit_attr.to_string_val(), "7");

        let revoke_message = build_revoke_message(&to, MESSAGE_ID, from_me, participant);

        let proto_msg = revoke_message.protocol_message.unwrap();
        let key = proto_msg.key.unwrap();
        assert_eq!(key.from_me, Some(true));
        assert_eq!(key.participant, None);
        assert_eq!(key.id, Some(MESSAGE_ID.to_string()));
    }

    #[test]
    fn test_admin_revoke_message_key_structure() {
        // Admin revoke (edit="8"): from_me=false, participant=original_sender
        // The participant field identifies whose message is being deleted
        let to = Jid::from_str(GROUP_JID).unwrap();
        let original_sender = Jid::from_str("236395184570386:22@lid").unwrap();

        let (from_me, participant, edit_attr) =
            revoke_key_parts(RevokeType::Admin { original_sender });

        assert!(!from_me, "Admin revoke must have from_me=false");
        assert!(
            participant.is_some(),
            "Admin revoke MUST set participant to original sender"
        );
        assert_eq!(edit_attr.to_string_val(), "8");

        let revoke_message = build_revoke_message(&to, MESSAGE_ID, from_me, participant);

        let proto_msg = revoke_message.protocol_message.unwrap();
        let key = proto_msg.key.unwrap();
        assert_eq!(key.from_me, Some(false));
        // Participant should be the original sender with device number stripped
        assert_eq!(key.participant, Some("236395184570386@lid".to_string()));
        assert_eq!(key.id, Some(MESSAGE_ID.to_string()));
    }

    #[test]
    fn test_admin_revoke_preserves_lid_format() {
        // LID JIDs must NOT be converted to PN (phone number) format.
        // This was a bug that caused error 479 - the participant field must
        // preserve the original JID format exactly (with device stripped).
        let lid_sender = Jid::from_str("236395184570386:22@lid").unwrap();
        let participant_str = lid_sender.to_non_ad().to_string();

        // Must preserve @lid suffix, device number stripped
        assert_eq!(participant_str, "236395184570386@lid");
        assert!(
            participant_str.ends_with("@lid"),
            "LID participant must preserve @lid suffix"
        );
    }

    // SKDM Recipient Filtering Tests - validates DeviceKey-based filtering

    #[test]
    fn test_skdm_recipient_filtering_basic() {
        let known_recipients = parse_jids(&[
            "1234567890:0@s.whatsapp.net",
            "1234567890:5@s.whatsapp.net",
            "9876543210:0@s.whatsapp.net",
        ]);

        let all_devices = parse_jids(&[
            "1234567890:0@s.whatsapp.net",
            "1234567890:5@s.whatsapp.net",
            "9876543210:0@s.whatsapp.net",
            "5555555555:0@s.whatsapp.net", // new
        ]);

        let new_devices = devices_needing_skdm(&known_recipients, all_devices);

        assert_eq!(new_devices.len(), 1);
        assert_eq!(new_devices[0].user, "5555555555");
    }

    #[test]
    fn test_skdm_recipient_filtering_lid_jids() {
        let known_recipients = parse_jids(&[
            "236395184570386:91@lid",
            "129171292463295:0@lid",
            "45857667830004:14@lid",
        ]);

        let all_devices = parse_jids(&[
            "236395184570386:91@lid",
            "129171292463295:0@lid",
            "45857667830004:14@lid",
            "45857667830004:15@lid", // new
        ]);

        let new_devices = devices_needing_skdm(&known_recipients, all_devices);

        assert_eq!(new_devices.len(), 1);
        assert_eq!(new_devices[0].user, "45857667830004");
        assert_eq!(new_devices[0].device, 15);
    }

    #[test]
    fn test_skdm_recipient_filtering_all_known() {
        let known_recipients =
            parse_jids(&["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]);
        let all_devices =
            parse_jids(&["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]);

        let new_devices = devices_needing_skdm(&known_recipients, all_devices);

        assert!(new_devices.is_empty());
    }

    #[test]
    fn test_skdm_recipient_filtering_all_new() {
        let all_devices =
            parse_jids(&["1234567890:0@s.whatsapp.net", "9876543210:0@s.whatsapp.net"]);
        // Record the size up front so the vector can be moved into the filter.
        let total_devices = all_devices.len();

        let new_devices = devices_needing_skdm(&[], all_devices);

        assert_eq!(new_devices.len(), total_devices);
    }

    #[test]
    fn test_device_key_comparison() {
        // Jid parse/display normalizes :0 (omitted in Display, missing ':N' parses as device 0).
        // This test ensures DeviceKey comparisons work correctly under that normalization.
        let test_cases = [
            (
                "1234567890:0@s.whatsapp.net",
                "1234567890@s.whatsapp.net",
                true,
            ),
            (
                "1234567890:5@s.whatsapp.net",
                "1234567890:5@s.whatsapp.net",
                true,
            ),
            (
                "1234567890:5@s.whatsapp.net",
                "1234567890:6@s.whatsapp.net",
                false,
            ),
            ("236395184570386:91@lid", "236395184570386:91@lid", true),
            ("236395184570386:0@lid", "236395184570386@lid", true),
            ("user1@s.whatsapp.net", "user2@s.whatsapp.net", false),
        ];

        for (jid1_str, jid2_str, should_match) in test_cases {
            let jid1: Jid = jid1_str.parse().expect("should parse jid1");
            let jid2: Jid = jid2_str.parse().expect("should parse jid2");

            let key1 = jid1.device_key();
            let key2 = jid2.device_key();

            assert_eq!(
                key1 == key2,
                should_match,
                "DeviceKey comparison failed for '{}' vs '{}': expected match={}, got match={}",
                jid1_str,
                jid2_str,
                should_match,
                key1 == key2
            );

            assert_eq!(
                jid1.device_eq(&jid2),
                should_match,
                "device_eq failed for '{}' vs '{}'",
                jid1_str,
                jid2_str
            );
        }
    }

    #[test]
    fn test_skdm_filtering_large_group() {
        /// LID device JID number `i` of the synthetic group (device id 1).
        fn lid_device(i: i64) -> Jid {
            Jid::from_str(&format!("{}:1@lid", 100000000000000i64 + i)).unwrap()
        }

        // 1000 already-known devices, plus 10 that have never received an SKDM.
        let known_recipients: Vec<Jid> = (0..1000i64).map(lid_device).collect();
        let all_devices: Vec<Jid> = (0..1010i64).map(lid_device).collect();

        let new_devices = devices_needing_skdm(&known_recipients, all_devices);

        assert_eq!(new_devices.len(), 10);
    }
}