use crate::client::Client;
use crate::message::RetryReason;
use crate::types::events::Receipt;
use log::{debug, info, warn};
use prost::Message;
use wacore::types::message::MessageCategory;
use scopeguard;
use std::sync::Arc;
use wacore::iq::prekeys::{OneTimePreKeyNode, SignedPreKeyNode};
use wacore::libsignal::protocol::{
KeyPair, PreKeyBundle, PublicKey, UsePQRatchet, process_prekey_bundle,
};
use wacore::libsignal::store::PreKeyStore;
use wacore::protocol::ProtocolNode;
use wacore::types::jid::JidExt;
use wacore_binary::JidExt as _;
use wacore_binary::builder::NodeBuilder;
use wacore_binary::{Jid, OwnedNodeRef};
#[cfg(test)]
use wacore_binary::{Node, NodeContent};
use wacore_binary::{NodeContentRef, NodeRef};
/// Test-only accessor for the raw byte payload of an owned [`Node`].
///
/// Returns `None` when the node has no content or its content is not the
/// `Bytes` variant.
#[cfg(test)]
fn get_bytes_content(node: &Node) -> Option<&[u8]> {
    if let Some(NodeContent::Bytes(payload)) = &node.content {
        Some(payload.as_slice())
    } else {
        None
    }
}
/// Borrows the byte payload of a [`NodeRef`].
///
/// Returns `None` when the node has no content or its content is not the
/// `Bytes` variant.
fn get_bytes_content_ref<'a>(node: &'a NodeRef<'_>) -> Option<&'a [u8]> {
    if let Some(NodeContentRef::Bytes(payload)) = node.content.as_deref() {
        Some(payload.as_ref())
    } else {
        None
    }
}
/// Decodes a registration ID from big-endian bytes.
///
/// * Four or more bytes: the first four are read as a big-endian `u32`
///   (any trailing bytes are ignored).
/// * One to three bytes: the value is left-padded with zeros to four bytes.
/// * Empty input: `None`.
fn registration_id_from_be_bytes(bytes: &[u8]) -> Option<u32> {
    match bytes {
        [] => None,
        [b0, b1, b2, b3, ..] => Some(u32::from_be_bytes([*b0, *b1, *b2, *b3])),
        short => {
            // Right-align the short value inside a zeroed 4-byte buffer.
            let mut padded = [0u8; 4];
            padded[4 - short.len()..].copy_from_slice(short);
            Some(u32::from_be_bytes(padded))
        }
    }
}

/// Test-only variant of [`extract_registration_id_from_node_ref`] for owned nodes.
///
/// Returns `None` when there is no `<registration>` child, when its content is
/// not bytes, or when the byte payload is empty.
#[cfg(test)]
fn extract_registration_id_from_node(node: &Node) -> Option<u32> {
    let registration_node = node.get_optional_child("registration")?;
    registration_id_from_be_bytes(get_bytes_content(registration_node)?)
}

