1mod context_impl;
2mod device_registry;
3mod lid_pn;
4mod sender_keys;
5mod sessions;
6
7use crate::cache::Cache;
8use crate::cache_store::TypedCache;
9use crate::handshake;
10use crate::lid_pn_cache::LidPnCache;
11use crate::pair;
12use anyhow::{Result, anyhow};
13use futures::FutureExt;
14use std::borrow::Cow;
15use std::collections::{HashMap, HashSet};
16
17use wacore::xml::DisplayableNode;
18use wacore_binary::builder::NodeBuilder;
19use wacore_binary::jid::JidExt;
20use wacore_binary::node::{Attrs, Node, NodeValue};
21
22use crate::appstate_sync::AppStateProcessor;
23use crate::handlers::chatstate::ChatStateEvent;
24use crate::jid_utils::server_jid;
25use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
26use crate::types::enc_handler::EncHandler;
27use crate::types::events::{ConnectFailureReason, Event};
28
29use log::{debug, error, info, trace, warn};
30
31use rand::{Rng, RngExt};
32use scopeguard;
33use wacore_binary::jid::Jid;
34
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
37
/// Declarative matcher for incoming stanzas: a node matches when its tag
/// equals `tag` and every `(key, value)` pair in `attrs` is present verbatim.
#[derive(Debug, Clone)]
pub struct NodeFilter {
    // Required stanza tag (exact match).
    tag: String,
    // Required attribute key/value pairs; all must match (logical AND).
    attrs: Vec<(String, String)>,
}
59
60impl NodeFilter {
61 pub fn tag(tag: impl Into<String>) -> Self {
63 Self {
64 tag: tag.into(),
65 attrs: Vec::new(),
66 }
67 }
68
69 pub fn attr(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
71 self.attrs.push((key.into(), value.into()));
72 self
73 }
74
75 pub fn from_jid(self, jid: &Jid) -> Self {
77 self.attr("from", jid.to_string())
78 }
79
80 fn matches(&self, node: &Node) -> bool {
81 node.tag == self.tag
82 && self
83 .attrs
84 .iter()
85 .all(|(k, v)| node.attrs.get(k.as_str()).is_some_and(|attr| *attr == *v))
86 }
87}
88
/// One-shot registration pairing a [`NodeFilter`] with the channel used to
/// deliver the first matching stanza to the waiting task.
struct NodeWaiter {
    // Predicate the incoming node must satisfy.
    filter: NodeFilter,
    // Fulfilled at most once with the matching node.
    tx: futures::channel::oneshot::Sender<Arc<Node>>,
}
93
94use async_lock::Mutex;
95use async_lock::RwLock;
96use std::time::Duration;
97use thiserror::Error;
98
99use wacore::appstate::patch_decode::WAPatchName;
100use wacore::client::context::GroupInfo;
101use wacore::runtime::timeout as rt_timeout;
102use waproto::whatsapp as wa;
103
104use crate::cache_config::CacheConfig;
105use crate::socket::{NoiseSocket, SocketError, error::EncryptSendError};
106use crate::sync_task::MajorSyncTask;
107use wacore::runtime::Runtime;
108
/// Callback invoked for each chat-state (typing/recording/paused) update.
type ChatStateHandler = Arc<dyn Fn(ChatStateEvent) + Send + Sync>;

// Maximum retry attempts for app-state sync operations (used elsewhere in
// this module).
const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;

