1mod context_impl;
2mod device_registry;
3mod lid_pn;
4mod sender_keys;
5mod sessions;
6
7use crate::cache::Cache;
8use crate::cache_store::TypedCache;
9use crate::handshake;
10use crate::lid_pn_cache::LidPnCache;
11use crate::pair;
12use anyhow::{Result, anyhow};
13use futures::FutureExt;
14use std::borrow::Cow;
15use std::collections::{HashMap, HashSet};
16
17use wacore::xml::DisplayableNode;
18use wacore_binary::builder::NodeBuilder;
19use wacore_binary::jid::JidExt;
20use wacore_binary::node::{Attrs, Node, NodeValue};
21
22use crate::appstate_sync::AppStateProcessor;
23use crate::handlers::chatstate::ChatStateEvent;
24use crate::jid_utils::server_jid;
25use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
26use crate::types::enc_handler::EncHandler;
27use crate::types::events::{ConnectFailureReason, Event};
28
29use log::{debug, error, info, trace, warn};
30
31use rand::{Rng, RngExt};
32use scopeguard;
33use wacore_binary::jid::Jid;
34
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
37
/// Predicate describing which incoming XML node a waiter is interested in:
/// a required tag plus zero or more exact attribute key/value constraints.
#[derive(Debug, Clone)]
pub struct NodeFilter {
    // Tag the node must carry (e.g. "message", "iq").
    tag: String,
    // Each (key, value) pair must match the node's attribute exactly.
    attrs: Vec<(String, String)>,
}
59
60impl NodeFilter {
61 pub fn tag(tag: impl Into<String>) -> Self {
63 Self {
64 tag: tag.into(),
65 attrs: Vec::new(),
66 }
67 }
68
69 pub fn attr(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
71 self.attrs.push((key.into(), value.into()));
72 self
73 }
74
75 pub fn from_jid(self, jid: &Jid) -> Self {
77 self.attr("from", jid.to_string())
78 }
79
80 fn matches(&self, node: &Node) -> bool {
81 node.tag == self.tag
82 && self
83 .attrs
84 .iter()
85 .all(|(k, v)| node.attrs.get(k.as_str()).is_some_and(|attr| *attr == *v))
86 }
87}
88
/// A registered one-shot waiter: the first incoming node matching `filter`
/// is delivered through `tx`.
struct NodeWaiter {
    // Which node this waiter is waiting for.
    filter: NodeFilter,
    // One-shot channel the matching node is sent on.
    tx: futures::channel::oneshot::Sender<Arc<Node>>,
}
93
94use async_lock::Mutex;
95use async_lock::RwLock;
96use std::time::Duration;
97use thiserror::Error;
98
99use wacore::appstate::patch_decode::WAPatchName;
100use wacore::client::context::GroupInfo;
101use wacore::runtime::timeout as rt_timeout;
102use waproto::whatsapp as wa;
103
104use crate::cache_config::CacheConfig;
105use crate::socket::{NoiseSocket, SocketError, error::EncryptSendError};
106use crate::sync_task::MajorSyncTask;
107use wacore::runtime::Runtime;
108
/// Callback type invoked for each incoming chat-state (typing/recording/idle)
/// stanza; see `register_chatstate_handler`.
type ChatStateHandler = Arc<dyn Fn(ChatStateEvent) + Send + Sync>;

// Cap on app-state sync retry attempts. NOTE(review): used by retry logic
// elsewhere in this module — confirm against the sync implementation.
const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;

