1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use wacore::client::context::SendContextResolver;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore::types::message::AddressingMode;
8use wacore_binary::jid::{DeviceKey, Jid, JidExt as _};
9use wacore_binary::node::Node;
10use waproto::whatsapp as wa;
11
/// Optional settings accepted by [`Client::send_message_with_options`].
///
/// The derived `Default` leaves `extra_stanza_nodes` empty, which is what
/// [`Client::send_message`] passes.
#[derive(Debug, Clone, Default)]
pub struct SendOptions {
    /// Nodes handed unchanged to `send_message_impl` as its
    /// `extra_stanza_nodes` argument.
    pub extra_stanza_nodes: Vec<Node>,
}
18
/// Selects how [`Client::revoke_message`] builds the revoke request.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum RevokeType {
    /// Revoke built with `from_me = true` and no participant. This is the
    /// `Default` variant.
    #[default]
    Sender,
    /// Revoke built with `from_me = false`; `revoke_message` rejects it for
    /// non-group chats.
    Admin {
        /// JID whose `to_non_ad()` string form is written to the message
        /// key's `participant` field.
        original_sender: Jid,
    },
}
29
30impl Client {
31 pub async fn send_message(
36 &self,
37 to: Jid,
38 message: wa::Message,
39 ) -> Result<String, anyhow::Error> {
40 self.send_message_with_options(to, message, SendOptions::default())
41 .await
42 }
43
44 pub async fn send_message_with_options(
46 &self,
47 to: Jid,
48 message: wa::Message,
49 options: SendOptions,
50 ) -> Result<String, anyhow::Error> {
51 let request_id = self.generate_message_id().await;
52 self.send_message_impl(
53 to,
54 &message,
55 Some(request_id.clone()),
56 false,
57 false,
58 None,
59 options.extra_stanza_nodes,
60 )
61 .await?;
62 Ok(request_id)
63 }
64
    /// Sends `message` to the `Jid::status_broadcast()` address for the given
    /// `recipients`.
    ///
    /// Steps, in order:
    /// 1. Validate and normalise `recipients` (reject group/broadcast JIDs,
    ///    map LID JIDs to PN JIDs through `lid_pn_cache`, de-duplicate by
    ///    `user`, and add our own non-AD JID if it is missing).
    /// 2. Decide which devices need a sender-key distribution ("SKDM" in the
    ///    identifiers below — expansion inferred from the surrounding
    ///    sender-key calls).
    /// 3. Build the stanza with `wacore::send::prepare_group_stanza`, retrying
    ///    once with forced distribution on `NoSenderKeyState`.
    /// 4. Pass the stanza through `ensure_status_participants` and send it.
    ///
    /// # Returns
    /// The generated message id.
    ///
    /// # Errors
    /// * `recipients` is empty, or contains a group / status-broadcast /
    ///   broadcast-list JID.
    /// * A LID recipient has no PN mapping in `lid_pn_cache`.
    /// * The device snapshot has no `pn` ("Not logged in").
    /// * Loading the sender key, preparing the stanza (other than the single
    ///   retried `NoSenderKeyState` case), `ensure_status_participants`, or
    ///   `send_node` fails.
    pub(crate) async fn send_status_message(
        &self,
        message: wa::Message,
        recipients: Vec<Jid>,
        options: crate::features::status::StatusSendOptions,
    ) -> Result<String, anyhow::Error> {
        use wacore::client::context::GroupInfo;
        use wacore_binary::builder::NodeBuilder;

        if recipients.is_empty() {
            return Err(anyhow!("Cannot send status with no recipients"));
        }

        let to = Jid::status_broadcast();
        let request_id = self.generate_message_id().await;

        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
        let own_jid = device_snapshot
            .pn
            .clone()
            .ok_or_else(|| anyhow!("Not logged in"))?;
        // Unlike the group path in `send_message_impl`, a missing LID is not an
        // error here: the PN JID is used in its place.
        let own_lid = device_snapshot
            .lid
            .clone()
            .unwrap_or_else(|| own_jid.clone());
        let account_info = device_snapshot.account.clone();

        // Normalise every recipient to a PN JID; any invalid entry aborts the
        // whole send.
        let mut resolved_recipients = Vec::with_capacity(recipients.len());
        for jid in recipients {
            if jid.is_group() || jid.is_status_broadcast() || jid.is_broadcast_list() {
                return Err(anyhow!(
                    "Invalid status recipient {}: must be a user JID, not a group/broadcast",
                    jid
                ));
            }
            if jid.is_lid() {
                if let Some(pn) = self.lid_pn_cache.get_phone_number(&jid.user).await {
                    resolved_recipients
                        .push(Jid::new(&pn, wacore_binary::jid::DEFAULT_USER_SERVER));
                } else {
                    return Err(anyhow!(
                        "No PN mapping for LID {}. Ensure the recipient has been \
                         contacted previously.",
                        jid
                    ));
                }
            } else {
                resolved_recipients.push(jid);
            }
        }

        // NOTE(review): every loop iteration above either pushes or returns, and
        // `recipients` was non-empty, so this check looks unreachable.
        if resolved_recipients.is_empty() {
            return Err(anyhow!("No valid PN recipients after LID resolution"));
        }

        // Keep only the first occurrence of each `user`.
        let mut seen_users = std::collections::HashSet::new();
        resolved_recipients.retain(|jid| seen_users.insert(jid.user.clone()));

        let mut group_info = GroupInfo::new(resolved_recipients, AddressingMode::Pn);

        // Make sure our own (non-AD) JID is part of the participant list.
        let own_base = own_jid.to_non_ad();
        if !group_info
            .participants
            .iter()
            .any(|p| p.is_same_user_as(&own_base))
        {
            group_info.participants.push(own_base);
        }

        self.add_recent_message(to.clone(), request_id.clone(), &message)
            .await;

        let device_store_arc = self.persistence_manager.get_device_arc().await;

        // Force a full distribution when no sender key is stored for
        // (status broadcast, our own address). The write guard is scoped to this
        // block and released before the stores below are built.
        let force_skdm = {
            use wacore::libsignal::protocol::SenderKeyStore;
            use wacore::libsignal::store::sender_key_name::SenderKeyName;
            let mut device_guard = device_store_arc.write().await;
            let sender_address = own_jid.to_protocol_address();
            let sender_key_name = SenderKeyName::new(to.to_string(), sender_address.to_string());

            let key_exists = device_guard
                .load_sender_key(&sender_key_name)
                .await?
                .is_some();

            !key_exists
        };

        let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc.clone());
        let mut stores = wacore::send::SignalStores {
            session_store: &mut store_adapter.session_store,
            identity_store: &mut store_adapter.identity_store,
            prekey_store: &mut store_adapter.pre_key_store,
            signed_prekey_store: &store_adapter.signed_pre_key_store,
            sender_key_store: &mut store_adapter.sender_key_store,
        };

        // A failure to read the marks is treated as "no marks".
        let marked_for_fresh_skdm = self
            .consume_forget_marks(&to.to_string())
            .await
            .unwrap_or_default();

        // `None`            => distribute to everyone (full distribution).
        // `Some(devices)`   => distribute only to `devices` (possibly empty).
        let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
            None
        } else {
            let known_recipients = self
                .persistence_manager
                .get_skdm_recipients(&to.to_string())
                .await
                .unwrap_or_default();

            if known_recipients.is_empty() {
                None
            } else {
                let jids_to_resolve: Vec<Jid> = group_info
                    .participants
                    .iter()
                    .map(|jid| jid.to_non_ad())
                    .collect();

                match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
                    Ok(all_devices) => {
                        // Target only devices that are not already recorded.
                        let known_set: std::collections::HashSet<DeviceKey<'_>> =
                            known_recipients.iter().map(|j| j.device_key()).collect();
                        let new_devices: Vec<Jid> = all_devices
                            .into_iter()
                            .filter(|device| !known_set.contains(&device.device_key()))
                            .collect();
                        if new_devices.is_empty() {
                            Some(vec![])
                        } else {
                            Some(new_devices)
                        }
                    }
                    Err(e) => {
                        // Device resolution failed: fall back to full distribution.
                        log::warn!("Failed to resolve devices for status SKDM check: {:?}", e);
                        None
                    }
                }
            }
        };

        // Add the marked devices to a partial target list. With a full
        // distribution (`None`) nothing needs to be added.
        let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
            match skdm_target_devices {
                None => None,
                Some(mut devices) => {
                    for marked_jid_str in &marked_for_fresh_skdm {
                        // Marks that do not parse as a JID are silently skipped.
                        if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
                            && !devices.iter().any(|d| d.device_eq(&marked_jid))
                        {
                            devices.push(marked_jid);
                        }
                    }
                    Some(devices)
                }
            }
        } else {
            skdm_target_devices
        };

        let is_full_distribution = force_skdm || skdm_target_devices.is_none();
        // Copy kept for bookkeeping because `skdm_target_devices` is moved into
        // `prepare_group_stanza` below.
        let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();

        // Revoke messages are sent without the `meta` node.
        let is_revoke = message.protocol_message.as_ref().is_some_and(|pm| {
            pm.r#type == Some(wa::message::protocol_message::Type::Revoke as i32)
        });
        let extra_stanza_nodes = if is_revoke {
            vec![]
        } else {
            vec![
                NodeBuilder::new("meta")
                    .attr("status_setting", options.privacy.as_str())
                    .build(),
            ]
        };

        let stanza = match wacore::send::prepare_group_stanza(
            &mut stores,
            self,
            &mut group_info,
            &own_jid,
            &own_lid,
            account_info.as_ref(),
            to.clone(),
            &message,
            request_id.clone(),
            force_skdm,
            skdm_target_devices,
            None,
            extra_stanza_nodes.clone(),
        )
        .await
        {
            Ok(stanza) => {
                // Record who received the distribution. Persistence failures
                // are logged and do not fail the send.
                if !devices_receiving_skdm.is_empty() {
                    if let Err(e) = self
                        .persistence_manager
                        .add_skdm_recipients(&to.to_string(), &devices_receiving_skdm)
                        .await
                    {
                        log::warn!("Failed to update status SKDM recipients: {:?}", e);
                    }
                } else if is_full_distribution {
                    let jids_to_resolve: Vec<Jid> = group_info
                        .participants
                        .iter()
                        .map(|jid| jid.to_non_ad())
                        .collect();

                    // If device resolution fails here nothing is recorded.
                    if let Ok(all_devices) =
                        SendContextResolver::resolve_devices(self, &jids_to_resolve).await
                        && let Err(e) = self
                            .persistence_manager
                            .add_skdm_recipients(&to.to_string(), &all_devices)
                            .await
                    {
                        log::warn!("Failed to update status SKDM recipients: {:?}", e);
                    }
                }
                stanza
            }
            Err(e) => {
                // Only `NoSenderKeyState` is retried; every other error is
                // returned as-is.
                if let Some(SignalProtocolError::NoSenderKeyState(_)) =
                    e.downcast_ref::<SignalProtocolError>()
                {
                    log::warn!("No sender key for status broadcast, forcing distribution.");

                    if let Err(e) = self
                        .persistence_manager
                        .clear_skdm_recipients(&to.to_string())
                        .await
                    {
                        log::warn!("Failed to clear status SKDM recipients: {:?}", e);
                    }

                    // A fresh adapter and store set is built for the retry.
                    let mut store_adapter_retry =
                        SignalProtocolStoreAdapter::new(device_store_arc.clone());
                    let mut stores_retry = wacore::send::SignalStores {
                        session_store: &mut store_adapter_retry.session_store,
                        identity_store: &mut store_adapter_retry.identity_store,
                        prekey_store: &mut store_adapter_retry.pre_key_store,
                        signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
                        sender_key_store: &mut store_adapter_retry.sender_key_store,
                    };

                    // Retry with forced distribution and no target filter.
                    let retry_stanza = wacore::send::prepare_group_stanza(
                        &mut stores_retry,
                        self,
                        &mut group_info,
                        &own_jid,
                        &own_lid,
                        account_info.as_ref(),
                        to.clone(),
                        &message,
                        request_id.clone(),
                        true,
                        None,
                        None,
                        extra_stanza_nodes,
                    )
                    .await?;

                    let jids_to_resolve: Vec<Jid> = group_info
                        .participants
                        .iter()
                        .map(|jid| jid.to_non_ad())
                        .collect();
                    if let Ok(all_devices) =
                        SendContextResolver::resolve_devices(self, &jids_to_resolve).await
                        && let Err(e) = self
                            .persistence_manager
                            .add_skdm_recipients(&to.to_string(), &all_devices)
                            .await
                    {
                        log::warn!(
                            "Failed to update status SKDM recipients after retry: {:?}",
                            e
                        );
                    }

                    retry_stanza
                } else {
                    return Err(e);
                }
            }
        };

        // Guarantee a `participants` child listing every participant.
        let stanza = self.ensure_status_participants(stanza, &group_info).await?;

        self.send_node(stanza).await?;
        Ok(request_id)
    }