// Shared timeout applied to both the version fetch and the transport
// connect performed in parallel by `connect`.
const TRANSPORT_CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
117
#[cfg(feature = "debug-diagnostics")]
/// Snapshot of entry counts across all client-held caches and collections,
/// grouped by how their growth is bounded. Produced by
/// `Client::memory_diagnostics`.
#[derive(Debug, Clone)]
pub struct MemoryDiagnostics {
    // Moka caches with a TTL bound.
    pub group_cache: u64,
    pub device_cache: u64,
    pub device_registry_cache: u64,
    pub lid_pn_lid_entries: u64,
    pub lid_pn_pn_entries: u64,
    pub retried_group_messages: u64,
    pub recent_messages: u64,
    pub message_retry_counts: u64,
    pub pdo_pending_requests: u64,
    // Moka caches bounded by capacity only.
    pub session_locks: u64,
    pub message_queues: u64,
    pub message_enqueue_locks: u64,
    // Unbounded collections (watch these for leaks).
    pub response_waiters: usize,
    pub node_waiters: usize,
    pub pending_retries: usize,
    pub presence_subscriptions: usize,
    pub app_state_key_requests: usize,
    pub app_state_syncing: usize,
    pub signal_cache_sessions: usize,
    pub signal_cache_identities: usize,
    pub signal_cache_sender_keys: usize,
    // Misc handler registries.
    pub chatstate_handlers: usize,
    pub custom_enc_handlers: usize,
}
155
#[cfg(feature = "debug-diagnostics")]
impl std::fmt::Display for MemoryDiagnostics {
    // Renders the counters as a human-readable multi-line report, grouped
    // the same way the struct's fields are grouped.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "=== Memory Diagnostics ===")?;
        writeln!(f, "--- Moka caches (TTL-bounded) ---")?;
        writeln!(f, " group_cache: {}", self.group_cache)?;
        writeln!(f, " device_cache: {}", self.device_cache)?;
        writeln!(
            f,
            " device_registry_cache: {}",
            self.device_registry_cache
        )?;
        writeln!(f, " lid_pn (lid): {}", self.lid_pn_lid_entries)?;
        writeln!(f, " lid_pn (pn): {}", self.lid_pn_pn_entries)?;
        writeln!(
            f,
            " retried_group_messages: {}",
            self.retried_group_messages
        )?;
        writeln!(f, " recent_messages: {}", self.recent_messages)?;
        writeln!(f, " message_retry_counts: {}", self.message_retry_counts)?;
        writeln!(f, " pdo_pending_requests: {}", self.pdo_pending_requests)?;
        writeln!(f, "--- Moka caches (capacity-only) ---")?;
        writeln!(f, " session_locks: {}", self.session_locks)?;
        writeln!(f, " message_queues: {}", self.message_queues)?;
        writeln!(
            f,
            " message_enqueue_locks: {}",
            self.message_enqueue_locks
        )?;
        writeln!(f, "--- Unbounded collections ---")?;
        writeln!(f, " response_waiters: {}", self.response_waiters)?;
        writeln!(f, " node_waiters: {}", self.node_waiters)?;
        writeln!(f, " pending_retries: {}", self.pending_retries)?;
        writeln!(
            f,
            " presence_subscriptions: {}",
            self.presence_subscriptions
        )?;
        writeln!(
            f,
            " app_state_key_requests: {}",
            self.app_state_key_requests
        )?;
        writeln!(f, " app_state_syncing: {}", self.app_state_syncing)?;
        writeln!(
            f,
            " signal_sessions: {}",
            self.signal_cache_sessions
        )?;
        writeln!(
            f,
            " signal_identities: {}",
            self.signal_cache_identities
        )?;
        writeln!(
            f,
            " signal_sender_keys: {}",
            self.signal_cache_sender_keys
        )?;
        writeln!(f, "--- Misc ---")?;
        writeln!(f, " chatstate_handlers: {}", self.chatstate_handlers)?;
        writeln!(f, " custom_enc_handlers: {}", self.custom_enc_handlers)?;
        Ok(())
    }
}
222
/// Errors surfaced by client connection and send operations.
#[derive(Debug, Error)]
pub enum ClientError {
    #[error("client is not connected")]
    NotConnected,
    // Transport-level failure, converted automatically via `#[from]`.
    #[error("socket error: {0}")]
    Socket(#[from] SocketError),
    // Noise encryption / frame send failure, converted via `#[from]`.
    #[error("encrypt/send error: {0}")]
    EncryptSend(#[from] EncryptSendError),
    #[error("client is already connected")]
    AlreadyConnected,
    #[error("client is not logged in")]
    NotLoggedIn,
}
236
237use wacore::types::message::StanzaKey;
238
/// Progress counters for the post-login offline message sync; reset in
/// `cleanup_connection_state`.
#[derive(Debug)]
pub(crate) struct OfflineSyncMetrics {
    // True while an offline sync batch is in progress.
    pub active: AtomicBool,
    // Total messages expected in the current batch.
    pub total_messages: AtomicUsize,
    // Messages processed so far from the batch.
    pub processed_messages: AtomicUsize,
    // Batch start instant; `None` when idle. Std mutex — accessed with
    // poison recovery (see `cleanup_connection_state`).
    pub start_time: std::sync::Mutex<Option<wacore::time::Instant>>,
}
248
/// Central client state: connection lifecycle flags, transport handles,
/// caches, sync coordination primitives, and handler registries.
pub struct Client {
    // --- Core services ---
    pub(crate) runtime: Arc<dyn Runtime>,
    pub(crate) core: wacore::client::CoreClient,

    pub(crate) persistence_manager: Arc<PersistenceManager>,
    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,

    // --- Connection lifecycle flags & notifiers ---
    pub(crate) is_logged_in: Arc<AtomicBool>,
    pub(crate) is_connecting: Arc<AtomicBool>,
    pub(crate) is_running: Arc<AtomicBool>,
    is_connected: Arc<AtomicBool>,
    pub(crate) shutdown_notifier: Arc<event_listener::Event>,
    pub(crate) last_data_received_ms: Arc<AtomicU64>,
    pub(crate) last_data_sent_ms: Arc<AtomicU64>,

    // --- Transport & noise socket ---
    pub(crate) transport: Arc<Mutex<Option<Arc<dyn crate::transport::Transport>>>>,
    pub(crate) transport_events:
        Arc<Mutex<Option<async_channel::Receiver<crate::transport::TransportEvent>>>>,
    pub(crate) transport_factory: Arc<dyn crate::transport::TransportFactory>,
    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,

    // --- IQ response / node waiters ---
    pub(crate) response_waiters:
        Arc<Mutex<HashMap<String, futures::channel::oneshot::Sender<wacore_binary::Node>>>>,

    node_waiters: std::sync::Mutex<Vec<NodeWaiter>>,
    node_waiter_count: AtomicUsize,

    // Random per-instance id ("a.b") plus monotonically increasing counter.
    pub(crate) unique_id: String,
    pub(crate) id_counter: Arc<AtomicU64>,

    pub(crate) unified_session: crate::unified_session::UnifiedSessionManager,

    pub(crate) signal_cache: Arc<crate::store::signal_cache::SignalStoreCache>,

    // --- Message processing pipeline ---
    // Swappable semaphore (see `swap_message_semaphore`); generation counter
    // lets tasks notice a swap.
    pub(crate) message_processing_semaphore: std::sync::Mutex<Arc<async_lock::Semaphore>>,
    pub(crate) message_semaphore_generation: Arc<AtomicU64>,

    pub(crate) session_locks: Cache<String, Arc<async_lock::Mutex<()>>>,

    pub(crate) message_queues: Cache<String, async_channel::Sender<Arc<Node>>>,

    pub(crate) lid_pn_cache: Arc<LidPnCache>,

    pub(crate) message_enqueue_locks: Cache<String, Arc<async_lock::Mutex<()>>>,

    // --- Lazily initialized caches (see get_group_cache/get_device_cache) ---
    pub group_cache: async_lock::Mutex<Option<Arc<TypedCache<Jid, GroupInfo>>>>,
    #[allow(clippy::type_complexity)]
    pub device_cache: async_lock::Mutex<Option<Arc<TypedCache<Jid, Vec<Jid>>>>>,

    pub(crate) retried_group_messages: Cache<String, ()>,
    pub(crate) expected_disconnect: Arc<AtomicBool>,

    pub(crate) connection_generation: Arc<AtomicU64>,

    pub(crate) recent_messages: Cache<StanzaKey, Vec<u8>>,

    pub(crate) pending_retries: Arc<async_lock::Mutex<HashSet<String>>>,

    pub(crate) message_retry_counts: Cache<String, u8>,

    // --- Reconnect policy (see `run`) ---
    pub enable_auto_reconnect: Arc<AtomicBool>,
    pub auto_reconnect_errors: Arc<AtomicU32>,

    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,

    // --- App-state / history sync coordination ---
    pub(crate) app_state_processor: async_lock::Mutex<Option<Arc<AppStateProcessor>>>,
    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, wacore::time::Instant>>>,
    pub(crate) app_state_syncing: Arc<Mutex<HashSet<WAPatchName>>>,
    pub(crate) initial_keys_synced_notifier: Arc<event_listener::Event>,
    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,

    pub(crate) server_has_prekeys: Arc<AtomicBool>,
    pub(crate) prekey_upload_lock: Arc<async_lock::Mutex<()>>,
    pub(crate) offline_sync_notifier: Arc<event_listener::Event>,
    pub(crate) offline_sync_completed: Arc<AtomicBool>,
    pub(crate) history_sync_tasks_in_flight: Arc<AtomicUsize>,
    pub(crate) history_sync_idle_notifier: Arc<event_listener::Event>,
    pub(crate) presence_subscriptions: Arc<async_lock::Mutex<HashSet<Jid>>>,
    pub(crate) offline_sync_metrics: Arc<OfflineSyncMetrics>,
    pub(crate) socket_ready_notifier: Arc<event_listener::Event>,
    pub(crate) is_ready: Arc<AtomicBool>,
    pub(crate) connected_notifier: Arc<event_listener::Event>,
    pub(crate) major_sync_task_sender: async_channel::Sender<MajorSyncTask>,
    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<async_channel::Sender<()>>>>,

    pub(crate) pair_code_state: Arc<Mutex<wacore::pair_code::PairCodeState>>,

    // --- Extensibility hooks ---
    pub custom_enc_handlers: Arc<async_lock::RwLock<HashMap<String, Arc<dyn EncHandler>>>>,

    pub(crate) chatstate_handlers: Arc<RwLock<Vec<ChatStateHandler>>>,

    pub(crate) pdo_pending_requests: Cache<String, crate::pdo::PendingPdoRequest>,

    pub(crate) device_registry_cache: TypedCache<String, wacore::store::traits::DeviceListRecord>,

    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,

    pub(crate) synchronous_ack: bool,

    pub http_client: Arc<dyn crate::http::HttpClient>,

    // Optional (major, minor, patch) override for the reported app version.
    pub(crate) override_version: Option<(u32, u32, u32)>,

    pub(crate) skip_history_sync: AtomicBool,

    pub(crate) cache_config: CacheConfig,
}
434
435impl Client {
436 pub(crate) fn swap_message_semaphore(&self, permits: usize) {
442 let mut guard = match self.message_processing_semaphore.lock() {
443 Ok(g) => g,
444 Err(poisoned) => poisoned.into_inner(),
445 };
446 *guard = Arc::new(async_lock::Semaphore::new(permits));
447 self.message_semaphore_generation
448 .fetch_add(1, Ordering::SeqCst);
449 }
450
451 fn should_downgrade_sync_error(&self, err: &anyhow::Error) -> bool {
452 if self.is_shutting_down() {
453 return true;
454 }
455
456 matches!(
457 err.downcast_ref::<crate::request::IqError>(),
458 Some(
459 crate::request::IqError::NotConnected
460 | crate::request::IqError::InternalChannelClosed
461 )
462 )
463 }
464
465 fn log_sync_error(&self, context: &str, err: &anyhow::Error) {
467 if self.should_downgrade_sync_error(err) {
468 debug!("Skipping {context} during shutdown: {err}");
469 } else {
470 warn!("Failed {context}: {err}");
471 }
472 }
473
474 fn is_fully_ready(&self) -> bool {
478 self.is_connected() && self.is_logged_in() && self.is_ready.load(Ordering::Relaxed)
479 }
480
481 fn dispatch_connected(&self) {
483 self.is_ready.store(true, Ordering::Relaxed);
484 self.core
485 .event_bus
486 .dispatch(&Event::Connected(crate::types::events::Connected));
487 self.connected_notifier.notify(usize::MAX);
488 }
489
490 pub fn set_skip_history_sync(&self, enabled: bool) {
495 self.skip_history_sync.store(enabled, Ordering::Relaxed);
496 }
497
498 pub async fn process_sync_task(self: &Arc<Self>, task: crate::sync_task::MajorSyncTask) {
500 match task {
501 crate::sync_task::MajorSyncTask::HistorySync {
502 message_id,
503 notification,
504 } => {
505 self.process_history_sync_task(message_id, *notification)
506 .await;
507 self.finish_history_sync_task();
508 }
509 crate::sync_task::MajorSyncTask::AppStateSync { name, full_sync } => {
510 if let Err(e) = self.process_app_state_sync_task(name, full_sync).await {
511 log::warn!("App state sync task for {name:?} failed: {e}");
512 }
513 }
514 }
515 }
516
517 pub fn skip_history_sync_enabled(&self) -> bool {
519 self.skip_history_sync.load(Ordering::Relaxed)
520 }
521
522 pub(crate) fn is_shutting_down(&self) -> bool {
523 self.expected_disconnect.load(Ordering::Relaxed) || !self.is_running.load(Ordering::Relaxed)
524 }
525
526 pub async fn new(
531 runtime: Arc<dyn Runtime>,
532 persistence_manager: Arc<PersistenceManager>,
533 transport_factory: Arc<dyn crate::transport::TransportFactory>,
534 http_client: Arc<dyn crate::http::HttpClient>,
535 override_version: Option<(u32, u32, u32)>,
536 ) -> (Arc<Self>, async_channel::Receiver<MajorSyncTask>) {
537 Self::new_with_cache_config(
538 runtime,
539 persistence_manager,
540 transport_factory,
541 http_client,
542 override_version,
543 CacheConfig::default(),
544 )
545 .await
546 }
547
    /// Full constructor: builds the client with an explicit [`CacheConfig`]
    /// and spawns background warm-up/cleanup tasks.
    ///
    /// Returns the client plus the receiver end of the major-sync task
    /// channel; the caller is expected to drain it (see `process_sync_task`).
    pub async fn new_with_cache_config(
        runtime: Arc<dyn Runtime>,
        persistence_manager: Arc<PersistenceManager>,
        transport_factory: Arc<dyn crate::transport::TransportFactory>,
        http_client: Arc<dyn crate::http::HttpClient>,
        override_version: Option<(u32, u32, u32)>,
        cache_config: CacheConfig,
    ) -> (Arc<Self>, async_channel::Receiver<MajorSyncTask>) {
        // Two random bytes form this instance's unique id (e.g. "12.34").
        let mut unique_id_bytes = [0u8; 2];
        rand::make_rng::<rand::rngs::StdRng>().fill_bytes(&mut unique_id_bytes);

        let device_snapshot = persistence_manager.get_device_snapshot().await;
        let core = wacore::client::CoreClient::new(device_snapshot.core.clone());

        // Bounded channel for major sync tasks (history / app-state).
        let (tx, rx) = async_channel::bounded(32);

        let this = Self {
            runtime: runtime.clone(),
            core,
            persistence_manager: persistence_manager.clone(),
            media_conn: Arc::new(RwLock::new(None)),
            is_logged_in: Arc::new(AtomicBool::new(false)),
            is_connecting: Arc::new(AtomicBool::new(false)),
            is_running: Arc::new(AtomicBool::new(false)),
            is_connected: Arc::new(AtomicBool::new(false)),
            shutdown_notifier: Arc::new(event_listener::Event::new()),
            last_data_received_ms: Arc::new(AtomicU64::new(0)),
            last_data_sent_ms: Arc::new(AtomicU64::new(0)),

            transport: Arc::new(Mutex::new(None)),
            transport_events: Arc::new(Mutex::new(None)),
            transport_factory,
            noise_socket: Arc::new(Mutex::new(None)),

            response_waiters: Arc::new(Mutex::new(HashMap::new())),
            node_waiters: std::sync::Mutex::new(Vec::new()),
            node_waiter_count: AtomicUsize::new(0),
            unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
            id_counter: Arc::new(AtomicU64::new(0)),
            unified_session: crate::unified_session::UnifiedSessionManager::new(),

            signal_cache: Arc::new(crate::store::signal_cache::SignalStoreCache::new()),
            // Starts at 1 permit; resized later via `swap_message_semaphore`.
            message_processing_semaphore: std::sync::Mutex::new(Arc::new(
                async_lock::Semaphore::new(1),
            )),
            message_semaphore_generation: Arc::new(AtomicU64::new(0)),
            // `.max(1)` guards against a zero-capacity misconfiguration.
            session_locks: Cache::builder()
                .max_capacity(cache_config.session_locks_capacity.max(1))
                .build(),
            message_queues: Cache::builder()
                .max_capacity(cache_config.message_queues_capacity.max(1))
                .build(),
            lid_pn_cache: Arc::new(LidPnCache::with_config(
                &cache_config.lid_pn_cache,
                cache_config.cache_stores.lid_pn_cache.clone(),
            )),
            message_enqueue_locks: Cache::builder()
                .max_capacity(cache_config.message_enqueue_locks_capacity.max(1))
                .build(),
            // Populated lazily on first use (see get_group_cache/get_device_cache).
            group_cache: async_lock::Mutex::new(None),
            device_cache: async_lock::Mutex::new(None),
            retried_group_messages: cache_config.retried_group_messages.build_with_ttl(),

            expected_disconnect: Arc::new(AtomicBool::new(false)),
            connection_generation: Arc::new(AtomicU64::new(0)),

            recent_messages: cache_config.recent_messages.build_with_ttl(),

            pending_retries: Arc::new(async_lock::Mutex::new(HashSet::new())),

            message_retry_counts: cache_config.message_retry_counts.build_with_ttl(),

            offline_sync_metrics: Arc::new(OfflineSyncMetrics {
                active: AtomicBool::new(false),
                total_messages: AtomicUsize::new(0),
                processed_messages: AtomicUsize::new(0),
                start_time: std::sync::Mutex::new(None),
            }),

            enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
            auto_reconnect_errors: Arc::new(AtomicU32::new(0)),

            needs_initial_full_sync: Arc::new(AtomicBool::new(false)),

            app_state_processor: async_lock::Mutex::new(None),
            app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
            app_state_syncing: Arc::new(Mutex::new(HashSet::new())),
            initial_keys_synced_notifier: Arc::new(event_listener::Event::new()),
            initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
            // Assume the server has prekeys until told otherwise.
            server_has_prekeys: Arc::new(AtomicBool::new(true)),
            prekey_upload_lock: Arc::new(async_lock::Mutex::new(())),
            offline_sync_notifier: Arc::new(event_listener::Event::new()),
            offline_sync_completed: Arc::new(AtomicBool::new(false)),
            history_sync_tasks_in_flight: Arc::new(AtomicUsize::new(0)),
            history_sync_idle_notifier: Arc::new(event_listener::Event::new()),
            presence_subscriptions: Arc::new(async_lock::Mutex::new(HashSet::new())),
            socket_ready_notifier: Arc::new(event_listener::Event::new()),
            is_ready: Arc::new(AtomicBool::new(false)),
            connected_notifier: Arc::new(event_listener::Event::new()),
            major_sync_task_sender: tx,
            pairing_cancellation_tx: Arc::new(Mutex::new(None)),
            pair_code_state: Arc::new(Mutex::new(wacore::pair_code::PairCodeState::default())),
            custom_enc_handlers: Arc::new(async_lock::RwLock::new(HashMap::new())),
            chatstate_handlers: Arc::new(RwLock::new(Vec::new())),
            pdo_pending_requests: cache_config.pdo_pending_requests.build_with_ttl(),
            device_registry_cache: cache_config.device_registry_cache.build_typed_ttl(
                cache_config.cache_stores.device_registry_cache.clone(),
                "device_registry",
            ),
            stanza_router: Self::create_stanza_router(),
            synchronous_ack: false,
            http_client,
            override_version,
            skip_history_sync: AtomicBool::new(false),
            cache_config,
        };

        let arc = Arc::new(this);

        // Best-effort background warm-up of the LID-PN mapping cache.
        let warm_up_arc = arc.clone();
        arc.runtime
            .spawn(Box::pin(async move {
                if let Err(e) = warm_up_arc.warm_up_lid_pn_cache().await {
                    warn!("Failed to warm up LID-PN cache: {e}");
                }
            }))
            .detach();

        // Periodic device-registry cache cleanup task.
        let cleanup_arc = arc.clone();
        arc.runtime
            .spawn(Box::pin(async move {
                cleanup_arc.device_registry_cleanup_loop().await;
            }))
            .detach();

        (arc, rx)
    }
691
692 pub(crate) async fn get_group_cache(&self) -> Arc<TypedCache<Jid, GroupInfo>> {
693 let mut guard = self.group_cache.lock().await;
694 if let Some(cache) = guard.as_ref() {
695 return cache.clone();
696 }
697 debug!("Initializing Group Cache for the first time.");
698 let cache = Arc::new(
699 self.cache_config
700 .group_cache
701 .build_typed_ttl(self.cache_config.cache_stores.group_cache.clone(), "group"),
702 );
703 *guard = Some(cache.clone());
704 cache
705 }
706
707 pub(crate) async fn get_device_cache(&self) -> Arc<TypedCache<Jid, Vec<Jid>>> {
708 let mut guard = self.device_cache.lock().await;
709 if let Some(cache) = guard.as_ref() {
710 return cache.clone();
711 }
712 debug!("Initializing Device Cache for the first time.");
713 let cache = Arc::new(self.cache_config.device_cache.build_typed_ttl(
714 self.cache_config.cache_stores.device_cache.clone(),
715 "device",
716 ));
717 *guard = Some(cache.clone());
718 cache
719 }
720
721 pub(crate) async fn get_app_state_processor(&self) -> Arc<AppStateProcessor> {
722 let mut guard = self.app_state_processor.lock().await;
723 if let Some(proc) = guard.as_ref() {
724 return proc.clone();
725 }
726 debug!("Initializing AppStateProcessor for the first time.");
727 let proc = Arc::new(AppStateProcessor::new(
728 self.persistence_manager.backend(),
729 self.runtime.clone(),
730 ));
731 *guard = Some(proc.clone());
732 proc
733 }
734
    /// Builds the router that dispatches incoming stanzas to their handlers.
    /// NOTE(review): registration order is preserved as-is — confirm whether
    /// the router treats it as a priority order before reordering.
    fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
        use crate::handlers::{
            basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
            chatstate::ChatstateHandler,
            ib::IbHandler,
            iq::IqHandler,
            message::MessageHandler,
            notification::NotificationHandler,
            receipt::ReceiptHandler,
            router::StanzaRouter,
            unimplemented::UnimplementedHandler,
        };

        let mut router = StanzaRouter::new();

        router.register(Arc::new(MessageHandler));
        router.register(Arc::new(ReceiptHandler));
        router.register(Arc::new(IqHandler));
        router.register(Arc::new(SuccessHandler));
        router.register(Arc::new(FailureHandler));
        router.register(Arc::new(StreamErrorHandler));
        router.register(Arc::new(IbHandler));
        router.register(Arc::new(NotificationHandler));
        router.register(Arc::new(AckHandler));
        router.register(Arc::new(ChatstateHandler));

        // Call stanzas get a stub handler (see UnimplementedHandler::for_call).
        router.register(Arc::new(UnimplementedHandler::for_call()));
        router.register(Arc::new(crate::handlers::presence::PresenceHandler));

        router
    }
769
770 pub fn register_handler(&self, handler: Arc<dyn wacore::types::events::EventHandler>) {
772 self.core.event_bus.add_handler(handler);
773 }
774
775 pub async fn register_chatstate_handler(
779 &self,
780 handler: Arc<dyn Fn(ChatStateEvent) + Send + Sync>,
781 ) {
782 self.chatstate_handlers.write().await.push(handler);
783 }
784
785 pub(crate) async fn dispatch_chatstate_event(
789 &self,
790 stanza: wacore::iq::chatstate::ChatstateStanza,
791 ) {
792 use wacore::iq::chatstate::{ChatstateSource, ReceivedChatState};
793 use wacore::types::events::ChatPresenceUpdate;
794 use wacore::types::message::MessageSource;
795 use wacore::types::presence::{ChatPresence, ChatPresenceMedia};
796
797 let (chat, sender, is_group) = match &stanza.source {
799 ChatstateSource::User { from } => (from.clone(), from.clone(), false),
800 ChatstateSource::Group { from, participant } => {
801 (from.clone(), participant.clone(), true)
802 }
803 };
804
805 let (state, media) = match stanza.state {
806 ReceivedChatState::Typing => (ChatPresence::Composing, ChatPresenceMedia::Text),
807 ReceivedChatState::RecordingAudio => {
808 (ChatPresence::Composing, ChatPresenceMedia::Audio)
809 }
810 ReceivedChatState::Idle => (ChatPresence::Paused, ChatPresenceMedia::Text),
811 };
812
813 self.core
814 .event_bus
815 .dispatch(&Event::ChatPresence(ChatPresenceUpdate {
816 source: MessageSource {
817 chat,
818 sender,
819 is_from_me: false,
820 is_group,
821 addressing_mode: None,
822 sender_alt: None,
823 recipient_alt: None,
824 broadcast_list_owner: None,
825 recipient: None,
826 },
827 state,
828 media,
829 }));
830
831 let event = ChatStateEvent::from_stanza(stanza);
833 let handlers = self.chatstate_handlers.read().await.clone();
834 for handler in handlers {
835 let event_clone = event.clone();
836 let handler_clone = handler.clone();
837 self.runtime
838 .spawn(Box::pin(async move {
839 (handler_clone)(event_clone);
840 }))
841 .detach();
842 }
843 }
844
845 pub async fn run(self: &Arc<Self>) {
846 if self.is_running.swap(true, Ordering::SeqCst) {
847 warn!("Client `run` method called while already running.");
848 return;
849 }
850 while self.is_running.load(Ordering::Relaxed) {
851 self.expected_disconnect.store(false, Ordering::Relaxed);
852
853 if let Err(connect_err) = self.connect().await {
854 error!("Failed to connect: {connect_err:#}. Will retry...");
855 } else {
856 if self.read_messages_loop().await.is_err() {
857 warn!(
858 "Message loop exited with an error. Will attempt to reconnect if enabled."
859 );
860 } else if self.expected_disconnect.load(Ordering::Relaxed) {
861 debug!("Message loop exited gracefully (expected disconnect).");
862 } else {
863 info!("Message loop exited gracefully.");
864 }
865
866 self.cleanup_connection_state().await;
867 }
868
869 if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
870 info!("Auto-reconnect disabled, shutting down.");
871 self.is_running.store(false, Ordering::Relaxed);
872 break;
873 }
874
875 if self.expected_disconnect.load(Ordering::Relaxed) {
877 self.auto_reconnect_errors.store(0, Ordering::Relaxed);
878 info!("Expected disconnect (e.g., 515), reconnecting immediately...");
879 continue;
880 }
881
882 let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
883 let delay = fibonacci_backoff(error_count);
887 info!(
888 "Will attempt to reconnect in {:?} (attempt {})",
889 delay,
890 error_count + 1
891 );
892 self.runtime.sleep(delay).await;
893 }
894 info!("Client run loop has shut down.");
895 }
896
    /// Establishes the transport, performs the noise handshake, and installs
    /// the resulting socket. Version resolution and transport creation run in
    /// parallel, each bounded by `TRANSPORT_CONNECT_TIMEOUT`.
    ///
    /// # Errors
    /// `ClientError::AlreadyConnected` if a connection attempt is in flight
    /// or already established; otherwise timeout/handshake failures.
    pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        // `swap` doubles as a mutex: only one connect attempt at a time.
        if self.is_connecting.swap(true, Ordering::SeqCst) {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Clear the connecting flag on every exit path, including `?` errors.
        let _guard = scopeguard::guard((), |_| {
            self.is_connecting.store(false, Ordering::Relaxed);
        });

        if self.is_connected() {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Reset per-connection state before dialing.
        self.is_logged_in.store(false, Ordering::Relaxed);
        self.is_ready.store(false, Ordering::Relaxed);
        self.is_connected.store(false, Ordering::Relaxed);
        self.offline_sync_completed.store(false, Ordering::Relaxed);
        self.server_has_prekeys.store(true, Ordering::Relaxed);

        let version_future = rt_timeout(
            &*self.runtime,
            TRANSPORT_CONNECT_TIMEOUT,
            crate::version::resolve_and_update_version(
                &self.persistence_manager,
                &self.http_client,
                self.override_version,
            ),
        );
        let transport_future = rt_timeout(
            &*self.runtime,
            TRANSPORT_CONNECT_TIMEOUT,
            self.transport_factory.create_transport(),
        );

        debug!("Connecting WebSocket and fetching latest client version in parallel...");
        let (version_result, transport_result) = futures::join!(version_future, transport_future);

        // Outer error = timeout, inner error = operation failure.
        version_result
            .map_err(|_| anyhow!("Version fetch timed out after {TRANSPORT_CONNECT_TIMEOUT:?}"))?
            .map_err(|e| anyhow!("Failed to resolve app version: {}", e))?;
        let (transport, mut transport_events) = transport_result.map_err(|_| {
            anyhow!("Transport connect timed out after {TRANSPORT_CONNECT_TIMEOUT:?}")
        })??;
        debug!("Version fetch and transport connection established.");

        let device_snapshot = self.persistence_manager.get_device_snapshot().await;

        let noise_socket = handshake::do_handshake(
            self.runtime.clone(),
            &device_snapshot,
            transport.clone(),
            &mut transport_events,
        )
        .await?;

        // Install the live connection; Release pairs with readers of the flag.
        *self.transport.lock().await = Some(transport);
        *self.transport_events.lock().await = Some(transport_events);
        *self.noise_socket.lock().await = Some(noise_socket);
        self.is_connected.store(true, Ordering::Release);

        self.socket_ready_notifier.notify(usize::MAX);

        let client_clone = self.clone();
        self.runtime
            .spawn(Box::pin(async move { client_clone.keepalive_loop().await }))
            .detach();

        Ok(())
    }
973
974 pub async fn disconnect(self: &Arc<Self>) {
975 info!("Disconnecting client intentionally.");
976 self.expected_disconnect.store(true, Ordering::Relaxed);
977 self.is_running.store(false, Ordering::Relaxed);
978 self.shutdown_notifier.notify(usize::MAX);
979
980 if let Err(e) = self.persistence_manager.flush().await {
982 log::error!("Failed to flush device state during disconnect: {e}");
983 }
984
985 if let Some(transport) = self.transport.lock().await.as_ref() {
986 transport.disconnect().await;
987 }
988 self.cleanup_connection_state().await;
989 }
990
991 pub const RECONNECT_BACKOFF_STEP: u32 = 4;
999
1000 pub async fn reconnect(self: &Arc<Self>) {
1012 info!("Reconnecting: dropping transport for auto-reconnect.");
1013 self.auto_reconnect_errors
1014 .store(Self::RECONNECT_BACKOFF_STEP, Ordering::Relaxed);
1015 if let Some(transport) = self.transport.lock().await.as_ref() {
1016 transport.disconnect().await;
1017 }
1018 }
1019
1020 pub async fn reconnect_immediately(self: &Arc<Self>) {
1026 info!("Reconnecting immediately (expected disconnect).");
1027 self.expected_disconnect.store(true, Ordering::Relaxed);
1028 if let Some(transport) = self.transport.lock().await.as_ref() {
1029 transport.disconnect().await;
1030 }
1031 }
1032
    /// Tears down all per-connection state after the socket closes so the
    /// next connect starts from a clean slate. Waiters are dropped (their
    /// oneshot senders close), caches cleared, and counters zeroed.
    async fn cleanup_connection_state(&self) {
        self.is_logged_in.store(false, Ordering::Relaxed);
        self.is_ready.store(false, Ordering::Relaxed);
        self.shutdown_notifier.notify(usize::MAX);
        *self.transport.lock().await = None;
        *self.transport_events.lock().await = None;
        *self.noise_socket.lock().await = None;
        self.is_connected.store(false, Ordering::Release);
        self.retried_group_messages.invalidate_all();
        self.signal_cache.clear().await;
        // Shrink the processing semaphore back to its initial single permit.
        self.swap_message_semaphore(1);
        self.last_data_received_ms.store(0, Ordering::Relaxed);
        self.last_data_sent_ms.store(0, Ordering::Relaxed);
        self.offline_sync_completed.store(false, Ordering::Relaxed);
        self.offline_sync_metrics
            .active
            .store(false, Ordering::Release);
        self.offline_sync_metrics
            .total_messages
            .store(0, Ordering::Release);
        self.offline_sync_metrics
            .processed_messages
            .store(0, Ordering::Release);
        // Poison-tolerant reset of the start-time slot.
        match self.offline_sync_metrics.start_time.lock() {
            Ok(mut guard) => *guard = None,
            Err(poison) => *poison.into_inner() = None,
        }
        self.server_has_prekeys.store(true, Ordering::Relaxed);
        self.history_sync_tasks_in_flight
            .store(0, Ordering::Relaxed);
        self.history_sync_idle_notifier.notify(usize::MAX);
        // Replacing the map drops every pending oneshot sender, waking the
        // corresponding waiters with a closed-channel error.
        let mut waiters_map = self.response_waiters.lock().await;
        let waiter_count = waiters_map.len();
        *waiters_map = HashMap::new();
        drop(waiters_map);
        if waiter_count > 0 {
            debug!(
                "Dropping {} orphaned IQ response waiter(s) on disconnect",
                waiter_count
            );
        }

        *self.app_state_key_requests.lock().await = HashMap::new();
        *self.app_state_syncing.lock().await = HashSet::new();

        *self.media_conn.write().await = None;

        if let Some(proc) = self.app_state_processor.lock().await.as_ref() {
            proc.clear_key_cache().await;
        }
    }
1103
    #[cfg(feature = "debug-diagnostics")]
    /// Snapshots entry counts of every client-held cache and collection.
    /// Acquires several locks in sequence, so counts are not a single
    /// atomic snapshot — fine for diagnostics.
    pub async fn memory_diagnostics(&self) -> MemoryDiagnostics {
        let (sig_sessions, sig_identities, sig_sender_keys) =
            self.signal_cache.entry_counts().await;
        let (lid_lid, lid_pn) = self.lid_pn_cache.entry_counts();

        MemoryDiagnostics {
            // Lazily-initialized caches report 0 until first use.
            group_cache: self
                .group_cache
                .lock()
                .await
                .as_ref()
                .map_or(0, |c| c.entry_count()),
            device_cache: self
                .device_cache
                .lock()
                .await
                .as_ref()
                .map_or(0, |c| c.entry_count()),
            device_registry_cache: self.device_registry_cache.entry_count(),
            lid_pn_lid_entries: lid_lid,
            lid_pn_pn_entries: lid_pn,
            retried_group_messages: self.retried_group_messages.entry_count(),
            recent_messages: self.recent_messages.entry_count(),
            message_retry_counts: self.message_retry_counts.entry_count(),
            pdo_pending_requests: self.pdo_pending_requests.entry_count(),
            session_locks: self.session_locks.entry_count(),
            message_queues: self.message_queues.entry_count(),
            message_enqueue_locks: self.message_enqueue_locks.entry_count(),
            response_waiters: self.response_waiters.lock().await.len(),
            node_waiters: self.node_waiter_count.load(Ordering::Relaxed),
            pending_retries: self.pending_retries.lock().await.len(),
            presence_subscriptions: self.presence_subscriptions.lock().await.len(),
            app_state_key_requests: self.app_state_key_requests.lock().await.len(),
            app_state_syncing: self.app_state_syncing.lock().await.len(),
            signal_cache_sessions: sig_sessions,
            signal_cache_identities: sig_identities,
            signal_cache_sender_keys: sig_sender_keys,
            chatstate_handlers: self.chatstate_handlers.read().await.len(),
            custom_enc_handlers: self.custom_enc_handlers.read().await.len(),
        }
    }
1152
1153 pub(crate) async fn flush_signal_cache(&self) -> Result<(), anyhow::Error> {
1156 let device = self.persistence_manager.get_device_arc().await;
1157 let device_guard = device.read().await;
1158 self.signal_cache
1159 .flush(&*device_guard.backend)
1160 .await
1161 .map_err(|e| anyhow::anyhow!("Failed to flush signal cache: {e}"))
1162 }
1163
    /// Core receive loop: drains transport events, reassembles and decrypts
    /// frames, and routes decoded nodes until shutdown or disconnect.
    ///
    /// Takes exclusive ownership of the transport event receiver (errors if
    /// the client is not connected). Returns `Ok(())` on an expected
    /// shutdown/disconnect and `Err` if the transport drops unexpectedly.
    async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        debug!("Starting message processing loop...");

        // Take the receiver out of the shared slot; only one loop may own it.
        let mut rx_guard = self.transport_events.lock().await;
        let transport_events = rx_guard
            .take()
            .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
        drop(rx_guard);

        // Stateful decoder: reassembles length-prefixed frames across chunks.
        let mut frame_decoder = wacore::framing::FrameDecoder::new();

        loop {
            // Biased select: prefer the shutdown signal over incoming data.
            futures::select_biased! {
                _ = self.shutdown_notifier.listen().fuse() => {
                    debug!("Shutdown signaled in message loop. Exiting message loop.");
                    return Ok(());
                },
                event_result = transport_events.recv().fuse() => {
                    match event_result {
                        Ok(crate::transport::TransportEvent::DataReceived(data)) => {
                            // Record liveness for keepalive/watchdog logic.
                            self.last_data_received_ms.store(
                                wacore::time::now_millis() as u64,
                                Ordering::Relaxed,
                            );

                            frame_decoder.feed(&data);

                            let mut frames_in_batch: u32 = 0;

                            while let Some(encrypted_frame) = frame_decoder.decode_frame() {
                                if let Some(node) = self.decrypt_frame(&encrypted_frame).await {
                                    // Order-sensitive stanzas are handled inline;
                                    // everything else is processed concurrently.
                                    let process_inline = matches!(
                                        node.tag.as_ref(),
                                        "success" | "failure" | "stream:error" | "message" | "ib"
                                    );

                                    if process_inline {
                                        self.process_decrypted_node(node).await;
                                    } else {
                                        let client = self.clone();
                                        self.runtime.spawn(Box::pin(async move {
                                            client.process_decrypted_node(node).await;
                                        })).detach();
                                    }
                                }

                                if self.expected_disconnect.load(Ordering::Relaxed) {
                                    debug!("Expected disconnect signaled during frame processing. Exiting message loop.");
                                    return Ok(());
                                }

                                frames_in_batch += 1;
                                // Yield periodically so a burst of frames cannot
                                // starve other tasks on the runtime.
                                if frames_in_batch.is_multiple_of(self.runtime.yield_frequency())
                                    && let Some(yield_fut) = self.runtime.yield_now()
                                {
                                    yield_fut.await;
                                }
                            }
                        },
                        Ok(crate::transport::TransportEvent::Disconnected) | Err(_) => {
                            // Channel error (Err) is treated the same as an
                            // explicit disconnect event.
                            self.cleanup_connection_state().await;
                            if !self.expected_disconnect.load(Ordering::Relaxed) {
                                self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
                                debug!("Transport disconnected unexpectedly.");
                                return Err(anyhow::anyhow!("Transport disconnected unexpectedly"));
                            } else {
                                debug!("Transport disconnected as expected.");
                                return Ok(());
                            }
                        }
                        Ok(crate::transport::TransportEvent::Connected) => {
                            // Connection setup is handled elsewhere; nothing to do.
                            debug!("Transport connected event received");
                        }
                    }
                }
            }
        }
    }
1259
1260 pub(crate) async fn decrypt_frame(
1263 self: &Arc<Self>,
1264 encrypted_frame: &bytes::Bytes,
1265 ) -> Option<wacore_binary::node::Node> {
1266 let noise_socket_arc = { self.noise_socket.lock().await.clone() };
1267 let noise_socket = match noise_socket_arc {
1268 Some(s) => s,
1269 None => {
1270 log::error!("Cannot process frame: not connected (no noise socket)");
1271 return None;
1272 }
1273 };
1274
1275 let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
1276 Ok(p) => p,
1277 Err(e) => {
1278 log::error!("Failed to decrypt frame: {e}");
1279 return None;
1280 }
1281 };
1282
1283 let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
1284 Ok(data) => data,
1285 Err(e) => {
1286 log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
1287 return None;
1288 }
1289 };
1290
1291 match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
1292 Ok(node_ref) => Some(node_ref.to_owned()),
1293 Err(e) => {
1294 log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}");
1295 None
1296 }
1297 }
1298 }
1299
1300 pub(crate) async fn process_decrypted_node(self: &Arc<Self>, node: wacore_binary::node::Node) {
1304 let node_arc = Arc::new(node);
1306 self.process_node(node_arc).await;
1307 }
1308
1309 pub(crate) async fn process_node(self: &Arc<Self>, node: Arc<Node>) {
1311 use wacore::xml::DisplayableNode;
1312
1313 if node.tag.as_ref() == "ib" {
1315 if let Some(preview) = node.get_optional_child("offline_preview") {
1317 let count: usize = preview
1318 .attrs
1319 .get("count")
1320 .and_then(|v| v.as_str().parse().ok())
1321 .unwrap_or(0);
1322
1323 if count == 0 {
1324 self.offline_sync_metrics
1325 .active
1326 .store(false, Ordering::Release);
1327 debug!(target: "Client/OfflineSync", "Sync COMPLETED: 0 items.");
1328 } else {
1329 self.offline_sync_metrics
1331 .total_messages
1332 .store(count, Ordering::Release);
1333 self.offline_sync_metrics
1334 .processed_messages
1335 .store(0, Ordering::Release);
1336 self.offline_sync_metrics
1337 .active
1338 .store(true, Ordering::Release);
1339 match self.offline_sync_metrics.start_time.lock() {
1340 Ok(mut guard) => *guard = Some(wacore::time::Instant::now()),
1341 Err(poison) => *poison.into_inner() = Some(wacore::time::Instant::now()),
1342 }
1343 debug!(target: "Client/OfflineSync", "Sync STARTED: Expecting {} items.", count);
1344 }
1345 } else if self.offline_sync_metrics.active.load(Ordering::Acquire)
1346 && node.get_optional_child("offline").is_some()
1347 {
1348 let processed = self
1352 .offline_sync_metrics
1353 .processed_messages
1354 .load(Ordering::Acquire);
1355 let elapsed = match self.offline_sync_metrics.start_time.lock() {
1356 Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
1357 Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
1358 };
1359 debug!(target: "Client/OfflineSync", "Sync COMPLETED: End marker received. Processed {} items in {:.2?}.", processed, elapsed);
1360 self.offline_sync_metrics
1361 .active
1362 .store(false, Ordering::Release);
1363 }
1364 }
1365
1366 if self.offline_sync_metrics.active.load(Ordering::Acquire) {
1368 if node.attrs.contains_key("offline") {
1370 let processed = self
1371 .offline_sync_metrics
1372 .processed_messages
1373 .fetch_add(1, Ordering::Release)
1374 + 1;
1375 let total = self
1376 .offline_sync_metrics
1377 .total_messages
1378 .load(Ordering::Acquire);
1379
1380 if processed.is_multiple_of(50) || processed == total {
1381 trace!(target: "Client/OfflineSync", "Sync Progress: {}/{}", processed, total);
1382 }
1383
1384 if processed >= total {
1385 let elapsed = match self.offline_sync_metrics.start_time.lock() {
1386 Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
1387 Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
1388 };
1389 debug!(target: "Client/OfflineSync", "Sync COMPLETED: Processed {} items in {:.2?}.", processed, elapsed);
1390 self.offline_sync_metrics
1391 .active
1392 .store(false, Ordering::Release);
1393 }
1394 }
1395 }
1396 if node.tag.as_ref() == "iq"
1399 && let Some(sync_node) = node.get_optional_child("sync")
1400 && let Some(collection_node) = sync_node.get_optional_child("collection")
1401 {
1402 let name = collection_node.attrs().optional_string("name");
1403 let name = name.as_deref().unwrap_or("<unknown>");
1404 debug!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
1405 } else {
1406 debug!(target: "Client/Recv","{}", DisplayableNode(&node));
1407 }
1408
1409 let mut cancelled = false;
1411
1412 if node.tag.as_ref() == "xmlstreamend" {
1413 if self.expected_disconnect.load(Ordering::Relaxed) {
1414 debug!("Received <xmlstreamend/>, expected disconnect.");
1415 } else {
1416 warn!("Received <xmlstreamend/>, treating as disconnect.");
1417 }
1418 self.shutdown_notifier.notify(usize::MAX);
1419 return;
1420 }
1421
1422 if self.node_waiter_count.load(Ordering::Relaxed) > 0 {
1424 self.resolve_node_waiters(&node);
1425 }
1426
1427 if node.tag.as_ref() == "iq"
1428 && let Some(id) = node.attrs.get("id").map(|v| v.as_str())
1429 {
1430 let has_waiter = self.response_waiters.lock().await.contains_key(id.as_ref());
1431 if has_waiter && self.handle_iq_response(Arc::clone(&node)).await {
1432 return;
1433 }
1434 }
1435
1436 if !self
1439 .stanza_router
1440 .dispatch(self.clone(), Arc::clone(&node), &mut cancelled)
1441 .await
1442 {
1443 warn!(
1444 "Received unknown top-level node: {}",
1445 DisplayableNode(&node)
1446 );
1447 }
1448
1449 if self.should_ack(&node) && !cancelled {
1451 self.maybe_deferred_ack(node).await;
1452 }
1453 }
1454
1455 fn should_ack(&self, node: &Node) -> bool {
1457 matches!(
1458 node.tag.as_ref(),
1459 "message" | "receipt" | "notification" | "call"
1460 ) && node.attrs.contains_key("id")
1461 && node.attrs.contains_key("from")
1462 }
1463
1464 async fn maybe_deferred_ack(self: &Arc<Self>, node: Arc<Node>) {
1468 if self.synchronous_ack {
1469 if let Err(e) = self.send_ack_for(&node).await {
1470 warn!("Failed to send ack: {e:?}");
1471 }
1472 } else {
1473 let this = self.clone();
1474 self.runtime
1476 .spawn(Box::pin(async move {
1477 if let Err(e) = this.send_ack_for(&node).await {
1478 warn!("Failed to send ack: {e:?}");
1479 }
1480 }))
1481 .detach();
1482 }
1483 }
1484
1485 async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
1487 if self.expected_disconnect.load(Ordering::Relaxed) {
1488 return Ok(());
1489 }
1490 if !self.is_connected() {
1491 return Err(ClientError::NotConnected);
1492 }
1493 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1494 let ack = match build_ack_node(node, device_snapshot.pn.as_ref()) {
1495 Some(ack) => ack,
1496 None => return Ok(()),
1497 };
1498 self.send_node(ack).await
1499 }
1500
1501 pub(crate) async fn handle_unimplemented(&self, tag: &str) {
1502 warn!("TODO: Implement handler for <{tag}>");
1503 }
1504
1505 pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
1506 use wacore::iq::passive::PassiveModeSpec;
1507 self.execute(PassiveModeSpec::new(passive)).await
1508 }
1509
1510 pub async fn clean_dirty_bits(
1511 &self,
1512 type_: &str,
1513 timestamp: Option<&str>,
1514 ) -> Result<(), crate::request::IqError> {
1515 use wacore::iq::dirty::CleanDirtyBitsSpec;
1516
1517 let spec = CleanDirtyBitsSpec::single(type_, timestamp)?;
1518 self.execute(spec).await
1519 }
1520
1521 pub async fn fetch_props(&self) -> Result<(), crate::request::IqError> {
1522 use wacore::iq::props::PropsSpec;
1523 use wacore::store::commands::DeviceCommand;
1524
1525 let stored_hash = self
1526 .persistence_manager
1527 .get_device_snapshot()
1528 .await
1529 .props_hash
1530 .clone();
1531
1532 let spec = match &stored_hash {
1533 Some(hash) => {
1534 debug!("Fetching props with hash for delta update...");
1535 PropsSpec::with_hash(hash)
1536 }
1537 None => {
1538 debug!("Fetching props (full, no stored hash)...");
1539 PropsSpec::new()
1540 }
1541 };
1542
1543 let response = self.execute(spec).await?;
1544
1545 if response.delta_update {
1546 debug!(
1547 "Props delta update received ({} changed props)",
1548 response.props.len()
1549 );
1550 } else {
1551 debug!(
1552 "Props full update received ({} props, hash={:?})",
1553 response.props.len(),
1554 response.hash
1555 );
1556 }
1557
1558 if let Some(new_hash) = response.hash {
1559 self.persistence_manager
1560 .process_command(DeviceCommand::SetPropsHash(Some(new_hash)))
1561 .await;
1562 }
1563
1564 Ok(())
1565 }
1566
1567 pub async fn fetch_privacy_settings(
1568 &self,
1569 ) -> Result<wacore::iq::privacy::PrivacySettingsResponse, crate::request::IqError> {
1570 use wacore::iq::privacy::PrivacySettingsSpec;
1571
1572 debug!("Fetching privacy settings...");
1573
1574 self.execute(PrivacySettingsSpec::new()).await
1575 }
1576
1577 pub async fn set_privacy_setting(
1579 &self,
1580 category: &str,
1581 value: &str,
1582 ) -> Result<(), crate::request::IqError> {
1583 use wacore::iq::privacy::SetPrivacySettingSpec;
1584 self.execute(SetPrivacySettingSpec::new(category, value))
1585 .await
1586 }
1587
1588 pub async fn set_default_disappearing_mode(
1590 &self,
1591 duration: u32,
1592 ) -> Result<(), crate::request::IqError> {
1593 use wacore::iq::privacy::SetDefaultDisappearingModeSpec;
1594 self.execute(SetDefaultDisappearingModeSpec::new(duration))
1595 .await
1596 }
1597
1598 pub async fn get_business_profile(
1600 &self,
1601 jid: &wacore_binary::jid::Jid,
1602 ) -> Result<Option<wacore::iq::business::BusinessProfile>, crate::request::IqError> {
1603 use wacore::iq::business::BusinessProfileSpec;
1604 self.execute(BusinessProfileSpec::new(jid)).await
1605 }
1606
1607 pub async fn reject_call(
1609 &self,
1610 call_id: &str,
1611 call_from: &wacore_binary::jid::Jid,
1612 ) -> Result<(), anyhow::Error> {
1613 anyhow::ensure!(!call_id.is_empty(), "call_id cannot be empty");
1614 let id = self.generate_request_id();
1615
1616 let stanza = wacore_binary::builder::NodeBuilder::new("call")
1617 .attr("to", call_from.clone())
1618 .attr("id", id)
1619 .children([wacore_binary::builder::NodeBuilder::new("reject")
1620 .attr("call-id", call_id)
1621 .attr("call-creator", call_from.clone())
1622 .attr("count", "0")
1623 .build()])
1624 .build();
1625
1626 self.send_node(stanza).await?;
1627 Ok(())
1628 }
1629
1630 pub async fn send_digest_key_bundle(&self) -> Result<(), crate::request::IqError> {
1631 use wacore::iq::prekeys::DigestKeyBundleSpec;
1632
1633 debug!("Sending digest key bundle...");
1634
1635 self.execute(DigestKeyBundleSpec::new()).await.map(|_| ())
1636 }
1637
    /// Handles the server's `<success>` stanza: marks the client as logged in
    /// exactly once, persists the server-provided LID, and launches the
    /// post-login initialization sequence on the runtime.
    ///
    /// Every step of the spawned sequence is guarded by the connection
    /// generation counter so work belonging to a stale connection is abandoned
    /// after a reconnect.
    pub(crate) async fn handle_success(self: &Arc<Self>, node: &wacore_binary::node::Node) {
        if self.expected_disconnect.load(Ordering::Relaxed) {
            debug!("Ignoring <success> stanza: expected disconnect pending");
            return;
        }

        // swap() makes this idempotent: only the first <success> proceeds.
        if self.is_logged_in.swap(true, Ordering::SeqCst) {
            debug!("Ignoring duplicate <success> stanza (already logged in)");
            return;
        }

        // Bump the generation; spawned tasks compare against it to detect
        // that the connection has been replaced.
        let current_generation = self.connection_generation.fetch_add(1, Ordering::SeqCst) + 1;

        info!(
            "Successfully authenticated with WhatsApp servers! (gen={})",
            current_generation
        );
        self.auto_reconnect_errors.store(0, Ordering::Relaxed);

        self.update_server_time_offset(node);

        // Persist the LID the server advertises; it is needed for group
        // messaging, hence the warnings when absent or unparsable.
        if let Some(lid_value) = node.attrs.get("lid") {
            if let Some(lid) = lid_value.to_jid() {
                let device_snapshot = self.persistence_manager.get_device_snapshot().await;
                if device_snapshot.lid.as_ref() != Some(&lid) {
                    debug!("Updating LID from server to '{lid}'");
                    self.persistence_manager
                        .process_command(DeviceCommand::SetLid(Some(lid)))
                        .await;
                }
            } else {
                warn!("Failed to parse LID from success stanza: {lid_value}");
            }
        } else {
            warn!("LID not found in <success> stanza. Group messaging may fail.");
        }

        let client_clone = self.clone();
        let task_generation = current_generation;
        self.runtime.spawn(Box::pin(async move {
            // Early-returns from the task when the connection generation has
            // moved on (i.e. a newer connection superseded this one).
            macro_rules! check_generation {
                () => {
                    if client_clone.connection_generation.load(Ordering::SeqCst) != task_generation
                    {
                        debug!("Post-login task cancelled: connection generation changed");
                        return;
                    }
                };
            }

            debug!(
                "Starting post-login initialization sequence (gen={})...",
                task_generation
            );

            // An empty push name means the initial app state sync must run so
            // it can be populated from setting_pushName.
            let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
            let needs_pushname_from_sync = device_snapshot.push_name.is_empty();
            if needs_pushname_from_sync {
                debug!("Push name is empty - will be set from app state sync (setting_pushName)");
            }

            if !client_clone.is_connected() {
                debug!(
                    "Skipping post-login init: connection closed (likely pairing phase reconnect)"
                );
                return;
            }

            check_generation!();
            client_clone.send_unified_session().await;

            check_generation!();
            // Best-effort: failure to reach the primary phone is logged, not fatal.
            if let Err(e) = client_clone
                .establish_primary_phone_session_immediate()
                .await
            {
                warn!(target: "Client/PDO", "Failed to establish session with primary phone on login: {:?}", e);
            }

            check_generation!();
            if let Err(e) = client_clone.upload_pre_keys(false).await {
                warn!("Failed to upload pre-keys during startup: {e:?}");
            }

            check_generation!();
            // Leave passive mode so the server starts delivering to us.
            if let Err(e) = client_clone.set_passive(false).await {
                warn!("Failed to send post-connect active IQ: {e:?}");
            }

            client_clone.wait_for_offline_delivery_end().await;

            check_generation!();

            check_generation!();
            if !client_clone.is_connected() {
                debug!("Skipping presence: connection closed");
                return;
            }

            // Fire non-blocking init queries (props/blocklist/privacy/digest)
            // concurrently on a separate detached task.
            let bg_client = client_clone.clone();
            let bg_generation = task_generation;
            client_clone.runtime.spawn(Box::pin(async move {
                if bg_client.connection_generation.load(Ordering::SeqCst) != bg_generation {
                    debug!("Skipping background init queries: connection generation changed");
                    return;
                }
                if !bg_client.is_connected() {
                    debug!("Skipping background init queries: connection closed");
                    return;
                }

                debug!(
                    "Sending background initialization queries (Props, Blocklist, Privacy, Digest)..."
                );

                let props_fut = bg_client.fetch_props();
                let binding = bg_client.blocking();
                let blocklist_fut = binding.get_blocklist();
                let privacy_fut = bg_client.fetch_privacy_settings();
                let digest_fut = bg_client.validate_digest_key();

                // All four run concurrently; each failure is logged individually.
                let (r_props, r_block, r_priv, r_digest) =
                    futures::join!(props_fut, blocklist_fut, privacy_fut, digest_fut);

                if let Err(e) = r_props {
                    warn!("Background init: Failed to fetch props: {e:?}");
                }
                if let Err(e) = r_block {
                    warn!("Background init: Failed to fetch blocklist: {e:?}");
                }
                if let Err(e) = r_priv {
                    warn!("Background init: Failed to fetch privacy settings: {e:?}");
                }
                if let Err(e) = r_digest {
                    warn!("Background init: Failed to validate digest key: {e:?}");
                }

                if let Err(e) = bg_client.tc_token().prune_expired().await {
                    warn!("Background init: Failed to prune expired tc_tokens: {e:?}");
                }
            })).detach();

            check_generation!();

            let flag_set = client_clone.needs_initial_full_sync.load(Ordering::Relaxed);
            let needs_initial_sync = flag_set || needs_pushname_from_sync;

            if needs_initial_sync {
                debug!(
                    target: "Client/AppState",
                    "Starting Initial App State Sync (flag_set={flag_set}, needs_pushname={needs_pushname_from_sync})"
                );

                // Give the key-share a short grace period before syncing,
                // otherwise the first sync would fail on missing keys.
                if !client_clone
                    .initial_app_state_keys_received
                    .load(Ordering::Relaxed)
                {
                    debug!(
                        target: "Client/AppState",
                        "Waiting up to 5s for app state keys..."
                    );
                    let _ = rt_timeout(
                        &*client_clone.runtime,
                        Duration::from_secs(5),
                        client_clone.initial_keys_synced_notifier.listen(),
                    )
                    .await;

                    check_generation!();
                }

                // Watchdog: if the critical sync has not produced a push name
                // within this window, force a reconnect to retry from scratch.
                const CRITICAL_SYNC_TIMEOUT_SECS: u64 = 180;
                let timeout_client = client_clone.clone();
                let timeout_generation = task_generation;
                let timeout_rt = client_clone.runtime.clone();
                let critical_sync_timeout_handle = timeout_rt.spawn(Box::pin(async move {
                    timeout_client.runtime.sleep(Duration::from_secs(CRITICAL_SYNC_TIMEOUT_SECS)).await;
                    if timeout_client.connection_generation.load(Ordering::SeqCst)
                        != timeout_generation
                    {
                        return;
                    }
                    let push_name = timeout_client.get_push_name().await;
                    if push_name.is_empty() {
                        warn!(
                            target: "Client/AppState",
                            "Critical app state sync timed out after {CRITICAL_SYNC_TIMEOUT_SECS}s \
                             (push_name not synced). Reconnecting to retry."
                        );
                        timeout_client.reconnect_immediately().await;
                    } else {
                        debug!(
                            target: "Client/AppState",
                            "Critical sync timeout fired but push_name was already synced"
                        );
                    }
                }));

                check_generation!();
                // Critical collections must sync before we report Connected.
                match client_clone
                    .sync_collections_batched(vec![
                        WAPatchName::CriticalBlock,
                        WAPatchName::CriticalUnblockLow,
                    ])
                    .await
                {
                    Ok(()) => {
                        // Sync finished in time: disarm the watchdog.
                        critical_sync_timeout_handle.abort();

                        check_generation!();

                        client_clone
                            .resubscribe_presence_subscriptions(task_generation)
                            .await;

                        check_generation!();

                        client_clone.dispatch_connected();
                    }
                    Err(e) => {
                        // Watchdog stays armed and will trigger a reconnect.
                        client_clone.log_sync_error("critical app state sync", &e);
                        return;
                    }
                }

                // Non-critical collections sync in the background; they clear
                // the needs_initial_full_sync flag when done.
                let sync_client = client_clone.clone();
                let sync_generation = task_generation;
                client_clone.runtime.spawn(Box::pin(async move {
                    if sync_client.connection_generation.load(Ordering::SeqCst) != sync_generation {
                        debug!("App state sync cancelled: connection generation changed");
                        return;
                    }

                    if let Err(e) = sync_client
                        .sync_collections_batched(vec![
                            WAPatchName::RegularLow,
                            WAPatchName::RegularHigh,
                            WAPatchName::Regular,
                        ])
                        .await
                    {
                        sync_client.log_sync_error("non-critical app state sync", &e);
                    }

                    sync_client
                        .needs_initial_full_sync
                        .store(false, Ordering::Relaxed);
                    debug!(target: "Client/AppState", "Initial App State Sync Completed.");
                })).detach();
            } else {
                // No initial sync required: just announce presence (when we
                // have a push name), resubscribe, and report Connected.
                let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
                if !device_snapshot.push_name.is_empty() {
                    if let Err(e) = client_clone.presence().set_available().await {
                        warn!("Failed to send initial presence: {e:?}");
                    } else {
                        debug!("Initial presence sent successfully.");
                    }
                }

                client_clone
                    .resubscribe_presence_subscriptions(task_generation)
                    .await;

                check_generation!();

                client_clone.dispatch_connected();
            }
        })).detach();
    }
