1use crate::client::Client;
2use crate::types::message::EditAttribute;
3use anyhow::anyhow;
4use log::debug;
5use wacore::client::context::SendContextResolver;
6use wacore::libsignal::protocol::SignalProtocolError;
7use wacore::types::jid::JidExt;
8use wacore::types::message::AddressingMode;
9#[cfg(test)]
10use wacore_binary::DeviceKey;
11use wacore_binary::Node;
12use wacore_binary::builder::NodeBuilder;
13use wacore_binary::{Jid, JidExt as _, Server};
14use waproto::whatsapp as wa;
15
16#[derive(Debug, Clone, Default)]
18pub struct SendOptions {
19 pub message_id: Option<String>,
22 pub extra_stanza_nodes: Vec<Node>,
24 pub ephemeral_expiration: Option<u32>,
28}
29
30#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct SendResult {
33 pub message_id: String,
34 pub to: Jid,
35}
36
37impl SendResult {
38 pub fn message_key(&self) -> wa::MessageKey {
40 wa::MessageKey {
41 remote_jid: Some(self.to.to_string()),
42 from_me: Some(true),
43 id: Some(self.message_id.clone()),
44 participant: None,
45 }
46 }
47}
48
/// How long a pinned message stays pinned. Defaults to [`PinDuration::Days7`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum PinDuration {
    /// 24 hours.
    Hours24,
    /// 7 days (the default).
    #[default]
    Days7,
    /// 30 days.
    Days30,
}