// Timeout applied to both the version fetch and the transport connect in
// `Client::connect`.
const TRANSPORT_CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
117
/// Snapshot of in-memory collection sizes, used to spot leaks/growth.
/// Produced by `Client::memory_diagnostics`; only built with the
/// `debug-diagnostics` feature.
#[cfg(feature = "debug-diagnostics")]
#[derive(Debug, Clone)]
pub struct MemoryDiagnostics {
    // Moka caches bounded by TTL.
    pub group_cache: u64,
    pub device_cache: u64,
    pub device_registry_cache: u64,
    pub lid_pn_lid_entries: u64,
    pub lid_pn_pn_entries: u64,
    pub retried_group_messages: u64,
    pub recent_messages: u64,
    pub message_retry_counts: u64,
    pub pdo_pending_requests: u64,
    // Moka caches bounded by capacity only (no TTL).
    pub session_locks: u64,
    pub message_queues: u64,
    pub message_enqueue_locks: u64,
    // Unbounded collections — watch these for unbounded growth.
    pub response_waiters: usize,
    pub node_waiters: usize,
    pub pending_retries: usize,
    pub presence_subscriptions: usize,
    pub app_state_key_requests: usize,
    pub app_state_syncing: usize,
    pub signal_cache_sessions: usize,
    pub signal_cache_identities: usize,
    pub signal_cache_sender_keys: usize,
    // Misc handler registries.
    pub chatstate_handlers: usize,
    pub custom_enc_handlers: usize,
}
155
// Human-readable dump of all diagnostic counters, grouped the same way as the
// struct fields (TTL-bounded caches, capacity-only caches, unbounded
// collections, misc).
#[cfg(feature = "debug-diagnostics")]
impl std::fmt::Display for MemoryDiagnostics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "=== Memory Diagnostics ===")?;
        writeln!(f, "--- Moka caches (TTL-bounded) ---")?;
        writeln!(f, " group_cache: {}", self.group_cache)?;
        writeln!(f, " device_cache: {}", self.device_cache)?;
        writeln!(
            f,
            " device_registry_cache: {}",
            self.device_registry_cache
        )?;
        writeln!(f, " lid_pn (lid): {}", self.lid_pn_lid_entries)?;
        writeln!(f, " lid_pn (pn): {}", self.lid_pn_pn_entries)?;
        writeln!(
            f,
            " retried_group_messages: {}",
            self.retried_group_messages
        )?;
        writeln!(f, " recent_messages: {}", self.recent_messages)?;
        writeln!(f, " message_retry_counts: {}", self.message_retry_counts)?;
        writeln!(f, " pdo_pending_requests: {}", self.pdo_pending_requests)?;
        writeln!(f, "--- Moka caches (capacity-only) ---")?;
        writeln!(f, " session_locks: {}", self.session_locks)?;
        writeln!(f, " message_queues: {}", self.message_queues)?;
        writeln!(
            f,
            " message_enqueue_locks: {}",
            self.message_enqueue_locks
        )?;
        writeln!(f, "--- Unbounded collections ---")?;
        writeln!(f, " response_waiters: {}", self.response_waiters)?;
        writeln!(f, " node_waiters: {}", self.node_waiters)?;
        writeln!(f, " pending_retries: {}", self.pending_retries)?;
        writeln!(
            f,
            " presence_subscriptions: {}",
            self.presence_subscriptions
        )?;
        writeln!(
            f,
            " app_state_key_requests: {}",
            self.app_state_key_requests
        )?;
        writeln!(f, " app_state_syncing: {}", self.app_state_syncing)?;
        writeln!(
            f,
            " signal_sessions: {}",
            self.signal_cache_sessions
        )?;
        writeln!(
            f,
            " signal_identities: {}",
            self.signal_cache_identities
        )?;
        writeln!(
            f,
            " signal_sender_keys: {}",
            self.signal_cache_sender_keys
        )?;
        writeln!(f, "--- Misc ---")?;
        writeln!(f, " chatstate_handlers: {}", self.chatstate_handlers)?;
        writeln!(f, " custom_enc_handlers: {}", self.custom_enc_handlers)?;
        Ok(())
    }
}
222
/// Errors surfaced by connection-level and send-level client operations.
#[derive(Debug, Error)]
pub enum ClientError {
    /// An operation required an active connection but none exists.
    #[error("client is not connected")]
    NotConnected,
    /// Underlying socket failure, converted automatically via `#[from]`.
    #[error("socket error: {0}")]
    Socket(#[from] SocketError),
    /// Failure while encrypting or transmitting a frame.
    #[error("encrypt/send error: {0}")]
    EncryptSend(#[from] EncryptSendError),
    /// `connect` was called while already connected or mid-connect.
    #[error("client is already connected")]
    AlreadyConnected,
    /// The session is connected but not authenticated.
    #[error("client is not logged in")]
    NotLoggedIn,
}
236
237use wacore::types::message::StanzaKey;
238
/// Progress counters for the offline message sync; reset by
/// `cleanup_connection_state` when a connection is torn down.
#[derive(Debug)]
pub(crate) struct OfflineSyncMetrics {
    // True while an offline sync is in flight.
    pub active: AtomicBool,
    // Total messages expected in this sync pass.
    pub total_messages: AtomicUsize,
    // Messages handled so far.
    pub processed_messages: AtomicUsize,
    // When the current sync started; None when idle.
    pub start_time: std::sync::Mutex<Option<wacore::time::Instant>>,
}
248
/// Central connection/session object: owns the transport, all per-connection
/// caches and flags, and the stanza router. Constructed via `Client::new` /
/// `Client::new_with_cache_config` and driven by `run`.
pub struct Client {
    // Abstraction over the async runtime (spawn/sleep/yield).
    pub(crate) runtime: Arc<dyn Runtime>,
    // Platform-independent core (event bus etc.).
    pub(crate) core: wacore::client::CoreClient,

    // Durable device/session state.
    pub(crate) persistence_manager: Arc<PersistenceManager>,
    // Cached media-connection info; None until first fetched.
    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,

    // --- connection lifecycle flags ---
    pub(crate) is_logged_in: Arc<AtomicBool>,
    pub(crate) is_connecting: Arc<AtomicBool>,
    pub(crate) is_running: Arc<AtomicBool>,
    // Set with Release in `connect`, cleared in `cleanup_connection_state`.
    is_connected: Arc<AtomicBool>,
    // Notified on shutdown/teardown so loops can exit.
    pub(crate) shutdown_notifier: Arc<event_listener::Event>,
    // Millisecond timestamps of last socket activity (for keepalive).
    pub(crate) last_data_received_ms: Arc<AtomicU64>,
    pub(crate) last_data_sent_ms: Arc<AtomicU64>,

    // --- transport / socket ---
    pub(crate) transport: Arc<Mutex<Option<Arc<dyn crate::transport::Transport>>>>,
    pub(crate) transport_events:
        Arc<Mutex<Option<async_channel::Receiver<crate::transport::TransportEvent>>>>,
    pub(crate) transport_factory: Arc<dyn crate::transport::TransportFactory>,
    // Noise-encrypted socket produced by the handshake.
    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,

    // IQ request id -> one-shot response channel.
    pub(crate) response_waiters:
        Arc<Mutex<HashMap<String, futures::channel::oneshot::Sender<wacore_binary::Node>>>>,

    // Ad-hoc waiters for arbitrary node patterns; count mirrored in the
    // atomic so diagnostics can read it without taking the sync mutex.
    node_waiters: std::sync::Mutex<Vec<NodeWaiter>>,
    node_waiter_count: AtomicUsize,

    // Per-client id prefix plus a counter for generating stanza ids.
    pub(crate) unique_id: String,
    pub(crate) id_counter: Arc<AtomicU64>,

    pub(crate) unified_session: crate::unified_session::UnifiedSessionManager,

    // Write-through cache over the Signal protocol store.
    pub(crate) signal_cache: Arc<crate::store::signal_cache::SignalStoreCache>,

    // Semaphore bounding concurrent message processing; replaced wholesale on
    // cleanup, hence the outer sync Mutex.
    pub(crate) message_processing_semaphore: std::sync::Mutex<Arc<async_lock::Semaphore>>,

    // Per-session mutexes to serialize Signal session access.
    pub(crate) session_locks: Cache<String, Arc<async_lock::Mutex<()>>>,

    // Per-chat ordered processing queues.
    pub(crate) message_queues: Cache<String, async_channel::Sender<Arc<Node>>>,

    // LID <-> phone-number mapping cache (warmed at startup).
    pub(crate) lid_pn_cache: Arc<LidPnCache>,

    // Locks guarding enqueue into `message_queues`.
    pub(crate) message_enqueue_locks: Cache<String, Arc<async_lock::Mutex<()>>>,

    // Lazily-initialized caches; see `get_group_cache` / `get_device_cache`.
    pub group_cache: async_lock::Mutex<Option<Arc<TypedCache<Jid, GroupInfo>>>>,
    #[allow(clippy::type_complexity)]
    pub device_cache: async_lock::Mutex<Option<Arc<TypedCache<Jid, Vec<Jid>>>>>,

    // Group message ids already retried (dedup set with TTL).
    pub(crate) retried_group_messages: Cache<String, ()>,
    // True when the next disconnect is intentional (suppresses backoff).
    pub(crate) expected_disconnect: Arc<AtomicBool>,

    // Incremented per connection; lets spawned tasks detect staleness.
    pub(crate) connection_generation: Arc<AtomicU64>,

    // Recently sent message payloads, kept for retry-receipt handling.
    pub(crate) recent_messages: Cache<StanzaKey, Vec<u8>>,

    // Message ids with an in-flight retry.
    pub(crate) pending_retries: Arc<async_lock::Mutex<HashSet<String>>>,

    // Retry counters per message id (TTL-bounded).
    pub(crate) message_retry_counts: Cache<String, u8>,

    // --- reconnect policy ---
    pub enable_auto_reconnect: Arc<AtomicBool>,
    pub auto_reconnect_errors: Arc<AtomicU32>,

    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,

    // --- app-state sync ---
    pub(crate) app_state_processor: async_lock::Mutex<Option<Arc<AppStateProcessor>>>,
    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, wacore::time::Instant>>>,
    pub(crate) app_state_syncing: Arc<Mutex<HashSet<WAPatchName>>>,
    pub(crate) initial_keys_synced_notifier: Arc<event_listener::Event>,
    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,

    // Assume the server holds prekeys until told otherwise.
    pub(crate) server_has_prekeys: Arc<AtomicBool>,
    pub(crate) prekey_upload_lock: Arc<async_lock::Mutex<()>>,
    pub(crate) offline_sync_notifier: Arc<event_listener::Event>,
    pub(crate) offline_sync_completed: Arc<AtomicBool>,
    pub(crate) history_sync_tasks_in_flight: Arc<AtomicUsize>,
    pub(crate) history_sync_idle_notifier: Arc<event_listener::Event>,
    pub(crate) presence_subscriptions: Arc<async_lock::Mutex<HashSet<Jid>>>,
    pub(crate) offline_sync_metrics: Arc<OfflineSyncMetrics>,
    // Notified once the noise socket is installed (see `connect`).
    pub(crate) socket_ready_notifier: Arc<event_listener::Event>,
    // Set by `dispatch_connected`; cleared on connect/cleanup.
    pub(crate) is_ready: Arc<AtomicBool>,
    pub(crate) connected_notifier: Arc<event_listener::Event>,
    // Feeds heavy sync work to the external sync task (receiver returned by `new`).
    pub(crate) major_sync_task_sender: async_channel::Sender<MajorSyncTask>,
    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<async_channel::Sender<()>>>>,

    pub(crate) pair_code_state: Arc<Mutex<wacore::pair_code::PairCodeState>>,

    // User-registered handlers for custom <enc> types, keyed by type name.
    pub custom_enc_handlers: Arc<async_lock::RwLock<HashMap<String, Arc<dyn EncHandler>>>>,

    pub(crate) chatstate_handlers: Arc<RwLock<Vec<ChatStateHandler>>>,

    pub(crate) pdo_pending_requests: Cache<String, crate::pdo::PendingPdoRequest>,

    pub(crate) device_registry_cache: TypedCache<String, wacore::store::traits::DeviceListRecord>,

    // Routes incoming stanzas by tag; built by `create_stanza_router`.
    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,

    pub(crate) synchronous_ack: bool,

    pub http_client: Arc<dyn crate::http::HttpClient>,

    // Forced (primary, secondary, tertiary) client version, if any.
    pub(crate) override_version: Option<(u32, u32, u32)>,

    pub(crate) skip_history_sync: AtomicBool,

    pub(crate) cache_config: CacheConfig,
}
434
435impl Client {
436 fn should_downgrade_sync_error(&self, err: &anyhow::Error) -> bool {
437 if self.is_shutting_down() {
438 return true;
439 }
440
441 matches!(
442 err.downcast_ref::<crate::request::IqError>(),
443 Some(
444 crate::request::IqError::NotConnected
445 | crate::request::IqError::InternalChannelClosed
446 )
447 )
448 }
449
450 fn log_sync_error(&self, context: &str, err: &anyhow::Error) {
452 if self.should_downgrade_sync_error(err) {
453 debug!("Skipping {context} during shutdown: {err}");
454 } else {
455 warn!("Failed {context}: {err}");
456 }
457 }
458
459 fn is_fully_ready(&self) -> bool {
463 self.is_connected() && self.is_logged_in() && self.is_ready.load(Ordering::Relaxed)
464 }
465
466 fn dispatch_connected(&self) {
468 self.is_ready.store(true, Ordering::Relaxed);
469 self.core
470 .event_bus
471 .dispatch(&Event::Connected(crate::types::events::Connected));
472 self.connected_notifier.notify(usize::MAX);
473 }
474
475 pub fn set_skip_history_sync(&self, enabled: bool) {
480 self.skip_history_sync.store(enabled, Ordering::Relaxed);
481 }
482
483 pub fn skip_history_sync_enabled(&self) -> bool {
485 self.skip_history_sync.load(Ordering::Relaxed)
486 }
487
488 pub(crate) fn is_shutting_down(&self) -> bool {
489 self.expected_disconnect.load(Ordering::Relaxed) || !self.is_running.load(Ordering::Relaxed)
490 }
491
492 pub async fn new(
497 runtime: Arc<dyn Runtime>,
498 persistence_manager: Arc<PersistenceManager>,
499 transport_factory: Arc<dyn crate::transport::TransportFactory>,
500 http_client: Arc<dyn crate::http::HttpClient>,
501 override_version: Option<(u32, u32, u32)>,
502 ) -> (Arc<Self>, async_channel::Receiver<MajorSyncTask>) {
503 Self::new_with_cache_config(
504 runtime,
505 persistence_manager,
506 transport_factory,
507 http_client,
508 override_version,
509 CacheConfig::default(),
510 )
511 .await
512 }
513
    /// Builds a client with explicit cache configuration.
    ///
    /// Initializes every field, wraps the client in an `Arc`, and spawns two
    /// detached background tasks (LID-PN cache warm-up and the device-registry
    /// cleanup loop). Returns the client plus the receiver side of the major
    /// sync task channel.
    pub async fn new_with_cache_config(
        runtime: Arc<dyn Runtime>,
        persistence_manager: Arc<PersistenceManager>,
        transport_factory: Arc<dyn crate::transport::TransportFactory>,
        http_client: Arc<dyn crate::http::HttpClient>,
        override_version: Option<(u32, u32, u32)>,
        cache_config: CacheConfig,
    ) -> (Arc<Self>, async_channel::Receiver<MajorSyncTask>) {
        // Two random bytes form the per-client stanza-id prefix.
        let mut unique_id_bytes = [0u8; 2];
        rand::make_rng::<rand::rngs::StdRng>().fill_bytes(&mut unique_id_bytes);

        let device_snapshot = persistence_manager.get_device_snapshot().await;
        let core = wacore::client::CoreClient::new(device_snapshot.core.clone());

        // Bounded channel feeding heavy sync work to the external sync task.
        let (tx, rx) = async_channel::bounded(32);

        let this = Self {
            runtime: runtime.clone(),
            core,
            persistence_manager: persistence_manager.clone(),
            media_conn: Arc::new(RwLock::new(None)),
            is_logged_in: Arc::new(AtomicBool::new(false)),
            is_connecting: Arc::new(AtomicBool::new(false)),
            is_running: Arc::new(AtomicBool::new(false)),
            is_connected: Arc::new(AtomicBool::new(false)),
            shutdown_notifier: Arc::new(event_listener::Event::new()),
            last_data_received_ms: Arc::new(AtomicU64::new(0)),
            last_data_sent_ms: Arc::new(AtomicU64::new(0)),

            transport: Arc::new(Mutex::new(None)),
            transport_events: Arc::new(Mutex::new(None)),
            transport_factory,
            noise_socket: Arc::new(Mutex::new(None)),

            response_waiters: Arc::new(Mutex::new(HashMap::new())),
            node_waiters: std::sync::Mutex::new(Vec::new()),
            node_waiter_count: AtomicUsize::new(0),
            unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
            id_counter: Arc::new(AtomicU64::new(0)),
            unified_session: crate::unified_session::UnifiedSessionManager::new(),

            signal_cache: Arc::new(crate::store::signal_cache::SignalStoreCache::new()),
            // Starts at 1 permit; replaced on each connection cleanup.
            message_processing_semaphore: std::sync::Mutex::new(Arc::new(
                async_lock::Semaphore::new(1),
            )),
            // `.max(1)` guards against a zero capacity in the config.
            session_locks: Cache::builder()
                .max_capacity(cache_config.session_locks_capacity.max(1))
                .build(),
            message_queues: Cache::builder()
                .max_capacity(cache_config.message_queues_capacity.max(1))
                .build(),
            lid_pn_cache: Arc::new(LidPnCache::with_config(
                &cache_config.lid_pn_cache,
                cache_config.cache_stores.lid_pn_cache.clone(),
            )),
            message_enqueue_locks: Cache::builder()
                .max_capacity(cache_config.message_enqueue_locks_capacity.max(1))
                .build(),
            // Lazily initialized on first use (see getters below).
            group_cache: async_lock::Mutex::new(None),
            device_cache: async_lock::Mutex::new(None),
            retried_group_messages: cache_config.retried_group_messages.build_with_ttl(),

            expected_disconnect: Arc::new(AtomicBool::new(false)),
            connection_generation: Arc::new(AtomicU64::new(0)),

            recent_messages: cache_config.recent_messages.build_with_ttl(),

            pending_retries: Arc::new(async_lock::Mutex::new(HashSet::new())),

            message_retry_counts: cache_config.message_retry_counts.build_with_ttl(),

            offline_sync_metrics: Arc::new(OfflineSyncMetrics {
                active: AtomicBool::new(false),
                total_messages: AtomicUsize::new(0),
                processed_messages: AtomicUsize::new(0),
                start_time: std::sync::Mutex::new(None),
            }),

            enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
            auto_reconnect_errors: Arc::new(AtomicU32::new(0)),

            needs_initial_full_sync: Arc::new(AtomicBool::new(false)),

            app_state_processor: async_lock::Mutex::new(None),
            app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
            app_state_syncing: Arc::new(Mutex::new(HashSet::new())),
            initial_keys_synced_notifier: Arc::new(event_listener::Event::new()),
            initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
            // Optimistic default: assume the server holds prekeys.
            server_has_prekeys: Arc::new(AtomicBool::new(true)),
            prekey_upload_lock: Arc::new(async_lock::Mutex::new(())),
            offline_sync_notifier: Arc::new(event_listener::Event::new()),
            offline_sync_completed: Arc::new(AtomicBool::new(false)),
            history_sync_tasks_in_flight: Arc::new(AtomicUsize::new(0)),
            history_sync_idle_notifier: Arc::new(event_listener::Event::new()),
            presence_subscriptions: Arc::new(async_lock::Mutex::new(HashSet::new())),
            socket_ready_notifier: Arc::new(event_listener::Event::new()),
            is_ready: Arc::new(AtomicBool::new(false)),
            connected_notifier: Arc::new(event_listener::Event::new()),
            major_sync_task_sender: tx,
            pairing_cancellation_tx: Arc::new(Mutex::new(None)),
            pair_code_state: Arc::new(Mutex::new(wacore::pair_code::PairCodeState::default())),
            custom_enc_handlers: Arc::new(async_lock::RwLock::new(HashMap::new())),
            chatstate_handlers: Arc::new(RwLock::new(Vec::new())),
            pdo_pending_requests: cache_config.pdo_pending_requests.build_with_ttl(),
            device_registry_cache: cache_config.device_registry_cache.build_typed_ttl(
                cache_config.cache_stores.device_registry_cache.clone(),
                "device_registry",
            ),
            stanza_router: Self::create_stanza_router(),
            synchronous_ack: false,
            http_client,
            override_version,
            skip_history_sync: AtomicBool::new(false),
            cache_config,
        };

        let arc = Arc::new(this);

        // Best-effort warm-up of the LID-PN mapping cache in the background.
        let warm_up_arc = arc.clone();
        arc.runtime
            .spawn(Box::pin(async move {
                if let Err(e) = warm_up_arc.warm_up_lid_pn_cache().await {
                    warn!("Failed to warm up LID-PN cache: {e}");
                }
            }))
            .detach();

        // Periodic cleanup of the device registry cache.
        let cleanup_arc = arc.clone();
        arc.runtime
            .spawn(Box::pin(async move {
                cleanup_arc.device_registry_cleanup_loop().await;
            }))
            .detach();

        (arc, rx)
    }
656
657 pub(crate) async fn get_group_cache(&self) -> Arc<TypedCache<Jid, GroupInfo>> {
658 let mut guard = self.group_cache.lock().await;
659 if let Some(cache) = guard.as_ref() {
660 return cache.clone();
661 }
662 debug!("Initializing Group Cache for the first time.");
663 let cache = Arc::new(
664 self.cache_config
665 .group_cache
666 .build_typed_ttl(self.cache_config.cache_stores.group_cache.clone(), "group"),
667 );
668 *guard = Some(cache.clone());
669 cache
670 }
671
672 pub(crate) async fn get_device_cache(&self) -> Arc<TypedCache<Jid, Vec<Jid>>> {
673 let mut guard = self.device_cache.lock().await;
674 if let Some(cache) = guard.as_ref() {
675 return cache.clone();
676 }
677 debug!("Initializing Device Cache for the first time.");
678 let cache = Arc::new(self.cache_config.device_cache.build_typed_ttl(
679 self.cache_config.cache_stores.device_cache.clone(),
680 "device",
681 ));
682 *guard = Some(cache.clone());
683 cache
684 }
685
686 pub(crate) async fn get_app_state_processor(&self) -> Arc<AppStateProcessor> {
687 let mut guard = self.app_state_processor.lock().await;
688 if let Some(proc) = guard.as_ref() {
689 return proc.clone();
690 }
691 debug!("Initializing AppStateProcessor for the first time.");
692 let proc = Arc::new(AppStateProcessor::new(
693 self.persistence_manager.backend(),
694 self.runtime.clone(),
695 ));
696 *guard = Some(proc.clone());
697 proc
698 }
699
    /// Builds the stanza router with one handler per stanza type.
    /// NOTE(review): registration order is preserved as-is in case the router
    /// resolves handlers in insertion order — confirm before reordering.
    fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
        use crate::handlers::{
            basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
            chatstate::ChatstateHandler,
            ib::IbHandler,
            iq::IqHandler,
            message::MessageHandler,
            notification::NotificationHandler,
            receipt::ReceiptHandler,
            router::StanzaRouter,
            unimplemented::UnimplementedHandler,
        };

        let mut router = StanzaRouter::new();

        router.register(Arc::new(MessageHandler));
        router.register(Arc::new(ReceiptHandler));
        router.register(Arc::new(IqHandler));
        router.register(Arc::new(SuccessHandler));
        router.register(Arc::new(FailureHandler));
        router.register(Arc::new(StreamErrorHandler));
        router.register(Arc::new(IbHandler));
        router.register(Arc::new(NotificationHandler));
        router.register(Arc::new(AckHandler));
        router.register(Arc::new(ChatstateHandler));

        // "call" stanzas are routed to the generic unimplemented handler.
        router.register(Arc::new(UnimplementedHandler::for_call()));
        router.register(Arc::new(crate::handlers::presence::PresenceHandler));

        router
    }
734
735 pub fn register_handler(&self, handler: Arc<dyn wacore::types::events::EventHandler>) {
737 self.core.event_bus.add_handler(handler);
738 }
739
740 pub async fn register_chatstate_handler(
744 &self,
745 handler: Arc<dyn Fn(ChatStateEvent) + Send + Sync>,
746 ) {
747 self.chatstate_handlers.write().await.push(handler);
748 }
749
750 pub(crate) async fn dispatch_chatstate_event(
754 &self,
755 stanza: wacore::iq::chatstate::ChatstateStanza,
756 ) {
757 use wacore::iq::chatstate::{ChatstateSource, ReceivedChatState};
758 use wacore::types::events::ChatPresenceUpdate;
759 use wacore::types::message::MessageSource;
760 use wacore::types::presence::{ChatPresence, ChatPresenceMedia};
761
762 let (chat, sender, is_group) = match &stanza.source {
764 ChatstateSource::User { from } => (from.clone(), from.clone(), false),
765 ChatstateSource::Group { from, participant } => {
766 (from.clone(), participant.clone(), true)
767 }
768 };
769
770 let (state, media) = match stanza.state {
771 ReceivedChatState::Typing => (ChatPresence::Composing, ChatPresenceMedia::Text),
772 ReceivedChatState::RecordingAudio => {
773 (ChatPresence::Composing, ChatPresenceMedia::Audio)
774 }
775 ReceivedChatState::Idle => (ChatPresence::Paused, ChatPresenceMedia::Text),
776 };
777
778 self.core
779 .event_bus
780 .dispatch(&Event::ChatPresence(ChatPresenceUpdate {
781 source: MessageSource {
782 chat,
783 sender,
784 is_from_me: false,
785 is_group,
786 addressing_mode: None,
787 sender_alt: None,
788 recipient_alt: None,
789 broadcast_list_owner: None,
790 recipient: None,
791 },
792 state,
793 media,
794 }));
795
796 let event = ChatStateEvent::from_stanza(stanza);
798 let handlers = self.chatstate_handlers.read().await.clone();
799 for handler in handlers {
800 let event_clone = event.clone();
801 let handler_clone = handler.clone();
802 self.runtime
803 .spawn(Box::pin(async move {
804 (handler_clone)(event_clone);
805 }))
806 .detach();
807 }
808 }
809
    /// Main connection loop: connects, drives the message loop, and
    /// reconnects (with backoff) until shutdown or auto-reconnect is
    /// disabled. Safe to call once; a second concurrent call is rejected.
    pub async fn run(self: &Arc<Self>) {
        // `swap` makes the already-running check atomic with setting the flag.
        if self.is_running.swap(true, Ordering::SeqCst) {
            warn!("Client `run` method called while already running.");
            return;
        }
        while self.is_running.load(Ordering::Relaxed) {
            self.expected_disconnect.store(false, Ordering::Relaxed);

            if let Err(connect_err) = self.connect().await {
                error!("Failed to connect: {connect_err:#}. Will retry...");
            } else {
                if self.read_messages_loop().await.is_err() {
                    warn!(
                        "Message loop exited with an error. Will attempt to reconnect if enabled."
                    );
                } else if self.expected_disconnect.load(Ordering::Relaxed) {
                    debug!("Message loop exited gracefully (expected disconnect).");
                } else {
                    info!("Message loop exited gracefully.");
                }

                // Tear down per-connection state before deciding on reconnect.
                self.cleanup_connection_state().await;
            }

            if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
                info!("Auto-reconnect disabled, shutting down.");
                self.is_running.store(false, Ordering::Relaxed);
                break;
            }

            // Expected disconnects (e.g. server-requested 515 restart) reset
            // the backoff counter and reconnect without delay.
            if self.expected_disconnect.load(Ordering::Relaxed) {
                self.auto_reconnect_errors.store(0, Ordering::Relaxed);
                info!("Expected disconnect (e.g., 515), reconnecting immediately...");
                continue;
            }

            let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
            // NOTE(review): fibonacci_backoff is defined elsewhere in this module.
            let delay = fibonacci_backoff(error_count);
            info!(
                "Will attempt to reconnect in {:?} (attempt {})",
                delay,
                error_count + 1
            );
            self.runtime.sleep(delay).await;
        }
        info!("Client run loop has shut down.");
    }
861
    /// Establishes a server connection: resolves the client version and opens
    /// the transport in parallel (each under `TRANSPORT_CONNECT_TIMEOUT`),
    /// performs the Noise handshake, installs the resulting socket, and
    /// starts the keepalive loop.
    ///
    /// Returns `ClientError::AlreadyConnected` if a connection exists or a
    /// connect attempt is already in progress.
    pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        // `swap` makes the in-progress check atomic with setting the flag.
        if self.is_connecting.swap(true, Ordering::SeqCst) {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Clear the connecting flag on every exit path, including errors.
        let _guard = scopeguard::guard((), |_| {
            self.is_connecting.store(false, Ordering::Relaxed);
        });

        if self.is_connected() {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Reset per-connection state before dialing.
        self.is_logged_in.store(false, Ordering::Relaxed);
        self.is_ready.store(false, Ordering::Relaxed);
        self.is_connected.store(false, Ordering::Relaxed);
        self.offline_sync_completed.store(false, Ordering::Relaxed);
        self.server_has_prekeys.store(true, Ordering::Relaxed);

        let version_future = rt_timeout(
            &*self.runtime,
            TRANSPORT_CONNECT_TIMEOUT,
            crate::version::resolve_and_update_version(
                &self.persistence_manager,
                &self.http_client,
                self.override_version,
            ),
        );
        let transport_future = rt_timeout(
            &*self.runtime,
            TRANSPORT_CONNECT_TIMEOUT,
            self.transport_factory.create_transport(),
        );

        debug!("Connecting WebSocket and fetching latest client version in parallel...");
        let (version_result, transport_result) = futures::join!(version_future, transport_future);

        // Outer error is the timeout; inner error is the operation's own.
        version_result
            .map_err(|_| anyhow!("Version fetch timed out after {TRANSPORT_CONNECT_TIMEOUT:?}"))?
            .map_err(|e| anyhow!("Failed to resolve app version: {}", e))?;
        let (transport, mut transport_events) = transport_result.map_err(|_| {
            anyhow!("Transport connect timed out after {TRANSPORT_CONNECT_TIMEOUT:?}")
        })??;
        debug!("Version fetch and transport connection established.");

        let device_snapshot = self.persistence_manager.get_device_snapshot().await;

        // NOTE(review): if the handshake fails here the freshly-created
        // transport is dropped without an explicit disconnect — confirm the
        // transport closes itself on drop.
        let noise_socket = handshake::do_handshake(
            self.runtime.clone(),
            &device_snapshot,
            transport.clone(),
            &mut transport_events,
        )
        .await?;

        // Publish the connection; Release pairs with readers of is_connected.
        *self.transport.lock().await = Some(transport);
        *self.transport_events.lock().await = Some(transport_events);
        *self.noise_socket.lock().await = Some(noise_socket);
        self.is_connected.store(true, Ordering::Release);

        self.socket_ready_notifier.notify(usize::MAX);

        let client_clone = self.clone();
        self.runtime
            .spawn(Box::pin(async move { client_clone.keepalive_loop().await }))
            .detach();

        Ok(())
    }
938
    /// Intentionally shuts the client down: marks the disconnect as expected
    /// (so `run` will not reconnect), flushes persisted device state, closes
    /// the transport, and clears per-connection state.
    pub async fn disconnect(self: &Arc<Self>) {
        info!("Disconnecting client intentionally.");
        self.expected_disconnect.store(true, Ordering::Relaxed);
        self.is_running.store(false, Ordering::Relaxed);
        self.shutdown_notifier.notify(usize::MAX);

        // Flush before tearing down the transport so pending state survives.
        if let Err(e) = self.persistence_manager.flush().await {
            log::error!("Failed to flush device state during disconnect: {e}");
        }

        if let Some(transport) = self.transport.lock().await.as_ref() {
            transport.disconnect().await;
        }
        self.cleanup_connection_state().await;
    }
955
    /// Value pre-loaded into `auto_reconnect_errors` by `reconnect`, so the
    /// run loop applies a moderate (not minimal) backoff before redialing.
    pub const RECONNECT_BACKOFF_STEP: u32 = 4;

    /// Forces a reconnect with backoff: pre-seeds the backoff counter and
    /// drops the transport, letting the `run` loop observe the disconnect and
    /// redial.
    pub async fn reconnect(self: &Arc<Self>) {
        info!("Reconnecting: dropping transport for auto-reconnect.");
        self.auto_reconnect_errors
            .store(Self::RECONNECT_BACKOFF_STEP, Ordering::Relaxed);
        if let Some(transport) = self.transport.lock().await.as_ref() {
            transport.disconnect().await;
        }
    }
984
985 pub async fn reconnect_immediately(self: &Arc<Self>) {
991 info!("Reconnecting immediately (expected disconnect).");
992 self.expected_disconnect.store(true, Ordering::Relaxed);
993 if let Some(transport) = self.transport.lock().await.as_ref() {
994 transport.disconnect().await;
995 }
996 }
997
    /// Resets all per-connection state after a disconnect: drops the
    /// transport and socket, clears caches and counters, and drops any
    /// orphaned waiters. Called both by `disconnect` and by the run loop
    /// after the message loop exits.
    async fn cleanup_connection_state(&self) {
        self.is_logged_in.store(false, Ordering::Relaxed);
        self.is_ready.store(false, Ordering::Relaxed);
        // Wake any tasks blocked on the connection so they can observe the
        // teardown.
        self.shutdown_notifier.notify(usize::MAX);
        *self.transport.lock().await = None;
        *self.transport_events.lock().await = None;
        *self.noise_socket.lock().await = None;
        self.is_connected.store(false, Ordering::Release);
        self.retried_group_messages.invalidate_all();
        self.signal_cache.clear().await;
        // Replace the semaphore wholesale so permits held by stale tasks
        // cannot leak into the next connection.
        *self.message_processing_semaphore.lock().unwrap() =
            Arc::new(async_lock::Semaphore::new(1));
        self.last_data_received_ms.store(0, Ordering::Relaxed);
        self.last_data_sent_ms.store(0, Ordering::Relaxed);
        self.offline_sync_completed.store(false, Ordering::Relaxed);
        self.offline_sync_metrics
            .active
            .store(false, Ordering::Release);
        self.offline_sync_metrics
            .total_messages
            .store(0, Ordering::Release);
        self.offline_sync_metrics
            .processed_messages
            .store(0, Ordering::Release);
        // Reset the start time even if a panicked holder poisoned the mutex.
        match self.offline_sync_metrics.start_time.lock() {
            Ok(mut guard) => *guard = None,
            Err(poison) => *poison.into_inner() = None,
        }
        self.server_has_prekeys.store(true, Ordering::Relaxed);
        self.history_sync_tasks_in_flight
            .store(0, Ordering::Relaxed);
        self.history_sync_idle_notifier.notify(usize::MAX);
        // Dropping the senders wakes every pending IQ waiter with an error.
        let mut waiters_map = self.response_waiters.lock().await;
        let waiter_count = waiters_map.len();
        *waiters_map = HashMap::new();
        drop(waiters_map);
        if waiter_count > 0 {
            debug!(
                "Dropping {} orphaned IQ response waiter(s) on disconnect",
                waiter_count
            );
        }

        *self.app_state_key_requests.lock().await = HashMap::new();
        *self.app_state_syncing.lock().await = HashSet::new();

        *self.media_conn.write().await = None;

        if let Some(proc) = self.app_state_processor.lock().await.as_ref() {
            proc.clear_key_cache().await;
        }
    }
1070
    /// Collects current entry counts for every in-memory collection into a
    /// `MemoryDiagnostics` snapshot. Acquires each lock briefly in turn, so
    /// the snapshot is not a single consistent point in time. Only available
    /// with the `debug-diagnostics` feature.
    #[cfg(feature = "debug-diagnostics")]
    pub async fn memory_diagnostics(&self) -> MemoryDiagnostics {
        let (sig_sessions, sig_identities, sig_sender_keys) =
            self.signal_cache.entry_counts().await;
        let (lid_lid, lid_pn) = self.lid_pn_cache.entry_counts();

        MemoryDiagnostics {
            // Lazily-initialized caches report 0 until first use.
            group_cache: self
                .group_cache
                .lock()
                .await
                .as_ref()
                .map_or(0, |c| c.entry_count()),
            device_cache: self
                .device_cache
                .lock()
                .await
                .as_ref()
                .map_or(0, |c| c.entry_count()),
            device_registry_cache: self.device_registry_cache.entry_count(),
            lid_pn_lid_entries: lid_lid,
            lid_pn_pn_entries: lid_pn,
            retried_group_messages: self.retried_group_messages.entry_count(),
            recent_messages: self.recent_messages.entry_count(),
            message_retry_counts: self.message_retry_counts.entry_count(),
            pdo_pending_requests: self.pdo_pending_requests.entry_count(),
            session_locks: self.session_locks.entry_count(),
            message_queues: self.message_queues.entry_count(),
            message_enqueue_locks: self.message_enqueue_locks.entry_count(),
            response_waiters: self.response_waiters.lock().await.len(),
            // Read from the mirrored atomic to avoid taking the sync mutex.
            node_waiters: self.node_waiter_count.load(Ordering::Relaxed),
            pending_retries: self.pending_retries.lock().await.len(),
            presence_subscriptions: self.presence_subscriptions.lock().await.len(),
            app_state_key_requests: self.app_state_key_requests.lock().await.len(),
            app_state_syncing: self.app_state_syncing.lock().await.len(),
            signal_cache_sessions: sig_sessions,
            signal_cache_identities: sig_identities,
            signal_cache_sender_keys: sig_sender_keys,
            chatstate_handlers: self.chatstate_handlers.read().await.len(),
            custom_enc_handlers: self.custom_enc_handlers.read().await.len(),
        }
    }
1119
1120 pub(crate) async fn flush_signal_cache(&self) -> Result<(), anyhow::Error> {
1123 let device = self.persistence_manager.get_device_arc().await;
1124 let device_guard = device.read().await;
1125 self.signal_cache
1126 .flush(&*device_guard.backend)
1127 .await
1128 .map_err(|e| anyhow::anyhow!("Failed to flush signal cache: {e}"))
1129 }
1130
    /// Main receive loop: drains transport events, decodes and decrypts
    /// frames, and dispatches the resulting nodes until shutdown or disconnect.
    ///
    /// Takes exclusive ownership of the transport event receiver (it is
    /// `take()`n out of `self.transport_events`), so the loop can only be
    /// started once per connection. Returns `Ok(())` on expected shutdown and
    /// an error on unexpected transport loss.
    async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        debug!("Starting message processing loop...");

        let mut rx_guard = self.transport_events.lock().await;
        let transport_events = rx_guard
            .take()
            .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
        // Release the lock before entering the loop.
        drop(rx_guard);

        let mut frame_decoder = wacore::framing::FrameDecoder::new();

        loop {
            // select_biased: shutdown is checked first, so a pending shutdown
            // wins over further transport events.
            futures::select_biased! {
                _ = self.shutdown_notifier.listen().fuse() => {
                    debug!("Shutdown signaled in message loop. Exiting message loop.");
                    return Ok(());
                },
                event_result = transport_events.recv().fuse() => {
                    match event_result {
                        Ok(crate::transport::TransportEvent::DataReceived(data)) => {
                            // Track receive time for liveness/keepalive decisions.
                            self.last_data_received_ms.store(
                                wacore::time::now_millis() as u64,
                                Ordering::Relaxed,
                            );

                            frame_decoder.feed(&data);

                            let mut frames_in_batch: u32 = 0;

                            // One data chunk may contain several complete frames.
                            while let Some(encrypted_frame) = frame_decoder.decode_frame() {
                                if let Some(node) = self.decrypt_frame(&encrypted_frame).await {
                                    // Connection-critical stanzas are handled inline to
                                    // preserve ordering; the rest are spawned off.
                                    let process_inline = matches!(
                                        node.tag.as_ref(),
                                        "success" | "failure" | "stream:error" | "message" | "ib"
                                    );

                                    if process_inline {
                                        self.process_decrypted_node(node).await;
                                    } else {
                                        let client = self.clone();
                                        self.runtime.spawn(Box::pin(async move {
                                            client.process_decrypted_node(node).await;
                                        })).detach();
                                    }
                                }

                                if self.expected_disconnect.load(Ordering::Relaxed) {
                                    debug!("Expected disconnect signaled during frame processing. Exiting message loop.");
                                    return Ok(());
                                }

                                frames_in_batch += 1;
                                // Yield periodically so a large burst of frames does
                                // not starve other tasks on the runtime.
                                if frames_in_batch.is_multiple_of(10)
                                    && let Some(yield_fut) = self.runtime.yield_now()
                                {
                                    yield_fut.await;
                                }
                            }
                        },
                        Ok(crate::transport::TransportEvent::Disconnected) | Err(_) => {
                            self.cleanup_connection_state().await;
                            // Only an unexpected drop is surfaced as an event + error.
                            if !self.expected_disconnect.load(Ordering::Relaxed) {
                                self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
                                debug!("Transport disconnected unexpectedly.");
                                return Err(anyhow::anyhow!("Transport disconnected unexpectedly"));
                            } else {
                                debug!("Transport disconnected as expected.");
                                return Ok(());
                            }
                        }
                        Ok(crate::transport::TransportEvent::Connected) => {
                            // Informational only; the loop is already running.
                            debug!("Transport connected event received");
                        }
                    }
                }
            }
        }
    }
1228
1229 pub(crate) async fn decrypt_frame(
1232 self: &Arc<Self>,
1233 encrypted_frame: &bytes::Bytes,
1234 ) -> Option<wacore_binary::node::Node> {
1235 let noise_socket_arc = { self.noise_socket.lock().await.clone() };
1236 let noise_socket = match noise_socket_arc {
1237 Some(s) => s,
1238 None => {
1239 log::error!("Cannot process frame: not connected (no noise socket)");
1240 return None;
1241 }
1242 };
1243
1244 let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
1245 Ok(p) => p,
1246 Err(e) => {
1247 log::error!("Failed to decrypt frame: {e}");
1248 return None;
1249 }
1250 };
1251
1252 let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
1253 Ok(data) => data,
1254 Err(e) => {
1255 log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
1256 return None;
1257 }
1258 };
1259
1260 match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
1261 Ok(node_ref) => Some(node_ref.to_owned()),
1262 Err(e) => {
1263 log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}");
1264 None
1265 }
1266 }
1267 }
1268
1269 pub(crate) async fn process_decrypted_node(self: &Arc<Self>, node: wacore_binary::node::Node) {
1273 let node_arc = Arc::new(node);
1275 self.process_node(node_arc).await;
1276 }
1277
    /// Central dispatch for a decoded stanza: updates offline-sync metrics,
    /// logs the node, resolves pending waiters, routes it through the stanza
    /// router, and finally sends an ack when required.
    ///
    /// The ordering of the steps below is deliberate — waiters must be
    /// resolved before the router runs, and acks are only sent if no handler
    /// cancelled them.
    pub(crate) async fn process_node(self: &Arc<Self>, node: Arc<Node>) {
        use wacore::xml::DisplayableNode;

        // <ib> stanzas carry offline-sync lifecycle markers.
        if node.tag.as_ref() == "ib" {
            if let Some(preview) = node.get_optional_child("offline_preview") {
                let count: usize = preview
                    .attrs
                    .get("count")
                    .and_then(|v| v.as_str().parse().ok())
                    .unwrap_or(0);

                if count == 0 {
                    self.offline_sync_metrics
                        .active
                        .store(false, Ordering::Release);
                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: 0 items.");
                } else {
                    // Start tracking a new offline-sync batch of `count` items.
                    self.offline_sync_metrics
                        .total_messages
                        .store(count, Ordering::Release);
                    self.offline_sync_metrics
                        .processed_messages
                        .store(0, Ordering::Release);
                    self.offline_sync_metrics
                        .active
                        .store(true, Ordering::Release);
                    // Recover from a poisoned std mutex rather than panicking.
                    match self.offline_sync_metrics.start_time.lock() {
                        Ok(mut guard) => *guard = Some(wacore::time::Instant::now()),
                        Err(poison) => *poison.into_inner() = Some(wacore::time::Instant::now()),
                    }
                    debug!(target: "Client/OfflineSync", "Sync STARTED: Expecting {} items.", count);
                }
            } else if self.offline_sync_metrics.active.load(Ordering::Acquire)
                && node.get_optional_child("offline").is_some()
            {
                // Explicit end-of-offline marker from the server.
                let processed = self
                    .offline_sync_metrics
                    .processed_messages
                    .load(Ordering::Acquire);
                let elapsed = match self.offline_sync_metrics.start_time.lock() {
                    Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
                    Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
                };
                debug!(target: "Client/OfflineSync", "Sync COMPLETED: End marker received. Processed {} items in {:.2?}.", processed, elapsed);
                self.offline_sync_metrics
                    .active
                    .store(false, Ordering::Release);
            }
        }

        // While an offline sync is active, count every stanza flagged "offline".
        if self.offline_sync_metrics.active.load(Ordering::Acquire) {
            if node.attrs.contains_key("offline") {
                let processed = self
                    .offline_sync_metrics
                    .processed_messages
                    .fetch_add(1, Ordering::Release)
                    + 1;
                let total = self
                    .offline_sync_metrics
                    .total_messages
                    .load(Ordering::Acquire);

                if processed.is_multiple_of(50) || processed == total {
                    trace!(target: "Client/OfflineSync", "Sync Progress: {}/{}", processed, total);
                }

                if processed >= total {
                    let elapsed = match self.offline_sync_metrics.start_time.lock() {
                        Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
                        Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
                    };
                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: Processed {} items in {:.2?}.", processed, elapsed);
                    self.offline_sync_metrics
                        .active
                        .store(false, Ordering::Release);
                }
            }
        }
        // App-state sync responses are large; log only a summary for them.
        if node.tag.as_ref() == "iq"
            && let Some(sync_node) = node.get_optional_child("sync")
            && let Some(collection_node) = sync_node.get_optional_child("collection")
        {
            let name = collection_node.attrs().optional_string("name");
            let name = name.as_deref().unwrap_or("<unknown>");
            debug!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
        } else {
            debug!(target: "Client/Recv","{}", DisplayableNode(&node));
        }

        let mut cancelled = false;

        // Stream end: trigger shutdown and stop processing this node.
        if node.tag.as_ref() == "xmlstreamend" {
            if self.expected_disconnect.load(Ordering::Relaxed) {
                debug!("Received <xmlstreamend/>, expected disconnect.");
            } else {
                warn!("Received <xmlstreamend/>, treating as disconnect.");
            }
            self.shutdown_notifier.notify(usize::MAX);
            return;
        }

        // Cheap atomic pre-check avoids scanning waiters when none exist.
        if self.node_waiter_count.load(Ordering::Relaxed) > 0 {
            self.resolve_node_waiters(&node);
        }

        // An <iq> matching a registered response waiter is consumed here.
        if node.tag.as_ref() == "iq"
            && let Some(id) = node.attrs.get("id").map(|v| v.as_str())
        {
            let has_waiter = self.response_waiters.lock().await.contains_key(id.as_ref());
            if has_waiter && self.handle_iq_response(Arc::clone(&node)).await {
                return;
            }
        }

        if !self
            .stanza_router
            .dispatch(self.clone(), Arc::clone(&node), &mut cancelled)
            .await
        {
            warn!(
                "Received unknown top-level node: {}",
                DisplayableNode(&node)
            );
        }

        // Handlers may set `cancelled` to suppress the automatic ack.
        if self.should_ack(&node) && !cancelled {
            self.maybe_deferred_ack(node).await;
        }
    }