/// Reads the registration ID from the `<registration>` child of `node`.
///
/// Returns `None` when there is no `<registration>` child, when its content is
/// not bytes, or when the byte payload is empty.
fn extract_registration_id_from_node_ref(node: &NodeRef<'_>) -> Option<u32> {
    let registration_node = node.get_optional_child("registration")?;
    registration_id_from_be_bytes(get_bytes_content_ref(registration_node)?)
}
/// Retry receipts whose `count` is at or above this value are refused
/// without resending the message.
const MAX_RETRY_COUNT: u8 = 5;
/// Retry count at which the session's base key is saved. On retries above
/// this count the saved key is compared with the current one to detect a
/// session that has not been regenerated.
const MIN_RETRY_FOR_BASE_KEY_CHECK: u8 = 2;
/// Addressing details derived from a retry receipt by `resolve_retry_chat_info`.
struct RetryChatInfo {
/// Chat JID used to look up the cached original message (and, for groups,
/// the chat the retry is addressed to).
chat: Jid,
/// JID of the participant/device that asked for the retry.
requester: Jid,
/// Unmodified copy of `receipt.source.chat`.
original_from: Jid,
/// Whether the receipt's chat JID is a bot; always `false` for group and
/// status-broadcast receipts.
is_bot: bool,
}
/// Works out which chat a retry receipt refers to and who is requesting it.
///
/// * Group / status-broadcast receipts: the chat is the receipt's chat and the
///   requester is the `participant` attribute, falling back to the receipt's
///   sender.
/// * Direct receipts: the chat is the `recipient` attribute (stripped via
///   `to_non_ad`) when the sender is a bot or one of our own devices and the
///   attribute is present; otherwise it is the sender itself (stripped the
///   same way). The requester is the chat when the sender has device 0 and
///   agent 0, else the sender's full JID.
///
/// `own_pn` / `own_lid` are our own identities, used to detect receipts coming
/// from our other devices.
fn resolve_retry_chat_info(
    receipt: &Receipt,
    node: &NodeRef<'_>,
    own_pn: Option<&Jid>,
    own_lid: Option<&Jid>,
) -> RetryChatInfo {
    let from = &receipt.source.chat;

    // Group-like chats are handled first so the direct-message path below
    // can stay flat.
    if from.is_group() || from.is_status_broadcast() {
        let requester = match node.attrs().optional_jid("participant") {
            Some(participant) => participant,
            None => receipt.source.sender.clone(),
        };
        return RetryChatInfo {
            chat: from.clone(),
            requester,
            original_from: from.clone(),
            is_bot: false,
        };
    }

    let recipient = node.attrs().optional_jid("recipient");
    let is_bot = from.is_bot();
    let sent_by = |own: Option<&Jid>| own.is_some_and(|jid| from.is_same_user_as(jid));
    let is_peer = sent_by(own_pn) || sent_by(own_lid);

    let chat = match recipient.as_ref() {
        // Bots and our own devices name the real chat in `recipient`.
        Some(target) if is_bot || is_peer => target.to_non_ad(),
        None if is_peer => {
            log::warn!(
                "Peer device retry without recipient attr — message lookup may fail"
            );
            from.to_non_ad()
        }
        _ => from.to_non_ad(),
    };

    let sender_has_no_device = from.device() == 0 && from.agent == 0;
    let requester = if sender_has_no_device {
        chat.clone()
    } else {
        from.clone()
    };

    RetryChatInfo {
        chat,
        requester,
        original_from: from.clone(),
        is_bot,
    }
}
/// Builds the `"<chat>:<message_id>:<participant>"` key used to de-duplicate
/// concurrent retry handling for the same message and requester.
fn build_retry_processing_key(chat: &Jid, message_id: &str, participant_jid: &Jid) -> String {
    // 64 bytes of headroom for the two JIDs and the separators.
    let mut buf = String::with_capacity(message_id.len() + 64);
    chat.push_to(&mut buf);
    buf.extend([":", message_id, ":"]);
    participant_jid.push_to(&mut buf);
    buf
}
impl Client {
/// Handles an incoming retry receipt by re-sending the cached original
/// message to the device that asked for it.
///
/// Flow: parse the `<retry>` child, refuse counts at or above
/// `MAX_RETRY_COUNT`, resolve chat/requester, de-duplicate concurrent
/// handling via `pending_retries`, fetch the cached message, refresh local
/// Signal/sender-key state, then build and send the group or DM retry stanza.
/// Status-broadcast retries only update local state and are not resent here.
///
/// Returns `Ok(())` for every "ignored" case (count too high, retry already
/// in progress, message not cached, unknown device).
///
/// # Errors
/// Fails when the `<retry>` child or its `id` attribute is missing, or when
/// session resolution, stanza preparation, sending, or the signal cache flush
/// fails.
pub(crate) async fn handle_retry_receipt(
self: &Arc<Self>,
receipt: &Receipt,
node: &Arc<OwnedNodeRef>,
) -> Result<(), anyhow::Error> {
let nr = node.get();
let retry_child = nr
.get_optional_child("retry")
.ok_or_else(|| anyhow::anyhow!("<retry> child missing from receipt"))?;
let message_id = retry_child
.get_attr("id")
.map(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("<retry> missing 'id' attribute"))?
.into_owned();
// A missing or unparsable `count` is treated as the first retry.
let retry_count: u8 = retry_child
.get_attr("count")
.map(|v| v.as_str())
.and_then(|s| s.parse().ok())
.unwrap_or(1);
if retry_count >= MAX_RETRY_COUNT {
warn!(
"Refusing retry #{} for message {} from {}: exceeds max attempts ({})",
retry_count, message_id, receipt.source.sender, MAX_RETRY_COUNT
);
return Ok(());
}
let device_snapshot = self.persistence_manager.get_device_snapshot().await;
let mut info = resolve_retry_chat_info(
receipt,
nr,
device_snapshot.pn.as_ref(),
device_snapshot.lid.as_ref(),
);
let is_group_or_status = info.chat.is_group() || info.chat.is_status_broadcast();
// Only one retry per (chat, message, requester) is processed at a time;
// `insert` returning false means the key is already being handled.
let processing_key = build_retry_processing_key(&info.chat, &message_id, &info.requester);
if !self
.pending_retries
.lock()
.unwrap_or_else(|p| p.into_inner())
.insert(processing_key.clone())
{
log::debug!("Ignoring retry for {processing_key}: a retry is already in progress.");
return Ok(());
}
// Remove the in-progress marker on every exit path from here on.
let pending = Arc::clone(&self.pending_retries);
let _guard = scopeguard::guard((), move |()| {
pending
.lock()
.unwrap_or_else(|p| p.into_inner())
.remove(&processing_key);
});
let (original_msg, alt_chat) = match self.take_recent_message(&info.chat, &message_id).await
{
Some(result) => result,
None => {
log::debug!(
"Ignoring retry for message {message_id}: already handled or not found in cache."
);
return Ok(());
}
};
// Put the message back in the cache right after taking it
// (presumably so later retries for the same message can still find it).
self.add_recent_message(&info.chat, &message_id, &original_msg)
.await;
// When the lookup reports an alternate chat for a plain DM, re-address the
// requester to that user/server while keeping its device, agent and
// integrator; otherwise resolve the encryption JID normally.
let resolved_jid = if let Some(alt_chat) = alt_chat
&& !is_group_or_status
&& !info.is_bot
{
let requester = &info.requester;
info.requester = Jid {
user: alt_chat.user,
server: alt_chat.server,
device: requester.device,
agent: requester.agent,
integrator: requester.integrator,
};
info.requester.clone()
} else {
self.resolve_encryption_jid(&info.requester).await
};
// Ignore retries from devices `has_device` does not know about.
let sender_device_id = info.requester.device() as u32;
if !self
.has_device(&info.requester.user, sender_device_id)
.await
{
warn!(
"handle_retry_receipt: device not found for device={}, user={}",
sender_device_id, info.requester.user
);
return Ok(());
}
// A "peer" is another device of our own account (PN or LID identity).
let is_peer = device_snapshot
.pn
.as_ref()
.is_some_and(|our_pn| info.requester.is_same_user_as(our_pn))
|| device_snapshot
.lid
.as_ref()
.is_some_and(|our_lid| info.requester.is_same_user_as(our_lid));
// Group info is best-effort: a failed query is logged and treated as absent.
let cached_group_info = if info.chat.is_group() {
match self.groups().query_info(&info.chat).await {
Ok(gi) => Some(gi),
Err(e) => {
log::warn!(
"Failed to fetch group info for retry of msg {} in {}: {e}",
message_id,
info.chat
);
None
}
}
} else {
None
};
// Groups only (not status), non-LID requester: if the requester is not a
// known participant, drop our own sender key(s) and the tracked sender-key
// devices for the group.
if is_group_or_status && !info.requester.is_lid() && !info.chat.is_status_broadcast() {
let group_jid = info.chat.to_string();
let is_known_participant = cached_group_info
.as_ref()
.is_some_and(|g| g.participants.iter().any(|p| p.user == info.requester.user));
if !is_known_participant {
log::warn!(
"Unknown device {} in group {} — forcing full sender key rotation \
(matches WA Web's rotateKey behavior)",
info.requester,
group_jid
);
// Pick which of our identities to clear from the group's addressing
// mode; with no group info, clear both.
let addressing_mode = cached_group_info.as_ref().map(|g| g.addressing_mode);
let jids_to_delete: Vec<_> = match addressing_mode {
Some(wacore::types::message::AddressingMode::Lid) => {
device_snapshot.lid.as_ref().into_iter().collect()
}
Some(wacore::types::message::AddressingMode::Pn) => {
device_snapshot.pn.as_ref().into_iter().collect()
}
None => device_snapshot
.lid
.as_ref()
.into_iter()
.chain(device_snapshot.pn.as_ref())
.collect(),
};
for own_jid in jids_to_delete {
use wacore::libsignal::store::sender_key_name::SenderKeyName;
let sk_name = SenderKeyName::from_parts(
&group_jid,
own_jid.to_protocol_address().as_str(),
);
self.signal_cache
.delete_sender_key(sk_name.cache_key())
.await;
}
if let Err(e) = self
.persistence_manager
.clear_sender_key_devices(&group_jid)
.await
{
log::warn!("Failed to clear sender key devices for rotation: {}", e);
}
self.sender_key_device_cache.invalidate(&group_jid).await;
}
}
self.update_local_signal_session(
&info,
&resolved_jid,
&message_id,
retry_count,
nr,
is_peer,
)
.await;
// Status broadcasts are not resent here; only the local state above changes.
if info.chat.is_status_broadcast() {
info!(
"Status broadcast retry for {} — participant marked for fresh SKDM, \
will be included in next status send",
message_id
);
return Ok(());
}
info!(
"Resending message {} to {} (retry #{})",
message_id, info.chat, retry_count
);
if info.chat.is_group() {
self.ensure_e2e_sessions_resolved(std::slice::from_ref(&resolved_jid))
.await?;
let device_snapshot = self.persistence_manager.get_device_snapshot().await;
let addressing_mode = cached_group_info
.as_ref()
.map(|g| g.addressing_mode)
.unwrap_or_default();
// Hold the per-address session lock while the stanza is prepared, sent
// and the signal cache is flushed.
let signal_address = resolved_jid.to_protocol_address();
let session_mutex = self.session_lock_for(signal_address.as_str()).await;
let _session_guard = session_mutex.lock().await;
let mut store_adapter = self.signal_adapter().await;
let stanza = wacore::send::prepare_group_retry_stanza(
&mut store_adapter.session_store,
&mut store_adapter.identity_store,
info.chat,
info.requester,
resolved_jid.clone(),
&original_msg,
message_id,
retry_count,
device_snapshot.account.as_ref(),
addressing_mode,
)
.await?;
self.send_node(stanza).await?;
self.flush_signal_cache().await?;
} else {
self.ensure_e2e_sessions_resolved(std::slice::from_ref(&resolved_jid))
.await?;
let device_snapshot = self.persistence_manager.get_device_snapshot().await;
// Same locking discipline as the group branch.
let signal_address = resolved_jid.to_protocol_address();
let session_mutex = self.session_lock_for(signal_address.as_str()).await;
let _session_guard = session_mutex.lock().await;
let mut store_adapter = self.signal_adapter().await;
let stanza = wacore::send::prepare_dm_retry_stanza(
&mut store_adapter.session_store,
&mut store_adapter.identity_store,
info.original_from,
info.requester,
resolved_jid.clone(),
&original_msg,
message_id,
retry_count,
device_snapshot.account.as_ref(),
)
.await?;
self.send_node(stanza).await?;
self.flush_signal_cache().await?;
}
Ok(())
}
/// Brings local Signal state in line with a retry receipt before resending.
///
/// 1. For group / status-broadcast chats, marks the requester via
///    `mark_forget_sender_key` (failures are only logged).
/// 2. Tries to process a key bundle carried by the receipt. If that fails and
///    the receipt carries a registration ID that differs from the non-zero one
///    stored in the existing session, the session is deleted.
/// 3. Base-key tracking: at retry `MIN_RETRY_FOR_BASE_KEY_CHECK` the session's
///    base key is saved for this address and message; on later retries an
///    unchanged base key causes the session to be deleted.
///
/// Nothing is returned; all failures are logged.
async fn update_local_signal_session(
&self,
info: &RetryChatInfo,
resolved_jid: &Jid,
message_id: &str,
retry_count: u8,
node: &NodeRef<'_>,
is_peer: bool,
) {
if info.chat.is_group() || info.chat.is_status_broadcast() {
let group_jid = info.chat.to_string();
match self
.mark_forget_sender_key(&group_jid, std::slice::from_ref(&info.requester))
.await
{
Ok(()) => {
let chat_type = if info.chat.is_status_broadcast() {
"status broadcast"
} else {
"group"
};
info!(
"Marked {} for fresh SKDM in {} {} due to retry receipt",
info.requester, chat_type, group_jid
);
}
Err(e) => log::warn!(
"Failed to mark sender key forget for {} in {}: {}",
info.requester,
group_jid,
e
),
}
}
let key_bundle_result = self
.process_retry_key_bundle(node, resolved_jid, is_peer)
.await;
let key_bundle_processed = key_bundle_result.is_ok();
if !key_bundle_processed {
if let Err(ref e) = key_bundle_result {
log::debug!(
"No key bundle in retry receipt for {}: {}. Checking for reg ID mismatch.",
resolved_jid,
e
);
}
// Fallback: with no usable bundle, a changed registration ID is still
// enough to discard the existing session.
if let Some(received_reg_id) = extract_registration_id_from_node_ref(node) {
let signal_address = resolved_jid.to_protocol_address();
let device_store = self.persistence_manager.get_device_arc().await;
let device_guard = device_store.read().await;
let session = self
.signal_cache
.peek_session(&signal_address, &*device_guard.backend)
.await
.ok()
.flatten();
// Release the device read guard before taking the session lock below.
drop(device_guard);
if let Some(session) = session
&& let Ok(stored_reg_id) = session.remote_registration_id()
&& stored_reg_id != 0
&& stored_reg_id != received_reg_id
{
info!(
"Registration ID mismatch for {} (stored: {}, received: {}). \
Deleting session since no key bundle provided.",
signal_address, stored_reg_id, received_reg_id
);
let lock = self.session_lock_for(signal_address.as_str()).await;
let _guard = lock.lock().await;
self.signal_cache.delete_session(&signal_address).await;
drop(_guard);
self.flush_signal_cache_logged("reg ID mismatch session deletion", None)
.await;
}
}
}
// Base-key tracking needs an existing session that exposes a base key;
// otherwise there is nothing more to do.
let signal_address = resolved_jid.to_protocol_address();
let device_store = self.persistence_manager.get_device_arc().await;
let device_guard = device_store.read().await;
let session = self
.signal_cache
.peek_session(&signal_address, &*device_guard.backend)
.await
.ok()
.flatten();
let Some(session) = session else {
return;
};
let Ok(current_base_key) = session.alice_base_key() else {
return;
};
let addr_str = signal_address.as_str();
// Exactly at the threshold: record the base key and stop.
if retry_count == MIN_RETRY_FOR_BASE_KEY_CHECK {
match device_guard
.backend
.save_base_key(addr_str, message_id, current_base_key)
.await
{
Ok(()) => info!(
"Saved base key for {} at retry #{} for collision detection",
signal_address, retry_count
),
Err(e) => warn!("Failed to save base key for {}: {}", signal_address, e),
}
return;
}
// Past the threshold: compare against the recorded base key.
if retry_count > MIN_RETRY_FOR_BASE_KEY_CHECK {
match device_guard
.backend
.has_same_base_key(addr_str, message_id, current_base_key)
.await
{
Ok(true) => {
warn!(
"Base key collision detected for {} at retry #{}. \
Session hasn't been regenerated. Forcing fresh session.",
signal_address, retry_count
);
// Best-effort cleanup of the recorded key; the result is ignored.
let _ = device_guard
.backend
.delete_base_key(addr_str, message_id)
.await;
// Release the device read guard before taking the session lock.
drop(device_guard);
let lock = self.session_lock_for(signal_address.as_str()).await;
let _guard = lock.lock().await;
self.signal_cache.delete_session(&signal_address).await;
drop(_guard);
self.flush_signal_cache_logged(
"base key collision — forcing fresh session",
None,
)
.await;
}
Ok(false) => {
info!(
"Base key changed for {} at retry #{} - session regenerated",
signal_address, retry_count
);
let _ = device_guard
.backend
.delete_base_key(addr_str, message_id)
.await;
}
Err(e) => {
warn!("Failed to check base key for {}: {}", signal_address, e);
}
}
}
}
async fn process_retry_key_bundle(
&self,
node: &NodeRef<'_>,
requester_jid: &wacore_binary::Jid,
is_peer: bool,
) -> Result<(), anyhow::Error> {
let keys_node = node
.get_optional_child("keys")
.ok_or_else(|| anyhow::anyhow!("<keys> child missing from retry receipt"))?;
let registration_node = node.get_optional_child("registration");
let registration_id = registration_node
.and_then(get_bytes_content_ref)
.map(|bytes| {
if bytes.len() >= 4 {
u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
} else if !bytes.is_empty() {
let mut arr = [0u8; 4];
let start = 4 - bytes.len();
arr[start..].copy_from_slice(bytes);
u32::from_be_bytes(arr)
} else {
0
}
})
.unwrap_or(0);
if registration_id == 0 {
return Err(anyhow::anyhow!("Invalid registration ID in retry receipt"));
}
let signal_address = requester_jid.to_protocol_address();
{
let device_store = self.persistence_manager.get_device_arc().await;
let device_guard = device_store.read().await;
let session = self
.signal_cache
.peek_session(&signal_address, &*device_guard.backend)
.await
.ok()
.flatten();
drop(device_guard);
if let Some(session) = session {
let existing_reg_id = session.remote_registration_id()?;
if existing_reg_id != 0 && existing_reg_id != registration_id {
if is_peer {
return Err(anyhow::anyhow!(
"Registration ID changed for peer device {} (was {}, now {}). \
This may indicate the device was reinstalled.",
signal_address,
existing_reg_id,
registration_id
));
}
info!(
"Registration ID changed for {} (was {}, now {}). Session will be replaced.",
signal_address, existing_reg_id, registration_id
);
}
}
}
let identity_bytes = keys_node
.get_optional_child("identity")
.and_then(get_bytes_content_ref)
.ok_or_else(|| anyhow::anyhow!("Missing identity key in retry receipt"))?;
let identity_key = PublicKey::from_djb_public_key_bytes(identity_bytes)?;
let prekey_data = if let Some(key_ref) = keys_node.get_optional_child("key") {
let prekey_node = OneTimePreKeyNode::try_from_node_ref(key_ref)?;
let prekey_public = PublicKey::from_djb_public_key_bytes(&prekey_node.public_bytes)?;
Some((prekey_node.id.into(), prekey_public))
} else {
None
};
let skey_ref = keys_node
.get_optional_child("skey")
.ok_or_else(|| anyhow::anyhow!("Missing signed prekey in retry receipt"))?;
let signed_prekey = SignedPreKeyNode::try_from_node_ref(skey_ref)?;
let skey_public = PublicKey::from_djb_public_key_bytes(&signed_prekey.public_bytes)?;
let skey_signature: [u8; 64] = signed_prekey
.signature
.as_slice()
.try_into()
.map_err(|_| anyhow::anyhow!("Invalid signature length"))?;
let bundle = PreKeyBundle::new(
registration_id,
u32::from(requester_jid.device).into(),
prekey_data,
signed_prekey.id.into(),
skey_public,
skey_signature.into(),
identity_key.into(),
)?;
let session_mutex = self.session_lock_for(signal_address.as_str()).await;
let _session_guard = session_mutex.lock().await;
let mut adapter = self.signal_adapter().await;
process_prekey_bundle(
&signal_address,
&mut adapter.session_store,
&mut adapter.identity_store,
&bundle,
&mut rand::make_rng::<rand::rngs::StdRng>(),
UsePQRatchet::No,
)
.await?;
self.flush_signal_cache().await?;
info!(
"Processed key bundle from retry receipt for {}",
signal_address
);
Ok(())
}
/// Sends a `type="retry"` receipt asking the sender of `info` to re-send the
/// message.
///
/// The receipt always carries a `<retry>` child (id, timestamp, count and,
/// unless the reason is `UnknownError`, an `error` code) and a
/// `<registration>` child. When `should_include_keys` says so, a `<keys>`
/// child with a freshly generated one-time prekey, our signed prekey, identity
/// key and device identity is attached as well.
///
/// Nothing is sent when the message came from a bot and we are not a bot
/// ourselves.
///
/// # Errors
/// Fails when keys must be included but the device has no account info, or
/// when sending the receipt fails. A failure to store the new prekey is only
/// logged.
pub(crate) async fn send_retry_receipt(
    &self,
    info: &crate::types::message::MessageInfo,
    retry_count: u8,
    reason: RetryReason,
) -> Result<(), anyhow::Error> {
    let device_snapshot = self.persistence_manager.get_device_snapshot().await;
    let we_are_bot = device_snapshot
        .pn
        .as_ref()
        .is_some_and(|our_pn| our_pn.is_bot());
    let sender_is_bot = info.source.sender.is_bot();
    if !we_are_bot && sender_is_bot {
        log::debug!(
            "Skipping retry receipt for message {} from bot {}: bots don't process retries",
            info.id,
            info.source.sender
        );
        return Ok(());
    }
    debug!(
        "Sending retry receipt #{} for message {} from {} (reason: {:?})",
        retry_count, info.id, info.source.sender, reason
    );

    let mut retry_builder = NodeBuilder::new("retry")
        .attr("v", "1")
        .attr("id", info.id.clone())
        .attr("t", info.timestamp.timestamp())
        .attr("count", retry_count);
    if reason != RetryReason::UnknownError {
        retry_builder = retry_builder.attr("error", reason as u8);
    }
    let retry_node = retry_builder.build();

    let registration_id_bytes = device_snapshot.registration_id.to_be_bytes().to_vec();
    let registration_node = NodeBuilder::new("registration")
        .bytes(registration_id_bytes)
        .build();

    let keys_node = if wacore::protocol::retry::should_include_keys(retry_count, reason) {
        // Validate the account info BEFORE generating and persisting a new
        // prekey, so a failing call does not leave an orphan prekey behind.
        let device_identity_bytes = device_snapshot
            .account
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Missing device account info for retry receipt"))?
            .encode_to_vec();

        let device_store = self.persistence_manager.get_device_arc().await;
        let device_guard = device_store.read().await;
        // Prekey IDs are drawn from 1..=16777215 (never zero).
        let new_prekey_id = (rand::random::<u32>() % 16777215) + 1;
        let new_prekey_keypair = KeyPair::generate(&mut rand::make_rng::<rand::rngs::StdRng>());
        let new_prekey_record = wacore::libsignal::store::record_helpers::new_pre_key_record(
            new_prekey_id,
            &new_prekey_keypair,
        );
        // Best-effort: a storage failure is logged and the receipt is still sent.
        if let Err(e) = device_guard
            .store_prekey(new_prekey_id, new_prekey_record, false)
            .await
        {
            warn!("Failed to store new prekey for retry receipt: {e:?}");
        }
        drop(device_guard);

        let identity_key_bytes = device_snapshot
            .identity_key
            .public_key
            .public_key_bytes()
            .to_vec();
        let prekey_value_bytes = new_prekey_keypair.public_key.serialize().to_vec();
        let skey_id = device_snapshot.signed_pre_key_id;
        let skey_value_bytes = device_snapshot
            .signed_pre_key
            .public_key
            .serialize()
            .to_vec();
        let skey_sig_bytes = device_snapshot.signed_pre_key_signature.to_vec();
        let type_bytes = vec![5u8];
        Some(
            NodeBuilder::new("keys")
                .children([
                    NodeBuilder::new("type").bytes(type_bytes).build(),
                    NodeBuilder::new("identity")
                        .bytes(identity_key_bytes)
                        .build(),
                    OneTimePreKeyNode::new(new_prekey_id, prekey_value_bytes).into_node(),
                    SignedPreKeyNode::new(skey_id, skey_value_bytes, skey_sig_bytes)
                        .into_node(),
                    NodeBuilder::new("device-identity")
                        .bytes(device_identity_bytes)
                        .build(),
                ])
                .build(),
        )
    } else {
        None
    };

    // Group receipts go to the group with the sender as `participant`;
    // direct receipts go straight to the sender.
    let receipt_to = if info.source.is_group {
        &info.source.chat
    } else {
        &info.source.sender
    };
    let mut builder = NodeBuilder::new("receipt")
        .attr("to", receipt_to)
        .attr("id", info.id.clone())
        .attr("type", "retry");
    if info.source.is_group {
        builder = builder.attr("participant", &info.source.sender);
    } else {
        let is_from_own_account = device_snapshot
            .pn
            .as_ref()
            .is_some_and(|pn| info.source.sender.is_same_user_as(pn))
            || device_snapshot
                .lid
                .as_ref()
                .is_some_and(|lid| info.source.sender.is_same_user_as(lid));
        if is_from_own_account {
            // Messages from our own account: peer-category messages are tagged
            // with `category`, everything else names the chat in `recipient`.
            if info.category == MessageCategory::Peer {
                builder = builder.attr("category", MessageCategory::Peer.as_str());
            } else {
                let recipient = info.source.recipient.as_ref().unwrap_or(&info.source.chat);
                builder = builder.attr("recipient", recipient);
            }
        }
    }

    let receipt_node = if let Some(keys) = keys_node {
        builder
            .children([retry_node, registration_node, keys])
            .build()
    } else {
        builder.children([retry_node, registration_node]).build()
    };
    self.send_node(receipt_node).await?;
    Ok(())
}
/// Sends a `type="enc_rekey_retry"` receipt to `peer_jid` for the given call.
///
/// The receipt has an `<enc_rekey>` child (call creator, call id, retry count)
/// and a `<registration>` child holding our registration ID as big-endian
/// bytes.
///
/// # Errors
/// Fails when sending the receipt fails.
#[allow(dead_code)]
pub(crate) async fn send_enc_rekey_retry_receipt(
    &self,
    stanza_id: &str,
    peer_jid: &wacore_binary::Jid,
    call_id: &str,
    call_creator: &wacore_binary::Jid,
    retry_count: u8,
) -> Result<(), anyhow::Error> {
    let snapshot = self.persistence_manager.get_device_snapshot().await;

    let children = [
        NodeBuilder::new("enc_rekey")
            .attr("call-creator", call_creator)
            .attr("call-id", call_id)
            .attr("count", retry_count)
            .build(),
        NodeBuilder::new("registration")
            .bytes(snapshot.registration_id.to_be_bytes().to_vec())
            .build(),
    ];
    let receipt = NodeBuilder::new("receipt")
        .attr("to", peer_jid)
        .attr("id", stanza_id)
        .attr("type", "enc_rekey_retry")
        .children(children)
        .build();

    info!(
        "Sending enc_rekey_retry receipt for call-id={} to {} (count={})",
        call_id, peer_jid, retry_count
    );
    self.send_node(receipt).await?;
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::persistence_manager::PersistenceManager;
use crate::test_utils::MockHttpClient;
use std::borrow::Cow;
use std::sync::Arc;
use wacore::types::jid::JidExt as _;
use wacore_binary::{Jid, JidExt};
use waproto::whatsapp as wa;
/// A message added to the recent-message cache can be taken exactly once,
/// and is found under its primary key (no alternate chat reported).
#[tokio::test]
async fn recent_message_cache_insert_and_take() {
    let _ = env_logger::builder().is_test(true).try_init();

    let persistence = PersistenceManager::new(crate::test_utils::create_test_backend().await)
        .await
        .expect("persistence manager should initialize");
    let pm = Arc::new(persistence);

    let mut config = crate::cache_config::CacheConfig::default();
    config.recent_messages.capacity = 1_000;
    let (client, _sync_rx) = Client::new_with_cache_config(
        Arc::new(crate::runtime_impl::TokioRuntime),
        Arc::clone(&pm),
        Arc::new(crate::transport::mock::MockTransportFactory::new()),
        Arc::new(MockHttpClient),
        None,
        config,
    )
    .await;

    let chat = "120363021033254949@g.us"
        .parse::<Jid>()
        .expect("test JID should be valid");
    let msg_id = String::from("ABC123");
    let original = wa::Message {
        conversation: Some("hello".into()),
        ..Default::default()
    };
    client.add_recent_message(&chat, &msg_id, &original).await;

    // First take returns the cached message.
    let first_take = client.take_recent_message(&chat, &msg_id).await;
    assert!(first_take.is_some());
    let (cached, alt_chat) = first_take.unwrap();
    assert!(alt_chat.is_none(), "primary key should match");
    assert_eq!(cached.conversation.as_deref(), Some("hello"));

    // Second take finds nothing: taking removes the entry.
    let second_take = client.take_recent_message(&chat, &msg_id).await;
    assert!(second_take.is_none());
}
/// `get_bytes_content` yields the payload only for `Bytes` content.
#[test]
fn get_bytes_content_extracts_bytes() {
    use wacore_binary::{Attrs, Node};

    // All three cases share the same tag and empty attributes.
    let node_with = |content| Node {
        tag: Cow::Borrowed("test"),
        attrs: Attrs::new(),
        content,
    };

    let bytes_node = node_with(Some(NodeContent::Bytes(vec![1, 2, 3, 4])));
    assert_eq!(get_bytes_content(&bytes_node), Some(&[1, 2, 3, 4][..]));

    let string_node = node_with(Some(NodeContent::String("hello".into())));
    assert_eq!(get_bytes_content(&string_node), None);

    let empty_node = node_with(None);
    assert_eq!(get_bytes_content(&empty_node), None);
}
/// Two JIDs of the same account share a `user`, regardless of device;
/// a different account does not.
#[test]
fn peer_detection_logic() {
    let own = Jid::pn("559911112222");
    let own_second_device = Jid::pn_device("559911112222", 1);
    let stranger = Jid::pn("559933334444");

    assert_eq!(own.user, own_second_device.user);
    assert_ne!(own.user, stranger.user);
}
/// Checks which addressing attributes (`participant`, `category`,
/// `recipient`) end up on a retry receipt in four scenarios: a device-sync DM
/// from our own account, a peer-category DM, a group message, and a DM from
/// another user.
#[test]
fn retry_receipt_attributes_for_device_sync_vs_peer_vs_group() {
use wacore::types::message::{MessageCategory, MessageInfo, MessageSource};
use wacore_binary::builder::NodeBuilder;
let our_pn = Jid::pn("559999999999");
let our_lid = Jid::lid("100000000000001");
// Local copy of the receipt-attribute logic from `send_retry_receipt`,
// without the child nodes and without sending anything.
fn build_retry_receipt(
info: &MessageInfo,
our_pn: &Jid,
our_lid: &Jid,
) -> wacore_binary::Node {
let receipt_to = if info.source.is_group {
&info.source.chat
} else {
&info.source.sender
};
let mut builder = NodeBuilder::new("receipt")
.attr("to", receipt_to)
.attr("id", info.id.clone())
.attr("type", "retry");
if info.source.is_group {
builder = builder.attr("participant", &info.source.sender);
}
if !info.source.is_group {
let is_from_own_account = info.source.sender.is_same_user_as(our_pn)
|| info.source.sender.is_same_user_as(our_lid);
if is_from_own_account {
if info.category == MessageCategory::Peer {
builder = builder.attr("category", MessageCategory::Peer.as_str());
} else {
let recipient = info.source.recipient.as_ref().unwrap_or(&info.source.chat);
builder = builder.attr("recipient", recipient);
}
}
}
builder.build()
}
// Scenario 1: DM sent from our own account (default category) -> `recipient` only.
let recipient_lid = Jid::lid("200000000000002");
let device_sync_info = MessageInfo {
id: "DEVICE_SYNC_MSG_001".to_string(),
source: MessageSource {
chat: recipient_lid.clone(),
sender: our_lid.clone(),
is_from_me: true,
is_group: false,
recipient: Some(recipient_lid.clone()),
..Default::default()
},
category: MessageCategory::default(),
..Default::default()
};
let node = build_retry_receipt(&device_sync_info, &our_pn, &our_lid);
assert_eq!(
node.attrs
.get("recipient")
.map(|v| v == "200000000000002@lid"),
Some(true),
"Device sync DM should include recipient"
);
assert!(
node.attrs.get("category").is_none(),
"Device sync DM should NOT have category=peer"
);
assert!(
node.attrs.get("participant").is_none(),
"DM should NOT have participant"
);
// Scenario 2: peer-category DM from our own account -> `category=peer` only.
let other_pn = Jid::pn("551188888888");
let peer_info = MessageInfo {
id: "PEER123".to_string(),
source: MessageSource {
chat: other_pn.clone(),
sender: our_pn.clone(),
is_from_me: true,
is_group: false,
recipient: None,
..Default::default()
},
category: MessageCategory::Peer,
..Default::default()
};
let node = build_retry_receipt(&peer_info, &our_pn, &our_lid);
assert_eq!(
node.attrs.get("category").map(|v| v == "peer"),
Some(true),
"Peer DM should have category=peer"
);
assert!(
node.attrs.get("recipient").is_none(),
"Peer DM should NOT have recipient"
);
// Scenario 3: group message -> `participant` only.
let group_info = MessageInfo {
id: "GROUP123".to_string(),
source: MessageSource {
chat: "123456789@g.us".parse().unwrap(),
sender: our_lid.clone(),
is_from_me: true,
is_group: true,
recipient: None,
..Default::default()
},
category: MessageCategory::default(),
..Default::default()
};
let node = build_retry_receipt(&group_info, &our_pn, &our_lid);
assert!(
node.attrs.get("participant").is_some(),
"Group should have participant"
);
assert!(
node.attrs.get("category").is_none(),
"Group should NOT have category"
);
assert!(
node.attrs.get("recipient").is_none(),
"Group should NOT have recipient"
);
// Scenario 4: DM from another user -> neither `category` nor `recipient`.
let other_dm_info = MessageInfo {
id: "OTHER123".to_string(),
source: MessageSource {
chat: other_pn.clone(),
sender: other_pn.clone(),
is_from_me: false,
is_group: false,
recipient: None,
..Default::default()
},
category: MessageCategory::default(),
..Default::default()
};
let node = build_retry_receipt(&other_dm_info, &our_pn, &our_lid);
assert!(
node.attrs.get("category").is_none(),
"DM from other should NOT have category"
);
assert!(
node.attrs.get("recipient").is_none(),
"DM from other should NOT have recipient"
);
}
/// Builds an `enc_rekey_retry` receipt the same way
/// `send_enc_rekey_retry_receipt` does and checks its attributes and children:
/// no `<retry>` child, an `<enc_rekey>` child with call metadata, and a
/// `<registration>` child holding a 4-byte big-endian registration ID.
#[test]
fn enc_rekey_retry_receipt_node_structure() {
    use wacore_binary::builder::NodeBuilder;
    let peer_jid: Jid = "5511999999999@s.whatsapp.net".parse().expect("peer JID");
    let call_creator: Jid = "5511888888888@s.whatsapp.net".parse().expect("creator JID");
    let call_id = "CALL-ABC-123";
    let stanza_id = "3EB0AABBCCDD";
    let retry_count: u8 = 2;
    let registration_id: u32 = 12345;
    let enc_rekey_node = NodeBuilder::new("enc_rekey")
        .attr("call-creator", call_creator)
        .attr("call-id", call_id)
        .attr("count", retry_count)
        .build();
    let registration_node = NodeBuilder::new("registration")
        .bytes(registration_id.to_be_bytes().to_vec())
        .build();
    let receipt_node = NodeBuilder::new("receipt")
        .attr("to", peer_jid)
        .attr("id", stanza_id)
        .attr("type", "enc_rekey_retry")
        .children([enc_rekey_node, registration_node])
        .build();

    // Receipt-level attributes.
    assert_eq!(
        receipt_node.attrs().optional_string("type").as_deref(),
        Some("enc_rekey_retry"),
        "receipt type must be enc_rekey_retry"
    );
    assert!(
        receipt_node
            .attrs
            .get("to")
            .is_some_and(|v| *v == "5511999999999@s.whatsapp.net"),
        "receipt 'to' must be peer JID"
    );
    assert_eq!(
        receipt_node.attrs().optional_string("id").as_deref(),
        Some("3EB0AABBCCDD")
    );
    assert!(
        receipt_node.get_optional_child("retry").is_none(),
        "enc_rekey_retry must NOT contain <retry> child"
    );

    // <enc_rekey> child.
    let enc_rekey = receipt_node
        .get_optional_child("enc_rekey")
        .expect("<enc_rekey> child must exist");
    assert_eq!(
        enc_rekey.attrs().optional_string("call-id").as_deref(),
        Some("CALL-ABC-123")
    );
    assert!(
        enc_rekey
            .attrs
            .get("call-creator")
            .is_some_and(|v| *v == "5511888888888@s.whatsapp.net"),
        "enc_rekey 'call-creator' must be creator JID"
    );
    assert_eq!(
        enc_rekey.attrs().optional_string("count").as_deref(),
        Some("2")
    );

    // <registration> child.
    let registration = receipt_node
        .get_optional_child("registration")
        .expect("<registration> child must exist");
    // Fixed: the scrutinee was the mis-encoded `®istration` instead of
    // `&registration`.
    let reg_bytes = match &registration.content {
        Some(wacore_binary::NodeContent::Bytes(b)) => b.clone(),
        _ => panic!("registration must contain bytes"),
    };
    assert_eq!(
        u32::from_be_bytes(reg_bytes.try_into().unwrap()),
        12345,
        "registration ID must be 4-byte big-endian"
    );
}
/// A 3-byte key ID is read as a big-endian `u32` with a zero high byte.
#[test]
fn prekey_id_parsing() {
    let cases: [([u8; 3], u32); 2] = [
        ([0x01, 0x02, 0x03], 0x00010203),
        ([0xFF, 0xFE, 0xFD], 0x00FFFEFD),
    ];
    for ([hi, mid, lo], expected) in cases {
        assert_eq!(u32::from_be_bytes([0, hi, mid, lo]), expected);
    }
}
/// Walks the base-key store through its lifecycle: absent, saved, compared
/// against a different key, deleted.
#[tokio::test]
async fn base_key_store_operations() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let (address, msg_id) = ("12345.0:1", "ABC123");
    let base_key: Vec<u8> = (1..=10).collect();
    let different_key: Vec<u8> = (1..=10).rev().collect();

    // Nothing stored yet.
    let before_save = backend.has_same_base_key(address, msg_id, &base_key).await;
    assert!(matches!(before_save, Ok(false)));

    assert!(backend.save_base_key(address, msg_id, &base_key).await.is_ok());

    // Stored key matches itself and nothing else.
    let same = backend.has_same_base_key(address, msg_id, &base_key).await;
    assert!(matches!(same, Ok(true)));
    let other = backend
        .has_same_base_key(address, msg_id, &different_key)
        .await;
    assert!(matches!(other, Ok(false)));

    assert!(backend.delete_base_key(address, msg_id).await.is_ok());

    // After deletion the key no longer matches.
    let after_delete = backend.has_same_base_key(address, msg_id, &base_key).await;
    assert!(matches!(after_delete, Ok(false)));
}
/// Saving a second base key for the same address and message replaces the
/// first one.
#[tokio::test]
async fn base_key_store_upsert() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let (address, msg_id) = ("12345.0:1", "MSG001");
    let first_key = vec![1, 2, 3];
    let second_key = vec![4, 5, 6];