impl PinDuration {
    /// Returns the duration expressed in seconds.
    fn as_secs(self) -> u32 {
        const SECS_PER_DAY: u32 = 24 * 60 * 60;
        let days: u32 = match self {
            Self::Hours24 => 1,
            Self::Days7 => 7,
            Self::Days30 => 30,
        };
        days * SECS_PER_DAY
    }
}
68
69#[derive(Debug, Clone, PartialEq, Eq, Default)]
71#[non_exhaustive]
72pub enum RevokeType {
73 #[default]
75 Sender,
76 Admin { original_sender: Jid },
79}
80
81fn infer_stanza_metadata(msg: &wa::Message) -> (Option<EditAttribute>, Option<Node>) {
83 if msg.pin_in_chat_message.is_some() {
84 return (Some(EditAttribute::PinInChat), None);
85 }
86
87 if msg.poll_creation_message.is_some()
89 || msg.poll_creation_message_v2.is_some()
90 || msg.poll_creation_message_v3.is_some()
91 {
92 return (None, Some(meta_node("polltype", "creation")));
93 }
94 if let Some(ref poll_update) = msg.poll_update_message
95 && poll_update.vote.is_some()
96 {
97 return (None, Some(meta_node("polltype", "vote")));
98 }
99 if msg.event_message.is_some() {
103 return (None, Some(meta_node("event_type", "creation")));
104 }
105 if msg.enc_event_response_message.is_some() {
106 return (None, Some(meta_node("event_type", "response")));
107 }
108 if let Some(ref sec) = msg.secret_encrypted_message
109 && sec.secret_enc_type
110 == Some(wa::message::secret_encrypted_message::SecretEncType::EventEdit as i32)
111 {
112 return (None, Some(meta_node("event_type", "edit")));
113 }
114
115 (None, None)
116}
117
118fn meta_node(key: &'static str, value: &'static str) -> Node {
119 NodeBuilder::new("meta").attr(key, value).build()
120}
121
122fn infer_biz_node(msg: &wa::Message) -> Option<Node> {
125 let interactive = extract_interactive_message(msg)?;
126 let wa::message::interactive_message::InteractiveMessage::NativeFlowMessage(nf) =
127 interactive.interactive_message.as_ref()?
128 else {
129 return None;
130 };
131
132 let first_button_name = nf.buttons.first()?.name.as_deref()?;
133 let flow_name = button_name_to_flow_name(first_button_name);
134
135 Some(
136 NodeBuilder::new("biz")
137 .children([NodeBuilder::new("interactive")
138 .attr("type", "native_flow")
139 .attr("v", "1")
140 .children([NodeBuilder::new("native_flow")
141 .attr("name", flow_name)
142 .build()])
143 .build()])
144 .build(),
145 )
146}
147
148fn extract_interactive_message(msg: &wa::Message) -> Option<&wa::message::InteractiveMessage> {
149 if let Some(ref doc) = msg.document_with_caption_message
152 && let Some(ref inner) = doc.message
153 && let Some(ref im) = inner.interactive_message
154 {
155 return Some(im);
156 }
157 msg.interactive_message.as_deref()
158}
159
/// Maps a native-flow button name to the flow name used in the `native_flow`
/// stanza node.
///
/// Only three button names are remapped:
/// * `review_and_pay` → `order_details`
/// * `review_order` → `order_status`
/// * `open_webview` → `message_with_link`
///
/// Every other name is returned unchanged. That covers `payment_info`,
/// `order_status`, `payment_status`, `payment_method`, `payment_reminder`,
/// `message_with_link_status`, `cta_url`, `cta_call`, `cta_copy`,
/// `cta_catalog`, `catalog_message`, `quick_reply`, `galaxy_message`,
/// `booking_confirmation`, `call_permission_request`, and any name not
/// listed here.
fn button_name_to_flow_name(button_name: &str) -> &str {
    match button_name {
        "review_and_pay" => "order_details",
        "review_order" => "order_status",
        "open_webview" => "message_with_link",
        passthrough => passthrough,
    }
}
182
183fn build_revoke_message(
184 remote_jid: &Jid,
185 from_me: bool,
186 message_id: String,
187 participant: Option<String>,
188) -> wa::Message {
189 wa::Message {
190 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
191 key: Some(wa::MessageKey {
192 remote_jid: Some(remote_jid.to_string()),
193 from_me: Some(from_me),
194 id: Some(message_id),
195 participant,
196 }),
197 r#type: Some(wa::message::protocol_message::Type::Revoke as i32),
198 ..Default::default()
199 })),
200 ..Default::default()
201 }
202}
203
204impl Client {
205 pub async fn send_message(
210 &self,
211 to: Jid,
212 message: wa::Message,
213 ) -> Result<SendResult, anyhow::Error> {
214 self.send_message_with_options(to, message, SendOptions::default())
215 .await
216 }
217
218 pub async fn send_message_with_options(
220 &self,
221 to: Jid,
222 mut message: wa::Message,
223 options: SendOptions,
224 ) -> Result<SendResult, anyhow::Error> {
225 if let Some(exp) = options.ephemeral_expiration
226 && exp > 0
227 {
228 use wacore::proto_helpers::MessageExt;
229 if !message.set_ephemeral_expiration(exp) {
230 log::warn!("Could not set contextInfo.expiration on this message type");
232 }
233 }
234
235 let request_id = match options.message_id {
236 Some(id) => id,
237 None => self.generate_message_id().await,
238 };
239 let result = SendResult {
241 message_id: request_id.clone(),
242 to: to.clone(),
243 };
244
245 if to.is_newsletter() {
248 use prost::Message as _;
249 let stanza_type = wacore::send::stanza_type_from_message(&message);
250 let (_, meta_node) = infer_stanza_metadata(&message);
251 let mut plaintext_builder = NodeBuilder::new("plaintext");
252 if let Some(mt) = wacore::send::media_type_from_message(&message) {
253 plaintext_builder = plaintext_builder.attr("mediatype", mt);
254 }
255 let mut children = vec![plaintext_builder.bytes(message.encode_to_vec()).build()];
256 children.extend(meta_node);
257 children.extend(options.extra_stanza_nodes);
258 let stanza = NodeBuilder::new("message")
259 .attr("to", to)
260 .attr("type", stanza_type)
261 .attr("id", &request_id)
262 .children(children)
263 .build();
264 self.send_node(stanza).await?;
265 return Ok(result);
266 }
267
268 let (edit, inferred_meta) = infer_stanza_metadata(&message);
269 let inferred_biz = infer_biz_node(&message);
270
271 let extra_nodes = if inferred_meta.is_none() && inferred_biz.is_none() {
272 options.extra_stanza_nodes
273 } else {
274 let mut nodes = Vec::with_capacity(2 + options.extra_stanza_nodes.len());
275 nodes.extend(inferred_meta);
276 nodes.extend(inferred_biz);
277 nodes.extend(options.extra_stanza_nodes);
278 nodes
279 };
280 self.send_message_impl(
281 to,
282 &message,
283 Some(request_id),
284 false,
285 false,
286 edit,
287 extra_nodes,
288 )
289 .await?;
290 Ok(result)
291 }
292
293 pub(crate) async fn send_status_message(
303 &self,
304 message: wa::Message,
305 recipients: &[Jid],
306 options: crate::features::status::StatusSendOptions,
307 ) -> Result<SendResult, anyhow::Error> {
308 use wacore::client::context::GroupInfo;
309 use wacore_binary::builder::NodeBuilder;
310
311 if recipients.is_empty() {
312 return Err(anyhow!("Cannot send status with no recipients"));
313 }
314
315 let to = Jid::status_broadcast();
316 let request_id = self.generate_message_id().await;
317
318 let mut device_snapshot = self.persistence_manager.get_device_snapshot().await;
319 let account_info = device_snapshot.account.take();
320 let own_jid = device_snapshot
321 .pn
322 .take()
323 .ok_or(crate::client::ClientError::NotLoggedIn)?;
324 let own_lid = device_snapshot.lid.take().ok_or_else(|| {
328 anyhow!(
329 "Cannot send status: device has no LID yet. Finish pairing / LID \
330 migration before posting status."
331 )
332 })?;
333
334 for jid in recipients {
338 if !(jid.is_pn() || jid.is_lid()) {
339 return Err(anyhow!(
340 "Invalid status recipient {}: must be a user JID (PN or LID), \
341 not a group/broadcast/newsletter/hosted/etc.",
342 jid
343 ));
344 }
345 }
346
347 use std::collections::HashMap;
348 let mut resolved: Vec<Option<Jid>> = Vec::with_capacity(recipients.len());
349 let mut lid_to_pn_map: HashMap<wacore_binary::CompactString, Jid> =
350 HashMap::with_capacity(recipients.len() + 1);
351 for jid in recipients {
352 if let Some(lid_jid) = self.resolve_recipient_to_lid(jid).await {
353 if jid.is_pn() {
354 lid_to_pn_map.insert(lid_jid.user.clone(), jid.to_non_ad());
355 }
356 resolved.push(Some(lid_jid));
357 } else {
358 resolved.push(None);
359 }
360 }
361 lid_to_pn_map.insert(own_lid.user.clone(), own_jid.to_non_ad());
362
363 let participants = wacore::send::assemble_status_participants(resolved, &own_lid)?;
364 let mut group_info =
365 GroupInfo::with_lid_to_pn_map(participants, AddressingMode::Lid, lid_to_pn_map);
366
367 self.add_recent_message(&to, &request_id, &message).await;
368
369 let device_store_arc = self.persistence_manager.get_device_arc().await;
370 let to_str = to.to_string();
371
372 let force_skdm = {
373 use wacore::libsignal::store::sender_key_name::SenderKeyName;
374 let sender_address = own_lid.to_protocol_address();
379 let sender_key_name = SenderKeyName::from_parts(&to_str, sender_address.as_str());
380
381 let device_guard = device_store_arc.read().await;
382 let key_exists = self
383 .signal_cache
384 .get_sender_key(&sender_key_name, &*device_guard.backend)
385 .await?
386 .is_some();
387
388 !key_exists
389 };
390
391 let mut store_adapter = self.signal_adapter_from(device_store_arc.clone());
392 let mut stores = store_adapter.as_signal_stores();
393
394 let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
396 None
397 } else {
398 self.resolve_skdm_targets(&to_str, &group_info, &own_lid)
399 .await
400 };
401
402 let extra_stanza_nodes = if wacore::send::status_carries_privacy_meta(&message) {
407 vec![
408 NodeBuilder::new("meta")
409 .attr("status_setting", options.privacy.as_str())
410 .build(),
411 ]
412 } else {
413 vec![]
414 };
415
416 let prepared = match wacore::send::prepare_group_stanza(
417 &*self.runtime,
418 &mut stores,
419 self,
420 &mut group_info,
421 &own_jid,
422 &own_lid,
423 account_info.as_ref(),
424 to.clone(),
425 &message,
426 request_id.clone(),
427 force_skdm,
428 skdm_target_devices,
429 None,
430 &extra_stanza_nodes,
431 )
432 .await
433 {
434 Ok(prepared) => prepared,
435 Err(e) => {
436 if let Some(SignalProtocolError::NoSenderKeyState(_)) =
437 e.downcast_ref::<SignalProtocolError>()
438 {
439 log::warn!("No sender key for status broadcast, forcing distribution.");
440
441 if let Err(e) = self
442 .persistence_manager
443 .clear_sender_key_devices(&to_str)
444 .await
445 {
446 log::warn!(
447 "Failed to clear status SKDM recipients for {}: {:?}",
448 to_str,
449 e
450 );
451 }
452 self.sender_key_device_cache.invalidate(&to_str).await;
453
454 let mut store_adapter_retry =
455 self.signal_adapter_from(device_store_arc.clone());
456 let mut stores_retry = store_adapter_retry.as_signal_stores();
457
458 wacore::send::prepare_group_stanza(
459 &*self.runtime,
460 &mut stores_retry,
461 self,
462 &mut group_info,
463 &own_jid,
464 &own_lid,
465 account_info.as_ref(),
466 to.clone(),
467 &message,
468 request_id.clone(),
469 true,
470 None,
471 None,
472 &extra_stanza_nodes,
473 )
474 .await?
475 } else {
476 return Err(e);
477 }
478 }
479 };
480
481 let stanza = self
482 .ensure_status_participants(prepared.node, &group_info)
483 .await?;
484
485 let ack = if let Some(phash) = stanza
486 .attrs()
487 .optional_string("phash")
488 .map(|s| s.into_owned())
489 {
490 let rx = self.register_ack_waiter(&request_id).await;
491 Some((rx, phash))
492 } else {
493 None
494 };
495
496 if let Err(e) = self.send_node(stanza).await {
497 if ack.is_some() {
498 self.response_waiters.lock().await.remove(&request_id);
499 }
500 return Err(e.into());
501 }
502
503 if let Some((rx, phash)) = ack {
504 self.spawn_phash_validation(rx, phash, to.clone(), true, request_id.clone());
505 }
506
507 self.update_sender_key_devices(&to_str, &prepared.skdm_devices)
508 .await;
509
510 for user in &prepared.stale_device_users {
511 self.invalidate_device_cache(user).await;
512 }
513
514 self.flush_signal_cache_logged("send_status_message", None)
515 .await;
516
517 Ok(SendResult {
518 message_id: request_id,
519 to,
520 })
521 }
522
523 async fn resolve_skdm_targets(
530 &self,
531 group_jid: &str,
532 group_info: &wacore::client::context::GroupInfo,
533 own_sending_jid: &Jid,
534 ) -> Option<Vec<Jid>> {
535 use crate::sender_key_device_cache::SenderKeyDeviceMap;
536
537 let pm = self.persistence_manager.clone();
541 let cached_map = self
542 .sender_key_device_cache
543 .get_or_init(group_jid, async {
544 let db_rows = pm
545 .get_sender_key_devices(group_jid)
546 .await
547 .unwrap_or_else(|e| {
548 log::warn!(
549 "Failed to read sender key devices for {}: {:?}",
550 group_jid,
551 e
552 );
553 vec![]
554 });
555 std::sync::Arc::new(SenderKeyDeviceMap::from_db_rows(&db_rows))
556 })
557 .await;
558
559 let is_lid_mode = group_info.addressing_mode == wacore::types::message::AddressingMode::Lid;
562 let jids_to_resolve: Vec<Jid> = group_info
563 .participants
564 .iter()
565 .map(|jid| {
566 if is_lid_mode
567 && jid.is_lid()
568 && let Some(pn) = group_info.phone_jid_for_lid_user(&jid.user)
569 {
570 return pn.to_non_ad();
571 }
572 jid.to_non_ad()
573 })
574 .collect();
575
576 match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
577 Ok(all_devices) => {
578 let all_devices: Vec<Jid> = if is_lid_mode {
579 all_devices
580 .into_iter()
581 .map(|d| group_info.phone_device_jid_to_lid(&d))
582 .collect()
583 } else {
584 all_devices
585 };
586
587 let needs_skdm: Vec<Jid> = all_devices
588 .into_iter()
589 .filter(|device| {
590 if device.is_hosted() {
591 return false;
592 }
593 if device.user == own_sending_jid.user
594 && device.device == own_sending_jid.device
595 {
596 return false;
597 }
598 !cached_map
600 .device_has_key(&device.user, device.device)
601 .unwrap_or(false)
602 || cached_map.is_user_forgotten(&device.user)
603 })
604 .collect();
605
606 if needs_skdm.is_empty() {
607 Some(vec![])
608 } else {
609 log::debug!(
610 "Found {} devices needing SKDM for {}",
611 needs_skdm.len(),
612 group_jid
613 );
614 Some(needs_skdm)
615 }
616 }
617 Err(e) => {
618 log::warn!(
619 "Failed to resolve devices for SKDM check in {}: {:?}",
620 group_jid,
621 e
622 );
623 None
624 }
625 }
626 }
627
628 async fn update_sender_key_devices(&self, group_jid: &str, devices: &[Jid]) {
639 if devices.is_empty() {
640 return;
641 }
642
643 if let Err(e) = self
644 .set_sender_key_status_for_devices(group_jid, devices, true, false)
645 .await
646 {
647 log::warn!(
648 "Failed to update sender key devices for {}: {:?}",
649 group_jid,
650 e
651 );
652 }
653 self.sender_key_device_cache.invalidate(group_jid).await;
654 }
655
656 fn spawn_phash_validation(
659 &self,
660 rx: futures::channel::oneshot::Receiver<std::sync::Arc<wacore_binary::OwnedNodeRef>>,
661 our_phash: String,
662 jid: Jid,
663 invalidate_group_cache: bool,
664 message_id: String,
665 ) {
666 let Some(client) = self.self_weak.get().and_then(|w| w.upgrade()) else {
667 return;
668 };
669 self.runtime
670 .spawn(Box::pin(async move {
671 let ack = match wacore::runtime::timeout(
672 &*client.runtime,
673 std::time::Duration::from_secs(10),
674 rx,
675 )
676 .await
677 {
678 Ok(Ok(node)) => node,
679 _ => {
680 client.response_waiters.lock().await.remove(&message_id);
682 return;
683 }
684 };
685 if let Some(server) = ack.get().get_attr("phash").map(|v| v.as_str())
686 && server != our_phash
687 {
688 log::warn!(
689 "Phash mismatch for {jid}: ours={our_phash}, server={server}. Invalidating caches."
690 );
691 if !jid.is_group() && !jid.is_status_broadcast() {
694 client.invalidate_device_cache(&jid.user).await;
695 if let Some(own_pn) =
696 &client.persistence_manager.get_device_snapshot().await.pn
697 {
698 client.invalidate_device_cache(&own_pn.user).await;
699 }
700 }
701 let jid_str = jid.to_string();
702 if jid.is_group() || jid.is_status_broadcast() {
709 let cleared = client
710 .persistence_manager
711 .clear_sender_key_devices(&jid_str)
712 .await;
713 if let Err(e) = cleared {
714 log::warn!(
715 "phash mismatch: clear_sender_key_devices failed: {e} — \
716 deleting own sender key as fallback to force redistribution"
717 );
718 use wacore::libsignal::store::sender_key_name::SenderKeyName;
719 use wacore::types::jid::JidExt;
720 let snapshot =
721 client.persistence_manager.get_device_snapshot().await;
722 for own in snapshot.lid.iter().chain(snapshot.pn.iter()) {
723 let sk =
724 SenderKeyName::from_parts(&jid_str, own.to_protocol_address().as_str());
725 client.signal_cache.delete_sender_key(sk.cache_key()).await;
726 }
727 let _ = client
728 .flush_signal_cache_logged("phash-mismatch-fallback", None)
729 .await;
730 }
731 }
732 client.sender_key_device_cache.invalidate(&jid_str).await;
733 if invalidate_group_cache {
734 client.get_group_cache().await.invalidate(&jid).await;
735 }
736 }
737 }))
738 .detach();
739 }
740
741 async fn ensure_status_participants(
747 &self,
748 stanza: wacore_binary::Node,
749 group_info: &wacore::client::context::GroupInfo,
750 ) -> Result<wacore_binary::Node, anyhow::Error> {
751 Ok(wacore::send::ensure_status_participants(stanza, group_info))
752 }
753
754 pub async fn revoke_message(
765 &self,
766 to: Jid,
767 message_id: impl Into<String>,
768 revoke_type: RevokeType,
769 ) -> Result<(), anyhow::Error> {
770 let message_id = message_id.into();
771 self.require_pn().await?;
772
773 let (from_me, participant, edit_attr) = match &revoke_type {
774 RevokeType::Sender => {
775 (
778 true,
779 None,
780 crate::types::message::EditAttribute::SenderRevoke,
781 )
782 }
783 RevokeType::Admin { original_sender } => {
784 if !to.is_group() {
786 return Err(anyhow!("Admin revoke is only valid for group chats"));
787 }
788 let participant_str = original_sender.to_non_ad().to_string();
791 log::debug!(
792 "Admin revoke: using participant {} for MessageKey",
793 participant_str
794 );
795 (
796 false,
797 Some(participant_str),
798 crate::types::message::EditAttribute::AdminRevoke,
799 )
800 }
801 };
802
803 let revoke_message = build_revoke_message(&to, from_me, message_id, participant);
804
805 let force_skdm = matches!(revoke_type, RevokeType::Admin { .. });
812 self.send_message_impl(
813 to,
814 &revoke_message,
815 None,
816 false,
817 force_skdm,
818 Some(edit_attr),
819 vec![],
820 )
821 .await
822 }
823
824 pub async fn pin_message(
826 &self,
827 chat: Jid,
828 key: wa::MessageKey,
829 duration: PinDuration,
830 ) -> Result<(), anyhow::Error> {
831 self.send_pin(
832 chat,
833 key,
834 wa::message::pin_in_chat_message::Type::PinForAll,
835 duration.as_secs(),
836 )
837 .await
838 }
839
840 pub async fn unpin_message(&self, chat: Jid, key: wa::MessageKey) -> Result<(), anyhow::Error> {
842 self.send_pin(
843 chat,
844 key,
845 wa::message::pin_in_chat_message::Type::UnpinForAll,
846 0,
847 )
848 .await
849 }
850
851 async fn send_pin(
852 &self,
853 chat: Jid,
854 key: wa::MessageKey,
855 pin_type: wa::message::pin_in_chat_message::Type,
856 duration_secs: u32,
857 ) -> Result<(), anyhow::Error> {
858 let message = wa::Message {
859 pin_in_chat_message: Some(wa::message::PinInChatMessage {
860 key: Some(key),
861 r#type: Some(pin_type as i32),
862 sender_timestamp_ms: Some(wacore::time::now_millis()),
863 }),
864 message_context_info: Some(wa::MessageContextInfo {
865 message_add_on_duration_in_secs: Some(duration_secs),
866 ..Default::default()
867 }),
868 ..Default::default()
869 };
870
871 self.send_message_impl(
872 chat,
873 &message,
874 None,
875 false,
876 false,
877 Some(crate::types::message::EditAttribute::PinInChat),
878 vec![],
879 )
880 .await
881 }
882
883 #[allow(clippy::too_many_arguments)]
884 pub(crate) async fn send_message_impl(
885 &self,
886 to: Jid,
887 message: &wa::Message,
888 request_id_override: Option<String>,
889 peer: bool,
890 force_key_distribution: bool,
891 edit: Option<crate::types::message::EditAttribute>,
892 extra_stanza_nodes: Vec<Node>,
893 ) -> Result<(), anyhow::Error> {
894 let (to, is_status_addon) = if to.is_status_broadcast() {
897 let author = message
898 .reaction_message
899 .as_ref()
900 .and_then(|rm| rm.key.as_ref())
901 .and_then(|k| k.participant.as_ref())
902 .and_then(|p| p.parse::<Jid>().ok())
903 .filter(|jid| jid.is_pn() || jid.is_lid())
904 .ok_or_else(|| {
905 anyhow!(
906 "send_message to status@broadcast requires \
907 reaction_message.key.participant = status author (user JID). \
908 Use client.status() for posting new statuses."
909 )
910 })?;
911 (author, true)
912 } else {
913 (to, false)
914 };
915
916 let request_id = match request_id_override {
918 Some(id) => id,
919 None => self.generate_message_id().await,
920 };
921
922 struct SkdmUpdate {
925 to_str: String,
926 devices: Vec<Jid>,
927 stale_users: Vec<String>,
928 }
929 let mut skdm_update: Option<SkdmUpdate> = None;
930 let mut should_issue_tc_token_after_send = false;
931 let mut used_cached_tc_token_key: Option<String> = None;
932 let tc_issue_target = to.clone();
933
934 let mut dm_phash: Option<String> = None;
935 let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
936 let encryption_jid = self.resolve_encryption_jid(&to).await;
939 let signal_addr = encryption_jid.to_protocol_address();
940
941 let session_mutex = self.session_lock_for(signal_addr.as_str()).await;
942 let _session_guard = session_mutex.lock().await;
943
944 let mut store_adapter = self.signal_adapter().await;
945
946 wacore::send::prepare_peer_stanza(
947 &mut store_adapter.session_store,
948 &mut store_adapter.identity_store,
949 to,
950 &signal_addr,
951 message,
952 request_id,
953 )
954 .await?
955 } else if to.is_group() {
956 let mut group_info = self.groups().query_info(&to).await?;
961
962 let mut device_snapshot = self.persistence_manager.get_device_snapshot().await;
963 let account_info = device_snapshot.account.take();
964 let own_jid = device_snapshot
965 .pn
966 .take()
967 .ok_or(crate::client::ClientError::NotLoggedIn)?;
968 let own_lid = device_snapshot
969 .lid
970 .take()
971 .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
972
973 self.add_recent_message(&to, &request_id, message).await;
975
976 let device_store_arc = self.persistence_manager.get_device_arc().await;
977 let to_str = to.to_string();
978
979 let (own_sending_jid, _) = match group_info.addressing_mode {
980 crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
981 crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
982 };
983
984 if !group_info
985 .participants
986 .iter()
987 .any(|participant| participant.is_same_user_as(&own_sending_jid))
988 {
989 group_info.participants.push(own_sending_jid.to_non_ad());
990 }
991
992 let force_skdm = {
993 use wacore::libsignal::store::sender_key_name::SenderKeyName;
994 let sender_address = own_sending_jid.to_protocol_address();
995 let sender_key_name = SenderKeyName::from_parts(&to_str, sender_address.as_str());
996
997 let device_guard = device_store_arc.read().await;
998 let record = self
999 .signal_cache
1000 .get_sender_key(&sender_key_name, &*device_guard.backend)
1001 .await?;
1002 let key_exists = record.is_some();
1003
1004 const SENDER_KEY_ROTATION_THRESHOLD: u32 = 1000;
1008 let needs_rotation = record
1009 .and_then(|mut r| r.sender_key_state_mut().ok().cloned())
1010 .and_then(|state| state.sender_chain_key().map(|ck| ck.iteration()))
1011 .is_some_and(|iter| iter >= SENDER_KEY_ROTATION_THRESHOLD);
1012 drop(device_guard);
1013
1014 if needs_rotation {
1015 log::info!(
1016 "Periodic sender-key rotation for {to} (chain iteration ≥ {SENDER_KEY_ROTATION_THRESHOLD})"
1017 );
1018 self.signal_cache
1019 .delete_sender_key(sender_key_name.cache_key())
1020 .await;
1021 if let Err(e) = self
1022 .persistence_manager
1023 .clear_sender_key_devices(&to_str)
1024 .await
1025 {
1026 log::warn!("periodic rotation: clear_sender_key_devices failed: {e}");
1027 }
1028 self.sender_key_device_cache.invalidate(&to_str).await;
1029 }
1030
1031 force_key_distribution || !key_exists || needs_rotation
1032 };
1033
1034 let mut store_adapter = self.signal_adapter_from(device_store_arc.clone());
1035
1036 let mut stores = store_adapter.as_signal_stores();
1037
1038 let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
1041 None
1042 } else {
1043 self.resolve_skdm_targets(&to_str, &group_info, &own_sending_jid)
1044 .await
1045 };
1046
1047 match wacore::send::prepare_group_stanza(
1048 &*self.runtime,
1049 &mut stores,
1050 self,
1051 &mut group_info,
1052 &own_jid,
1053 &own_lid,
1054 account_info.as_ref(),
1055 to.clone(),
1056 message,
1057 request_id.clone(),
1058 force_skdm,
1059 skdm_target_devices,
1060 edit.clone(),
1061 &extra_stanza_nodes,
1062 )
1063 .await
1064 {
1065 Ok(prepared) => {
1066 skdm_update = Some(SkdmUpdate {
1067 to_str: to_str.clone(),
1068 devices: prepared.skdm_devices,
1069 stale_users: prepared.stale_device_users,
1070 });
1071 prepared.node
1072 }
1073 Err(e) => {
1074 if let Some(SignalProtocolError::NoSenderKeyState(_)) =
1075 e.downcast_ref::<SignalProtocolError>()
1076 {
1077 log::warn!("No sender key for group {}, forcing distribution.", to);
1078
1079 if let Err(e) = self
1080 .persistence_manager
1081 .clear_sender_key_devices(&to_str)
1082 .await
1083 {
1084 log::warn!("Failed to clear SKDM recipients: {:?}", e);
1085 }
1086 self.sender_key_device_cache.invalidate(&to_str).await;
1087
1088 let mut store_adapter_retry =
1089 self.signal_adapter_from(device_store_arc.clone());
1090 let mut stores_retry = store_adapter_retry.as_signal_stores();
1091
1092 let retry_prepared = wacore::send::prepare_group_stanza(
1093 &*self.runtime,
1094 &mut stores_retry,
1095 self,
1096 &mut group_info,
1097 &own_jid,
1098 &own_lid,
1099 account_info.as_ref(),
1100 to,
1101 message,
1102 request_id,
1103 true,
1104 None,
1105 edit.clone(),
1106 &extra_stanza_nodes,
1107 )
1108 .await?;
1109
1110 skdm_update = Some(SkdmUpdate {
1111 to_str,
1112 devices: retry_prepared.skdm_devices,
1113 stale_users: retry_prepared.stale_device_users,
1114 });
1115 retry_prepared.node
1116 } else {
1117 return Err(e);
1118 }
1119 }
1120 }
1121 } else {
1122 if is_status_addon {
1128 self.add_recent_message(&Jid::status_broadcast(), &request_id, message)
1129 .await;
1130 } else {
1131 self.add_recent_message(&to, &request_id, message).await;
1132 }
1133
1134 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1135 let own_jid = device_snapshot
1136 .pn
1137 .as_ref()
1138 .ok_or(crate::client::ClientError::NotLoggedIn)?;
1139
1140 if to.is_pn() && self.lid_pn_cache.get_current_lid(&to.user).await.is_none() {
1142 let sid = self.generate_request_id();
1143 let spec = wacore::iq::usync::LidQuerySpec::new(vec![to.to_non_ad()], sid);
1144 match self.execute(spec).await {
1146 Ok(resp) => {
1147 for mapping in &resp.lid_mappings {
1148 if let Err(e) = self
1149 .add_lid_pn_mapping(
1150 &mapping.lid,
1151 &mapping.phone_number,
1152 crate::lid_pn_cache::LearningSource::Usync,
1153 )
1154 .await
1155 {
1156 log::warn!(
1157 "Failed to persist LID mapping {} -> {}: {e:?}",
1158 mapping.phone_number,
1159 mapping.lid
1160 );
1161 }
1162 }
1163 }
1164 Err(e) => {
1165 log::warn!("LID query failed for {}, falling back to PN: {e:?}", to);
1166 }
1167 }
1168 }
1169
1170 let recipient_bare = self.resolve_encryption_jid(&to).await.to_non_ad();
1174
1175 let mut recipient_cached = self.get_devices_from_registry(&recipient_bare).await;
1178 if recipient_cached.is_none() {
1179 let _ = self.get_user_devices(std::slice::from_ref(&to)).await;
1180 recipient_cached = self.get_devices_from_registry(&recipient_bare).await;
1181 }
1182
1183 let mut own_cached = self.get_devices_from_registry(own_jid).await;
1184 if own_cached.is_none() {
1185 let _ = self.get_user_devices(std::slice::from_ref(own_jid)).await;
1186 own_cached = self.get_devices_from_registry(own_jid).await;
1187 }
1188
1189 let mut all_dm_jids = match recipient_cached {
1191 Some(mut devices) => {
1192 devices.retain(|j| !j.is_hosted());
1193 devices
1194 }
1195 None => vec![recipient_bare],
1197 };
1198
1199 if let Some(mut own_devices) = own_cached {
1200 own_devices.retain(|j| !j.is_hosted());
1201 all_dm_jids.append(&mut own_devices);
1202 }
1203
1204 let own_lid = device_snapshot.lid.as_ref();
1207 all_dm_jids.retain(|j| {
1208 let is_sender = (j.is_same_user_as(own_jid) && j.device == own_jid.device)
1209 || own_lid.is_some_and(|lid| j.is_same_user_as(lid) && j.device == lid.device);
1210 !is_sender
1211 });
1212
1213 wacore::types::jid::sort_dedup_by_device(&mut all_dm_jids);
1217
1218 self.ensure_e2e_sessions(&all_dm_jids).await?;
1219
1220 let mut extra_stanza_nodes = extra_stanza_nodes;
1221 if !to.is_group() && !to.is_newsletter() && !is_status_addon {
1224 let (should_issue_after_send, cached_token_key) = self
1225 .maybe_include_tc_token(&to, &mut extra_stanza_nodes)
1226 .await;
1227 should_issue_tc_token_after_send = should_issue_after_send;
1228 if should_issue_after_send {
1229 used_cached_tc_token_key = cached_token_key;
1230 }
1231 }
1232 if should_issue_tc_token_after_send {
1233 debug!(target: "Client/TcToken", "Scheduled tc token issuance after send for {}", to);
1234 }
1235
1236 let lock_jids = self.build_session_lock_keys(&all_dm_jids).await;
1237 let _session_mutexes = self.session_mutexes_for(&lock_jids).await;
1238 let mut _session_guards = Vec::with_capacity(_session_mutexes.len());
1239 for mutex in &_session_mutexes {
1240 _session_guards.push(mutex.lock().await);
1241 }
1242
1243 let mut store_adapter = self.signal_adapter().await;
1244
1245 let mut stores = store_adapter.as_signal_stores();
1246
1247 let prepared = wacore::send::prepare_dm_stanza(
1248 &*self.runtime,
1249 &mut stores,
1250 self,
1251 own_jid,
1252 device_snapshot.lid.as_ref(),
1253 device_snapshot.account.as_ref(),
1254 to,
1255 message,
1256 request_id,
1257 edit,
1258 &extra_stanza_nodes,
1259 all_dm_jids,
1260 )
1261 .await?;
1262 dm_phash = prepared.phash;
1263 prepared.node
1264 };
1265
1266 let ack = if let Some(phash) = dm_phash
1267 && let Some(msg_id) = stanza_to_send
1268 .attrs()
1269 .optional_string("id")
1270 .map(|s| s.into_owned())
1271 {
1272 let rx = self.register_ack_waiter(&msg_id).await;
1273 Some((rx, phash, msg_id))
1274 } else {
1275 None
1276 };
1277
1278 let mut stanza_to_send = stanza_to_send;
1281 if is_status_addon {
1282 stanza_to_send.attrs.insert("to", Jid::status_broadcast());
1283 }
1284
1285 if let Err(e) = self.send_node(stanza_to_send).await {
1286 if let Some((_, _, ref msg_id)) = ack {
1287 self.response_waiters.lock().await.remove(msg_id);
1288 }
1289 return Err(e.into());
1290 }
1291
1292 if let Some((rx, phash, msg_id)) = ack {
1293 let invalidate_group = tc_issue_target.is_group();
1296 self.spawn_phash_validation(
1297 rx,
1298 phash,
1299 tc_issue_target.clone(),
1300 invalidate_group,
1301 msg_id,
1302 );
1303 }
1304
1305 if let Some(update) = skdm_update {
1306 self.update_sender_key_devices(&update.to_str, &update.devices)
1307 .await;
1308 for user in &update.stale_users {
1309 self.invalidate_device_cache(user).await;
1310 }
1311 }
1312
1313 self.flush_signal_cache_logged("send_message_impl", None)
1315 .await;
1316
1317 if should_issue_tc_token_after_send {
1320 if let Some(client) = self.self_weak.get().and_then(|w| w.upgrade()) {
1321 let target = tc_issue_target;
1322 let cached_key = used_cached_tc_token_key;
1323 self.runtime
1324 .spawn(Box::pin(async move {
1325 let issued_ok = client.issue_tc_token_after_send(&target).await;
1326 if issued_ok && let Some(token_key) = cached_key {
1327 client.mark_tc_token_used_after_send(&token_key).await;
1328 }
1329 }))
1330 .detach();
1331 } else {
1332 log::debug!(target: "Client/TcToken", "Skipping fire-and-forget issuance: client dropped");
1333 }
1334 }
1335
1336 Ok(())
1337 }
1338
1339 async fn maybe_include_tc_token(
1349 &self,
1350 to: &Jid,
1351 extra_nodes: &mut Vec<Node>,
1352 ) -> (bool, Option<String>) {
1353 use wacore::iq::props::config_codes;
1354 use wacore::iq::tctoken::{
1355 build_cs_token_node, build_tc_token_node, compute_cs_token, is_tc_token_expired_with,
1356 should_send_new_tc_token_with,
1357 };
1358
1359 let snapshot = self.persistence_manager.get_device_snapshot().await;
1361 let is_self = snapshot
1362 .pn
1363 .as_ref()
1364 .is_some_and(|pn| pn.is_same_user_as(to))
1365 || snapshot
1366 .lid
1367 .as_ref()
1368 .is_some_and(|lid| lid.is_same_user_as(to));
1369 if is_self {
1370 return (false, None);
1371 }
1372
1373 if to.is_bot() || to.is_status_broadcast() {
1375 return (false, None);
1376 }
1377
1378 let cached_lid = if to.is_lid() {
1381 None
1382 } else {
1383 self.lid_pn_cache.get_current_lid(&to.user).await
1384 };
1385 let resolved_lid_user: Option<&str> = if to.is_lid() {
1386 Some(&to.user)
1387 } else {
1388 cached_lid.as_deref()
1389 };
1390 let token_jid: &str = resolved_lid_user.unwrap_or(&to.user);
1391
1392 let backend = self.persistence_manager.backend();
1393 let tc_config = self.tc_token_config().await;
1394
1395 let existing = match backend.get_tc_token(token_jid).await {
1397 Ok(entry) => entry,
1398 Err(e) => {
1399 log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1400 None
1401 }
1402 };
1403
1404 let should_issue_after_send = should_send_new_tc_token_with(
1407 existing.as_ref().and_then(|entry| entry.sender_timestamp),
1408 &tc_config,
1409 );
1410
1411 let token_send_enabled = self
1413 .ab_props
1414 .is_enabled_or(config_codes::PRIVACY_TOKEN_ON_ALL_1_ON_1_MESSAGES, false)
1415 .await;
1416
1417 if token_send_enabled {
1418 match existing {
1419 Some(ref entry)
1420 if !is_tc_token_expired_with(entry.token_timestamp, &tc_config)
1421 && !entry.token.is_empty() =>
1422 {
1423 extra_nodes.push(build_tc_token_node(&entry.token));
1424 return (should_issue_after_send, Some(token_jid.to_string()));
1425 }
1426 _ => {
1427 let nct_send_enabled = self
1429 .ab_props
1430 .is_enabled_or(config_codes::NCT_TOKEN_SEND_ENABLED, false)
1431 .await;
1432
1433 if nct_send_enabled
1434 && let Some(salt) = &snapshot.nct_salt
1435 && let Some(lid_user) = &resolved_lid_user
1436 {
1437 let recipient_lid =
1440 wacore_binary::Jid::new(*lid_user, Server::Lid).to_string();
1441 let cs_token = compute_cs_token(salt, &recipient_lid);
1442 extra_nodes.push(build_cs_token_node(&cs_token));
1443 log::debug!(target: "Client/CsToken", "Attached cstoken for {} (NCT fallback)", to);
1444 } else {
1445 log::debug!(target: "Client/CsToken", "No tctoken or NCT salt/LID available for {}", to);
1446 }
1447 }
1448 }
1449 }
1450
1451 (should_issue_after_send, None)
1452 }
1453
1454 async fn issue_tc_token_after_send(&self, to: &Jid) -> bool {
1456 use wacore::iq::tctoken::IssuePrivacyTokensSpec;
1457
1458 if to.is_bot() || to.is_status_broadcast() {
1460 return false;
1461 }
1462
1463 let issuance_jid = self.resolve_issuance_jid(to).await;
1464 let Ok(response) = self
1465 .execute(IssuePrivacyTokensSpec::new(std::slice::from_ref(
1466 &issuance_jid,
1467 )))
1468 .await
1469 else {
1470 log::debug!(target: "Client/TcToken", "Failed to issue tc_token for {}", issuance_jid);
1471 return false;
1472 };
1473
1474 self.store_issued_tc_tokens(&response.tokens).await
1475 }
1476
1477 pub(crate) async fn store_issued_tc_tokens(
1479 &self,
1480 tokens: &[wacore::iq::tctoken::ReceivedTcToken],
1481 ) -> bool {
1482 use wacore::store::traits::TcTokenEntry;
1483
1484 if tokens.is_empty() {
1485 return false;
1486 }
1487
1488 let backend = self.persistence_manager.backend();
1489 let now = wacore::time::now_secs();
1490 let mut any_stored = false;
1491 for received in tokens {
1492 if received.token.is_empty() {
1493 log::warn!(target: "Client/TcToken", "Server returned empty tc_token for {}, skipping", received.jid);
1494 continue;
1495 }
1496
1497 let entry = TcTokenEntry {
1498 token: received.token.clone(),
1499 token_timestamp: received.timestamp,
1500 sender_timestamp: Some(now),
1501 };
1502
1503 if let Err(e) = backend.put_tc_token(&received.jid.user, &entry).await {
1504 log::warn!(target: "Client/TcToken", "Failed to store issued tc_token: {e}");
1505 } else {
1506 any_stored = true;
1507 }
1508 }
1509 any_stored
1510 }
1511
1512 async fn store_issued_tc_tokens_with_sender_ts(
1515 &self,
1516 tokens: &[wacore::iq::tctoken::ReceivedTcToken],
1517 sender_ts: i64,
1518 ) {
1519 use wacore::store::traits::TcTokenEntry;
1520
1521 let backend = self.persistence_manager.backend();
1522 for received in tokens {
1523 if received.token.is_empty() {
1524 continue;
1525 }
1526 let entry = TcTokenEntry {
1527 token: received.token.clone(),
1528 token_timestamp: received.timestamp,
1529 sender_timestamp: Some(sender_ts),
1530 };
1531 if let Err(e) = backend.put_tc_token(&received.jid.user, &entry).await {
1532 log::warn!(target: "Client/TcToken", "Failed to store re-issued tc_token: {e}");
1533 }
1534 }
1535 }
1536
1537 async fn mark_tc_token_used_after_send(&self, token_key: &str) {
1538 use wacore::store::traits::TcTokenEntry;
1539
1540 let backend = self.persistence_manager.backend();
1541 let existing = match backend.get_tc_token(token_key).await {
1542 Ok(entry) => entry,
1543 Err(e) => {
1544 log::warn!(target: "Client/TcToken", "Failed to reload tc_token for {}: {e}", token_key);
1545 return;
1546 }
1547 };
1548
1549 let Some(entry) = existing else {
1550 return;
1551 };
1552 if entry.token.is_empty() {
1553 return;
1554 }
1555
1556 let updated_entry = TcTokenEntry {
1557 sender_timestamp: Some(wacore::time::now_secs()),
1558 ..entry
1559 };
1560 if let Err(e) = backend.put_tc_token(token_key, &updated_entry).await {
1561 log::warn!(target: "Client/TcToken", "Failed to update sender_timestamp for {}: {e}", token_key);
1562 }
1563 }
1564
1565 pub(crate) async fn reissue_tc_token_after_identity_change(&self, sender: &Jid) {
1569 use wacore::iq::tctoken::{IssuePrivacyTokensSpec, is_sender_tc_token_expired};
1570
1571 let bare = sender.to_non_ad().to_string();
1573 let mutex = self.session_lock_for(&bare).await;
1574 let Some(_guard) = mutex.try_lock() else {
1575 return;
1576 };
1577
1578 let resolved_lid = if sender.is_lid() {
1579 None
1580 } else {
1581 self.lid_pn_cache.get_current_lid(&sender.user).await
1582 };
1583 let token_jid: &str = resolved_lid.as_deref().unwrap_or(&sender.user);
1584
1585 let backend = self.persistence_manager.backend();
1586 let entry = match backend.get_tc_token(token_jid).await {
1587 Ok(Some(e)) => e,
1588 _ => return,
1589 };
1590
1591 let Some(sender_ts) = entry.sender_timestamp else {
1592 return;
1593 };
1594
1595 let tc_config = self.tc_token_config().await;
1597 if is_sender_tc_token_expired(sender_ts, &tc_config) {
1598 return;
1599 }
1600
1601 let issuance_jid = self.resolve_issuance_jid(sender).await;
1603 match self
1604 .execute(IssuePrivacyTokensSpec::with_timestamp(
1605 std::slice::from_ref(&issuance_jid),
1606 sender_ts,
1607 ))
1608 .await
1609 {
1610 Ok(response) => {
1611 self.store_issued_tc_tokens_with_sender_ts(&response.tokens, sender_ts)
1613 .await;
1614 log::debug!(
1615 target: "Client/TcToken",
1616 "Re-issued tctoken after identity change for {}",
1617 sender
1618 );
1619 }
1620 Err(e) => {
1621 log::debug!(
1622 target: "Client/TcToken",
1623 "Failed to re-issue tctoken after identity change for {}: {e}",
1624 sender
1625 );
1626 }
1627 }
1628 }
1629
1630 pub(crate) async fn lookup_tc_token_for_jid(&self, jid: &Jid) -> Option<Vec<u8>> {
1634 use wacore::iq::tctoken::is_tc_token_expired_with;
1635
1636 let resolved_lid = if jid.is_lid() {
1637 None
1638 } else {
1639 self.lid_pn_cache.get_current_lid(&jid.user).await
1640 };
1641 let token_jid: &str = resolved_lid.as_deref().unwrap_or(&jid.user);
1642
1643 let tc_config = self.tc_token_config().await;
1644 let backend = self.persistence_manager.backend();
1645 match backend.get_tc_token(token_jid).await {
1646 Ok(Some(entry))
1647 if !entry.token.is_empty()
1648 && !is_tc_token_expired_with(entry.token_timestamp, &tc_config) =>
1649 {
1650 Some(entry.token)
1651 }
1652 Ok(_) => None,
1653 Err(e) => {
1654 log::warn!(target: "Client/TcToken", "Failed to get tc_token for {}: {e}", token_jid);
1655 None
1656 }
1657 }
1658 }
1659
1660 pub(crate) async fn build_session_lock_keys(&self, device_jids: &[Jid]) -> Vec<Jid> {
1665 let mut keys: Vec<Jid> = Vec::with_capacity(device_jids.len());
1666 for jid in device_jids {
1667 keys.push(self.resolve_encryption_jid(jid).await);
1668 }
1669 keys.sort_unstable_by(wacore::types::jid::cmp_for_lock_order);
1670 keys.dedup_by(|a, b| wacore::types::jid::cmp_for_lock_order(a, b).is_eq());
1671 keys
1672 }
1673
1674 pub(crate) async fn session_mutexes_for(
1676 &self,
1677 jids: &[Jid],
1678 ) -> Vec<std::sync::Arc<async_lock::Mutex<()>>> {
1679 let mut mutexes = Vec::with_capacity(jids.len());
1680 let mut buf = wacore::types::jid::make_address_buffer();
1681 for jid in jids {
1682 wacore::types::jid::write_protocol_address_to(jid, &mut buf);
1683 mutexes.push(self.session_lock_for(&buf).await);
1684 }
1685 mutexes
1686 }
1687
1688 pub(crate) async fn tc_token_config(&self) -> wacore::iq::tctoken::TcTokenConfig {
1690 use wacore::iq::props::config_codes;
1691 use wacore::iq::tctoken::{TC_TOKEN_BUCKET_DURATION, TC_TOKEN_NUM_BUCKETS, TcTokenConfig};
1692
1693 TcTokenConfig {
1694 bucket_duration: self
1695 .ab_props
1696 .get_int(config_codes::TCTOKEN_DURATION, TC_TOKEN_BUCKET_DURATION)
1697 .await,
1698 num_buckets: self
1699 .ab_props
1700 .get_int(config_codes::TCTOKEN_NUM_BUCKETS, TC_TOKEN_NUM_BUCKETS)
1701 .await,
1702 sender_bucket_duration: self
1703 .ab_props
1704 .get_int(
1705 config_codes::TCTOKEN_DURATION_SENDER,
1706 TC_TOKEN_BUCKET_DURATION,
1707 )
1708 .await,
1709 sender_num_buckets: self
1710 .ab_props
1711 .get_int(
1712 config_codes::TCTOKEN_NUM_BUCKETS_SENDER,
1713 TC_TOKEN_NUM_BUCKETS,
1714 )
1715 .await,
1716 }
1717 .clamped()
1718 }
1719
1720 async fn resolve_to_lid_jid(&self, jid: &Jid) -> Jid {
1722 if jid.is_lid() {
1723 return jid.to_non_ad();
1724 }
1725
1726 if let Some(lid_user) = self.lid_pn_cache.get_current_lid(&jid.user).await {
1727 Jid::new(&lid_user, Server::Lid)
1728 } else {
1729 jid.to_non_ad()
1730 }
1731 }
1732
1733 async fn resolve_issuance_jid(&self, jid: &Jid) -> Jid {
1736 use wacore::iq::props::config_codes;
1737
1738 let issue_to_lid = self
1740 .ab_props
1741 .is_enabled_or(config_codes::LID_TRUSTED_TOKEN_ISSUE_TO_LID, true)
1742 .await;
1743
1744 let resolved = if issue_to_lid {
1745 self.resolve_to_lid_jid(jid).await
1746 } else if jid.is_lid() {
1747 if let Some(pn) = self.lid_pn_cache.get_phone_number(&jid.user).await {
1748 Jid::new(&pn, Server::Pn)
1749 } else {
1750 jid.to_non_ad()
1751 }
1752 } else {
1753 jid.to_non_ad()
1754 };
1755 resolved.to_non_ad()
1757 }
1758}
1759
1760#[cfg(test)]
1761mod tests {
1762 use super::*;
1763 use std::str::FromStr;
1764
1765 #[tokio::test]
1766 async fn send_message_to_status_without_reaction_errors() {
1767 let client = crate::test_utils::create_test_client().await;
1768 let to = Jid::status_broadcast();
1769 let err = client
1770 .send_message(
1771 to,
1772 wa::Message {
1773 conversation: Some("hi".into()),
1774 ..Default::default()
1775 },
1776 )
1777 .await
1778 .expect_err("status@broadcast without reaction must error");
1779 let msg = format!("{err}");
1780 assert!(
1781 msg.contains("reaction_message") || msg.contains("status"),
1782 "unexpected error: {msg}"
1783 );
1784 }
1785
1786 #[tokio::test]
1787 async fn send_message_to_status_reaction_rejects_non_user_participant() {
1788 let client = crate::test_utils::create_test_client().await;
1789 let to = Jid::status_broadcast();
1790 let err = client
1791 .send_message(
1792 to,
1793 wa::Message {
1794 reaction_message: Some(wa::message::ReactionMessage {
1795 key: Some(wa::MessageKey {
1796 remote_jid: Some("status@broadcast".into()),
1797 from_me: Some(false),
1798 id: Some("ORIGID".into()),
1799 participant: Some("120363040237990503@g.us".into()),
1800 }),
1801 text: Some("❤️".into()),
1802 sender_timestamp_ms: Some(1),
1803 ..Default::default()
1804 }),
1805 ..Default::default()
1806 },
1807 )
1808 .await
1809 .expect_err("group JID as participant must error");
1810 assert!(
1811 format!("{err}").contains("user JID"),
1812 "expected user-JID error, got: {err}"
1813 );
1814 }
1815
1816 #[tokio::test]
1817 async fn send_message_to_status_reaction_without_participant_errors() {
1818 let client = crate::test_utils::create_test_client().await;
1819 let to = Jid::status_broadcast();
1820 let err = client
1821 .send_message(
1822 to,
1823 wa::Message {
1824 reaction_message: Some(wa::message::ReactionMessage {
1825 key: Some(wa::MessageKey {
1826 remote_jid: Some("status@broadcast".into()),
1827 from_me: Some(false),
1828 id: Some("ORIGID".into()),
1829 participant: None,
1830 }),
1831 text: Some("❤️".into()),
1832 sender_timestamp_ms: Some(1),
1833 ..Default::default()
1834 }),
1835 ..Default::default()
1836 },
1837 )
1838 .await
1839 .expect_err("reaction without key.participant must error");
1840 assert!(
1841 format!("{err}").contains("participant"),
1842 "expected participant error, got: {err}"
1843 );
1844 }
1845
/// `RevokeType::default()` is the sender variant.
#[test]
fn test_revoke_type_default_is_sender() {
    assert_eq!(RevokeType::default(), RevokeType::Sender);
}
1852
/// Only the admin revoke variant matches the "force SKDM" predicate.
#[test]
fn test_force_skdm_only_for_admin_revoke() {
    // The predicate under test: a revoke forces SKDM iff it is an admin revoke.
    let forces_skdm = |revoke: &RevokeType| matches!(revoke, RevokeType::Admin { .. });

    let admin_revoke = RevokeType::Admin {
        original_sender: Jid::from_str("123456@s.whatsapp.net").unwrap(),
    };
    let sender_revoke = RevokeType::Sender;

    assert!(
        !forces_skdm(&sender_revoke),
        "Sender revoke should NOT force SKDM"
    );
    assert!(forces_skdm(&admin_revoke), "Admin revoke MUST force SKDM");
}
1872
/// A sender revoke produces a key with `from_me = true`, no participant, and
/// edit attribute "7".
#[test]
fn test_sender_revoke_message_key_structure() {
    use crate::types::message::EditAttribute;

    let to = Jid::from_str("120363040237990503@g.us").unwrap();
    let message_id = String::from("3EB0ABC123");

    // Derive the key fields from the revoke type the same way for both variants.
    let revoke_type = RevokeType::Sender;
    let (from_me, participant, edit_attr) = match revoke_type {
        RevokeType::Admin { original_sender } => (
            false,
            Some(original_sender.to_non_ad().to_string()),
            EditAttribute::AdminRevoke,
        ),
        RevokeType::Sender => (true, None, EditAttribute::SenderRevoke),
    };

    assert!(from_me, "Sender revoke must have from_me=true");
    assert!(
        participant.is_none(),
        "Sender revoke must NOT set participant"
    );
    assert_eq!(edit_attr.to_string_val(), "7");

    let revoke_message = build_revoke_message(&to, from_me, message_id.clone(), participant);

    let protocol = revoke_message.protocol_message.unwrap();
    let revoked_key = protocol.key.unwrap();
    assert_eq!(revoked_key.from_me, Some(true));
    assert_eq!(revoked_key.participant, None);
    assert_eq!(revoked_key.id, Some(message_id));
}
1908
/// An admin revoke produces a key with `from_me = false`, the original
/// sender's non-AD JID as participant, and edit attribute "8".
#[test]
fn test_admin_revoke_message_key_structure() {
    use crate::types::message::EditAttribute;

    let to = Jid::from_str("120363040237990503@g.us").unwrap();
    let message_id = String::from("3EB0ABC123");
    let original_sender = Jid::from_str("236395184570386:22@lid").unwrap();

    // Derive the key fields from the revoke type the same way for both variants.
    let revoke_type = RevokeType::Admin {
        original_sender: original_sender.clone(),
    };
    let (from_me, participant, edit_attr) = match revoke_type {
        RevokeType::Admin { original_sender } => (
            false,
            Some(original_sender.to_non_ad().to_string()),
            EditAttribute::AdminRevoke,
        ),
        RevokeType::Sender => (true, None, EditAttribute::SenderRevoke),
    };

    assert!(!from_me, "Admin revoke must have from_me=false");
    assert!(
        participant.is_some(),
        "Admin revoke MUST set participant to original sender"
    );
    assert_eq!(edit_attr.to_string_val(), "8");

    let revoke_message =
        build_revoke_message(&to, from_me, message_id.clone(), participant.clone());

    let protocol = revoke_message.protocol_message.unwrap();
    let revoked_key = protocol.key.unwrap();
    assert_eq!(revoked_key.from_me, Some(false));
    // The device suffix (":22") is gone from the participant.
    assert_eq!(
        revoked_key.participant,
        Some("236395184570386@lid".to_string())
    );
    assert_eq!(revoked_key.id, Some(message_id));
}
1950
/// Converting a LID device JID to non-AD form drops the device suffix but
/// keeps the `@lid` server.
#[test]
fn test_admin_revoke_preserves_lid_format() {
    let participant_str = Jid::from_str("236395184570386:22@lid")
        .unwrap()
        .to_non_ad()
        .to_string();

    assert_eq!(participant_str, "236395184570386@lid");
    assert!(
        participant_str.ends_with("@lid"),
        "LID participant must preserve @lid suffix"
    );
}
1966
/// Filtering the full device list against known recipients leaves only the
/// device that was not known before.
#[test]
fn test_skdm_recipient_filtering_basic() {
    use std::collections::HashSet;

    let parse_all = |raw: &[&str]| -> Vec<Jid> {
        raw.iter().map(|s| Jid::from_str(s).unwrap()).collect()
    };

    let known_recipients = parse_all(&[
        "1234567890:0@s.whatsapp.net",
        "1234567890:5@s.whatsapp.net",
        "9876543210:0@s.whatsapp.net",
    ]);
    // Same as above plus one device that has not been seen yet.
    let all_devices = parse_all(&[
        "1234567890:0@s.whatsapp.net",
        "1234567890:5@s.whatsapp.net",
        "9876543210:0@s.whatsapp.net",
        "5555555555:0@s.whatsapp.net",
    ]);

    let known_set: HashSet<DeviceKey<'_>> = known_recipients
        .iter()
        .map(|jid| jid.device_key())
        .collect();

    let mut new_devices = all_devices;
    new_devices.retain(|device| !known_set.contains(&device.device_key()));

    assert_eq!(new_devices.len(), 1);
    assert_eq!(new_devices[0].user, "5555555555");
}
2003
/// With LID JIDs, a new device of an already-known user is still reported as
/// new.
#[test]
fn test_skdm_recipient_filtering_lid_jids() {
    use std::collections::HashSet;

    let parse_all = |raw: &[&str]| -> Vec<Jid> {
        raw.iter().map(|s| Jid::from_str(s).unwrap()).collect()
    };

    let known_recipients = parse_all(&[
        "236395184570386:91@lid",
        "129171292463295:0@lid",
        "45857667830004:14@lid",
    ]);
    // Device 15 belongs to a known user but is not itself known.
    let all_devices = parse_all(&[
        "236395184570386:91@lid",
        "129171292463295:0@lid",
        "45857667830004:14@lid",
        "45857667830004:15@lid",
    ]);

    let known_set: HashSet<DeviceKey<'_>> = known_recipients
        .iter()
        .map(|jid| jid.device_key())
        .collect();

    let mut new_devices = all_devices;
    new_devices.retain(|device| !known_set.contains(&device.device_key()));

    assert_eq!(new_devices.len(), 1);
    assert_eq!(new_devices[0].user, "45857667830004");
    assert_eq!(new_devices[0].device, 15);
}
2039
/// When every device is already known, the filter leaves nothing.
#[test]
fn test_skdm_recipient_filtering_all_known() {
    use std::collections::HashSet;

    let device_strs = ["1234567890:0@s.whatsapp.net", "1234567890:5@s.whatsapp.net"];
    let parse_all = || -> Vec<Jid> {
        device_strs
            .iter()
            .map(|s| Jid::from_str(s).unwrap())
            .collect()
    };

    // Known recipients and the full device list are identical here.
    let known_recipients = parse_all();
    let all_devices = parse_all();

    let known_set: HashSet<DeviceKey<'_>> = known_recipients
        .iter()
        .map(|jid| jid.device_key())
        .collect();

    let mut new_devices = all_devices;
    new_devices.retain(|device| !known_set.contains(&device.device_key()));

    assert!(new_devices.is_empty());
}
2065
/// With no known recipients, every device passes the filter.
#[test]
fn test_skdm_recipient_filtering_all_new() {
    use std::collections::HashSet;

    let known_recipients: Vec<Jid> = Vec::new();
    let all_devices: Vec<Jid> = ["1234567890:0@s.whatsapp.net", "9876543210:0@s.whatsapp.net"]
        .iter()
        .map(|s| Jid::from_str(s).unwrap())
        .collect();

    let known_set: HashSet<DeviceKey<'_>> = known_recipients
        .iter()
        .map(|jid| jid.device_key())
        .collect();

    // Borrow instead of cloning: only the number of survivors is checked.
    let new_devices: Vec<&Jid> = all_devices
        .iter()
        .filter(|device| !known_set.contains(&device.device_key()))
        .collect();

    assert_eq!(new_devices.len(), all_devices.len());
}
2088
/// `device_key()` equality and `device_eq` agree on a table of JID pairs,
/// including the cases where an explicit device 0 is compared with a JID that
/// has no device part.
#[test]
fn test_device_key_comparison() {
    // (left JID, right JID, whether both identify the same device)
    let cases: [(&str, &str, bool); 6] = [
        (
            "1234567890:0@s.whatsapp.net",
            "1234567890@s.whatsapp.net",
            true,
        ),
        (
            "1234567890:5@s.whatsapp.net",
            "1234567890:5@s.whatsapp.net",
            true,
        ),
        (
            "1234567890:5@s.whatsapp.net",
            "1234567890:6@s.whatsapp.net",
            false,
        ),
        ("236395184570386:91@lid", "236395184570386:91@lid", true),
        ("236395184570386:0@lid", "236395184570386@lid", true),
        ("user1@s.whatsapp.net", "user2@s.whatsapp.net", false),
    ];

    for (left_str, right_str, expected) in cases {
        let left: Jid = left_str.parse().expect("should parse jid1");
        let right: Jid = right_str.parse().expect("should parse jid2");

        let keys_match = left.device_key() == right.device_key();
        assert_eq!(
            keys_match, expected,
            "DeviceKey comparison failed for '{}' vs '{}': expected match={}, got match={}",
            left_str, right_str, expected, keys_match
        );

        assert_eq!(
            left.device_eq(&right),
            expected,
            "device_eq failed for '{}' vs '{}'",
            left_str,
            right_str
        );
    }
}
2140
/// With an empty sender-key device map, every resolved device is selected by
/// the "needs SKDM" predicate.
#[test]
fn empty_sender_key_device_map_marks_all_devices_for_skdm() {
    use crate::sender_key_device_cache::SenderKeyDeviceMap;

    let map = SenderKeyDeviceMap::from_db_rows(&[]);
    assert_eq!(map.device_has_key("271060335329480", 0), None);
    assert!(!map.is_user_forgotten("271060335329480"));

    let all_resolved_devices: Vec<Jid> = [
        "271060335329480@lid",
        "77610646245392@lid",
        "276661023027320:5@lid",
    ]
    .iter()
    .map(|s| Jid::from_str(s).unwrap())
    .collect();

    // A device needs SKDM unless the map positively says it has the key, or
    // whenever its user is flagged as forgotten.
    let needs_skdm_count = all_resolved_devices
        .iter()
        .filter(|device| {
            let has_key = map.device_has_key(&device.user, device.device) == Some(true);
            !has_key || map.is_user_forgotten(&device.user)
        })
        .count();

    assert_eq!(needs_skdm_count, all_resolved_devices.len());
}
2169
2170 #[tokio::test]
2172 async fn resolve_skdm_targets_distributes_when_cache_empty_but_devices_known() {
2173 use wacore::client::context::GroupInfo;
2174 use wacore::store::traits::{DeviceInfo, DeviceListRecord};
2175 use wacore::types::message::AddressingMode;
2176
2177 let client = crate::test_utils::create_test_client().await;
2178 let group_jid = "120363161500776365@g.us";
2179 let own_lid = Jid::from_str("193832511623409:13@lid").unwrap();
2180
2181 let participant_users = ["271060335329480", "77610646245392", "276661023027320"];
2182
2183 for user in &participant_users {
2185 let record = DeviceListRecord {
2186 user: (*user).into(),
2187 devices: vec![DeviceInfo {
2188 device_id: 0,
2189 key_index: None,
2190 }],
2191 timestamp: wacore::time::now_secs(),
2192 phash: None,
2193 raw_id: None,
2194 };
2195 client
2196 .device_registry_cache
2197 .insert((*user).into(), record)
2198 .await;
2199 }
2200
2201 let participants: Vec<Jid> = participant_users
2202 .iter()
2203 .map(|u| Jid::from_str(&format!("{u}@lid")).unwrap())
2204 .collect();
2205
2206 let group_info = GroupInfo::new(participants.clone(), AddressingMode::Lid);
2207
2208 let result = client
2209 .resolve_skdm_targets(group_jid, &group_info, &own_lid)
2210 .await
2211 .expect("None means the empty-cache early-exit is back");
2212
2213 assert_eq!(result.len(), participants.len());
2214 for user in &participant_users {
2215 assert!(result.iter().any(|j| j.user == *user));
2216 }
2217 }
2218
/// A map holding a single `false` row for one user still selects all three
/// resolved devices for SKDM.
#[test]
fn single_forgotten_row_keeps_full_distribution() {
    use crate::sender_key_device_cache::SenderKeyDeviceMap;

    let rows = [("271060335329480@lid".to_string(), false)];
    let map = SenderKeyDeviceMap::from_db_rows(&rows);
    assert_eq!(map.device_has_key("271060335329480", 0), Some(false));
    assert!(map.is_user_forgotten("271060335329480"));

    let all_resolved_devices: Vec<Jid> = [
        "271060335329480@lid",
        "77610646245392@lid",
        "276661023027320:5@lid",
    ]
    .iter()
    .map(|s| Jid::from_str(s).unwrap())
    .collect();

    // A device needs SKDM unless the map positively says it has the key, or
    // whenever its user is flagged as forgotten.
    let needs_skdm_count = all_resolved_devices
        .iter()
        .filter(|device| {
            let has_key = map.device_has_key(&device.user, device.device) == Some(true);
            !has_key || map.is_user_forgotten(&device.user)
        })
        .count();

    assert_eq!(
        needs_skdm_count,
        3,
        "after retry inserts one row, ALL devices correctly flagged for SKDM \
         (this is what unblocks redistribution on the SECOND message)"
    );
}
2252
/// Filtering 1010 devices against 1000 known recipients leaves exactly the 10
/// devices that were not known.
#[test]
fn test_skdm_filtering_large_group() {
    use std::collections::HashSet;

    const BASE_USER: i64 = 100000000000000;
    let lid_device =
        |offset: i64| Jid::from_str(&format!("{}:1@lid", BASE_USER + offset)).unwrap();

    // Offsets 0..1000 are known; the full list adds offsets 1000..1010.
    let known_recipients: Vec<Jid> = (0..1000i64).map(&lid_device).collect();
    let all_devices: Vec<Jid> = (0..1010i64).map(&lid_device).collect();

    let known_set: HashSet<DeviceKey<'_>> = known_recipients
        .iter()
        .map(|jid| jid.device_key())
        .collect();

    let mut new_devices = all_devices;
    new_devices.retain(|device| !known_set.contains(&device.device_key()));

    assert_eq!(new_devices.len(), 10);
}
2282
2283 mod infer_stanza {
2284 use super::*;
2285
2286 #[test]
2287 fn regular_message_returns_none() {
2288 let msg = wa::Message {
2289 conversation: Some("hello".into()),
2290 ..Default::default()
2291 };
2292 let (edit, node) = infer_stanza_metadata(&msg);
2293 assert!(edit.is_none());
2294 assert!(node.is_none());
2295 }
2296
2297 #[test]
2298 fn pin_returns_edit_attribute() {
2299 let msg = wa::Message {
2300 pin_in_chat_message: Some(wa::message::PinInChatMessage::default()),
2301 ..Default::default()
2302 };
2303 let (edit, node) = infer_stanza_metadata(&msg);
2304 assert_eq!(edit, Some(EditAttribute::PinInChat));
2305 assert!(node.is_none());
2306 }
2307
2308 #[test]
2309 fn poll_creation_v3_returns_meta_node() {
2310 let msg = wa::Message {
2311 poll_creation_message_v3: Some(Box::default()),
2312 ..Default::default()
2313 };
2314 let (edit, node) = infer_stanza_metadata(&msg);
2315 assert!(edit.is_none());
2316 let node = node.expect("should have meta node");
2317 assert_eq!(node.tag, "meta");
2318 let mut attrs = node.attrs();
2319 assert_eq!(
2320 attrs.optional_string("polltype").unwrap().as_ref(),
2321 "creation"
2322 );
2323 }
2324
2325 #[test]
2326 fn event_returns_meta_node() {
2327 let msg = wa::Message {
2328 event_message: Some(Box::default()),
2329 ..Default::default()
2330 };
2331 let (edit, node) = infer_stanza_metadata(&msg);
2332 assert!(edit.is_none());
2333 let node = node.expect("should have meta node");
2334 assert_eq!(node.tag, "meta");
2335 let mut attrs = node.attrs();
2336 assert_eq!(
2337 attrs.optional_string("event_type").unwrap().as_ref(),
2338 "creation"
2339 );
2340 }
2341
2342 #[test]
2343 fn empty_message_returns_none() {
2344 let (edit, node) = infer_stanza_metadata(&wa::Message::default());
2345 assert!(edit.is_none());
2346 assert!(node.is_none());
2347 }
2348
2349 #[test]
2350 fn poll_creation_v1_returns_meta_node() {
2351 let msg = wa::Message {
2352 poll_creation_message: Some(Box::default()),
2353 ..Default::default()
2354 };
2355 let (edit, node) = infer_stanza_metadata(&msg);
2356 assert!(edit.is_none());
2357 let node = node.expect("should have meta node");
2358 assert_eq!(node.tag, "meta");
2359 let mut attrs = node.attrs();
2360 assert_eq!(
2361 attrs.optional_string("polltype").unwrap().as_ref(),
2362 "creation"
2363 );
2364 }
2365
2366 #[test]
2367 fn poll_creation_v2_returns_meta_node() {
2368 let msg = wa::Message {
2369 poll_creation_message_v2: Some(Box::default()),
2370 ..Default::default()
2371 };
2372 let (edit, node) = infer_stanza_metadata(&msg);
2373 assert!(edit.is_none());
2374 let node = node.expect("should have meta node");
2375 assert_eq!(node.tag, "meta");
2376 let mut attrs = node.attrs();
2377 assert_eq!(
2378 attrs.optional_string("polltype").unwrap().as_ref(),
2379 "creation"
2380 );
2381 }
2382
2383 #[test]
2384 fn poll_vote_returns_meta_node() {
2385 let msg = wa::Message {
2386 poll_update_message: Some(wa::message::PollUpdateMessage {
2387 vote: Some(wa::message::PollEncValue::default()),
2388 ..Default::default()
2389 }),
2390 ..Default::default()
2391 };
2392 let (edit, node) = infer_stanza_metadata(&msg);
2393 assert!(edit.is_none());
2394 let node = node.expect("should have meta node");
2395 assert_eq!(node.tag, "meta");
2396 let mut attrs = node.attrs();
2397 assert_eq!(attrs.optional_string("polltype").unwrap().as_ref(), "vote");
2398 }
2399
2400 #[test]
2401 fn event_response_returns_meta_node() {
2402 let msg = wa::Message {
2403 enc_event_response_message: Some(Default::default()),
2404 ..Default::default()
2405 };
2406 let (edit, node) = infer_stanza_metadata(&msg);
2407 assert!(edit.is_none());
2408 let node = node.expect("should have meta node");
2409 assert_eq!(node.tag, "meta");
2410 let mut attrs = node.attrs();
2411 assert_eq!(
2412 attrs.optional_string("event_type").unwrap().as_ref(),
2413 "response"
2414 );
2415 }
2416
2417 #[test]
2418 fn poll_update_without_vote_returns_none() {
2419 let msg = wa::Message {
2420 poll_update_message: Some(wa::message::PollUpdateMessage {
2421 vote: None,
2422 ..Default::default()
2423 }),
2424 ..Default::default()
2425 };
2426 let (edit, node) = infer_stanza_metadata(&msg);
2427 assert!(edit.is_none());
2428 assert!(node.is_none());
2429 }
2430 }
2431
2432 mod infer_biz {
2433 use super::*;
2434 use wa::message::interactive_message::{
2435 self, NativeFlowMessage, native_flow_message::NativeFlowButton,
2436 };
2437
2438 fn msg_with_native_flow(button_name: &str) -> wa::Message {
2439 wa::Message {
2440 document_with_caption_message: Some(Box::new(wa::message::FutureProofMessage {
2441 message: Some(Box::new(wa::Message {
2442 interactive_message: Some(Box::new(wa::message::InteractiveMessage {
2443 interactive_message: Some(
2444 interactive_message::InteractiveMessage::NativeFlowMessage(
2445 NativeFlowMessage {
2446 buttons: vec![NativeFlowButton {
2447 name: Some(button_name.to_string()),
2448 button_params_json: None,
2449 }],
2450 message_version: Some(1),
2451 message_params_json: None,
2452 },
2453 ),
2454 ),
2455 ..Default::default()
2456 })),
2457 ..Default::default()
2458 })),
2459 })),
2460 ..Default::default()
2461 }
2462 }
2463
2464 fn assert_biz_node(node: &Node, expected_flow_name: &str) {
2465 assert_eq!(node.tag, "biz");
2466 assert!(
2467 node.attrs().optional_string("native_flow_name").is_none(),
2468 "should NOT use simple attribute form"
2469 );
2470 let interactive = node.get_optional_child("interactive").unwrap();
2471 let mut attrs = interactive.attrs();
2472 assert_eq!(
2473 attrs.optional_string("type").unwrap().as_ref(),
2474 "native_flow"
2475 );
2476 assert_eq!(attrs.optional_string("v").unwrap().as_ref(), "1");
2477 let nf = interactive.get_optional_child("native_flow").unwrap();
2478 let mut nf_attrs = nf.attrs();
2479 assert_eq!(
2480 nf_attrs.optional_string("name").unwrap().as_ref(),
2481 expected_flow_name
2482 );
2483 }
2484
/// Every listed button name must yield a nested `biz` node carrying the
/// paired flow name; note that `review_and_pay` is expected to map to
/// `order_details`.
#[test]
fn all_button_types_use_nested_structure() {
    // (button name, expected native flow name)
    const CASES: [(&str, &str); 6] = [
        ("cta_url", "cta_url"),
        ("payment_info", "payment_info"),
        ("review_and_pay", "order_details"),
        ("cta_catalog", "cta_catalog"),
        ("mpm", "mpm"),
        ("quick_reply", "quick_reply"),
    ];

    for (button, expected_flow) in CASES {
        let message = msg_with_native_flow(button);
        let Some(biz) = infer_biz_node(&message) else {
            panic!("{button} should produce biz node");
        };
        assert_biz_node(&biz, expected_flow);
    }
}
2500
/// A plain text message has no interactive payload, so no `biz` node is
/// inferred.
#[test]
fn no_interactive_returns_none() {
    let plain_text = wa::Message {
        conversation: Some("hello".into()),
        ..Default::default()
    };

    let biz = infer_biz_node(&plain_text);

    assert!(biz.is_none());
}
2509
/// An interactive message whose payload is a collection message (not a
/// native flow) must not produce a `biz` node.
#[test]
fn interactive_without_native_flow_returns_none() {
    let collection_payload =
        interactive_message::InteractiveMessage::CollectionMessage(Default::default());
    let interactive = wa::message::InteractiveMessage {
        interactive_message: Some(collection_payload),
        ..Default::default()
    };
    let message = wa::Message {
        interactive_message: Some(Box::new(interactive)),
        ..Default::default()
    };

    assert!(infer_biz_node(&message).is_none());
}
2525
/// A native-flow message with an empty button list must not produce a
/// `biz` node.
#[test]
fn native_flow_without_buttons_returns_none() {
    let buttonless_flow = NativeFlowMessage {
        buttons: Vec::new(),
        message_version: Some(1),
        message_params_json: None,
    };
    let interactive = wa::message::InteractiveMessage {
        interactive_message: Some(
            interactive_message::InteractiveMessage::NativeFlowMessage(buttonless_flow),
        ),
        ..Default::default()
    };
    let message = wa::Message {
        interactive_message: Some(Box::new(interactive)),
        ..Default::default()
    };

    assert!(infer_biz_node(&message).is_none());
}
2545
/// A native-flow interactive message placed directly on the top-level
/// message (no `document_with_caption_message` wrapper) still yields the
/// nested `biz` node.
#[test]
fn direct_interactive_message_without_wrapper() {
    let cta_button = NativeFlowButton {
        name: Some("cta_url".to_string()),
        button_params_json: None,
    };
    let native_flow = NativeFlowMessage {
        buttons: vec![cta_button],
        message_version: Some(1),
        message_params_json: None,
    };
    let interactive = wa::message::InteractiveMessage {
        interactive_message: Some(
            interactive_message::InteractiveMessage::NativeFlowMessage(native_flow),
        ),
        ..Default::default()
    };
    let message = wa::Message {
        interactive_message: Some(Box::new(interactive)),
        ..Default::default()
    };

    let biz = infer_biz_node(&message).unwrap();
    assert_biz_node(&biz, "cta_url");
}
2569 }
2570
2571 mod session_lock_regression {
2573 use super::*;
2574
2575 #[tokio::test]
2576 async fn per_device_lock_keys_cover_all_devices() {
2577 let client = crate::test_utils::create_test_client().await;
2578
2579 let devices: Vec<Jid> = [
2580 "100000012345678@lid",
2581 "100000012345678:5@lid",
2582 "100000012345678:33@lid",
2583 ]
2584 .iter()
2585 .map(|s| Jid::from_str(s).unwrap())
2586 .collect();
2587
2588 let send_lock_keys = client.build_session_lock_keys(&devices).await;
2590
2591 assert_eq!(send_lock_keys.len(), 3);
2592 assert_eq!(send_lock_keys[0].device, 0);
2594 assert_eq!(send_lock_keys[1].device, 5);
2595 assert_eq!(send_lock_keys[2].device, 33);
2596
2597 for device_jid in &devices {
2599 assert!(
2600 send_lock_keys.contains(device_jid),
2601 "device {device_jid} not in send keys: {send_lock_keys:?}"
2602 );
2603 }
2604
2605 let bare_key = devices[0].to_protocol_address_string();
2607 let device5_key = devices[1].to_protocol_address_string();
2608 assert_ne!(bare_key, device5_key);
2609 }
2610
2611 #[tokio::test]
2612 async fn per_device_lock_serializes_concurrent_session_access() {
2613 use std::sync::Arc;
2614 use std::sync::atomic::{AtomicU32, Ordering};
2615
2616 let session_locks: crate::cache::Cache<String, Arc<async_lock::Mutex<()>>> =
2617 crate::cache::Cache::builder().max_capacity(100).build();
2618
2619 let lock_key = "100000012345678:5@lid.0".to_string();
2620 let access_counter = Arc::new(AtomicU32::new(0));
2621 let max_concurrent = Arc::new(AtomicU32::new(0));
2622
2623 let mut handles = Vec::new();
2624 for _ in 0..10 {
2625 let locks = session_locks.clone();
2626 let key = lock_key.clone();
2627 let counter = access_counter.clone();
2628 let max = max_concurrent.clone();
2629
2630 handles.push(tokio::spawn(async move {
2631 let mutex: Arc<async_lock::Mutex<()>> = locks
2632 .get_with_by_ref(&key, async { Arc::new(async_lock::Mutex::new(())) })
2633 .await;
2634 let _guard = mutex.lock_arc().await;
2637
2638 let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
2639 max.fetch_max(active, Ordering::SeqCst);
2640 tokio::task::yield_now().await;
2641 counter.fetch_sub(1, Ordering::SeqCst);
2642 }));
2643 }
2644
2645 for handle in handles {
2646 handle.await.unwrap();
2647 }
2648
2649 assert_eq!(max_concurrent.load(Ordering::SeqCst), 1);
2650 }
2651
2652 #[tokio::test]
2653 async fn different_device_locks_are_independent() {
2654 use std::sync::Arc;
2655 use std::sync::atomic::{AtomicU32, Ordering};
2656
2657 let session_locks: crate::cache::Cache<String, Arc<async_lock::Mutex<()>>> =
2658 crate::cache::Cache::builder().max_capacity(100).build();
2659
2660 let max_concurrent = Arc::new(AtomicU32::new(0));
2661 let counter = Arc::new(AtomicU32::new(0));
2662 let barrier = Arc::new(tokio::sync::Barrier::new(2));
2663
2664 let keys = ["100000012345678@lid.0", "100000012345678:5@lid.0"];
2665
2666 let mut handles = Vec::new();
2667 for key in keys {
2668 let locks = session_locks.clone();
2669 let key = key.to_string();
2670 let c = counter.clone();
2671 let m = max_concurrent.clone();
2672 let b = barrier.clone();
2673
2674 handles.push(tokio::spawn(async move {
2675 let mutex: Arc<async_lock::Mutex<()>> = locks
2676 .get_with_by_ref(&key, async { Arc::new(async_lock::Mutex::new(())) })
2677 .await;
2678 let _guard = mutex.lock_arc().await;
2680
2681 let active = c.fetch_add(1, Ordering::SeqCst) + 1;
2682 m.fetch_max(active, Ordering::SeqCst);
2683 b.wait().await;
2684 c.fetch_sub(1, Ordering::SeqCst);
2685 }));
2686 }
2687
2688 for handle in handles {
2689 handle.await.unwrap();
2690 }
2691
2692 assert_eq!(max_concurrent.load(Ordering::SeqCst), 2);
2693 }
2694
2695 #[tokio::test]
2699 async fn dm_recipient_uses_bare_address() {
2700 let client = crate::test_utils::create_test_client().await;
2701
2702 let recipient_device33 = Jid::from_str("100000012345678:33@lid").unwrap();
2704 let own_device_5 = Jid::from_str("999999999999:5@s.whatsapp.net").unwrap();
2705
2706 let recipient_bare = client
2708 .resolve_encryption_jid(&recipient_device33)
2709 .await
2710 .to_non_ad();
2711
2712 let all_dm_jids = vec![recipient_bare.clone(), own_device_5.clone()];
2713 let lock_jids = client.build_session_lock_keys(&all_dm_jids).await;
2714
2715 assert_eq!(
2717 recipient_bare.to_protocol_address_string(),
2718 "100000012345678@lid.0"
2719 );
2720 assert!(lock_jids.contains(&recipient_bare));
2721
2722 assert!(lock_jids.contains(&own_device_5));
2724
2725 assert!(
2727 !lock_jids.contains(&recipient_device33),
2728 "recipient must NOT use device-specific address"
2729 );
2730 }
2731
/// Applying `to_non_ad` to the bare, `:5` and `:33` JIDs of one user must
/// give three equal JIDs whose protocol address string is
/// `100000012345678@lid.0`.
#[test]
fn bare_normalization_deduplicates_recipient_devices() {
    let raw_jids = [
        "100000012345678@lid",
        "100000012345678:5@lid",
        "100000012345678:33@lid",
    ];

    // Parse each JID and strip its device part in one pass.
    let bare: Vec<Jid> = raw_jids
        .into_iter()
        .map(|raw| Jid::from_str(raw).unwrap().to_non_ad())
        .collect();

    // Every adjacent pair being equal means all entries are equal.
    assert!(
        bare.windows(2)
            .all(|pair| matches!(pair, [left, right] if left == right))
    );
    assert_eq!(
        bare[0].to_protocol_address_string(),
        "100000012345678@lid.0"
    );
}
2752 }
2753}