1423
1424 fn should_ack(&self, node: &Node) -> bool {
1426 matches!(
1427 node.tag.as_ref(),
1428 "message" | "receipt" | "notification" | "call"
1429 ) && node.attrs.contains_key("id")
1430 && node.attrs.contains_key("from")
1431 }
1432
1433 async fn maybe_deferred_ack(self: &Arc<Self>, node: Arc<Node>) {
1437 if self.synchronous_ack {
1438 if let Err(e) = self.send_ack_for(&node).await {
1439 warn!("Failed to send ack: {e:?}");
1440 }
1441 } else {
1442 let this = self.clone();
1443 self.runtime
1445 .spawn(Box::pin(async move {
1446 if let Err(e) = this.send_ack_for(&node).await {
1447 warn!("Failed to send ack: {e:?}");
1448 }
1449 }))
1450 .detach();
1451 }
1452 }
1453
1454 async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
1456 if self.expected_disconnect.load(Ordering::Relaxed) {
1457 return Ok(());
1458 }
1459 if !self.is_connected() {
1460 return Err(ClientError::NotConnected);
1461 }
1462 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1463 let ack = match build_ack_node(node, device_snapshot.pn.as_ref()) {
1464 Some(ack) => ack,
1465 None => return Ok(()),
1466 };
1467 self.send_node(ack).await
1468 }
1469
    /// Logs a warning for a stanza tag that has no handler implemented yet.
    pub(crate) async fn handle_unimplemented(&self, tag: &str) {
        warn!("TODO: Implement handler for <{tag}>");
    }
1473
1474 pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
1475 use wacore::iq::passive::PassiveModeSpec;
1476 self.execute(PassiveModeSpec::new(passive)).await
1477 }
1478
1479 pub async fn clean_dirty_bits(
1480 &self,
1481 type_: &str,
1482 timestamp: Option<&str>,
1483 ) -> Result<(), crate::request::IqError> {
1484 use wacore::iq::dirty::CleanDirtyBitsSpec;
1485
1486 let spec = CleanDirtyBitsSpec::single(type_, timestamp)?;
1487 self.execute(spec).await
1488 }
1489
1490 pub async fn fetch_props(&self) -> Result<(), crate::request::IqError> {
1491 use wacore::iq::props::PropsSpec;
1492 use wacore::store::commands::DeviceCommand;
1493
1494 let stored_hash = self
1495 .persistence_manager
1496 .get_device_snapshot()
1497 .await
1498 .props_hash
1499 .clone();
1500
1501 let spec = match &stored_hash {
1502 Some(hash) => {
1503 debug!("Fetching props with hash for delta update...");
1504 PropsSpec::with_hash(hash)
1505 }
1506 None => {
1507 debug!("Fetching props (full, no stored hash)...");
1508 PropsSpec::new()
1509 }
1510 };
1511
1512 let response = self.execute(spec).await?;
1513
1514 if response.delta_update {
1515 debug!(
1516 "Props delta update received ({} changed props)",
1517 response.props.len()
1518 );
1519 } else {
1520 debug!(
1521 "Props full update received ({} props, hash={:?})",
1522 response.props.len(),
1523 response.hash
1524 );
1525 }
1526
1527 if let Some(new_hash) = response.hash {
1528 self.persistence_manager
1529 .process_command(DeviceCommand::SetPropsHash(Some(new_hash)))
1530 .await;
1531 }
1532
1533 Ok(())
1534 }
1535
1536 pub async fn fetch_privacy_settings(
1537 &self,
1538 ) -> Result<wacore::iq::privacy::PrivacySettingsResponse, crate::request::IqError> {
1539 use wacore::iq::privacy::PrivacySettingsSpec;
1540
1541 debug!("Fetching privacy settings...");
1542
1543 self.execute(PrivacySettingsSpec::new()).await
1544 }
1545
1546 pub async fn set_privacy_setting(
1548 &self,
1549 category: &str,
1550 value: &str,
1551 ) -> Result<(), crate::request::IqError> {
1552 use wacore::iq::privacy::SetPrivacySettingSpec;
1553 self.execute(SetPrivacySettingSpec::new(category, value))
1554 .await
1555 }
1556
1557 pub async fn set_default_disappearing_mode(
1559 &self,
1560 duration: u32,
1561 ) -> Result<(), crate::request::IqError> {
1562 use wacore::iq::privacy::SetDefaultDisappearingModeSpec;
1563 self.execute(SetDefaultDisappearingModeSpec::new(duration))
1564 .await
1565 }
1566
1567 pub async fn get_business_profile(
1569 &self,
1570 jid: &wacore_binary::jid::Jid,
1571 ) -> Result<Option<wacore::iq::business::BusinessProfile>, crate::request::IqError> {
1572 use wacore::iq::business::BusinessProfileSpec;
1573 self.execute(BusinessProfileSpec::new(jid)).await
1574 }
1575
1576 pub async fn reject_call(
1578 &self,
1579 call_id: &str,
1580 call_from: &wacore_binary::jid::Jid,
1581 ) -> Result<(), anyhow::Error> {
1582 anyhow::ensure!(!call_id.is_empty(), "call_id cannot be empty");
1583 let id = self.generate_request_id();
1584
1585 let stanza = wacore_binary::builder::NodeBuilder::new("call")
1586 .attr("to", call_from.clone())
1587 .attr("id", id)
1588 .children([wacore_binary::builder::NodeBuilder::new("reject")
1589 .attr("call-id", call_id)
1590 .attr("call-creator", call_from.clone())
1591 .attr("count", "0")
1592 .build()])
1593 .build();
1594
1595 self.send_node(stanza).await?;
1596 Ok(())
1597 }
1598
1599 pub async fn send_digest_key_bundle(&self) -> Result<(), crate::request::IqError> {
1600 use wacore::iq::prekeys::DigestKeyBundleSpec;
1601
1602 debug!("Sending digest key bundle...");
1603
1604 self.execute(DigestKeyBundleSpec::new()).await.map(|_| ())
1605 }
1606
    /// Handles the `<success>` stanza: marks the client logged in, records the
    /// server-assigned LID, and spawns the post-login initialization sequence
    /// (session setup, pre-key upload, background queries, app state sync,
    /// presence, and the final Connected event).
    ///
    /// A monotonically increasing connection generation is captured at login;
    /// every step of the spawned sequence re-checks it so that work belonging
    /// to a stale connection is abandoned after a reconnect.
    pub(crate) async fn handle_success(self: &Arc<Self>, node: &wacore_binary::node::Node) {
        if self.expected_disconnect.load(Ordering::Relaxed) {
            debug!("Ignoring <success> stanza: expected disconnect pending");
            return;
        }

        // swap() makes duplicate <success> stanzas idempotent.
        if self.is_logged_in.swap(true, Ordering::SeqCst) {
            debug!("Ignoring duplicate <success> stanza (already logged in)");
            return;
        }

        let current_generation = self.connection_generation.fetch_add(1, Ordering::SeqCst) + 1;

        info!(
            "Successfully authenticated with WhatsApp servers! (gen={})",
            current_generation
        );
        self.auto_reconnect_errors.store(0, Ordering::Relaxed);

        self.update_server_time_offset(node);

        // Persist the server-provided LID if it changed; group messaging
        // depends on it (see warning below).
        if let Some(lid_value) = node.attrs.get("lid") {
            if let Some(lid) = lid_value.to_jid() {
                let device_snapshot = self.persistence_manager.get_device_snapshot().await;
                if device_snapshot.lid.as_ref() != Some(&lid) {
                    debug!("Updating LID from server to '{lid}'");
                    self.persistence_manager
                        .process_command(DeviceCommand::SetLid(Some(lid)))
                        .await;
                }
            } else {
                warn!("Failed to parse LID from success stanza: {lid_value}");
            }
        } else {
            warn!("LID not found in <success> stanza. Group messaging may fail.");
        }

        let client_clone = self.clone();
        let task_generation = current_generation;
        self.runtime.spawn(Box::pin(async move {
            // Bail out of the whole task if the connection has been replaced.
            macro_rules! check_generation {
                () => {
                    if client_clone.connection_generation.load(Ordering::SeqCst) != task_generation
                    {
                        debug!("Post-login task cancelled: connection generation changed");
                        return;
                    }
                };
            }

            debug!(
                "Starting post-login initialization sequence (gen={})...",
                task_generation
            );

            let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
            // An empty push name means initial app state sync must run so the
            // setting_pushName mutation can populate it.
            let needs_pushname_from_sync = device_snapshot.push_name.is_empty();
            if needs_pushname_from_sync {
                debug!("Push name is empty - will be set from app state sync (setting_pushName)");
            }

            if !client_clone.is_connected() {
                debug!(
                    "Skipping post-login init: connection closed (likely pairing phase reconnect)"
                );
                return;
            }

            check_generation!();
            client_clone.send_unified_session().await;

            check_generation!();
            if let Err(e) = client_clone
                .establish_primary_phone_session_immediate()
                .await
            {
                warn!(target: "Client/PDO", "Failed to establish session with primary phone on login: {:?}", e);
            }

            check_generation!();
            if let Err(e) = client_clone.upload_pre_keys(false).await {
                warn!("Failed to upload pre-keys during startup: {e:?}");
            }

            check_generation!();
            // Leaving passive mode tells the server to start delivering.
            if let Err(e) = client_clone.set_passive(false).await {
                warn!("Failed to send post-connect active IQ: {e:?}");
            }

            client_clone.wait_for_offline_delivery_end().await;

            check_generation!();

            // NOTE(review): duplicated check_generation!() here is harmless —
            // both expand to the same guard.
            check_generation!();
            if !client_clone.is_connected() {
                debug!("Skipping presence: connection closed");
                return;
            }

            // Non-critical queries run concurrently on a detached task.
            let bg_client = client_clone.clone();
            let bg_generation = task_generation;
            client_clone.runtime.spawn(Box::pin(async move {
                if bg_client.connection_generation.load(Ordering::SeqCst) != bg_generation {
                    debug!("Skipping background init queries: connection generation changed");
                    return;
                }
                if !bg_client.is_connected() {
                    debug!("Skipping background init queries: connection closed");
                    return;
                }

                debug!(
                    "Sending background initialization queries (Props, Blocklist, Privacy, Digest)..."
                );

                let props_fut = bg_client.fetch_props();
                let binding = bg_client.blocking();
                let blocklist_fut = binding.get_blocklist();
                let privacy_fut = bg_client.fetch_privacy_settings();
                let digest_fut = bg_client.validate_digest_key();

                // All four run concurrently; failures are logged individually.
                let (r_props, r_block, r_priv, r_digest) =
                    futures::join!(props_fut, blocklist_fut, privacy_fut, digest_fut);

                if let Err(e) = r_props {
                    warn!("Background init: Failed to fetch props: {e:?}");
                }
                if let Err(e) = r_block {
                    warn!("Background init: Failed to fetch blocklist: {e:?}");
                }
                if let Err(e) = r_priv {
                    warn!("Background init: Failed to fetch privacy settings: {e:?}");
                }
                if let Err(e) = r_digest {
                    warn!("Background init: Failed to validate digest key: {e:?}");
                }

                if let Err(e) = bg_client.tc_token().prune_expired().await {
                    warn!("Background init: Failed to prune expired tc_tokens: {e:?}");
                }
            })).detach();

            check_generation!();

            let flag_set = client_clone.needs_initial_full_sync.load(Ordering::Relaxed);
            let needs_initial_sync = flag_set || needs_pushname_from_sync;

            if needs_initial_sync {
                debug!(
                    target: "Client/AppState",
                    "Starting Initial App State Sync (flag_set={flag_set}, needs_pushname={needs_pushname_from_sync})"
                );

                // Give the primary device a short window to share app state
                // keys before attempting the sync.
                if !client_clone
                    .initial_app_state_keys_received
                    .load(Ordering::Relaxed)
                {
                    debug!(
                        target: "Client/AppState",
                        "Waiting up to 5s for app state keys..."
                    );
                    let _ = rt_timeout(
                        &*client_clone.runtime,
                        Duration::from_secs(5),
                        client_clone.initial_keys_synced_notifier.listen(),
                    )
                    .await;

                    check_generation!();
                }

                // Watchdog: if the critical sync stalls long enough that the
                // push name is still missing, force a reconnect to retry.
                const CRITICAL_SYNC_TIMEOUT_SECS: u64 = 180;
                let timeout_client = client_clone.clone();
                let timeout_generation = task_generation;
                let timeout_rt = client_clone.runtime.clone();
                let critical_sync_timeout_handle = timeout_rt.spawn(Box::pin(async move {
                    timeout_client.runtime.sleep(Duration::from_secs(CRITICAL_SYNC_TIMEOUT_SECS)).await;
                    if timeout_client.connection_generation.load(Ordering::SeqCst)
                        != timeout_generation
                    {
                        return;
                    }
                    let push_name = timeout_client.get_push_name().await;
                    if push_name.is_empty() {
                        warn!(
                            target: "Client/AppState",
                            "Critical app state sync timed out after {CRITICAL_SYNC_TIMEOUT_SECS}s \
                             (push_name not synced). Reconnecting to retry."
                        );
                        timeout_client.reconnect_immediately().await;
                    } else {
                        debug!(
                            target: "Client/AppState",
                            "Critical sync timeout fired but push_name was already synced"
                        );
                    }
                }));

                check_generation!();
                match client_clone
                    .sync_collections_batched(vec![
                        WAPatchName::CriticalBlock,
                        WAPatchName::CriticalUnblockLow,
                    ])
                    .await
                {
                    Ok(()) => {
                        // Critical sync done in time; the watchdog is no
                        // longer needed. On the Err path it is left running
                        // so its reconnect-retry can still fire.
                        critical_sync_timeout_handle.abort();

                        check_generation!();

                        client_clone
                            .resubscribe_presence_subscriptions(task_generation)
                            .await;

                        check_generation!();

                        client_clone.dispatch_connected();
                    }
                    Err(e) => {
                        client_clone.log_sync_error("critical app state sync", &e);
                        return;
                    }
                }

                // Non-critical collections sync in the background.
                let sync_client = client_clone.clone();
                let sync_generation = task_generation;
                client_clone.runtime.spawn(Box::pin(async move {
                    if sync_client.connection_generation.load(Ordering::SeqCst) != sync_generation {
                        debug!("App state sync cancelled: connection generation changed");
                        return;
                    }

                    if let Err(e) = sync_client
                        .sync_collections_batched(vec![
                            WAPatchName::RegularLow,
                            WAPatchName::RegularHigh,
                            WAPatchName::Regular,
                        ])
                        .await
                    {
                        sync_client.log_sync_error("non-critical app state sync", &e);
                    }

                    sync_client
                        .needs_initial_full_sync
                        .store(false, Ordering::Relaxed);
                    debug!(target: "Client/AppState", "Initial App State Sync Completed.");
                })).detach();
            } else {
                // No initial sync needed: go straight to presence + Connected.
                let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
                if !device_snapshot.push_name.is_empty() {
                    if let Err(e) = client_clone.presence().set_available().await {
                        warn!("Failed to send initial presence: {e:?}");
                    } else {
                        debug!("Initial presence sent successfully.");
                    }
                }

                client_clone
                    .resubscribe_presence_subscriptions(task_generation)
                    .await;

                check_generation!();

                client_clone.dispatch_connected();
            }
        })).detach();
    }
