1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use wacore::client::context::SendContextResolver;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore::types::message::AddressingMode;
8use wacore_binary::jid::{DeviceKey, Jid, JidExt as _};
9use wacore_binary::node::Node;
10use waproto::whatsapp as wa;
11
12#[derive(Debug, Clone, Default)]
14pub struct SendOptions {
15 pub extra_stanza_nodes: Vec<Node>,
17}
18
19#[derive(Debug, Clone, PartialEq, Eq, Default)]
21pub enum RevokeType {
22 #[default]
24 Sender,
25 Admin { original_sender: Jid },
28}
29
30impl Client {
31 pub async fn send_message(
36 &self,
37 to: Jid,
38 message: wa::Message,
39 ) -> Result<String, anyhow::Error> {
40 self.send_message_with_options(to, message, SendOptions::default())
41 .await
42 }
43
44 pub async fn send_message_with_options(
46 &self,
47 to: Jid,
48 message: wa::Message,
49 options: SendOptions,
50 ) -> Result<String, anyhow::Error> {
51 let request_id = self.generate_message_id().await;
52 self.send_message_impl(
53 to,
54 &message,
55 Some(request_id.clone()),
56 false,
57 false,
58 None,
59 options.extra_stanza_nodes,
60 )
61 .await?;
62 Ok(request_id)
63 }
64
65 pub(crate) async fn send_status_message(
70 &self,
71 message: wa::Message,
72 recipients: Vec<Jid>,
73 options: crate::features::status::StatusSendOptions,
74 ) -> Result<String, anyhow::Error> {
75 use wacore::client::context::GroupInfo;
76 use wacore_binary::builder::NodeBuilder;
77
78 if recipients.is_empty() {
79 return Err(anyhow!("Cannot send status with no recipients"));
80 }
81
82 let to = Jid::status_broadcast();
83 let request_id = self.generate_message_id().await;
84
85 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
86 let own_jid = device_snapshot
87 .pn
88 .clone()
89 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
90 let own_lid = device_snapshot
93 .lid
94 .clone()
95 .unwrap_or_else(|| own_jid.clone());
96 let account_info = device_snapshot.account.clone();
97
98 let mut resolved_recipients = Vec::with_capacity(recipients.len());
104 for jid in recipients {
105 if jid.is_group() || jid.is_status_broadcast() || jid.is_broadcast_list() {
106 return Err(anyhow!(
107 "Invalid status recipient {}: must be a user JID, not a group/broadcast",
108 jid
109 ));
110 }
111 if jid.is_lid() {
112 if let Some(pn) = self.lid_pn_cache.get_phone_number(&jid.user).await {
113 resolved_recipients
114 .push(Jid::new(&pn, wacore_binary::jid::DEFAULT_USER_SERVER));
115 } else {
116 return Err(anyhow!(
117 "No PN mapping for LID {}. Ensure the recipient has been \
118 contacted previously.",
119 jid
120 ));
121 }
122 } else {
123 resolved_recipients.push(jid);
124 }
125 }
126
127 if resolved_recipients.is_empty() {
128 return Err(anyhow!("No valid PN recipients after LID resolution"));
129 }
130
131 let mut seen_users = std::collections::HashSet::new();
133 resolved_recipients.retain(|jid| seen_users.insert(jid.user.clone()));
134
135 let mut group_info = GroupInfo::new(resolved_recipients, AddressingMode::Pn);
136
137 let own_base = own_jid.to_non_ad();
139 if !group_info
140 .participants
141 .iter()
142 .any(|p| p.is_same_user_as(&own_base))
143 {
144 group_info.participants.push(own_base);
145 }
146
147 self.add_recent_message(to.clone(), request_id.clone(), &message)
148 .await;
149
150 let device_store_arc = self.persistence_manager.get_device_arc().await;
151 let to_str = to.to_string();
152
153 let force_skdm = {
154 use wacore::libsignal::store::sender_key_name::SenderKeyName;
155 let sender_address = own_jid.to_protocol_address();
156 let sender_key_name = SenderKeyName::new(to_str.clone(), sender_address.to_string());
157 let cache_key = format!(
158 "{}:{}",
159 sender_key_name.group_id(),
160 sender_key_name.sender_id()
161 );
162
163 let device_guard = device_store_arc.read().await;
164 let key_exists = self
165 .signal_cache
166 .get_sender_key(&cache_key, &*device_guard.backend)
167 .await?
168 .is_some();
169
170 !key_exists
171 };
172
173 let mut store_adapter =
174 SignalProtocolStoreAdapter::new(device_store_arc.clone(), self.signal_cache.clone());
175 let mut stores = wacore::send::SignalStores {
176 session_store: &mut store_adapter.session_store,
177 identity_store: &mut store_adapter.identity_store,
178 prekey_store: &mut store_adapter.pre_key_store,
179 signed_prekey_store: &store_adapter.signed_pre_key_store,
180 sender_key_store: &mut store_adapter.sender_key_store,
181 };
182
183 let marked_for_fresh_skdm = self.consume_forget_marks(&to_str).await.unwrap_or_default();
184
185 let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
186 None
187 } else {
188 let known_recipients = self
189 .persistence_manager
190 .get_skdm_recipients(&to_str)
191 .await
192 .unwrap_or_default();
193
194 if known_recipients.is_empty() {
195 None
196 } else {
197 let jids_to_resolve: Vec<Jid> = group_info
198 .participants
199 .iter()
200 .map(|jid| jid.to_non_ad())
201 .collect();
202
203 match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
204 Ok(all_devices) => {
205 let known_set: std::collections::HashSet<DeviceKey<'_>> =
206 known_recipients.iter().map(|j| j.device_key()).collect();
207 let new_devices: Vec<Jid> = all_devices
208 .into_iter()
209 .filter(|device| !known_set.contains(&device.device_key()))
210 .collect();
211 if new_devices.is_empty() {
212 Some(vec![])
213 } else {
214 Some(new_devices)
215 }
216 }
217 Err(e) => {
218 log::warn!("Failed to resolve devices for status SKDM check: {:?}", e);
219 None
220 }
221 }
222 }
223 };
224
225 let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
226 match skdm_target_devices {
227 None => None,
228 Some(mut devices) => {
229 for marked_jid_str in &marked_for_fresh_skdm {
230 if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
231 && !devices.iter().any(|d| d.device_eq(&marked_jid))
232 {
233 devices.push(marked_jid);
234 }
235 }
236 Some(devices)
237 }
238 }
239 } else {
240 skdm_target_devices
241 };
242
243 let is_full_distribution = force_skdm || skdm_target_devices.is_none();
244 let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
245 #[allow(clippy::needless_late_init)]
246 let skdm_is_full: bool;
247
248 let is_revoke = message.protocol_message.as_ref().is_some_and(|pm| {
251 pm.r#type == Some(wa::message::protocol_message::Type::Revoke as i32)
252 });
253 let extra_stanza_nodes = if is_revoke {
254 vec![]
255 } else {
256 vec![
257 NodeBuilder::new("meta")
258 .attr("status_setting", options.privacy.as_str())
259 .build(),
260 ]
261 };
262
263 let stanza = match wacore::send::prepare_group_stanza(
264 &mut stores,
265 self,
266 &mut group_info,
267 &own_jid,
268 &own_lid,
269 account_info.as_ref(),
270 to.clone(),
271 &message,
272 request_id.clone(),
273 force_skdm,
274 skdm_target_devices,
275 None,
276 &extra_stanza_nodes,
277 )
278 .await
279 {
280 Ok(stanza) => {
281 skdm_is_full = is_full_distribution;
282 stanza
283 }
284 Err(e) => {
285 if let Some(SignalProtocolError::NoSenderKeyState(_)) =
286 e.downcast_ref::<SignalProtocolError>()
287 {
288 log::warn!("No sender key for status broadcast, forcing distribution.");
289
290 if let Err(e) = self
291 .persistence_manager
292 .clear_skdm_recipients(&to_str)
293 .await
294 {
295 log::warn!("Failed to clear status SKDM recipients: {:?}", e);
296 }
297
298 let mut store_adapter_retry = SignalProtocolStoreAdapter::new(
299 device_store_arc.clone(),
300 self.signal_cache.clone(),
301 );
302 let mut stores_retry = wacore::send::SignalStores {
303 session_store: &mut store_adapter_retry.session_store,
304 identity_store: &mut store_adapter_retry.identity_store,
305 prekey_store: &mut store_adapter_retry.pre_key_store,
306 signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
307 sender_key_store: &mut store_adapter_retry.sender_key_store,
308 };
309
310 let retry_stanza = wacore::send::prepare_group_stanza(
311 &mut stores_retry,
312 self,
313 &mut group_info,
314 &own_jid,
315 &own_lid,
316 account_info.as_ref(),
317 to.clone(),
318 &message,
319 request_id.clone(),
320 true,
321 None,
322 None,
323 &extra_stanza_nodes,
324 )
325 .await?;
326
327 skdm_is_full = true;
329 retry_stanza
330 } else {
331 return Err(e);
332 }
333 }
334 };
335
336 let stanza = self.ensure_status_participants(stanza, &group_info).await?;
343
344 self.send_node(stanza).await?;
345
346 self.update_skdm_recipients(
349 &to_str,
350 &devices_receiving_skdm,
351 skdm_is_full,
352 &group_info.participants,
353 )
354 .await;
355
356 if let Err(e) = self.flush_signal_cache().await {
358 log::error!("Failed to flush signal cache after send_status_message: {e:?}");
359 }
360
361 Ok(request_id)
362 }
363
364 async fn update_skdm_recipients(
371 &self,
372 to_str: &str,
373 devices_receiving_skdm: &[Jid],
374 is_full_distribution: bool,
375 participants: &[Jid],
376 ) {
377 if !devices_receiving_skdm.is_empty() {
378 if let Err(e) = self
379 .persistence_manager
380 .add_skdm_recipients(to_str, devices_receiving_skdm)
381 .await
382 {
383 log::warn!("Failed to update SKDM recipients: {:?}", e);
384 }
385 } else if is_full_distribution {
386 let jids_to_resolve: Vec<Jid> =
387 participants.iter().map(|jid| jid.to_non_ad()).collect();
388 match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
389 Ok(all_devices) => {
390 if let Err(e) = self
391 .persistence_manager
392 .add_skdm_recipients(to_str, &all_devices)
393 .await
394 {
395 log::warn!("Failed to persist SKDM recipients: {:?}", e);
396 }
397 }
398 Err(e) => {
399 log::warn!("Failed to resolve devices for SKDM recipients: {:?}", e);
400 }
401 }
402 }
403 }
404
405 async fn ensure_status_participants(
411 &self,
412 stanza: wacore_binary::Node,
413 group_info: &wacore::client::context::GroupInfo,
414 ) -> Result<wacore_binary::Node, anyhow::Error> {
415 Ok(wacore::send::ensure_status_participants(stanza, group_info))
416 }
417
418 pub async fn revoke_message(
429 &self,
430 to: Jid,
431 message_id: impl Into<String>,
432 revoke_type: RevokeType,
433 ) -> Result<(), anyhow::Error> {
434 let message_id = message_id.into();
435 self.get_pn()
437 .await
438 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
439
440 let (from_me, participant, edit_attr) = match &revoke_type {
441 RevokeType::Sender => {
442 (
445 true,
446 None,
447 crate::types::message::EditAttribute::SenderRevoke,
448 )
449 }
450 RevokeType::Admin { original_sender } => {
451 if !to.is_group() {
453 return Err(anyhow!("Admin revoke is only valid for group chats"));
454 }
455 let participant_str = original_sender.to_non_ad().to_string();
458 log::debug!(
459 "Admin revoke: using participant {} for MessageKey",
460 participant_str
461 );
462 (
463 false,
464 Some(participant_str),
465 crate::types::message::EditAttribute::AdminRevoke,
466 )
467 }
468 };
469
470 let revoke_message = wa::Message {
471 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
472 key: Some(wa::MessageKey {
473 remote_jid: Some(to.to_string()),
474 from_me: Some(from_me),
475 id: Some(message_id.clone()),
476 participant,
477 }),
478 r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
479 ..Default::default()
480 })),
481 ..Default::default()
482 };
483
484 let force_skdm = matches!(revoke_type, RevokeType::Admin { .. });
491 self.send_message_impl(
492 to,
493 &revoke_message,
494 None,
495 false,
496 force_skdm,
497 Some(edit_attr),
498 vec![],
499 )
500 .await
501 }
502
503 #[allow(clippy::too_many_arguments)]
504 pub(crate) async fn send_message_impl(
505 &self,
506 to: Jid,
507 message: &wa::Message,
508 request_id_override: Option<String>,
509 peer: bool,
510 force_key_distribution: bool,
511 edit: Option<crate::types::message::EditAttribute>,
512 extra_stanza_nodes: Vec<Node>,
513 ) -> Result<(), anyhow::Error> {
514 if to.is_status_broadcast() {
516 return Err(anyhow!(
517 "Use send_status_message() or client.status() API for status@broadcast"
518 ));
519 }
520
521 let request_id = match request_id_override {
523 Some(id) => id,
524 None => self.generate_message_id().await,
525 };
526
527 struct SkdmUpdate {
530 to_str: String,
531 devices: Vec<Jid>,
532 is_full_distribution: bool,
533 participants: Vec<Jid>,
534 }
535 let mut skdm_update: Option<SkdmUpdate> = None;
536
537 let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
538 let encryption_jid = self.resolve_encryption_jid(&to).await;
541 let signal_addr_str = encryption_jid.to_protocol_address_string();
542
543 let session_mutex = self
544 .session_locks
545 .get_with(signal_addr_str.clone(), async {
546 std::sync::Arc::new(async_lock::Mutex::new(()))
547 })
548 .await;
549 let _session_guard = session_mutex.lock().await;
550
551 let device_store_arc = self.persistence_manager.get_device_arc().await;
552 let mut store_adapter =
553 SignalProtocolStoreAdapter::new(device_store_arc, self.signal_cache.clone());
554
555 wacore::send::prepare_peer_stanza(
556 &mut store_adapter.session_store,
557 &mut store_adapter.identity_store,
558 to,
559 encryption_jid,
560 message,
561 request_id,
562 )
563 .await?
564 } else if to.is_group() {
565 let mut group_info = self.groups().query_info(&to).await?;
571
572 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
573 let own_jid = device_snapshot
574 .pn
575 .clone()
576 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
577 let own_lid = device_snapshot
578 .lid
579 .clone()
580 .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
581 let account_info = device_snapshot.account.clone();
582
583 self.add_recent_message(to.clone(), request_id.clone(), message)
585 .await;
586
587 let device_store_arc = self.persistence_manager.get_device_arc().await;
588 let to_str = to.to_string();
589
590 let (own_sending_jid, _) = match group_info.addressing_mode {
591 crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
592 crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
593 };
594
595 if !group_info
596 .participants
597 .iter()
598 .any(|participant| participant.is_same_user_as(&own_sending_jid))
599 {
600 group_info.participants.push(own_sending_jid.to_non_ad());
601 }
602
603 let force_skdm = {
604 use wacore::libsignal::protocol::SenderKeyStore;
605 use wacore::libsignal::store::sender_key_name::SenderKeyName;
606 let mut device_guard = device_store_arc.write().await;
607 let sender_address = own_sending_jid.to_protocol_address();
608 let sender_key_name =
609 SenderKeyName::new(to_str.clone(), sender_address.to_string());
610
611 let key_exists = device_guard
612 .load_sender_key(&sender_key_name)
613 .await?
614 .is_some();
615
616 force_key_distribution || !key_exists
617 };
618
619 let mut store_adapter = SignalProtocolStoreAdapter::new(
620 device_store_arc.clone(),
621 self.signal_cache.clone(),
622 );
623
624 let mut stores = wacore::send::SignalStores {
625 session_store: &mut store_adapter.session_store,
626 identity_store: &mut store_adapter.identity_store,
627 prekey_store: &mut store_adapter.pre_key_store,
628 signed_prekey_store: &store_adapter.signed_pre_key_store,
629 sender_key_store: &mut store_adapter.sender_key_store,
630 };
631
632 let marked_for_fresh_skdm =
635 self.consume_forget_marks(&to_str).await.unwrap_or_default();
636
637 let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
639 None
640 } else {
641 let known_recipients = self
642 .persistence_manager
643 .get_skdm_recipients(&to_str)
644 .await
645 .unwrap_or_default();
646
647 if known_recipients.is_empty() {
648 None
649 } else {
650 let jids_to_resolve: Vec<Jid> = group_info
651 .participants
652 .iter()
653 .map(|jid| jid.to_non_ad())
654 .collect();
655
656 match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
657 Ok(all_devices) => {
658 use std::collections::HashSet;
659
660 let known_set: HashSet<DeviceKey<'_>> =
661 known_recipients.iter().map(|j| j.device_key()).collect();
662
663 let new_devices: Vec<Jid> = all_devices
664 .into_iter()
665 .filter(|device| !known_set.contains(&device.device_key()))
666 .collect();
667
668 if new_devices.is_empty() {
669 Some(vec![])
670 } else {
671 log::debug!(
672 "Found {} new devices needing SKDM for group {}",
673 new_devices.len(),
674 to
675 );
676 Some(new_devices)
677 }
678 }
679 Err(e) => {
680 log::warn!("Failed to resolve devices for SKDM check: {:?}", e);
681 None
682 }
683 }
684 }
685 };
686
687 let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
689 match skdm_target_devices {
690 None => None,
691 Some(mut devices) => {
692 for marked_jid_str in &marked_for_fresh_skdm {
693 if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
694 && !devices.iter().any(|d| d.device_eq(&marked_jid))
695 {
696 log::debug!(
697 "Adding {} to SKDM targets (marked for fresh key)",
698 marked_jid_str
699 );
700 devices.push(marked_jid);
701 }
702 }
703 Some(devices)
704 }
705 }
706 } else {
707 skdm_target_devices
708 };
709
710 let is_full_distribution = force_skdm || skdm_target_devices.is_none();
711 let devices_receiving_skdm: Vec<Jid> = skdm_target_devices.clone().unwrap_or_default();
712
713 match wacore::send::prepare_group_stanza(
714 &mut stores,
715 self,
716 &mut group_info,
717 &own_jid,
718 &own_lid,
719 account_info.as_ref(),
720 to.clone(),
721 message,
722 request_id.clone(),
723 force_skdm,
724 skdm_target_devices,
725 edit.clone(),
726 &extra_stanza_nodes,
727 )
728 .await
729 {
730 Ok(stanza) => {
731 skdm_update = Some(SkdmUpdate {
732 to_str: to_str.clone(),
733 devices: devices_receiving_skdm,
734 is_full_distribution,
735 participants: group_info.participants.clone(),
736 });
737 stanza
738 }
739 Err(e) => {
740 if let Some(SignalProtocolError::NoSenderKeyState(_)) =
741 e.downcast_ref::<SignalProtocolError>()
742 {
743 log::warn!("No sender key for group {}, forcing distribution.", to);
744
745 if let Err(e) = self
747 .persistence_manager
748 .clear_skdm_recipients(&to_str)
749 .await
750 {
751 log::warn!("Failed to clear SKDM recipients: {:?}", e);
752 }
753
754 let mut store_adapter_retry = SignalProtocolStoreAdapter::new(
755 device_store_arc.clone(),
756 self.signal_cache.clone(),
757 );
758 let mut stores_retry = wacore::send::SignalStores {
759 session_store: &mut store_adapter_retry.session_store,
760 identity_store: &mut store_adapter_retry.identity_store,
761 prekey_store: &mut store_adapter_retry.pre_key_store,
762 signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
763 sender_key_store: &mut store_adapter_retry.sender_key_store,
764 };
765
766 let retry_stanza = wacore::send::prepare_group_stanza(
767 &mut stores_retry,
768 self,
769 &mut group_info,
770 &own_jid,
771 &own_lid,
772 account_info.as_ref(),
773 to,
774 message,
775 request_id,
776 true, None, edit.clone(),
779 &extra_stanza_nodes,
780 )
781 .await?;
782
783 skdm_update = Some(SkdmUpdate {
785 to_str,
786 devices: vec![],
787 is_full_distribution: true,
788 participants: group_info.participants.clone(),
789 });
790 retry_stanza
791 } else {
792 return Err(e);
793 }
794 }
795 }
796 } else {
797 let recipient_devices = self.get_user_devices(std::slice::from_ref(&to)).await?;
802 self.ensure_e2e_sessions(recipient_devices).await?;
803
804 let encryption_jid = self.resolve_encryption_jid(&to).await;
806 let signal_addr_str = encryption_jid.to_protocol_address_string();
807
808 self.add_recent_message(to.clone(), request_id.clone(), message)
810 .await;
811
812 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
813 let own_jid = device_snapshot
814 .pn
815 .clone()
816 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
817 let account_info = device_snapshot.account.clone();
818
819 let mut extra_stanza_nodes = extra_stanza_nodes;
822 if !to.is_group() && !to.is_newsletter() {
823 self.maybe_include_tc_token(&to, &mut extra_stanza_nodes)
824 .await;
825 }
826
827 let session_mutex = self
829 .session_locks
830 .get_with(signal_addr_str.clone(), async {
831 std::sync::Arc::new(async_lock::Mutex::new(()))
832 })
833 .await;
834 let _session_guard = session_mutex.lock().await;
835
836 let device_store_arc = self.persistence_manager.get_device_arc().await;
837 let mut store_adapter =
838 SignalProtocolStoreAdapter::new(device_store_arc, self.signal_cache.clone());
839
840 let mut stores = wacore::send::SignalStores {
841 session_store: &mut store_adapter.session_store,
842 identity_store: &mut store_adapter.identity_store,
843 prekey_store: &mut store_adapter.pre_key_store,
844 signed_prekey_store: &store_adapter.signed_pre_key_store,
845 sender_key_store: &mut store_adapter.sender_key_store,
846 };
847
848 wacore::send::prepare_dm_stanza(
849 &mut stores,
850 self,
851 &own_jid,
852 account_info.as_ref(),
853 to,
854 message,
855 request_id,
856 edit,
857 &extra_stanza_nodes,
858 )
859 .await?
860 };
861
862 self.send_node(stanza_to_send).await?;
863
864 if let Some(update) = skdm_update {
867 self.update_skdm_recipients(
868 &update.to_str,
869 &update.devices,
870 update.is_full_distribution,
871 &update.participants,
872 )
873 .await;
874 }
875
876 if let Err(e) = self.flush_signal_cache().await {
878 log::error!("Failed to flush signal cache after send_message_impl: {e:?}");
879 }
880
881 Ok(())
882 }
883
884 async fn maybe_include_tc_token(&self, to: &Jid, extra_nodes: &mut Vec<Node>) {
889 use wacore::iq::tctoken::{
890 IssuePrivacyTokensSpec, build_tc_token_node, is_tc_token_expired,
891 should_send_new_tc_token,
892 };
893 use wacore::store::traits::TcTokenEntry;
894
895 let snapshot = self.persistence_manager.get_device_snapshot().await;
897 let is_self = snapshot
898 .pn
899 .as_ref()
900 .is_some_and(|pn| pn.is_same_user_as(to))
901 || snapshot
902 .lid
903 .as_ref()
904 .is_some_and(|lid| lid.is_same_user_as(to));
905 if is_self {
906 return;
907 }
908
909 let token_jid = if to.is_lid() {
911 to.user.clone()
912 } else {
913 match self.lid_pn_cache.get_current_lid(&to.user).await {
914 Some(lid) => lid,
915 None => to.user.clone(),
916 }
917 };
918
919 let backend = self.persistence_manager.backend();
920
921 let existing = match backend.get_tc_token(&token_jid).await {
923 Ok(entry) => entry,
924 Err(e) => {
925 log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
926 return;
927 }
928 };
929
930 match existing {
931 Some(entry) if !is_tc_token_expired(entry.token_timestamp) => {
932 extra_nodes.push(build_tc_token_node(&entry.token));
934
935 if should_send_new_tc_token(entry.sender_timestamp) {
938 let now = wacore::time::now_secs();
939 let updated_entry = TcTokenEntry {
940 sender_timestamp: Some(now),
941 ..entry
942 };
943 if let Err(e) = backend.put_tc_token(&token_jid, &updated_entry).await {
944 log::warn!(target: "Client/TcToken", "Failed to update sender_timestamp: {e}");
945 }
946 }
947 }
948 _ => {
949 let to_lid = self.resolve_to_lid_jid(to).await;
951 match self
952 .execute(IssuePrivacyTokensSpec::new(std::slice::from_ref(&to_lid)))
953 .await
954 {
955 Ok(response) => {
956 let now = wacore::time::now_secs();
957 for received in &response.tokens {
958 let entry = TcTokenEntry {
959 token: received.token.clone(),
960 token_timestamp: received.timestamp,
961 sender_timestamp: Some(now),
962 };
963
964 let store_jid = received.jid.user.clone();
966 if let Err(e) = backend.put_tc_token(&store_jid, &entry).await {
967 log::warn!(target: "Client/TcToken", "Failed to store issued tc_token: {e}");
968 }
969
970 if !received.token.is_empty() {
972 extra_nodes.push(build_tc_token_node(&received.token));
973 }
974 }
975 }
976 Err(e) => {
977 log::debug!(target: "Client/TcToken", "Failed to issue tc_token for {}: {e}", to_lid);
978 }
980 }
981 }
982 }
983 }
984
985 pub(crate) async fn lookup_tc_token_for_jid(&self, jid: &Jid) -> Option<Vec<u8>> {
989 use wacore::iq::tctoken::is_tc_token_expired;
990
991 let token_jid = if jid.is_lid() {
992 jid.user.clone()
993 } else {
994 match self.lid_pn_cache.get_current_lid(&jid.user).await {
995 Some(lid) => lid,
996 None => jid.user.clone(),
997 }
998 };
999
1000 let backend = self.persistence_manager.backend();
1001 match backend.get_tc_token(&token_jid).await {
1002 Ok(Some(entry)) if !is_tc_token_expired(entry.token_timestamp) => Some(entry.token),
1003 Ok(_) => None,
1004 Err(e) => {
1005 log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1006 None
1007 }
1008 }
1009 }
1010
1011 async fn resolve_to_lid_jid(&self, jid: &Jid) -> Jid {
1013 if jid.is_lid() {
1014 return jid.clone();
1015 }
1016
1017 if let Some(lid_user) = self.lid_pn_cache.get_current_lid(&jid.user).await {
1018 Jid::new(&lid_user, "lid")
1019 } else {
1020 jid.clone()
1021 }
1022 }
1023}
1024
1025#[cfg(test)]
1026mod tests {
1027 use super::*;
1028 use std::str::FromStr;
1029
1030 #[test]
1031 fn test_revoke_type_default_is_sender() {
1032 let revoke_type = RevokeType::default();
1034 assert_eq!(revoke_type, RevokeType::Sender);
1035 }
1036
1037 #[test]
1038 fn test_force_skdm_only_for_admin_revoke() {
1039 let sender_jid = Jid::from_str("123456@s.whatsapp.net").unwrap();
1043
1044 let sender_revoke = RevokeType::Sender;
1045 let admin_revoke = RevokeType::Admin {
1046 original_sender: sender_jid,
1047 };
1048
1049 let force_skdm_sender = matches!(sender_revoke, RevokeType::Admin { .. });
1051 let force_skdm_admin = matches!(admin_revoke, RevokeType::Admin { .. });
1052
1053 assert!(!force_skdm_sender, "Sender revoke should NOT force SKDM");
1054 assert!(force_skdm_admin, "Admin revoke MUST force SKDM");
1055 }
1056
1057 #[test]
1058 fn test_sender_revoke_message_key_structure() {
1059 let to = Jid::from_str("120363040237990503@g.us").unwrap();
1062 let message_id = "3EB0ABC123".to_string();
1063
1064 let (from_me, participant, edit_attr) = match RevokeType::Sender {
1065 RevokeType::Sender => (
1066 true,
1067 None,
1068 crate::types::message::EditAttribute::SenderRevoke,
1069 ),
1070 RevokeType::Admin { original_sender } => (
1071 false,
1072 Some(original_sender.to_non_ad().to_string()),
1073 crate::types::message::EditAttribute::AdminRevoke,
1074 ),
1075 };
1076
1077 assert!(from_me, "Sender revoke must have from_me=true");
1078 assert!(
1079 participant.is_none(),
1080 "Sender revoke must NOT set participant"
1081 );
1082 assert_eq!(edit_attr.to_string_val(), "7");
1083
1084 let revoke_message = wa::Message {
1085 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1086 key: Some(wa::MessageKey {
1087 remote_jid: Some(to.to_string()),
1088 from_me: Some(from_me),
1089 id: Some(message_id.clone()),
1090 participant,
1091 }),
1092 r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
1093 ..Default::default()
1094 })),
1095 ..Default::default()
1096 };
1097
1098 let proto_msg = revoke_message.protocol_message.unwrap();
1099 let key = proto_msg.key.unwrap();
1100 assert_eq!(key.from_me, Some(true));
1101 assert_eq!(key.participant, None);
1102 assert_eq!(key.id, Some(message_id));
1103 }
1104
1105 #[test]
1106 fn test_admin_revoke_message_key_structure() {
1107 let to = Jid::from_str("120363040237990503@g.us").unwrap();
1110 let message_id = "3EB0ABC123".to_string();
1111 let original_sender = Jid::from_str("236395184570386:22@lid").unwrap();
1112
1113 let revoke_type = RevokeType::Admin {
1114 original_sender: original_sender.clone(),
1115 };
1116 let (from_me, participant, edit_attr) = match revoke_type {
1117 RevokeType::Sender => (
1118 true,
1119 None,
1120 crate::types::message::EditAttribute::SenderRevoke,
1121 ),
1122 RevokeType::Admin { original_sender } => (
1123 false,
1124 Some(original_sender.to_non_ad().to_string()),
1125 crate::types::message::EditAttribute::AdminRevoke,
1126 ),
1127 };
1128
1129 assert!(!from_me, "Admin revoke must have from_me=false");
1130 assert!(
1131 participant.is_some(),
1132 "Admin revoke MUST set participant to original sender"
1133 );
1134 assert_eq!(edit_attr.to_string_val(), "8");
1135
1136 let revoke_message = wa::Message {
1137 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1138 key: Some(wa::MessageKey {
1139 remote_jid: Some(to.to_string()),
1140 from_me: Some(from_me),
1141 id: Some(message_id.clone()),
1142 participant: participant.clone(),
1143 }),
1144 r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
1145 ..Default::default()
1146 })),
1147 ..Default::default()
1148 };
1149
1150 let proto_msg = revoke_message.protocol_message.unwrap();
1151 let key = proto_msg.key.unwrap();
1152 assert_eq!(key.from_me, Some(false));
1153 assert_eq!(key.participant, Some("236395184570386@lid".to_string()));
1155 assert_eq!(key.id, Some(message_id));
1156 }
1157
1158 #[test]
1159 fn test_admin_revoke_preserves_lid_format() {
1160 let lid_sender = Jid::from_str("236395184570386:22@lid").unwrap();
1164 let participant_str = lid_sender.to_non_ad().to_string();
1165
1166 assert_eq!(participant_str, "236395184570386@lid");
1168 assert!(
1169 participant_str.ends_with("@lid"),
1170 "LID participant must preserve @lid suffix"
1171 );
1172 }
1173
1174 #[test]
1177 fn test_skdm_recipient_filtering_basic() {
1178 use std::collections::HashSet;
1179
1180 let known_recipients: Vec<Jid> = [
1181 "1234567890:0@s.whatsapp.net",
1182 "1234567890:5@s.whatsapp.net",
1183 "9876543210:0@s.whatsapp.net",
1184 ]
1185 .into_iter()
1186 .map(|s| Jid::from_str(s).unwrap())
1187 .collect();
1188
1189 let all_devices: Vec<Jid> = [
1190 "1234567890:0@s.whatsapp.net",
1191 "1234567890:5@s.whatsapp.net",
1192 "9876543210:0@s.whatsapp.net",
1193 "5555555555:0@s.whatsapp.net", ]
1195 .into_iter()
1196 .map(|s| Jid::from_str(s).unwrap())
1197 .collect();
1198
1199 let known_set: HashSet<DeviceKey<'_>> =
1200 known_recipients.iter().map(|j| j.device_key()).collect();
1201
1202 let new_devices: Vec<Jid> = all_devices
1203 .into_iter()
1204 .filter(|device| !known_set.contains(&device.device_key()))
1205 .collect();
1206
1207 assert_eq!(new_devices.len(), 1);
1208 assert_eq!(new_devices[0].user, "5555555555");
1209 }
1210
1211 #[test]
1212 fn test_skdm_recipient_filtering_lid_jids() {
1213 use std::collections::HashSet;
1214
1215 let known_recipients: Vec<Jid> = [
1216 "236395184570386:91@lid",
1217 "129171292463295:0@lid",
1218 "45857667830004:14@lid",
1219 ]
1220 .into_iter()
1221 .map(|s| Jid::from_str(s).unwrap())
1222 .collect();
1223
1224 let all_devices: Vec<Jid> = [
1225 "236395184570386:91@lid",
1226 "129171292463295:0@lid",
1227 "45857667830004:14@lid",
1228 "45857667830004:15@lid", ]
1230 .into_iter()
1231 .map(|s| Jid::from_str(s).unwrap())
1232 .collect();
1233
1234 let known_set: HashSet<DeviceKey<'_>> =
1235 known_recipients.iter().map(|j| j.device_key()).collect();
1236
1237 let new_devices: Vec<Jid> = all_devices
1238 .into_iter()
1239 .filter(|device| !known_set.contains(&device.device_key()))
1240 .collect();
1241
1242 assert_eq!(new_devices.len(), 1);
1243 assert_eq!(new_devices[0].user, "45857667830004");
1244 assert_eq!(new_devices[0].device, 15);
1245 }
1246
1247 #[test]
1248 fn test_skdm_recipient_filtering_all_known() {
1249 use std::collections::HashSet;
1250
1251 let known_recipients: Vec<Jid> =
1252 ["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]
1253 .into_iter()
1254 .map(|s| Jid::from_str(s).unwrap())
1255 .collect();
1256
1257 let all_devices: Vec<Jid> = ["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"]
1258 .into_iter()
1259 .map(|s| Jid::from_str(s).unwrap())
1260 .collect();
1261
1262 let known_set: HashSet<DeviceKey<'_>> =
1263 known_recipients.iter().map(|j| j.device_key()).collect();
1264
1265 let new_devices: Vec<Jid> = all_devices
1266 .into_iter()
1267 .filter(|device| !known_set.contains(&device.device_key()))
1268 .collect();
1269
1270 assert!(new_devices.is_empty());
1271 }
1272
1273 #[test]
1274 fn test_skdm_recipient_filtering_all_new() {
1275 use std::collections::HashSet;
1276
1277 let known_recipients: Vec<Jid> = vec![];
1278
1279 let all_devices: Vec<Jid> = ["1234567890:0@s.whatsapp.net", "9876543210:0@s.whatsapp.net"]
1280 .into_iter()
1281 .map(|s| Jid::from_str(s).unwrap())
1282 .collect();
1283
1284 let known_set: HashSet<DeviceKey<'_>> =
1285 known_recipients.iter().map(|j| j.device_key()).collect();
1286
1287 let new_devices: Vec<Jid> = all_devices
1288 .clone()
1289 .into_iter()
1290 .filter(|device| !known_set.contains(&device.device_key()))
1291 .collect();
1292
1293 assert_eq!(new_devices.len(), all_devices.len());
1294 }
1295
1296 #[test]
1297 fn test_device_key_comparison() {
1298 let test_cases = [
1301 (
1302 "1234567890:0@s.whatsapp.net",
1303 "1234567890@s.whatsapp.net",
1304 true,
1305 ),
1306 (
1307 "1234567890:5@s.whatsapp.net",
1308 "1234567890:5@s.whatsapp.net",
1309 true,
1310 ),
1311 (
1312 "1234567890:5@s.whatsapp.net",
1313 "1234567890:6@s.whatsapp.net",
1314 false,
1315 ),
1316 ("236395184570386:91@lid", "236395184570386:91@lid", true),
1317 ("236395184570386:0@lid", "236395184570386@lid", true),
1318 ("user1@s.whatsapp.net", "user2@s.whatsapp.net", false),
1319 ];
1320
1321 for (jid1_str, jid2_str, should_match) in test_cases {
1322 let jid1: Jid = jid1_str.parse().expect("should parse jid1");
1323 let jid2: Jid = jid2_str.parse().expect("should parse jid2");
1324
1325 let key1 = jid1.device_key();
1326 let key2 = jid2.device_key();
1327
1328 assert_eq!(
1329 key1 == key2,
1330 should_match,
1331 "DeviceKey comparison failed for '{}' vs '{}': expected match={}, got match={}",
1332 jid1_str,
1333 jid2_str,
1334 should_match,
1335 key1 == key2
1336 );
1337
1338 assert_eq!(
1339 jid1.device_eq(&jid2),
1340 should_match,
1341 "device_eq failed for '{}' vs '{}'",
1342 jid1_str,
1343 jid2_str
1344 );
1345 }
1346 }
1347
1348 #[test]
1349 fn test_skdm_filtering_large_group() {
1350 use std::collections::HashSet;
1351
1352 let mut known_recipients: Vec<Jid> = Vec::with_capacity(1000);
1353 let mut all_devices: Vec<Jid> = Vec::with_capacity(1010);
1354
1355 for i in 0..1000i64 {
1356 let jid_str = format!("{}:1@lid", 100000000000000i64 + i);
1357 let jid = Jid::from_str(&jid_str).unwrap();
1358 known_recipients.push(jid.clone());
1359 all_devices.push(jid);
1360 }
1361
1362 for i in 1000i64..1010i64 {
1363 let jid_str = format!("{}:1@lid", 100000000000000i64 + i);
1364 all_devices.push(Jid::from_str(&jid_str).unwrap());
1365 }
1366
1367 let known_set: HashSet<DeviceKey<'_>> =
1368 known_recipients.iter().map(|j| j.device_key()).collect();
1369
1370 let new_devices: Vec<Jid> = all_devices
1371 .into_iter()
1372 .filter(|device| !known_set.contains(&device.device_key()))
1373 .collect();
1374
1375 assert_eq!(new_devices.len(), 10);
1376 }
1377}