1965
1966 pub(crate) async fn handle_ack_response(&self, node: Node) -> bool {
1971 let id_opt = node.attrs.get("id").map(|v| v.as_str().into_owned());
1972 if let Some(id) = id_opt
1973 && let Some(waiter) = self.response_waiters.lock().await.remove(&id)
1974 {
1975 if waiter.send(node).is_err() {
1976 warn!(target: "Client/Ack", "Failed to send ACK response to waiter for ID {id}. Receiver was likely dropped.");
1977 }
1978 return true;
1979 }
1980 false
1981 }
1982
1983 #[allow(dead_code)] pub(crate) async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
1985 {
1989 let mut syncing = self.app_state_syncing.lock().await;
1990 if !syncing.insert(name) {
1991 debug!(target: "Client/AppState", "Skipping sync for {:?}: already in flight", name);
1992 return Ok(());
1993 }
1994 }
1995
1996 let result = self.fetch_app_state_with_retry_inner(name).await;
1997
1998 self.app_state_syncing.lock().await.remove(&name);
2000
2001 result
2002 }
2003
    /// Retry driver for a single-collection app state sync.
    ///
    /// Policy: a missing-key error on the first attempt triggers one retry,
    /// optionally after waiting up to 10s for the key-share notification;
    /// "database locked/busy" store errors retry with a linear backoff up to
    /// `APP_STATE_RETRY_MAX_ATTEMPTS`; every other error propagates at once.
    #[allow(dead_code)]
    async fn fetch_app_state_with_retry_inner(&self, name: WAPatchName) -> anyhow::Result<()> {
        let mut attempt = 0u32;
        loop {
            attempt += 1;
            let res = self.process_app_state_sync_task(name, false).await;
            match res {
                Ok(()) => return Ok(()),
                Err(e) => {
                    // First failure due to a missing app-state key: wait for a
                    // possible key share, then retry exactly once.
                    if e.downcast_ref::<crate::appstate_sync::AppStateSyncError>()
                        .is_some_and(|ase| {
                            matches!(ase, crate::appstate_sync::AppStateSyncError::KeyNotFound(_))
                        })
                        && attempt == 1
                    {
                        if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
                            debug!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
                            if rt_timeout(
                                &*self.runtime,
                                Duration::from_secs(10),
                                self.initial_keys_synced_notifier.listen(),
                            )
                            .await
                            .is_err()
                            {
                                warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
                            }
                        }
                        continue;
                    }
                    // Transient store contention, surfaced either directly as a
                    // StoreError or wrapped inside AppStateSyncError::Store.
                    let is_db_locked = e.downcast_ref::<wacore::store::error::StoreError>()
                        .is_some_and(|se| matches!(se, wacore::store::error::StoreError::Database(msg) if msg.contains("locked") || msg.contains("busy")))
                        || e.downcast_ref::<crate::appstate_sync::AppStateSyncError>()
                            .is_some_and(|ase| matches!(ase, crate::appstate_sync::AppStateSyncError::Store(wacore::store::error::StoreError::Database(msg)) if msg.contains("locked") || msg.contains("busy")));
                    if is_db_locked && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
                        // Linear backoff: 350ms, 550ms, 750ms, ...
                        let backoff = Duration::from_millis(200 * attempt as u64 + 150);
                        warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
                        self.runtime.sleep(backoff).await;
                        continue;
                    }
                    return Err(e);
                }
            }
        }
    }