1934
1935 pub(crate) async fn handle_ack_response(&self, node: Node) -> bool {
1940 let id_opt = node.attrs.get("id").map(|v| v.as_str().into_owned());
1941 if let Some(id) = id_opt
1942 && let Some(waiter) = self.response_waiters.lock().await.remove(&id)
1943 {
1944 if waiter.send(node).is_err() {
1945 warn!(target: "Client/Ack", "Failed to send ACK response to waiter for ID {id}. Receiver was likely dropped.");
1946 }
1947 return true;
1948 }
1949 false
1950 }
1951
1952 #[allow(dead_code)] pub(crate) async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
1954 {
1958 let mut syncing = self.app_state_syncing.lock().await;
1959 if !syncing.insert(name) {
1960 debug!(target: "Client/AppState", "Skipping sync for {:?}: already in flight", name);
1961 return Ok(());
1962 }
1963 }
1964
1965 let result = self.fetch_app_state_with_retry_inner(name).await;
1966
1967 self.app_state_syncing.lock().await.remove(&name);
1969
1970 result
1971 }
1972
    /// Retry loop behind [`Self::fetch_app_state_with_retry`].
    ///
    /// Two recoverable failure classes are retried:
    /// - `KeyNotFound` on the first attempt: waits (up to 10s) for the initial
    ///   app state key share, then retries once.
    /// - Database "locked"/"busy" store errors: retried with linear backoff up
    ///   to `APP_STATE_RETRY_MAX_ATTEMPTS`.
    /// Any other error is returned immediately.
    #[allow(dead_code)]
    async fn fetch_app_state_with_retry_inner(&self, name: WAPatchName) -> anyhow::Result<()> {
        let mut attempt = 0u32;
        loop {
            attempt += 1;
            let res = self.process_app_state_sync_task(name, false).await;
            match res {
                Ok(()) => return Ok(()),
                Err(e) => {
                    // First-attempt KeyNotFound: the decryption key may simply
                    // not have arrived yet.
                    if e.downcast_ref::<crate::appstate_sync::AppStateSyncError>()
                        .is_some_and(|ase| {
                            matches!(ase, crate::appstate_sync::AppStateSyncError::KeyNotFound(_))
                        })
                        && attempt == 1
                    {
                        if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
                            debug!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
                            if rt_timeout(
                                &*self.runtime,
                                Duration::from_secs(10),
                                self.initial_keys_synced_notifier.listen(),
                            )
                            .await
                            .is_err()
                            {
                                warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
                            }
                        }
                        continue;
                    }
                    // "locked"/"busy" can surface either as a bare StoreError
                    // or wrapped inside AppStateSyncError::Store.
                    let is_db_locked = e.downcast_ref::<wacore::store::error::StoreError>()
                        .is_some_and(|se| matches!(se, wacore::store::error::StoreError::Database(msg) if msg.contains("locked") || msg.contains("busy")))
                        || e.downcast_ref::<crate::appstate_sync::AppStateSyncError>()
                            .is_some_and(|ase| matches!(ase, crate::appstate_sync::AppStateSyncError::Store(wacore::store::error::StoreError::Database(msg)) if msg.contains("locked") || msg.contains("busy")));
                    if is_db_locked && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
                        // Linear backoff: 350ms, 550ms, 750ms, ...
                        let backoff = Duration::from_millis(200 * attempt as u64 + 150);
                        warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
                        self.runtime.sleep(backoff).await;
                        continue;
                    }
                    return Err(e);
                }
            }
        }
    }
2021
2022 pub(crate) async fn sync_collections_batched(
2026 &self,
2027 collections: Vec<WAPatchName>,
2028 ) -> anyhow::Result<()> {
2029 if collections.is_empty() {
2030 return Ok(());
2031 }
2032
2033 let pending = {
2035 let mut syncing = self.app_state_syncing.lock().await;
2036 let mut filtered = Vec::with_capacity(collections.len());
2037 for name in collections {
2038 if syncing.insert(name) {
2039 filtered.push(name);
2040 } else {
2041 debug!(target: "Client/AppState", "Skipping {:?} in batch: already in flight", name);
2042 }
2043 }
2044 filtered
2045 };
2046
2047 if pending.is_empty() {
2048 return Ok(());
2049 }
2050
2051 let all_collections: Vec<WAPatchName> = pending.clone();
2053
2054 let result = self.sync_collections_batched_inner(pending).await;
2055
2056 {
2058 let mut syncing = self.app_state_syncing.lock().await;
2059 for name in &all_collections {
2060 syncing.remove(name);
2061 }
2062 }
2063
2064 result
2065 }
2066
    /// Core of the batched app state sync.
    ///
    /// Repeats up to `MAX_ITERATIONS` rounds: builds one `<sync>` IQ covering
    /// every pending collection (requesting a snapshot for collections at
    /// version 0), pre-downloads any external blobs referenced by the
    /// response, decodes all patch lists, applies the resulting mutations, and
    /// persists the new version per collection. Collections that report
    /// conflicts, retryable errors, or `has_more_patches` are queued for the
    /// next round.
    async fn sync_collections_batched_inner(
        &self,
        mut pending: Vec<WAPatchName>,
    ) -> anyhow::Result<()> {
        use wacore::appstate::patch_decode::CollectionSyncError;
        const MAX_ITERATIONS: usize = 5;
        let mut iteration = 0;

        while !pending.is_empty() && iteration < MAX_ITERATIONS {
            iteration += 1;
            debug!(
                target: "Client/AppState",
                "Batched sync iteration {}/{}: {:?}",
                iteration, MAX_ITERATIONS, pending
            );

            let backend = self.persistence_manager.backend();

            // Build one <collection> element per pending name; version 0 means
            // we have no local state and must request a full snapshot.
            let mut collection_nodes = Vec::with_capacity(pending.len());
            let mut was_snapshot = std::collections::HashSet::new();
            for &name in &pending {
                let state = backend.get_version(name.as_str()).await?;
                let want_snapshot = state.version == 0;
                if want_snapshot {
                    was_snapshot.insert(name);
                }
                let mut builder = NodeBuilder::new("collection")
                    .attr("name", name.as_str())
                    .attr(
                        "return_snapshot",
                        if want_snapshot { "true" } else { "false" },
                    );
                if !want_snapshot {
                    builder = builder.attr("version", state.version.to_string());
                }
                collection_nodes.push(builder.build());
            }

            let sync_node = NodeBuilder::new("sync").children(collection_nodes).build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: server_jid().clone(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: Some(Duration::from_secs(30)),
            };

            let resp = self.send_iq(iq).await?;

            // External blobs (snapshots / mutation payloads) are fetched up
            // front, keyed by direct_path, so the decode step below can run
            // with a synchronous lookup closure.
            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
                std::collections::HashMap::new();

            if let Ok(patch_lists) = wacore::appstate::patch_decode::parse_patch_lists(&resp) {
                for pl in &patch_lists {
                    if let Some(ext) = &pl.snapshot_ref
                        && let Some(path) = &ext.direct_path
                    {
                        match self.download(ext).await {
                            Ok(bytes) => {
                                pre_downloaded.insert(path.clone(), bytes);
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to download external snapshot for {:?}: {e}",
                                    pl.name
                                );
                            }
                        }
                    }

                    for patch in &pl.patches {
                        if let Some(ext) = &patch.external_mutations
                            && let Some(path) = &ext.direct_path
                        {
                            match self.download(ext).await {
                                Ok(bytes) => {
                                    pre_downloaded.insert(path.clone(), bytes);
                                }
                                Err(e) => {
                                    let v =
                                        patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
                                    warn!(
                                        "Failed to download external mutations for patch v{}: {e}",
                                        v
                                    );
                                }
                            }
                        }
                    }
                }
            }

            // Synchronous lookup into the pre-downloaded blobs; a miss here
            // means the download above failed and is reported as an error.
            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(path) = &ext.direct_path {
                    if let Some(bytes) = pre_downloaded.get(path) {
                        Ok(bytes.clone())
                    } else {
                        Err(anyhow::anyhow!(
                            "external blob not pre-downloaded: {}",
                            path
                        ))
                    }
                } else {
                    Err(anyhow::anyhow!("external blob has no directPath"))
                }
            };

            let proc = self.get_app_state_processor().await;
            let results = proc.decode_multi_patch_list(&resp, &download, true).await?;

            let mut needs_refetch = Vec::new();

            for (mutations, new_state, list) in results {
                let name = list.name;

                // Per-collection error handling; errored collections skip the
                // mutation/version steps below.
                if let Some(ref err) = list.error {
                    match err {
                        CollectionSyncError::Conflict { has_more } => {
                            if *has_more {
                                warn!(target: "Client/AppState", "Collection {:?} conflict (has_more=true), will refetch", name);
                                needs_refetch.push(name);
                            } else {
                                debug!(target: "Client/AppState", "Collection {:?} conflict (has_more=false), treating as success (no pending mutations)", name);
                            }
                            continue;
                        }
                        CollectionSyncError::Fatal { code, text } => {
                            warn!(target: "Client/AppState", "Collection {:?} fatal error {}: {}", name, code, text);
                            continue;
                        }
                        CollectionSyncError::Retry { code, text } => {
                            warn!(target: "Client/AppState", "Collection {:?} retryable error {}: {}, will refetch", name, code, text);
                            needs_refetch.push(name);
                            continue;
                        }
                    }
                }

                // Request any app state keys we could not decrypt with,
                // rate-limited to once per key per 24h.
                let missing = match proc.get_missing_key_ids(&list).await {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to get missing key IDs for {:?}: {}", name, e);
                        Vec::new()
                    }
                };
                if !missing.is_empty() {
                    let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
                    let mut guard = self.app_state_key_requests.lock().await;
                    let now = wacore::time::Instant::now();
                    for key_id in missing {
                        let hex_id = hex::encode(&key_id);
                        let should = guard
                            .get(&hex_id)
                            .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                            .unwrap_or(true);
                        if should {
                            guard.insert(hex_id, now);
                            to_request.push(key_id);
                        }
                    }
                    // Drop stale request timestamps to bound the map's size.
                    guard.retain(|_, t| t.elapsed() < std::time::Duration::from_secs(24 * 3600));
                    drop(guard);
                    if !to_request.is_empty() {
                        self.request_app_state_keys(&to_request).await;
                    }
                }

                // Mutations from a snapshot round are flagged as full_sync.
                let full_sync = was_snapshot.contains(&name);
                for m in mutations {
                    self.dispatch_app_state_mutation(&m, full_sync).await;
                }

                backend
                    .set_version(name.as_str(), new_state.clone())
                    .await?;

                if list.has_more_patches {
                    needs_refetch.push(name);
                }

                debug!(
                    target: "Client/AppState",
                    "Batched sync: {:?} done (version={}, has_more={})",
                    name, new_state.version, list.has_more_patches
                );
            }

            pending = needs_refetch;
        }

        // Not an error: the remaining collections will catch up on a later sync.
        if !pending.is_empty() {
            warn!(
                target: "Client/AppState",
                "Batched sync: max iterations ({}) reached for {:?}",
                MAX_ITERATIONS, pending
            );
        }

        Ok(())
    }
2286
    /// Runs the full pagination loop that syncs the app-state collection
    /// `name` from the server, dispatching every decoded mutation, then
    /// persists the final collection version.
    ///
    /// `full_sync` forces a snapshot-based resync; it is also forced when
    /// the locally stored version is 0 (nothing synced yet). The shutdown
    /// flag is checked between network round-trips so teardown is never
    /// blocked on a long sync.
    pub(crate) async fn process_app_state_sync_task(
        &self,
        name: WAPatchName,
        full_sync: bool,
    ) -> anyhow::Result<()> {
        if self.is_shutting_down() {
            debug!(target: "Client/AppState", "Skipping app state sync task {:?}: client is shutting down", name);
            return Ok(());
        }

        let backend = self.persistence_manager.backend();
        let mut full_sync = full_sync;

        // Version 0 means this collection has never been synced; request a
        // snapshot instead of incremental patches.
        let mut state = backend.get_version(name.as_str()).await?;
        if state.version == 0 {
            full_sync = true;
        }

        let mut has_more = true;
        let mut want_snapshot = full_sync;
        // Hard cap on server round-trips to guard against a pathological
        // never-ending "has_more_patches" loop.
        const MAX_PAGINATION_ITERATIONS: u32 = 500;
        let mut iteration = 0u32;

        while has_more {
            if self.is_shutting_down() {
                debug!(target: "Client/AppState", "Stopping app state sync task {:?}: shutdown detected", name);
                break;
            }
            iteration += 1;
            if iteration > MAX_PAGINATION_ITERATIONS {
                warn!(target: "Client/AppState", "App state sync for {:?} exceeded {} iterations, aborting", name, MAX_PAGINATION_ITERATIONS);
                break;
            }
            debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);

            // Build <sync><collection .../></sync>; the version attribute is
            // only sent for incremental (non-snapshot) fetches.
            let mut collection_builder = NodeBuilder::new("collection")
                .attr("name", name.as_str())
                .attr(
                    "return_snapshot",
                    if want_snapshot { "true" } else { "false" },
                );
            if !want_snapshot {
                collection_builder = collection_builder.attr("version", state.version.to_string());
            }
            let sync_node = NodeBuilder::new("sync")
                .children([collection_builder.build()])
                .build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: server_jid().clone(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: None,
            };

            let resp = self.send_iq(iq).await?;
            if self.is_shutting_down() {
                debug!(target: "Client/AppState", "Discarding app state sync response for {:?}: shutdown detected", name);
                break;
            }
            debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);

            let _decode_start = wacore::time::Instant::now();

            // Pre-download any externally hosted blobs (snapshot and
            // per-patch mutations), keyed by direct_path, so the `download`
            // closure handed to the decoder can stay synchronous.
            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
                std::collections::HashMap::new();

            if let Ok(pl) = wacore::appstate::patch_decode::parse_patch_list(&resp) {
                debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={} patches_count={}",
                    name, pl.snapshot_ref.is_some(), pl.has_more_patches, pl.patches.len());

                if let Some(ext) = &pl.snapshot_ref
                    && let Some(path) = &ext.direct_path
                {
                    match self.download(ext).await {
                        Ok(bytes) => {
                            debug!(target: "Client/AppState", "Downloaded external snapshot ({} bytes)", bytes.len());
                            pre_downloaded.insert(path.clone(), bytes);
                        }
                        Err(e) => {
                            // Best effort: if the blob was required, the
                            // decode below reports it as missing.
                            warn!("Failed to download external snapshot: {e}");
                        }
                    }
                }

                for patch in &pl.patches {
                    if let Some(ext) = &patch.external_mutations
                        && let Some(path) = &ext.direct_path
                    {
                        let patch_version =
                            patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
                        match self.download(ext).await {
                            Ok(bytes) => {
                                debug!(target: "Client/AppState", "Downloaded external mutations for patch v{} ({} bytes)", patch_version, bytes.len());
                                pre_downloaded.insert(path.clone(), bytes);
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to download external mutations for patch v{}: {e}",
                                    patch_version
                                );
                            }
                        }
                    }
                }
            }

            // Synchronous lookup into the pre-downloaded blobs; decoding
            // itself never performs network I/O.
            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(path) = &ext.direct_path {
                    if let Some(bytes) = pre_downloaded.get(path) {
                        Ok(bytes.clone())
                    } else {
                        Err(anyhow::anyhow!(
                            "external blob not pre-downloaded: {}",
                            path
                        ))
                    }
                } else {
                    Err(anyhow::anyhow!("external blob has no directPath"))
                }
            };

            let proc = self.get_app_state_processor().await;
            let (mutations, new_state, list) =
                proc.decode_patch_list(&resp, &download, true).await?;
            let decode_elapsed = _decode_start.elapsed();
            if decode_elapsed.as_millis() > 500 {
                debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
            }

            // Request any app-state keys we could not decrypt with, rate
            // limited to one request per key per 24h via
            // `app_state_key_requests`.
            let missing = match proc.get_missing_key_ids(&list).await {
                Ok(v) => v,
                Err(e) => {
                    warn!("Failed to get missing key IDs for {:?}: {}", name, e);
                    Vec::new()
                }
            };
            if !missing.is_empty() {
                let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
                let mut guard = self.app_state_key_requests.lock().await;
                let now = wacore::time::Instant::now();
                for key_id in missing {
                    let hex_id = hex::encode(&key_id);
                    let should = guard
                        .get(&hex_id)
                        .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                        .unwrap_or(true);
                    if should {
                        guard.insert(hex_id, now);
                        to_request.push(key_id);
                    }
                }
                // Drop stale entries so the rate-limit map stays bounded.
                guard.retain(|_, t| t.elapsed() < std::time::Duration::from_secs(24 * 3600));
                drop(guard);
                if !to_request.is_empty() {
                    self.request_app_state_keys(&to_request).await;
                }
            }

            for m in mutations {
                debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
                self.dispatch_app_state_mutation(&m, full_sync).await;
            }

            state = new_state;
            has_more = list.has_more_patches;
            // Only the first round may ask for a snapshot; all follow-up
            // pages are incremental.
            want_snapshot = false;
            debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more} new_version={}", name, state.version);
        }

        backend.set_version(name.as_str(), state.clone()).await?;

        debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
        Ok(())
    }