380
381 async fn ensure_status_participants(
387 &self,
388 mut stanza: wacore_binary::Node,
389 group_info: &wacore::client::context::GroupInfo,
390 ) -> Result<wacore_binary::Node, anyhow::Error> {
391 use wacore_binary::builder::NodeBuilder;
392 use wacore_binary::node::NodeContent;
393
394 let bare_to_nodes: Vec<wacore_binary::Node> = group_info
397 .participants
398 .iter()
399 .map(|jid| {
400 NodeBuilder::new("to")
401 .attr("jid", jid.to_non_ad().to_string())
402 .build()
403 })
404 .collect();
405
406 let children = match &mut stanza.content {
408 Some(NodeContent::Nodes(nodes)) => nodes,
409 _ => {
410 stanza.content = Some(NodeContent::Nodes(vec![]));
411 match &mut stanza.content {
412 Some(NodeContent::Nodes(nodes)) => nodes,
413 _ => unreachable!(),
414 }
415 }
416 };
417
418 if let Some(participants_node) = children.iter_mut().find(|n| n.tag == "participants") {
419 let existing_users: std::collections::HashSet<String> = participants_node
423 .children()
424 .unwrap_or_default()
425 .iter()
426 .filter_map(|n| {
427 n.attrs
428 .get("jid")
429 .and_then(|v| v.to_string().parse::<Jid>().ok().map(|j| j.user.clone()))
430 })
431 .collect();
432
433 let new_to_nodes: Vec<wacore_binary::Node> = bare_to_nodes
434 .into_iter()
435 .filter(|n| {
436 n.attrs
437 .get("jid")
438 .and_then(|v| v.to_string().parse::<Jid>().ok())
439 .map(|j| !existing_users.contains(&j.user))
440 .unwrap_or(false)
441 })
442 .collect();
443
444 if !new_to_nodes.is_empty() {
445 match &mut participants_node.content {
446 Some(NodeContent::Nodes(nodes)) => nodes.extend(new_to_nodes),
447 _ => {
448 participants_node.content = Some(NodeContent::Nodes(new_to_nodes));
449 }
450 }
451 }
452 } else {
453 let participants_node = NodeBuilder::new("participants")
455 .children(bare_to_nodes)
456 .build();
457 children.insert(0, participants_node);
458 }
459
460 Ok(stanza)
461 }
462
463 pub async fn revoke_message(
474 &self,
475 to: Jid,
476 message_id: impl Into<String>,
477 revoke_type: RevokeType,
478 ) -> Result<(), anyhow::Error> {
479 let message_id = message_id.into();
480 self.get_pn()
482 .await
483 .ok_or_else(|| anyhow!("Not logged in"))?;
484
485 let (from_me, participant, edit_attr) = match &revoke_type {
486 RevokeType::Sender => {
487 (
490 true,
491 None,
492 crate::types::message::EditAttribute::SenderRevoke,
493 )
494 }
495 RevokeType::Admin { original_sender } => {
496 if !to.is_group() {
498 return Err(anyhow!("Admin revoke is only valid for group chats"));
499 }
500 let participant_str = original_sender.to_non_ad().to_string();
503 log::debug!(
504 "Admin revoke: using participant {} for MessageKey",
505 participant_str
506 );
507 (
508 false,
509 Some(participant_str),
510 crate::types::message::EditAttribute::AdminRevoke,
511 )
512 }
513 };
514
515 let revoke_message = wa::Message {
516 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
517 key: Some(wa::MessageKey {
518 remote_jid: Some(to.to_string()),
519 from_me: Some(from_me),
520 id: Some(message_id.clone()),
521 participant,
522 }),
523 r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
524 ..Default::default()
525 })),
526 ..Default::default()
527 };
528
529 let force_skdm = matches!(revoke_type, RevokeType::Admin { .. });
536 self.send_message_impl(
537 to,
538 &revoke_message,
539 None,
540 false,
541 force_skdm,
542 Some(edit_attr),
543 vec![],
544 )
545 .await
546 }
547
    /// Shared implementation behind the public send/revoke methods: builds the
    /// stanza for `message` and sends it with `send_node`.
    ///
    /// One of three paths is taken:
    /// * `peer && !to.is_group()` — `wacore::send::prepare_peer_stanza`.
    /// * `to.is_group()` — `wacore::send::prepare_group_stanza`, including the
    ///   sender-key distribution bookkeeping ("SKDM" in the identifiers below —
    ///   expansion inferred from the surrounding sender-key calls) and a single
    ///   retry on `NoSenderKeyState`.
    /// * otherwise — `wacore::send::prepare_dm_stanza`.
    ///
    /// # Parameters
    /// * `request_id_override` — message id to use; a new one is generated when
    ///   `None`.
    /// * `peer` — selects the peer path for non-group destinations.
    /// * `force_key_distribution` — group path only: forces a full
    ///   distribution even when a sender key already exists.
    /// * `edit` — forwarded to the group and DM stanza builders; not used on
    ///   the peer path.
    /// * `extra_stanza_nodes` — forwarded to the group and DM stanza builders;
    ///   not used on the peer path.
    ///
    /// # Errors
    /// * `to` is the status broadcast address.
    /// * The device snapshot has no `pn`, or (group path) no `lid`.
    /// * Group info query, device lookup, session setup, sender-key loading,
    ///   stanza preparation, or `send_node` fails.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn send_message_impl(
        &self,
        to: Jid,
        message: &wa::Message,
        request_id_override: Option<String>,
        peer: bool,
        force_key_distribution: bool,
        edit: Option<crate::types::message::EditAttribute>,
        extra_stanza_nodes: Vec<Node>,
    ) -> Result<(), anyhow::Error> {
        // Status broadcasts have their own entry point.
        if to.is_status_broadcast() {
            return Err(anyhow!(
                "Use send_status_message() or client.status() API for status@broadcast"
            ));
        }

        let request_id = match request_id_override {
            Some(id) => id,
            None => self.generate_message_id().await,
        };

        let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
            // ---- Peer path ----
            let encryption_jid = self.resolve_encryption_jid(&to).await;
            let signal_addr_str = encryption_jid.to_protocol_address().to_string();

            // One mutex per signal address, held until this branch finishes.
            let session_mutex = self
                .session_locks
                .get_with(signal_addr_str.clone(), async {
                    std::sync::Arc::new(tokio::sync::Mutex::new(()))
                })
                .await;
            let _session_guard = session_mutex.lock().await;

            let device_store_arc = self.persistence_manager.get_device_arc().await;
            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);

            wacore::send::prepare_peer_stanza(
                &mut store_adapter.session_store,
                &mut store_adapter.identity_store,
                to,
                encryption_jid,
                message,
                request_id,
            )
            .await?
        } else if to.is_group() {
            // ---- Group path ----
            let mut group_info = self.groups().query_info(&to).await?;

            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
            let own_jid = device_snapshot
                .pn
                .clone()
                .ok_or_else(|| anyhow!("Not logged in"))?;
            let own_lid = device_snapshot
                .lid
                .clone()
                .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
            let account_info = device_snapshot.account.clone();

            self.add_recent_message(to.clone(), request_id.clone(), message)
                .await;

            let device_store_arc = self.persistence_manager.get_device_arc().await;

            // Our sending identity follows the group's addressing mode. The
            // second tuple element is not used.
            let (own_sending_jid, _) = match group_info.addressing_mode {
                crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
                crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
            };

            // Make sure we are part of the participant list ourselves.
            if !group_info
                .participants
                .iter()
                .any(|participant| participant.is_same_user_as(&own_sending_jid))
            {
                group_info.participants.push(own_sending_jid.to_non_ad());
            }

            // Force a full distribution when requested by the caller or when no
            // sender key is stored for (group, our sending address). The write
            // guard is scoped to this block.
            let force_skdm = {
                use wacore::libsignal::protocol::SenderKeyStore;
                use wacore::libsignal::store::sender_key_name::SenderKeyName;
                let mut device_guard = device_store_arc.write().await;
                let sender_address = own_sending_jid.to_protocol_address();
                let sender_key_name =
                    SenderKeyName::new(to.to_string(), sender_address.to_string());

                let key_exists = device_guard
                    .load_sender_key(&sender_key_name)
                    .await?
                    .is_some();

                force_key_distribution || !key_exists
            };

            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc.clone());

            let mut stores = wacore::send::SignalStores {
                session_store: &mut store_adapter.session_store,
                identity_store: &mut store_adapter.identity_store,
                prekey_store: &mut store_adapter.pre_key_store,
                signed_prekey_store: &store_adapter.signed_pre_key_store,
                sender_key_store: &mut store_adapter.sender_key_store,
            };

            // A failure to read the marks is treated as "no marks".
            let marked_for_fresh_skdm = self
                .consume_forget_marks(&to.to_string())
                .await
                .unwrap_or_default();

            // `None`          => distribute to everyone (full distribution).
            // `Some(devices)` => distribute only to `devices` (possibly empty).
            let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
                None
            } else {
                let known_recipients = self
                    .persistence_manager
                    .get_skdm_recipients(&to.to_string())
                    .await
                    .unwrap_or_default();

                if known_recipients.is_empty() {
                    None
                } else {
                    let jids_to_resolve: Vec<Jid> = group_info
                        .participants
                        .iter()
                        .map(|jid| jid.to_non_ad())
                        .collect();

                    match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
                        Ok(all_devices) => {
                            use std::collections::HashSet;

                            let known_set: HashSet<DeviceKey<'_>> =
                                known_recipients.iter().map(|j| j.device_key()).collect();

                            // Target only devices that are not already recorded.
                            let new_devices: Vec<Jid> = all_devices
                                .into_iter()
                                .filter(|device| !known_set.contains(&device.device_key()))
                                .collect();

                            if new_devices.is_empty() {
                                Some(vec![])
                            } else {
                                log::debug!(
                                    "Found {} new devices needing SKDM for group {}",
                                    new_devices.len(),
                                    to
                                );
                                Some(new_devices)
                            }
                        }
                        Err(e) => {
                            // Device resolution failed: fall back to full
                            // distribution.
                            log::warn!("Failed to resolve devices for SKDM check: {:?}", e);
                            None
                        }
                    }
                }
            };

            // Add the marked devices to a partial target list. With a full
            // distribution (`None`) nothing needs to be added.
            let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
                match skdm_target_devices {
                    None => None,
                    Some(mut devices) => {
                        for marked_jid_str in &marked_for_fresh_skdm {
                            // Marks that do not parse as a JID are silently skipped.
                            if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
                                && !devices.iter().any(|d| d.device_eq(&marked_jid))
                            {
                                log::debug!(
                                    "Adding {} to SKDM targets (marked for fresh key)",
                                    marked_jid_str
                                );
                                devices.push(marked_jid);
                            }
                        }
                        Some(devices)
                    }
                }
            } else {
                skdm_target_devices
            };

            let is_full_distribution = force_skdm || skdm_target_devices.is_none();
            // Copy kept for bookkeeping because `skdm_target_devices` is moved
            // into `prepare_group_stanza` below.
            let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();

            match wacore::send::prepare_group_stanza(
                &mut stores,
                self,
                &mut group_info,
                &own_jid,
                &own_lid,
                account_info.as_ref(),
                to.clone(),
                message,
                request_id.clone(),
                force_skdm,
                skdm_target_devices,
                edit.clone(),
                extra_stanza_nodes.clone(),
            )
            .await
            {
                Ok(stanza) => {
                    // Record who received the distribution. Persistence
                    // failures are logged and do not fail the send.
                    if !devices_receiving_skdm.is_empty() {
                        if let Err(e) = self
                            .persistence_manager
                            .add_skdm_recipients(&to.to_string(), &devices_receiving_skdm)
                            .await
                        {
                            log::warn!("Failed to update SKDM recipients: {:?}", e);
                        }
                    } else if is_full_distribution {
                        let jids_to_resolve: Vec<Jid> = group_info
                            .participants
                            .iter()
                            .map(|jid| jid.to_non_ad())
                            .collect();

                        // If device resolution fails here nothing is recorded.
                        if let Ok(all_devices) =
                            SendContextResolver::resolve_devices(self, &jids_to_resolve).await
                            && let Err(e) = self
                                .persistence_manager
                                .add_skdm_recipients(&to.to_string(), &all_devices)
                                .await
                        {
                            log::warn!("Failed to update SKDM recipients: {:?}", e);
                        }
                    }
                    stanza
                }
                Err(e) => {
                    // Only `NoSenderKeyState` is retried; every other error is
                    // returned as-is.
                    if let Some(SignalProtocolError::NoSenderKeyState(_)) =
                        e.downcast_ref::<SignalProtocolError>()
                    {
                        log::warn!("No sender key for group {}, forcing distribution.", to);

                        if let Err(e) = self
                            .persistence_manager
                            .clear_skdm_recipients(&to.to_string())
                            .await
                        {
                            log::warn!("Failed to clear SKDM recipients: {:?}", e);
                        }

                        // A fresh adapter and store set is built for the retry.
                        let mut store_adapter_retry =
                            SignalProtocolStoreAdapter::new(device_store_arc.clone());
                        let mut stores_retry = wacore::send::SignalStores {
                            session_store: &mut store_adapter_retry.session_store,
                            identity_store: &mut store_adapter_retry.identity_store,
                            prekey_store: &mut store_adapter_retry.pre_key_store,
                            signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
                            sender_key_store: &mut store_adapter_retry.sender_key_store,
                        };

                        // `to` is moved into the retry call, so keep its string
                        // form for the bookkeeping afterwards.
                        let to_str = to.to_string();
                        // Retry with forced distribution and no target filter.
                        let retry_stanza = wacore::send::prepare_group_stanza(
                            &mut stores_retry,
                            self,
                            &mut group_info,
                            &own_jid,
                            &own_lid,
                            account_info.as_ref(),
                            to,
                            message,
                            request_id,
                            true,
                            None,
                            edit.clone(),
                            extra_stanza_nodes.clone(),
                        )
                        .await?;

                        let jids_to_resolve: Vec<Jid> = group_info
                            .participants
                            .iter()
                            .map(|jid| jid.to_non_ad())
                            .collect();
                        if let Ok(all_devices) =
                            SendContextResolver::resolve_devices(self, &jids_to_resolve).await
                            && let Err(e) = self
                                .persistence_manager
                                .add_skdm_recipients(&to_str, &all_devices)
                                .await
                        {
                            log::warn!("Failed to update SKDM recipients after retry: {:?}", e);
                        }

                        retry_stanza
                    } else {
                        return Err(e);
                    }
                }
            }
        } else {
            // ---- Direct-message path ----
            let recipient_devices = self.get_user_devices(std::slice::from_ref(&to)).await?;
            self.ensure_e2e_sessions(recipient_devices).await?;

            let encryption_jid = self.resolve_encryption_jid(&to).await;
            let signal_addr_str = encryption_jid.to_protocol_address().to_string();

            self.add_recent_message(to.clone(), request_id.clone(), message)
                .await;

            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
            let own_jid = device_snapshot
                .pn
                .clone()
                .ok_or_else(|| anyhow!("Not logged in"))?;
            let account_info = device_snapshot.account.clone();

            // May append a tc-token node for this recipient.
            // NOTE(review): `to.is_group()` is already false on this branch, so
            // only the newsletter check has an effect here.
            let mut extra_stanza_nodes = extra_stanza_nodes;
            if !to.is_group() && !to.is_newsletter() {
                self.maybe_include_tc_token(&to, &mut extra_stanza_nodes)
                    .await;
            }

            // One mutex per signal address, held until this branch finishes.
            let session_mutex = self
                .session_locks
                .get_with(signal_addr_str.clone(), async {
                    std::sync::Arc::new(tokio::sync::Mutex::new(()))
                })
                .await;
            let _session_guard = session_mutex.lock().await;

            let device_store_arc = self.persistence_manager.get_device_arc().await;
            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);

            let mut stores = wacore::send::SignalStores {
                session_store: &mut store_adapter.session_store,
                identity_store: &mut store_adapter.identity_store,
                prekey_store: &mut store_adapter.pre_key_store,
                signed_prekey_store: &store_adapter.signed_pre_key_store,
                sender_key_store: &mut store_adapter.sender_key_store,
            };

            wacore::send::prepare_dm_stanza(
                &mut stores,
                self,
                &own_jid,
                account_info.as_ref(),
                to,
                message,
                request_id,
                edit,
                extra_stanza_nodes,
            )
            .await?
        };

        self.send_node(stanza_to_send).await.map_err(|e| e.into())
    }
