1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use wacore::client::context::SendContextResolver;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore::types::message::AddressingMode;
8use wacore_binary::jid::{DeviceKey, Jid, JidExt as _};
9use wacore_binary::node::Node;
10use waproto::whatsapp as wa;
11
12#[derive(Debug, Clone, Default)]
14pub struct SendOptions {
15 pub extra_stanza_nodes: Vec<Node>,
17}
18
19#[derive(Debug, Clone, PartialEq, Eq, Default)]
21pub enum RevokeType {
22 #[default]
24 Sender,
25 Admin { original_sender: Jid },
28}
29
30impl Client {
31 pub async fn send_message(
36 &self,
37 to: Jid,
38 message: wa::Message,
39 ) -> Result<String, anyhow::Error> {
40 self.send_message_with_options(to, message, SendOptions::default())
41 .await
42 }
43
44 pub async fn send_message_with_options(
46 &self,
47 to: Jid,
48 message: wa::Message,
49 options: SendOptions,
50 ) -> Result<String, anyhow::Error> {
51 let request_id = self.generate_message_id().await;
52 self.send_message_impl(
53 to,
54 &message,
55 Some(request_id.clone()),
56 false,
57 false,
58 None,
59 options.extra_stanza_nodes,
60 )
61 .await?;
62 Ok(request_id)
63 }
64
/// Sends `message` as a status update addressed to `Jid::status_broadcast()`.
///
/// The flow visible in this function:
/// 1. Validate `recipients` and map LID recipients to phone-number JIDs.
/// 2. Build a `GroupInfo` in `AddressingMode::Pn` from the de-duplicated
///    recipients, adding our own non-AD JID when it is missing.
/// 3. Decide which devices need a sender-key distribution message (SKDM).
/// 4. Build the stanza with `wacore::send::prepare_group_stanza`, retrying once
///    with forced distribution on `SignalProtocolError::NoSenderKeyState`.
/// 5. Send the stanza, record the SKDM recipients and flush the signal cache.
///
/// On success the generated message id is returned.
///
/// # Errors
///
/// Returns an error when `recipients` is empty, when a recipient is a group,
/// status-broadcast or broadcast-list JID, when a LID recipient has no cached
/// phone number, when the device snapshot has no `pn`
/// (`ClientError::NotLoggedIn`), or when the sender-key lookup, stanza
/// preparation, participant fix-up or `send_node` fails. A failed signal-cache
/// flush is only logged.
pub(crate) async fn send_status_message(
    &self,
    message: wa::Message,
    recipients: Vec<Jid>,
    options: crate::features::status::StatusSendOptions,
) -> Result<String, anyhow::Error> {
    use wacore::client::context::GroupInfo;
    use wacore_binary::builder::NodeBuilder;

    if recipients.is_empty() {
        return Err(anyhow!("Cannot send status with no recipients"));
    }

    let to = Jid::status_broadcast();
    let request_id = self.generate_message_id().await;

    let device_snapshot = self.persistence_manager.get_device_snapshot().await;
    let own_jid = device_snapshot
        .pn
        .clone()
        .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
    // When the snapshot has no LID, the PN JID is used in its place.
    let own_lid = device_snapshot
        .lid
        .clone()
        .unwrap_or_else(|| own_jid.clone());
    let account_info = device_snapshot.account.clone();

    // Validate every recipient and convert LID recipients to PN JIDs.
    let mut resolved_recipients = Vec::with_capacity(recipients.len());
    for jid in recipients {
        if jid.is_group() || jid.is_status_broadcast() || jid.is_broadcast_list() {
            return Err(anyhow!(
                "Invalid status recipient {}: must be a user JID, not a group/broadcast",
                jid
            ));
        }
        if jid.is_lid() {
            if let Some(pn) = self.lid_pn_cache.get_phone_number(&jid.user).await {
                resolved_recipients
                    .push(Jid::new(&pn, wacore_binary::jid::DEFAULT_USER_SERVER));
            } else {
                return Err(anyhow!(
                    "No PN mapping for LID {}. Ensure the recipient has been \
                     contacted previously.",
                    jid
                ));
            }
        } else {
            resolved_recipients.push(jid);
        }
    }

    // NOTE(review): every loop iteration above either pushes or returns, and
    // `recipients` was checked to be non-empty, so this guard looks unreachable;
    // it reads as a defensive check.
    if resolved_recipients.is_empty() {
        return Err(anyhow!("No valid PN recipients after LID resolution"));
    }

    // Keep only the first JID seen per user; `insert` returns false for repeats.
    let mut seen_users = std::collections::HashSet::new();
    resolved_recipients.retain(|jid| seen_users.insert(jid.user.clone()));

    let mut group_info = GroupInfo::new(resolved_recipients, AddressingMode::Pn);

    // Make sure our own (non-AD) JID is among the participants.
    let own_base = own_jid.to_non_ad();
    if !group_info
        .participants
        .iter()
        .any(|p| p.is_same_user_as(&own_base))
    {
        group_info.participants.push(own_base);
    }

    self.add_recent_message(to.clone(), request_id.clone(), &message)
        .await;

    let device_store_arc = self.persistence_manager.get_device_arc().await;
    let to_str = to.to_string();

    // Force a key distribution when no sender key is found for
    // (status broadcast, own address). The lookup goes through the signal cache
    // and the read guard is released at the end of this block.
    let force_skdm = {
        use wacore::libsignal::store::sender_key_name::SenderKeyName;
        let sender_address = own_jid.to_protocol_address();
        let sender_key_name = SenderKeyName::new(to_str.clone(), sender_address.to_string());

        let device_guard = device_store_arc.read().await;
        let key_exists = self
            .signal_cache
            .get_sender_key(&sender_key_name, &*device_guard.backend)
            .await?
            .is_some();

        !key_exists
    };

    let mut store_adapter =
        SignalProtocolStoreAdapter::new(device_store_arc.clone(), self.signal_cache.clone());
    let mut stores = wacore::send::SignalStores {
        session_store: &mut store_adapter.session_store,
        identity_store: &mut store_adapter.identity_store,
        prekey_store: &mut store_adapter.pre_key_store,
        signed_prekey_store: &store_adapter.signed_pre_key_store,
        sender_key_store: &mut store_adapter.sender_key_store,
    };

    // JID strings marked for a fresh SKDM; falls back to the default (empty)
    // value when `consume_forget_marks` yields nothing usable.
    let marked_for_fresh_skdm = self.consume_forget_marks(&to_str).await.unwrap_or_default();

    // `None` is treated below as a full distribution; `Some(list)` restricts
    // the SKDM to the listed devices (the list may be empty).
    let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
        None
    } else {
        let known_recipients = self
            .persistence_manager
            .get_skdm_recipients(&to_str)
            .await
            .unwrap_or_default();

        if known_recipients.is_empty() {
            None
        } else {
            let jids_to_resolve: Vec<Jid> = group_info
                .participants
                .iter()
                .map(|jid| jid.to_non_ad())
                .collect();

            match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
                Ok(all_devices) => {
                    // Devices that are not yet recorded as SKDM recipients.
                    let known_set: std::collections::HashSet<DeviceKey<'_>> =
                        known_recipients.iter().map(|j| j.device_key()).collect();
                    let new_devices: Vec<Jid> = all_devices
                        .into_iter()
                        .filter(|device| !known_set.contains(&device.device_key()))
                        .collect();
                    if new_devices.is_empty() {
                        Some(vec![])
                    } else {
                        Some(new_devices)
                    }
                }
                Err(e) => {
                    // Device resolution failed: fall back to a full distribution.
                    log::warn!("Failed to resolve devices for status SKDM check: {:?}", e);
                    None
                }
            }
        }
    };

    // Add the marked devices to a partial target list; a full distribution
    // (`None`) is left as is. Marks that fail to parse as a JID are skipped.
    let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
        match skdm_target_devices {
            None => None,
            Some(mut devices) => {
                for marked_jid_str in &marked_for_fresh_skdm {
                    if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
                        && !devices.iter().any(|d| d.device_eq(&marked_jid))
                    {
                        devices.push(marked_jid);
                    }
                }
                Some(devices)
            }
        }
    } else {
        skdm_target_devices
    };

    let is_full_distribution = force_skdm || skdm_target_devices.is_none();
    // Captured before `skdm_target_devices` is moved into the stanza builder.
    let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
    // Assigned in exactly one arm of the `match` below: the planned value on
    // success, `true` when the forced-distribution retry is taken.
    #[allow(clippy::needless_late_init)]
    let skdm_is_full: bool;

    // Revoke protocol messages are sent without the `meta` node that carries
    // the `status_setting` attribute.
    let is_revoke = message.protocol_message.as_ref().is_some_and(|pm| {
        pm.r#type == Some(wa::message::protocol_message::Type::Revoke as i32)
    });
    let extra_stanza_nodes = if is_revoke {
        vec![]
    } else {
        vec![
            NodeBuilder::new("meta")
                .attr("status_setting", options.privacy.as_str())
                .build(),
        ]
    };

    let stanza = match wacore::send::prepare_group_stanza(
        &mut stores,
        self,
        &mut group_info,
        &own_jid,
        &own_lid,
        account_info.as_ref(),
        to.clone(),
        &message,
        request_id.clone(),
        force_skdm,
        skdm_target_devices,
        None,
        &extra_stanza_nodes,
    )
    .await
    {
        Ok(stanza) => {
            skdm_is_full = is_full_distribution;
            stanza
        }
        Err(e) => {
            // Only a missing sender-key state is retried; every other error is returned.
            if let Some(SignalProtocolError::NoSenderKeyState(_)) =
                e.downcast_ref::<SignalProtocolError>()
            {
                log::warn!("No sender key for status broadcast, forcing distribution.");

                // Forget the recorded recipients before redistributing; a
                // failure here is logged and the retry still proceeds.
                if let Err(e) = self
                    .persistence_manager
                    .clear_skdm_recipients(&to_str)
                    .await
                {
                    log::warn!("Failed to clear status SKDM recipients: {:?}", e);
                }

                // The retry uses a freshly constructed store adapter.
                let mut store_adapter_retry = SignalProtocolStoreAdapter::new(
                    device_store_arc.clone(),
                    self.signal_cache.clone(),
                );
                let mut stores_retry = wacore::send::SignalStores {
                    session_store: &mut store_adapter_retry.session_store,
                    identity_store: &mut store_adapter_retry.identity_store,
                    prekey_store: &mut store_adapter_retry.pre_key_store,
                    signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
                    sender_key_store: &mut store_adapter_retry.sender_key_store,
                };

                // Same call as above, but with forced distribution and no target list.
                let retry_stanza = wacore::send::prepare_group_stanza(
                    &mut stores_retry,
                    self,
                    &mut group_info,
                    &own_jid,
                    &own_lid,
                    account_info.as_ref(),
                    to.clone(),
                    &message,
                    request_id.clone(),
                    true,
                    None,
                    None,
                    &extra_stanza_nodes,
                )
                .await?;

                skdm_is_full = true;
                retry_stanza
            } else {
                return Err(e);
            }
        }
    };

    let stanza = self.ensure_status_participants(stanza, &group_info).await?;

    self.send_node(stanza).await?;

    // SKDM recipients are recorded only after the stanza was sent successfully.
    self.update_skdm_recipients(
        &to_str,
        &devices_receiving_skdm,
        skdm_is_full,
        &group_info.participants,
    )
    .await;

    // A flush failure is logged but does not fail the already-sent message.
    if let Err(e) = self.flush_signal_cache().await {
        log::error!("Failed to flush signal cache after send_status_message: {e:?}");
    }

    Ok(request_id)
}
358
359 async fn update_skdm_recipients(
366 &self,
367 to_str: &str,
368 devices_receiving_skdm: &[Jid],
369 is_full_distribution: bool,
370 participants: &[Jid],
371 ) {
372 if !devices_receiving_skdm.is_empty() {
373 if let Err(e) = self
374 .persistence_manager
375 .add_skdm_recipients(to_str, devices_receiving_skdm)
376 .await
377 {
378 log::warn!("Failed to update SKDM recipients: {:?}", e);
379 }
380 } else if is_full_distribution {
381 let jids_to_resolve: Vec<Jid> =
382 participants.iter().map(|jid| jid.to_non_ad()).collect();
383 match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
384 Ok(all_devices) => {
385 if let Err(e) = self
386 .persistence_manager
387 .add_skdm_recipients(to_str, &all_devices)
388 .await
389 {
390 log::warn!("Failed to persist SKDM recipients: {:?}", e);
391 }
392 }
393 Err(e) => {
394 log::warn!("Failed to resolve devices for SKDM recipients: {:?}", e);
395 }
396 }
397 }
398 }
399
400 async fn ensure_status_participants(
406 &self,
407 stanza: wacore_binary::Node,
408 group_info: &wacore::client::context::GroupInfo,
409 ) -> Result<wacore_binary::Node, anyhow::Error> {
410 Ok(wacore::send::ensure_status_participants(stanza, group_info))
411 }
412
413 pub async fn revoke_message(
424 &self,
425 to: Jid,
426 message_id: impl Into<String>,
427 revoke_type: RevokeType,
428 ) -> Result<(), anyhow::Error> {
429 let message_id = message_id.into();
430 self.get_pn()
432 .await
433 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
434
435 let (from_me, participant, edit_attr) = match &revoke_type {
436 RevokeType::Sender => {
437 (
440 true,
441 None,
442 crate::types::message::EditAttribute::SenderRevoke,
443 )
444 }
445 RevokeType::Admin { original_sender } => {
446 if !to.is_group() {
448 return Err(anyhow!("Admin revoke is only valid for group chats"));
449 }
450 let participant_str = original_sender.to_non_ad().to_string();
453 log::debug!(
454 "Admin revoke: using participant {} for MessageKey",
455 participant_str
456 );
457 (
458 false,
459 Some(participant_str),
460 crate::types::message::EditAttribute::AdminRevoke,
461 )
462 }
463 };
464
465 let revoke_message = wa::Message {
466 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
467 key: Some(wa::MessageKey {
468 remote_jid: Some(to.to_string()),
469 from_me: Some(from_me),
470 id: Some(message_id.clone()),
471 participant,
472 }),
473 r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
474 ..Default::default()
475 })),
476 ..Default::default()
477 };
478
479 let force_skdm = matches!(revoke_type, RevokeType::Admin { .. });
486 self.send_message_impl(
487 to,
488 &revoke_message,
489 None,
490 false,
491 force_skdm,
492 Some(edit_attr),
493 vec![],
494 )
495 .await
496 }
497
498 #[allow(clippy::too_many_arguments)]
499 pub(crate) async fn send_message_impl(
500 &self,
501 to: Jid,
502 message: &wa::Message,
503 request_id_override: Option<String>,
504 peer: bool,
505 force_key_distribution: bool,
506 edit: Option<crate::types::message::EditAttribute>,
507 extra_stanza_nodes: Vec<Node>,
508 ) -> Result<(), anyhow::Error> {
509 if to.is_status_broadcast() {
511 return Err(anyhow!(
512 "Use send_status_message() or client.status() API for status@broadcast"
513 ));
514 }
515
516 let request_id = match request_id_override {
518 Some(id) => id,
519 None => self.generate_message_id().await,
520 };
521
522 struct SkdmUpdate {
525 to_str: String,
526 devices: Vec<Jid>,
527 is_full_distribution: bool,
528 participants: Vec<Jid>,
529 }
530 let mut skdm_update: Option<SkdmUpdate> = None;
531
532 let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
533 let encryption_jid = self.resolve_encryption_jid(&to).await;
536 let signal_addr_str = encryption_jid.to_protocol_address_string();
537
538 let session_mutex = self
539 .session_locks
540 .get_with_by_ref(&signal_addr_str, async {
541 std::sync::Arc::new(async_lock::Mutex::new(()))
542 })
543 .await;
544 let _session_guard = session_mutex.lock().await;
545
546 let device_store_arc = self.persistence_manager.get_device_arc().await;
547 let mut store_adapter =
548 SignalProtocolStoreAdapter::new(device_store_arc, self.signal_cache.clone());
549
550 wacore::send::prepare_peer_stanza(
551 &mut store_adapter.session_store,
552 &mut store_adapter.identity_store,
553 to,
554 encryption_jid,
555 message,
556 request_id,
557 )
558 .await?
559 } else if to.is_group() {
560 let mut group_info = self.groups().query_info(&to).await?;
566
567 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
568 let own_jid = device_snapshot
569 .pn
570 .clone()
571 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
572 let own_lid = device_snapshot
573 .lid
574 .clone()
575 .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
576 let account_info = device_snapshot.account.clone();
577
578 self.add_recent_message(to.clone(), request_id.clone(), message)
580 .await;
581
582 let device_store_arc = self.persistence_manager.get_device_arc().await;
583 let to_str = to.to_string();
584
585 let (own_sending_jid, _) = match group_info.addressing_mode {
586 crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
587 crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
588 };
589
590 if !group_info
591 .participants
592 .iter()
593 .any(|participant| participant.is_same_user_as(&own_sending_jid))
594 {
595 group_info.participants.push(own_sending_jid.to_non_ad());
596 }
597
598 let force_skdm = {
599 use wacore::libsignal::protocol::SenderKeyStore;
600 use wacore::libsignal::store::sender_key_name::SenderKeyName;
601 let mut device_guard = device_store_arc.write().await;
602 let sender_address = own_sending_jid.to_protocol_address();
603 let sender_key_name =
604 SenderKeyName::new(to_str.clone(), sender_address.to_string());
605
606 let key_exists = device_guard
607 .load_sender_key(&sender_key_name)
608 .await?
609 .is_some();
610
611 force_key_distribution || !key_exists
612 };
613
614 let mut store_adapter = SignalProtocolStoreAdapter::new(
615 device_store_arc.clone(),
616 self.signal_cache.clone(),
617 );
618
619 let mut stores = wacore::send::SignalStores {
620 session_store: &mut store_adapter.session_store,
621 identity_store: &mut store_adapter.identity_store,
622 prekey_store: &mut store_adapter.pre_key_store,
623 signed_prekey_store: &store_adapter.signed_pre_key_store,
624 sender_key_store: &mut store_adapter.sender_key_store,
625 };
626
627 let marked_for_fresh_skdm =
630 self.consume_forget_marks(&to_str).await.unwrap_or_default();
631
632 let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
634 None
635 } else {
636 let known_recipients = self
637 .persistence_manager
638 .get_skdm_recipients(&to_str)
639 .await
640 .unwrap_or_default();
641
642 if known_recipients.is_empty() {
643 None
644 } else {
645 let jids_to_resolve: Vec<Jid> = group_info
646 .participants
647 .iter()
648 .map(|jid| jid.to_non_ad())
649 .collect();
650
651 match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
652 Ok(all_devices) => {
653 use std::collections::HashSet;
654
655 let known_set: HashSet<DeviceKey<'_>> =
656 known_recipients.iter().map(|j| j.device_key()).collect();
657
658 let new_devices: Vec<Jid> = all_devices
659 .into_iter()
660 .filter(|device| !known_set.contains(&device.device_key()))
661 .collect();
662
663 if new_devices.is_empty() {
664 Some(vec![])
665 } else {
666 log::debug!(
667 "Found {} new devices needing SKDM for group {}",
668 new_devices.len(),
669 to
670 );
671 Some(new_devices)
672 }
673 }
674 Err(e) => {
675 log::warn!("Failed to resolve devices for SKDM check: {:?}", e);
676 None
677 }
678 }
679 }
680 };
681
682 let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
684 match skdm_target_devices {
685 None => None,
686 Some(mut devices) => {
687 for marked_jid_str in &marked_for_fresh_skdm {
688 if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
689 && !devices.iter().any(|d| d.device_eq(&marked_jid))
690 {
691 log::debug!(
692 "Adding {} to SKDM targets (marked for fresh key)",
693 marked_jid_str
694 );
695 devices.push(marked_jid);
696 }
697 }
698 Some(devices)
699 }
700 }
701 } else {
702 skdm_target_devices
703 };
704
705 let is_full_distribution = force_skdm || skdm_target_devices.is_none();
706 let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
707
708 match wacore::send::prepare_group_stanza(
709 &mut stores,
710 self,
711 &mut group_info,
712 &own_jid,
713 &own_lid,
714 account_info.as_ref(),
715 to.clone(),
716 message,
717 request_id.clone(),
718 force_skdm,
719 skdm_target_devices,
720 edit.clone(),
721 &extra_stanza_nodes,
722 )
723 .await
724 {
725 Ok(stanza) => {
726 skdm_update = Some(SkdmUpdate {
727 to_str: to_str.clone(),
728 devices: devices_receiving_skdm,
729 is_full_distribution,
730 participants: group_info.participants.clone(),
731 });
732 stanza
733 }
734 Err(e) => {
735 if let Some(SignalProtocolError::NoSenderKeyState(_)) =
736 e.downcast_ref::<SignalProtocolError>()
737 {
738 log::warn!("No sender key for group {}, forcing distribution.", to);
739
740 if let Err(e) = self
742 .persistence_manager
743 .clear_skdm_recipients(&to_str)
744 .await
745 {
746 log::warn!("Failed to clear SKDM recipients: {:?}", e);
747 }
748
749 let mut store_adapter_retry = SignalProtocolStoreAdapter::new(
750 device_store_arc.clone(),
751 self.signal_cache.clone(),
752 );
753 let mut stores_retry = wacore::send::SignalStores {
754 session_store: &mut store_adapter_retry.session_store,
755 identity_store: &mut store_adapter_retry.identity_store,
756 prekey_store: &mut store_adapter_retry.pre_key_store,
757 signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
758 sender_key_store: &mut store_adapter_retry.sender_key_store,
759 };
760
761 let retry_stanza = wacore::send::prepare_group_stanza(
762 &mut stores_retry,
763 self,
764 &mut group_info,
765 &own_jid,
766 &own_lid,
767 account_info.as_ref(),
768 to,
769 message,
770 request_id,
771 true, None, edit.clone(),
774 &extra_stanza_nodes,
775 )
776 .await?;
777
778 skdm_update = Some(SkdmUpdate {
780 to_str,
781 devices: vec![],
782 is_full_distribution: true,
783 participants: group_info.participants.clone(),
784 });
785 retry_stanza
786 } else {
787 return Err(e);
788 }
789 }
790 }
791 } else {
792 let recipient_devices = self.get_user_devices(std::slice::from_ref(&to)).await?;
797 self.ensure_e2e_sessions(recipient_devices).await?;
798
799 let encryption_jid = self.resolve_encryption_jid(&to).await;
801 let signal_addr_str = encryption_jid.to_protocol_address_string();
802
803 self.add_recent_message(to.clone(), request_id.clone(), message)
805 .await;
806
807 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
808 let own_jid = device_snapshot
809 .pn
810 .clone()
811 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
812 let account_info = device_snapshot.account.clone();
813
814 let mut extra_stanza_nodes = extra_stanza_nodes;
817 if !to.is_group() && !to.is_newsletter() {
818 self.maybe_include_tc_token(&to, &mut extra_stanza_nodes)
819 .await;
820 }
821
822 let session_mutex = self
824 .session_locks
825 .get_with_by_ref(&signal_addr_str, async {
826 std::sync::Arc::new(async_lock::Mutex::new(()))
827 })
828 .await;
829 let _session_guard = session_mutex.lock().await;
830
831 let device_store_arc = self.persistence_manager.get_device_arc().await;
832 let mut store_adapter =
833 SignalProtocolStoreAdapter::new(device_store_arc, self.signal_cache.clone());
834
835 let mut stores = wacore::send::SignalStores {
836 session_store: &mut store_adapter.session_store,
837 identity_store: &mut store_adapter.identity_store,
838 prekey_store: &mut store_adapter.pre_key_store,
839 signed_prekey_store: &store_adapter.signed_pre_key_store,
840 sender_key_store: &mut store_adapter.sender_key_store,
841 };
842
843 wacore::send::prepare_dm_stanza(
844 &mut stores,
845 self,
846 &own_jid,
847 account_info.as_ref(),
848 to,
849 message,
850 request_id,
851 edit,
852 &extra_stanza_nodes,
853 )
854 .await?
855 };
856
857 self.send_node(stanza_to_send).await?;
858
859 if let Some(update) = skdm_update {
862 self.update_skdm_recipients(
863 &update.to_str,
864 &update.devices,
865 update.is_full_distribution,
866 &update.participants,
867 )
868 .await;
869 }
870
871 if let Err(e) = self.flush_signal_cache().await {
873 log::error!("Failed to flush signal cache after send_message_impl: {e:?}");
874 }
875
876 Ok(())
877 }
878
879 async fn maybe_include_tc_token(&self, to: &Jid, extra_nodes: &mut Vec<Node>) {
884 use wacore::iq::tctoken::{
885 IssuePrivacyTokensSpec, build_tc_token_node, is_tc_token_expired,
886 should_send_new_tc_token,
887 };
888 use wacore::store::traits::TcTokenEntry;
889
890 let snapshot = self.persistence_manager.get_device_snapshot().await;
892 let is_self = snapshot
893 .pn
894 .as_ref()
895 .is_some_and(|pn| pn.is_same_user_as(to))
896 || snapshot
897 .lid
898 .as_ref()
899 .is_some_and(|lid| lid.is_same_user_as(to));
900 if is_self {
901 return;
902 }
903
904 let token_jid = if to.is_lid() {
906 to.user.clone()
907 } else {
908 match self.lid_pn_cache.get_current_lid(&to.user).await {
909 Some(lid) => lid,
910 None => to.user.clone(),
911 }
912 };
913
914 let backend = self.persistence_manager.backend();
915
916 let existing = match backend.get_tc_token(&token_jid).await {
918 Ok(entry) => entry,
919 Err(e) => {
920 log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
921 return;
922 }
923 };
924
925 match existing {
926 Some(entry) if !is_tc_token_expired(entry.token_timestamp) => {
927 extra_nodes.push(build_tc_token_node(&entry.token));
929
930 if should_send_new_tc_token(entry.sender_timestamp) {
933 let now = wacore::time::now_secs();
934 let updated_entry = TcTokenEntry {
935 sender_timestamp: Some(now),
936 ..entry
937 };
938 if let Err(e) = backend.put_tc_token(&token_jid, &updated_entry).await {
939 log::warn!(target: "Client/TcToken", "Failed to update sender_timestamp: {e}");
940 }
941 }
942 }
943 _ => {
944 let to_lid = self.resolve_to_lid_jid(to).await;
946 match self
947 .execute(IssuePrivacyTokensSpec::new(std::slice::from_ref(&to_lid)))
948 .await
949 {
950 Ok(response) => {
951 let now = wacore::time::now_secs();
952 for received in &response.tokens {
953 let entry = TcTokenEntry {
954 token: received.token.clone(),
955 token_timestamp: received.timestamp,
956 sender_timestamp: Some(now),
957 };
958
959 let store_jid = received.jid.user.clone();
961 if let Err(e) = backend.put_tc_token(&store_jid, &entry).await {
962 log::warn!(target: "Client/TcToken", "Failed to store issued tc_token: {e}");
963 }
964
965 if !received.token.is_empty() {
967 extra_nodes.push(build_tc_token_node(&received.token));
968 }
969 }
970 }
971 Err(e) => {
972 log::debug!(target: "Client/TcToken", "Failed to issue tc_token for {}: {e}", to_lid);
973 }
975 }
976 }
977 }
978 }
979
980 pub(crate) async fn lookup_tc_token_for_jid(&self, jid: &Jid) -> Option<Vec<u8>> {
984 use wacore::iq::tctoken::is_tc_token_expired;
985
986 let token_jid = if jid.is_lid() {
987 jid.user.clone()
988 } else {
989 match self.lid_pn_cache.get_current_lid(&jid.user).await {
990 Some(lid) => lid,
991 None => jid.user.clone(),
992 }
993 };
994
995 let backend = self.persistence_manager.backend();
996 match backend.get_tc_token(&token_jid).await {
997 Ok(Some(entry)) if !is_tc_token_expired(entry.token_timestamp) => Some(entry.token),
998 Ok(_) => None,
999 Err(e) => {
1000 log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1001 None
1002 }
1003 }
1004 }
1005
1006 async fn resolve_to_lid_jid(&self, jid: &Jid) -> Jid {
1008 if jid.is_lid() {
1009 return jid.clone();
1010 }
1011
1012 if let Some(lid_user) = self.lid_pn_cache.get_current_lid(&jid.user).await {
1013 Jid::new(&lid_user, "lid")
1014 } else {
1015 jid.clone()
1016 }
1017 }
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022 use super::*;
1023 use std::str::FromStr;
1024
/// `RevokeType::default()` must be the sender variant.
#[test]
fn test_revoke_type_default_is_sender() {
    assert_eq!(RevokeType::default(), RevokeType::Sender);
}
1031
/// The `matches!` predicate used for `force_skdm` is true only for admin revokes.
#[test]
fn test_force_skdm_only_for_admin_revoke() {
    // Same predicate `revoke_message` evaluates.
    let forces_skdm = |revoke: &RevokeType| matches!(revoke, RevokeType::Admin { .. });

    let admin_revoke = RevokeType::Admin {
        original_sender: Jid::from_str("123456@s.whatsapp.net").unwrap(),
    };
    let sender_revoke = RevokeType::Sender;

    assert!(
        !forces_skdm(&sender_revoke),
        "Sender revoke should NOT force SKDM"
    );
    assert!(forces_skdm(&admin_revoke), "Admin revoke MUST force SKDM");
}
1051
/// A sender revoke yields `from_me = true`, no participant and edit value "7".
#[test]
fn test_sender_revoke_message_key_structure() {
    use crate::types::message::EditAttribute;

    let chat = Jid::from_str("120363040237990503@g.us").unwrap();
    let target_id = String::from("3EB0ABC123");

    // Mirrors the tuple selection done in `revoke_message`.
    let (from_me, participant, edit_attr) = match RevokeType::Sender {
        RevokeType::Admin { original_sender } => (
            false,
            Some(original_sender.to_non_ad().to_string()),
            EditAttribute::AdminRevoke,
        ),
        RevokeType::Sender => (true, None, EditAttribute::SenderRevoke),
    };

    assert!(from_me, "Sender revoke must have from_me=true");
    assert!(
        participant.is_none(),
        "Sender revoke must NOT set participant"
    );
    assert_eq!(edit_attr.to_string_val(), "7");

    let target_key = wa::MessageKey {
        remote_jid: Some(chat.to_string()),
        from_me: Some(from_me),
        id: Some(target_id.clone()),
        participant,
    };
    let revoke_message = wa::Message {
        protocol_message: Some(Box::new(wa::message::ProtocolMessage {
            key: Some(target_key),
            r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
            ..Default::default()
        })),
        ..Default::default()
    };

    let protocol = revoke_message.protocol_message.unwrap();
    let stored_key = protocol.key.unwrap();
    assert_eq!(stored_key.from_me, Some(true));
    assert_eq!(stored_key.participant, None);
    assert_eq!(stored_key.id, Some(target_id));
}
1099
/// An admin revoke yields `from_me = false`, the non-AD original sender as
/// participant and edit value "8".
#[test]
fn test_admin_revoke_message_key_structure() {
    use crate::types::message::EditAttribute;

    let chat = Jid::from_str("120363040237990503@g.us").unwrap();
    let target_id = String::from("3EB0ABC123");
    let original_sender = Jid::from_str("236395184570386:22@lid").unwrap();

    let revoke_type = RevokeType::Admin {
        original_sender: original_sender.clone(),
    };

    // Mirrors the tuple selection done in `revoke_message`.
    let (from_me, participant, edit_attr) = match revoke_type {
        RevokeType::Admin { original_sender } => (
            false,
            Some(original_sender.to_non_ad().to_string()),
            EditAttribute::AdminRevoke,
        ),
        RevokeType::Sender => (true, None, EditAttribute::SenderRevoke),
    };

    assert!(!from_me, "Admin revoke must have from_me=false");
    assert!(
        participant.is_some(),
        "Admin revoke MUST set participant to original sender"
    );
    assert_eq!(edit_attr.to_string_val(), "8");

    let target_key = wa::MessageKey {
        remote_jid: Some(chat.to_string()),
        from_me: Some(from_me),
        id: Some(target_id.clone()),
        participant: participant.clone(),
    };
    let revoke_message = wa::Message {
        protocol_message: Some(Box::new(wa::message::ProtocolMessage {
            key: Some(target_key),
            r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
            ..Default::default()
        })),
        ..Default::default()
    };

    let protocol = revoke_message.protocol_message.unwrap();
    let stored_key = protocol.key.unwrap();
    assert_eq!(stored_key.from_me, Some(false));
    // The device suffix (":22") is dropped by `to_non_ad`.
    assert_eq!(
        stored_key.participant,
        Some("236395184570386@lid".to_string())
    );
    assert_eq!(stored_key.id, Some(target_id));
}
1152
/// `to_non_ad` on a LID JID drops the device part but keeps the `@lid` server.
#[test]
fn test_admin_revoke_preserves_lid_format() {
    let participant_str = Jid::from_str("236395184570386:22@lid")
        .unwrap()
        .to_non_ad()
        .to_string();

    assert_eq!(participant_str, "236395184570386@lid");
    assert!(
        participant_str.ends_with("@lid"),
        "LID participant must preserve @lid suffix"
    );
}
1168
/// Only the device missing from the known recipients survives the filter.
#[test]
fn test_skdm_recipient_filtering_basic() {
    use std::collections::HashSet;

    fn jids(raw: &[&str]) -> Vec<Jid> {
        raw.iter().map(|s| Jid::from_str(s).unwrap()).collect()
    }

    let known_recipients = jids(&[
        "1234567890:0@s.whatsapp.net",
        "1234567890:5@s.whatsapp.net",
        "9876543210:0@s.whatsapp.net",
    ]);

    // Same three devices plus one that is not yet known.
    let all_devices = jids(&[
        "1234567890:0@s.whatsapp.net",
        "1234567890:5@s.whatsapp.net",
        "9876543210:0@s.whatsapp.net",
        "5555555555:0@s.whatsapp.net",
    ]);

    let known_set: HashSet<DeviceKey<'_>> =
        known_recipients.iter().map(|j| j.device_key()).collect();

    let mut new_devices: Vec<Jid> = Vec::new();
    for device in all_devices {
        if !known_set.contains(&device.device_key()) {
            new_devices.push(device);
        }
    }

    assert_eq!(new_devices.len(), 1);
    assert_eq!(new_devices[0].user, "5555555555");
}
1205
/// Filtering works on LID JIDs and distinguishes devices of the same user.
#[test]
fn test_skdm_recipient_filtering_lid_jids() {
    use std::collections::HashSet;

    fn jids(raw: &[&str]) -> Vec<Jid> {
        raw.iter().map(|s| Jid::from_str(s).unwrap()).collect()
    }

    let known_recipients = jids(&[
        "236395184570386:91@lid",
        "129171292463295:0@lid",
        "45857667830004:14@lid",
    ]);

    // Device 15 of the last user is the only one not yet known.
    let all_devices = jids(&[
        "236395184570386:91@lid",
        "129171292463295:0@lid",
        "45857667830004:14@lid",
        "45857667830004:15@lid",
    ]);

    let known_set: HashSet<DeviceKey<'_>> =
        known_recipients.iter().map(|j| j.device_key()).collect();

    let mut new_devices: Vec<Jid> = Vec::new();
    for device in all_devices {
        if !known_set.contains(&device.device_key()) {
            new_devices.push(device);
        }
    }

    assert_eq!(new_devices.len(), 1);
    let added = &new_devices[0];
    assert_eq!(added.user, "45857667830004");
    assert_eq!(added.device, 15);
}
1241
/// When every device is already known, the filter leaves nothing.
#[test]
fn test_skdm_recipient_filtering_all_known() {
    use std::collections::HashSet;

    let raw = ["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"];

    let known_recipients: Vec<Jid> = raw.iter().map(|s| Jid::from_str(s).unwrap()).collect();
    let all_devices: Vec<Jid> = raw.iter().map(|s| Jid::from_str(s).unwrap()).collect();

    let known_set: HashSet<DeviceKey<'_>> =
        known_recipients.iter().map(|j| j.device_key()).collect();

    let mut new_devices: Vec<Jid> = Vec::new();
    for device in all_devices {
        if !known_set.contains(&device.device_key()) {
            new_devices.push(device);
        }
    }

    assert!(new_devices.is_empty());
}
1267
/// With no known recipients, every resolved device counts as new.
#[test]
fn test_skdm_recipient_filtering_all_new() {
    use std::collections::HashSet;

    let known_recipients: Vec<Jid> = Vec::new();

    let all_devices: Vec<Jid> = ["1234567890:0@s.whatsapp.net", "9876543210:0@s.whatsapp.net"]
        .into_iter()
        .map(|s| Jid::from_str(s).unwrap())
        .collect();

    let known_set: HashSet<DeviceKey<'_>> =
        known_recipients.iter().map(|j| j.device_key()).collect();

    let mut new_devices: Vec<Jid> = Vec::new();
    for device in all_devices.clone() {
        if !known_set.contains(&device.device_key()) {
            new_devices.push(device);
        }
    }

    assert_eq!(new_devices.len(), all_devices.len());
}
1290
/// `device_key()` equality and `device_eq` must agree with the expected
/// outcome for each pair of JID strings.
#[test]
fn test_device_key_comparison() {
    // (left JID, right JID, whether they denote the same device)
    let test_cases: [(&str, &str, bool); 6] = [
        // An explicit device 0 equals the JID without a device part.
        (
            "1234567890:0@s.whatsapp.net",
            "1234567890@s.whatsapp.net",
            true,
        ),
        (
            "1234567890:5@s.whatsapp.net",
            "1234567890:5@s.whatsapp.net",
            true,
        ),
        (
            "1234567890:5@s.whatsapp.net",
            "1234567890:6@s.whatsapp.net",
            false,
        ),
        ("236395184570386:91@lid", "236395184570386:91@lid", true),
        ("236395184570386:0@lid", "236395184570386@lid", true),
        ("user1@s.whatsapp.net", "user2@s.whatsapp.net", false),
    ];

    for (jid1_str, jid2_str, should_match) in test_cases {
        let jid1: Jid = jid1_str.parse().expect("should parse jid1");
        let jid2: Jid = jid2_str.parse().expect("should parse jid2");

        let keys_equal = jid1.device_key() == jid2.device_key();

        assert_eq!(
            keys_equal,
            should_match,
            "DeviceKey comparison failed for '{}' vs '{}': expected match={}, got match={}",
            jid1_str,
            jid2_str,
            should_match,
            keys_equal
        );

        assert_eq!(
            jid1.device_eq(&jid2),
            should_match,
            "device_eq failed for '{}' vs '{}'",
            jid1_str,
            jid2_str
        );
    }
}
1342
/// With 1000 known devices and 1010 resolved ones, exactly the 10 extra
/// devices are reported as new.
#[test]
fn test_skdm_filtering_large_group() {
    use std::collections::HashSet;

    /// Builds the LID device JID for the given offset from the base user id.
    fn lid_device(offset: i64) -> Jid {
        let jid_str = format!("{}:1@lid", 100000000000000i64 + offset);
        Jid::from_str(&jid_str).unwrap()
    }

    let known_recipients: Vec<Jid> = (0..1000i64).map(lid_device).collect();

    // All known devices followed by ten that are not yet known.
    let mut all_devices: Vec<Jid> = Vec::with_capacity(1010);
    all_devices.extend(known_recipients.iter().cloned());
    all_devices.extend((1000i64..1010i64).map(lid_device));

    let known_set: HashSet<DeviceKey<'_>> =
        known_recipients.iter().map(|j| j.device_key()).collect();

    let mut new_devices: Vec<Jid> = Vec::new();
    for device in all_devices {
        if !known_set.contains(&device.device_key()) {
            new_devices.push(device);
        }
    }

    assert_eq!(new_devices.len(), 10);
}
1372}