2472
2473 async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
2474 if raw_key_ids.is_empty() {
2475 return;
2476 }
2477 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2478 let own_jid = match device_snapshot.pn.clone() {
2479 Some(j) => j,
2480 None => return,
2481 };
2482 let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
2483 .iter()
2484 .map(|k| wa::message::AppStateSyncKeyId {
2485 key_id: Some(k.clone()),
2486 })
2487 .collect();
2488 let msg = wa::Message {
2489 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
2490 r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
2491 app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
2492 ..Default::default()
2493 })),
2494 ..Default::default()
2495 };
2496 if let Err(e) = self
2497 .send_message_impl(
2498 own_jid,
2499 &msg,
2500 Some(self.generate_message_id().await),
2501 true,
2502 false,
2503 None,
2504 vec![],
2505 )
2506 .await
2507 {
2508 warn!("Failed to send app state key request: {e}");
2509 }
2510 }
2511
    /// Builds and sends an app-state patch for `collection_name`, then
    /// re-syncs the collection so our local version advances past the patch
    /// we just pushed.
    ///
    /// Errors from the build/send path are returned; a failed post-send
    /// re-sync is only logged because the server already accepted the patch.
    pub(crate) async fn send_app_state_patch(
        &self,
        collection_name: &str,
        mutations: Vec<(wa::SyncdMutation, Vec<u8>)>,
    ) -> Result<()> {
        let proc = self.get_app_state_processor().await;
        let (patch_bytes, base_version) = proc.build_patch(collection_name, mutations).await?;

        // <sync><collection name=.. version=..><patch>…</patch></collection></sync>
        let collection_node = NodeBuilder::new("collection")
            .attr("name", collection_name)
            .attr("version", base_version.to_string())
            .attr("return_snapshot", "false")
            .children([NodeBuilder::new("patch").bytes(patch_bytes).build()])
            .build();
        let sync_node = NodeBuilder::new("sync").children([collection_node]).build();
        let iq = crate::request::InfoQuery {
            namespace: "w:sync:app:state",
            query_type: crate::request::InfoQueryType::Set,
            to: server_jid().clone(),
            target: None,
            id: None,
            content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
            timeout: None,
        };

        self.send_iq(iq).await?;

        // Best-effort follow-up sync to reconcile local state with the
        // server's view after the patch.
        if let Ok(patch_name) = collection_name.parse::<WAPatchName>()
            && let Err(e) = self.fetch_app_state_with_retry(patch_name).await
        {
            log::warn!("Failed to re-sync {collection_name} after patch send: {e}");
        }

        Ok(())
    }
2552
    /// Routes one decoded app-state mutation to the matching event.
    ///
    /// Only `Set` operations with a non-empty index are handled. Chat-level
    /// actions are offered to the features module first; the remaining
    /// special case is `setting_pushName`, which persists the new push name,
    /// dispatches `SelfPushNameUpdated`, and — when this is the first name
    /// we ever received — announces presence.
    async fn dispatch_app_state_mutation(
        &self,
        m: &crate::appstate_sync::Mutation,
        full_sync: bool,
    ) {
        use wacore::types::events::Event;

        if m.operation != wa::syncd_mutation::SyncdOperation::Set {
            return;
        }
        if m.index.is_empty() {
            return;
        }

        // Chat actions (archive/mute/pin/…) are handled elsewhere; if the
        // features module consumed the mutation we are done.
        if crate::features::chat_actions::dispatch_chat_mutation(&self.core.event_bus, m, full_sync)
        {
            return;
        }

        if m.index[0] == "setting_pushName"
            && let Some(val) = &m.action_value
            && let Some(act) = &val.push_name_setting
            && let Some(new_name) = &act.name
        {
            let new_name = new_name.clone();
            let bus = self.core.event_bus.clone();

            let snapshot = self.persistence_manager.get_device_snapshot().await;
            let old = snapshot.push_name.clone();
            if old != new_name {
                debug!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
                self.persistence_manager
                    .process_command(DeviceCommand::SetPushName(new_name.clone()))
                    .await;
                bus.dispatch(&Event::SelfPushNameUpdated(
                    crate::types::events::SelfPushNameUpdated {
                        from_server: true,
                        old_name: old.clone(),
                        new_name: new_name.clone(),
                    },
                ));

                // First push name ever received via sync: send presence now.
                if old.is_empty() && !new_name.is_empty() {
                    debug!(target: "Client/AppState", "Sending presence after receiving initial pushname from app state sync");
                    if let Err(e) = self.presence().set_available().await {
                        warn!(target: "Client/AppState", "Failed to send presence after pushname sync: {e:?}");
                    }
                }
            } else {
                debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
            }
        }
    }
2609
    /// Flags the next disconnect as expected so reconnect logic does not
    /// treat it as an error.
    async fn expect_disconnect(&self) {
        self.expected_disconnect.store(true, Ordering::Relaxed);
    }
2613
2614 pub(crate) async fn handle_stream_error(&self, node: &wacore_binary::node::Node) {
2615 self.is_logged_in.store(false, Ordering::Relaxed);
2616
2617 let mut attrs = node.attrs();
2618 let code_cow = attrs.optional_string("code");
2619 let code = code_cow.as_deref().unwrap_or("");
2620 let conflict_type = node
2621 .get_optional_child("conflict")
2622 .map(|n| {
2623 n.attrs()
2624 .optional_string("type")
2625 .as_deref()
2626 .unwrap_or("")
2627 .to_string()
2628 })
2629 .unwrap_or_default();
2630
2631 if !conflict_type.is_empty() {
2632 info!(
2633 "Got stream error indicating client was removed or replaced (conflict={}). Logging out.",
2634 conflict_type
2635 );
2636 self.expect_disconnect().await;
2637 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2638
2639 let event = if conflict_type == "replaced" {
2640 Event::StreamReplaced(crate::types::events::StreamReplaced)
2641 } else {
2642 Event::LoggedOut(crate::types::events::LoggedOut {
2643 on_connect: false,
2644 reason: ConnectFailureReason::LoggedOut,
2645 })
2646 };
2647 self.core.event_bus.dispatch(&event);
2648
2649 let transport_opt = self.transport.lock().await.clone();
2650 if let Some(transport) = transport_opt {
2651 self.runtime
2652 .spawn(Box::pin(async move {
2653 info!("Disconnecting transport after conflict");
2654 transport.disconnect().await;
2655 }))
2656 .detach();
2657 }
2658 } else {
2659 match code {
2660 "515" => {
2661 info!(
2663 "Got 515 stream error, server is closing stream (expected after pairing). Will auto-reconnect."
2664 );
2665 self.expect_disconnect().await;
2666 let transport_opt = self.transport.lock().await.clone();
2669 if let Some(transport) = transport_opt {
2670 self.runtime
2672 .spawn(Box::pin(async move {
2673 info!("Disconnecting transport after 515");
2674 transport.disconnect().await;
2675 }))
2676 .detach();
2677 }
2678 }
2679 "516" => {
2680 info!("Got 516 stream error (device removed). Logging out.");
2681 self.expect_disconnect().await;
2682 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2683 self.core.event_bus.dispatch(&Event::LoggedOut(
2684 crate::types::events::LoggedOut {
2685 on_connect: false,
2686 reason: ConnectFailureReason::LoggedOut,
2687 },
2688 ));
2689
2690 let transport_opt = self.transport.lock().await.clone();
2691 if let Some(transport) = transport_opt {
2692 self.runtime
2693 .spawn(Box::pin(async move {
2694 info!("Disconnecting transport after 516");
2695 transport.disconnect().await;
2696 }))
2697 .detach();
2698 }
2699 }
2700 "401" => {
2701 info!("Got 401 stream error (unauthorized). Logging out.");
2704 self.expect_disconnect().await;
2705 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2706 self.core.event_bus.dispatch(&Event::LoggedOut(
2707 crate::types::events::LoggedOut {
2708 on_connect: false,
2709 reason: ConnectFailureReason::LoggedOut,
2710 },
2711 ));
2712
2713 let transport_opt = self.transport.lock().await.clone();
2714 if let Some(transport) = transport_opt {
2715 self.runtime
2716 .spawn(Box::pin(async move {
2717 info!("Disconnecting transport after 401");
2718 transport.disconnect().await;
2719 }))
2720 .detach();
2721 }
2722 }
2723 "409" => {
2724 info!("Got 409 stream error (conflict). Another session replaced this one.");
2727 self.expect_disconnect().await;
2728 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2729 self.core
2730 .event_bus
2731 .dispatch(&Event::StreamReplaced(crate::types::events::StreamReplaced));
2732
2733 let transport_opt = self.transport.lock().await.clone();
2734 if let Some(transport) = transport_opt {
2735 self.runtime
2736 .spawn(Box::pin(async move {
2737 info!("Disconnecting transport after 409");
2738 transport.disconnect().await;
2739 }))
2740 .detach();
2741 }
2742 }
2743 "429" => {
2744 warn!(
2747 "Got 429 stream error (rate limited). Will auto-reconnect with extended backoff."
2748 );
2749 self.auto_reconnect_errors.fetch_add(5, Ordering::Relaxed);
2750 }
2751 "503" => {
2752 info!("Got 503 service unavailable, will auto-reconnect.");
2753 }
2754 _ => {
2755 error!("Unknown stream error: {}", DisplayableNode(node));
2756 self.expect_disconnect().await;
2757 self.core.event_bus.dispatch(&Event::StreamError(
2758 crate::types::events::StreamError {
2759 code: code.to_string(),
2760 raw: Some(node.clone()),
2761 },
2762 ));
2763 }
2764 }
2765 }
2766
2767 info!("Notifying shutdown from stream error handler");
2768 self.shutdown_notifier.notify(usize::MAX);
2769 }
2770
    /// Handles a connect-failure node received during connection setup.
    ///
    /// Classifies the numeric `reason` attribute and dispatches the matching
    /// event (`LoggedOut`, `TemporaryBan`, `ClientOutdated`, or a generic
    /// `ConnectFailure`). Auto-reconnect is left enabled only when
    /// `reason.should_reconnect()` says the failure is recoverable.
    pub(crate) async fn handle_connect_failure(&self, node: &wacore_binary::node::Node) {
        self.expected_disconnect.store(true, Ordering::Relaxed);
        self.shutdown_notifier.notify(usize::MAX);

        let mut attrs = node.attrs();
        let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
        let reason = ConnectFailureReason::from(reason_code);

        if reason.should_reconnect() {
            // Recoverable: mark the disconnect as unexpected so the
            // reconnect loop retries.
            self.expected_disconnect.store(false, Ordering::Relaxed);
        } else {
            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
        }

        if reason.is_logged_out() {
            info!("Got {reason:?} connect failure, logging out.");
            self.core
                .event_bus
                .dispatch(&wacore::types::events::Event::LoggedOut(
                    crate::types::events::LoggedOut {
                        on_connect: true,
                        reason,
                    },
                ));
        } else if let ConnectFailureReason::TempBanned = reason {
            // Ban metadata lives in the `code` and `expire` attributes.
            let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
            let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
            let expire_duration =
                chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
            warn!("Temporary ban connect failure: {}", DisplayableNode(node));
            self.core.event_bus.dispatch(&Event::TemporaryBan(
                crate::types::events::TemporaryBan {
                    code: crate::types::events::TempBanReason::from(ban_code),
                    expire: expire_duration,
                },
            ));
        } else if let ConnectFailureReason::ClientOutdated = reason {
            error!("Client is outdated and was rejected by server.");
            self.core
                .event_bus
                .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
        } else {
            warn!("Unknown connect failure: {}", DisplayableNode(node));
            self.core.event_bus.dispatch(&Event::ConnectFailure(
                crate::types::events::ConnectFailure {
                    reason,
                    message: attrs
                        .optional_string("message")
                        .as_deref()
                        .unwrap_or("")
                        .to_string(),
                    raw: Some(node.clone()),
                },
            ));
        }
    }
2827
2828 pub(crate) async fn handle_iq(self: &Arc<Self>, node: &wacore_binary::node::Node) -> bool {
2829 if node.attrs.get("type").is_some_and(|s| s == "get")
2830 && (node.get_optional_child("ping").is_some()
2831 || node
2832 .attrs
2833 .get("xmlns")
2834 .is_some_and(|s| s == "urn:xmpp:ping"))
2835 {
2836 info!("Received ping, sending pong.");
2837 let mut parser = node.attrs();
2838 let from_jid = parser.jid("from");
2839 let id = parser.optional_string("id").map(|s| s.to_string());
2840 let pong = build_pong(from_jid.to_string(), id.as_deref());
2841 if let Err(e) = self.send_node(pong).await {
2842 warn!("Failed to send pong: {e:?}");
2843 }
2844 return true;
2845 }
2846
2847 if pair::handle_iq(self, node).await {
2849 return true;
2850 }
2851
2852 false
2853 }
2854
    /// True while the underlying socket is connected (not necessarily
    /// authenticated; see [`Self::is_logged_in`]).
    pub fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }
2858
    /// True once login has succeeded on the current connection; cleared by
    /// the stream-error handler.
    pub fn is_logged_in(&self) -> bool {
        self.is_logged_in.load(Ordering::Relaxed)
    }
2862
    /// Registers a one-shot waiter that resolves with the first incoming
    /// node matching `filter`. Dropping the returned receiver cancels the
    /// wait (stale waiters are pruned in `resolve_node_waiters`).
    pub fn wait_for_node(
        &self,
        filter: NodeFilter,
    ) -> futures::channel::oneshot::Receiver<Arc<Node>> {
        let (tx, rx) = futures::channel::oneshot::channel();
        // NOTE(review): the counter is bumped before the waiter becomes
        // visible — presumably so hot paths can skip locking when it is
        // zero; confirm at the call sites of `node_waiter_count`.
        self.node_waiter_count.fetch_add(1, Ordering::Release);
        // A poisoned mutex is recovered rather than propagated: waiter
        // bookkeeping must not panic the caller.
        let mut waiters = self
            .node_waiters
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        waiters.push(NodeWaiter { filter, tx });
        rx
    }
2893
    /// Delivers `node` to every registered waiter whose filter matches and
    /// prunes waiters whose receivers were dropped.
    ///
    /// `swap_remove` moves the tail element into slot `i`, so the index is
    /// only advanced when the current entry is kept — the swapped-in
    /// element must be re-examined.
    fn resolve_node_waiters(&self, node: &Arc<Node>) {
        let mut waiters = self
            .node_waiters
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let mut i = 0;
        while i < waiters.len() {
            if waiters[i].tx.is_canceled() {
                // Receiver dropped: discard the stale waiter.
                waiters.swap_remove(i);
                self.node_waiter_count.fetch_sub(1, Ordering::Release);
            } else if waiters[i].filter.matches(node) {
                let w = waiters.swap_remove(i);
                self.node_waiter_count.fetch_sub(1, Ordering::Release);
                // Send failure means the receiver raced a drop; ignore it.
                let _ = w.tx.send(Arc::clone(node));
            } else {
                i += 1;
            }
        }
    }
2917
    /// Delegates server-time-offset bookkeeping for `node` to the unified
    /// session tracker.
    pub(crate) fn update_server_time_offset(&self, node: &wacore_binary::node::Node) {
        self.unified_session.update_server_time_offset(node);
    }
2921
    /// Sends the pending unified-session node, if any, while connected.
    ///
    /// On send failure the "last sent" marker is cleared so the next attempt
    /// retries the same payload.
    pub(crate) async fn send_unified_session(&self) {
        if !self.is_connected() {
            debug!(target: "Client/UnifiedSession", "Skipping: not connected");
            return;
        }

        // `prepare_send` returns None when there is nothing new to send.
        let Some((node, _sequence)) = self.unified_session.prepare_send().await else {
            return;
        };

        if let Err(e) = self.send_node(node).await {
            debug!(target: "Client/UnifiedSession", "Send failed: {e}");
            self.unified_session.clear_last_sent().await;
        }
    }
2937
    /// Waits until the socket is connected, or `timeout` elapses.
    ///
    /// The connected flag is re-checked after subscribing to the notifier
    /// to close the race where the socket comes up between the first check
    /// and the subscription.
    pub async fn wait_for_socket(&self, timeout: std::time::Duration) -> Result<(), anyhow::Error> {
        if self.is_connected() {
            return Ok(());
        }

        let notified = self.socket_ready_notifier.listen();
        // Re-check: the notifier may have fired before we subscribed.
        if self.is_connected() {
            return Ok(());
        }

        rt_timeout(&*self.runtime, timeout, notified)
            .await
            .map_err(|_| anyhow::anyhow!("Timeout waiting for socket"))
    }
2962
    /// Waits until the client is fully ready (per `is_fully_ready`), or
    /// `timeout` elapses.
    ///
    /// Uses the same subscribe-then-re-check pattern as
    /// [`Self::wait_for_socket`] to avoid missing a notification fired
    /// between the first check and the subscription.
    pub async fn wait_for_connected(
        &self,
        timeout: std::time::Duration,
    ) -> Result<(), anyhow::Error> {
        if self.is_fully_ready() {
            return Ok(());
        }

        let notified = self.connected_notifier.listen();
        // Re-check: readiness may have flipped before we subscribed.
        if self.is_fully_ready() {
            return Ok(());
        }

        rt_timeout(&*self.runtime, timeout, notified)
            .await
            .map_err(|_| anyhow::anyhow!("Timeout waiting for connection"))
    }
2990
    /// Returns a shared handle to the persistence manager backing this
    /// client.
    pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
        self.persistence_manager.clone()
    }
2996
2997 pub async fn edit_message(
2998 &self,
2999 to: Jid,
3000 original_id: impl Into<String>,
3001 new_content: wa::Message,
3002 ) -> Result<String, anyhow::Error> {
3003 let original_id = original_id.into();
3004
3005 let participant = if to.is_group() {
3008 Some(
3009 self.get_own_jid_for_group(&to)
3010 .await?
3011 .to_non_ad()
3012 .to_string(),
3013 )
3014 } else {
3015 if self.get_pn().await.is_none() {
3016 return Err(anyhow::Error::from(ClientError::NotLoggedIn));
3017 }
3018 None
3019 };
3020
3021 let edit_container_message = wa::Message {
3022 edited_message: Some(Box::new(wa::message::FutureProofMessage {
3023 message: Some(Box::new(wa::Message {
3024 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
3025 key: Some(wa::MessageKey {
3026 remote_jid: Some(to.to_string()),
3027 from_me: Some(true),
3028 id: Some(original_id.clone()),
3029 participant,
3030 }),
3031 r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
3032 edited_message: Some(Box::new(new_content)),
3033 timestamp_ms: Some(wacore::time::now_millis()),
3034 ..Default::default()
3035 })),
3036 ..Default::default()
3037 })),
3038 })),
3039 ..Default::default()
3040 };
3041
3042 self.send_message_impl(
3048 to,
3049 &edit_container_message,
3050 None,
3051 false,
3052 false,
3053 Some(crate::types::message::EditAttribute::MessageEdit),
3054 vec![],
3055 )
3056 .await?;
3057
3058 Ok(original_id)
3059 }
3060
    /// Marshals `node` and sends it over the current noise socket.
    ///
    /// Returns [`ClientError::NotConnected`] when no socket is established.
    /// On success, records the send timestamp in `last_data_sent_ms`.
    pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
        // Clone the socket handle out of the lock so the lock is not held
        // across the (async) send.
        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
        let noise_socket = match noise_socket_arc {
            Some(socket) => socket,
            None => return Err(ClientError::NotConnected),
        };

        debug!(target: "Client/Send", "{}", DisplayableNode(&node));

        let mut plaintext_buf = Vec::with_capacity(1024);

        if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
            error!("Failed to marshal node: {e:?}");
            return Err(SocketError::Crypto("Marshal error".to_string()).into());
        }

        // Pre-sized output buffer; the +32 presumably covers encryption
        // overhead — confirm against the noise socket implementation.
        let encrypted_buf = Vec::with_capacity(plaintext_buf.len() + 32);

        if let Err(e) = noise_socket
            .encrypt_and_send(plaintext_buf, encrypted_buf)
            .await
        {
            return Err(e.into());
        }

        self.last_data_sent_ms
            .store(wacore::time::now_millis() as u64, Ordering::Relaxed);

        Ok(())
    }
3093
    /// Persists a changed push name, dispatches `SelfPushNameUpdated`, and
    /// re-announces presence from a background task. No-op when the name is
    /// unchanged.
    pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
        let old_name = device_snapshot.push_name.clone();

        if old_name == new_name {
            // Unchanged: avoid redundant writes and events.
            return;
        }

        log::debug!("Updating push name from '{}' -> '{}'", old_name, new_name);
        self.persistence_manager
            .process_command(DeviceCommand::SetPushName(new_name.clone()))
            .await;

        self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
            crate::types::events::SelfPushNameUpdated {
                from_server: true,
                old_name,
                new_name: new_name.clone(),
            },
        ));

        // Presence is sent from a detached task so this caller does not
        // block on the network round-trip.
        let client_clone = self.clone();
        self.runtime
            .spawn(Box::pin(async move {
                if let Err(e) = client_clone.presence().set_available().await {
                    log::warn!("Failed to send presence after push name update: {:?}", e);
                } else {
                    log::debug!("Sent presence after push name update.");
                }
            }))
            .detach();
    }
3126
3127 pub async fn get_push_name(&self) -> String {
3128 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3129 device_snapshot.push_name.clone()
3130 }
3131
3132 pub async fn get_pn(&self) -> Option<Jid> {
3133 let snapshot = self.persistence_manager.get_device_snapshot().await;
3134 snapshot.pn.clone()
3135 }
3136
3137 pub async fn get_lid(&self) -> Option<Jid> {
3138 let snapshot = self.persistence_manager.get_device_snapshot().await;
3139 snapshot.lid.clone()
3140 }
3141
    /// Returns the JID we should use as "self" inside `group_jid`, honouring
    /// the group's addressing mode: LID groups use our LID (when known), PN
    /// groups use our phone-number JID.
    ///
    /// Errors with `NotLoggedIn` when no own PN is stored. A failed group
    /// info query falls back to PN addressing.
    pub(crate) async fn get_own_jid_for_group(
        &self,
        group_jid: &Jid,
    ) -> Result<Jid, anyhow::Error> {
        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
        let own_pn = device_snapshot
            .pn
            .clone()
            .ok_or_else(|| anyhow::Error::from(ClientError::NotLoggedIn))?;

        let addressing_mode = self
            .groups()
            .query_info(group_jid)
            .await
            .map(|info| info.addressing_mode)
            .unwrap_or(crate::types::message::AddressingMode::Pn);

        Ok(match addressing_mode {
            crate::types::message::AddressingMode::Lid => {
                // Fall back to PN if our LID is not known yet.
                device_snapshot.lid.clone().unwrap_or(own_pn)
            }
            crate::types::message::AddressingMode::Pn => own_pn,
        })
    }