922
923 async fn maybe_include_tc_token(&self, to: &Jid, extra_nodes: &mut Vec<Node>) {
928 use wacore::iq::tctoken::{
929 IssuePrivacyTokensSpec, build_tc_token_node, is_tc_token_expired,
930 should_send_new_tc_token,
931 };
932 use wacore::store::traits::TcTokenEntry;
933
934 let snapshot = self.persistence_manager.get_device_snapshot().await;
936 let is_self = snapshot
937 .pn
938 .as_ref()
939 .is_some_and(|pn| pn.is_same_user_as(to))
940 || snapshot
941 .lid
942 .as_ref()
943 .is_some_and(|lid| lid.is_same_user_as(to));
944 if is_self {
945 return;
946 }
947
948 let token_jid = if to.is_lid() {
950 to.user.clone()
951 } else {
952 match self.lid_pn_cache.get_current_lid(&to.user).await {
953 Some(lid) => lid,
954 None => to.user.clone(),
955 }
956 };
957
958 let backend = self.persistence_manager.backend();
959
960 let existing = match backend.get_tc_token(&token_jid).await {
962 Ok(entry) => entry,
963 Err(e) => {
964 log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
965 return;
966 }
967 };
968
969 match existing {
970 Some(entry) if !is_tc_token_expired(entry.token_timestamp) => {
971 extra_nodes.push(build_tc_token_node(&entry.token));
973
974 if should_send_new_tc_token(entry.sender_timestamp) {
977 let now = std::time::SystemTime::now()
978 .duration_since(std::time::UNIX_EPOCH)
979 .unwrap_or_default()
980 .as_secs() as i64;
981 let updated_entry = TcTokenEntry {
982 sender_timestamp: Some(now),
983 ..entry
984 };
985 if let Err(e) = backend.put_tc_token(&token_jid, &updated_entry).await {
986 log::warn!(target: "Client/TcToken", "Failed to update sender_timestamp: {e}");
987 }
988 }
989 }
990 _ => {
991 let to_lid = self.resolve_to_lid_jid(to).await;
993 match self
994 .execute(IssuePrivacyTokensSpec::new(std::slice::from_ref(&to_lid)))
995 .await
996 {
997 Ok(response) => {
998 let now = std::time::SystemTime::now()
999 .duration_since(std::time::UNIX_EPOCH)
1000 .unwrap_or_default()
1001 .as_secs() as i64;
1002 for received in &response.tokens {
1003 let entry = TcTokenEntry {
1004 token: received.token.clone(),
1005 token_timestamp: received.timestamp,
1006 sender_timestamp: Some(now),
1007 };
1008
1009 let store_jid = received.jid.user.clone();
1011 if let Err(e) = backend.put_tc_token(&store_jid, &entry).await {
1012 log::warn!(target: "Client/TcToken", "Failed to store issued tc_token: {e}");
1013 }
1014
1015 if !received.token.is_empty() {
1017 extra_nodes.push(build_tc_token_node(&received.token));
1018 }
1019 }
1020 }
1021 Err(e) => {
1022 log::debug!(target: "Client/TcToken", "Failed to issue tc_token for {}: {e}", to_lid);
1023 }
1025 }
1026 }
1027 }
1028 }
1029
1030 pub(crate) async fn lookup_tc_token_for_jid(&self, jid: &Jid) -> Option<Vec<u8>> {
1034 use wacore::iq::tctoken::is_tc_token_expired;
1035
1036 let token_jid = if jid.is_lid() {
1037 jid.user.clone()
1038 } else {
1039 match self.lid_pn_cache.get_current_lid(&jid.user).await {
1040 Some(lid) => lid,
1041 None => jid.user.clone(),
1042 }
1043 };
1044
1045 let backend = self.persistence_manager.backend();
1046 match backend.get_tc_token(&token_jid).await {
1047 Ok(Some(entry)) if !is_tc_token_expired(entry.token_timestamp) => Some(entry.token),
1048 Ok(_) => None,
1049 Err(e) => {
1050 log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1051 None
1052 }
1053 }
1054 }
1055
1056 async fn resolve_to_lid_jid(&self, jid: &Jid) -> Jid {
1058 if jid.is_lid() {
1059 return jid.clone();
1060 }
1061
1062 if let Some(lid_user) = self.lid_pn_cache.get_current_lid(&jid.user).await {
1063 Jid::new(&lid_user, "lid")
1064 } else {
1065 jid.clone()
1066 }
1067 }
1068}
1069
1070#[cfg(test)]
1071mod tests {
1072 use super::*;
1073 use std::str::FromStr;
1074
1075 #[test]
1076 fn test_revoke_type_default_is_sender() {
1077 let revoke_type = RevokeType::default();
1079 assert_eq!(revoke_type, RevokeType::Sender);
1080 }
1081
1082 #[test]
1083 fn test_force_skdm_only_for_admin_revoke() {
1084 let sender_jid = Jid::from_str("123456@s.whatsapp.net").unwrap();
1088
1089 let sender_revoke = RevokeType::Sender;
1090 let admin_revoke = RevokeType::Admin {
1091 original_sender: sender_jid,
1092 };
1093
1094 let force_skdm_sender = matches!(sender_revoke, RevokeType::Admin { .. });
1096 let force_skdm_admin = matches!(admin_revoke, RevokeType::Admin { .. });
1097
1098 assert!(!force_skdm_sender, "Sender revoke should NOT force SKDM");
1099 assert!(force_skdm_admin, "Admin revoke MUST force SKDM");
1100 }
1101
1102 #[test]
1103 fn test_sender_revoke_message_key_structure() {
1104 let to = Jid::from_str("120363040237990503@g.us").unwrap();
1107 let message_id = "3EB0ABC123".to_string();
1108
1109 let (from_me, participant, edit_attr) = match RevokeType::Sender {
1110 RevokeType::Sender => (
1111 true,
1112 None,
1113 crate::types::message::EditAttribute::SenderRevoke,
1114 ),
1115 RevokeType::Admin { original_sender } => (
1116 false,
1117 Some(original_sender.to_non_ad().to_string()),
1118 crate::types::message::EditAttribute::AdminRevoke,
1119 ),
1120 };
1121
1122 assert!(from_me, "Sender revoke must have from_me=true");
1123 assert!(
1124 participant.is_none(),
1125 "Sender revoke must NOT set participant"
1126 );
1127 assert_eq!(edit_attr.to_string_val(), "7");
1128
1129 let revoke_message = wa::Message {
1130 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1131 key: Some(wa::MessageKey {
1132 remote_jid: Some(to.to_string()),
1133 from_me: Some(from_me),
1134 id: Some(message_id.clone()),
1135 participant,
1136 }),
1137 r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
1138 ..Default::default()
1139 })),
1140 ..Default::default()
1141 };
1142
1143 let proto_msg = revoke_message.protocol_message.unwrap();
1144 let key = proto_msg.key.unwrap();
1145 assert_eq!(key.from_me, Some(true));
1146 assert_eq!(key.participant, None);
1147 assert_eq!(key.id, Some(message_id));
1148 }
1149
1150 #[test]
1151 fn test_admin_revoke_message_key_structure() {
1152 let to = Jid::from_str("120363040237990503@g.us").unwrap();
1155 let message_id = "3EB0ABC123".to_string();
1156 let original_sender = Jid::from_str("236395184570386:22@lid").unwrap();
1157
1158 let revoke_type = RevokeType::Admin {
1159 original_sender: original_sender.clone(),
1160 };
1161 let (from_me, participant, edit_attr) = match revoke_type {
1162 RevokeType::Sender => (
1163 true,
1164 None,
1165 crate::types::message::EditAttribute::SenderRevoke,
1166 ),
1167 RevokeType::Admin { original_sender } => (
1168 false,
1169 Some(original_sender.to_non_ad().to_string()),
1170 crate::types::message::EditAttribute::AdminRevoke,
1171 ),
1172 };
1173
1174 assert!(!from_me, "Admin revoke must have from_me=false");
1175 assert!(
1176 participant.is_some(),
1177 "Admin revoke MUST set participant to original sender"
1178 );
1179 assert_eq!(edit_attr.to_string_val(), "8");
1180
1181 let revoke_message = wa::Message {
1182 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1183 key: Some(wa::MessageKey {
1184 remote_jid: Some(to.to_string()),
1185 from_me: Some(from_me),
1186 id: Some(message_id.clone()),
1187 participant: participant.clone(),
1188 }),
1189 r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
1190 ..Default::default()
1191 })),
1192 ..Default::default()
1193 };
1194
1195 let proto_msg = revoke_message.protocol_message.unwrap();
1196 let key = proto_msg.key.unwrap();
1197 assert_eq!(key.from_me, Some(false));
1198 assert_eq!(key.participant, Some("236395184570386@lid".to_string()));
1200 assert_eq!(key.id, Some(message_id));
1201 }
1202
1203 #[test]
1204 fn test_admin_revoke_preserves_lid_format() {
1205 let lid_sender = Jid::from_str("236395184570386:22@lid").unwrap();
1209 let participant_str = lid_sender.to_non_ad().to_string();
1210
1211 assert_eq!(participant_str, "236395184570386@lid");
1213 assert!(
1214 participant_str.ends_with("@lid"),
1215 "LID participant must preserve @lid suffix"
1216 );
1217 }
1218
1219 #[test]
1222 fn test_skdm_recipient_filtering_basic() {
1223 use std::collections::HashSet;
1224
1225 let known_recipients: Vec<Jid> = [
1226 "1234567890:0@s.whatsapp.net",
1227 "1234567890:5@s.whatsapp.net",
1228 "9876543210:0@s.whatsapp.net",
1229 ]
1230 .into_iter()
1231 .map(|s| Jid::from_str(s).unwrap())
1232 .collect();
1233
1234 let all_devices: Vec<Jid> = [
1235 "1234567890:0@s.whatsapp.net",
1236 "1234567890:5@s.whatsapp.net",
1237 "9876543210:0@s.whatsapp.net",
1238 "5555555555:0@s.whatsapp.net", ]
1240 .into_iter()
1241 .map(|s| Jid::from_str(s).unwrap())
1242 .collect();
1243
1244 let known_set: HashSet<DeviceKey<'_>> =
1245 known_recipients.iter().map(|j| j.device_key()).collect();
1246
1247 let new_devices: Vec<Jid> = all_devices
1248 .into_iter()
1249 .filter(|device| !known_set.contains(&device.device_key()))
1250 .collect();
1251
1252 assert_eq!(new_devices.len(), 1);
1253 assert_eq!(new_devices[0].user, "5555555555");
1254 }
1255
/// Same filtering as the basic case but with `@lid` addresses: a second device
/// (`:15`) of an already-known user (`:14`) must still be reported as new.
#[test]
fn test_skdm_recipient_filtering_lid_jids() {
    use std::collections::HashSet;

    /// Parses every entry, panicking on the first one that is not a valid JID.
    fn parse_all(raw: &[&str]) -> Vec<Jid> {
        raw.iter().map(|&text| Jid::from_str(text).unwrap()).collect()
    }

    let known_recipients = parse_all(&[
        "236395184570386:91@lid",
        "129171292463295:0@lid",
        "45857667830004:14@lid",
    ]);

    let all_devices = parse_all(&[
        "236395184570386:91@lid",
        "129171292463295:0@lid",
        "45857667830004:14@lid",
        "45857667830004:15@lid",
    ]);

    let known_keys: HashSet<DeviceKey<'_>> = known_recipients
        .iter()
        .map(|recipient| recipient.device_key())
        .collect();

    // Walk the full list in order and keep whatever is not already known.
    let mut new_devices: Vec<Jid> = Vec::new();
    for device in all_devices {
        if known_keys.contains(&device.device_key()) {
            continue;
        }
        new_devices.push(device);
    }

    assert_eq!(new_devices.len(), 1);
    assert_eq!(new_devices[0].user, "45857667830004");
    assert_eq!(new_devices[0].device, 15);
}
1291
/// When every device in the list is already a known recipient, the filter must
/// produce an empty result.
#[test]
fn test_skdm_recipient_filtering_all_known() {
    use std::collections::HashSet;

    let known_recipients: Vec<Jid> = vec![
        Jid::from_str("1234567890:0@s.whatsapp.net").unwrap(),
        Jid::from_str("1234567890:5@s.whatsapp.net").unwrap(),
    ];

    // Exactly the same two devices as above.
    let all_devices: Vec<Jid> = vec![
        Jid::from_str("1234567890:0@s.whatsapp.net").unwrap(),
        Jid::from_str("1234567890:5@s.whatsapp.net").unwrap(),
    ];

    let mut known_set: HashSet<DeviceKey<'_>> = HashSet::new();
    known_set.extend(known_recipients.iter().map(|jid| jid.device_key()));

    let mut new_devices: Vec<Jid> = Vec::new();
    for device in all_devices {
        let already_known = known_set.contains(&device.device_key());
        if !already_known {
            new_devices.push(device);
        }
    }

    assert!(new_devices.is_empty());
}
1317
/// With no known recipients at all, every device in the list must survive the
/// filter.
#[test]
fn test_skdm_recipient_filtering_all_new() {
    use std::collections::HashSet;

    let known_recipients: Vec<Jid> = Vec::new();

    let mut all_devices: Vec<Jid> = Vec::new();
    for raw in ["1234567890:0@s.whatsapp.net", "9876543210:0@s.whatsapp.net"] {
        all_devices.push(Jid::from_str(raw).unwrap());
    }

    // Empty by construction, since there are no known recipients.
    let known_keys: HashSet<DeviceKey<'_>> = known_recipients
        .iter()
        .map(|recipient| recipient.device_key())
        .collect();

    // Filter over borrowed entries so `all_devices` stays available for the
    // length comparison below; only the survivors are cloned.
    let new_devices: Vec<Jid> = all_devices
        .iter()
        .filter(|candidate| !known_keys.contains(&candidate.device_key()))
        .cloned()
        .collect();

    assert_eq!(new_devices.len(), all_devices.len());
}
1340
/// Table-driven check that `device_key()` equality and `device_eq()` agree with
/// the expected outcome for each pair of JID strings.
#[test]
fn test_device_key_comparison() {
    // (left JID, right JID, whether both are expected to name the same device)
    let cases: [(&str, &str, bool); 6] = [
        // Explicit device 0 versus no device part at all.
        (
            "1234567890:0@s.whatsapp.net",
            "1234567890@s.whatsapp.net",
            true,
        ),
        // Identical non-zero device.
        (
            "1234567890:5@s.whatsapp.net",
            "1234567890:5@s.whatsapp.net",
            true,
        ),
        // Same user, different device.
        (
            "1234567890:5@s.whatsapp.net",
            "1234567890:6@s.whatsapp.net",
            false,
        ),
        // The same three situations for `@lid` addresses and distinct users.
        ("236395184570386:91@lid", "236395184570386:91@lid", true),
        ("236395184570386:0@lid", "236395184570386@lid", true),
        ("user1@s.whatsapp.net", "user2@s.whatsapp.net", false),
    ];

    for (left_raw, right_raw, expected) in cases {
        let left = left_raw.parse::<Jid>().expect("should parse jid1");
        let right = right_raw.parse::<Jid>().expect("should parse jid2");

        let keys_match = left.device_key() == right.device_key();

        assert_eq!(
            keys_match, expected,
            "DeviceKey comparison failed for '{}' vs '{}': expected match={}, got match={}",
            left_raw, right_raw, expected, keys_match
        );

        assert_eq!(
            left.device_eq(&right),
            expected,
            "device_eq failed for '{}' vs '{}'",
            left_raw,
            right_raw
        );
    }
}
1392
/// Runs the known-recipient filter over 1010 generated `@lid` devices, of which
/// the first 1000 are known; exactly the remaining 10 must be reported as new.
#[test]
fn test_skdm_filtering_large_group() {
    use std::collections::HashSet;

    /// Builds the `offset`-th synthetic device JID.
    fn synthetic_device(offset: i64) -> Jid {
        let text = format!("{}:1@lid", 100000000000000i64 + offset);
        Jid::from_str(&text).unwrap()
    }

    // Offsets 0..1000 are the already-known recipients.
    let known_recipients: Vec<Jid> = (0..1000i64).map(synthetic_device).collect();

    // The full list starts with the known devices, followed by ten more.
    let mut all_devices: Vec<Jid> = Vec::with_capacity(1010);
    all_devices.extend(known_recipients.iter().cloned());
    all_devices.extend((1000i64..1010i64).map(synthetic_device));

    let known_keys: HashSet<DeviceKey<'_>> = known_recipients
        .iter()
        .map(|recipient| recipient.device_key())
        .collect();

    let mut new_devices: Vec<Jid> = Vec::new();
    for device in all_devices {
        if !known_keys.contains(&device.device_key()) {
            new_devices.push(device);
        }
    }

    assert_eq!(new_devices.len(), 10);
}
1422}