use anyhow::Result;
use std::sync::atomic::Ordering;
use std::time::Duration;
use wacore::libsignal::store::SessionStore;
use wacore::types::jid::JidExt;
use wacore_binary::jid::Jid;
use super::Client;
use crate::types::events::{Event, OfflineSyncCompleted};
impl Client {
pub(crate) const DEFAULT_OFFLINE_SYNC_TIMEOUT: Duration = Duration::from_secs(60);
pub(crate) fn complete_offline_sync(&self, count: i32) {
self.offline_sync_metrics
.active
.store(false, Ordering::Release);
match self.offline_sync_metrics.start_time.lock() {
Ok(mut guard) => *guard = None,
Err(poison) => *poison.into_inner() = None,
}
if self
.offline_sync_completed
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
self.swap_message_semaphore(64);
self.offline_sync_notifier.notify(usize::MAX);
self.core
.event_bus
.dispatch(&Event::OfflineSyncCompleted(OfflineSyncCompleted { count }));
}
}
pub(crate) async fn wait_for_offline_delivery_end(&self) {
self.wait_for_offline_delivery_end_with_timeout(Self::DEFAULT_OFFLINE_SYNC_TIMEOUT)
.await;
}
pub(crate) async fn wait_for_offline_delivery_end_with_timeout(&self, timeout: Duration) {
let wait_generation = self.connection_generation.load(Ordering::Acquire);
let offline_fut = self.offline_sync_notifier.listen();
if self.offline_sync_completed.load(Ordering::Relaxed) {
return;
}
if wacore::runtime::timeout(&*self.runtime, timeout, offline_fut)
.await
.is_err()
{
if self.connection_generation.load(Ordering::Acquire) != wait_generation
|| self.expected_disconnect.load(Ordering::Relaxed)
{
log::debug!(
target: "Client/OfflineSync",
"Offline sync timeout ignored: connection generation changed or disconnected",
);
return;
}
let processed = self
.offline_sync_metrics
.processed_messages
.load(Ordering::Acquire);
let expected = self
.offline_sync_metrics
.total_messages
.load(Ordering::Acquire);
log::warn!(
target: "Client/OfflineSync",
"Offline sync timed out after {:?} (processed {} of {} items); marking sync complete",
timeout,
processed,
expected,
);
self.complete_offline_sync(i32::try_from(processed).unwrap_or(i32::MAX));
}
}
pub(crate) fn begin_history_sync_task(&self) {
self.history_sync_tasks_in_flight
.fetch_add(1, Ordering::Relaxed);
}
pub(crate) fn finish_history_sync_task(&self) {
let previous = self
.history_sync_tasks_in_flight
.fetch_sub(1, Ordering::Relaxed);
if previous <= 1 {
self.history_sync_tasks_in_flight
.store(0, Ordering::Relaxed);
self.history_sync_idle_notifier.notify(usize::MAX);
}
}
pub async fn wait_for_startup_sync(&self, timeout: std::time::Duration) -> Result<()> {
use anyhow::anyhow;
use wacore::time::Instant;
let deadline = Instant::now() + timeout;
let offline_fut = self.offline_sync_notifier.listen();
if !self.offline_sync_completed.load(Ordering::Relaxed) {
let remaining = deadline.saturating_duration_since(Instant::now());
wacore::runtime::timeout(&*self.runtime, remaining, offline_fut)
.await
.map_err(|_| anyhow!("Timeout waiting for offline sync completion"))?;
}
loop {
let history_fut = self.history_sync_idle_notifier.listen();
if self.history_sync_tasks_in_flight.load(Ordering::Relaxed) == 0 {
return Ok(());
}
let remaining = deadline.saturating_duration_since(Instant::now());
wacore::runtime::timeout(&*self.runtime, remaining, history_fut)
.await
.map_err(|_| anyhow!("Timeout waiting for history sync tasks to become idle"))?;
}
}
pub(crate) async fn ensure_e2e_sessions(&self, device_jids: Vec<Jid>) -> Result<()> {
use wacore::types::jid::JidExt;
if device_jids.is_empty() {
return Ok(());
}
self.wait_for_offline_delivery_end().await;
let resolved_jids = self.resolve_lid_mappings(&device_jids).await;
let device_store = self.persistence_manager.get_device_arc().await;
let mut jids_needing_sessions = Vec::with_capacity(resolved_jids.len());
{
let device_guard = device_store.read().await;
for jid in resolved_jids {
let signal_addr = jid.to_protocol_address();
match self
.signal_cache
.has_session(&signal_addr, &*device_guard.backend)
.await
{
Ok(true) => {}
Ok(false) => jids_needing_sessions.push(jid),
Err(e) => log::warn!("Failed to check session for {}: {}", jid, e),
}
}
}
if jids_needing_sessions.is_empty() {
return Ok(());
}
for batch in jids_needing_sessions.chunks(crate::session::SESSION_CHECK_BATCH_SIZE) {
self.fetch_and_establish_sessions(batch).await?;
}
Ok(())
}
async fn fetch_and_establish_sessions(&self, jids: &[Jid]) -> Result<usize, anyhow::Error> {
use wacore::libsignal::protocol::{UsePQRatchet, process_prekey_bundle};
use wacore::types::jid::JidExt;
if jids.is_empty() {
return Ok(0);
}
let prekey_bundles = self.fetch_pre_keys(jids, Some("identity")).await?;
let device_store = self.persistence_manager.get_device_arc().await;
let mut adapter = crate::store::signal_adapter::SignalProtocolStoreAdapter::new(
device_store,
self.signal_cache.clone(),
);
let mut success_count = 0;
let mut missing_count = 0;
let mut failed_count = 0;
for jid in jids {
if let Some(bundle) = prekey_bundles.get(&jid.normalize_for_prekey_bundle()) {
let signal_addr = jid.to_protocol_address();
let session_mutex = self
.session_locks
.get_with_by_ref(signal_addr.as_str(), async {
std::sync::Arc::new(async_lock::Mutex::new(()))
})
.await;
let _session_guard = session_mutex.lock().await;
match process_prekey_bundle(
&signal_addr,
&mut adapter.session_store,
&mut adapter.identity_store,
bundle,
&mut rand::make_rng::<rand::rngs::StdRng>(),
UsePQRatchet::No,
)
.await
{
Ok(_) => {
success_count += 1;
log::debug!("Successfully established session with {}", jid);
}
Err(e) => {
failed_count += 1;
log::warn!("Failed to establish session with {}: {}", jid, e);
}
}
} else {
missing_count += 1;
if jid.device == 0 {
log::warn!("Server did not return prekeys for primary phone {}", jid);
} else {
log::debug!("Server did not return prekeys for {}", jid);
}
}
}
if missing_count > 0 || failed_count > 0 {
log::debug!(
"Session establishment: {} succeeded, {} missing prekeys, {} failed (of {} requested)",
success_count,
missing_count,
failed_count,
jids.len()
);
}
if success_count > 0 {
self.flush_signal_cache().await?;
}
Ok(success_count)
}
pub(crate) async fn establish_primary_phone_session_immediate(&self) -> Result<()> {
let device_snapshot = self.persistence_manager.get_device_snapshot().await;
let own_pn = device_snapshot
.pn
.clone()
.ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
let primary_phone_pn = own_pn.with_device(0);
let primary_phone_lid = device_snapshot.lid.as_ref().map(|lid| lid.with_device(0));
let pn_session_exists =
self.check_session_exists(&primary_phone_pn)
.await
.map_err(|e| {
anyhow::anyhow!(
"Cannot verify PN session existence for primary phone {}: {}. \
Refusing to establish session to prevent potential MAC failures.",
primary_phone_pn,
e
)
})?;
if pn_session_exists {
log::debug!(
"PN session with primary phone {} already exists",
primary_phone_pn
);
} else {
log::debug!(
"No PN session with primary phone {} - will be established via LID pkmsg",
primary_phone_pn
);
}
if let Some(ref lid_jid) = primary_phone_lid {
match self.check_session_exists(lid_jid).await {
Ok(true) => log::debug!("LID session with {} already exists", lid_jid),
Ok(false) => log::debug!(
"No LID session with {} - established on first message",
lid_jid
),
Err(e) => log::debug!("Could not check LID session for {}: {}", lid_jid, e),
}
}
Ok(())
}
async fn check_session_exists(&self, jid: &Jid) -> Result<bool, anyhow::Error> {
let device_store = self.persistence_manager.get_device_arc().await;
let device_guard = device_store.read().await;
let signal_addr = jid.to_protocol_address();
device_guard
.contains_session(&signal_addr)
.await
.map_err(|e| anyhow::anyhow!("Failed to check session for {}: {}", jid, e))
}
}
#[cfg(test)]
mod tests {
use super::*;
use wacore_binary::jid::{DEFAULT_USER_SERVER, HIDDEN_USER_SERVER, JidExt};
#[test]
fn test_primary_phone_jid_creation_from_pn() {
let own_pn = Jid::pn("559999999999");
let primary_phone_jid = own_pn.with_device(0);
assert_eq!(primary_phone_jid.user, "559999999999");
assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
assert_eq!(primary_phone_jid.device, 0);
assert_eq!(primary_phone_jid.agent, 0);
assert_eq!(primary_phone_jid.to_string(), "559999999999@s.whatsapp.net");
}
#[test]
fn test_primary_phone_jid_overwrites_existing_device() {
let own_pn = Jid::pn_device("559999999999", 33);
let primary_phone_jid = own_pn.with_device(0);
assert_eq!(primary_phone_jid.user, "559999999999");
assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
assert_eq!(primary_phone_jid.device, 0);
}
#[test]
fn test_primary_phone_jid_is_not_ad() {
let primary_phone_jid = Jid::pn("559999999999").with_device(0);
assert!(!primary_phone_jid.is_ad()); }
#[test]
fn test_linked_device_is_ad() {
let linked_device_jid = Jid::pn_device("559999999999", 33);
assert!(linked_device_jid.is_ad()); }
#[test]
fn test_primary_phone_jid_from_lid() {
let own_lid = Jid::lid("100000000000001");
let primary_phone_jid = own_lid.with_device(0);
assert_eq!(primary_phone_jid.user, "100000000000001");
assert_eq!(primary_phone_jid.server, HIDDEN_USER_SERVER);
assert_eq!(primary_phone_jid.device, 0);
assert!(!primary_phone_jid.is_ad());
}
#[test]
fn test_primary_phone_jid_roundtrip() {
let own_pn = Jid::pn("559999999999");
let primary_phone_jid = own_pn.with_device(0);
let jid_string = primary_phone_jid.to_string();
assert_eq!(jid_string, "559999999999@s.whatsapp.net");
let parsed: Jid = jid_string.parse().expect("JID should be parseable");
assert_eq!(parsed.user, "559999999999");
assert_eq!(parsed.server, DEFAULT_USER_SERVER);
assert_eq!(parsed.device, 0);
}
#[test]
fn test_with_device_preserves_identity() {
let pn = Jid::pn("1234567890");
let pn_device_0 = pn.with_device(0);
let pn_device_5 = pn.with_device(5);
assert_eq!(pn_device_0.user, pn_device_5.user);
assert_eq!(pn_device_0.server, pn_device_5.server);
assert_eq!(pn_device_0.device, 0);
assert_eq!(pn_device_5.device, 5);
let lid = Jid::lid("100000012345678");
let lid_device_0 = lid.with_device(0);
let lid_device_33 = lid.with_device(33);
assert_eq!(lid_device_0.user, lid_device_33.user);
assert_eq!(lid_device_0.server, lid_device_33.server);
assert_eq!(lid_device_0.device, 0);
assert_eq!(lid_device_33.device, 33);
}
#[test]
fn test_primary_phone_vs_companion_devices() {
let user = "559999999999";
let primary = Jid::pn(user).with_device(0);
let companion_web = Jid::pn_device(user, 33);
let companion_desktop = Jid::pn_device(user, 34);
assert_eq!(primary.user, companion_web.user);
assert_eq!(primary.user, companion_desktop.user);
assert_eq!(primary.device, 0);
assert_eq!(companion_web.device, 33);
assert_eq!(companion_desktop.device, 34);
assert!(!primary.is_ad());
assert!(companion_web.is_ad());
assert!(companion_desktop.is_ad());
}
#[test]
fn test_session_check_behavior_documentation() {
enum SessionCheckResult {
Exists,
NotExists,
CheckFailed,
}
fn should_establish_session(
check_result: SessionCheckResult,
) -> Result<bool, &'static str> {
match check_result {
SessionCheckResult::Exists => Ok(false), SessionCheckResult::NotExists => Ok(true), SessionCheckResult::CheckFailed => Err("Cannot verify - fail safe"),
}
}
assert_eq!(
should_establish_session(SessionCheckResult::Exists),
Ok(false)
);
assert_eq!(
should_establish_session(SessionCheckResult::NotExists),
Ok(true)
);
assert!(should_establish_session(SessionCheckResult::CheckFailed).is_err());
}
#[test]
fn test_protocol_address_format_for_session_lookup() {
use wacore::types::jid::JidExt;
let pn = Jid::pn("559999999999").with_device(0);
let addr = pn.to_protocol_address();
assert_eq!(addr.name(), "559999999999@c.us");
assert_eq!(u32::from(addr.device_id()), 0);
assert_eq!(addr.to_string(), "559999999999@c.us.0");
let companion = Jid::pn_device("559999999999", 33);
let companion_addr = companion.to_protocol_address();
assert_eq!(companion_addr.name(), "559999999999:33@c.us");
assert_eq!(companion_addr.to_string(), "559999999999:33@c.us.0");
let lid = Jid::lid("100000000000001").with_device(0);
let lid_addr = lid.to_protocol_address();
assert_eq!(lid_addr.name(), "100000000000001@lid");
assert_eq!(u32::from(lid_addr.device_id()), 0);
assert_eq!(lid_addr.to_string(), "100000000000001@lid.0");
let lid_device = Jid::lid_device("100000000000001", 33);
let lid_device_addr = lid_device.to_protocol_address();
assert_eq!(lid_device_addr.name(), "100000000000001:33@lid");
assert_eq!(lid_device_addr.to_string(), "100000000000001:33@lid.0");
}
#[test]
fn test_filter_logic_for_session_establishment() {
let jids = vec![
Jid::pn_device("111", 0),
Jid::pn_device("222", 0),
Jid::pn_device("333", 0),
];
let session_exists = |jid: &Jid| -> Result<bool, &'static str> {
match jid.user.as_str() {
"111" => Ok(true), "222" => Ok(false), "333" => Err("DB error"), _ => Ok(false),
}
};
let mut jids_needing_sessions = Vec::with_capacity(jids.len());
for jid in &jids {
match session_exists(jid) {
Ok(true) => {} Ok(false) => jids_needing_sessions.push(jid.clone()), Err(e) => eprintln!("Warning: failed to check {}: {}", jid, e), }
}
assert_eq!(jids_needing_sessions.len(), 1);
assert_eq!(jids_needing_sessions[0].user, "222");
}
#[test]
fn test_dual_addressing_pn_and_lid_are_independent() {
let pn_address = Jid::pn("551199887766").with_device(0);
let lid_address = Jid::lid("236395184570386").with_device(0);
assert_ne!(pn_address.user, lid_address.user);
assert_ne!(pn_address.server, lid_address.server);
use wacore::types::jid::JidExt;
let pn_signal_addr = pn_address.to_protocol_address();
let lid_signal_addr = lid_address.to_protocol_address();
assert_ne!(pn_signal_addr.name(), lid_signal_addr.name());
assert_eq!(pn_signal_addr.name(), "551199887766@c.us");
assert_eq!(lid_signal_addr.name(), "236395184570386@lid");
assert_eq!(pn_address.device, 0);
assert_eq!(lid_address.device, 0);
}
#[test]
fn test_lid_extraction_from_own_device() {
let own_lid_with_device = Jid::lid_device("236395184570386", 61);
let primary_lid = own_lid_with_device.with_device(0);
assert_eq!(primary_lid.user, "236395184570386");
assert_eq!(primary_lid.device, 0);
assert!(!primary_lid.is_ad());
}
#[test]
fn test_stale_session_scenario_documentation() {
fn should_establish_pn_session(pn_exists: bool) -> bool {
!pn_exists
}
fn should_establish_lid_session(_lid_exists: bool) -> bool {
false }
assert!(!should_establish_pn_session(true));
assert!(should_establish_pn_session(false));
assert!(!should_establish_lid_session(true));
assert!(!should_establish_lid_session(false));
}
#[test]
fn test_retry_mechanism_for_stale_sessions() {
const RETRY_ERROR_NO_SESSION: u8 = 1;
const RETRY_ERROR_INVALID_MESSAGE: u8 = 4;
fn action_for_error(error_code: u8) -> &'static str {
match error_code {
RETRY_ERROR_NO_SESSION => "Establish new session via prekey",
RETRY_ERROR_INVALID_MESSAGE => "Delete stale session, resend message",
_ => "Unknown error",
}
}
assert_eq!(
action_for_error(RETRY_ERROR_NO_SESSION),
"Establish new session via prekey"
);
assert_eq!(
action_for_error(RETRY_ERROR_INVALID_MESSAGE),
"Delete stale session, resend message"
);
}
#[test]
fn test_session_establishment_lookup_normalization() {
use std::collections::HashMap;
use wacore_binary::jid::Jid;
let mut prekey_bundles: HashMap<Jid, ()> = HashMap::new();
let normalized_jid = Jid::lid("123456789"); prekey_bundles.insert(normalized_jid.clone(), ());
let mut requested_jid = Jid::lid("123456789");
requested_jid.agent = 1;
assert!(
!prekey_bundles.contains_key(&requested_jid),
"Direct lookup of non-normalized JID should fail"
);
let normalized_lookup = requested_jid.normalize_for_prekey_bundle();
assert!(
prekey_bundles.contains_key(&normalized_lookup),
"Normalized lookup should succeed"
);
assert_eq!(normalized_lookup, normalized_jid);
}
}