3170
    /// Builds a `StanzaKey` for `chat`/`id`, normalising the chat JID to the
    /// one used for encryption (LID/PN resolution).
    pub(crate) async fn make_stanza_key(&self, chat: Jid, id: String) -> StanzaKey {
        let chat = self.resolve_encryption_jid(&chat).await;

        StanzaKey { chat, id }
    }
3178
3179 pub(crate) async fn send_protocol_receipt(
3182 &self,
3183 id: String,
3184 receipt_type: crate::types::presence::ReceiptType,
3185 ) {
3186 if id.is_empty() {
3187 return;
3188 }
3189 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3190 if let Some(own_jid) = &device_snapshot.pn {
3191 let type_str = match receipt_type {
3192 crate::types::presence::ReceiptType::HistorySync => "hist_sync",
3193 crate::types::presence::ReceiptType::Read => "read",
3194 crate::types::presence::ReceiptType::ReadSelf => "read-self",
3195 crate::types::presence::ReceiptType::Delivered => "delivery",
3196 crate::types::presence::ReceiptType::Played => "played",
3197 crate::types::presence::ReceiptType::PlayedSelf => "played-self",
3198 crate::types::presence::ReceiptType::Inactive => "inactive",
3199 crate::types::presence::ReceiptType::PeerMsg => "peer_msg",
3200 crate::types::presence::ReceiptType::Sender => "sender",
3201 crate::types::presence::ReceiptType::ServerError => "server-error",
3202 crate::types::presence::ReceiptType::Retry => "retry",
3203 crate::types::presence::ReceiptType::EncRekeyRetry => "enc_rekey_retry",
3204 crate::types::presence::ReceiptType::Other(ref s) => s.as_str(),
3205 };
3206
3207 let node = NodeBuilder::new("receipt")
3208 .attrs([
3209 ("id", id),
3210 ("type", type_str.to_string()),
3211 ("to", own_jid.to_non_ad().to_string()),
3212 ])
3213 .build();
3214
3215 if let Err(e) = self.send_node(node).await {
3216 warn!(
3217 "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
3218 receipt_type, self.unique_id, e
3219 );
3220 }
3221 }
3222 }
3223}
3224
3225fn build_pong(to: String, id: Option<&str>) -> wacore_binary::node::Node {
3230 let mut builder = NodeBuilder::new("iq").attr("to", to).attr("type", "result");
3231 if let Some(id) = id {
3232 builder = builder.attr("id", id);
3233 }
3234 builder.build()
3235}
3236
/// Builds the `<ack>` stanza for an incoming node, or `None` when the node
/// lacks the mandatory `id`/`from` attributes.
///
/// The ack echoes the stanza's tag (as `class`), id, sender (as `to`) and
/// participant. `type` is echoed except for `<message>` stanzas and
/// encrypt-identity notifications. For `<message>` acks, our own PN device
/// JID is attached as `from` when available.
fn build_ack_node(node: &Node, own_device_pn: Option<&Jid>) -> Option<Node> {
    let id = node.attrs.get("id")?.clone();
    let from = node.attrs.get("from")?.clone();
    let participant = node.attrs.get("participant").cloned();

    // Messages and encrypt-identity notifications get no type attribute.
    let typ = if node.tag != "message" && !is_encrypt_identity_notification(node) {
        node.attrs.get("type").cloned()
    } else {
        None
    };

    let mut attrs = Attrs::new();
    attrs.insert("class", NodeValue::String(node.tag.to_string()));
    attrs.insert("id", id);
    attrs.insert("to", from);

    if node.tag == "message"
        && let Some(own_device_pn) = own_device_pn
    {
        attrs.insert("from", NodeValue::Jid(own_device_pn.clone()));
    }
    if let Some(p) = participant {
        attrs.insert("participant", p);
    }
    if let Some(t) = typ {
        attrs.insert("type", t);
    }

    Some(Node {
        tag: Cow::Borrowed("ack"),
        attrs,
        content: None,
    })
}
3285
3286fn is_encrypt_identity_notification(node: &Node) -> bool {
3288 node.tag == "notification"
3289 && node.attrs.get("type").is_some_and(|v| v == "encrypt")
3290 && node.get_optional_child("identity").is_some()
3291}
3292
3293fn fibonacci_backoff(attempt: u32) -> Duration {
3299 const MAX_MS: u64 = 900_000; let mut a: u64 = 1000;
3302 let mut b: u64 = 1000;
3303 for _ in 0..attempt {
3304 let next = a.saturating_add(b).min(MAX_MS);
3305 a = b;
3306 b = next;
3307 }
3308 let base = a.min(MAX_MS);
3309
3310 let jitter_range = base / 10;
3312 let jitter = if jitter_range > 0 {
3313 rand::make_rng::<rand::rngs::StdRng>().random_range(0..=(jitter_range * 2)) as i64
3314 - jitter_range as i64
3315 } else {
3316 0
3317 };
3318 let ms = (base as i64 + jitter).max(0) as u64;
3319 Duration::from_millis(ms)
3320}
3321
3322#[cfg(test)]
3323mod tests {
3324 use super::*;
3325 use crate::lid_pn_cache::LearningSource;
3326 use crate::test_utils::MockHttpClient;
3327 use futures::channel::oneshot;
3328 use wacore_binary::jid::SERVER_JID;
3329
    /// Verifies that `should_ack` keeps returning true for minimal
    /// `<receipt>` and `<notification>` stanzas.
    #[tokio::test]
    async fn test_ack_behavior_for_incoming_stanzas() {
        // Build a client against an in-memory backend and mock transport.
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        use wacore_binary::node::{Attrs, Node, NodeContent};

        let mut receipt_attrs = Attrs::new();
        receipt_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
        receipt_attrs.insert("id".to_string(), "RCPT-1".to_string());
        let receipt_node = Node::new(
            "receipt",
            receipt_attrs,
            Some(NodeContent::String("test".to_string())),
        );

        let mut notification_attrs = Attrs::new();
        notification_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
        notification_attrs.insert("id".to_string(), "NOTIF-1".to_string());
        let notification_node = Node::new(
            "notification",
            notification_attrs,
            Some(NodeContent::String("test".to_string())),
        );

        assert!(
            client.should_ack(&receipt_node),
            "should_ack must still return TRUE for <receipt> stanzas."
        );
        assert!(
            client.should_ack(&notification_node),
            "should_ack must still return TRUE for <notification> stanzas."
        );

        info!(
            "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
        );
    }
3383
    /// Verifies that an incoming `<ack>` resolves (and removes) the pending
    /// response waiter registered under the same id.
    #[tokio::test]
    async fn test_ack_waiter_resolves() {
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Register a waiter keyed by the stanza id, as send_iq would.
        let test_id = "ack-test-123".to_string();
        let (tx, rx) = oneshot::channel();
        client
            .response_waiters
            .lock()
            .await
            .insert(test_id.clone(), tx);
        assert!(
            client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be inserted before handling ack"
        );

        let ack_node = NodeBuilder::new("ack")
            .attr("id", test_id.clone())
            .attr("from", SERVER_JID)
            .build();

        let handled = client.handle_ack_response(ack_node).await;
        assert!(
            handled,
            "handle_ack_response should return true when waiter exists"
        );

        // The waiter's receiver must observe the ack node promptly.
        match tokio::time::timeout(Duration::from_secs(1), rx).await {
            Ok(Ok(response_node)) => {
                assert!(
                    response_node
                        .attrs
                        .get("id")
                        .is_some_and(|v| v == test_id.as_str()),
                    "Response node should have correct ID"
                );
            }
            Ok(Err(_)) => panic!("Receiver was dropped without being sent a value"),
            Err(_) => panic!("Test timed out waiting for ack response"),
        }

        // Resolving the ack must also clean up the waiter entry.
        assert!(
            !client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be removed after handling"
        );

        info!(
            "✅ test_ack_waiter_resolves passed: ACK response correctly resolves pending waiters"
        );
    }
3452
3453 #[tokio::test]
3454 async fn test_ack_without_matching_waiter() {
3455 let backend = crate::test_utils::create_test_backend().await;
3456 let pm = Arc::new(
3457 PersistenceManager::new(backend)
3458 .await
3459 .expect("persistence manager should initialize"),
3460 );
3461 let (client, _rx) = Client::new(
3462 Arc::new(crate::runtime_impl::TokioRuntime),
3463 pm,
3464 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3465 Arc::new(MockHttpClient),
3466 None,
3467 )
3468 .await;
3469
3470 let ack_node = NodeBuilder::new("ack")
3472 .attr("id", "non-existent-id")
3473 .attr("from", SERVER_JID)
3474 .build();
3475
3476 let handled = client.handle_ack_response(ack_node).await;
3478 assert!(
3479 !handled,
3480 "handle_ack_response should return false when no waiter exists"
3481 );
3482
3483 info!(
3484 "✅ test_ack_without_matching_waiter passed: ACK without matching waiter handled gracefully"
3485 );
3486 }
3487
3488 #[tokio::test]
3494 async fn test_lid_pn_cache_basic_operations() {
3495 let backend = Arc::new(
3496 crate::store::SqliteStore::new("file:memdb_lid_cache_basic?mode=memory&cache=shared")
3497 .await
3498 .expect("Failed to create in-memory backend for test"),
3499 );
3500 let pm = Arc::new(
3501 PersistenceManager::new(backend)
3502 .await
3503 .expect("persistence manager should initialize"),
3504 );
3505 let (client, _rx) = Client::new(
3506 Arc::new(crate::runtime_impl::TokioRuntime),
3507 pm,
3508 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3509 Arc::new(MockHttpClient),
3510 None,
3511 )
3512 .await;
3513
3514 let phone = "559980000001";
3516 let lid = "100000012345678";
3517
3518 assert!(
3519 client.lid_pn_cache.get_current_lid(phone).await.is_none(),
3520 "Cache should be empty initially"
3521 );
3522
3523 client
3525 .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
3526 .await
3527 .expect("Failed to persist LID-PN mapping in tests");
3528
3529 let cached_lid = client.lid_pn_cache.get_current_lid(phone).await;
3531 assert!(cached_lid.is_some(), "Cache should contain the mapping");
3532 assert_eq!(
3533 cached_lid.expect("cache should have LID"),
3534 lid,
3535 "Cached LID should match what we inserted"
3536 );
3537
3538 let cached_phone = client.lid_pn_cache.get_phone_number(lid).await;
3540 assert!(cached_phone.is_some(), "Reverse lookup should work");
3541 assert_eq!(
3542 cached_phone.expect("reverse lookup should return phone"),
3543 phone,
3544 "Cached phone should match what we inserted"
3545 );
3546
3547 assert!(
3549 client
3550 .lid_pn_cache
3551 .get_current_lid("559980000002")
3552 .await
3553 .is_none(),
3554 "Different phone number should not have a mapping"
3555 );
3556
3557 info!("✅ test_lid_pn_cache_basic_operations passed: LID-PN cache works correctly");
3558 }
3559
3560 #[tokio::test]
3564 async fn test_lid_pn_cache_timestamp_resolution() {
3565 let backend = Arc::new(
3566 crate::store::SqliteStore::new(
3567 "file:memdb_lid_cache_timestamp?mode=memory&cache=shared",
3568 )
3569 .await
3570 .expect("Failed to create in-memory backend for test"),
3571 );
3572 let pm = Arc::new(
3573 PersistenceManager::new(backend)
3574 .await
3575 .expect("persistence manager should initialize"),
3576 );
3577 let (client, _rx) = Client::new(
3578 Arc::new(crate::runtime_impl::TokioRuntime),
3579 pm,
3580 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3581 Arc::new(MockHttpClient),
3582 None,
3583 )
3584 .await;
3585
3586 let phone = "559980000001";
3587 let lid_old = "100000012345678";
3588 let lid_new = "100000087654321";
3589
3590 client
3592 .add_lid_pn_mapping(lid_old, phone, LearningSource::Usync)
3593 .await
3594 .expect("Failed to persist LID-PN mapping in tests");
3595
3596 assert_eq!(
3597 client
3598 .lid_pn_cache
3599 .get_current_lid(phone)
3600 .await
3601 .expect("cache should have LID"),
3602 lid_old,
3603 "Initial LID should be stored"
3604 );
3605
3606 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3608
3609 client
3611 .add_lid_pn_mapping(lid_new, phone, LearningSource::PeerPnMessage)
3612 .await
3613 .expect("Failed to persist LID-PN mapping in tests");
3614
3615 assert_eq!(
3616 client
3617 .lid_pn_cache
3618 .get_current_lid(phone)
3619 .await
3620 .expect("cache should have newer LID"),
3621 lid_new,
3622 "Newer LID should be returned for phone lookup"
3623 );
3624
3625 assert_eq!(
3627 client
3628 .lid_pn_cache
3629 .get_phone_number(lid_old)
3630 .await
3631 .expect("reverse lookup should return phone"),
3632 phone,
3633 "Old LID should still map to phone"
3634 );
3635 assert_eq!(
3636 client
3637 .lid_pn_cache
3638 .get_phone_number(lid_new)
3639 .await
3640 .expect("reverse lookup should return phone"),
3641 phone,
3642 "New LID should also map to phone"
3643 );
3644
3645 info!(
3646 "✅ test_lid_pn_cache_timestamp_resolution passed: Timestamp-based resolution works correctly"
3647 );
3648 }
3649
3650 #[tokio::test]
3654 async fn test_get_lid_for_phone_via_send_context_resolver() {
3655 use wacore::client::context::SendContextResolver;
3656
3657 let backend = Arc::new(
3658 crate::store::SqliteStore::new("file:memdb_get_lid_for_phone?mode=memory&cache=shared")
3659 .await
3660 .expect("Failed to create in-memory backend for test"),
3661 );
3662 let pm = Arc::new(
3663 PersistenceManager::new(backend)
3664 .await
3665 .expect("persistence manager should initialize"),
3666 );
3667 let (client, _rx) = Client::new(
3668 Arc::new(crate::runtime_impl::TokioRuntime),
3669 pm,
3670 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3671 Arc::new(MockHttpClient),
3672 None,
3673 )
3674 .await;
3675
3676 let phone = "559980000001";
3677 let lid = "100000012345678";
3678
3679 assert!(
3681 client.get_lid_for_phone(phone).await.is_none(),
3682 "get_lid_for_phone should return None before caching"
3683 );
3684
3685 client
3687 .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
3688 .await
3689 .expect("Failed to persist LID-PN mapping in tests");
3690
3691 let result = client.get_lid_for_phone(phone).await;
3693 assert!(
3694 result.is_some(),
3695 "get_lid_for_phone should return Some after caching"
3696 );
3697 assert_eq!(
3698 result.expect("get_lid_for_phone should return Some"),
3699 lid,
3700 "get_lid_for_phone should return the cached LID"
3701 );
3702
3703 info!(
3704 "✅ test_get_lid_for_phone_via_send_context_resolver passed: SendContextResolver correctly returns cached LID"
3705 );
3706 }
3707
3708 #[tokio::test]
3710 async fn test_wait_for_offline_delivery_end_returns_immediately_when_flag_set() {
3711 let backend = Arc::new(
3712 crate::store::SqliteStore::new(
3713 "file:memdb_offline_sync_flag_set?mode=memory&cache=shared",
3714 )
3715 .await
3716 .expect("Failed to create in-memory backend for test"),
3717 );
3718 let pm = Arc::new(
3719 PersistenceManager::new(backend)
3720 .await
3721 .expect("persistence manager should initialize"),
3722 );
3723 let (client, _rx) = Client::new(
3724 Arc::new(crate::runtime_impl::TokioRuntime),
3725 pm,
3726 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3727 Arc::new(MockHttpClient),
3728 None,
3729 )
3730 .await;
3731
3732 client
3734 .offline_sync_completed
3735 .store(true, std::sync::atomic::Ordering::Relaxed);
3736
3737 let start = std::time::Instant::now();
3739 client.wait_for_offline_delivery_end().await;
3740 let elapsed = start.elapsed();
3741
3742 assert!(
3744 elapsed.as_millis() < 100,
3745 "wait_for_offline_delivery_end should return immediately when flag is set, took {:?}",
3746 elapsed
3747 );
3748
3749 info!("✅ test_wait_for_offline_delivery_end_returns_immediately_when_flag_set passed");
3750 }
3751
3752 #[tokio::test]
3755 async fn test_wait_for_offline_delivery_end_times_out_when_flag_not_set() {
3756 let backend = Arc::new(
3757 crate::store::SqliteStore::new(
3758 "file:memdb_offline_sync_timeout?mode=memory&cache=shared",
3759 )
3760 .await
3761 .expect("Failed to create in-memory backend for test"),
3762 );
3763 let pm = Arc::new(
3764 PersistenceManager::new(backend)
3765 .await
3766 .expect("persistence manager should initialize"),
3767 );
3768 let (client, _rx) = Client::new(
3769 Arc::new(crate::runtime_impl::TokioRuntime),
3770 pm,
3771 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3772 Arc::new(MockHttpClient),
3773 None,
3774 )
3775 .await;
3776
3777 let start = std::time::Instant::now();
3780 client
3781 .wait_for_offline_delivery_end_with_timeout(std::time::Duration::from_millis(50))
3782 .await;
3783
3784 let elapsed = start.elapsed();
3785 let semaphore = client
3787 .message_processing_semaphore
3788 .lock()
3789 .expect("message_processing_semaphore poisoned")
3790 .clone();
3791 let mut guards = Vec::new();
3792 while let Some(guard) = semaphore.try_acquire() {
3793 guards.push(guard);
3794 }
3795 let permits = guards.len();
3796 drop(guards);
3797
3798 assert!(
3799 elapsed.as_millis() >= 45, "Should have waited for the configured timeout duration, took {:?}",
3801 elapsed
3802 );
3803 assert!(
3804 client
3805 .offline_sync_completed
3806 .load(std::sync::atomic::Ordering::Relaxed),
3807 "wait_for_offline_delivery_end should mark offline sync complete on timeout"
3808 );
3809 assert_eq!(
3810 permits, 64,
3811 "timeout completion should restore parallel permits"
3812 );
3813
3814 info!("✅ test_wait_for_offline_delivery_end_times_out_when_flag_not_set passed");
3815 }
3816
3817 #[tokio::test]
3819 async fn test_wait_for_offline_delivery_end_returns_on_notify() {
3820 let backend = Arc::new(
3821 crate::store::SqliteStore::new("file:memdb_offline_notify?mode=memory&cache=shared")
3822 .await
3823 .expect("Failed to create in-memory backend for test"),
3824 );
3825 let pm = Arc::new(
3826 PersistenceManager::new(backend)
3827 .await
3828 .expect("persistence manager should initialize"),
3829 );
3830 let (client, _rx) = Client::new(
3831 Arc::new(crate::runtime_impl::TokioRuntime),
3832 pm,
3833 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3834 Arc::new(MockHttpClient),
3835 None,
3836 )
3837 .await;
3838
3839 let client_clone = client.clone();
3840
3841 tokio::spawn(async move {
3843 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3844 client_clone.offline_sync_notifier.notify(usize::MAX);
3845 });
3846
3847 let start = std::time::Instant::now();
3848 client.wait_for_offline_delivery_end().await;
3849 let elapsed = start.elapsed();
3850
3851 assert!(
3853 elapsed.as_millis() < 200,
3854 "wait_for_offline_delivery_end should return when notified, took {:?}",
3855 elapsed
3856 );
3857 assert!(
3858 elapsed.as_millis() >= 45, "Should have waited for the notify, only took {:?}",
3860 elapsed
3861 );
3862
3863 info!("✅ test_wait_for_offline_delivery_end_returns_on_notify passed");
3864 }
3865
3866 #[tokio::test]
3868 async fn test_offline_sync_flag_initially_false() {
3869 let backend = Arc::new(
3870 crate::store::SqliteStore::new(
3871 "file:memdb_offline_flag_initial?mode=memory&cache=shared",
3872 )
3873 .await
3874 .expect("Failed to create in-memory backend for test"),
3875 );
3876 let pm = Arc::new(
3877 PersistenceManager::new(backend)
3878 .await
3879 .expect("persistence manager should initialize"),
3880 );
3881 let (client, _rx) = Client::new(
3882 Arc::new(crate::runtime_impl::TokioRuntime),
3883 pm,
3884 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3885 Arc::new(MockHttpClient),
3886 None,
3887 )
3888 .await;
3889
3890 assert!(
3892 !client
3893 .offline_sync_completed
3894 .load(std::sync::atomic::Ordering::Relaxed),
3895 "offline_sync_completed should be false when Client is first created"
3896 );
3897
3898 info!("✅ test_offline_sync_flag_initially_false passed");
3899 }
3900
3901 #[tokio::test]
3906 async fn test_offline_sync_lifecycle() {
3907 use std::sync::atomic::Ordering;
3908
3909 let backend = Arc::new(
3910 crate::store::SqliteStore::new("file:memdb_offline_lifecycle?mode=memory&cache=shared")
3911 .await
3912 .expect("Failed to create in-memory backend for test"),
3913 );
3914 let pm = Arc::new(
3915 PersistenceManager::new(backend)
3916 .await
3917 .expect("persistence manager should initialize"),
3918 );
3919 let (client, _rx) = Client::new(
3920 Arc::new(crate::runtime_impl::TokioRuntime),
3921 pm,
3922 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3923 Arc::new(MockHttpClient),
3924 None,
3925 )
3926 .await;
3927
3928 assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
3930
3931 let client_waiter = client.clone();
3933 let waiter_handle = tokio::spawn(async move {
3934 client_waiter.wait_for_offline_delivery_end().await;
3935 true });
3937
3938 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3940
3941 assert!(
3943 !waiter_handle.is_finished(),
3944 "Waiter should still be waiting"
3945 );
3946
3947 client.offline_sync_completed.store(true, Ordering::Relaxed);
3949 client.offline_sync_notifier.notify(usize::MAX);
3950
3951 let result = tokio::time::timeout(std::time::Duration::from_millis(100), waiter_handle)
3953 .await
3954 .expect("Waiter should complete after notify")
3955 .expect("Waiter task should not panic");
3956
3957 assert!(result, "Waiter should have completed successfully");
3958 assert!(client.offline_sync_completed.load(Ordering::Relaxed));
3959
3960 info!("✅ test_offline_sync_lifecycle passed");
3961 }
3962
3963 #[tokio::test]
3966 async fn test_establish_primary_phone_session_fails_without_pn() {
3967 let backend = Arc::new(
3968 crate::store::SqliteStore::new("file:memdb_no_pn?mode=memory&cache=shared")
3969 .await
3970 .expect("Failed to create in-memory backend for test"),
3971 );
3972 let pm = Arc::new(
3973 PersistenceManager::new(backend)
3974 .await
3975 .expect("persistence manager should initialize"),
3976 );
3977 let (client, _rx) = Client::new(
3978 Arc::new(crate::runtime_impl::TokioRuntime),
3979 pm,
3980 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3981 Arc::new(MockHttpClient),
3982 None,
3983 )
3984 .await;
3985
3986 let result = client.establish_primary_phone_session_immediate().await;
3988
3989 assert!(
3990 result.is_err(),
3991 "establish_primary_phone_session_immediate should fail when no PN is set"
3992 );
3993
3994 let err = result.unwrap_err();
3995 assert!(
3996 err.downcast_ref::<ClientError>()
3997 .is_some_and(|e| matches!(e, ClientError::NotLoggedIn)),
3998 "Error should be ClientError::NotLoggedIn, got: {}",
3999 err
4000 );
4001
4002 info!("✅ test_establish_primary_phone_session_fails_without_pn passed");
4003 }
4004
4005 #[tokio::test]
4009 async fn test_ensure_e2e_sessions_waits_for_offline_sync() {
4010 use std::sync::atomic::Ordering;
4011 use wacore_binary::jid::Jid;
4012
4013 let backend = Arc::new(
4014 crate::store::SqliteStore::new("file:memdb_ensure_e2e_waits?mode=memory&cache=shared")
4015 .await
4016 .expect("Failed to create in-memory backend for test"),
4017 );
4018 let pm = Arc::new(
4019 PersistenceManager::new(backend)
4020 .await
4021 .expect("persistence manager should initialize"),
4022 );
4023 let (client, _rx) = Client::new(
4024 Arc::new(crate::runtime_impl::TokioRuntime),
4025 pm,
4026 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4027 Arc::new(MockHttpClient),
4028 None,
4029 )
4030 .await;
4031
4032 assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
4034
4035 let client_clone = client.clone();
4038 let ensure_handle = tokio::spawn(async move {
4039 client_clone.ensure_e2e_sessions(vec![]).await
4042 });
4043
4044 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4046 assert!(
4047 ensure_handle.is_finished(),
4048 "ensure_e2e_sessions should return immediately for empty JID list"
4049 );
4050
4051 let client_clone = client.clone();
4053 let test_jid = Jid::pn("559999999999");
4054 let ensure_handle = tokio::spawn(async move {
4055 let start = std::time::Instant::now();
4057 let _ = client_clone.ensure_e2e_sessions(vec![test_jid]).await;
4058 start.elapsed()
4059 });
4060
4061 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
4063
4064 assert!(
4066 !ensure_handle.is_finished(),
4067 "ensure_e2e_sessions should be waiting for offline sync"
4068 );
4069
4070 client.offline_sync_completed.store(true, Ordering::Relaxed);
4072 client.offline_sync_notifier.notify(usize::MAX);
4073
4074 let result = tokio::time::timeout(std::time::Duration::from_secs(2), ensure_handle).await;
4076
4077 assert!(
4078 result.is_ok(),
4079 "ensure_e2e_sessions should complete after offline sync"
4080 );
4081
4082 info!("✅ test_ensure_e2e_sessions_waits_for_offline_sync passed");
4083 }
4084
4085 #[tokio::test]
4094 async fn test_immediate_session_does_not_wait_for_offline_sync() {
4095 use std::sync::atomic::Ordering;
4096 use wacore_binary::jid::Jid;
4097
4098 let backend = Arc::new(
4099 crate::store::SqliteStore::new("file:memdb_immediate_no_wait?mode=memory&cache=shared")
4100 .await
4101 .expect("Failed to create in-memory backend for test"),
4102 );
4103 let pm = Arc::new(
4104 PersistenceManager::new(backend.clone())
4105 .await
4106 .expect("persistence manager should initialize"),
4107 );
4108
4109 pm.modify_device(|device| {
4111 device.pn = Some(Jid::pn("559999999999"));
4112 })
4113 .await;
4114
4115 let (client, _rx) = Client::new(
4116 Arc::new(crate::runtime_impl::TokioRuntime),
4117 pm,
4118 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4119 Arc::new(MockHttpClient),
4120 None,
4121 )
4122 .await;
4123
4124 assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
4126
4127 let start = std::time::Instant::now();
4130
4131 let result = tokio::time::timeout(
4134 std::time::Duration::from_millis(500),
4135 client.establish_primary_phone_session_immediate(),
4136 )
4137 .await;
4138
4139 let elapsed = start.elapsed();
4140
4141 assert!(
4143 result.is_ok(),
4144 "establish_primary_phone_session_immediate should not wait for offline sync, timed out"
4145 );
4146
4147 assert!(
4149 elapsed.as_millis() < 500,
4150 "establish_primary_phone_session_immediate should not wait, took {:?}",
4151 elapsed
4152 );
4153
4154 info!(
4157 "establish_primary_phone_session_immediate completed in {:?} (result: {:?})",
4158 elapsed,
4159 result.unwrap().is_ok()
4160 );
4161
4162 info!("✅ test_immediate_session_does_not_wait_for_offline_sync passed");
4163 }
4164
4165 #[tokio::test]
4173 async fn test_establish_session_skips_when_exists() {
4174 use wacore::libsignal::protocol::SessionRecord;
4175 use wacore::libsignal::store::SessionStore;
4176 use wacore::types::jid::JidExt;
4177 use wacore_binary::jid::Jid;
4178
4179 let backend = Arc::new(
4180 crate::store::SqliteStore::new("file:memdb_skip_existing?mode=memory&cache=shared")
4181 .await
4182 .expect("Failed to create in-memory backend for test"),
4183 );
4184 let pm = Arc::new(
4185 PersistenceManager::new(backend.clone())
4186 .await
4187 .expect("persistence manager should initialize"),
4188 );
4189
4190 let own_pn = Jid::pn("559999999999");
4192 pm.modify_device(|device| {
4193 device.pn = Some(own_pn.clone());
4194 })
4195 .await;
4196
4197 let primary_phone_jid = own_pn.with_device(0);
4199 let signal_addr = primary_phone_jid.to_protocol_address();
4200
4201 let dummy_session = SessionRecord::new_fresh();
4203 {
4204 let device_arc = pm.get_device_arc().await;
4205 let device = device_arc.read().await;
4206 device
4207 .store_session(&signal_addr, &dummy_session)
4208 .await
4209 .expect("Failed to store test session");
4210
4211 let exists = device
4213 .contains_session(&signal_addr)
4214 .await
4215 .expect("Failed to check session");
4216 assert!(exists, "Session should exist after store");
4217 }
4218
4219 let (client, _rx) = Client::new(
4220 Arc::new(crate::runtime_impl::TokioRuntime),
4221 pm.clone(),
4222 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4223 Arc::new(MockHttpClient),
4224 None,
4225 )
4226 .await;
4227
4228 let result = client.establish_primary_phone_session_immediate().await;
4231
4232 assert!(
4233 result.is_ok(),
4234 "establish_primary_phone_session_immediate should succeed when session exists"
4235 );
4236
4237 {
4240 let device_arc = pm.get_device_arc().await;
4241 let device = device_arc.read().await;
4242 let exists = device
4243 .contains_session(&signal_addr)
4244 .await
4245 .expect("Failed to check session");
4246 assert!(exists, "Session should still exist after the call");
4247 }
4248
4249 info!("✅ test_establish_session_skips_when_exists passed");
4250 }
4251
4252 #[test]
4255 fn test_mac_failure_prevention_flow_documentation() {
4256 fn should_establish_session(
4258 check_result: Result<bool, &'static str>,
4259 ) -> Result<bool, String> {
4260 match check_result {
4261 Ok(true) => Ok(false), Ok(false) => Ok(true), Err(e) => Err(format!("Cannot verify session: {}", e)), }
4265 }
4266
4267 let result = should_establish_session(Ok(true));
4269 assert_eq!(result, Ok(false), "Should skip when session exists");
4270
4271 let result = should_establish_session(Ok(false));
4273 assert_eq!(result, Ok(true), "Should establish when no session");
4274
4275 let result = should_establish_session(Err("database error"));
4277 assert!(result.is_err(), "Should fail when check fails");
4278
4279 info!("✅ test_mac_failure_prevention_flow_documentation passed");
4280 }
4281
4282 #[test]
4283 fn test_unified_session_id_calculation() {
4284 const DAY_MS: i64 = 24 * 60 * 60 * 1000;
4288 const WEEK_MS: i64 = 7 * DAY_MS;
4289 const OFFSET_MS: i64 = 3 * DAY_MS;
4290
4291 fn calculate_session_id(now_ms: i64, server_offset_ms: i64) -> i64 {
4293 let adjusted_now = now_ms + server_offset_ms;
4294 (adjusted_now + OFFSET_MS) % WEEK_MS
4295 }
4296
4297 let now_ms = 1706000000000_i64; let id = calculate_session_id(now_ms, 0);
4300 assert!(
4301 (0..WEEK_MS).contains(&id),
4302 "Session ID should be in [0, WEEK_MS)"
4303 );
4304
4305 let id_with_positive_offset = calculate_session_id(now_ms, 5000);
4307 assert!(
4308 (0..WEEK_MS).contains(&id_with_positive_offset),
4309 "Session ID should be in [0, WEEK_MS)"
4310 );
4311 let id_with_negative_offset = calculate_session_id(now_ms, -5000);
4316 assert!(
4317 (0..WEEK_MS).contains(&id_with_negative_offset),
4318 "Session ID should be in [0, WEEK_MS)"
4319 );
4320
4321 let wrap_test_now = WEEK_MS - OFFSET_MS + 1000; let wrapped_id = calculate_session_id(wrap_test_now, 0);
4325 assert_eq!(wrapped_id, 1000, "Should wrap around correctly");
4326
4327 let boundary_now = WEEK_MS - OFFSET_MS;
4329 let boundary_id = calculate_session_id(boundary_now, 0);
4330 assert_eq!(boundary_id, 0, "At exact boundary should be 0");
4331 }
4332
4333 #[tokio::test]
4334 async fn test_server_time_offset_extraction() {
4335 use wacore_binary::builder::NodeBuilder;
4336
4337 let backend = crate::test_utils::create_test_backend().await;
4338 let pm = Arc::new(
4339 PersistenceManager::new(backend)
4340 .await
4341 .expect("persistence manager should initialize"),
4342 );
4343 let (client, _rx) = Client::new(
4344 Arc::new(crate::runtime_impl::TokioRuntime),
4345 pm,
4346 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4347 Arc::new(MockHttpClient),
4348 None,
4349 )
4350 .await;
4351
4352 assert_eq!(
4354 client.unified_session.server_time_offset_ms(),
4355 0,
4356 "Initial offset should be 0"
4357 );
4358
4359 let server_time = wacore::time::now_secs() + 10; let node = NodeBuilder::new("success")
4362 .attr("t", server_time.to_string())
4363 .build();
4364
4365 client.update_server_time_offset(&node);
4367
4368 let offset = client.unified_session.server_time_offset_ms();
4371 assert!(
4372 (offset - 10000).abs() < 1000, "Offset should be approximately 10000ms, got {}",
4374 offset
4375 );
4376
4377 let node_no_t = NodeBuilder::new("success").build();
4379 client.update_server_time_offset(&node_no_t);
4380 let offset_after = client.unified_session.server_time_offset_ms();
4381 assert!(
4382 (offset_after - offset).abs() < 100, "Offset should not change when 't' is missing"
4384 );
4385
4386 let node_invalid = NodeBuilder::new("success")
4388 .attr("t", "not_a_number")
4389 .build();
4390 client.update_server_time_offset(&node_invalid);
4391 let offset_after_invalid = client.unified_session.server_time_offset_ms();
4392 assert!(
4393 (offset_after_invalid - offset).abs() < 100,
4394 "Offset should not change when 't' is invalid"
4395 );
4396
4397 let node_zero = NodeBuilder::new("success").attr("t", "0").build();
4399 client.update_server_time_offset(&node_zero);
4400 let offset_after_zero = client.unified_session.server_time_offset_ms();
4401 assert!(
4402 (offset_after_zero - offset).abs() < 100,
4403 "Offset should not change when 't' is 0"
4404 );
4405
4406 info!("✅ test_server_time_offset_extraction passed");
4407 }
4408
4409 #[tokio::test]
4410 async fn test_unified_session_manager_integration() {
4411 let backend = crate::test_utils::create_test_backend().await;
4414 let pm = Arc::new(
4415 PersistenceManager::new(backend)
4416 .await
4417 .expect("persistence manager should initialize"),
4418 );
4419 let (client, _rx) = Client::new(
4420 Arc::new(crate::runtime_impl::TokioRuntime),
4421 pm,
4422 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4423 Arc::new(MockHttpClient),
4424 None,
4425 )
4426 .await;
4427
4428 assert_eq!(
4430 client.unified_session.sequence(),
4431 0,
4432 "Initial sequence should be 0"
4433 );
4434
4435 loop {
4439 client.unified_session.reset().await;
4440
4441 let result = client.unified_session.prepare_send().await;
4442 assert!(result.is_some(), "First send should succeed");
4443 let (node, seq) = result.unwrap();
4444 assert_eq!(node.tag, "ib", "Should be an IB stanza");
4445 assert_eq!(seq, 1, "First sequence should be 1 (pre-increment)");
4446 assert_eq!(client.unified_session.sequence(), 1);
4447
4448 let result2 = client.unified_session.prepare_send().await;
4449 if result2.is_none() {
4450 assert_eq!(client.unified_session.sequence(), 1);
4452 break;
4453 }
4454 tokio::task::yield_now().await;
4456 }
4457
4458 client.unified_session.clear_last_sent().await;
4460 let result3 = client.unified_session.prepare_send().await;
4461 assert!(result3.is_some(), "Should succeed after clearing");
4462 let (_, seq3) = result3.unwrap();
4463 assert_eq!(seq3, 1, "Sequence resets when session ID changes");
4464 assert_eq!(client.unified_session.sequence(), 1);
4465
4466 info!("✅ test_unified_session_manager_integration passed");
4467 }
4468
4469 #[test]
4470 fn test_unified_session_protocol_node() {
4471 use wacore::ib::{IbStanza, UnifiedSession};
4473 use wacore::protocol::ProtocolNode;
4474
4475 let session = UnifiedSession::new("123456789");
4477 assert_eq!(session.id, "123456789");
4478 assert_eq!(session.tag(), "unified_session");
4479
4480 let node = session.into_node();
4482 assert_eq!(node.tag, "unified_session");
4483 assert!(node.attrs.get("id").is_some_and(|v| v == "123456789"));
4484
4485 let stanza = IbStanza::unified_session(UnifiedSession::new("987654321"));
4487 assert_eq!(stanza.tag(), "ib");
4488
4489 let ib_node = stanza.into_node();
4491 assert_eq!(ib_node.tag, "ib");
4492 let children = ib_node.children().expect("IB stanza should have children");
4493 assert_eq!(children.len(), 1);
4494 assert_eq!(children[0].tag, "unified_session");
4495 assert!(
4496 children[0]
4497 .attrs
4498 .get("id")
4499 .is_some_and(|v| v == "987654321")
4500 );
4501
4502 info!("✅ test_unified_session_protocol_node passed");
4503 }
4504
4505 async fn create_offline_sync_test_client() -> Arc<Client> {
4507 let backend = crate::test_utils::create_test_backend().await;
4508 let pm = Arc::new(
4509 PersistenceManager::new(backend)
4510 .await
4511 .expect("persistence manager should initialize"),
4512 );
4513 let (client, _rx) = Client::new(
4514 Arc::new(crate::runtime_impl::TokioRuntime),
4515 pm,
4516 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4517 Arc::new(MockHttpClient),
4518 None,
4519 )
4520 .await;
4521 client
4522 }
4523
4524 #[tokio::test]
4525 async fn test_ib_thread_metadata_does_not_end_sync() {
4526 let client = create_offline_sync_test_client().await;
4527 client
4528 .offline_sync_metrics
4529 .active
4530 .store(true, Ordering::Release);
4531
4532 let node = NodeBuilder::new("ib")
4533 .children([NodeBuilder::new("thread_metadata")
4534 .children([NodeBuilder::new("item").build()])
4535 .build()])
4536 .build();
4537
4538 client.process_node(Arc::new(node)).await;
4539 assert!(
4540 client.offline_sync_metrics.active.load(Ordering::Acquire),
4541 "<ib><thread_metadata> should NOT end offline sync"
4542 );
4543 }
4544
4545 #[tokio::test]
4546 async fn test_ib_edge_routing_does_not_end_sync() {
4547 let client = create_offline_sync_test_client().await;
4548 client
4549 .offline_sync_metrics
4550 .active
4551 .store(true, Ordering::Release);
4552
4553 let node = NodeBuilder::new("ib")
4554 .children([NodeBuilder::new("edge_routing")
4555 .children([NodeBuilder::new("routing_info")
4556 .bytes(vec![1, 2, 3])
4557 .build()])
4558 .build()])
4559 .build();
4560
4561 client.process_node(Arc::new(node)).await;
4562 assert!(
4563 client.offline_sync_metrics.active.load(Ordering::Acquire),
4564 "<ib><edge_routing> should NOT end offline sync"
4565 );
4566 }
4567
4568 #[tokio::test]
4569 async fn test_ib_dirty_does_not_end_sync() {
4570 let client = create_offline_sync_test_client().await;
4571 client
4572 .offline_sync_metrics
4573 .active
4574 .store(true, Ordering::Release);
4575
4576 let node = NodeBuilder::new("ib")
4577 .children([NodeBuilder::new("dirty")
4578 .attr("type", "groups")
4579 .attr("timestamp", "1234")
4580 .build()])
4581 .build();
4582
4583 client.process_node(Arc::new(node)).await;
4584 assert!(
4585 client.offline_sync_metrics.active.load(Ordering::Acquire),
4586 "<ib><dirty> should NOT end offline sync"
4587 );
4588 }
4589
4590 #[tokio::test]
4591 async fn test_ib_offline_child_ends_sync() {
4592 let client = create_offline_sync_test_client().await;
4593 client
4594 .offline_sync_metrics
4595 .active
4596 .store(true, Ordering::Release);
4597 client
4598 .offline_sync_metrics
4599 .total_messages
4600 .store(301, Ordering::Release);
4601
4602 let node = NodeBuilder::new("ib")
4603 .children([NodeBuilder::new("offline").attr("count", "301").build()])
4604 .build();
4605
4606 client.process_node(Arc::new(node)).await;
4607 assert!(
4608 !client.offline_sync_metrics.active.load(Ordering::Acquire),
4609 "<ib><offline count='301'/> should end offline sync"
4610 );
4611 }
4612
4613 #[tokio::test]
4614 async fn test_ib_offline_preview_starts_sync() {
4615 let client = create_offline_sync_test_client().await;
4616
4617 let node = NodeBuilder::new("ib")
4618 .children([NodeBuilder::new("offline_preview")
4619 .attr("count", "301")
4620 .attr("message", "168")
4621 .attr("notification", "62")
4622 .attr("receipt", "68")
4623 .attr("appdata", "0")
4624 .build()])
4625 .build();
4626
4627 client.process_node(Arc::new(node)).await;
4628 assert!(
4629 client.offline_sync_metrics.active.load(Ordering::Acquire),
4630 "offline_preview with count>0 should activate sync"
4631 );
4632 assert_eq!(
4633 client
4634 .offline_sync_metrics
4635 .total_messages
4636 .load(Ordering::Acquire),
4637 301
4638 );
4639 }
4640
4641 #[tokio::test]
4642 async fn test_offline_message_increments_processed() {
4643 let client = create_offline_sync_test_client().await;
4644 client
4645 .offline_sync_metrics
4646 .active
4647 .store(true, Ordering::Release);
4648 client
4649 .offline_sync_metrics
4650 .total_messages
4651 .store(100, Ordering::Release);
4652
4653 let node = NodeBuilder::new("message")
4654 .attr("offline", "1")
4655 .attr("from", "5551234567@s.whatsapp.net")
4656 .attr("id", "TEST123")
4657 .attr("t", "1772884671")
4658 .attr("type", "text")
4659 .build();
4660
4661 client.process_node(Arc::new(node)).await;
4662 assert_eq!(
4663 client
4664 .offline_sync_metrics
4665 .processed_messages
4666 .load(Ordering::Acquire),
4667 1,
4668 "offline message should increment processed count"
4669 );
4670 }
4671
4672 #[tokio::test]
4694 async fn test_handle_iq_ping_with_child_element() {
4695 let backend = crate::test_utils::create_test_backend().await;
4697 let pm = Arc::new(
4698 PersistenceManager::new(backend)
4699 .await
4700 .expect("persistence manager should initialize"),
4701 );
4702 let (client, _rx) = Client::new(
4703 Arc::new(crate::runtime_impl::TokioRuntime),
4704 pm,
4705 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4706 Arc::new(MockHttpClient),
4707 None,
4708 )
4709 .await;
4710
4711 let ping_node = NodeBuilder::new("iq")
4712 .attr("type", "get")
4713 .attr("from", SERVER_JID)
4714 .attr("id", "ping-child-1")
4715 .children([NodeBuilder::new("ping").build()])
4716 .build();
4717
4718 let handled = client.handle_iq(&ping_node).await;
4719 assert!(
4720 handled,
4721 "handle_iq must recognize ping with <ping> child element"
4722 );
4723 }
4724
4725 #[tokio::test]
4726 async fn test_handle_iq_ping_with_xmlns_attribute() {
4727 let backend = crate::test_utils::create_test_backend().await;
4731 let pm = Arc::new(
4732 PersistenceManager::new(backend)
4733 .await
4734 .expect("persistence manager should initialize"),
4735 );
4736 let (client, _rx) = Client::new(
4737 Arc::new(crate::runtime_impl::TokioRuntime),
4738 pm,
4739 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4740 Arc::new(MockHttpClient),
4741 None,
4742 )
4743 .await;
4744
4745 let ping_node = NodeBuilder::new("iq")
4746 .attr("type", "get")
4747 .attr("from", SERVER_JID)
4748 .attr("id", "ping-xmlns-1")
4749 .attr("xmlns", "urn:xmpp:ping")
4750 .build();
4751
4752 let handled = client.handle_iq(&ping_node).await;
4753 assert!(
4754 handled,
4755 "handle_iq must recognize ping with xmlns=\"urn:xmpp:ping\" attribute (no children)"
4756 );
4757 }
4758
4759 #[tokio::test]
4760 async fn test_handle_iq_ping_with_both_child_and_xmlns() {
4761 let backend = crate::test_utils::create_test_backend().await;
4764 let pm = Arc::new(
4765 PersistenceManager::new(backend)
4766 .await
4767 .expect("persistence manager should initialize"),
4768 );
4769 let (client, _rx) = Client::new(
4770 Arc::new(crate::runtime_impl::TokioRuntime),
4771 pm,
4772 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4773 Arc::new(MockHttpClient),
4774 None,
4775 )
4776 .await;
4777
4778 let ping_node = NodeBuilder::new("iq")
4779 .attr("type", "get")
4780 .attr("from", SERVER_JID)
4781 .attr("id", "ping-both-1")
4782 .attr("xmlns", "urn:xmpp:ping")
4783 .children([NodeBuilder::new("ping").build()])
4784 .build();
4785
4786 let handled = client.handle_iq(&ping_node).await;
4787 assert!(
4788 handled,
4789 "handle_iq must handle ping with both child and xmlns"
4790 );
4791 }
4792
4793 #[tokio::test]
4794 async fn test_handle_iq_non_ping_returns_false() {
4795 let backend = crate::test_utils::create_test_backend().await;
4797 let pm = Arc::new(
4798 PersistenceManager::new(backend)
4799 .await
4800 .expect("persistence manager should initialize"),
4801 );
4802 let (client, _rx) = Client::new(
4803 Arc::new(crate::runtime_impl::TokioRuntime),
4804 pm,
4805 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4806 Arc::new(MockHttpClient),
4807 None,
4808 )
4809 .await;
4810
4811 let non_ping_node = NodeBuilder::new("iq")
4812 .attr("type", "get")
4813 .attr("from", SERVER_JID)
4814 .attr("id", "not-a-ping")
4815 .attr("xmlns", "some:other:namespace")
4816 .build();
4817
4818 let handled = client.handle_iq(&non_ping_node).await;
4819 assert!(
4820 !handled,
4821 "handle_iq must NOT treat non-ping xmlns as a ping"
4822 );
4823 }
4824
4825 #[tokio::test]
4826 async fn test_handle_iq_ping_wrong_type_returns_false() {
4827 let backend = crate::test_utils::create_test_backend().await;
4829 let pm = Arc::new(
4830 PersistenceManager::new(backend)
4831 .await
4832 .expect("persistence manager should initialize"),
4833 );
4834 let (client, _rx) = Client::new(
4835 Arc::new(crate::runtime_impl::TokioRuntime),
4836 pm,
4837 Arc::new(crate::transport::mock::MockTransportFactory::new()),
4838 Arc::new(MockHttpClient),
4839 None,
4840 )
4841 .await;
4842
4843 let result_node = NodeBuilder::new("iq")
4844 .attr("type", "result")
4845 .attr("from", SERVER_JID)
4846 .attr("id", "ping-result-1")
4847 .attr("xmlns", "urn:xmpp:ping")
4848 .build();
4849
4850 let handled = client.handle_iq(&result_node).await;
4851 assert!(
4852 !handled,
4853 "handle_iq must NOT respond to type=\"result\" even with ping xmlns"
4854 );
4855 }
4856
4857 #[test]
4860 fn test_build_pong_with_id() {
4861 let pong = build_pong("s.whatsapp.net".to_string(), Some("ping-123"));
4862 assert!(
4863 pong.attrs.get("id").is_some_and(|v| v == "ping-123"),
4864 "pong should include id when server ping has one"
4865 );
4866 assert!(pong.attrs.get("type").is_some_and(|v| v == "result"));
4867 assert!(pong.attrs.get("to").is_some_and(|v| v == "s.whatsapp.net"));
4868 }
4869
4870 #[test]
4871 fn test_build_pong_without_id() {
4872 let pong = build_pong("s.whatsapp.net".to_string(), None);
4873 assert!(
4874 !pong.attrs.contains_key("id"),
4875 "pong should NOT include id when server ping has none"
4876 );
4877 assert!(pong.attrs.get("type").is_some_and(|v| v == "result"));
4878 }
4879
4880 #[test]
4881 fn test_encrypt_identity_notification_omits_type() {
4882 let node = NodeBuilder::new("notification")
4883 .attr("from", "186303081611421@lid")
4884 .attr("id", "4128735301")
4885 .attr("type", "encrypt")
4886 .children([NodeBuilder::new("identity").build()])
4887 .build();
4888
4889 assert!(
4890 is_encrypt_identity_notification(&node),
4891 "identity-change notification ACK must omit type to match WA Web"
4892 );
4893 }
4894
4895 #[test]
4896 fn test_device_notification_is_not_encrypt_identity() {
4897 let node = NodeBuilder::new("notification")
4898 .attr("from", "186303081611421@lid")
4899 .attr("id", "269488578")
4900 .attr("type", "devices")
4901 .children([NodeBuilder::new("remove").build()])
4902 .build();
4903
4904 assert!(
4905 !is_encrypt_identity_notification(&node),
4906 "device notification is not an encrypt+identity notification"
4907 );
4908 }
4909
4910 #[test]
4911 fn test_build_ack_node_for_message_omits_type_includes_from() {
4912 let incoming = NodeBuilder::new("message")
4915 .attr("from", "120363161500776365@g.us")
4916 .attr("id", "A5791A5392EF60E3FB0670098DE010D4")
4917 .attr("type", "text")
4918 .attr("participant", "181531758878822@lid")
4919 .build();
4920 let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4921 .parse()
4922 .expect("own device PN JID should parse");
4923
4924 let ack = build_ack_node(&incoming, Some(&own_device_pn))
4925 .expect("message ack should be buildable");
4926
4927 assert_eq!(ack.tag, "ack");
4928 assert!(ack.attrs.get("class").is_some_and(|v| v == "message"));
4931 assert!(
4932 ack.attrs
4933 .get("to")
4934 .is_some_and(|v| v == "120363161500776365@g.us")
4935 );
4936 assert!(
4937 ack.attrs
4938 .get("from")
4939 .is_some_and(|v| v == "155500012345:48@s.whatsapp.net")
4940 );
4941 assert!(
4942 ack.attrs
4943 .get("participant")
4944 .is_some_and(|v| v == "181531758878822@lid")
4945 );
4946 assert!(
4947 !ack.attrs.contains_key("type"),
4948 "message ACK must NOT echo type (matches whatsmeow behavior)"
4949 );
4950 }
4951
4952 #[test]
4953 fn test_build_ack_node_for_identity_change_omits_type_and_from() {
4954 let incoming = NodeBuilder::new("notification")
4955 .attr("from", "186303081611421@lid")
4956 .attr("id", "4128735301")
4957 .attr("type", "encrypt")
4958 .children([NodeBuilder::new("identity").build()])
4959 .build();
4960 let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4961 .parse()
4962 .expect("own device PN JID should parse");
4963
4964 let ack = build_ack_node(&incoming, Some(&own_device_pn))
4965 .expect("notification ack should be buildable");
4966
4967 assert!(ack.attrs.get("class").is_some_and(|v| v == "notification"));
4968 assert!(
4969 !ack.attrs.contains_key("type"),
4970 "identity-change notification ACK must omit type"
4971 );
4972 assert!(
4973 !ack.attrs.contains_key("from"),
4974 "notification ACKs should not include our device PN"
4975 );
4976 }
4977
4978 #[test]
4979 fn test_build_ack_node_for_receipt_with_type_echoes_type() {
4980 let incoming = NodeBuilder::new("receipt")
4982 .attr("from", "156535032389744@lid")
4983 .attr("id", "RCPT-WITH-TYPE")
4984 .attr("type", "read")
4985 .build();
4986 let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4987 .parse()
4988 .expect("own device PN JID should parse");
4989
4990 let ack = build_ack_node(&incoming, Some(&own_device_pn))
4991 .expect("receipt ack should be buildable");
4992
4993 assert!(ack.attrs.get("class").is_some_and(|v| v == "receipt"));
4994 assert!(
4995 ack.attrs.get("type").is_some_and(|v| v == "read"),
4996 "receipt ACK must echo the type attribute when present"
4997 );
4998 assert!(
4999 !ack.attrs.contains_key("from"),
5000 "receipt ACKs should not include our device PN"
5001 );
5002 }
5003
5004 #[test]
5005 fn test_build_ack_node_for_receipt_without_type_omits_type() {
5006 let incoming = NodeBuilder::new("receipt")
5009 .attr("from", "156535032389744@lid")
5010 .attr("id", "RCPT-NO-TYPE")
5011 .build();
5012 let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
5013 .parse()
5014 .expect("own device PN JID should parse");
5015
5016 let ack = build_ack_node(&incoming, Some(&own_device_pn))
5017 .expect("receipt ack should be buildable");
5018
5019 assert!(ack.attrs.get("class").is_some_and(|v| v == "receipt"));
5020 assert!(
5021 !ack.attrs.contains_key("type"),
5022 "receipt ACK must NOT contain type when the incoming receipt has no type attribute"
5023 );
5024 assert!(
5025 !ack.attrs.contains_key("from"),
5026 "receipt ACKs should not include our device PN"
5027 );
5028 }
5029
5030 #[tokio::test]
5032 async fn test_handle_iq_ping_without_id() {
5033 let backend = crate::test_utils::create_test_backend().await;
5034 let pm = Arc::new(
5035 PersistenceManager::new(backend)
5036 .await
5037 .expect("persistence manager should initialize"),
5038 );
5039 let (client, _rx) = Client::new(
5040 Arc::new(crate::runtime_impl::TokioRuntime),
5041 pm,
5042 Arc::new(crate::transport::mock::MockTransportFactory::new()),
5043 Arc::new(MockHttpClient),
5044 None,
5045 )
5046 .await;
5047
5048 let ping_node = NodeBuilder::new("iq")
5050 .attr("type", "get")
5051 .attr("from", SERVER_JID)
5052 .attr("xmlns", "urn:xmpp:ping")
5053 .build();
5054
5055 let handled = client.handle_iq(&ping_node).await;
5056 assert!(
5057 handled,
5058 "handle_iq must recognize ping without id attribute"
5059 );
5060 }
5061
5062 #[test]
5065 fn test_fibonacci_backoff_sequence() {
5066 let expected_base_ms = [1000, 1000, 2000, 3000, 5000, 8000, 13000, 21000];
5069 for (attempt, &base) in expected_base_ms.iter().enumerate() {
5070 let delay = fibonacci_backoff(attempt as u32);
5071 let ms = delay.as_millis() as u64;
5072 let low = base - base / 10;
5073 let high = base + base / 10;
5074 assert!(
5075 ms >= low && ms <= high,
5076 "attempt {attempt}: expected {low}..={high}ms, got {ms}ms"
5077 );
5078 }
5079 }
5080
5081 #[test]
5082 fn test_fibonacci_backoff_max_900s() {
5083 let delay = fibonacci_backoff(100);
5085 let ms = delay.as_millis() as u64;
5086 assert!(
5087 ms <= 990_000,
5088 "should never exceed 900s + 10% jitter, got {ms}ms"
5089 );
5090 assert!(
5091 ms >= 810_000,
5092 "should be at least 900s - 10% jitter, got {ms}ms"
5093 );
5094 }
5095
5096 #[test]
5097 fn test_fibonacci_backoff_first_attempt_is_1s() {
5098 let delay = fibonacci_backoff(0);
5099 let ms = delay.as_millis() as u64;
5100 assert!(
5101 (900..=1100).contains(&ms),
5102 "first attempt should be ~1s (±10%), got {ms}ms"
5103 );
5104 }
5105
5106 #[tokio::test]
5109 async fn test_stream_error_401_disables_reconnect() {
5110 let client = create_offline_sync_test_client().await;
5111 let node = NodeBuilder::new("stream:error").attr("code", "401").build();
5112 client.handle_stream_error(&node).await;
5113 assert!(
5114 !client.enable_auto_reconnect.load(Ordering::Relaxed),
5115 "401 should disable auto-reconnect"
5116 );
5117 }
5118
5119 #[tokio::test]
5120 async fn test_stream_error_409_disables_reconnect() {
5121 let client = create_offline_sync_test_client().await;
5122 let node = NodeBuilder::new("stream:error").attr("code", "409").build();
5123 client.handle_stream_error(&node).await;
5124 assert!(
5125 !client.enable_auto_reconnect.load(Ordering::Relaxed),
5126 "409 should disable auto-reconnect"
5127 );
5128 }
5129
5130 #[tokio::test]
5131 async fn test_stream_error_429_keeps_reconnect_with_backoff() {
5132 let client = create_offline_sync_test_client().await;
5133 let before = client.auto_reconnect_errors.load(Ordering::Relaxed);
5134 let node = NodeBuilder::new("stream:error").attr("code", "429").build();
5135 client.handle_stream_error(&node).await;
5136 assert!(
5137 client.enable_auto_reconnect.load(Ordering::Relaxed),
5138 "429 should keep auto-reconnect enabled"
5139 );
5140 let after = client.auto_reconnect_errors.load(Ordering::Relaxed);
5141 assert_eq!(
5142 after,
5143 before + 5,
5144 "429 should increase backoff by exactly 5: before={before}, after={after}"
5145 );
5146 }
5147
5148 #[tokio::test]
5149 async fn test_stream_error_503_keeps_reconnect() {
5150 let client = create_offline_sync_test_client().await;
5151 let node = NodeBuilder::new("stream:error").attr("code", "503").build();
5152 client.handle_stream_error(&node).await;
5153 assert!(
5154 client.enable_auto_reconnect.load(Ordering::Relaxed),
5155 "503 should keep auto-reconnect enabled"
5156 );
5157 }
5158
5159 #[tokio::test]
5160 async fn test_custom_cache_config_is_respected() {
5161 use crate::cache_config::{CacheConfig, CacheEntryConfig};
5162 use std::time::Duration;
5163
5164 let backend = crate::test_utils::create_test_backend().await;
5165 let pm = Arc::new(
5166 PersistenceManager::new(backend)
5167 .await
5168 .expect("persistence manager should initialize"),
5169 );
5170
5171 let custom_config = CacheConfig {
5172 group_cache: CacheEntryConfig::new(Some(Duration::from_secs(60)), 10),
5173 device_cache: CacheEntryConfig::new(Some(Duration::from_secs(60)), 10),
5174 ..CacheConfig::default()
5175 };
5176
5177 let (client, _rx) = Client::new_with_cache_config(
5180 Arc::new(crate::runtime_impl::TokioRuntime),
5181 pm,
5182 Arc::new(crate::transport::mock::MockTransportFactory::new()),
5183 Arc::new(MockHttpClient),
5184 None,
5185 custom_config,
5186 )
5187 .await;
5188
5189 assert!(!client.is_logged_in());
5190 }
5191
    #[tokio::test]
    async fn test_is_connected_not_affected_by_mutex_contention() {
        // Regression test: is_connected() must stay answerable while the
        // noise_socket mutex is held by someone else — otherwise any task
        // holding that lock would stall or distort connectivity checks.
        use crate::socket::NoiseSocket;
        use wacore::handshake::NoiseCipher;

        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        assert!(!client.is_connected(), "should start disconnected");

        // Install a mock noise socket and flip the connected flag by hand.
        // The all-zero key is fine: no traffic is encrypted in this test.
        let transport: Arc<dyn crate::transport::Transport> =
            Arc::new(crate::transport::mock::MockTransport);
        let key = [0u8; 32];
        let write_key = NoiseCipher::new(&key).expect("valid key");
        let read_key = NoiseCipher::new(&key).expect("valid key");
        let noise_socket = NoiseSocket::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            transport,
            write_key,
            read_key,
        );
        *client.noise_socket.lock().await = Some(Arc::new(noise_socket));
        client.is_connected.store(true, Ordering::Release);

        assert!(client.is_connected(), "should report connected");

        // Keep the mutex guard alive across the assert below — that held
        // lock is the whole point of the test.
        let _guard = client.noise_socket.lock().await;
        assert!(
            client.is_connected(),
            "is_connected() must return true even while noise_socket mutex is held"
        );
    }