2052
2053 pub(crate) async fn sync_collections_batched(
2057 &self,
2058 collections: Vec<WAPatchName>,
2059 ) -> anyhow::Result<()> {
2060 if collections.is_empty() {
2061 return Ok(());
2062 }
2063
2064 let pending = {
2066 let mut syncing = self.app_state_syncing.lock().await;
2067 let mut filtered = Vec::with_capacity(collections.len());
2068 for name in collections {
2069 if syncing.insert(name) {
2070 filtered.push(name);
2071 } else {
2072 debug!(target: "Client/AppState", "Skipping {:?} in batch: already in flight", name);
2073 }
2074 }
2075 filtered
2076 };
2077
2078 if pending.is_empty() {
2079 return Ok(());
2080 }
2081
2082 let all_collections: Vec<WAPatchName> = pending.clone();
2084
2085 let result = self.sync_collections_batched_inner(pending).await;
2086
2087 {
2089 let mut syncing = self.app_state_syncing.lock().await;
2090 for name in &all_collections {
2091 syncing.remove(name);
2092 }
2093 }
2094
2095 result
2096 }
2097
    /// Worker behind `sync_collections_batched`: repeatedly issues one
    /// combined `w:sync:app:state` IQ for all pending collections, applies the
    /// returned patches, and re-queues any collection that reports more data
    /// or a retryable error, up to `MAX_ITERATIONS` rounds.
    async fn sync_collections_batched_inner(
        &self,
        mut pending: Vec<WAPatchName>,
    ) -> anyhow::Result<()> {
        use wacore::appstate::patch_decode::CollectionSyncError;
        const MAX_ITERATIONS: usize = 5;
        let mut iteration = 0;

        while !pending.is_empty() && iteration < MAX_ITERATIONS {
            iteration += 1;
            debug!(
                target: "Client/AppState",
                "Batched sync iteration {}/{}: {:?}",
                iteration, MAX_ITERATIONS, pending
            );

            let backend = self.persistence_manager.backend();

            // Build one <collection/> per pending name. Version 0 means we
            // have no local state, so request a full snapshot instead of a
            // delta; remember those names for full_sync dispatching below.
            let mut collection_nodes = Vec::with_capacity(pending.len());
            let mut was_snapshot = std::collections::HashSet::new();
            for &name in &pending {
                let state = backend.get_version(name.as_str()).await?;
                let want_snapshot = state.version == 0;
                if want_snapshot {
                    was_snapshot.insert(name);
                }
                let mut builder = NodeBuilder::new("collection")
                    .attr("name", name.as_str())
                    .attr(
                        "return_snapshot",
                        if want_snapshot { "true" } else { "false" },
                    );
                if !want_snapshot {
                    builder = builder.attr("version", state.version.to_string());
                }
                collection_nodes.push(builder.build());
            }

            let sync_node = NodeBuilder::new("sync").children(collection_nodes).build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: server_jid().clone(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: Some(Duration::from_secs(30)),
            };

            let resp = self.send_iq(iq).await?;

            // Pre-fetch every external blob referenced by the response so the
            // decode step below can stay synchronous; keyed by direct_path.
            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
                std::collections::HashMap::new();

            if let Ok(patch_lists) = wacore::appstate::patch_decode::parse_patch_lists(&resp) {
                for pl in &patch_lists {
                    // External snapshot blob, if the server off-loaded it.
                    if let Some(ext) = &pl.snapshot_ref
                        && let Some(path) = &ext.direct_path
                    {
                        match self.download(ext).await {
                            Ok(bytes) => {
                                pre_downloaded.insert(path.clone(), bytes);
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to download external snapshot for {:?}: {e}",
                                    pl.name
                                );
                            }
                        }
                    }

                    // External mutation blobs attached to individual patches.
                    for patch in &pl.patches {
                        if let Some(ext) = &patch.external_mutations
                            && let Some(path) = &ext.direct_path
                        {
                            match self.download(ext).await {
                                Ok(bytes) => {
                                    pre_downloaded.insert(path.clone(), bytes);
                                }
                                Err(e) => {
                                    let v =
                                        patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
                                    warn!(
                                        "Failed to download external mutations for patch v{}: {e}",
                                        v
                                    );
                                }
                            }
                        }
                    }
                }
            }

            // Synchronous lookup closure handed to the decoder; it only serves
            // blobs fetched above and errors for anything else.
            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(path) = &ext.direct_path {
                    if let Some(bytes) = pre_downloaded.get(path) {
                        Ok(bytes.clone())
                    } else {
                        Err(anyhow::anyhow!(
                            "external blob not pre-downloaded: {}",
                            path
                        ))
                    }
                } else {
                    Err(anyhow::anyhow!("external blob has no directPath"))
                }
            };

            let proc = self.get_app_state_processor().await;
            let results = proc.decode_multi_patch_list(&resp, &download, true).await?;

            // Collections that must go through another round.
            let mut needs_refetch = Vec::new();

            for (mutations, new_state, list) in results {
                let name = list.name;

                // Per-collection error handling; a `continue` skips applying
                // mutations/version for this collection in this round.
                if let Some(ref err) = list.error {
                    match err {
                        CollectionSyncError::Conflict { has_more } => {
                            if *has_more {
                                warn!(target: "Client/AppState", "Collection {:?} conflict (has_more=true), will refetch", name);
                                needs_refetch.push(name);
                            } else {
                                debug!(target: "Client/AppState", "Collection {:?} conflict (has_more=false), treating as success (no pending mutations)", name);
                            }
                            continue;
                        }
                        CollectionSyncError::Fatal { code, text } => {
                            warn!(target: "Client/AppState", "Collection {:?} fatal error {}: {}", name, code, text);
                            continue;
                        }
                        CollectionSyncError::Retry { code, text } => {
                            warn!(target: "Client/AppState", "Collection {:?} retryable error {}: {}, will refetch", name, code, text);
                            needs_refetch.push(name);
                            continue;
                        }
                    }
                }

                // Request any app-state keys we could not decode with, but
                // rate-limit each key id to one request per 24h.
                let missing = match proc.get_missing_key_ids(&list).await {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to get missing key IDs for {:?}: {}", name, e);
                        Vec::new()
                    }
                };
                if !missing.is_empty() {
                    let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
                    let mut guard = self.app_state_key_requests.lock().await;
                    let now = wacore::time::Instant::now();
                    for key_id in missing {
                        let hex_id = hex::encode(&key_id);
                        let should = guard
                            .get(&hex_id)
                            .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                            .unwrap_or(true);
                        if should {
                            guard.insert(hex_id, now);
                            to_request.push(key_id);
                        }
                    }
                    // Drop stale rate-limit entries so the map doesn't grow.
                    guard.retain(|_, t| t.elapsed() < std::time::Duration::from_secs(24 * 3600));
                    drop(guard);
                    if !to_request.is_empty() {
                        self.request_app_state_keys(&to_request).await;
                    }
                }

                // Snapshot-derived mutations are flagged as full_sync so
                // handlers can distinguish initial state from live deltas.
                let full_sync = was_snapshot.contains(&name);
                for m in mutations {
                    self.dispatch_app_state_mutation(&m, full_sync).await;
                }

                // Persist the advanced version only after applying mutations.
                backend
                    .set_version(name.as_str(), new_state.clone())
                    .await?;

                if list.has_more_patches {
                    needs_refetch.push(name);
                }

                debug!(
                    target: "Client/AppState",
                    "Batched sync: {:?} done (version={}, has_more={})",
                    name, new_state.version, list.has_more_patches
                );
            }

            pending = needs_refetch;
        }

        // Collections still pending here gave up after MAX_ITERATIONS rounds;
        // this is logged but not treated as a hard error.
        if !pending.is_empty() {
            warn!(
                target: "Client/AppState",
                "Batched sync: max iterations ({}) reached for {:?}",
                MAX_ITERATIONS, pending
            );
        }

        Ok(())
    }
2317
    /// Synchronizes one app-state collection (`name`) with the server,
    /// paginating until the server stops reporting more patches, then
    /// persists the final version.
    ///
    /// `full_sync` requests a snapshot on the first page; it is also forced
    /// when no local version exists (version == 0). Missing app-state keys
    /// discovered while decoding are requested from the primary device,
    /// throttled to one request per key per 24h.
    pub(crate) async fn process_app_state_sync_task(
        &self,
        name: WAPatchName,
        full_sync: bool,
    ) -> anyhow::Result<()> {
        if self.is_shutting_down() {
            debug!(target: "Client/AppState", "Skipping app state sync task {:?}: client is shutting down", name);
            return Ok(());
        }

        let backend = self.persistence_manager.backend();
        let mut full_sync = full_sync;

        let mut state = backend.get_version(name.as_str()).await?;
        if state.version == 0 {
            // Never synced before: must bootstrap from a snapshot.
            full_sync = true;
        }

        let mut has_more = true;
        // Only the first page of a full sync asks for a snapshot; later pages
        // fetch incremental patches from the current version.
        let mut want_snapshot = full_sync;
        // Safety cap against a server that keeps reporting more patches.
        const MAX_PAGINATION_ITERATIONS: u32 = 500;
        let mut iteration = 0u32;

        while has_more {
            if self.is_shutting_down() {
                debug!(target: "Client/AppState", "Stopping app state sync task {:?}: shutdown detected", name);
                break;
            }
            iteration += 1;
            if iteration > MAX_PAGINATION_ITERATIONS {
                warn!(target: "Client/AppState", "App state sync for {:?} exceeded {} iterations, aborting", name, MAX_PAGINATION_ITERATIONS);
                break;
            }
            debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);

            // Build <sync><collection .../></sync>; the version attribute is
            // only sent on incremental (non-snapshot) requests.
            let mut collection_builder = NodeBuilder::new("collection")
                .attr("name", name.as_str())
                .attr(
                    "return_snapshot",
                    if want_snapshot { "true" } else { "false" },
                );
            if !want_snapshot {
                collection_builder = collection_builder.attr("version", state.version.to_string());
            }
            let sync_node = NodeBuilder::new("sync")
                .children([collection_builder.build()])
                .build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: server_jid().clone(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: None,
            };

            let resp = self.send_iq(iq).await?;
            if self.is_shutting_down() {
                debug!(target: "Client/AppState", "Discarding app state sync response for {:?}: shutdown detected", name);
                break;
            }
            debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);

            let _decode_start = wacore::time::Instant::now();

            // External blobs (snapshot / large mutation sets) are fetched up
            // front so the synchronous `download` closure below can serve them
            // from memory during decoding. Keyed by direct_path.
            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
                std::collections::HashMap::new();

            if let Ok(pl) = wacore::appstate::patch_decode::parse_patch_list(&resp) {
                debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={} patches_count={}",
                    name, pl.snapshot_ref.is_some(), pl.has_more_patches, pl.patches.len());

                if let Some(ext) = &pl.snapshot_ref
                    && let Some(path) = &ext.direct_path
                {
                    match self.download(ext).await {
                        Ok(bytes) => {
                            debug!(target: "Client/AppState", "Downloaded external snapshot ({} bytes)", bytes.len());
                            pre_downloaded.insert(path.clone(), bytes);
                        }
                        Err(e) => {
                            // Non-fatal here; the decode step will surface the
                            // missing blob if it is actually needed.
                            warn!("Failed to download external snapshot: {e}");
                        }
                    }
                }

                for patch in &pl.patches {
                    if let Some(ext) = &patch.external_mutations
                        && let Some(path) = &ext.direct_path
                    {
                        let patch_version =
                            patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
                        match self.download(ext).await {
                            Ok(bytes) => {
                                debug!(target: "Client/AppState", "Downloaded external mutations for patch v{} ({} bytes)", patch_version, bytes.len());
                                pre_downloaded.insert(path.clone(), bytes);
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to download external mutations for patch v{}: {e}",
                                    patch_version
                                );
                            }
                        }
                    }
                }
            }

            // Synchronous lookup into the pre-downloaded blobs; errors if a
            // blob is requested that was not fetched above.
            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(path) = &ext.direct_path {
                    if let Some(bytes) = pre_downloaded.get(path) {
                        Ok(bytes.clone())
                    } else {
                        Err(anyhow::anyhow!(
                            "external blob not pre-downloaded: {}",
                            path
                        ))
                    }
                } else {
                    Err(anyhow::anyhow!("external blob has no directPath"))
                }
            };

            let proc = self.get_app_state_processor().await;
            let (mutations, new_state, list) =
                proc.decode_patch_list(&resp, &download, true).await?;
            let decode_elapsed = _decode_start.elapsed();
            if decode_elapsed.as_millis() > 500 {
                debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
            }

            // Ask for keys we could not resolve, at most once per key per 24h
            // (tracked in app_state_key_requests).
            let missing = match proc.get_missing_key_ids(&list).await {
                Ok(v) => v,
                Err(e) => {
                    warn!("Failed to get missing key IDs for {:?}: {}", name, e);
                    Vec::new()
                }
            };
            if !missing.is_empty() {
                let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
                let mut guard = self.app_state_key_requests.lock().await;
                let now = wacore::time::Instant::now();
                for key_id in missing {
                    let hex_id = hex::encode(&key_id);
                    let should = guard
                        .get(&hex_id)
                        .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                        .unwrap_or(true);
                    if should {
                        guard.insert(hex_id, now);
                        to_request.push(key_id);
                    }
                }
                // Prune throttle entries older than the 24h window.
                guard.retain(|_, t| t.elapsed() < std::time::Duration::from_secs(24 * 3600));
                drop(guard);
                if !to_request.is_empty() {
                    self.request_app_state_keys(&to_request).await;
                }
            }

            for m in mutations {
                debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
                self.dispatch_app_state_mutation(&m, full_sync).await;
            }

            state = new_state;
            has_more = list.has_more_patches;
            // After the first page, always switch to incremental fetches.
            want_snapshot = false;
            debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more} new_version={}", name, state.version);
        }

        // Persist progress once after the loop (also runs after early breaks).
        backend.set_version(name.as_str(), state.clone()).await?;

        debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
        Ok(())
    }