    backend
        .save_base_key(address, msg_id, &first_key)
        .await
        .unwrap();
    for (key, expected) in [(&first_key, true), (&second_key, false)] {
        let matches_stored = backend
            .has_same_base_key(address, msg_id, key)
            .await
            .unwrap();
        assert_eq!(matches_stored, expected);
    }

    // Overwrite with the second key: the expectations flip.
    backend
        .save_base_key(address, msg_id, &second_key)
        .await
        .unwrap();
    for (key, expected) in [(&first_key, false), (&second_key, true)] {
        let matches_stored = backend
            .has_same_base_key(address, msg_id, key)
            .await
            .unwrap();
        assert_eq!(matches_stored, expected);
    }
}
/// Base keys are tracked per message ID: two messages on the same address
/// keep independent keys, and deleting one leaves the other intact.
#[tokio::test]
async fn base_key_store_multiple_messages() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let address = "12345.0:1";
    let (msg_id_1, msg_id_2) = ("MSG001", "MSG002");
    let key_1 = vec![1, 2, 3];
    let key_2 = vec![4, 5, 6];

    backend
        .save_base_key(address, msg_id_1, &key_1)
        .await
        .unwrap();
    backend
        .save_base_key(address, msg_id_2, &key_2)
        .await
        .unwrap();

    // Each message matches only its own key.
    let after_save = [
        (msg_id_1, &key_1, true),
        (msg_id_1, &key_2, false),
        (msg_id_2, &key_1, false),
        (msg_id_2, &key_2, true),
    ];
    for (msg_id, key, expected) in after_save {
        let matches_stored = backend
            .has_same_base_key(address, msg_id, key)
            .await
            .unwrap();
        assert_eq!(matches_stored, expected);
    }

    backend.delete_base_key(address, msg_id_1).await.unwrap();

    // Only the first message's key is gone.
    let after_delete = [(msg_id_1, &key_1, false), (msg_id_2, &key_2, true)];
    for (msg_id, key, expected) in after_delete {
        let matches_stored = backend
            .has_same_base_key(address, msg_id, key)
            .await
            .unwrap();
        assert_eq!(matches_stored, expected);
    }
}
/// Builds a bare `receipt` node with no attributes and no children.
fn build_retry_receipt_without_keys() -> Node {
    wacore_binary::builder::NodeBuilder::new("receipt").build()
}
/// Builds a `receipt` node whose only child is a `registration` node carrying
/// `reg_id` encoded as four big-endian bytes.
fn build_retry_receipt_with_registration(reg_id: u32) -> Node {
    use wacore_binary::builder::NodeBuilder;

    let registration = NodeBuilder::new("registration")
        .bytes(reg_id.to_be_bytes().to_vec())
        .build();
    NodeBuilder::new("receipt").children([registration]).build()
}
/// Builds the `RetryChatInfo` for a non-bot DM retry: the chat is the
/// non-AD form of `resolved_jid`, while requester and original sender are
/// `resolved_jid` itself.
fn dm_retry_info(resolved_jid: &Jid) -> RetryChatInfo {
    let chat = resolved_jid.to_non_ad();
    let requester = resolved_jid.clone();
    let original_from = resolved_jid.clone();
    RetryChatInfo {
        chat,
        requester,
        original_from,
        is_bot: false,
    }
}
/// Serializes a minimal version-3 session record that only carries the given
/// remote registration id and Alice base key; every other optional field is
/// left unset.
///
/// Panics if the record cannot be serialized.
fn valid_serialized_session(remote_regid: u32, base_key: Vec<u8>) -> Vec<u8> {
    use wacore::libsignal::protocol::{SessionRecord, SessionState};
    use waproto::whatsapp::SessionStructure;

    let structure = SessionStructure {
        session_version: Some(3),
        local_identity_public: None,
        remote_identity_public: None,
        root_key: None,
        previous_counter: Some(0),
        sender_chain: None,
        receiver_chains: vec![],
        pending_pre_key: None,
        remote_registration_id: Some(remote_regid),
        local_registration_id: Some(0),
        alice_base_key: Some(base_key),
        needs_refresh: None,
        pending_key_exchange: None,
    };
    let record = SessionRecord::new(SessionState::from_session_structure(structure));
    record.serialize().expect("serialize session record")
}
/// A key-less DM retry receipt at retry count 1 must leave the stored sessions
/// of both the requesting device and a sibling device in place.
#[tokio::test]
async fn update_local_signal_session_preserves_dm_session_at_retry_1() {
    let client =
        crate::test_utils::create_test_client_with_failing_http("retry_preserve_retry_1").await;
    let user = "100000000000088".to_string();
    let requester = Jid::lid_device(user.clone(), 33);
    let store = client.persistence_manager.backend();
    let sibling_addr = Jid::lid_device(user.clone(), 0).to_protocol_address();
    let requester_addr = Jid::lid_device(user, 33).to_protocol_address();
    let requester_session = valid_serialized_session(4242, vec![0xAA; 32]);
    let sibling_session = valid_serialized_session(4243, vec![0xBB; 32]);

    // Seed a valid session for both devices before handling the retry.
    for (addr, bytes) in [
        (&sibling_addr, &sibling_session),
        (&requester_addr, &requester_session),
    ] {
        store.put_session(addr.as_str(), bytes).await.unwrap();
    }

    let receipt = build_retry_receipt_without_keys();
    let receipt_ref = receipt.as_node_ref();
    let chat_info = dm_retry_info(&requester);
    client
        .update_local_signal_session(
            &chat_info,
            &requester,
            "MSG-RETRY-1",
            1,
            &receipt_ref,
            false,
        )
        .await;
    client.flush_signal_cache().await.unwrap();

    for (addr, reason) in [
        (
            &sibling_addr,
            "non-requesting device session must be preserved",
        ),
        (
            &requester_addr,
            "requesting device session with valid record must be preserved at retry #1",
        ),
    ] {
        let session = store.get_session(addr.as_str()).await.unwrap();
        assert!(session.is_some(), "{reason}");
    }
}
/// When the retry receipt carries a registration id that differs from the one
/// in the stored session, the stored session must be removed.
#[tokio::test]
async fn update_local_signal_session_deletes_on_regid_mismatch() {
    let client =
        crate::test_utils::create_test_client_with_failing_http("retry_regid_mismatch").await;
    let requester = Jid::lid_device("100000000000099".to_string(), 17);
    let address = requester.to_protocol_address();
    let store = client.persistence_manager.backend();

    // Store a session recorded with one registration id...
    let stored_regid = 4242u32;
    let stored_session = valid_serialized_session(stored_regid, vec![0xAA; 32]);
    store
        .put_session(address.as_str(), &stored_session)
        .await
        .unwrap();

    // ...and deliver a receipt that advertises a different one.
    let received_regid = 0xDEAD_BEEFu32;
    assert_ne!(stored_regid, received_regid);
    let receipt = build_retry_receipt_with_registration(received_regid);
    let receipt_ref = receipt.as_node_ref();
    let chat_info = dm_retry_info(&requester);
    client
        .update_local_signal_session(&chat_info, &requester, "MSG-REGID", 1, &receipt_ref, false)
        .await;
    client.flush_signal_cache().await.unwrap();

    let remaining = store.get_session(address.as_str()).await.unwrap();
    assert!(
        remaining.is_none(),
        "session must be deleted when retry has no keys and reg IDs differ"
    );
}
/// Stored session bytes that cannot be parsed must survive a retry receipt
/// untouched: the bytes are still present after the update and cache flush.
#[tokio::test]
async fn update_local_signal_session_handles_unparseable_session_gracefully() {
    let client =
        crate::test_utils::create_test_client_with_failing_http("retry_unparseable_session")
            .await;
    let requester = Jid::lid_device("100000000000099".to_string(), 17);
    let address = requester.to_protocol_address();
    let store = client.persistence_manager.backend();

    // Deliberately store garbage instead of a serialized session record.
    store
        .put_session(address.as_str(), b"invalid-session")
        .await
        .unwrap();

    let receipt = build_retry_receipt_with_registration(0xDEAD_BEEF);
    let receipt_ref = receipt.as_node_ref();
    let chat_info = dm_retry_info(&requester);
    client
        .update_local_signal_session(&chat_info, &requester, "MSG-REGID", 1, &receipt_ref, false)
        .await;
    client.flush_signal_cache().await.unwrap();

    let remaining = store.get_session(address.as_str()).await.unwrap();
    assert!(
        remaining.is_some(),
        "unparseable bytes skip every branch; nothing should delete them"
    );
}
/// Handling a key-less retry receipt for a device without any stored session
/// must complete without panicking. The test makes no further assertions.
#[tokio::test]
async fn update_local_signal_session_no_session_is_noop() {
    let client =
        crate::test_utils::create_test_client_with_failing_http("retry_no_session").await;
    let requester = Jid::lid_device("100000000000199".to_string(), 42);
    let receipt = build_retry_receipt_without_keys();
    let receipt_ref = receipt.as_node_ref();
    let chat_info = dm_retry_info(&requester);

    client
        .update_local_signal_session(&chat_info, &requester, "MSG-NOSESS", 1, &receipt_ref, false)
        .await;
}
/// A key-less retry receipt for a group chat at retry count 1 must leave the
/// requester's stored session in place.
#[tokio::test]
async fn update_local_signal_session_preserves_group_session_at_retry_1() {
    let client =
        crate::test_utils::create_test_client_with_failing_http("retry_group_preserve").await;
    let requester = Jid::lid_device("100000000000088".to_string(), 33);
    let address = requester.to_protocol_address();
    let store = client.persistence_manager.backend();

    let stored_session = valid_serialized_session(9999, vec![0xCC; 32]);
    store
        .put_session(address.as_str(), &stored_session)
        .await
        .unwrap();

    // The chat and the original sender are the group; the requester is a device.
    let group: Jid = "120363042537531116@g.us".parse().unwrap();
    let chat_info = RetryChatInfo {
        chat: group.clone(),
        requester: requester.clone(),
        original_from: group,
        is_bot: false,
    };
    let receipt = build_retry_receipt_without_keys();
    let receipt_ref = receipt.as_node_ref();
    client
        .update_local_signal_session(&chat_info, &requester, "MSG-GRP-1", 1, &receipt_ref, false)
        .await;
    client.flush_signal_cache().await.unwrap();

    let remaining = store.get_session(address.as_str()).await.unwrap();
    assert!(
        remaining.is_some(),
        "group retry at #1 should not delete the session"
    );
}
/// With a session already stored for the target device,
/// `ensure_e2e_sessions_resolved` must return `Ok` even though the client was
/// created with the failing HTTP test fixture.
#[tokio::test]
async fn ensure_e2e_sessions_resolved_is_noop_when_session_exists() {
    use std::sync::atomic::Ordering;

    let client = crate::test_utils::create_test_client_with_failing_http(
        "group_retry_ensure_sessions_noop",
    )
    .await;
    client.offline_sync_completed.store(true, Ordering::Relaxed);

    let target = Jid::lid_device("100000000000199".to_string(), 17);
    let address = target.to_protocol_address();
    let stored_session = valid_serialized_session(5555, vec![0xDD; 32]);
    let store = client.persistence_manager.backend();
    store
        .put_session(address.as_str(), &stored_session)
        .await
        .unwrap();

    let outcome = client
        .ensure_e2e_sessions_resolved(std::slice::from_ref(&target))
        .await;
    outcome.expect("no-op when session exists");
}
/// Table-driven check of `is_bot` for a regular user, a `@bot` server JID,
/// two numbers expected to be flagged as bots and one near-miss that is not.
#[test]
fn bot_jid_detection() {
    use wacore_binary::JidExt as _;

    let expectations = [
        ("1234567890@s.whatsapp.net", false),
        ("somebot@bot", true),
        ("1313555123456@s.whatsapp.net", true),
        ("131655500123456@s.whatsapp.net", true),
        ("1313556123456@s.whatsapp.net", false),
    ];
    for (raw, expected) in expectations {
        let jid: Jid = raw.parse().unwrap();
        assert_eq!(jid.is_bot(), expected);
    }
}
/// Exercises every branch of `extract_registration_id_from_node`:
/// exact four-byte payloads, short payloads (left-padded with zeroes),
/// over-long payloads (only the first four bytes are read), and the `None`
/// cases (no `registration` child, empty bytes, child without byte content).
#[test]
fn extract_registration_id_from_node_test() {
    use wacore_binary::{Attrs, Node};

    // Wraps `children` in a `receipt` parent node.
    let receipt_with = |children: Vec<Node>| Node {
        tag: Cow::Borrowed("receipt"),
        attrs: Attrs::new(),
        content: Some(NodeContent::Nodes(children)),
    };
    // Builds a `registration` child carrying the given content.
    let registration = |content: Option<NodeContent>| Node {
        tag: Cow::Borrowed("registration"),
        attrs: Attrs::new(),
        content,
    };
    // Convenience: a receipt whose registration child holds `bytes`.
    let receipt_with_bytes =
        |bytes: &[u8]| receipt_with(vec![registration(Some(NodeContent::Bytes(bytes.to_vec())))]);

    // Exactly four bytes are decoded as a big-endian u32.
    assert_eq!(
        extract_registration_id_from_node(&receipt_with_bytes(&[0x00, 0x01, 0x02, 0x03])),
        Some(0x00010203)
    );

    // Fewer than four bytes are right-aligned and zero-padded on the left.
    assert_eq!(
        extract_registration_id_from_node(&receipt_with_bytes(&[0x01, 0x02, 0x03])),
        Some(0x00010203)
    );
    assert_eq!(
        extract_registration_id_from_node(&receipt_with_bytes(&[0x7F])),
        Some(0x0000007F)
    );

    // More than four bytes: only the leading four contribute.
    assert_eq!(
        extract_registration_id_from_node(&receipt_with_bytes(&[0x00, 0x01, 0x02, 0x03, 0xFF])),
        Some(0x00010203)
    );

    // No registration child at all.
    assert_eq!(
        extract_registration_id_from_node(&receipt_with(vec![])),
        None
    );

    // Registration child present but with an empty byte payload.
    assert_eq!(
        extract_registration_id_from_node(&receipt_with_bytes(&[])),
        None
    );

    // Registration child present but without any content.
    assert_eq!(
        extract_registration_id_from_node(&receipt_with(vec![registration(None)])),
        None
    );
}
/// The "group or status broadcast" predicate must hold for a group JID and
/// for `status@broadcast`, and must not hold for a plain DM JID.
#[test]
fn group_or_status_detection_for_sender_key_handling() {
    use wacore_binary::JidExt as _;

    let is_group_or_status = |jid: &Jid| jid.is_group() || jid.is_status_broadcast();

    let group: Jid = "120363021033254949@g.us".parse().unwrap();
    let status: Jid = "status@broadcast".parse().unwrap();
    let dm: Jid = "1234567890@s.whatsapp.net".parse().unwrap();

    assert!(is_group_or_status(&group));
    assert!(is_group_or_status(&status));
    assert!(!is_group_or_status(&dm));
}
/// Table-driven check of `should_include_keys`: `NoSession` is expected to
/// include keys from the first retry on, while the other reasons listed here
/// are expected to include them only from the second retry on.
#[test]
fn keys_inclusion_optimization_for_no_session_errors() {
    use crate::message::RetryReason;

    // (retry count, reason, keys expected?, description used on failure)
    let expectations = [
        (
            1,
            RetryReason::NoSession,
            true,
            "NoSession at retry#1 should include keys (optimization)",
        ),
        (
            2,
            RetryReason::NoSession,
            true,
            "NoSession at retry#2 should include keys",
        ),
        (
            3,
            RetryReason::NoSession,
            true,
            "NoSession at retry#3 should include keys",
        ),
        (
            1,
            RetryReason::InvalidMessage,
            false,
            "InvalidMessage at retry#1 should NOT include keys",
        ),
        (
            2,
            RetryReason::InvalidMessage,
            true,
            "InvalidMessage at retry#2 should include keys",
        ),
        (
            3,
            RetryReason::InvalidMessage,
            true,
            "InvalidMessage at retry#3 should include keys",
        ),
        (
            1,
            RetryReason::BadMac,
            false,
            "BadMac at retry#1 should NOT include keys",
        ),
        (
            2,
            RetryReason::BadMac,
            true,
            "BadMac at retry#2 should include keys",
        ),
        (
            1,
            RetryReason::UnknownError,
            false,
            "UnknownError at retry#1 should NOT include keys",
        ),
        (
            2,
            RetryReason::UnknownError,
            true,
            "UnknownError at retry#2 should include keys",
        ),
    ];

    for (attempt, reason, expected, description) in expectations {
        let actual = wacore::protocol::retry::should_include_keys(attempt, reason);
        assert_eq!(
            actual, expected,
            "Failed: {description}. retry_count={attempt}, reason={reason:?}"
        );
    }
}
#[tokio::test]
async fn concurrent_offline_messages_retry_key_optimization() {
use crate::message::RetryReason;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Barrier;
let _ = env_logger::builder().is_test(true).try_init();
let num_messages = 50;
let barrier = Arc::new(Barrier::new(num_messages));
let keys_included_count = Arc::new(AtomicUsize::new(0));
let no_keys_count = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for i in 0..num_messages {
let barrier = barrier.clone();
let keys_included = keys_included_count.clone();
let no_keys = no_keys_count.clone();
handles.push(tokio::spawn(async move {
barrier.wait().await;
let retry_count = 1; let reason = if i % 5 == 0 {
RetryReason::InvalidMessage
} else {
RetryReason::NoSession
};
let would_include_keys =
wacore::protocol::retry::should_include_keys(retry_count, reason);
if would_include_keys {
keys_included.fetch_add(1, Ordering::SeqCst);
} else {
no_keys.fetch_add(1, Ordering::SeqCst);
}
}));
}
for handle in handles {
handle.await.expect("task should complete");
}
let total_keys_included = keys_included_count.load(Ordering::SeqCst);
let total_no_keys = no_keys_count.load(Ordering::SeqCst);
assert_eq!(
total_keys_included, 40,
"Expected 40 messages to include keys (NoSession), got {total_keys_included}"
);
assert_eq!(
total_no_keys, 10,
"Expected 10 messages to NOT include keys (InvalidMessage), got {total_no_keys}"
);
let optimization_benefit = (total_keys_included as f64 / num_messages as f64) * 100.0;
assert!(
optimization_benefit >= 80.0,
"Optimization should benefit at least 80% of NoSession messages, got {optimization_benefit:.1}%"
);
}
/// A `NoSession` failure must already request key inclusion on the very first
/// retry.
#[test]
fn retry_optimization_with_removed_device_scenario() {
    use crate::message::RetryReason;

    let includes_keys = wacore::protocol::retry::should_include_keys(1, RetryReason::NoSession);
    assert!(
        includes_keys,
        "NoSession should include keys on retry#1 to give sender best chance to respond"
    );
}
fn make_test_receipt(from: &str) -> Receipt {
Receipt {
source: crate::types::message::MessageSource {
chat: from.parse().unwrap(),
sender: from.parse().unwrap(),
..Default::default()
},
message_ids: vec!["MSG001".to_string()],
timestamp: wacore::time::now_utc(),
r#type: crate::types::presence::ReceiptType::Retry,
}
}
/// For a PN DM receipt sent from device 33, the resolved chat is the bare PN
/// JID while the requester keeps the device id.
#[test]
fn resolve_retry_chat_info_dm_with_device() {
    let receipt_node = wacore_binary::builder::NodeBuilder::new("receipt").build();
    let receipt = make_test_receipt("5511999999999:33@s.whatsapp.net");

    let resolved = resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, None);
    let (chat, requester) = (&resolved.chat, &resolved.requester);

    assert_eq!(chat.device(), 0);
    assert_eq!(chat.user, "5511999999999");
    assert!(chat.is_pn());
    assert_eq!(requester.device(), 33);
    assert_eq!(requester.user, "5511999999999");
}
/// For a LID DM receipt sent from device 5, the resolved chat is the bare LID
/// JID while the requester keeps the device id and stays in the LID namespace.
#[test]
fn resolve_retry_chat_info_lid_dm_with_device() {
    let receipt_node = wacore_binary::builder::NodeBuilder::new("receipt").build();
    let receipt = make_test_receipt("236395184570386:5@lid");

    let resolved = resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, None);
    let (chat, requester) = (&resolved.chat, &resolved.requester);

    assert_eq!(chat.device(), 0);
    assert_eq!(chat.user, "236395184570386");
    assert!(chat.is_lid());
    assert_eq!(requester.device(), 5);
    assert_eq!(requester.user, "236395184570386");
    assert!(requester.is_lid());
}
/// A DM receipt from a bare JID (no device part) resolves to identical chat
/// and requester JIDs, both with device 0.
#[test]
fn resolve_retry_chat_info_dm_bare() {
    let receipt_node = wacore_binary::builder::NodeBuilder::new("receipt").build();
    let receipt = make_test_receipt("5511999999999@s.whatsapp.net");

    let resolved = resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, None);

    assert_eq!(resolved.chat.device(), 0);
    assert_eq!(resolved.requester.device(), 0);
    assert_eq!(resolved.chat, resolved.requester);
}
/// For a group retry receipt the chat stays the group JID and the requester
/// is the participant device.
#[test]
fn resolve_retry_chat_info_group() {
    use wacore_binary::builder::NodeBuilder;

    let group = "120363021033254949@g.us";
    let participant = "236395184570386:33@lid";

    let receipt_node = NodeBuilder::new("receipt")
        .attr("from", group)
        .attr("id", "MSG001")
        .attr("participant", participant)
        .attr("type", "retry")
        .build();
    let source = crate::types::message::MessageSource {
        chat: group.parse().unwrap(),
        sender: participant.parse().unwrap(),
        ..Default::default()
    };
    let receipt = Receipt {
        source,
        message_ids: vec!["MSG001".to_string()],
        timestamp: wacore::time::now_utc(),
        r#type: crate::types::presence::ReceiptType::Retry,
    };

    let resolved = resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, None);

    assert!(resolved.chat.is_group());
    assert_eq!(resolved.chat.user, "120363021033254949");
    assert!(resolved.requester.is_lid());
    assert_eq!(resolved.requester.device(), 33);
}
/// For a status-broadcast retry receipt with a `participant` attribute, the
/// chat stays `status@broadcast` and the requester is taken from the
/// participant.
#[test]
fn resolve_retry_chat_info_status_broadcast() {
    use wacore_binary::builder::NodeBuilder;

    let receipt_node = NodeBuilder::new("receipt")
        .attr("from", "status@broadcast")
        .attr("id", "3EB06D00CAB92340790621")
        .attr("participant", "236395184570386@lid")
        .attr("type", "retry")
        .build();
    let receipt = make_test_receipt("status@broadcast");

    let resolved = resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, None);
    let requester = &resolved.requester;

    assert!(resolved.chat.is_status_broadcast());
    assert!(requester.is_lid());
    assert_eq!(requester.user, "236395184570386");
}
/// Without a `participant` attribute, a status-broadcast retry receipt
/// resolves both chat and requester to the status broadcast JID.
#[test]
fn resolve_retry_chat_info_status_broadcast_no_participant() {
    use wacore_binary::builder::NodeBuilder;

    let receipt_node = NodeBuilder::new("receipt")
        .attr("from", "status@broadcast")
        .attr("id", "MSG001")
        .attr("type", "retry")
        .build();
    let receipt = make_test_receipt("status@broadcast");

    let resolved = resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, None);

    assert!(resolved.chat.is_status_broadcast());
    assert!(resolved.requester.is_status_broadcast());
}
/// The retry processing key must differ between participants (status) and
/// between requester devices (DM) for the same message id, and must be stable
/// when rebuilt from the same inputs.
#[test]
fn retry_processing_key_per_participant() {
    let msg_id = "3EB06D00CAB92340790621";

    // Status broadcast: two different participants.
    let status_chat = Jid::status_broadcast();
    let first_participant: Jid = "236395184570386@lid".parse().unwrap();
    let second_participant: Jid = "559985213786@s.whatsapp.net".parse().unwrap();
    let first_status_key = build_retry_processing_key(&status_chat, msg_id, &first_participant);
    let second_status_key = build_retry_processing_key(&status_chat, msg_id, &second_participant);
    assert_ne!(
        first_status_key, second_status_key,
        "Different status participants must have different processing keys"
    );
    let rebuilt_status_key = build_retry_processing_key(&status_chat, msg_id, &first_participant);
    assert_eq!(
        first_status_key, rebuilt_status_key,
        "Same participant must produce the same key — any retry count for that \
         participant serializes through pending_retries"
    );

    // DM: two devices of the same requester.
    let dm_chat = Jid::pn("559911112222");
    let first_device = Jid::pn_device("559922223333", 1);
    let second_device = Jid::pn_device("559922223333", 2);
    let first_dm_key = build_retry_processing_key(&dm_chat, msg_id, &first_device);
    let second_dm_key = build_retry_processing_key(&dm_chat, msg_id, &second_device);
    assert_ne!(
        first_dm_key, second_dm_key,
        "Different DM requester devices must have different processing keys"
    );
    let rebuilt_dm_key = build_retry_processing_key(&dm_chat, msg_id, &first_device);
    assert_eq!(
        first_dm_key, rebuilt_dm_key,
        "Same DM requester device must produce the same processing key"
    );
}
/// A message taken out of the recent-message cache can be re-added and taken
/// again with its content intact, for both a status-broadcast chat and a DM.
#[tokio::test]
async fn recent_message_cache_readd_after_take() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let persistence = Arc::new(
        PersistenceManager::new(backend)
            .await
            .expect("persistence manager should initialize"),
    );
    let mut cache_config = crate::cache_config::CacheConfig::default();
    cache_config.recent_messages.capacity = 1_000;
    let (client, _sync_rx) = Client::new_with_cache_config(
        Arc::new(crate::runtime_impl::TokioRuntime),
        Arc::clone(&persistence),
        Arc::new(crate::transport::mock::MockTransportFactory::new()),
        Arc::new(MockHttpClient),
        None,
        cache_config,
    )
    .await;

    let text_message = Box::new(wa::message::ExtendedTextMessage {
        text: Some("status text".to_string()),
        ..Default::default()
    });
    let msg = wa::Message {
        extended_text_message: Some(text_message),
        ..Default::default()
    };

    let cases = [
        (Jid::status_broadcast(), "STATUS_MSG_001".to_string()),
        (Jid::pn("559911112222"), "DM_MSG_001".to_string()),
    ];
    for (chat, msg_id) in cases {
        client.add_recent_message(&chat, &msg_id, &msg).await;

        // First take removes the entry; put the returned message back.
        let Some((first_msg, _)) = client.take_recent_message(&chat, &msg_id).await else {
            panic!("First take should succeed for {chat}");
        };
        client.add_recent_message(&chat, &msg_id, &first_msg).await;

        // Second take must find the re-added message with the same text.
        let Some((second_msg, _)) = client.take_recent_message(&chat, &msg_id).await else {
            panic!("Second take should succeed after re-add for {chat}");
        };
        let text_part = second_msg.extended_text_message.as_ref().unwrap();
        assert_eq!(text_part.text.as_deref(), Some("status text"));
    }
}
/// A DM message stored under a bare JID is found via that same bare JID as a
/// primary hit (no alternate chat), also after being re-added.
#[tokio::test]
async fn dm_retry_message_lookup_uses_bare_jid() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let persistence = Arc::new(
        PersistenceManager::new(backend)
            .await
            .expect("persistence manager should initialize"),
    );
    let mut cache_config = crate::cache_config::CacheConfig::default();
    cache_config.recent_messages.capacity = 1_000;
    let (client, _sync_rx) = Client::new_with_cache_config(
        Arc::new(crate::runtime_impl::TokioRuntime),
        Arc::clone(&persistence),
        Arc::new(crate::transport::mock::MockTransportFactory::new()),
        Arc::new(MockHttpClient),
        None,
        cache_config,
    )
    .await;

    let chat: Jid = "5511999999999@s.whatsapp.net".parse().unwrap();
    let msg_id = "RETRY_MSG_001";
    let original = wa::Message {
        conversation: Some("test dm".into()),
        ..Default::default()
    };
    client.add_recent_message(&chat, msg_id, &original).await;

    let first_take = client.take_recent_message(&chat, msg_id).await;
    let (returned, alt_chat) = first_take.expect("Lookup via bare JID should succeed");
    assert!(alt_chat.is_none(), "primary key should match for bare JID");

    // Re-add what was taken and make sure it can be looked up once more.
    client.add_recent_message(&chat, msg_id, &returned).await;
    let second_take = client.take_recent_message(&chat, msg_id).await;
    assert!(
        second_take.is_some(),
        "Second lookup via bare JID should succeed after re-add"
    );
}
/// A message stored under the PN JID is found when looked up by the mapped
/// LID JID, and the lookup reports the PN chat as the alternate hit.
#[tokio::test]
async fn alternate_key_lookup_pn_to_lid() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let persistence = Arc::new(
        PersistenceManager::new(backend)
            .await
            .expect("persistence manager should initialize"),
    );
    let mut cache_config = crate::cache_config::CacheConfig::default();
    cache_config.recent_messages.capacity = 1_000;
    let (client, _sync_rx) = Client::new_with_cache_config(
        Arc::new(crate::runtime_impl::TokioRuntime),
        Arc::clone(&persistence),
        Arc::new(crate::transport::mock::MockTransportFactory::new()),
        Arc::new(MockHttpClient),
        None,
        cache_config,
    )
    .await;

    let pn_jid: Jid = "5511999999999@s.whatsapp.net".parse().unwrap();
    let lid_jid: Jid = "236395184570386@lid".parse().unwrap();
    let msg_id = "RETRY_ALT_001";
    let stored = wa::Message {
        conversation: Some("alternate key test".into()),
        ..Default::default()
    };

    // Store under the PN key, then teach the client the LID <-> PN mapping.
    client.add_recent_message(&pn_jid, msg_id, &stored).await;
    let mapping = wacore::types::lid_pn::LidPnEntry {
        lid: lid_jid.user.to_string(),
        phone_number: pn_jid.user.to_string(),
        created_at: 0,
        learning_source: wacore::types::lid_pn::LearningSource::Usync,
    };
    client.lid_pn_cache.add(&mapping).await;

    let lookup = client.take_recent_message(&lid_jid, msg_id).await;
    let (found, alt_chat) =
        lookup.expect("Alternate PN key lookup should find message stored under PN");
    let alt_chat = alt_chat.expect("should be found via alternate key");
    assert!(alt_chat.is_pn(), "alternate chat should be PN");
    assert_eq!(alt_chat.user, pn_jid.user);
    assert_eq!(found.conversation.as_deref(), Some("alternate key test"));
}
/// With a LID <-> PN mapping cached, `swap_pn_lid_namespace` converts in both
/// directions while keeping the device id, and returns `None` for a group JID.
#[tokio::test]
async fn swap_pn_lid_namespace_preserves_device() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let persistence = Arc::new(
        PersistenceManager::new(backend)
            .await
            .expect("persistence manager should initialize"),
    );
    let (client, _sync_rx) = Client::new(
        Arc::new(crate::runtime_impl::TokioRuntime),
        Arc::clone(&persistence),
        Arc::new(crate::transport::mock::MockTransportFactory::new()),
        Arc::new(MockHttpClient),
        None,
    )
    .await;

    let pn_jid: Jid = "5511999999999@s.whatsapp.net".parse().unwrap();
    let lid_jid: Jid = "236395184570386@lid".parse().unwrap();
    let mapping = wacore::types::lid_pn::LidPnEntry {
        lid: lid_jid.user.to_string(),
        phone_number: pn_jid.user.to_string(),
        created_at: 0,
        learning_source: wacore::types::lid_pn::LearningSource::Usync,
    };
    client.lid_pn_cache.add(&mapping).await;

    // LID with device 5 becomes PN with device 5.
    let lid_with_device: Jid = "236395184570386:5@lid".parse().unwrap();
    let as_pn = client
        .swap_pn_lid_namespace(&lid_with_device)
        .await
        .expect("should resolve LID→PN");
    assert!(as_pn.is_pn());
    assert_eq!(as_pn.user, "5511999999999");
    assert_eq!(as_pn.device(), 5);

    // PN with device 3 becomes LID with device 3.
    let pn_with_device: Jid = "5511999999999:3@s.whatsapp.net".parse().unwrap();
    let as_lid = client
        .swap_pn_lid_namespace(&pn_with_device)
        .await
        .expect("should resolve PN→LID");
    assert!(as_lid.is_lid());
    assert_eq!(as_lid.user, "236395184570386");
    assert_eq!(as_lid.device(), 3);

    // Group JIDs yield no counterpart.
    let group: Jid = "120363021033254949@g.us".parse().unwrap();
    let group_swap = client.swap_pn_lid_namespace(&group).await;
    assert!(group_swap.is_none());
}
/// A message stored under the PN JID before the LID <-> PN mapping was cached
/// is still found when looked up by that PN JID afterwards, and the hit is
/// reported as an alternate one whose chat is the PN JID.
#[tokio::test]
async fn alternate_key_lookup_pn_input_server_changed() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let persistence = Arc::new(
        PersistenceManager::new(backend)
            .await
            .expect("persistence manager should initialize"),
    );
    let mut cache_config = crate::cache_config::CacheConfig::default();
    cache_config.recent_messages.capacity = 1_000;
    let (client, _sync_rx) = Client::new_with_cache_config(
        Arc::new(crate::runtime_impl::TokioRuntime),
        Arc::clone(&persistence),
        Arc::new(crate::transport::mock::MockTransportFactory::new()),
        Arc::new(MockHttpClient),
        None,
        cache_config,
    )
    .await;

    let pn_jid: Jid = "5511999999999@s.whatsapp.net".parse().unwrap();
    let lid_jid: Jid = "236395184570386@lid".parse().unwrap();
    let msg_id = "RETRY_ALT_PN";
    let stored = wa::Message {
        conversation: Some("pn input alternate".into()),
        ..Default::default()
    };

    // Store first, learn the mapping second: the order is the point of the test.
    client.add_recent_message(&pn_jid, msg_id, &stored).await;
    let mapping = wacore::types::lid_pn::LidPnEntry {
        lid: lid_jid.user.to_string(),
        phone_number: pn_jid.user.to_string(),
        created_at: 0,
        learning_source: wacore::types::lid_pn::LearningSource::Usync,
    };
    client.lid_pn_cache.add(&mapping).await;

    let lookup = client.take_recent_message(&pn_jid, msg_id).await;
    let (found, alt_chat) = lookup.expect("Should find message via server-changed path");
    let alt_chat = alt_chat.expect("should be alternate hit");
    assert!(
        alt_chat.is_pn(),
        "alternate chat should be PN (the original input)"
    );
    assert_eq!(alt_chat.user, pn_jid.user);
    assert_eq!(found.conversation.as_deref(), Some("pn input alternate"));
}
/// Without any LID <-> PN mapping, a lookup by the storing JID is a primary
/// hit (no alternate chat) and an unknown message id yields `None`.
#[tokio::test]
async fn no_alternate_without_mapping() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let persistence = Arc::new(
        PersistenceManager::new(backend)
            .await
            .expect("persistence manager should initialize"),
    );
    let mut cache_config = crate::cache_config::CacheConfig::default();
    cache_config.recent_messages.capacity = 1_000;
    let (client, _sync_rx) = Client::new_with_cache_config(
        Arc::new(crate::runtime_impl::TokioRuntime),
        Arc::clone(&persistence),
        Arc::new(crate::transport::mock::MockTransportFactory::new()),
        Arc::new(MockHttpClient),
        None,
        cache_config,
    )
    .await;

    let lid_jid: Jid = "236395184570386@lid".parse().unwrap();
    let msg_id = "RETRY_NO_ALT";
    let stored = wa::Message {
        conversation: Some("no alternate".into()),
        ..Default::default()
    };
    client.add_recent_message(&lid_jid, msg_id, &stored).await;

    let lookup = client.take_recent_message(&lid_jid, msg_id).await;
    assert!(lookup.is_some());
    let (_, alt_chat) = lookup.unwrap();
    assert!(alt_chat.is_none(), "primary hit should have no alt_chat");

    let absent = client.take_recent_message(&lid_jid, "NONEXISTENT").await;
    assert!(absent.is_none(), "non-existent message should return None");
}
/// With a LID <-> PN mapping cached but no message stored at all, the lookup
/// returns `None`.
#[tokio::test]
async fn alternate_key_both_miss() {
    let _ = env_logger::builder().is_test(true).try_init();
    let backend = crate::test_utils::create_test_backend().await;
    let persistence = Arc::new(
        PersistenceManager::new(backend)
            .await
            .expect("persistence manager should initialize"),
    );
    let mut cache_config = crate::cache_config::CacheConfig::default();
    cache_config.recent_messages.capacity = 1_000;
    let (client, _sync_rx) = Client::new_with_cache_config(
        Arc::new(crate::runtime_impl::TokioRuntime),
        Arc::clone(&persistence),
        Arc::new(crate::transport::mock::MockTransportFactory::new()),
        Arc::new(MockHttpClient),
        None,
        cache_config,
    )
    .await;

    let pn_jid: Jid = "5511999999999@s.whatsapp.net".parse().unwrap();
    let lid_jid: Jid = "236395184570386@lid".parse().unwrap();
    let mapping = wacore::types::lid_pn::LidPnEntry {
        lid: lid_jid.user.to_string(),
        phone_number: pn_jid.user.to_string(),
        created_at: 0,
        learning_source: wacore::types::lid_pn::LearningSource::Usync,
    };
    client.lid_pn_cache.add(&mapping).await;

    let lookup = client.take_recent_message(&pn_jid, "MISSING").await;
    assert!(lookup.is_none(), "both primary and alternate miss → None");
}
/// A retry receipt from one of our own PN devices that names a `recipient`
/// resolves the chat to the bare recipient JID, while the requester stays our
/// own device.
#[test]
fn resolve_retry_chat_info_peer_device_with_recipient() {
    use wacore_binary::builder::NodeBuilder;

    let recipient_raw = "5522888888888@s.whatsapp.net";
    let own_pn: Jid = "5511999999999@s.whatsapp.net".parse().unwrap();
    let recipient: Jid = recipient_raw.parse().unwrap();
    let receipt_node = NodeBuilder::new("receipt")
        .attr("recipient", recipient_raw)
        .build();
    let receipt = make_test_receipt("5511999999999:2@s.whatsapp.net");

    let resolved =
        resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), Some(&own_pn), None);

    assert_eq!(resolved.chat.user, recipient.user);
    assert_eq!(resolved.chat.device(), 0, "chat should be bare");
    assert_eq!(resolved.requester.user, own_pn.user);
    assert_eq!(resolved.requester.device(), 2);
}
/// A retry receipt from one of our own PN devices without a `recipient`
/// attribute resolves the chat to our own bare PN JID.
#[test]
fn resolve_retry_chat_info_peer_device_without_recipient() {
    let own_pn: Jid = "5511999999999@s.whatsapp.net".parse().unwrap();
    let receipt_node = wacore_binary::builder::NodeBuilder::new("receipt").build();
    let receipt = make_test_receipt("5511999999999:2@s.whatsapp.net");

    let resolved =
        resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), Some(&own_pn), None);

    assert_eq!(resolved.chat.user, own_pn.user);
    assert_eq!(resolved.chat.device(), 0);
}
/// A retry receipt from a bot JID that names a `recipient` is flagged as a
/// bot and resolves the chat to the bare recipient JID.
#[test]
fn resolve_retry_chat_info_bot_with_recipient() {
    use wacore_binary::builder::NodeBuilder;

    let receipt_node = NodeBuilder::new("receipt")
        .attr("recipient", "5522888888888@s.whatsapp.net")
        .build();
    let receipt = make_test_receipt("131355500001@s.whatsapp.net");

    let resolved = resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, None);

    assert!(resolved.is_bot, "bot JID should be detected");
    assert_eq!(resolved.chat.user, "5522888888888");
    assert_eq!(resolved.chat.device(), 0);
}
/// A retry receipt from a bot JID without a `recipient` attribute is flagged
/// as a bot and keeps the bot's own user as the chat.
#[test]
fn resolve_retry_chat_info_bot_without_recipient() {
    let receipt_node = wacore_binary::builder::NodeBuilder::new("receipt").build();
    let receipt = make_test_receipt("131355500001@s.whatsapp.net");

    let resolved = resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, None);

    assert!(resolved.is_bot);
    assert_eq!(resolved.chat.user, "131355500001");
}
/// `original_from` keeps the sender exactly as received (including the device
/// id) while `chat` is reduced to the bare JID.
#[test]
fn resolve_retry_chat_info_preserves_original_from() {
    let receipt_node = wacore_binary::builder::NodeBuilder::new("receipt").build();
    let receipt = make_test_receipt("5511999999999:33@s.whatsapp.net");

    let resolved = resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, None);
    let (original_from, chat) = (&resolved.original_from, &resolved.chat);

    assert_eq!(original_from.device(), 33);
    assert_eq!(original_from.user, "5511999999999");
    assert_eq!(chat.device(), 0);
    assert_eq!(chat.user, "5511999999999");
}
/// A retry receipt from one of our own devices identified through our LID
/// (rather than our PN) resolves the chat to the bare recipient JID and keeps
/// the requester's device id.
#[test]
fn resolve_retry_chat_info_peer_via_lid() {
    use wacore_binary::builder::NodeBuilder;

    let recipient_raw = "5522888888888@s.whatsapp.net";
    let own_lid: Jid = "236395184570386@lid".parse().unwrap();
    let recipient: Jid = recipient_raw.parse().unwrap();
    let receipt_node = NodeBuilder::new("receipt")
        .attr("recipient", recipient_raw)
        .build();
    let receipt = make_test_receipt("236395184570386:5@lid");

    let resolved =
        resolve_retry_chat_info(&receipt, &receipt_node.as_node_ref(), None, Some(&own_lid));

    assert_eq!(resolved.chat.user, recipient.user);
    assert_eq!(resolved.chat.device(), 0);
    assert_eq!(resolved.requester.device(), 5);
}
}