5248
5249 #[tokio::test]
5253 async fn test_send_ack_for_returns_error_when_disconnected() {
5254 let backend = crate::test_utils::create_test_backend().await;
5255 let pm = Arc::new(
5256 PersistenceManager::new(backend)
5257 .await
5258 .expect("persistence manager should initialize"),
5259 );
5260 let (client, _rx) = Client::new(
5261 Arc::new(crate::runtime_impl::TokioRuntime),
5262 pm,
5263 Arc::new(crate::transport::mock::MockTransportFactory::new()),
5264 Arc::new(MockHttpClient),
5265 None,
5266 )
5267 .await;
5268
5269 let receipt = NodeBuilder::new("receipt")
5271 .attr("from", "120363040237990503@g.us")
5272 .attr("id", "TEST-RECEIPT-ID")
5273 .attr("participant", "236395184570386@lid")
5274 .build();
5275
5276 let result = client.send_ack_for(&receipt).await;
5277 assert!(
5278 matches!(result, Err(ClientError::NotConnected)),
5279 "send_ack_for must return Err(NotConnected) when disconnected, got: {result:?}"
5280 );
5281 }
5282
5283 #[tokio::test]
5286 async fn test_send_ack_for_returns_ok_on_expected_disconnect() {
5287 let backend = crate::test_utils::create_test_backend().await;
5288 let pm = Arc::new(
5289 PersistenceManager::new(backend)
5290 .await
5291 .expect("persistence manager should initialize"),
5292 );
5293 let (client, _rx) = Client::new(
5294 Arc::new(crate::runtime_impl::TokioRuntime),
5295 pm,
5296 Arc::new(crate::transport::mock::MockTransportFactory::new()),
5297 Arc::new(MockHttpClient),
5298 None,
5299 )
5300 .await;
5301
5302 client.expected_disconnect.store(true, Ordering::Relaxed);
5304
5305 let receipt = NodeBuilder::new("receipt")
5306 .attr("from", "120363040237990503@g.us")
5307 .attr("id", "TEST-RECEIPT-ID")
5308 .build();
5309
5310 let result = client.send_ack_for(&receipt).await;
5311 assert!(
5312 result.is_ok(),
5313 "send_ack_for should return Ok during expected disconnect"
5314 );
5315 }
5316}