2503
2504 async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
2505 if raw_key_ids.is_empty() {
2506 return;
2507 }
2508 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2509 let own_jid = match device_snapshot.pn.clone() {
2510 Some(j) => j,
2511 None => return,
2512 };
2513 let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
2514 .iter()
2515 .map(|k| wa::message::AppStateSyncKeyId {
2516 key_id: Some(k.clone()),
2517 })
2518 .collect();
2519 let msg = wa::Message {
2520 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
2521 r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
2522 app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
2523 ..Default::default()
2524 })),
2525 ..Default::default()
2526 };
2527 if let Err(e) = self
2528 .send_message_impl(
2529 own_jid,
2530 &msg,
2531 Some(self.generate_message_id().await),
2532 true,
2533 false,
2534 None,
2535 vec![],
2536 )
2537 .await
2538 {
2539 warn!("Failed to send app state key request: {e}");
2540 }
2541 }
2542
2543 pub(crate) async fn send_app_state_patch(
2547 &self,
2548 collection_name: &str,
2549 mutations: Vec<(wa::SyncdMutation, Vec<u8>)>,
2550 ) -> Result<()> {
2551 let proc = self.get_app_state_processor().await;
2552 let (patch_bytes, base_version) = proc.build_patch(collection_name, mutations).await?;
2553
2554 let collection_node = NodeBuilder::new("collection")
2555 .attr("name", collection_name)
2556 .attr("version", base_version.to_string())
2557 .attr("return_snapshot", "false")
2558 .children([NodeBuilder::new("patch").bytes(patch_bytes).build()])
2559 .build();
2560 let sync_node = NodeBuilder::new("sync").children([collection_node]).build();
2561 let iq = crate::request::InfoQuery {
2562 namespace: "w:sync:app:state",
2563 query_type: crate::request::InfoQueryType::Set,
2564 to: server_jid().clone(),
2565 target: None,
2566 id: None,
2567 content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
2568 timeout: None,
2569 };
2570
2571 self.send_iq(iq).await?;
2572
2573 if let Ok(patch_name) = collection_name.parse::<WAPatchName>()
2576 && let Err(e) = self.fetch_app_state_with_retry(patch_name).await
2577 {
2578 log::warn!("Failed to re-sync {collection_name} after patch send: {e}");
2579 }
2580
2581 Ok(())
2582 }
2583
    /// Applies one decoded app-state mutation locally.
    ///
    /// Chat-level actions are delegated to the chat-actions feature module;
    /// `setting_pushName` mutations update the persisted push name and may
    /// trigger an initial presence announcement.
    async fn dispatch_app_state_mutation(
        &self,
        m: &crate::appstate_sync::Mutation,
        full_sync: bool,
    ) {
        use wacore::types::events::Event;

        // Only SET operations carry state to apply; everything else is skipped.
        if m.operation != wa::syncd_mutation::SyncdOperation::Set {
            return;
        }
        if m.index.is_empty() {
            return;
        }

        // Give the chat-actions feature first refusal; if it consumed the
        // mutation we are done.
        if crate::features::chat_actions::dispatch_chat_mutation(&self.core.event_bus, m, full_sync)
        {
            return;
        }

        if m.index[0] == "setting_pushName"
            && let Some(val) = &m.action_value
            && let Some(act) = &val.push_name_setting
            && let Some(new_name) = &act.name
        {
            let new_name = new_name.clone();
            let bus = self.core.event_bus.clone();

            let snapshot = self.persistence_manager.get_device_snapshot().await;
            let old = snapshot.push_name.clone();
            if old != new_name {
                debug!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
                self.persistence_manager
                    .process_command(DeviceCommand::SetPushName(new_name.clone()))
                    .await;
                bus.dispatch(&Event::SelfPushNameUpdated(
                    crate::types::events::SelfPushNameUpdated {
                        from_server: true,
                        old_name: old.clone(),
                        new_name: new_name.clone(),
                    },
                ));

                // First-ever push name: send presence now that a name exists.
                if old.is_empty() && !new_name.is_empty() {
                    debug!(target: "Client/AppState", "Sending presence after receiving initial pushname from app state sync");
                    if let Err(e) = self.presence().set_available().await {
                        warn!(target: "Client/AppState", "Failed to send presence after pushname sync: {e:?}");
                    }
                }
            } else {
                debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
            }
        }
    }
2640
    /// Marks the upcoming disconnect as expected so it is not treated as an
    /// error by reconnect logic. (Async only to match call sites; the body is
    /// a plain atomic store.)
    async fn expect_disconnect(&self) {
        self.expected_disconnect.store(true, Ordering::Relaxed);
    }
2644
2645 pub(crate) async fn handle_stream_error(&self, node: &wacore_binary::node::Node) {
2646 self.is_logged_in.store(false, Ordering::Relaxed);
2647
2648 let mut attrs = node.attrs();
2649 let code_cow = attrs.optional_string("code");
2650 let code = code_cow.as_deref().unwrap_or("");
2651 let conflict_type = node
2652 .get_optional_child("conflict")
2653 .map(|n| {
2654 n.attrs()
2655 .optional_string("type")
2656 .as_deref()
2657 .unwrap_or("")
2658 .to_string()
2659 })
2660 .unwrap_or_default();
2661
2662 if !conflict_type.is_empty() {
2663 info!(
2664 "Got stream error indicating client was removed or replaced (conflict={}). Logging out.",
2665 conflict_type
2666 );
2667 self.expect_disconnect().await;
2668 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2669
2670 let event = if conflict_type == "replaced" {
2671 Event::StreamReplaced(crate::types::events::StreamReplaced)
2672 } else {
2673 Event::LoggedOut(crate::types::events::LoggedOut {
2674 on_connect: false,
2675 reason: ConnectFailureReason::LoggedOut,
2676 })
2677 };
2678 self.core.event_bus.dispatch(&event);
2679
2680 let transport_opt = self.transport.lock().await.clone();
2681 if let Some(transport) = transport_opt {
2682 self.runtime
2683 .spawn(Box::pin(async move {
2684 info!("Disconnecting transport after conflict");
2685 transport.disconnect().await;
2686 }))
2687 .detach();
2688 }
2689 } else {
2690 match code {
2691 "515" => {
2692 info!(
2694 "Got 515 stream error, server is closing stream (expected after pairing). Will auto-reconnect."
2695 );
2696 self.expect_disconnect().await;
2697 let transport_opt = self.transport.lock().await.clone();
2700 if let Some(transport) = transport_opt {
2701 self.runtime
2703 .spawn(Box::pin(async move {
2704 info!("Disconnecting transport after 515");
2705 transport.disconnect().await;
2706 }))
2707 .detach();
2708 }
2709 }
2710 "516" => {
2711 info!("Got 516 stream error (device removed). Logging out.");
2712 self.expect_disconnect().await;
2713 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2714 self.core.event_bus.dispatch(&Event::LoggedOut(
2715 crate::types::events::LoggedOut {
2716 on_connect: false,
2717 reason: ConnectFailureReason::LoggedOut,
2718 },
2719 ));
2720
2721 let transport_opt = self.transport.lock().await.clone();
2722 if let Some(transport) = transport_opt {
2723 self.runtime
2724 .spawn(Box::pin(async move {
2725 info!("Disconnecting transport after 516");
2726 transport.disconnect().await;
2727 }))
2728 .detach();
2729 }
2730 }
2731 "401" => {
2732 info!("Got 401 stream error (unauthorized). Logging out.");
2735 self.expect_disconnect().await;
2736 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2737 self.core.event_bus.dispatch(&Event::LoggedOut(
2738 crate::types::events::LoggedOut {
2739 on_connect: false,
2740 reason: ConnectFailureReason::LoggedOut,
2741 },
2742 ));
2743
2744 let transport_opt = self.transport.lock().await.clone();
2745 if let Some(transport) = transport_opt {
2746 self.runtime
2747 .spawn(Box::pin(async move {
2748 info!("Disconnecting transport after 401");
2749 transport.disconnect().await;
2750 }))
2751 .detach();
2752 }
2753 }
2754 "409" => {
2755 info!("Got 409 stream error (conflict). Another session replaced this one.");
2758 self.expect_disconnect().await;
2759 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2760 self.core
2761 .event_bus
2762 .dispatch(&Event::StreamReplaced(crate::types::events::StreamReplaced));
2763
2764 let transport_opt = self.transport.lock().await.clone();
2765 if let Some(transport) = transport_opt {
2766 self.runtime
2767 .spawn(Box::pin(async move {
2768 info!("Disconnecting transport after 409");
2769 transport.disconnect().await;
2770 }))
2771 .detach();
2772 }
2773 }
2774 "429" => {
2775 warn!(
2778 "Got 429 stream error (rate limited). Will auto-reconnect with extended backoff."
2779 );
2780 self.auto_reconnect_errors.fetch_add(5, Ordering::Relaxed);
2781 }
2782 "503" => {
2783 info!("Got 503 service unavailable, will auto-reconnect.");
2784 }
2785 _ => {
2786 error!("Unknown stream error: {}", DisplayableNode(node));
2787 self.expect_disconnect().await;
2788 self.core.event_bus.dispatch(&Event::StreamError(
2789 crate::types::events::StreamError {
2790 code: code.to_string(),
2791 raw: Some(node.clone()),
2792 },
2793 ));
2794 }
2795 }
2796 }
2797
2798 info!("Notifying shutdown from stream error handler");
2799 self.shutdown_notifier.notify(usize::MAX);
2800 }
2801
2802 pub(crate) async fn handle_connect_failure(&self, node: &wacore_binary::node::Node) {
2803 self.expected_disconnect.store(true, Ordering::Relaxed);
2804 self.shutdown_notifier.notify(usize::MAX);
2805
2806 let mut attrs = node.attrs();
2807 let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
2808 let reason = ConnectFailureReason::from(reason_code);
2809
2810 if reason.should_reconnect() {
2811 self.expected_disconnect.store(false, Ordering::Relaxed);
2812 } else {
2813 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2814 }
2815
2816 if reason.is_logged_out() {
2817 info!("Got {reason:?} connect failure, logging out.");
2818 self.core
2819 .event_bus
2820 .dispatch(&wacore::types::events::Event::LoggedOut(
2821 crate::types::events::LoggedOut {
2822 on_connect: true,
2823 reason,
2824 },
2825 ));
2826 } else if let ConnectFailureReason::TempBanned = reason {
2827 let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
2828 let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
2829 let expire_duration =
2830 chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
2831 warn!("Temporary ban connect failure: {}", DisplayableNode(node));
2832 self.core.event_bus.dispatch(&Event::TemporaryBan(
2833 crate::types::events::TemporaryBan {
2834 code: crate::types::events::TempBanReason::from(ban_code),
2835 expire: expire_duration,
2836 },
2837 ));
2838 } else if let ConnectFailureReason::ClientOutdated = reason {
2839 error!("Client is outdated and was rejected by server.");
2840 self.core
2841 .event_bus
2842 .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
2843 } else {
2844 warn!("Unknown connect failure: {}", DisplayableNode(node));
2845 self.core.event_bus.dispatch(&Event::ConnectFailure(
2846 crate::types::events::ConnectFailure {
2847 reason,
2848 message: attrs
2849 .optional_string("message")
2850 .as_deref()
2851 .unwrap_or("")
2852 .to_string(),
2853 raw: Some(node.clone()),
2854 },
2855 ));
2856 }
2857 }
2858
2859 pub(crate) async fn handle_iq(self: &Arc<Self>, node: &wacore_binary::node::Node) -> bool {
2860 if node.attrs.get("type").is_some_and(|s| s == "get")
2861 && (node.get_optional_child("ping").is_some()
2862 || node
2863 .attrs
2864 .get("xmlns")
2865 .is_some_and(|s| s == "urn:xmpp:ping"))
2866 {
2867 info!("Received ping, sending pong.");
2868 let mut parser = node.attrs();
2869 let from_jid = parser.jid("from");
2870 let id = parser.optional_string("id").map(|s| s.to_string());
2871 let pong = build_pong(from_jid.to_string(), id.as_deref());
2872 if let Err(e) = self.send_node(pong).await {
2873 warn!("Failed to send pong: {e:?}");
2874 }
2875 return true;
2876 }
2877
2878 if pair::handle_iq(self, node).await {
2880 return true;
2881 }
2882
2883 false
2884 }
2885
    /// True once the socket-level connection is established.
    pub fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }
2889
    /// True while the client holds an authenticated session.
    // NOTE(review): loads with Relaxed while `is_connected` uses Acquire —
    // confirm whether the ordering asymmetry is intentional.
    pub fn is_logged_in(&self) -> bool {
        self.is_logged_in.load(Ordering::Relaxed)
    }
2893
    /// Registers interest in the next incoming node matching `filter` and
    /// returns a oneshot receiver resolved by `resolve_node_waiters`.
    ///
    /// `node_waiter_count` is incremented here and decremented when the
    /// waiter is removed in `resolve_node_waiters`.
    pub fn wait_for_node(
        &self,
        filter: NodeFilter,
    ) -> futures::channel::oneshot::Receiver<Arc<Node>> {
        let (tx, rx) = futures::channel::oneshot::channel();
        self.node_waiter_count.fetch_add(1, Ordering::Release);
        // Recover from a poisoned lock so waiter bookkeeping survives a
        // panicking holder.
        let mut waiters = self
            .node_waiters
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        waiters.push(NodeWaiter { filter, tx });
        rx
    }
2924
2925 fn resolve_node_waiters(&self, node: &Arc<Node>) {
2928 let mut waiters = self
2929 .node_waiters
2930 .lock()
2931 .unwrap_or_else(|poisoned| poisoned.into_inner());
2932 let mut i = 0;
2933 while i < waiters.len() {
2934 if waiters[i].tx.is_canceled() {
2935 waiters.swap_remove(i);
2937 self.node_waiter_count.fetch_sub(1, Ordering::Release);
2938 } else if waiters[i].filter.matches(node) {
2939 let w = waiters.swap_remove(i);
2941 self.node_waiter_count.fetch_sub(1, Ordering::Release);
2942 let _ = w.tx.send(Arc::clone(node));
2943 } else {
2944 i += 1;
2945 }
2946 }
2947 }
2948
    /// Delegates server-time-offset bookkeeping for `node` to the unified
    /// session layer.
    pub(crate) fn update_server_time_offset(&self, node: &wacore_binary::node::Node) {
        self.unified_session.update_server_time_offset(node);
    }
2952
    /// Sends the unified-session node if connected and the session layer has
    /// one prepared; on send failure the "last sent" marker is cleared
    /// (presumably so the same payload is retried on the next attempt —
    /// semantics live in `unified_session`).
    pub(crate) async fn send_unified_session(&self) {
        if !self.is_connected() {
            debug!(target: "Client/UnifiedSession", "Skipping: not connected");
            return;
        }

        // prepare_send returns None when there is nothing to send.
        let Some((node, _sequence)) = self.unified_session.prepare_send().await else {
            return;
        };

        if let Err(e) = self.send_node(node).await {
            debug!(target: "Client/UnifiedSession", "Send failed: {e}");
            self.unified_session.clear_last_sent().await;
        }
    }
2968
    /// Waits until the socket reports connected, or `timeout` elapses.
    ///
    /// Uses check → listen → re-check so a readiness signal firing between
    /// the first check and listener registration is not missed.
    pub async fn wait_for_socket(&self, timeout: std::time::Duration) -> Result<(), anyhow::Error> {
        if self.is_connected() {
            return Ok(());
        }

        let notified = self.socket_ready_notifier.listen();
        // Re-check after registering the listener to close the race window.
        if self.is_connected() {
            return Ok(());
        }

        rt_timeout(&*self.runtime, timeout, notified)
            .await
            .map_err(|_| anyhow::anyhow!("Timeout waiting for socket"))
    }
2993
    /// Waits until the client is fully ready (see `is_fully_ready`), or
    /// `timeout` elapses.
    ///
    /// Same check → listen → re-check pattern as `wait_for_socket` to avoid
    /// missing a notification fired during listener registration.
    pub async fn wait_for_connected(
        &self,
        timeout: std::time::Duration,
    ) -> Result<(), anyhow::Error> {
        if self.is_fully_ready() {
            return Ok(());
        }

        let notified = self.connected_notifier.listen();
        // Re-check after registering the listener to close the race window.
        if self.is_fully_ready() {
            return Ok(());
        }

        rt_timeout(&*self.runtime, timeout, notified)
            .await
            .map_err(|_| anyhow::anyhow!("Timeout waiting for connection"))
    }
3021
    /// Returns a handle to the persistence manager (cheap `Arc` clone).
    pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
        self.persistence_manager.clone()
    }
3027
3028 pub async fn edit_message(
3029 &self,
3030 to: Jid,
3031 original_id: impl Into<String>,
3032 new_content: wa::Message,
3033 ) -> Result<String, anyhow::Error> {
3034 let original_id = original_id.into();
3035
3036 let participant = if to.is_group() {
3039 Some(
3040 self.get_own_jid_for_group(&to)
3041 .await?
3042 .to_non_ad()
3043 .to_string(),
3044 )
3045 } else {
3046 if self.get_pn().await.is_none() {
3047 return Err(anyhow::Error::from(ClientError::NotLoggedIn));
3048 }
3049 None
3050 };
3051
3052 let edit_container_message = wa::Message {
3053 edited_message: Some(Box::new(wa::message::FutureProofMessage {
3054 message: Some(Box::new(wa::Message {
3055 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
3056 key: Some(wa::MessageKey {
3057 remote_jid: Some(to.to_string()),
3058 from_me: Some(true),
3059 id: Some(original_id.clone()),
3060 participant,
3061 }),
3062 r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
3063 edited_message: Some(Box::new(new_content)),
3064 timestamp_ms: Some(wacore::time::now_millis()),
3065 ..Default::default()
3066 })),
3067 ..Default::default()
3068 })),
3069 })),
3070 ..Default::default()
3071 };
3072
3073 self.send_message_impl(
3079 to,
3080 &edit_container_message,
3081 None,
3082 false,
3083 false,
3084 Some(crate::types::message::EditAttribute::MessageEdit),
3085 vec![],
3086 )
3087 .await?;
3088
3089 Ok(original_id)
3090 }
3091
3092 pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
3093 let noise_socket_arc = { self.noise_socket.lock().await.clone() };
3094 let noise_socket = match noise_socket_arc {
3095 Some(socket) => socket,
3096 None => return Err(ClientError::NotConnected),
3097 };
3098
3099 debug!(target: "Client/Send", "{}", DisplayableNode(&node));
3100
3101 let mut plaintext_buf = Vec::with_capacity(1024);
3102
3103 if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
3104 error!("Failed to marshal node: {e:?}");
3105 return Err(SocketError::Crypto("Marshal error".to_string()).into());
3106 }
3107
3108 let encrypted_buf = Vec::with_capacity(plaintext_buf.len() + 32);
3110
3111 if let Err(e) = noise_socket
3112 .encrypt_and_send(plaintext_buf, encrypted_buf)
3113 .await
3114 {
3115 return Err(e.into());
3116 }
3117
3118 self.last_data_sent_ms
3120 .store(wacore::time::now_millis() as u64, Ordering::Relaxed);
3121
3122 Ok(())
3123 }
3124
3125 pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
3126 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3127 let old_name = device_snapshot.push_name.clone();
3128
3129 if old_name == new_name {
3130 return;
3131 }
3132
3133 log::debug!("Updating push name from '{}' -> '{}'", old_name, new_name);
3134 self.persistence_manager
3135 .process_command(DeviceCommand::SetPushName(new_name.clone()))
3136 .await;
3137
3138 self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
3139 crate::types::events::SelfPushNameUpdated {
3140 from_server: true,
3141 old_name,
3142 new_name: new_name.clone(),
3143 },
3144 ));
3145
3146 let client_clone = self.clone();
3147 self.runtime
3148 .spawn(Box::pin(async move {
3149 if let Err(e) = client_clone.presence().set_available().await {
3150 log::warn!("Failed to send presence after push name update: {:?}", e);
3151 } else {
3152 log::debug!("Sent presence after push name update.");
3153 }
3154 }))
3155 .detach();
3156 }
3157
    /// Returns the currently persisted push (display) name.
    pub async fn get_push_name(&self) -> String {
        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
        device_snapshot.push_name.clone()
    }
3162
    /// Returns our phone-number (PN) JID, or None when not logged in.
    pub async fn get_pn(&self) -> Option<Jid> {
        let snapshot = self.persistence_manager.get_device_snapshot().await;
        snapshot.pn.clone()
    }
3167
    /// Returns our LID JID, or None when no LID has been stored.
    pub async fn get_lid(&self) -> Option<Jid> {
        let snapshot = self.persistence_manager.get_device_snapshot().await;
        snapshot.lid.clone()
    }
3172
3173 pub(crate) async fn get_own_jid_for_group(
3178 &self,
3179 group_jid: &Jid,
3180 ) -> Result<Jid, anyhow::Error> {
3181 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3182 let own_pn = device_snapshot
3183 .pn
3184 .clone()
3185 .ok_or_else(|| anyhow::Error::from(ClientError::NotLoggedIn))?;
3186
3187 let addressing_mode = self
3188 .groups()
3189 .query_info(group_jid)
3190 .await
3191 .map(|info| info.addressing_mode)
3192 .unwrap_or(crate::types::message::AddressingMode::Pn);
3193
3194 Ok(match addressing_mode {
3195 crate::types::message::AddressingMode::Lid => {
3196 device_snapshot.lid.clone().unwrap_or(own_pn)
3197 }
3198 crate::types::message::AddressingMode::Pn => own_pn,
3199 })
3200 }
3201
    /// Builds a `StanzaKey` for `chat`/`id`, first normalizing `chat`
    /// through `resolve_encryption_jid`.
    pub(crate) async fn make_stanza_key(&self, chat: Jid, id: String) -> StanzaKey {
        let chat = self.resolve_encryption_jid(&chat).await;

        StanzaKey { chat, id }
    }
3209
3210 pub(crate) async fn send_protocol_receipt(
3213 &self,
3214 id: String,
3215 receipt_type: crate::types::presence::ReceiptType,
3216 ) {
3217 if id.is_empty() {
3218 return;
3219 }
3220 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3221 if let Some(own_jid) = &device_snapshot.pn {
3222 let type_str = match receipt_type {
3223 crate::types::presence::ReceiptType::HistorySync => "hist_sync",
3224 crate::types::presence::ReceiptType::Read => "read",
3225 crate::types::presence::ReceiptType::ReadSelf => "read-self",
3226 crate::types::presence::ReceiptType::Delivered => "delivery",
3227 crate::types::presence::ReceiptType::Played => "played",
3228 crate::types::presence::ReceiptType::PlayedSelf => "played-self",
3229 crate::types::presence::ReceiptType::Inactive => "inactive",
3230 crate::types::presence::ReceiptType::PeerMsg => "peer_msg",
3231 crate::types::presence::ReceiptType::Sender => "sender",
3232 crate::types::presence::ReceiptType::ServerError => "server-error",
3233 crate::types::presence::ReceiptType::Retry => "retry",
3234 crate::types::presence::ReceiptType::EncRekeyRetry => "enc_rekey_retry",
3235 crate::types::presence::ReceiptType::Other(ref s) => s.as_str(),
3236 };
3237
3238 let node = NodeBuilder::new("receipt")
3239 .attrs([
3240 ("id", id),
3241 ("type", type_str.to_string()),
3242 ("to", own_jid.to_non_ad().to_string()),
3243 ])
3244 .build();
3245
3246 if let Err(e) = self.send_node(node).await {
3247 warn!(
3248 "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
3249 receipt_type, self.unique_id, e
3250 );
3251 }
3252 }
3253 }
3254}
3255
3256fn build_pong(to: String, id: Option<&str>) -> wacore_binary::node::Node {
3261 let mut builder = NodeBuilder::new("iq").attr("to", to).attr("type", "result");
3262 if let Some(id) = id {
3263 builder = builder.attr("id", id);
3264 }
3265 builder.build()
3266}
3267
3268fn build_ack_node(node: &Node, own_device_pn: Option<&Jid>) -> Option<Node> {
3281 let id = node.attrs.get("id")?.clone();
3282 let from = node.attrs.get("from")?.clone();
3283 let participant = node.attrs.get("participant").cloned();
3284
3285 let typ = if node.tag != "message" && !is_encrypt_identity_notification(node) {
3288 node.attrs.get("type").cloned()
3289 } else {
3290 None
3291 };
3292
3293 let mut attrs = Attrs::new();
3294 attrs.insert("class", NodeValue::String(node.tag.to_string()));
3295 attrs.insert("id", id);
3296 attrs.insert("to", from);
3297
3298 if node.tag == "message"
3299 && let Some(own_device_pn) = own_device_pn
3300 {
3301 attrs.insert("from", NodeValue::Jid(own_device_pn.clone()));
3302 }
3303 if let Some(p) = participant {
3304 attrs.insert("participant", p);
3305 }
3306 if let Some(t) = typ {
3307 attrs.insert("type", t);
3308 }
3309
3310 Some(Node {
3311 tag: Cow::Borrowed("ack"),
3312 attrs,
3313 content: None,
3314 })
3315}
3316
/// True for notification stanzas with type="encrypt" that carry an
/// <identity> child; `build_ack_node` gives these the same no-type-echo
/// treatment as messages.
fn is_encrypt_identity_notification(node: &Node) -> bool {
    node.tag == "notification"
        && node.attrs.get("type").is_some_and(|v| v == "encrypt")
        && node.get_optional_child("identity").is_some()
}
3323
3324fn fibonacci_backoff(attempt: u32) -> Duration {
3330 const MAX_MS: u64 = 900_000; let mut a: u64 = 1000;
3333 let mut b: u64 = 1000;
3334 for _ in 0..attempt {
3335 let next = a.saturating_add(b).min(MAX_MS);
3336 a = b;
3337 b = next;
3338 }
3339 let base = a.min(MAX_MS);
3340
3341 let jitter_range = base / 10;
3343 let jitter = if jitter_range > 0 {
3344 rand::make_rng::<rand::rngs::StdRng>().random_range(0..=(jitter_range * 2)) as i64
3345 - jitter_range as i64
3346 } else {
3347 0
3348 };
3349 let ms = (base as i64 + jitter).max(0) as u64;
3350 Duration::from_millis(ms)
3351}
3352
3353#[cfg(test)]
3354mod tests {
3355 use super::*;
3356 use crate::lid_pn_cache::LearningSource;
3357 use crate::test_utils::MockHttpClient;
3358 use futures::channel::oneshot;
3359 use wacore_binary::jid::SERVER_JID;
3360
    /// Verifies that `should_ack` still returns true for both <receipt> and
    /// <notification> stanzas.
    #[tokio::test]
    async fn test_ack_behavior_for_incoming_stanzas() {
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        use wacore_binary::node::{Attrs, Node, NodeContent};

        // Minimal <receipt> stanza with the attributes should_ack inspects.
        let mut receipt_attrs = Attrs::new();
        receipt_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
        receipt_attrs.insert("id".to_string(), "RCPT-1".to_string());
        let receipt_node = Node::new(
            "receipt",
            receipt_attrs,
            Some(NodeContent::String("test".to_string())),
        );

        // Minimal <notification> stanza.
        let mut notification_attrs = Attrs::new();
        notification_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
        notification_attrs.insert("id".to_string(), "NOTIF-1".to_string());
        let notification_node = Node::new(
            "notification",
            notification_attrs,
            Some(NodeContent::String("test".to_string())),
        );

        assert!(
            client.should_ack(&receipt_node),
            "should_ack must still return TRUE for <receipt> stanzas."
        );
        assert!(
            client.should_ack(&notification_node),
            "should_ack must still return TRUE for <notification> stanzas."
        );

        info!(
            "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
        );
    }
3414
    /// Verifies that an incoming <ack> resolves a registered response waiter
    /// and that the waiter entry is removed afterwards.
    #[tokio::test]
    async fn test_ack_waiter_resolves() {
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Register a waiter keyed by the stanza id.
        let test_id = "ack-test-123".to_string();
        let (tx, rx) = oneshot::channel();
        client
            .response_waiters
            .lock()
            .await
            .insert(test_id.clone(), tx);
        assert!(
            client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be inserted before handling ack"
        );

        // Simulate the server acking that id.
        let ack_node = NodeBuilder::new("ack")
            .attr("id", test_id.clone())
            .attr("from", SERVER_JID)
            .build();

        let handled = client.handle_ack_response(ack_node).await;
        assert!(
            handled,
            "handle_ack_response should return true when waiter exists"
        );

        // The waiter's receiver must resolve with the ack node.
        match tokio::time::timeout(Duration::from_secs(1), rx).await {
            Ok(Ok(response_node)) => {
                assert!(
                    response_node
                        .attrs
                        .get("id")
                        .is_some_and(|v| v == test_id.as_str()),
                    "Response node should have correct ID"
                );
            }
            Ok(Err(_)) => panic!("Receiver was dropped without being sent a value"),
            Err(_) => panic!("Test timed out waiting for ack response"),
        }

        // The entry must be cleaned up after handling.
        assert!(
            !client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be removed after handling"
        );

        info!(
            "✅ test_ack_waiter_resolves passed: ACK response correctly resolves pending waiters"
        );
    }
3483
3484 #[tokio::test]
3485 async fn test_ack_without_matching_waiter() {
3486 let backend = crate::test_utils::create_test_backend().await;
3487 let pm = Arc::new(
3488 PersistenceManager::new(backend)
3489 .await
3490 .expect("persistence manager should initialize"),
3491 );
3492 let (client, _rx) = Client::new(
3493 Arc::new(crate::runtime_impl::TokioRuntime),
3494 pm,
3495 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3496 Arc::new(MockHttpClient),
3497 None,
3498 )
3499 .await;
3500
3501 let ack_node = NodeBuilder::new("ack")
3503 .attr("id", "non-existent-id")
3504 .attr("from", SERVER_JID)
3505 .build();
3506
3507 let handled = client.handle_ack_response(ack_node).await;
3509 assert!(
3510 !handled,
3511 "handle_ack_response should return false when no waiter exists"
3512 );
3513
3514 info!(
3515 "✅ test_ack_without_matching_waiter passed: ACK without matching waiter handled gracefully"
3516 );
3517 }
3518
    /// Exercises basic forward (phone → LID) and reverse (LID → phone) lookups
    /// on the LID-PN cache after a single `add_lid_pn_mapping` call.
    #[tokio::test]
    async fn test_lid_pn_cache_basic_operations() {
        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_lid_cache_basic?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let phone = "559980000001";
        let lid = "100000012345678";

        // Cold cache: no mapping yet.
        assert!(
            client.lid_pn_cache.get_current_lid(phone).await.is_none(),
            "Cache should be empty initially"
        );

        client
            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        // Forward lookup: phone → LID.
        let cached_lid = client.lid_pn_cache.get_current_lid(phone).await;
        assert!(cached_lid.is_some(), "Cache should contain the mapping");
        assert_eq!(
            cached_lid.expect("cache should have LID"),
            lid,
            "Cached LID should match what we inserted"
        );

        // Reverse lookup: LID → phone.
        let cached_phone = client.lid_pn_cache.get_phone_number(lid).await;
        assert!(cached_phone.is_some(), "Reverse lookup should work");
        assert_eq!(
            cached_phone.expect("reverse lookup should return phone"),
            phone,
            "Cached phone should match what we inserted"
        );

        // Unrelated numbers must stay unmapped.
        assert!(
            client
                .lid_pn_cache
                .get_current_lid("559980000002")
                .await
                .is_none(),
            "Different phone number should not have a mapping"
        );

        info!("✅ test_lid_pn_cache_basic_operations passed: LID-PN cache works correctly");
    }
3590
    /// When two LIDs are learned for the same phone number, the most recently
    /// stored mapping wins for the forward lookup, while both LIDs still
    /// reverse-resolve to the phone.
    #[tokio::test]
    async fn test_lid_pn_cache_timestamp_resolution() {
        let backend = Arc::new(
            crate::store::SqliteStore::new(
                "file:memdb_lid_cache_timestamp?mode=memory&cache=shared",
            )
            .await
            .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let phone = "559980000001";
        let lid_old = "100000012345678";
        let lid_new = "100000087654321";

        client
            .add_lid_pn_mapping(lid_old, phone, LearningSource::Usync)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        assert_eq!(
            client
                .lid_pn_cache
                .get_current_lid(phone)
                .await
                .expect("cache should have LID"),
            lid_old,
            "Initial LID should be stored"
        );

        // NOTE(review): the 10ms sleep is here so the second mapping gets a
        // strictly later timestamp — assumes millisecond-or-finer timestamp
        // resolution in the cache; confirm against LidPnCache.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        client
            .add_lid_pn_mapping(lid_new, phone, LearningSource::PeerPnMessage)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        // Forward lookup now resolves to the newer mapping.
        assert_eq!(
            client
                .lid_pn_cache
                .get_current_lid(phone)
                .await
                .expect("cache should have newer LID"),
            lid_new,
            "Newer LID should be returned for phone lookup"
        );

        // Reverse lookups keep working for BOTH LIDs.
        assert_eq!(
            client
                .lid_pn_cache
                .get_phone_number(lid_old)
                .await
                .expect("reverse lookup should return phone"),
            phone,
            "Old LID should still map to phone"
        );
        assert_eq!(
            client
                .lid_pn_cache
                .get_phone_number(lid_new)
                .await
                .expect("reverse lookup should return phone"),
            phone,
            "New LID should also map to phone"
        );

        info!(
            "✅ test_lid_pn_cache_timestamp_resolution passed: Timestamp-based resolution works correctly"
        );
    }
3680
3681 #[tokio::test]
3685 async fn test_get_lid_for_phone_via_send_context_resolver() {
3686 use wacore::client::context::SendContextResolver;
3687
3688 let backend = Arc::new(
3689 crate::store::SqliteStore::new("file:memdb_get_lid_for_phone?mode=memory&cache=shared")
3690 .await
3691 .expect("Failed to create in-memory backend for test"),
3692 );
3693 let pm = Arc::new(
3694 PersistenceManager::new(backend)
3695 .await
3696 .expect("persistence manager should initialize"),
3697 );
3698 let (client, _rx) = Client::new(
3699 Arc::new(crate::runtime_impl::TokioRuntime),
3700 pm,
3701 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3702 Arc::new(MockHttpClient),
3703 None,
3704 )
3705 .await;
3706
3707 let phone = "559980000001";
3708 let lid = "100000012345678";
3709
3710 assert!(
3712 client.get_lid_for_phone(phone).await.is_none(),
3713 "get_lid_for_phone should return None before caching"
3714 );
3715
3716 client
3718 .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
3719 .await
3720 .expect("Failed to persist LID-PN mapping in tests");
3721
3722 let result = client.get_lid_for_phone(phone).await;
3724 assert!(
3725 result.is_some(),
3726 "get_lid_for_phone should return Some after caching"
3727 );
3728 assert_eq!(
3729 result.expect("get_lid_for_phone should return Some"),
3730 lid,
3731 "get_lid_for_phone should return the cached LID"
3732 );
3733
3734 info!(
3735 "✅ test_get_lid_for_phone_via_send_context_resolver passed: SendContextResolver correctly returns cached LID"
3736 );
3737 }
3738
3739 #[tokio::test]
3741 async fn test_wait_for_offline_delivery_end_returns_immediately_when_flag_set() {
3742 let backend = Arc::new(
3743 crate::store::SqliteStore::new(
3744 "file:memdb_offline_sync_flag_set?mode=memory&cache=shared",
3745 )
3746 .await
3747 .expect("Failed to create in-memory backend for test"),
3748 );
3749 let pm = Arc::new(
3750 PersistenceManager::new(backend)
3751 .await
3752 .expect("persistence manager should initialize"),
3753 );
3754 let (client, _rx) = Client::new(
3755 Arc::new(crate::runtime_impl::TokioRuntime),
3756 pm,
3757 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3758 Arc::new(MockHttpClient),
3759 None,
3760 )
3761 .await;
3762
3763 client
3765 .offline_sync_completed
3766 .store(true, std::sync::atomic::Ordering::Relaxed);
3767
3768 let start = std::time::Instant::now();
3770 client.wait_for_offline_delivery_end().await;
3771 let elapsed = start.elapsed();
3772
3773 assert!(
3775 elapsed.as_millis() < 100,
3776 "wait_for_offline_delivery_end should return immediately when flag is set, took {:?}",
3777 elapsed
3778 );
3779
3780 info!("✅ test_wait_for_offline_delivery_end_returns_immediately_when_flag_set passed");
3781 }
3782
    /// With the flag never set, the timeout variant must give up after the
    /// configured duration, mark offline sync complete, and restore the full
    /// set of message-processing permits.
    #[tokio::test]
    async fn test_wait_for_offline_delivery_end_times_out_when_flag_not_set() {
        let backend = Arc::new(
            crate::store::SqliteStore::new(
                "file:memdb_offline_sync_timeout?mode=memory&cache=shared",
            )
            .await
            .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let start = std::time::Instant::now();
        client
            .wait_for_offline_delivery_end_with_timeout(std::time::Duration::from_millis(50))
            .await;

        let elapsed = start.elapsed();
        // Grab the semaphore even if the std Mutex was poisoned by a panicking
        // test elsewhere — a poisoned guard is still usable via into_inner.
        let semaphore = match client.message_processing_semaphore.lock() {
            Ok(guard) => guard.clone(),
            Err(poisoned) => poisoned.into_inner().clone(),
        };
        // Drain every available permit to count them, then release all at once.
        let mut guards = Vec::new();
        while let Some(guard) = semaphore.try_acquire() {
            guards.push(guard);
        }
        let permits = guards.len();
        drop(guards);

        // 45ms rather than 50ms to tolerate timer coarseness.
        assert!(
            elapsed.as_millis() >= 45, "Should have waited for the configured timeout duration, took {:?}",
            elapsed
        );
        assert!(
            client
                .offline_sync_completed
                .load(std::sync::atomic::Ordering::Relaxed),
            "wait_for_offline_delivery_end should mark offline sync complete on timeout"
        );
        // NOTE(review): 64 is assumed to be the client's full parallel-permit
        // count — confirm against the semaphore setup in Client::new.
        assert_eq!(
            permits, 64,
            "timeout completion should restore parallel permits"
        );

        info!("✅ test_wait_for_offline_delivery_end_times_out_when_flag_not_set passed");
    }
3846
    /// A parked waiter must wake up when `offline_sync_notifier` fires, even
    /// though the completion flag is never set.
    #[tokio::test]
    async fn test_wait_for_offline_delivery_end_returns_on_notify() {
        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_offline_notify?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let client_clone = client.clone();

        // Fire the notifier from a background task after ~50ms.
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            client_clone.offline_sync_notifier.notify(usize::MAX);
        });

        let start = std::time::Instant::now();
        client.wait_for_offline_delivery_end().await;
        let elapsed = start.elapsed();

        // Upper bound: we returned because of the notify, not a long timeout.
        assert!(
            elapsed.as_millis() < 200,
            "wait_for_offline_delivery_end should return when notified, took {:?}",
            elapsed
        );
        // Lower bound (45ms, tolerating timer coarseness): we actually blocked
        // until the notify rather than returning immediately.
        assert!(
            elapsed.as_millis() >= 45, "Should have waited for the notify, only took {:?}",
            elapsed
        );

        info!("✅ test_wait_for_offline_delivery_end_returns_on_notify passed");
    }
3895
3896 #[tokio::test]
3898 async fn test_offline_sync_flag_initially_false() {
3899 let backend = Arc::new(
3900 crate::store::SqliteStore::new(
3901 "file:memdb_offline_flag_initial?mode=memory&cache=shared",
3902 )
3903 .await
3904 .expect("Failed to create in-memory backend for test"),
3905 );
3906 let pm = Arc::new(
3907 PersistenceManager::new(backend)
3908 .await
3909 .expect("persistence manager should initialize"),
3910 );
3911 let (client, _rx) = Client::new(
3912 Arc::new(crate::runtime_impl::TokioRuntime),
3913 pm,
3914 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3915 Arc::new(MockHttpClient),
3916 None,
3917 )
3918 .await;
3919
3920 assert!(
3922 !client
3923 .offline_sync_completed
3924 .load(std::sync::atomic::Ordering::Relaxed),
3925 "offline_sync_completed should be false when Client is first created"
3926 );
3927
3928 info!("✅ test_offline_sync_flag_initially_false passed");
3929 }
3930
3931 #[tokio::test]
3936 async fn test_offline_sync_lifecycle() {
3937 use std::sync::atomic::Ordering;
3938
3939 let backend = Arc::new(
3940 crate::store::SqliteStore::new("file:memdb_offline_lifecycle?mode=memory&cache=shared")
3941 .await
3942 .expect("Failed to create in-memory backend for test"),
3943 );
3944 let pm = Arc::new(
3945 PersistenceManager::new(backend)
3946 .await
3947 .expect("persistence manager should initialize"),
3948 );
3949 let (client, _rx) = Client::new(
3950 Arc::new(crate::runtime_impl::TokioRuntime),
3951 pm,
3952 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3953 Arc::new(MockHttpClient),
3954 None,
3955 )
3956 .await;
3957
3958 assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
3960
3961 let client_waiter = client.clone();
3963 let waiter_handle = tokio::spawn(async move {
3964 client_waiter.wait_for_offline_delivery_end().await;
3965 true });
3967
3968 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3970
3971 assert!(
3973 !waiter_handle.is_finished(),
3974 "Waiter should still be waiting"
3975 );
3976
3977 client.offline_sync_completed.store(true, Ordering::Relaxed);
3979 client.offline_sync_notifier.notify(usize::MAX);
3980
3981 let result = tokio::time::timeout(std::time::Duration::from_millis(100), waiter_handle)
3983 .await
3984 .expect("Waiter should complete after notify")
3985 .expect("Waiter task should not panic");
3986
3987 assert!(result, "Waiter should have completed successfully");
3988 assert!(client.offline_sync_completed.load(Ordering::Relaxed));
3989
3990 info!("✅ test_offline_sync_lifecycle passed");
3991 }
3992
3993 #[tokio::test]
3996 async fn test_establish_primary_phone_session_fails_without_pn() {
3997 let backend = Arc::new(
3998 crate::store::SqliteStore::new("file:memdb_no_pn?mode=memory&cache=shared")
3999 .await
4000 .expect("Failed to create in-memory backend for test"),
4001 );
4002 let pm = Arc::new(
4003 PersistenceManager::new(backend)
4004 .await
4005 .expect("persistence manager should initialize"),
4006 );
4007 let (client, _rx) = Client::new(
4008 Arc::new(crate::runtime_impl::TokioRuntime),
4009 pm,
4010 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4011 Arc::new(MockHttpClient),
4012 None,
4013 )
4014 .await;
4015
4016 let result = client.establish_primary_phone_session_immediate().await;
4018
4019 assert!(
4020 result.is_err(),
4021 "establish_primary_phone_session_immediate should fail when no PN is set"
4022 );
4023
4024 let err = result.unwrap_err();
4025 assert!(
4026 err.downcast_ref::<ClientError>()
4027 .is_some_and(|e| matches!(e, ClientError::NotLoggedIn)),
4028 "Error should be ClientError::NotLoggedIn, got: {}",
4029 err
4030 );
4031
4032 info!("✅ test_establish_primary_phone_session_fails_without_pn passed");
4033 }
4034
    /// `ensure_e2e_sessions` must return immediately for an empty JID list,
    /// but block behind the offline-sync gate for a non-empty one until the
    /// sync is marked complete and the notifier fires.
    #[tokio::test]
    async fn test_ensure_e2e_sessions_waits_for_offline_sync() {
        use std::sync::atomic::Ordering;
        use wacore_binary::jid::Jid;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_ensure_e2e_waits?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));

        // Case 1: empty JID list — should not touch the offline-sync gate.
        let client_clone = client.clone();
        let ensure_handle = tokio::spawn(async move {
            client_clone.ensure_e2e_sessions(vec![]).await
        });

        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        assert!(
            ensure_handle.is_finished(),
            "ensure_e2e_sessions should return immediately for empty JID list"
        );

        // Case 2: non-empty list — must park behind the offline-sync gate.
        let client_clone = client.clone();
        let test_jid = Jid::pn("559999999999");
        let ensure_handle = tokio::spawn(async move {
            let start = std::time::Instant::now();
            let _ = client_clone.ensure_e2e_sessions(vec![test_jid]).await;
            start.elapsed()
        });

        tokio::time::sleep(std::time::Duration::from_millis(20)).await;

        assert!(
            !ensure_handle.is_finished(),
            "ensure_e2e_sessions should be waiting for offline sync"
        );

        // Release the gate: flag first, then wake all waiters.
        client.offline_sync_completed.store(true, Ordering::Relaxed);
        client.offline_sync_notifier.notify(usize::MAX);

        let result = tokio::time::timeout(std::time::Duration::from_secs(2), ensure_handle).await;

        assert!(
            result.is_ok(),
            "ensure_e2e_sessions should complete after offline sync"
        );

        info!("✅ test_ensure_e2e_sessions_waits_for_offline_sync passed");
    }
4114
    /// The "immediate" session-establishment path must NOT be gated on the
    /// offline sync: even with the flag unset it has to finish (success or
    /// failure) well inside the 500ms timeout.
    #[tokio::test]
    async fn test_immediate_session_does_not_wait_for_offline_sync() {
        use std::sync::atomic::Ordering;
        use wacore_binary::jid::Jid;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_immediate_no_wait?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend.clone())
                .await
                .expect("persistence manager should initialize"),
        );

        // Give the device an own PN so the call gets past the NotLoggedIn check.
        pm.modify_device(|device| {
            device.pn = Some(Jid::pn("559999999999"));
        })
        .await;

        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Offline sync is deliberately left incomplete.
        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));

        let start = std::time::Instant::now();

        let result = tokio::time::timeout(
            std::time::Duration::from_millis(500),
            client.establish_primary_phone_session_immediate(),
        )
        .await;

        let elapsed = start.elapsed();

        // timeout() returning Ok means the future completed before the deadline.
        assert!(
            result.is_ok(),
            "establish_primary_phone_session_immediate should not wait for offline sync, timed out"
        );

        assert!(
            elapsed.as_millis() < 500,
            "establish_primary_phone_session_immediate should not wait, took {:?}",
            elapsed
        );

        // The inner Result may be Ok or Err (mock transport) — only the lack
        // of blocking is under test here.
        info!(
            "establish_primary_phone_session_immediate completed in {:?} (result: {:?})",
            elapsed,
            result.unwrap().is_ok()
        );

        info!("✅ test_immediate_session_does_not_wait_for_offline_sync passed");
    }
4194
    /// When a Signal session for the primary phone (device 0) already exists in
    /// the store, the immediate establishment path must succeed without
    /// disturbing the stored session.
    #[tokio::test]
    async fn test_establish_session_skips_when_exists() {
        use wacore::libsignal::protocol::SessionRecord;
        use wacore::libsignal::store::SessionStore;
        use wacore::types::jid::JidExt;
        use wacore_binary::jid::Jid;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_skip_existing?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend.clone())
                .await
                .expect("persistence manager should initialize"),
        );

        // Register our own PN so the call is considered "logged in".
        let own_pn = Jid::pn("559999999999");
        pm.modify_device(|device| {
            device.pn = Some(own_pn.clone());
        })
        .await;

        // The primary phone is always device 0 of the own PN.
        let primary_phone_jid = own_pn.with_device(0);
        let signal_addr = primary_phone_jid.to_protocol_address();

        // Pre-seed a session record for that address.
        let dummy_session = SessionRecord::new_fresh();
        {
            // Scope the read guard so it is released before Client::new.
            let device_arc = pm.get_device_arc().await;
            let device = device_arc.read().await;
            device
                .store_session(&signal_addr, &dummy_session)
                .await
                .expect("Failed to store test session");

            let exists = device
                .contains_session(&signal_addr)
                .await
                .expect("Failed to check session");
            assert!(exists, "Session should exist after store");
        }

        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm.clone(),
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let result = client.establish_primary_phone_session_immediate().await;

        assert!(
            result.is_ok(),
            "establish_primary_phone_session_immediate should succeed when session exists"
        );

        // The pre-existing session must survive the call untouched.
        {
            let device_arc = pm.get_device_arc().await;
            let device = device_arc.read().await;
            let exists = device
                .contains_session(&signal_addr)
                .await
                .expect("Failed to check session");
            assert!(exists, "Session should still exist after the call");
        }

        info!("✅ test_establish_session_skips_when_exists passed");
    }
4281
4282 #[test]
4285 fn test_mac_failure_prevention_flow_documentation() {
4286 fn should_establish_session(
4288 check_result: Result<bool, &'static str>,
4289 ) -> Result<bool, String> {
4290 match check_result {
4291 Ok(true) => Ok(false), Ok(false) => Ok(true), Err(e) => Err(format!("Cannot verify session: {}", e)), }
4295 }
4296
4297 let result = should_establish_session(Ok(true));
4299 assert_eq!(result, Ok(false), "Should skip when session exists");
4300
4301 let result = should_establish_session(Ok(false));
4303 assert_eq!(result, Ok(true), "Should establish when no session");
4304
4305 let result = should_establish_session(Err("database error"));
4307 assert!(result.is_err(), "Should fail when check fails");
4308
4309 info!("✅ test_mac_failure_prevention_flow_documentation passed");
4310 }
4311
4312 #[test]
4313 fn test_unified_session_id_calculation() {
4314 const DAY_MS: i64 = 24 * 60 * 60 * 1000;
4318 const WEEK_MS: i64 = 7 * DAY_MS;
4319 const OFFSET_MS: i64 = 3 * DAY_MS;
4320
4321 fn calculate_session_id(now_ms: i64, server_offset_ms: i64) -> i64 {
4323 let adjusted_now = now_ms + server_offset_ms;
4324 (adjusted_now + OFFSET_MS) % WEEK_MS
4325 }
4326
4327 let now_ms = 1706000000000_i64; let id = calculate_session_id(now_ms, 0);
4330 assert!(
4331 (0..WEEK_MS).contains(&id),
4332 "Session ID should be in [0, WEEK_MS)"
4333 );
4334
4335 let id_with_positive_offset = calculate_session_id(now_ms, 5000);
4337 assert!(
4338 (0..WEEK_MS).contains(&id_with_positive_offset),
4339 "Session ID should be in [0, WEEK_MS)"
4340 );
4341 let id_with_negative_offset = calculate_session_id(now_ms, -5000);
4346 assert!(
4347 (0..WEEK_MS).contains(&id_with_negative_offset),
4348 "Session ID should be in [0, WEEK_MS)"
4349 );
4350
4351 let wrap_test_now = WEEK_MS - OFFSET_MS + 1000; let wrapped_id = calculate_session_id(wrap_test_now, 0);
4355 assert_eq!(wrapped_id, 1000, "Should wrap around correctly");
4356
4357 let boundary_now = WEEK_MS - OFFSET_MS;
4359 let boundary_id = calculate_session_id(boundary_now, 0);
4360 assert_eq!(boundary_id, 0, "At exact boundary should be 0");
4361 }
4362
    /// `update_server_time_offset` must learn the clock skew from a stanza's
    /// `t` attribute and ignore stanzas where `t` is missing, malformed, or 0.
    #[tokio::test]
    async fn test_server_time_offset_extraction() {
        use wacore_binary::builder::NodeBuilder;

        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        assert_eq!(
            client.unified_session.server_time_offset_ms(),
            0,
            "Initial offset should be 0"
        );

        // Server claims to be 10 seconds ahead of our clock.
        let server_time = wacore::time::now_secs() + 10; let node = NodeBuilder::new("success")
            .attr("t", server_time.to_string())
            .build();

        client.update_server_time_offset(&node);

        // 1s tolerance absorbs the time spent between now_secs() and the update.
        let offset = client.unified_session.server_time_offset_ms();
        assert!(
            (offset - 10000).abs() < 1000, "Offset should be approximately 10000ms, got {}",
            offset
        );

        // Missing 't': offset stays put.
        let node_no_t = NodeBuilder::new("success").build();
        client.update_server_time_offset(&node_no_t);
        let offset_after = client.unified_session.server_time_offset_ms();
        assert!(
            (offset_after - offset).abs() < 100, "Offset should not change when 't' is missing"
        );

        // Unparseable 't': offset stays put.
        let node_invalid = NodeBuilder::new("success")
            .attr("t", "not_a_number")
            .build();
        client.update_server_time_offset(&node_invalid);
        let offset_after_invalid = client.unified_session.server_time_offset_ms();
        assert!(
            (offset_after_invalid - offset).abs() < 100,
            "Offset should not change when 't' is invalid"
        );

        // 't' == 0 is treated as "no timestamp": offset stays put.
        let node_zero = NodeBuilder::new("success").attr("t", "0").build();
        client.update_server_time_offset(&node_zero);
        let offset_after_zero = client.unified_session.server_time_offset_ms();
        assert!(
            (offset_after_zero - offset).abs() < 100,
            "Offset should not change when 't' is 0"
        );

        info!("✅ test_server_time_offset_extraction passed");
    }
4438
    /// Drives the unified-session manager through a reset → send → dedupe cycle:
    /// the first `prepare_send` after a reset yields an `<ib>` stanza with
    /// sequence 1, and a second call for the same session id is suppressed.
    #[tokio::test]
    async fn test_unified_session_manager_integration() {
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        assert_eq!(
            client.unified_session.sequence(),
            0,
            "Initial sequence should be 0"
        );

        // Retry loop: if the (time-derived) session id rolls over between the
        // two prepare_send calls, the second call would legitimately succeed,
        // so we reset and try again until both calls land in the same window.
        // NOTE(review): assumes rollover is rare enough that this terminates
        // quickly — it is week-periodic per test_unified_session_id_calculation.
        loop {
            client.unified_session.reset().await;

            let result = client.unified_session.prepare_send().await;
            assert!(result.is_some(), "First send should succeed");
            let (node, seq) = result.unwrap();
            assert_eq!(node.tag, "ib", "Should be an IB stanza");
            assert_eq!(seq, 1, "First sequence should be 1 (pre-increment)");
            assert_eq!(client.unified_session.sequence(), 1);

            let result2 = client.unified_session.prepare_send().await;
            if result2.is_none() {
                // Duplicate send for the same session id was suppressed.
                assert_eq!(client.unified_session.sequence(), 1);
                break;
            }
            tokio::task::yield_now().await;
        }

        // Clearing the "last sent" marker re-enables sending for the same id.
        client.unified_session.clear_last_sent().await;
        let result3 = client.unified_session.prepare_send().await;
        assert!(result3.is_some(), "Should succeed after clearing");
        let (_, seq3) = result3.unwrap();
        assert_eq!(seq3, 1, "Sequence resets when session ID changes");
        assert_eq!(client.unified_session.sequence(), 1);

        info!("✅ test_unified_session_manager_integration passed");
    }
4498
4499 #[test]
4500 fn test_unified_session_protocol_node() {
4501 use wacore::ib::{IbStanza, UnifiedSession};
4503 use wacore::protocol::ProtocolNode;
4504
4505 let session = UnifiedSession::new("123456789");
4507 assert_eq!(session.id, "123456789");
4508 assert_eq!(session.tag(), "unified_session");
4509
4510 let node = session.into_node();
4512 assert_eq!(node.tag, "unified_session");
4513 assert!(node.attrs.get("id").is_some_and(|v| v == "123456789"));
4514
4515 let stanza = IbStanza::unified_session(UnifiedSession::new("987654321"));
4517 assert_eq!(stanza.tag(), "ib");
4518
4519 let ib_node = stanza.into_node();
4521 assert_eq!(ib_node.tag, "ib");
4522 let children = ib_node.children().expect("IB stanza should have children");
4523 assert_eq!(children.len(), 1);
4524 assert_eq!(children[0].tag, "unified_session");
4525 assert!(
4526 children[0]
4527 .attrs
4528 .get("id")
4529 .is_some_and(|v| v == "987654321")
4530 );
4531
4532 info!("✅ test_unified_session_protocol_node passed");
4533 }
4534
4535 async fn create_offline_sync_test_client() -> Arc<Client> {
4537 let backend = crate::test_utils::create_test_backend().await;
4538 let pm = Arc::new(
4539 PersistenceManager::new(backend)
4540 .await
4541 .expect("persistence manager should initialize"),
4542 );
4543 let (client, _rx) = Client::new(
4544 Arc::new(crate::runtime_impl::TokioRuntime),
4545 pm,
4546 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4547 Arc::new(MockHttpClient),
4548 None,
4549 )
4550 .await;
4551 client
4552 }
4553
4554 #[tokio::test]
4555 async fn test_ib_thread_metadata_does_not_end_sync() {
4556 let client = create_offline_sync_test_client().await;
4557 client
4558 .offline_sync_metrics
4559 .active
4560 .store(true, Ordering::Release);
4561
4562 let node = NodeBuilder::new("ib")
4563 .children([NodeBuilder::new("thread_metadata")
4564 .children([NodeBuilder::new("item").build()])
4565 .build()])
4566 .build();
4567
4568 client.process_node(Arc::new(node)).await;
4569 assert!(
4570 client.offline_sync_metrics.active.load(Ordering::Acquire),
4571 "<ib><thread_metadata> should NOT end offline sync"
4572 );
4573 }
4574
    /// `<ib><edge_routing>` carries routing info, not an offline-sync marker,
    /// so it must leave an in-flight offline sync active.
    #[tokio::test]
    async fn test_ib_edge_routing_does_not_end_sync() {
        let client = create_offline_sync_test_client().await;
        // Pretend an offline sync is currently in flight.
        client
            .offline_sync_metrics
            .active
            .store(true, Ordering::Release);

        // <ib><edge_routing><routing_info>…bytes…</routing_info></edge_routing></ib>
        let node = NodeBuilder::new("ib")
            .children([NodeBuilder::new("edge_routing")
                .children([NodeBuilder::new("routing_info")
                    .bytes(vec![1, 2, 3])
                    .build()])
                .build()])
            .build();

        client.process_node(Arc::new(node)).await;
        assert!(
            client.offline_sync_metrics.active.load(Ordering::Acquire),
            "<ib><edge_routing> should NOT end offline sync"
        );
    }
4597
    /// `<ib><dirty>` (app-state dirty notification) must leave an in-flight
    /// offline sync active.
    #[tokio::test]
    async fn test_ib_dirty_does_not_end_sync() {
        let client = create_offline_sync_test_client().await;
        // Pretend an offline sync is currently in flight.
        client
            .offline_sync_metrics
            .active
            .store(true, Ordering::Release);

        // <ib><dirty type="groups" timestamp="1234"/></ib>
        let node = NodeBuilder::new("ib")
            .children([NodeBuilder::new("dirty")
                .attr("type", "groups")
                .attr("timestamp", "1234")
                .build()])
            .build();

        client.process_node(Arc::new(node)).await;
        assert!(
            client.offline_sync_metrics.active.load(Ordering::Acquire),
            "<ib><dirty> should NOT end offline sync"
        );
    }
4619
4620 #[tokio::test]
4621 async fn test_ib_offline_child_ends_sync() {
4622 let client = create_offline_sync_test_client().await;
4623 client
4624 .offline_sync_metrics
4625 .active
4626 .store(true, Ordering::Release);
4627 client
4628 .offline_sync_metrics
4629 .total_messages
4630 .store(301, Ordering::Release);
4631
4632 let node = NodeBuilder::new("ib")
4633 .children([NodeBuilder::new("offline").attr("count", "301").build()])
4634 .build();
4635
4636 client.process_node(Arc::new(node)).await;
4637 assert!(
4638 !client.offline_sync_metrics.active.load(Ordering::Acquire),
4639 "<ib><offline count='301'/> should end offline sync"
4640 );
4641 }
4642
    /// `<ib><offline_preview count="…"/>` with a positive count must activate
    /// offline-sync tracking and record the announced total.
    #[tokio::test]
    async fn test_ib_offline_preview_starts_sync() {
        let client = create_offline_sync_test_client().await;

        // Preview announcing 301 pending stanzas, broken down by category.
        let node = NodeBuilder::new("ib")
            .children([NodeBuilder::new("offline_preview")
                .attr("count", "301")
                .attr("message", "168")
                .attr("notification", "62")
                .attr("receipt", "68")
                .attr("appdata", "0")
                .build()])
            .build();

        client.process_node(Arc::new(node)).await;
        assert!(
            client.offline_sync_metrics.active.load(Ordering::Acquire),
            "offline_preview with count>0 should activate sync"
        );
        // The aggregate count attribute becomes the expected total.
        assert_eq!(
            client
                .offline_sync_metrics
                .total_messages
                .load(Ordering::Acquire),
            301
        );
    }
4670
4671 #[tokio::test]
4672 async fn test_offline_message_increments_processed() {
4673 let client = create_offline_sync_test_client().await;
4674 client
4675 .offline_sync_metrics
4676 .active
4677 .store(true, Ordering::Release);
4678 client
4679 .offline_sync_metrics
4680 .total_messages
4681 .store(100, Ordering::Release);
4682
4683 let node = NodeBuilder::new("message")
4684 .attr("offline", "1")
4685 .attr("from", "5551234567@s.whatsapp.net")
4686 .attr("id", "TEST123")
4687 .attr("t", "1772884671")
4688 .attr("type", "text")
4689 .build();
4690
4691 client.process_node(Arc::new(node)).await;
4692 assert_eq!(
4693 client
4694 .offline_sync_metrics
4695 .processed_messages
4696 .load(Ordering::Acquire),
4697 1,
4698 "offline message should increment processed count"
4699 );
4700 }
4701
4702 #[tokio::test]
4724 async fn test_handle_iq_ping_with_child_element() {
4725 let backend = crate::test_utils::create_test_backend().await;
4727 let pm = Arc::new(
4728 PersistenceManager::new(backend)
4729 .await
4730 .expect("persistence manager should initialize"),
4731 );
4732 let (client, _rx) = Client::new(
4733 Arc::new(crate::runtime_impl::TokioRuntime),
4734 pm,
4735 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4736 Arc::new(MockHttpClient),
4737 None,
4738 )
4739 .await;
4740
4741 let ping_node = NodeBuilder::new("iq")
4742 .attr("type", "get")
4743 .attr("from", SERVER_JID)
4744 .attr("id", "ping-child-1")
4745 .children([NodeBuilder::new("ping").build()])
4746 .build();
4747
4748 let handled = client.handle_iq(&ping_node).await;
4749 assert!(
4750 handled,
4751 "handle_iq must recognize ping with <ping> child element"
4752 );
4753 }
4754
4755 #[tokio::test]
4756 async fn test_handle_iq_ping_with_xmlns_attribute() {
4757 let backend = crate::test_utils::create_test_backend().await;
4761 let pm = Arc::new(
4762 PersistenceManager::new(backend)
4763 .await
4764 .expect("persistence manager should initialize"),
4765 );
4766 let (client, _rx) = Client::new(
4767 Arc::new(crate::runtime_impl::TokioRuntime),
4768 pm,
4769 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4770 Arc::new(MockHttpClient),
4771 None,
4772 )
4773 .await;
4774
4775 let ping_node = NodeBuilder::new("iq")
4776 .attr("type", "get")
4777 .attr("from", SERVER_JID)
4778 .attr("id", "ping-xmlns-1")
4779 .attr("xmlns", "urn:xmpp:ping")
4780 .build();
4781
4782 let handled = client.handle_iq(&ping_node).await;
4783 assert!(
4784 handled,
4785 "handle_iq must recognize ping with xmlns=\"urn:xmpp:ping\" attribute (no children)"
4786 );
4787 }
4788
4789 #[tokio::test]
4790 async fn test_handle_iq_ping_with_both_child_and_xmlns() {
4791 let backend = crate::test_utils::create_test_backend().await;
4794 let pm = Arc::new(
4795 PersistenceManager::new(backend)
4796 .await
4797 .expect("persistence manager should initialize"),
4798 );
4799 let (client, _rx) = Client::new(
4800 Arc::new(crate::runtime_impl::TokioRuntime),
4801 pm,
4802 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4803 Arc::new(MockHttpClient),
4804 None,
4805 )
4806 .await;
4807
4808 let ping_node = NodeBuilder::new("iq")
4809 .attr("type", "get")
4810 .attr("from", SERVER_JID)
4811 .attr("id", "ping-both-1")
4812 .attr("xmlns", "urn:xmpp:ping")
4813 .children([NodeBuilder::new("ping").build()])
4814 .build();
4815
4816 let handled = client.handle_iq(&ping_node).await;
4817 assert!(
4818 handled,
4819 "handle_iq must handle ping with both child and xmlns"
4820 );
4821 }
4822
4823 #[tokio::test]
4824 async fn test_handle_iq_non_ping_returns_false() {
4825 let backend = crate::test_utils::create_test_backend().await;
4827 let pm = Arc::new(
4828 PersistenceManager::new(backend)
4829 .await
4830 .expect("persistence manager should initialize"),
4831 );
4832 let (client, _rx) = Client::new(
4833 Arc::new(crate::runtime_impl::TokioRuntime),
4834 pm,
4835 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4836 Arc::new(MockHttpClient),
4837 None,
4838 )
4839 .await;
4840
4841 let non_ping_node = NodeBuilder::new("iq")
4842 .attr("type", "get")
4843 .attr("from", SERVER_JID)
4844 .attr("id", "not-a-ping")
4845 .attr("xmlns", "some:other:namespace")
4846 .build();
4847
4848 let handled = client.handle_iq(&non_ping_node).await;
4849 assert!(
4850 !handled,
4851 "handle_iq must NOT treat non-ping xmlns as a ping"
4852 );
4853 }
4854
4855 #[tokio::test]
4856 async fn test_handle_iq_ping_wrong_type_returns_false() {
4857 let backend = crate::test_utils::create_test_backend().await;
4859 let pm = Arc::new(
4860 PersistenceManager::new(backend)
4861 .await
4862 .expect("persistence manager should initialize"),
4863 );
4864 let (client, _rx) = Client::new(
4865 Arc::new(crate::runtime_impl::TokioRuntime),
4866 pm,
4867 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4868 Arc::new(MockHttpClient),
4869 None,
4870 )
4871 .await;
4872
4873 let result_node = NodeBuilder::new("iq")
4874 .attr("type", "result")
4875 .attr("from", SERVER_JID)
4876 .attr("id", "ping-result-1")
4877 .attr("xmlns", "urn:xmpp:ping")
4878 .build();
4879
4880 let handled = client.handle_iq(&result_node).await;
4881 assert!(
4882 !handled,
4883 "handle_iq must NOT respond to type=\"result\" even with ping xmlns"
4884 );
4885 }
4886
4887 #[test]
4890 fn test_build_pong_with_id() {
4891 let pong = build_pong("s.whatsapp.net".to_string(), Some("ping-123"));
4892 assert!(
4893 pong.attrs.get("id").is_some_and(|v| v == "ping-123"),
4894 "pong should include id when server ping has one"
4895 );
4896 assert!(pong.attrs.get("type").is_some_and(|v| v == "result"));
4897 assert!(pong.attrs.get("to").is_some_and(|v| v == "s.whatsapp.net"));
4898 }
4899
4900 #[test]
4901 fn test_build_pong_without_id() {
4902 let pong = build_pong("s.whatsapp.net".to_string(), None);
4903 assert!(
4904 !pong.attrs.contains_key("id"),
4905 "pong should NOT include id when server ping has none"
4906 );
4907 assert!(pong.attrs.get("type").is_some_and(|v| v == "result"));
4908 }
4909
4910 #[test]
4911 fn test_encrypt_identity_notification_omits_type() {
4912 let node = NodeBuilder::new("notification")
4913 .attr("from", "186303081611421@lid")
4914 .attr("id", "4128735301")
4915 .attr("type", "encrypt")
4916 .children([NodeBuilder::new("identity").build()])
4917 .build();
4918
4919 assert!(
4920 is_encrypt_identity_notification(&node),
4921 "identity-change notification ACK must omit type to match WA Web"
4922 );
4923 }
4924
4925 #[test]
4926 fn test_device_notification_is_not_encrypt_identity() {
4927 let node = NodeBuilder::new("notification")
4928 .attr("from", "186303081611421@lid")
4929 .attr("id", "269488578")
4930 .attr("type", "devices")
4931 .children([NodeBuilder::new("remove").build()])
4932 .build();
4933
4934 assert!(
4935 !is_encrypt_identity_notification(&node),
4936 "device notification is not an encrypt+identity notification"
4937 );
4938 }
4939
4940 #[test]
4941 fn test_build_ack_node_for_message_omits_type_includes_from() {
4942 let incoming = NodeBuilder::new("message")
4945 .attr("from", "120363161500776365@g.us")
4946 .attr("id", "A5791A5392EF60E3FB0670098DE010D4")
4947 .attr("type", "text")
4948 .attr("participant", "181531758878822@lid")
4949 .build();
4950 let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4951 .parse()
4952 .expect("own device PN JID should parse");
4953
4954 let ack = build_ack_node(&incoming, Some(&own_device_pn))
4955 .expect("message ack should be buildable");
4956
4957 assert_eq!(ack.tag, "ack");
4958 assert!(ack.attrs.get("class").is_some_and(|v| v == "message"));
4961 assert!(
4962 ack.attrs
4963 .get("to")
4964 .is_some_and(|v| v == "120363161500776365@g.us")
4965 );
4966 assert!(
4967 ack.attrs
4968 .get("from")
4969 .is_some_and(|v| v == "155500012345:48@s.whatsapp.net")
4970 );
4971 assert!(
4972 ack.attrs
4973 .get("participant")
4974 .is_some_and(|v| v == "181531758878822@lid")
4975 );
4976 assert!(
4977 !ack.attrs.contains_key("type"),
4978 "message ACK must NOT echo type (matches whatsmeow behavior)"
4979 );
4980 }
4981
4982 #[test]
4983 fn test_build_ack_node_for_identity_change_omits_type_and_from() {
4984 let incoming = NodeBuilder::new("notification")
4985 .attr("from", "186303081611421@lid")
4986 .attr("id", "4128735301")
4987 .attr("type", "encrypt")
4988 .children([NodeBuilder::new("identity").build()])
4989 .build();
4990 let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4991 .parse()
4992 .expect("own device PN JID should parse");
4993
4994 let ack = build_ack_node(&incoming, Some(&own_device_pn))
4995 .expect("notification ack should be buildable");
4996
4997 assert!(ack.attrs.get("class").is_some_and(|v| v == "notification"));
4998 assert!(
4999 !ack.attrs.contains_key("type"),
5000 "identity-change notification ACK must omit type"
5001 );
5002 assert!(
5003 !ack.attrs.contains_key("from"),
5004 "notification ACKs should not include our device PN"
5005 );
5006 }
5007
5008 #[test]
5009 fn test_build_ack_node_for_receipt_with_type_echoes_type() {
5010 let incoming = NodeBuilder::new("receipt")
5012 .attr("from", "156535032389744@lid")
5013 .attr("id", "RCPT-WITH-TYPE")
5014 .attr("type", "read")
5015 .build();
5016 let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
5017 .parse()
5018 .expect("own device PN JID should parse");
5019
5020 let ack = build_ack_node(&incoming, Some(&own_device_pn))
5021 .expect("receipt ack should be buildable");
5022
5023 assert!(ack.attrs.get("class").is_some_and(|v| v == "receipt"));
5024 assert!(
5025 ack.attrs.get("type").is_some_and(|v| v == "read"),
5026 "receipt ACK must echo the type attribute when present"
5027 );
5028 assert!(
5029 !ack.attrs.contains_key("from"),
5030 "receipt ACKs should not include our device PN"
5031 );
5032 }
5033
5034 #[test]
5035 fn test_build_ack_node_for_receipt_without_type_omits_type() {
5036 let incoming = NodeBuilder::new("receipt")
5039 .attr("from", "156535032389744@lid")
5040 .attr("id", "RCPT-NO-TYPE")
5041 .build();
5042 let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
5043 .parse()
5044 .expect("own device PN JID should parse");
5045
5046 let ack = build_ack_node(&incoming, Some(&own_device_pn))
5047 .expect("receipt ack should be buildable");
5048
5049 assert!(ack.attrs.get("class").is_some_and(|v| v == "receipt"));
5050 assert!(
5051 !ack.attrs.contains_key("type"),
5052 "receipt ACK must NOT contain type when the incoming receipt has no type attribute"
5053 );
5054 assert!(
5055 !ack.attrs.contains_key("from"),
5056 "receipt ACKs should not include our device PN"
5057 );
5058 }
5059
5060 #[tokio::test]
5062 async fn test_handle_iq_ping_without_id() {
5063 let backend = crate::test_utils::create_test_backend().await;
5064 let pm = Arc::new(
5065 PersistenceManager::new(backend)
5066 .await
5067 .expect("persistence manager should initialize"),
5068 );
5069 let (client, _rx) = Client::new(
5070 Arc::new(crate::runtime_impl::TokioRuntime),
5071 pm,
5072 Arc::new(crate::transport::mock::MockTransportFactory::new()),
5073 Arc::new(MockHttpClient),
5074 None,
5075 )
5076 .await;
5077
5078 let ping_node = NodeBuilder::new("iq")
5080 .attr("type", "get")
5081 .attr("from", SERVER_JID)
5082 .attr("xmlns", "urn:xmpp:ping")
5083 .build();
5084
5085 let handled = client.handle_iq(&ping_node).await;
5086 assert!(
5087 handled,
5088 "handle_iq must recognize ping without id attribute"
5089 );
5090 }
5091
5092 #[test]
5095 fn test_fibonacci_backoff_sequence() {
5096 let expected_base_ms = [1000, 1000, 2000, 3000, 5000, 8000, 13000, 21000];
5099 for (attempt, &base) in expected_base_ms.iter().enumerate() {
5100 let delay = fibonacci_backoff(attempt as u32);
5101 let ms = delay.as_millis() as u64;
5102 let low = base - base / 10;
5103 let high = base + base / 10;
5104 assert!(
5105 ms >= low && ms <= high,
5106 "attempt {attempt}: expected {low}..={high}ms, got {ms}ms"
5107 );
5108 }
5109 }
5110
5111 #[test]
5112 fn test_fibonacci_backoff_max_900s() {
5113 let delay = fibonacci_backoff(100);
5115 let ms = delay.as_millis() as u64;
5116 assert!(
5117 ms <= 990_000,
5118 "should never exceed 900s + 10% jitter, got {ms}ms"
5119 );
5120 assert!(
5121 ms >= 810_000,
5122 "should be at least 900s - 10% jitter, got {ms}ms"
5123 );
5124 }
5125
5126 #[test]
5127 fn test_fibonacci_backoff_first_attempt_is_1s() {
5128 let delay = fibonacci_backoff(0);
5129 let ms = delay.as_millis() as u64;
5130 assert!(
5131 (900..=1100).contains(&ms),
5132 "first attempt should be ~1s (±10%), got {ms}ms"
5133 );
5134 }
5135
5136 #[tokio::test]
5139 async fn test_stream_error_401_disables_reconnect() {
5140 let client = create_offline_sync_test_client().await;
5141 let node = NodeBuilder::new("stream:error").attr("code", "401").build();
5142 client.handle_stream_error(&node).await;
5143 assert!(
5144 !client.enable_auto_reconnect.load(Ordering::Relaxed),
5145 "401 should disable auto-reconnect"
5146 );
5147 }
5148
5149 #[tokio::test]
5150 async fn test_stream_error_409_disables_reconnect() {
5151 let client = create_offline_sync_test_client().await;
5152 let node = NodeBuilder::new("stream:error").attr("code", "409").build();
5153 client.handle_stream_error(&node).await;
5154 assert!(
5155 !client.enable_auto_reconnect.load(Ordering::Relaxed),
5156 "409 should disable auto-reconnect"
5157 );
5158 }
5159
5160 #[tokio::test]
5161 async fn test_stream_error_429_keeps_reconnect_with_backoff() {
5162 let client = create_offline_sync_test_client().await;
5163 let before = client.auto_reconnect_errors.load(Ordering::Relaxed);
5164 let node = NodeBuilder::new("stream:error").attr("code", "429").build();
5165 client.handle_stream_error(&node).await;
5166 assert!(
5167 client.enable_auto_reconnect.load(Ordering::Relaxed),
5168 "429 should keep auto-reconnect enabled"
5169 );
5170 let after = client.auto_reconnect_errors.load(Ordering::Relaxed);
5171 assert_eq!(
5172 after,
5173 before + 5,
5174 "429 should increase backoff by exactly 5: before={before}, after={after}"
5175 );
5176 }
5177
5178 #[tokio::test]
5179 async fn test_stream_error_503_keeps_reconnect() {
5180 let client = create_offline_sync_test_client().await;
5181 let node = NodeBuilder::new("stream:error").attr("code", "503").build();
5182 client.handle_stream_error(&node).await;
5183 assert!(
5184 client.enable_auto_reconnect.load(Ordering::Relaxed),
5185 "503 should keep auto-reconnect enabled"
5186 );
5187 }
5188
5189 #[tokio::test]
5190 async fn test_custom_cache_config_is_respected() {
5191 use crate::cache_config::{CacheConfig, CacheEntryConfig};
5192 use std::time::Duration;
5193
5194 let backend = crate::test_utils::create_test_backend().await;
5195 let pm = Arc::new(
5196 PersistenceManager::new(backend)
5197 .await
5198 .expect("persistence manager should initialize"),
5199 );
5200
5201 let custom_config = CacheConfig {
5202 group_cache: CacheEntryConfig::new(Some(Duration::from_secs(60)), 10),
5203 device_cache: CacheEntryConfig::new(Some(Duration::from_secs(60)), 10),
5204 ..CacheConfig::default()
5205 };
5206
5207 let (client, _rx) = Client::new_with_cache_config(
5210 Arc::new(crate::runtime_impl::TokioRuntime),
5211 pm,
5212 Arc::new(crate::transport::mock::MockTransportFactory::new()),
5213 Arc::new(MockHttpClient),
5214 None,
5215 custom_config,
5216 )
5217 .await;
5218
5219 assert!(!client.is_logged_in());
5220 }
5221
5222 #[tokio::test]
5231 async fn test_is_connected_not_affected_by_mutex_contention() {
5232 use crate::socket::NoiseSocket;
5233 use wacore::handshake::NoiseCipher;
5234
5235 let backend = crate::test_utils::create_test_backend().await;
5236 let pm = Arc::new(
5237 PersistenceManager::new(backend)
5238 .await
5239 .expect("persistence manager should initialize"),
5240 );
5241 let (client, _rx) = Client::new(
5242 Arc::new(crate::runtime_impl::TokioRuntime),
5243 pm,
5244 Arc::new(crate::transport::mock::MockTransportFactory::new()),
5245 Arc::new(MockHttpClient),
5246 None,
5247 )
5248 .await;
5249
5250 assert!(!client.is_connected(), "should start disconnected");
5252
5253 let transport: Arc<dyn crate::transport::Transport> =
5255 Arc::new(crate::transport::mock::MockTransport);
5256 let key = [0u8; 32];
5257 let write_key = NoiseCipher::new(&key).expect("valid key");
5258 let read_key = NoiseCipher::new(&key).expect("valid key");
5259 let noise_socket = NoiseSocket::new(
5260 Arc::new(crate::runtime_impl::TokioRuntime),
5261 transport,
5262 write_key,
5263 read_key,
5264 );
5265 *client.noise_socket.lock().await = Some(Arc::new(noise_socket));
5266 client.is_connected.store(true, Ordering::Release);
5267
5268 assert!(client.is_connected(), "should report connected");
5269
5270 let _guard = client.noise_socket.lock().await;
5273 assert!(
5274 client.is_connected(),
5275 "is_connected() must return true even while noise_socket mutex is held"
5276 );
5277 }
5278
5279 #[tokio::test]
5283 async fn test_send_ack_for_returns_error_when_disconnected() {
5284 let backend = crate::test_utils::create_test_backend().await;
5285 let pm = Arc::new(
5286 PersistenceManager::new(backend)
5287 .await
5288 .expect("persistence manager should initialize"),
5289 );
5290 let (client, _rx) = Client::new(
5291 Arc::new(crate::runtime_impl::TokioRuntime),
5292 pm,
5293 Arc::new(crate::transport::mock::MockTransportFactory::new()),
5294 Arc::new(MockHttpClient),
5295 None,
5296 )
5297 .await;
5298
5299 let receipt = NodeBuilder::new("receipt")
5301 .attr("from", "120363040237990503@g.us")
5302 .attr("id", "TEST-RECEIPT-ID")
5303 .attr("participant", "236395184570386@lid")
5304 .build();
5305
5306 let result = client.send_ack_for(&receipt).await;
5307 assert!(
5308 matches!(result, Err(ClientError::NotConnected)),
5309 "send_ack_for must return Err(NotConnected) when disconnected, got: {result:?}"
5310 );
5311 }
5312
5313 #[tokio::test]
5316 async fn test_send_ack_for_returns_ok_on_expected_disconnect() {
5317 let backend = crate::test_utils::create_test_backend().await;
5318 let pm = Arc::new(
5319 PersistenceManager::new(backend)
5320 .await
5321 .expect("persistence manager should initialize"),
5322 );
5323 let (client, _rx) = Client::new(
5324 Arc::new(crate::runtime_impl::TokioRuntime),
5325 pm,
5326 Arc::new(crate::transport::mock::MockTransportFactory::new()),
5327 Arc::new(MockHttpClient),
5328 None,
5329 )
5330 .await;
5331
5332 client.expected_disconnect.store(true, Ordering::Relaxed);
5334
5335 let receipt = NodeBuilder::new("receipt")
5336 .attr("from", "120363040237990503@g.us")
5337 .attr("id", "TEST-RECEIPT-ID")
5338 .build();
5339
5340 let result = client.send_ack_for(&receipt).await;
5341 assert!(
5342 result.is_ok(),
5343 "send_ack_for should return Ok during expected disconnect"
5344 );
5345 }
5346}