Skip to main content

whatsapp_rust/
client.rs

1mod context_impl;
2mod device_registry;
3mod lid_pn;
4mod sender_keys;
5mod sessions;
6
7use crate::cache::Cache;
8use crate::cache_store::TypedCache;
9use crate::handshake;
10use crate::lid_pn_cache::LidPnCache;
11use crate::pair;
12use anyhow::{Result, anyhow};
13use futures::FutureExt;
14use std::borrow::Cow;
15use std::collections::{HashMap, HashSet};
16
17use wacore::xml::DisplayableNode;
18use wacore_binary::builder::NodeBuilder;
19use wacore_binary::jid::JidExt;
20use wacore_binary::node::{Attrs, Node, NodeValue};
21
22use crate::appstate_sync::AppStateProcessor;
23use crate::handlers::chatstate::ChatStateEvent;
24use crate::jid_utils::server_jid;
25use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
26use crate::types::enc_handler::EncHandler;
27use crate::types::events::{ConnectFailureReason, Event};
28
29use log::{debug, error, info, trace, warn};
30
31use rand::{Rng, RngExt};
32use scopeguard;
33use wacore_binary::jid::Jid;
34
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
37
/// Filter for matching incoming stanzas (nodes) by tag and attributes.
///
/// Used with [`Client::wait_for_node`] to wait for specific stanzas.
/// Zero-cost when no waiters are active (single atomic load per node).
///
/// Built fluently: start with [`NodeFilter::tag`], then chain
/// [`NodeFilter::attr`] / [`NodeFilter::from_jid`] constraints.
///
/// # Example
/// ```ignore
/// // Wait for a w:gp2 notification from a specific group
/// let waiter = client.wait_for_node(
///     NodeFilter::tag("notification")
///         .attr("type", "w:gp2")
///         .attr("from", "group@g.us"),
/// );
/// // ... trigger the action ...
/// let node = waiter.await?;
/// ```
#[derive(Debug, Clone)]
pub struct NodeFilter {
    // Stanza tag that must match exactly (e.g. "notification").
    tag: String,
    // Attribute constraints; every (key, expected-value) pair must be
    // present on the node with an equal value for the filter to match.
    attrs: Vec<(String, String)>,
}
59
60impl NodeFilter {
61    /// Create a filter matching nodes with the given tag.
62    pub fn tag(tag: impl Into<String>) -> Self {
63        Self {
64            tag: tag.into(),
65            attrs: Vec::new(),
66        }
67    }
68
69    /// Add an attribute constraint. All attributes must match.
70    pub fn attr(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
71        self.attrs.push((key.into(), value.into()));
72        self
73    }
74
75    /// Shorthand for `.attr("from", jid.to_string())`.
76    pub fn from_jid(self, jid: &Jid) -> Self {
77        self.attr("from", jid.to_string())
78    }
79
80    fn matches(&self, node: &Node) -> bool {
81        node.tag == self.tag
82            && self
83                .attrs
84                .iter()
85                .all(|(k, v)| node.attrs.get(k.as_str()).is_some_and(|attr| *attr == *v))
86    }
87}
88
// A single registered stanza waiter: when an incoming node satisfies
// `filter`, the node is delivered once through `tx` (one-shot) and the
// waiter is consumed.
struct NodeWaiter {
    // Filter the incoming stanza must satisfy.
    filter: NodeFilter,
    // One-shot channel handing the matching node to the awaiting task.
    tx: futures::channel::oneshot::Sender<Arc<Node>>,
}
93
94use async_lock::Mutex;
95use async_lock::RwLock;
96use std::time::Duration;
97use thiserror::Error;
98
99use wacore::appstate::patch_decode::WAPatchName;
100use wacore::client::context::GroupInfo;
101use wacore::runtime::timeout as rt_timeout;
102use waproto::whatsapp as wa;
103
104use crate::cache_config::CacheConfig;
105use crate::socket::{NoiseSocket, SocketError, error::EncryptSendError};
106use crate::sync_task::MajorSyncTask;
107use wacore::runtime::Runtime;
108
/// Type alias for chatstate event handler functions.
/// Handlers are shared (`Arc`) and must be callable from any thread.
type ChatStateHandler = Arc<dyn Fn(ChatStateEvent) + Send + Sync>;

/// Maximum number of app-state retry attempts (name-derived; the retry
/// loop using this constant is outside this chunk).
const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;

/// WA Web: MQTT `MqttProtocolClient.connect()` uses `CONNECT_TIMEOUT = 20s`,
/// DGW `connectTimeoutMs` defaults to `20000ms`.
const TRANSPORT_CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
117
/// Snapshot of internal collection sizes for memory leak detection.
///
/// All counts are approximate (moka caches may have pending evictions).
/// Call [`Client::memory_diagnostics`] to obtain a snapshot.
///
/// Requires the `debug-diagnostics` feature.
#[cfg(feature = "debug-diagnostics")]
#[derive(Debug, Clone)]
pub struct MemoryDiagnostics {
    // -- Moka caches (TTL/capacity-bounded) --
    pub group_cache: u64,
    pub device_cache: u64,
    pub device_registry_cache: u64,
    pub lid_pn_lid_entries: u64,
    pub lid_pn_pn_entries: u64,
    pub retried_group_messages: u64,
    pub recent_messages: u64,
    pub message_retry_counts: u64,
    pub pdo_pending_requests: u64,
    // -- Moka caches (capacity-only, no TTL) --
    pub session_locks: u64,
    pub message_queues: u64,
    pub message_enqueue_locks: u64,
    // -- Unbounded collections (these growing without bound indicates a leak) --
    pub response_waiters: usize,
    pub node_waiters: usize,
    pub pending_retries: usize,
    pub presence_subscriptions: usize,
    pub app_state_key_requests: usize,
    pub app_state_syncing: usize,
    pub signal_cache_sessions: usize,
    pub signal_cache_identities: usize,
    pub signal_cache_sender_keys: usize,
    // -- Misc (registered handler counts) --
    pub chatstate_handlers: usize,
    pub custom_enc_handlers: usize,
}
155
#[cfg(feature = "debug-diagnostics")]
impl std::fmt::Display for MemoryDiagnostics {
    /// Render a section-grouped, column-aligned report of all counts.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Table-driven rendering: the label strings carry the alignment
        // padding, so output is byte-identical to hand-written writeln!s.
        let ttl_caches: [(&str, u64); 9] = [
            ("  group_cache:            ", self.group_cache),
            ("  device_cache:           ", self.device_cache),
            ("  device_registry_cache:  ", self.device_registry_cache),
            ("  lid_pn (lid):           ", self.lid_pn_lid_entries),
            ("  lid_pn (pn):            ", self.lid_pn_pn_entries),
            ("  retried_group_messages: ", self.retried_group_messages),
            ("  recent_messages:        ", self.recent_messages),
            ("  message_retry_counts:   ", self.message_retry_counts),
            ("  pdo_pending_requests:   ", self.pdo_pending_requests),
        ];
        let capacity_caches: [(&str, u64); 3] = [
            ("  session_locks:          ", self.session_locks),
            ("  message_queues:         ", self.message_queues),
            ("  message_enqueue_locks:  ", self.message_enqueue_locks),
        ];
        // The signal_* rows belong to the "Unbounded collections" section;
        // they are split out only because they read different fields.
        let unbounded: [(&str, usize); 6] = [
            ("  response_waiters:       ", self.response_waiters),
            ("  node_waiters:           ", self.node_waiters),
            ("  pending_retries:        ", self.pending_retries),
            ("  presence_subscriptions: ", self.presence_subscriptions),
            ("  app_state_key_requests: ", self.app_state_key_requests),
            ("  app_state_syncing:      ", self.app_state_syncing),
        ];
        let signal: [(&str, usize); 3] = [
            ("  signal_sessions:        ", self.signal_cache_sessions),
            ("  signal_identities:      ", self.signal_cache_identities),
            ("  signal_sender_keys:     ", self.signal_cache_sender_keys),
        ];
        let misc: [(&str, usize); 2] = [
            ("  chatstate_handlers:     ", self.chatstate_handlers),
            ("  custom_enc_handlers:    ", self.custom_enc_handlers),
        ];

        writeln!(f, "=== Memory Diagnostics ===")?;
        writeln!(f, "--- Moka caches (TTL-bounded) ---")?;
        for (label, value) in ttl_caches {
            writeln!(f, "{label}{value}")?;
        }
        writeln!(f, "--- Moka caches (capacity-only) ---")?;
        for (label, value) in capacity_caches {
            writeln!(f, "{label}{value}")?;
        }
        writeln!(f, "--- Unbounded collections ---")?;
        for (label, value) in unbounded {
            writeln!(f, "{label}{value}")?;
        }
        for (label, value) in signal {
            writeln!(f, "{label}{value}")?;
        }
        writeln!(f, "--- Misc ---")?;
        for (label, value) in misc {
            writeln!(f, "{label}{value}")?;
        }
        Ok(())
    }
}
222
/// Errors returned by [`Client`] connection and send operations.
#[derive(Debug, Error)]
pub enum ClientError {
    /// The operation needs an active connection, but none exists.
    #[error("client is not connected")]
    NotConnected,
    /// Transport-level failure propagated from the socket layer.
    #[error("socket error: {0}")]
    Socket(#[from] SocketError),
    /// Failure while encrypting or sending a frame.
    #[error("encrypt/send error: {0}")]
    EncryptSend(#[from] EncryptSendError),
    /// A connect was attempted while already connected.
    #[error("client is already connected")]
    AlreadyConnected,
    /// The operation requires a logged-in session.
    #[error("client is not logged in")]
    NotLoggedIn,
}
236
237use wacore::types::message::StanzaKey;
238
/// Metrics for tracking offline sync progress.
#[derive(Debug)]
pub(crate) struct OfflineSyncMetrics {
    // Whether an offline sync is currently in progress.
    pub active: AtomicBool,
    // Total offline messages expected for the current sync (name-derived;
    // set by the sync driver outside this chunk).
    pub total_messages: AtomicUsize,
    // Messages processed so far out of `total_messages`.
    pub processed_messages: AtomicUsize,
    // Using simple std Mutex for timestamp as it's rarely contended and non-async
    pub start_time: std::sync::Mutex<Option<wacore::time::Instant>>,
}
248
/// A WhatsApp client instance: owns the transport, session/auth state,
/// caches, and the coordination primitives shared by its background tasks.
pub struct Client {
    /// Runtime abstraction used to spawn background tasks.
    pub(crate) runtime: Arc<dyn Runtime>,
    /// Platform-independent core client (holds the event bus).
    pub(crate) core: wacore::client::CoreClient,

    /// Persistent storage manager (device snapshot, backend access).
    pub(crate) persistence_manager: Arc<PersistenceManager>,
    /// Current media connection, if one has been established.
    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,

    /// Set once the server has authenticated this session.
    pub(crate) is_logged_in: Arc<AtomicBool>,
    /// Set while a connection attempt is in progress.
    pub(crate) is_connecting: Arc<AtomicBool>,
    /// Set while the client run loop is active; cleared on shutdown
    /// (checked by `is_shutting_down`).
    pub(crate) is_running: Arc<AtomicBool>,
    /// Whether the noise socket is established (connected to WhatsApp servers).
    /// Uses an AtomicBool instead of probing the noise_socket mutex to avoid
    /// TOCTOU races where `try_lock()` fails due to contention, not disconnection.
    is_connected: Arc<AtomicBool>,
    /// Signalled to wake background tasks when the client shuts down.
    pub(crate) shutdown_notifier: Arc<event_listener::Event>,
    /// Timestamp (ms since UNIX epoch) of the last received WebSocket data.
    /// Updated on every `DataReceived` transport event.
    /// WA Web: `parseAndHandleStanza` → `deadSocketTimer.cancel()`.
    pub(crate) last_data_received_ms: Arc<AtomicU64>,
    /// Timestamp (ms since UNIX epoch) of the last sent WebSocket data.
    /// Updated on every `send_node` call.
    /// WA Web: `callStanza` → `deadSocketTimer.onOrBefore(deadSocketTime)`.
    pub(crate) last_data_sent_ms: Arc<AtomicU64>,

    /// Active transport for the current connection, if any.
    pub(crate) transport: Arc<Mutex<Option<Arc<dyn crate::transport::Transport>>>>,
    /// Receiver of transport events for the current connection, if any.
    pub(crate) transport_events:
        Arc<Mutex<Option<async_channel::Receiver<crate::transport::TransportEvent>>>>,
    /// Factory used to create a fresh transport on each (re)connect.
    pub(crate) transport_factory: Arc<dyn crate::transport::TransportFactory>,
    /// Noise-encrypted socket wrapping the transport, once established.
    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,

    /// One-shot reply channels for in-flight requests, keyed by request id
    /// (presumably the stanza `id` attribute — resolution code is outside
    /// this chunk).
    pub(crate) response_waiters:
        Arc<Mutex<HashMap<String, futures::channel::oneshot::Sender<wacore_binary::Node>>>>,

    /// Generic node waiters for waiting on specific stanzas by tag/attributes.
    /// Uses std::sync::Mutex (not tokio) since the critical section is trivial.
    /// Guarded by `node_waiter_count` for zero-cost when no waiters are active.
    node_waiters: std::sync::Mutex<Vec<NodeWaiter>>,
    /// Count of registered waiters; checked before touching `node_waiters`.
    node_waiter_count: AtomicUsize,

    /// Two random bytes formatted as "a.b" — a per-instance unique prefix
    /// (see the constructor).
    pub(crate) unique_id: String,
    /// Monotonically increasing counter, starts at 0.
    pub(crate) id_counter: Arc<AtomicU64>,

    /// Unified session manager (see `crate::unified_session`).
    pub(crate) unified_session: crate::unified_session::UnifiedSessionManager,

    /// In-memory cache for Signal protocol state (sessions, identities, sender keys).
    /// Matches WhatsApp Web's SignalStoreCache pattern: crypto ops read/write this cache,
    /// and DB writes are deferred to flush() after each message is processed.
    pub(crate) signal_cache: Arc<crate::store::signal_cache::SignalStoreCache>,

    /// Limits message processing concurrency (1 permit during offline sync, N after).
    /// Wrapped in Mutex to allow replacing on reconnect.
    pub(crate) message_processing_semaphore: std::sync::Mutex<Arc<async_lock::Semaphore>>,
    /// Bumped on every semaphore swap so stale Arc clones are rejected.
    pub(crate) message_semaphore_generation: Arc<AtomicU64>,

    /// Per-device session locks for Signal protocol operations.
    /// Prevents race conditions when multiple messages from the same sender
    /// are processed concurrently across different chats.
    /// Keys are Signal protocol address strings (e.g., "user@s.whatsapp.net:0")
    /// to match the SignalProtocolStoreAdapter's internal locking.
    pub(crate) session_locks: Cache<String, Arc<async_lock::Mutex<()>>>,

    /// Per-chat message queues for sequential message processing.
    /// Prevents race conditions where a later message is processed before
    /// the PreKey message that establishes the Signal session.
    pub(crate) message_queues: Cache<String, async_channel::Sender<Arc<Node>>>,

    /// Cache for LID to Phone Number mappings (bidirectional).
    /// When we receive a message with sender_lid/sender_pn attributes, we store the mapping here.
    /// This allows us to reuse existing LID-based sessions when sending replies.
    /// The cache is backed by persistent storage and warmed up on client initialization.
    pub(crate) lid_pn_cache: Arc<LidPnCache>,

    /// Per-chat mutex for serializing message enqueue operations.
    /// This ensures messages are enqueued in the order they arrive,
    /// preventing race conditions during queue initialization.
    pub(crate) message_enqueue_locks: Cache<String, Arc<async_lock::Mutex<()>>>,

    /// Lazily-initialized group metadata cache; see `get_group_cache`.
    pub group_cache: async_lock::Mutex<Option<Arc<TypedCache<Jid, GroupInfo>>>>,
    /// Lazily-initialized user→devices cache; see `get_device_cache`.
    #[allow(clippy::type_complexity)]
    pub device_cache: async_lock::Mutex<Option<Arc<TypedCache<Jid, Vec<Jid>>>>>,

    /// TTL-bounded set of group-message keys that have already been retried
    /// (name-derived; insertion sites are outside this chunk).
    pub(crate) retried_group_messages: Cache<String, ()>,
    /// Set for intentional disconnects; checked by `is_shutting_down`.
    pub(crate) expected_disconnect: Arc<AtomicBool>,

    /// Connection generation counter - incremented on each new connection.
    /// Used to detect stale post-login tasks from previous connections.
    pub(crate) connection_generation: Arc<AtomicU64>,

    /// Cache for recent messages (serialized bytes) for retry functionality.
    /// Uses moka cache with TTL and max capacity for automatic eviction.
    pub(crate) recent_messages: Cache<StanzaKey, Vec<u8>>,

    /// Keys of retries currently in flight (name-derived; managed outside
    /// this chunk).
    pub(crate) pending_retries: Arc<async_lock::Mutex<HashSet<String>>>,

    /// Track retry attempts per message to prevent infinite retry loops.
    /// Key: "{chat}:{msg_id}:{sender}", Value: retry count
    /// Matches WhatsApp Web's MAX_RETRY = 5 behavior.
    pub(crate) message_retry_counts: Cache<String, u8>,

    /// Toggle for automatic reconnection (defaults to enabled).
    pub enable_auto_reconnect: Arc<AtomicBool>,
    /// Count of auto-reconnect errors (name-derived; reset logic outside
    /// this chunk).
    pub auto_reconnect_errors: Arc<AtomicU32>,

    /// Flag requesting a full initial sync (name-derived).
    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,

    /// Lazily-initialized app state processor; see `get_app_state_processor`.
    pub(crate) app_state_processor: async_lock::Mutex<Option<Arc<AppStateProcessor>>>,
    /// Outstanding app-state key requests with their request timestamps
    /// (presumably for de-duplication — confirm against usage sites).
    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, wacore::time::Instant>>>,
    /// Tracks collections currently being synced to prevent duplicate sync tasks.
    /// Matches WA Web's in-flight tracking set in WAWebSyncdCollectionsStateMachine.
    pub(crate) app_state_syncing: Arc<Mutex<HashSet<WAPatchName>>>,
    /// Signalled once the initial app-state keys have been synced.
    pub(crate) initial_keys_synced_notifier: Arc<event_listener::Event>,
    /// Set once the initial app-state keys have been received.
    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,

    /// Tracks whether the server has our prekeys (matches WA Web's `setServerHasPreKeys`).
    /// Set to `false` when encrypt/count notification arrives, `true` after successful upload.
    pub(crate) server_has_prekeys: Arc<AtomicBool>,
    /// Prevents concurrent prekey upload operations (matches WA Web's dedup set in `handlePreKeyLow`).
    pub(crate) prekey_upload_lock: Arc<async_lock::Mutex<()>>,
    /// Notifier for when offline sync (ib offline stanza) is received.
    /// WhatsApp Web waits for this before sending passive tasks (prekey upload, active IQ, presence).
    pub(crate) offline_sync_notifier: Arc<event_listener::Event>,
    /// Flag indicating offline sync has completed (received ib offline stanza).
    pub(crate) offline_sync_completed: Arc<AtomicBool>,
    /// Number of history sync tasks currently queued or running.
    pub(crate) history_sync_tasks_in_flight: Arc<AtomicUsize>,
    /// Notifier triggered when history sync work becomes idle.
    pub(crate) history_sync_idle_notifier: Arc<event_listener::Event>,
    /// Contacts with active presence subscriptions that must be re-subscribed on reconnect.
    pub(crate) presence_subscriptions: Arc<async_lock::Mutex<HashSet<Jid>>>,
    /// Metrics for granular offline sync logging
    pub(crate) offline_sync_metrics: Arc<OfflineSyncMetrics>,
    /// Notifier for when the noise socket is established (before login).
    /// Use this to wait for the socket to be ready for sending messages.
    pub(crate) socket_ready_notifier: Arc<event_listener::Event>,
    /// Set to `true` only when `dispatch_connected()` fires (after critical sync
    /// completes). Reset on each new connection attempt. Used by
    /// `wait_for_connected()` to avoid a false-positive fast path when the
    /// client is logged in but critical app state hasn't synced yet.
    pub(crate) is_ready: Arc<AtomicBool>,
    /// Notifier for when the client is fully connected and logged in.
    /// Triggered after Event::Connected is dispatched.
    pub(crate) connected_notifier: Arc<event_listener::Event>,
    /// Sender side of the bounded major-sync channel; the receiver is
    /// returned from the constructor.
    pub(crate) major_sync_task_sender: async_channel::Sender<MajorSyncTask>,
    /// Channel used to cancel an in-progress pairing flow, if any.
    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<async_channel::Sender<()>>>>,

    /// State machine for pair code authentication flow.
    /// Tracks the pending pair code request and ephemeral keys.
    pub(crate) pair_code_state: Arc<Mutex<wacore::pair_code::PairCodeState>>,

    /// Custom handlers for encrypted message types
    pub custom_enc_handlers: Arc<async_lock::RwLock<HashMap<String, Arc<dyn EncHandler>>>>,

    /// Chat state (typing indicator) handlers registered by external consumers.
    /// Each handler receives a `ChatStateEvent` describing the chat, optional participant and state.
    pub(crate) chatstate_handlers: Arc<RwLock<Vec<ChatStateHandler>>>,

    /// Cache for pending PDO (Peer Data Operation) requests.
    /// Maps message cache keys (chat:id) to pending request info.
    pub(crate) pdo_pending_requests: Cache<String, crate::pdo::PendingPdoRequest>,

    /// LRU cache for device registry (matches WhatsApp Web's 5000 entry limit).
    /// Maps user ID to DeviceListRecord for fast device existence checks.
    /// Backed by persistent storage.
    pub(crate) device_registry_cache: TypedCache<String, wacore::store::traits::DeviceListRecord>,

    /// Router for dispatching stanzas to their appropriate handlers
    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,

    /// Whether to send ACKs synchronously or in a background task
    pub(crate) synchronous_ack: bool,

    /// HTTP client for making HTTP requests (media upload/download, version fetching)
    pub http_client: Arc<dyn crate::http::HttpClient>,

    /// Version override for testing or manual specification
    pub(crate) override_version: Option<(u32, u32, u32)>,

    /// When true, history sync notifications are acknowledged but not downloaded
    /// or processed. Set via `BotBuilder::skip_history_sync()`.
    pub(crate) skip_history_sync: AtomicBool,

    /// Cache configuration for TTL and capacity of all caches.
    /// Stored for use by lazily-initialized caches (group_cache, device_cache).
    pub(crate) cache_config: CacheConfig,
}
434
435impl Client {
436    /// Replace the message processing semaphore and bump the generation counter.
437    ///
438    /// Both operations happen under the same mutex hold so readers always see
439    /// a consistent (generation, Arc) pair. Must be called from a non-async
440    /// context or inside a scoped block (MutexGuard is !Send).
441    pub(crate) fn swap_message_semaphore(&self, permits: usize) {
442        let mut guard = match self.message_processing_semaphore.lock() {
443            Ok(g) => g,
444            Err(poisoned) => poisoned.into_inner(),
445        };
446        *guard = Arc::new(async_lock::Semaphore::new(permits));
447        self.message_semaphore_generation
448            .fetch_add(1, Ordering::SeqCst);
449    }
450
451    fn should_downgrade_sync_error(&self, err: &anyhow::Error) -> bool {
452        if self.is_shutting_down() {
453            return true;
454        }
455
456        matches!(
457            err.downcast_ref::<crate::request::IqError>(),
458            Some(
459                crate::request::IqError::NotConnected
460                    | crate::request::IqError::InternalChannelClosed
461            )
462        )
463    }
464
465    /// Log a sync error, downgrading to debug level during shutdown/disconnect.
466    fn log_sync_error(&self, context: &str, err: &anyhow::Error) {
467        if self.should_downgrade_sync_error(err) {
468            debug!("Skipping {context} during shutdown: {err}");
469        } else {
470            warn!("Failed {context}: {err}");
471        }
472    }
473
474    /// Returns `true` when the client has completed its full startup:
475    /// transport connected, server authenticated, and critical app state synced.
476    /// This is the condition `wait_for_connected` uses to resolve.
477    fn is_fully_ready(&self) -> bool {
478        self.is_connected() && self.is_logged_in() && self.is_ready.load(Ordering::Relaxed)
479    }
480
    /// Dispatch the Connected event and notify waiters.
    ///
    /// Ordering matters: `is_ready` is stored first so that anything woken by
    /// the event dispatch or the notifier already observes the ready flag.
    fn dispatch_connected(&self) {
        self.is_ready.store(true, Ordering::Relaxed);
        self.core
            .event_bus
            .dispatch(&Event::Connected(crate::types::events::Connected));
        // Wake every task blocked on wait_for_connected.
        self.connected_notifier.notify(usize::MAX);
    }
489
    /// Enable or disable skipping of history sync notifications at runtime.
    ///
    /// When enabled, the client will acknowledge incoming history sync
    /// notifications but will not download or process the data.
    pub fn set_skip_history_sync(&self, enabled: bool) {
        // Relaxed: standalone boolean flag, no cross-field ordering needed.
        self.skip_history_sync.store(enabled, Ordering::Relaxed);
    }
497
498    /// Public entry point for processing [`MajorSyncTask`] from the sync channel.
499    pub async fn process_sync_task(self: &Arc<Self>, task: crate::sync_task::MajorSyncTask) {
500        match task {
501            crate::sync_task::MajorSyncTask::HistorySync {
502                message_id,
503                notification,
504            } => {
505                self.process_history_sync_task(message_id, *notification)
506                    .await;
507                self.finish_history_sync_task();
508            }
509            crate::sync_task::MajorSyncTask::AppStateSync { name, full_sync } => {
510                if let Err(e) = self.process_app_state_sync_task(name, full_sync).await {
511                    log::warn!("App state sync task for {name:?} failed: {e}");
512                }
513            }
514        }
515    }
516
    /// Returns `true` if history sync notifications are currently being skipped.
    /// Counterpart to [`Client::set_skip_history_sync`].
    pub fn skip_history_sync_enabled(&self) -> bool {
        self.skip_history_sync.load(Ordering::Relaxed)
    }
521
522    pub(crate) fn is_shutting_down(&self) -> bool {
523        self.expected_disconnect.load(Ordering::Relaxed) || !self.is_running.load(Ordering::Relaxed)
524    }
525
    /// Create a new `Client` with default cache configuration.
    ///
    /// This is the standard constructor. Use [`Client::new_with_cache_config`]
    /// if you need to customise cache TTL / capacity.
    ///
    /// Returns the client together with the receiver side of the major-sync
    /// task channel, which the caller is expected to drain
    /// (see [`Client::process_sync_task`]).
    pub async fn new(
        runtime: Arc<dyn Runtime>,
        persistence_manager: Arc<PersistenceManager>,
        transport_factory: Arc<dyn crate::transport::TransportFactory>,
        http_client: Arc<dyn crate::http::HttpClient>,
        override_version: Option<(u32, u32, u32)>,
    ) -> (Arc<Self>, async_channel::Receiver<MajorSyncTask>) {
        Self::new_with_cache_config(
            runtime,
            persistence_manager,
            transport_factory,
            http_client,
            override_version,
            CacheConfig::default(),
        )
        .await
    }
547
    /// Create a new `Client` with a custom [`CacheConfig`].
    ///
    /// Returns the client together with the receiver side of the bounded
    /// major-sync task channel; the caller is expected to drain it via
    /// [`Client::process_sync_task`]. Also spawns two background tasks:
    /// a LID-PN cache warm-up and the device-registry cleanup loop.
    pub async fn new_with_cache_config(
        runtime: Arc<dyn Runtime>,
        persistence_manager: Arc<PersistenceManager>,
        transport_factory: Arc<dyn crate::transport::TransportFactory>,
        http_client: Arc<dyn crate::http::HttpClient>,
        override_version: Option<(u32, u32, u32)>,
        cache_config: CacheConfig,
    ) -> (Arc<Self>, async_channel::Receiver<MajorSyncTask>) {
        // Two random bytes form the instance-unique "a.b" id prefix.
        let mut unique_id_bytes = [0u8; 2];
        rand::make_rng::<rand::rngs::StdRng>().fill_bytes(&mut unique_id_bytes);

        let device_snapshot = persistence_manager.get_device_snapshot().await;
        let core = wacore::client::CoreClient::new(device_snapshot.core.clone());

        // Bounded channel for major sync tasks; receiver is handed back to the caller.
        let (tx, rx) = async_channel::bounded(32);

        let this = Self {
            runtime: runtime.clone(),
            core,
            persistence_manager: persistence_manager.clone(),
            media_conn: Arc::new(RwLock::new(None)),
            is_logged_in: Arc::new(AtomicBool::new(false)),
            is_connecting: Arc::new(AtomicBool::new(false)),
            is_running: Arc::new(AtomicBool::new(false)),
            is_connected: Arc::new(AtomicBool::new(false)),
            shutdown_notifier: Arc::new(event_listener::Event::new()),
            last_data_received_ms: Arc::new(AtomicU64::new(0)),
            last_data_sent_ms: Arc::new(AtomicU64::new(0)),

            transport: Arc::new(Mutex::new(None)),
            transport_events: Arc::new(Mutex::new(None)),
            transport_factory,
            noise_socket: Arc::new(Mutex::new(None)),

            response_waiters: Arc::new(Mutex::new(HashMap::new())),
            node_waiters: std::sync::Mutex::new(Vec::new()),
            node_waiter_count: AtomicUsize::new(0),
            unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
            id_counter: Arc::new(AtomicU64::new(0)),
            unified_session: crate::unified_session::UnifiedSessionManager::new(),

            signal_cache: Arc::new(crate::store::signal_cache::SignalStoreCache::new()),
            // Single permit until offline sync completes; swapped later via
            // swap_message_semaphore.
            message_processing_semaphore: std::sync::Mutex::new(Arc::new(
                async_lock::Semaphore::new(1),
            )),
            message_semaphore_generation: Arc::new(AtomicU64::new(0)),
            // Coordination caches: capacity-only eviction, no TTL/TTI.
            // These hold live mutexes and channel senders; time-based eviction
            // while tasks hold references would silently break serialisation.
            session_locks: Cache::builder()
                .max_capacity(cache_config.session_locks_capacity.max(1))
                .build(),
            message_queues: Cache::builder()
                .max_capacity(cache_config.message_queues_capacity.max(1))
                .build(),
            lid_pn_cache: Arc::new(LidPnCache::with_config(
                &cache_config.lid_pn_cache,
                cache_config.cache_stores.lid_pn_cache.clone(),
            )),
            message_enqueue_locks: Cache::builder()
                .max_capacity(cache_config.message_enqueue_locks_capacity.max(1))
                .build(),
            // Lazily built on first use (see get_group_cache / get_device_cache).
            group_cache: async_lock::Mutex::new(None),
            device_cache: async_lock::Mutex::new(None),
            retried_group_messages: cache_config.retried_group_messages.build_with_ttl(),

            expected_disconnect: Arc::new(AtomicBool::new(false)),
            connection_generation: Arc::new(AtomicU64::new(0)),

            recent_messages: cache_config.recent_messages.build_with_ttl(),

            pending_retries: Arc::new(async_lock::Mutex::new(HashSet::new())),

            message_retry_counts: cache_config.message_retry_counts.build_with_ttl(),

            offline_sync_metrics: Arc::new(OfflineSyncMetrics {
                active: AtomicBool::new(false),
                total_messages: AtomicUsize::new(0),
                processed_messages: AtomicUsize::new(0),
                start_time: std::sync::Mutex::new(None),
            }),

            enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
            auto_reconnect_errors: Arc::new(AtomicU32::new(0)),

            needs_initial_full_sync: Arc::new(AtomicBool::new(false)),

            app_state_processor: async_lock::Mutex::new(None),
            app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
            app_state_syncing: Arc::new(Mutex::new(HashSet::new())),
            initial_keys_synced_notifier: Arc::new(event_listener::Event::new()),
            initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
            // Assume the server holds prekeys until a notification says otherwise
            // (see the field documentation on Client).
            server_has_prekeys: Arc::new(AtomicBool::new(true)),
            prekey_upload_lock: Arc::new(async_lock::Mutex::new(())),
            offline_sync_notifier: Arc::new(event_listener::Event::new()),
            offline_sync_completed: Arc::new(AtomicBool::new(false)),
            history_sync_tasks_in_flight: Arc::new(AtomicUsize::new(0)),
            history_sync_idle_notifier: Arc::new(event_listener::Event::new()),
            presence_subscriptions: Arc::new(async_lock::Mutex::new(HashSet::new())),
            socket_ready_notifier: Arc::new(event_listener::Event::new()),
            is_ready: Arc::new(AtomicBool::new(false)),
            connected_notifier: Arc::new(event_listener::Event::new()),
            major_sync_task_sender: tx,
            pairing_cancellation_tx: Arc::new(Mutex::new(None)),
            pair_code_state: Arc::new(Mutex::new(wacore::pair_code::PairCodeState::default())),
            custom_enc_handlers: Arc::new(async_lock::RwLock::new(HashMap::new())),
            chatstate_handlers: Arc::new(RwLock::new(Vec::new())),
            pdo_pending_requests: cache_config.pdo_pending_requests.build_with_ttl(),
            device_registry_cache: cache_config.device_registry_cache.build_typed_ttl(
                cache_config.cache_stores.device_registry_cache.clone(),
                "device_registry",
            ),
            stanza_router: Self::create_stanza_router(),
            synchronous_ack: false,
            http_client,
            override_version,
            skip_history_sync: AtomicBool::new(false),
            cache_config,
        };

        let arc = Arc::new(this);

        // Warm up the LID-PN cache from persistent storage
        let warm_up_arc = arc.clone();
        arc.runtime
            .spawn(Box::pin(async move {
                if let Err(e) = warm_up_arc.warm_up_lid_pn_cache().await {
                    warn!("Failed to warm up LID-PN cache: {e}");
                }
            }))
            .detach();

        // Start background task to clean up stale device registry entries
        let cleanup_arc = arc.clone();
        arc.runtime
            .spawn(Box::pin(async move {
                cleanup_arc.device_registry_cleanup_loop().await;
            }))
            .detach();

        (arc, rx)
    }
691
692    pub(crate) async fn get_group_cache(&self) -> Arc<TypedCache<Jid, GroupInfo>> {
693        let mut guard = self.group_cache.lock().await;
694        if let Some(cache) = guard.as_ref() {
695            return cache.clone();
696        }
697        debug!("Initializing Group Cache for the first time.");
698        let cache = Arc::new(
699            self.cache_config
700                .group_cache
701                .build_typed_ttl(self.cache_config.cache_stores.group_cache.clone(), "group"),
702        );
703        *guard = Some(cache.clone());
704        cache
705    }
706
707    pub(crate) async fn get_device_cache(&self) -> Arc<TypedCache<Jid, Vec<Jid>>> {
708        let mut guard = self.device_cache.lock().await;
709        if let Some(cache) = guard.as_ref() {
710            return cache.clone();
711        }
712        debug!("Initializing Device Cache for the first time.");
713        let cache = Arc::new(self.cache_config.device_cache.build_typed_ttl(
714            self.cache_config.cache_stores.device_cache.clone(),
715            "device",
716        ));
717        *guard = Some(cache.clone());
718        cache
719    }
720
721    pub(crate) async fn get_app_state_processor(&self) -> Arc<AppStateProcessor> {
722        let mut guard = self.app_state_processor.lock().await;
723        if let Some(proc) = guard.as_ref() {
724            return proc.clone();
725        }
726        debug!("Initializing AppStateProcessor for the first time.");
727        let proc = Arc::new(AppStateProcessor::new(
728            self.persistence_manager.backend(),
729            self.runtime.clone(),
730        ));
731        *guard = Some(proc.clone());
732        proc
733    }
734
735    /// Create and configure the stanza router with all the handlers.
736    fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
737        use crate::handlers::{
738            basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
739            chatstate::ChatstateHandler,
740            ib::IbHandler,
741            iq::IqHandler,
742            message::MessageHandler,
743            notification::NotificationHandler,
744            receipt::ReceiptHandler,
745            router::StanzaRouter,
746            unimplemented::UnimplementedHandler,
747        };
748
749        let mut router = StanzaRouter::new();
750
751        // Register all handlers
752        router.register(Arc::new(MessageHandler));
753        router.register(Arc::new(ReceiptHandler));
754        router.register(Arc::new(IqHandler));
755        router.register(Arc::new(SuccessHandler));
756        router.register(Arc::new(FailureHandler));
757        router.register(Arc::new(StreamErrorHandler));
758        router.register(Arc::new(IbHandler));
759        router.register(Arc::new(NotificationHandler));
760        router.register(Arc::new(AckHandler));
761        router.register(Arc::new(ChatstateHandler));
762
763        // Register unimplemented handlers
764        router.register(Arc::new(UnimplementedHandler::for_call()));
765        router.register(Arc::new(crate::handlers::presence::PresenceHandler));
766
767        router
768    }
769
770    /// Registers an external event handler to the core event bus.
771    pub fn register_handler(&self, handler: Arc<dyn wacore::types::events::EventHandler>) {
772        self.core.event_bus.add_handler(handler);
773    }
774
775    /// Register a chatstate handler which will be invoked when a `<chatstate>` stanza is received.
776    ///
777    /// The handler receives a `ChatStateEvent` with the parsed chat state information.
778    pub async fn register_chatstate_handler(
779        &self,
780        handler: Arc<dyn Fn(ChatStateEvent) + Send + Sync>,
781    ) {
782        self.chatstate_handlers.write().await.push(handler);
783    }
784
785    /// Dispatch a parsed chatstate stanza to registered handlers.
786    ///
787    /// Called by `ChatstateHandler` after parsing the incoming stanza.
788    pub(crate) async fn dispatch_chatstate_event(
789        &self,
790        stanza: wacore::iq::chatstate::ChatstateStanza,
791    ) {
792        use wacore::iq::chatstate::{ChatstateSource, ReceivedChatState};
793        use wacore::types::events::ChatPresenceUpdate;
794        use wacore::types::message::MessageSource;
795        use wacore::types::presence::{ChatPresence, ChatPresenceMedia};
796
797        // Dispatch via event bus
798        let (chat, sender, is_group) = match &stanza.source {
799            ChatstateSource::User { from } => (from.clone(), from.clone(), false),
800            ChatstateSource::Group { from, participant } => {
801                (from.clone(), participant.clone(), true)
802            }
803        };
804
805        let (state, media) = match stanza.state {
806            ReceivedChatState::Typing => (ChatPresence::Composing, ChatPresenceMedia::Text),
807            ReceivedChatState::RecordingAudio => {
808                (ChatPresence::Composing, ChatPresenceMedia::Audio)
809            }
810            ReceivedChatState::Idle => (ChatPresence::Paused, ChatPresenceMedia::Text),
811        };
812
813        self.core
814            .event_bus
815            .dispatch(&Event::ChatPresence(ChatPresenceUpdate {
816                source: MessageSource {
817                    chat,
818                    sender,
819                    is_from_me: false,
820                    is_group,
821                    addressing_mode: None,
822                    sender_alt: None,
823                    recipient_alt: None,
824                    broadcast_list_owner: None,
825                    recipient: None,
826                },
827                state,
828                media,
829            }));
830
831        // Invoke legacy callback handlers
832        let event = ChatStateEvent::from_stanza(stanza);
833        let handlers = self.chatstate_handlers.read().await.clone();
834        for handler in handlers {
835            let event_clone = event.clone();
836            let handler_clone = handler.clone();
837            self.runtime
838                .spawn(Box::pin(async move {
839                    (handler_clone)(event_clone);
840                }))
841                .detach();
842        }
843    }
844
    /// Main connection loop: connect, process messages, and reconnect with
    /// Fibonacci backoff until stopped.
    ///
    /// Re-entrant calls are rejected via the `is_running` flag. The loop exits
    /// when auto-reconnect is disabled or when something (e.g. `disconnect`)
    /// clears `is_running`.
    pub async fn run(self: &Arc<Self>) {
        // Guard against a second concurrent `run` — swap returns the previous
        // value, so `true` means another run loop already owns the flag.
        if self.is_running.swap(true, Ordering::SeqCst) {
            warn!("Client `run` method called while already running.");
            return;
        }
        while self.is_running.load(Ordering::Relaxed) {
            self.expected_disconnect.store(false, Ordering::Relaxed);

            if let Err(connect_err) = self.connect().await {
                error!("Failed to connect: {connect_err:#}. Will retry...");
            } else {
                if self.read_messages_loop().await.is_err() {
                    warn!(
                        "Message loop exited with an error. Will attempt to reconnect if enabled."
                    );
                } else if self.expected_disconnect.load(Ordering::Relaxed) {
                    debug!("Message loop exited gracefully (expected disconnect).");
                } else {
                    info!("Message loop exited gracefully.");
                }

                // Always tear down connection state after the message loop
                // returns, regardless of how it exited.
                self.cleanup_connection_state().await;
            }

            if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
                info!("Auto-reconnect disabled, shutting down.");
                self.is_running.store(false, Ordering::Relaxed);
                break;
            }

            // If this was an expected disconnect (e.g., 515 after pairing), reconnect immediately
            if self.expected_disconnect.load(Ordering::Relaxed) {
                self.auto_reconnect_errors.store(0, Ordering::Relaxed);
                info!("Expected disconnect (e.g., 515), reconnecting immediately...");
                continue;
            }

            let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
            // WA Web: Fibonacci backoff with 10% jitter, max 900s.
            // algo: { type: "fibonacci", first: 1000, second: 1000 }
            // jitter: 0.1, max: 9e5
            let delay = fibonacci_backoff(error_count);
            info!(
                "Will attempt to reconnect in {:?} (attempt {})",
                delay,
                error_count + 1
            );
            self.runtime.sleep(delay).await;
        }
        info!("Client run loop has shut down.");
    }
896
    /// Establish a transport connection and complete the noise handshake.
    ///
    /// Runs the version fetch and transport creation in parallel, each bounded
    /// by `TRANSPORT_CONNECT_TIMEOUT`, performs the handshake, stores the
    /// transport/noise socket, and spawns the keepalive loop.
    ///
    /// # Errors
    /// Returns `ClientError::AlreadyConnected` if a connect is already in
    /// progress or a connection exists; otherwise propagates timeout,
    /// transport, and handshake failures.
    pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        // Re-entrancy guard: swap returns the previous value, so `true` means
        // another connect() is already in flight.
        if self.is_connecting.swap(true, Ordering::SeqCst) {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Clears `is_connecting` on every exit path, including `?` errors.
        let _guard = scopeguard::guard((), |_| {
            self.is_connecting.store(false, Ordering::Relaxed);
        });

        if self.is_connected() {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Reset login state for new connection attempt. This ensures that
        // handle_success will properly process the <success> stanza even if
        // a previous connection's post-login task bailed out early.
        self.is_logged_in.store(false, Ordering::Relaxed);
        self.is_ready.store(false, Ordering::Relaxed);
        self.is_connected.store(false, Ordering::Relaxed);
        self.offline_sync_completed.store(false, Ordering::Relaxed);
        self.server_has_prekeys.store(true, Ordering::Relaxed);

        // WA Web: both MQTT and DGW transports use a 20s connect timeout.
        // Without this, a dead network blocks on the OS TCP SYN timeout (~60-75s).
        // Version fetch is also wrapped so a hung HTTP request doesn't block connect().
        let version_future = rt_timeout(
            &*self.runtime,
            TRANSPORT_CONNECT_TIMEOUT,
            crate::version::resolve_and_update_version(
                &self.persistence_manager,
                &self.http_client,
                self.override_version,
            ),
        );
        let transport_future = rt_timeout(
            &*self.runtime,
            TRANSPORT_CONNECT_TIMEOUT,
            self.transport_factory.create_transport(),
        );

        debug!("Connecting WebSocket and fetching latest client version in parallel...");
        let (version_result, transport_result) = futures::join!(version_future, transport_future);

        // Outer Err = timeout, inner Err = operation failure — unwrap both.
        version_result
            .map_err(|_| anyhow!("Version fetch timed out after {TRANSPORT_CONNECT_TIMEOUT:?}"))?
            .map_err(|e| anyhow!("Failed to resolve app version: {}", e))?;
        let (transport, mut transport_events) = transport_result.map_err(|_| {
            anyhow!("Transport connect timed out after {TRANSPORT_CONNECT_TIMEOUT:?}")
        })??;
        debug!("Version fetch and transport connection established.");

        let device_snapshot = self.persistence_manager.get_device_snapshot().await;

        let noise_socket = handshake::do_handshake(
            self.runtime.clone(),
            &device_snapshot,
            transport.clone(),
            &mut transport_events,
        )
        .await?;

        // Publish the connection state only after the handshake succeeded.
        *self.transport.lock().await = Some(transport);
        *self.transport_events.lock().await = Some(transport_events);
        *self.noise_socket.lock().await = Some(noise_socket);
        self.is_connected.store(true, Ordering::Release);

        // Notify waiters that socket is ready (before login)
        self.socket_ready_notifier.notify(usize::MAX);

        let client_clone = self.clone();
        self.runtime
            .spawn(Box::pin(async move { client_clone.keepalive_loop().await }))
            .detach();

        Ok(())
    }
973
974    pub async fn disconnect(self: &Arc<Self>) {
975        info!("Disconnecting client intentionally.");
976        self.expected_disconnect.store(true, Ordering::Relaxed);
977        self.is_running.store(false, Ordering::Relaxed);
978        self.shutdown_notifier.notify(usize::MAX);
979
980        // Flush dirty device state before tearing down the connection.
981        if let Err(e) = self.persistence_manager.flush().await {
982            log::error!("Failed to flush device state during disconnect: {e}");
983        }
984
985        if let Some(transport) = self.transport.lock().await.as_ref() {
986            transport.disconnect().await;
987        }
988        self.cleanup_connection_state().await;
989    }
990
    /// Backoff step used by [`reconnect()`] to create an offline window.
    ///
    /// `fibonacci_backoff(RECONNECT_BACKOFF_STEP)` determines the delay before
    /// the run loop re-connects. This must be longer than the mock server's
    /// chatstate TTL (`CHATSTATE_TTL_SECS=3`) so TTL-expiry tests pass.
    ///
    /// Sequence: fib(0)=1s, fib(1)=1s, fib(2)=2s, fib(3)=3s, **fib(4)=5s**.
    pub const RECONNECT_BACKOFF_STEP: u32 = 4;
999
1000    /// Drop the current connection and trigger the auto-reconnect loop.
1001    ///
1002    /// Unlike [`disconnect`], this does **not** stop the run loop. The client
1003    /// will reconnect automatically using the same persisted identity/store,
1004    /// just as it would after a network interruption. Use
1005    /// [`wait_for_connected`] to wait for the new connection to be ready.
1006    ///
1007    /// This is useful for:
1008    /// - Handling network changes (e.g., Wi-Fi → cellular)
1009    /// - Forcing a fresh server session
1010    /// - Testing offline message delivery
1011    pub async fn reconnect(self: &Arc<Self>) {
1012        info!("Reconnecting: dropping transport for auto-reconnect.");
1013        self.auto_reconnect_errors
1014            .store(Self::RECONNECT_BACKOFF_STEP, Ordering::Relaxed);
1015        if let Some(transport) = self.transport.lock().await.as_ref() {
1016            transport.disconnect().await;
1017        }
1018    }
1019
1020    /// Drop the current connection and reconnect immediately with no delay.
1021    ///
1022    /// Unlike [`reconnect`], which introduces a deliberate offline window,
1023    /// this method sets the `expected_disconnect` flag so the run loop
1024    /// skips the backoff delay and reconnects as fast as possible.
1025    pub async fn reconnect_immediately(self: &Arc<Self>) {
1026        info!("Reconnecting immediately (expected disconnect).");
1027        self.expected_disconnect.store(true, Ordering::Relaxed);
1028        if let Some(transport) = self.transport.lock().await.as_ref() {
1029            transport.disconnect().await;
1030        }
1031    }
1032
    /// Tear down all per-connection state so the next connect starts clean.
    ///
    /// Ordering matters here: the shutdown notifier fires before the transport
    /// fields are cleared, and `is_connected` is cleared only after the noise
    /// socket is dropped — see the inline comments for the invariants.
    async fn cleanup_connection_state(&self) {
        self.is_logged_in.store(false, Ordering::Relaxed);
        self.is_ready.store(false, Ordering::Relaxed);
        // Signal the keepalive loop (and any other tasks) to exit promptly.
        // Without this, a stale keepalive loop can overlap with the next one
        // after reconnect, causing duplicate pings.
        self.shutdown_notifier.notify(usize::MAX);
        *self.transport.lock().await = None;
        *self.transport_events.lock().await = None;
        *self.noise_socket.lock().await = None;
        // Clear is_connected AFTER noise_socket is None, so no task can see
        // is_connected==true with a cleared socket. send_node() independently
        // checks the socket, but this ordering avoids a confusing state window.
        self.is_connected.store(false, Ordering::Release);
        self.retried_group_messages.invalidate_all();
        // Clear signal cache so stale state doesn't leak across connections
        self.signal_cache.clear().await;
        // Reset semaphore to 1 permit for next offline sync.
        self.swap_message_semaphore(1);
        // Reset dead-socket timestamps so stale values from the previous
        // connection don't trigger an immediate reconnect on the next one.
        self.last_data_received_ms.store(0, Ordering::Relaxed);
        self.last_data_sent_ms.store(0, Ordering::Relaxed);
        // Reset offline sync state for next connection
        self.offline_sync_completed.store(false, Ordering::Relaxed);
        self.offline_sync_metrics
            .active
            .store(false, Ordering::Release);
        self.offline_sync_metrics
            .total_messages
            .store(0, Ordering::Release);
        self.offline_sync_metrics
            .processed_messages
            .store(0, Ordering::Release);
        // A poisoned start_time lock is still reset — the stored Instant is
        // plain data, so recovering from poison is safe here.
        match self.offline_sync_metrics.start_time.lock() {
            Ok(mut guard) => *guard = None,
            Err(poison) => *poison.into_inner() = None,
        }
        self.server_has_prekeys.store(true, Ordering::Relaxed);
        self.history_sync_tasks_in_flight
            .store(0, Ordering::Relaxed);
        self.history_sync_idle_notifier.notify(usize::MAX);
        // Drain all pending IQ waiters so they fail fast with InternalChannelClosed
        // instead of hanging until the 75s timeout.
        let mut waiters_map = self.response_waiters.lock().await;
        let waiter_count = waiters_map.len();
        // Replace with new map to release backing storage; old senders drop here,
        // causing receivers to get RecvError → IqError::InternalChannelClosed
        *waiters_map = HashMap::new();
        drop(waiters_map);
        if waiter_count > 0 {
            debug!(
                "Dropping {} orphaned IQ response waiter(s) on disconnect",
                waiter_count
            );
        }

        // Clear app state tracking maps to prevent unbounded growth across reconnections.
        // Replace with new collections to release backing storage.
        *self.app_state_key_requests.lock().await = HashMap::new();
        *self.app_state_syncing.lock().await = HashSet::new();

        // Drop stale media connection (auth tokens become invalid on reconnect)
        *self.media_conn.write().await = None;

        // Clear app state key cache — keys will be re-fetched from DB on demand
        if let Some(proc) = self.app_state_processor.lock().await.as_ref() {
            proc.clear_key_cache().await;
        }
    }
1103
1104    /// Returns a snapshot of all internal collection sizes for memory leak detection.
1105    ///
1106    /// Moka caches report approximate counts (pending evictions may not be reflected).
1107    /// Call `run_pending_tasks()` on individual caches first if you need exact counts.
1108    ///
1109    /// Requires the `debug-diagnostics` feature.
1110    #[cfg(feature = "debug-diagnostics")]
1111    pub async fn memory_diagnostics(&self) -> MemoryDiagnostics {
1112        let (sig_sessions, sig_identities, sig_sender_keys) =
1113            self.signal_cache.entry_counts().await;
1114        let (lid_lid, lid_pn) = self.lid_pn_cache.entry_counts();
1115
1116        MemoryDiagnostics {
1117            group_cache: self
1118                .group_cache
1119                .lock()
1120                .await
1121                .as_ref()
1122                .map_or(0, |c| c.entry_count()),
1123            device_cache: self
1124                .device_cache
1125                .lock()
1126                .await
1127                .as_ref()
1128                .map_or(0, |c| c.entry_count()),
1129            device_registry_cache: self.device_registry_cache.entry_count(),
1130            lid_pn_lid_entries: lid_lid,
1131            lid_pn_pn_entries: lid_pn,
1132            retried_group_messages: self.retried_group_messages.entry_count(),
1133            recent_messages: self.recent_messages.entry_count(),
1134            message_retry_counts: self.message_retry_counts.entry_count(),
1135            pdo_pending_requests: self.pdo_pending_requests.entry_count(),
1136            session_locks: self.session_locks.entry_count(),
1137            message_queues: self.message_queues.entry_count(),
1138            message_enqueue_locks: self.message_enqueue_locks.entry_count(),
1139            response_waiters: self.response_waiters.lock().await.len(),
1140            node_waiters: self.node_waiter_count.load(Ordering::Relaxed),
1141            pending_retries: self.pending_retries.lock().await.len(),
1142            presence_subscriptions: self.presence_subscriptions.lock().await.len(),
1143            app_state_key_requests: self.app_state_key_requests.lock().await.len(),
1144            app_state_syncing: self.app_state_syncing.lock().await.len(),
1145            signal_cache_sessions: sig_sessions,
1146            signal_cache_identities: sig_identities,
1147            signal_cache_sender_keys: sig_sender_keys,
1148            chatstate_handlers: self.chatstate_handlers.read().await.len(),
1149            custom_enc_handlers: self.custom_enc_handlers.read().await.len(),
1150        }
1151    }
1152
1153    /// Flush the in-memory signal cache to the database backend.
1154    /// Called after each message is decrypted or after encryption operations.
1155    pub(crate) async fn flush_signal_cache(&self) -> Result<(), anyhow::Error> {
1156        let device = self.persistence_manager.get_device_arc().await;
1157        let device_guard = device.read().await;
1158        self.signal_cache
1159            .flush(&*device_guard.backend)
1160            .await
1161            .map_err(|e| anyhow::anyhow!("Failed to flush signal cache: {e}"))
1162    }
1163
    /// Core receive loop: consumes transport events, decrypts frames, and
    /// dispatches parsed nodes until shutdown or disconnect.
    ///
    /// Takes ownership of the transport event receiver for the duration of
    /// the loop. Returns `Ok(())` on shutdown or an expected disconnect, and
    /// `Err` when the transport drops unexpectedly (the run loop then decides
    /// whether to reconnect).
    async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        debug!("Starting message processing loop...");

        // Take (not clone) the receiver so only this loop consumes events.
        let mut rx_guard = self.transport_events.lock().await;
        let transport_events = rx_guard
            .take()
            .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
        drop(rx_guard);

        // Frame decoder to parse incoming data
        let mut frame_decoder = wacore::framing::FrameDecoder::new();

        loop {
            // select_biased: shutdown is checked before new transport data.
            futures::select_biased! {
                    _ = self.shutdown_notifier.listen().fuse() => {
                        debug!("Shutdown signaled in message loop. Exiting message loop.");
                        return Ok(());
                    },
                    event_result = transport_events.recv().fuse() => {
                        match event_result {
                            Ok(crate::transport::TransportEvent::DataReceived(data)) => {
                                // Update dead-socket timer (WA Web: deadSocketTimer reset)
                                self.last_data_received_ms.store(
                                    wacore::time::now_millis() as u64,
                                    Ordering::Relaxed,
                                );

                                // Feed data into the frame decoder
                                frame_decoder.feed(&data);

                                // Process all complete frames.
                                // Frame decryption must be sequential (noise protocol counter),
                                // but we spawn node processing concurrently after decryption.
                                let mut frames_in_batch: u32 = 0;

                                while let Some(encrypted_frame) = frame_decoder.decode_frame() {
                                    // Decrypt the frame synchronously (required for noise counter ordering)
                                    if let Some(node) = self.decrypt_frame(&encrypted_frame).await {
                                        // Determine processing mode for this node:
                                        // - Critical nodes (success/failure/stream:error): inline, required for state
                                        // - Message nodes: inline, preserves arrival order for per-chat queues
                                        //   (MessageHandler just enqueues + ACKs, heavy crypto runs in workers)
                                        // - ib (in-band): inline, ensures offline sync tracking (expected count)
                                        //   is set up before offline messages are processed
                                        // - Everything else: spawned concurrently for parallelism
                                        let process_inline = matches!(
                                            node.tag.as_ref(),
                                            "success" | "failure" | "stream:error" | "message" | "ib"
                                        );

                                        if process_inline {
                                            self.process_decrypted_node(node).await;
                                        } else {
                                            let client = self.clone();
                                            self.runtime.spawn(Box::pin(async move {
                                                client.process_decrypted_node(node).await;
                                            })).detach();
                                        }
                                    }

                                    // Check if we should exit after processing (e.g., after 515 stream error)
                                    if self.expected_disconnect.load(Ordering::Relaxed) {
                                        debug!("Expected disconnect signaled during frame processing. Exiting message loop.");
                                        return Ok(());
                                    }

                                    // Cooperative yield — frequency and behavior are runtime-defined.
                                    frames_in_batch += 1;
                                    if frames_in_batch.is_multiple_of(self.runtime.yield_frequency())
                                        && let Some(yield_fut) = self.runtime.yield_now()
                                    {
                                        yield_fut.await;
                                    }
                                }
                            },
                            Ok(crate::transport::TransportEvent::Disconnected) | Err(_) => {
                                self.cleanup_connection_state().await;
                                 if !self.expected_disconnect.load(Ordering::Relaxed) {
                                    self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
                                    debug!("Transport disconnected unexpectedly.");
                                    return Err(anyhow::anyhow!("Transport disconnected unexpectedly"));
                                } else {
                                    debug!("Transport disconnected as expected.");
                                    return Ok(());
                                }
                            }
                            Ok(crate::transport::TransportEvent::Connected) => {
                                // Already handled during handshake, but could be useful for logging
                                debug!("Transport connected event received");
                            }
                    }
                }
            }
        }
    }
1259
1260    /// Decrypt a frame and return the parsed node.
1261    /// This must be called sequentially due to noise protocol counter requirements.
1262    pub(crate) async fn decrypt_frame(
1263        self: &Arc<Self>,
1264        encrypted_frame: &bytes::Bytes,
1265    ) -> Option<wacore_binary::node::Node> {
1266        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
1267        let noise_socket = match noise_socket_arc {
1268            Some(s) => s,
1269            None => {
1270                log::error!("Cannot process frame: not connected (no noise socket)");
1271                return None;
1272            }
1273        };
1274
1275        let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
1276            Ok(p) => p,
1277            Err(e) => {
1278                log::error!("Failed to decrypt frame: {e}");
1279                return None;
1280            }
1281        };
1282
1283        let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
1284            Ok(data) => data,
1285            Err(e) => {
1286                log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
1287                return None;
1288            }
1289        };
1290
1291        match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
1292            Ok(node_ref) => Some(node_ref.to_owned()),
1293            Err(e) => {
1294                log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}");
1295                None
1296            }
1297        }
1298    }
1299
1300    /// Process an already-decrypted node.
1301    /// This can be spawned concurrently since it doesn't depend on noise protocol state.
1302    /// The node is wrapped in Arc to avoid cloning when passing through handlers.
1303    pub(crate) async fn process_decrypted_node(self: &Arc<Self>, node: wacore_binary::node::Node) {
1304        // Wrap in Arc once - all handlers will share this same allocation
1305        let node_arc = Arc::new(node);
1306        self.process_node(node_arc).await;
1307    }
1308
    /// Process a node wrapped in Arc. Handlers receive the Arc and can share/store it cheaply.
    ///
    /// Processing order:
    /// 1. Offline-sync bookkeeping (`<ib>` start/end markers, `offline`-flagged stanzas).
    /// 2. Receive logging (app state sync responses are summarized, not dumped).
    /// 3. `<xmlstreamend/>` -> shutdown notification and early return.
    /// 4. Generic node waiters, then pending IQ response waiters (which consume the node).
    /// 5. Stanza-router dispatch (unknown top-level nodes are logged).
    /// 6. Deferred `<ack/>` for ack-able stanzas, unless a handler set `cancelled`.
    pub(crate) async fn process_node(self: &Arc<Self>, node: Arc<Node>) {
        use wacore::xml::DisplayableNode;

        // --- Offline Sync Tracking ---
        if node.tag.as_ref() == "ib" {
            // Check for offline_preview child to get expected count
            if let Some(preview) = node.get_optional_child("offline_preview") {
                // A missing or unparsable "count" attribute is treated as 0 (nothing to sync).
                let count: usize = preview
                    .attrs
                    .get("count")
                    .and_then(|v| v.as_str().parse().ok())
                    .unwrap_or(0);

                if count == 0 {
                    self.offline_sync_metrics
                        .active
                        .store(false, Ordering::Release);
                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: 0 items.");
                } else {
                    // Use stronger memory ordering for state transitions
                    self.offline_sync_metrics
                        .total_messages
                        .store(count, Ordering::Release);
                    self.offline_sync_metrics
                        .processed_messages
                        .store(0, Ordering::Release);
                    self.offline_sync_metrics
                        .active
                        .store(true, Ordering::Release);
                    // Record the start instant. A poisoned lock is recovered rather than
                    // propagated — the metric is best-effort diagnostics only.
                    match self.offline_sync_metrics.start_time.lock() {
                        Ok(mut guard) => *guard = Some(wacore::time::Instant::now()),
                        Err(poison) => *poison.into_inner() = Some(wacore::time::Instant::now()),
                    }
                    debug!(target: "Client/OfflineSync", "Sync STARTED: Expecting {} items.", count);
                }
            } else if self.offline_sync_metrics.active.load(Ordering::Acquire)
                && node.get_optional_child("offline").is_some()
            {
                // Handle end marker: <ib><offline count="N"/> signals sync completion
                // Only <ib> with an <offline> child is a real end marker.
                // Other <ib> children (thread_metadata, edge_routing, dirty) are NOT end markers.
                let processed = self
                    .offline_sync_metrics
                    .processed_messages
                    .load(Ordering::Acquire);
                let elapsed = match self.offline_sync_metrics.start_time.lock() {
                    Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
                    Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
                };
                debug!(target: "Client/OfflineSync", "Sync COMPLETED: End marker received. Processed {} items in {:.2?}.", processed, elapsed);
                self.offline_sync_metrics
                    .active
                    .store(false, Ordering::Release);
            }
        }

        // Track progress if active
        if self.offline_sync_metrics.active.load(Ordering::Acquire) {
            // Check for 'offline' attribute on relevant stanzas
            if node.attrs.contains_key("offline") {
                let processed = self
                    .offline_sync_metrics
                    .processed_messages
                    .fetch_add(1, Ordering::Release)
                    + 1;
                let total = self
                    .offline_sync_metrics
                    .total_messages
                    .load(Ordering::Acquire);

                // Log every 50th item and the final item to keep trace noise low.
                if processed.is_multiple_of(50) || processed == total {
                    trace!(target: "Client/OfflineSync", "Sync Progress: {}/{}", processed, total);
                }

                if processed >= total {
                    let elapsed = match self.offline_sync_metrics.start_time.lock() {
                        Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
                        Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
                    };
                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: Processed {} items in {:.2?}.", processed, elapsed);
                    self.offline_sync_metrics
                        .active
                        .store(false, Ordering::Release);
                }
            }
        }
        // --- End Tracking ---

        // App state sync responses can be very large; log a one-line summary instead
        // of the full node contents.
        if node.tag.as_ref() == "iq"
            && let Some(sync_node) = node.get_optional_child("sync")
            && let Some(collection_node) = sync_node.get_optional_child("collection")
        {
            let name = collection_node.attrs().optional_string("name");
            let name = name.as_deref().unwrap_or("<unknown>");
            debug!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
        } else {
            debug!(target: "Client/Recv","{}", DisplayableNode(&node));
        }

        // Prepare deferred ACK cancellation flag (sent after dispatch unless cancelled)
        let mut cancelled = false;

        if node.tag.as_ref() == "xmlstreamend" {
            if self.expected_disconnect.load(Ordering::Relaxed) {
                debug!("Received <xmlstreamend/>, expected disconnect.");
            } else {
                warn!("Received <xmlstreamend/>, treating as disconnect.");
            }
            self.shutdown_notifier.notify(usize::MAX);
            return;
        }

        // Check generic node waiters (zero-cost when none registered)
        if self.node_waiter_count.load(Ordering::Relaxed) > 0 {
            self.resolve_node_waiters(&node);
        }

        // IQ responses with a registered waiter are consumed here and never reach
        // the stanza router.
        if node.tag.as_ref() == "iq"
            && let Some(id) = node.attrs.get("id").map(|v| v.as_str())
        {
            let has_waiter = self.response_waiters.lock().await.contains_key(id.as_ref());
            if has_waiter && self.handle_iq_response(Arc::clone(&node)).await {
                return;
            }
        }

        // Dispatch to appropriate handler using the router
        // Clone Arc (cheap - just reference count) not the Node itself
        if !self
            .stanza_router
            .dispatch(self.clone(), Arc::clone(&node), &mut cancelled)
            .await
        {
            warn!(
                "Received unknown top-level node: {}",
                DisplayableNode(&node)
            );
        }

        // Send the deferred ACK if applicable and not cancelled by handler
        if self.should_ack(&node) && !cancelled {
            self.maybe_deferred_ack(node).await;
        }
    }
1454
1455    /// Determine if a Node should be acknowledged with <ack/>.
1456    fn should_ack(&self, node: &Node) -> bool {
1457        matches!(
1458            node.tag.as_ref(),
1459            "message" | "receipt" | "notification" | "call"
1460        ) && node.attrs.contains_key("id")
1461            && node.attrs.contains_key("from")
1462    }
1463
1464    /// Possibly send a deferred ack: either immediately or via spawned task.
1465    /// Handlers can cancel by setting `cancelled` to true.
1466    /// Uses Arc<Node> to avoid cloning when spawning the async task.
1467    async fn maybe_deferred_ack(self: &Arc<Self>, node: Arc<Node>) {
1468        if self.synchronous_ack {
1469            if let Err(e) = self.send_ack_for(&node).await {
1470                warn!("Failed to send ack: {e:?}");
1471            }
1472        } else {
1473            let this = self.clone();
1474            // Node is already in Arc - just clone the Arc (cheap), not the Node
1475            self.runtime
1476                .spawn(Box::pin(async move {
1477                    if let Err(e) = this.send_ack_for(&node).await {
1478                        warn!("Failed to send ack: {e:?}");
1479                    }
1480                }))
1481                .detach();
1482        }
1483    }
1484
1485    /// Build and send an <ack/> node corresponding to the given stanza.
1486    async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
1487        if self.expected_disconnect.load(Ordering::Relaxed) {
1488            return Ok(());
1489        }
1490        if !self.is_connected() {
1491            return Err(ClientError::NotConnected);
1492        }
1493        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1494        let ack = match build_ack_node(node, device_snapshot.pn.as_ref()) {
1495            Some(ack) => ack,
1496            None => return Ok(()),
1497        };
1498        self.send_node(ack).await
1499    }
1500
1501    pub(crate) async fn handle_unimplemented(&self, tag: &str) {
1502        warn!("TODO: Implement handler for <{tag}>");
1503    }
1504
1505    pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
1506        use wacore::iq::passive::PassiveModeSpec;
1507        self.execute(PassiveModeSpec::new(passive)).await
1508    }
1509
1510    pub async fn clean_dirty_bits(
1511        &self,
1512        type_: &str,
1513        timestamp: Option<&str>,
1514    ) -> Result<(), crate::request::IqError> {
1515        use wacore::iq::dirty::CleanDirtyBitsSpec;
1516
1517        let spec = CleanDirtyBitsSpec::single(type_, timestamp)?;
1518        self.execute(spec).await
1519    }
1520
1521    pub async fn fetch_props(&self) -> Result<(), crate::request::IqError> {
1522        use wacore::iq::props::PropsSpec;
1523        use wacore::store::commands::DeviceCommand;
1524
1525        let stored_hash = self
1526            .persistence_manager
1527            .get_device_snapshot()
1528            .await
1529            .props_hash
1530            .clone();
1531
1532        let spec = match &stored_hash {
1533            Some(hash) => {
1534                debug!("Fetching props with hash for delta update...");
1535                PropsSpec::with_hash(hash)
1536            }
1537            None => {
1538                debug!("Fetching props (full, no stored hash)...");
1539                PropsSpec::new()
1540            }
1541        };
1542
1543        let response = self.execute(spec).await?;
1544
1545        if response.delta_update {
1546            debug!(
1547                "Props delta update received ({} changed props)",
1548                response.props.len()
1549            );
1550        } else {
1551            debug!(
1552                "Props full update received ({} props, hash={:?})",
1553                response.props.len(),
1554                response.hash
1555            );
1556        }
1557
1558        if let Some(new_hash) = response.hash {
1559            self.persistence_manager
1560                .process_command(DeviceCommand::SetPropsHash(Some(new_hash)))
1561                .await;
1562        }
1563
1564        Ok(())
1565    }
1566
1567    pub async fn fetch_privacy_settings(
1568        &self,
1569    ) -> Result<wacore::iq::privacy::PrivacySettingsResponse, crate::request::IqError> {
1570        use wacore::iq::privacy::PrivacySettingsSpec;
1571
1572        debug!("Fetching privacy settings...");
1573
1574        self.execute(PrivacySettingsSpec::new()).await
1575    }
1576
1577    /// Set a privacy setting (e.g. "last" → "contacts").
1578    pub async fn set_privacy_setting(
1579        &self,
1580        category: &str,
1581        value: &str,
1582    ) -> Result<(), crate::request::IqError> {
1583        use wacore::iq::privacy::SetPrivacySettingSpec;
1584        self.execute(SetPrivacySettingSpec::new(category, value))
1585            .await
1586    }
1587
1588    /// Set the default disappearing messages duration (seconds). Pass 0 to disable.
1589    pub async fn set_default_disappearing_mode(
1590        &self,
1591        duration: u32,
1592    ) -> Result<(), crate::request::IqError> {
1593        use wacore::iq::privacy::SetDefaultDisappearingModeSpec;
1594        self.execute(SetDefaultDisappearingModeSpec::new(duration))
1595            .await
1596    }
1597
1598    /// Get business profile for a WhatsApp Business account.
1599    pub async fn get_business_profile(
1600        &self,
1601        jid: &wacore_binary::jid::Jid,
1602    ) -> Result<Option<wacore::iq::business::BusinessProfile>, crate::request::IqError> {
1603        use wacore::iq::business::BusinessProfileSpec;
1604        self.execute(BusinessProfileSpec::new(jid)).await
1605    }
1606
1607    /// Reject an incoming call. Fire-and-forget — no server response is expected.
1608    pub async fn reject_call(
1609        &self,
1610        call_id: &str,
1611        call_from: &wacore_binary::jid::Jid,
1612    ) -> Result<(), anyhow::Error> {
1613        anyhow::ensure!(!call_id.is_empty(), "call_id cannot be empty");
1614        let id = self.generate_request_id();
1615
1616        let stanza = wacore_binary::builder::NodeBuilder::new("call")
1617            .attr("to", call_from.clone())
1618            .attr("id", id)
1619            .children([wacore_binary::builder::NodeBuilder::new("reject")
1620                .attr("call-id", call_id)
1621                .attr("call-creator", call_from.clone())
1622                .attr("count", "0")
1623                .build()])
1624            .build();
1625
1626        self.send_node(stanza).await?;
1627        Ok(())
1628    }
1629
1630    pub async fn send_digest_key_bundle(&self) -> Result<(), crate::request::IqError> {
1631        use wacore::iq::prekeys::DigestKeyBundleSpec;
1632
1633        debug!("Sending digest key bundle...");
1634
1635        self.execute(DigestKeyBundleSpec::new()).await.map(|_| ())
1636    }
1637
1638    pub(crate) async fn handle_success(self: &Arc<Self>, node: &wacore_binary::node::Node) {
1639        // Skip processing if an expected disconnect is pending (e.g., 515 received).
1640        // This prevents race conditions where a spawned success handler runs after
1641        // cleanup_connection_state has already reset is_logged_in.
1642        if self.expected_disconnect.load(Ordering::Relaxed) {
1643            debug!("Ignoring <success> stanza: expected disconnect pending");
1644            return;
1645        }
1646
1647        // Guard against multiple <success> stanzas (WhatsApp may send more than one during
1648        // routing/reconnection). Only process the first one per connection.
1649        if self.is_logged_in.swap(true, Ordering::SeqCst) {
1650            debug!("Ignoring duplicate <success> stanza (already logged in)");
1651            return;
1652        }
1653
1654        // Increment connection generation to invalidate any stale post-login tasks
1655        // from previous connections (e.g., during 515 reconnect cycles).
1656        let current_generation = self.connection_generation.fetch_add(1, Ordering::SeqCst) + 1;
1657
1658        info!(
1659            "Successfully authenticated with WhatsApp servers! (gen={})",
1660            current_generation
1661        );
1662        self.auto_reconnect_errors.store(0, Ordering::Relaxed);
1663
1664        self.update_server_time_offset(node);
1665
1666        if let Some(lid_value) = node.attrs.get("lid") {
1667            if let Some(lid) = lid_value.to_jid() {
1668                let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1669                if device_snapshot.lid.as_ref() != Some(&lid) {
1670                    debug!("Updating LID from server to '{lid}'");
1671                    self.persistence_manager
1672                        .process_command(DeviceCommand::SetLid(Some(lid)))
1673                        .await;
1674                }
1675            } else {
1676                warn!("Failed to parse LID from success stanza: {lid_value}");
1677            }
1678        } else {
1679            warn!("LID not found in <success> stanza. Group messaging may fail.");
1680        }
1681
1682        let client_clone = self.clone();
1683        let task_generation = current_generation;
1684        self.runtime.spawn(Box::pin(async move {
1685            // Macro to check if this task is still valid (connection hasn't been replaced)
1686            macro_rules! check_generation {
1687                () => {
1688                    if client_clone.connection_generation.load(Ordering::SeqCst) != task_generation
1689                    {
1690                        debug!("Post-login task cancelled: connection generation changed");
1691                        return;
1692                    }
1693                };
1694            }
1695
1696            debug!(
1697                "Starting post-login initialization sequence (gen={})...",
1698                task_generation
1699            );
1700
1701            // Check if we need initial app state sync (empty pushname indicates fresh pairing
1702            // where pushname will come from app state sync's setting_pushName mutation)
1703            let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
1704            let needs_pushname_from_sync = device_snapshot.push_name.is_empty();
1705            if needs_pushname_from_sync {
1706                debug!("Push name is empty - will be set from app state sync (setting_pushName)");
1707            }
1708
1709            // Check connection before network operations.
1710            // During pairing, a 515 disconnect happens quickly after success,
1711            // so the socket may already be gone.
1712            if !client_clone.is_connected() {
1713                debug!(
1714                    "Skipping post-login init: connection closed (likely pairing phase reconnect)"
1715                );
1716                return;
1717            }
1718
1719            check_generation!();
1720            client_clone.send_unified_session().await;
1721
1722            // === Establish session with primary phone for PDO ===
1723            // This must happen BEFORE we exit passive mode (before offline messages arrive).
1724            // PDO needs a session with device 0 to request decrypted content from our phone.
1725            // Matches WhatsApp Web's bootstrapDeviceCapabilities() pattern.
1726            check_generation!();
1727            if let Err(e) = client_clone
1728                .establish_primary_phone_session_immediate()
1729                .await
1730            {
1731                warn!(target: "Client/PDO", "Failed to establish session with primary phone on login: {:?}", e);
1732                // Don't fail login - PDO will retry via ensure_e2e_sessions fallback
1733            }
1734
1735            // === Passive Tasks (mimics WhatsApp Web's PassiveTaskManager) ===
1736            // WhatsApp Web executes passive tasks (like PreKey upload) BEFORE sending the active IQ.
1737            check_generation!();
1738            if let Err(e) = client_clone.upload_pre_keys(false).await {
1739                warn!("Failed to upload pre-keys during startup: {e:?}");
1740            }
1741
1742            // === Send active IQ ===
1743            // The server sends <ib><offline count="X"/></ib> AFTER we exit passive mode.
1744            // This matches WhatsApp Web's behavior: executePassiveTasks() -> sendPassiveModeProtocol("active")
1745            check_generation!();
1746            if let Err(e) = client_clone.set_passive(false).await {
1747                warn!("Failed to send post-connect active IQ: {e:?}");
1748            }
1749
1750            // === Wait for offline sync to complete ===
1751            // The server sends <ib><offline count="X"/></ib> after we exit passive mode.
1752            client_clone.wait_for_offline_delivery_end().await;
1753
1754            // Check if connection was replaced while waiting
1755            check_generation!();
1756
1757            // Re-check connection and generation before sending presence
1758            check_generation!();
1759            if !client_clone.is_connected() {
1760                debug!("Skipping presence: connection closed");
1761                return;
1762            }
1763
1764            // Background initialization queries (can run in parallel, non-blocking)
1765            let bg_client = client_clone.clone();
1766            let bg_generation = task_generation;
1767            client_clone.runtime.spawn(Box::pin(async move {
1768                // Check connection and generation before starting background queries
1769                if bg_client.connection_generation.load(Ordering::SeqCst) != bg_generation {
1770                    debug!("Skipping background init queries: connection generation changed");
1771                    return;
1772                }
1773                if !bg_client.is_connected() {
1774                    debug!("Skipping background init queries: connection closed");
1775                    return;
1776                }
1777
1778                debug!(
1779                    "Sending background initialization queries (Props, Blocklist, Privacy, Digest)..."
1780                );
1781
1782                let props_fut = bg_client.fetch_props();
1783                let binding = bg_client.blocking();
1784                let blocklist_fut = binding.get_blocklist();
1785                let privacy_fut = bg_client.fetch_privacy_settings();
1786                let digest_fut = bg_client.validate_digest_key();
1787
1788                let (r_props, r_block, r_priv, r_digest) =
1789                    futures::join!(props_fut, blocklist_fut, privacy_fut, digest_fut);
1790
1791                if let Err(e) = r_props {
1792                    warn!("Background init: Failed to fetch props: {e:?}");
1793                }
1794                if let Err(e) = r_block {
1795                    warn!("Background init: Failed to fetch blocklist: {e:?}");
1796                }
1797                if let Err(e) = r_priv {
1798                    warn!("Background init: Failed to fetch privacy settings: {e:?}");
1799                }
1800                if let Err(e) = r_digest {
1801                    warn!("Background init: Failed to validate digest key: {e:?}");
1802                }
1803
1804                // Prune expired tcTokens on connect (matches WhatsApp Web's PrivacyTokenJob)
1805                if let Err(e) = bg_client.tc_token().prune_expired().await {
1806                    warn!("Background init: Failed to prune expired tc_tokens: {e:?}");
1807                }
1808            })).detach();
1809
1810            check_generation!();
1811
1812            let flag_set = client_clone.needs_initial_full_sync.load(Ordering::Relaxed);
1813            let needs_initial_sync = flag_set || needs_pushname_from_sync;
1814
1815            if needs_initial_sync {
1816                // === Fresh pairing path ===
1817                // Like WhatsApp Web's syncCriticalData(): await critical collections before
1818                // dispatching Connected, so blocklist/privacy settings are applied first.
1819                debug!(
1820                    target: "Client/AppState",
1821                    "Starting Initial App State Sync (flag_set={flag_set}, needs_pushname={needs_pushname_from_sync})"
1822                );
1823
1824                if !client_clone
1825                    .initial_app_state_keys_received
1826                    .load(Ordering::Relaxed)
1827                {
1828                    debug!(
1829                        target: "Client/AppState",
1830                        "Waiting up to 5s for app state keys..."
1831                    );
1832                    let _ = rt_timeout(
1833                        &*client_clone.runtime,
1834                        Duration::from_secs(5),
1835                        client_clone.initial_keys_synced_notifier.listen(),
1836                    )
1837                    .await;
1838
1839                    // Check if connection was replaced while waiting
1840                    check_generation!();
1841                }
1842
1843                // Start the critical sync timeout timer matching WhatsApp Web's
1844                // WAWebSyncBootstrap.$15 (setSyncDCriticalDataSyncTimeout).
1845                // WhatsApp Web uses 180s and calls socketLogout(SyncdTimeout) if
1846                // the critical data hasn't synced by then.
1847                const CRITICAL_SYNC_TIMEOUT_SECS: u64 = 180;
1848                let timeout_client = client_clone.clone();
1849                let timeout_generation = task_generation;
1850                let timeout_rt = client_clone.runtime.clone();
1851                let critical_sync_timeout_handle = timeout_rt.spawn(Box::pin(async move {
1852                    timeout_client.runtime.sleep(Duration::from_secs(CRITICAL_SYNC_TIMEOUT_SECS)).await;
1853                    // Check generation — if connection was replaced, this timeout is stale
1854                    if timeout_client.connection_generation.load(Ordering::SeqCst)
1855                        != timeout_generation
1856                    {
1857                        return;
1858                    }
1859                    // Matches WhatsApp Web's $16(): check if SettingPushName was synced.
1860                    // If push_name is still empty after 180s, critical sync failed.
1861                    let push_name = timeout_client.get_push_name().await;
1862                    if push_name.is_empty() {
1863                        warn!(
1864                            target: "Client/AppState",
1865                            "Critical app state sync timed out after {CRITICAL_SYNC_TIMEOUT_SECS}s \
1866                             (push_name not synced). Reconnecting to retry."
1867                        );
1868                        // WhatsApp Web does socketLogout here which clears device identity.
1869                        // We reconnect instead — preserving credentials and keeping the
1870                        // run loop active so auto-reconnect can retry the sync.
1871                        timeout_client.reconnect_immediately().await;
1872                    } else {
1873                        debug!(
1874                            target: "Client/AppState",
1875                            "Critical sync timeout fired but push_name was already synced"
1876                        );
1877                    }
1878                }));
1879
1880                // Await critical collections via batched IQ before dispatching Connected.
1881                check_generation!();
1882                match client_clone
1883                    .sync_collections_batched(vec![
1884                        WAPatchName::CriticalBlock,
1885                        WAPatchName::CriticalUnblockLow,
1886                    ])
1887                    .await
1888                {
1889                    Ok(()) => {
1890                        // Critical sync completed — cancel the timeout timer
1891                        critical_sync_timeout_handle.abort();
1892
1893                        check_generation!();
1894
1895                        client_clone
1896                            .resubscribe_presence_subscriptions(task_generation)
1897                            .await;
1898
1899                        check_generation!();
1900
1901                        // Dispatch Connected after critical sync completes.
1902                        // Presence is NOT sent here — WhatsApp Web sends presence from the
1903                        // setting_pushName mutation handler (WAWebPushNameSync), not from
1904                        // criticalSyncDone. Our setting_pushName handler already does this.
1905                        client_clone.dispatch_connected();
1906                    }
1907                    Err(e) => {
1908                        client_clone.log_sync_error("critical app state sync", &e);
1909                        // Don't abort the timeout or dispatch Connected — the sync failed,
1910                        // so the timeout watchdog should remain active to force a reconnect
1911                        // if needed. Return early to avoid emitting a spurious Connected event.
1912                        return;
1913                    }
1914                }
1915
1916                // Spawn remaining non-critical collections in background
1917                let sync_client = client_clone.clone();
1918                let sync_generation = task_generation;
1919                client_clone.runtime.spawn(Box::pin(async move {
1920                    if sync_client.connection_generation.load(Ordering::SeqCst) != sync_generation {
1921                        debug!("App state sync cancelled: connection generation changed");
1922                        return;
1923                    }
1924
1925                    if let Err(e) = sync_client
1926                        .sync_collections_batched(vec![
1927                            WAPatchName::RegularLow,
1928                            WAPatchName::RegularHigh,
1929                            WAPatchName::Regular,
1930                        ])
1931                        .await
1932                    {
1933                        sync_client.log_sync_error("non-critical app state sync", &e);
1934                    }
1935
1936                    sync_client
1937                        .needs_initial_full_sync
1938                        .store(false, Ordering::Relaxed);
1939                    debug!(target: "Client/AppState", "Initial App State Sync Completed.");
1940                })).detach();
1941            } else {
1942                // === Reconnection path ===
1943                // Pushname is already known, send presence and Connected immediately.
1944                let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
1945                if !device_snapshot.push_name.is_empty() {
1946                    if let Err(e) = client_clone.presence().set_available().await {
1947                        warn!("Failed to send initial presence: {e:?}");
1948                    } else {
1949                        debug!("Initial presence sent successfully.");
1950                    }
1951                }
1952
1953                client_clone
1954                    .resubscribe_presence_subscriptions(task_generation)
1955                    .await;
1956
1957                // Re-check generation after awaits to avoid dispatching Connected
1958                // for an outdated connection that was replaced mid-await.
1959                check_generation!();
1960
1961                client_clone.dispatch_connected();
1962            }
1963        })).detach();
1964    }
1965
1966    /// Handles incoming `<ack/>` stanzas by resolving pending response waiters.
1967    ///
1968    /// If an ack with an ID that matches a pending task in `response_waiters`,
1969    /// the task is resolved and the function returns `true`. Otherwise, returns `false`.
1970    pub(crate) async fn handle_ack_response(&self, node: Node) -> bool {
1971        let id_opt = node.attrs.get("id").map(|v| v.as_str().into_owned());
1972        if let Some(id) = id_opt
1973            && let Some(waiter) = self.response_waiters.lock().await.remove(&id)
1974        {
1975            if waiter.send(node).is_err() {
1976                warn!(target: "Client/Ack", "Failed to send ACK response to waiter for ID {id}. Receiver was likely dropped.");
1977            }
1978            return true;
1979        }
1980        false
1981    }
1982
    /// Sync a single app state collection with retry, deduplicating concurrent
    /// requests for the same collection: if `name` is already being synced, this
    /// returns `Ok(())` immediately without starting a second sync.
    #[allow(dead_code)] // Used by per-collection callers (e.g., critical sync gating)
    pub(crate) async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
        // In-flight dedup: skip if this collection is already being synced.
        // Matches WA Web's WAWebSyncdCollectionsStateMachine which tracks in-flight syncs
        // and queues new requests to a pending set.
        {
            let mut syncing = self.app_state_syncing.lock().await;
            if !syncing.insert(name) {
                debug!(target: "Client/AppState", "Skipping sync for {:?}: already in flight", name);
                return Ok(());
            }
        }

        let result = self.fetch_app_state_with_retry_inner(name).await;

        // Always remove from in-flight set when done
        // NOTE(review): "always" holds only for normal completion. If this future
        // is cancelled (dropped at an await point inside the inner call), the entry
        // stays in `app_state_syncing` and later syncs for `name` would be skipped
        // forever — confirm callers never cancel this, or add a drop guard.
        self.app_state_syncing.lock().await.remove(&name);

        result
    }
2003
    /// Retry loop behind [`fetch_app_state_with_retry`]. Two retry cases:
    /// 1. First-attempt `KeyNotFound`: wait up to 10s for the app state key share,
    ///    then retry once (subsequent `KeyNotFound` errors are returned as-is).
    /// 2. Locked/busy database: linear backoff, up to `APP_STATE_RETRY_MAX_ATTEMPTS`.
    /// Any other error is returned immediately.
    #[allow(dead_code)]
    async fn fetch_app_state_with_retry_inner(&self, name: WAPatchName) -> anyhow::Result<()> {
        let mut attempt = 0u32;
        loop {
            attempt += 1;
            // full_sync=false lets process_app_state_sync_task auto-detect:
            // version 0 → snapshot (full sync), version > 0 → incremental patches.
            // Matches WA Web which only requests snapshot when version is undefined.
            let res = self.process_app_state_sync_task(name, false).await;
            match res {
                Ok(()) => return Ok(()),
                Err(e) => {
                    // Case 1: the decryption key for this collection isn't known yet.
                    // Only taken on the very first attempt (attempt == 1).
                    if e.downcast_ref::<crate::appstate_sync::AppStateSyncError>()
                        .is_some_and(|ase| {
                            matches!(ase, crate::appstate_sync::AppStateSyncError::KeyNotFound(_))
                        })
                        && attempt == 1
                    {
                        if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
                            debug!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
                            // Timeout here is non-fatal: we retry regardless, on the
                            // chance the key arrived without the notifier firing.
                            if rt_timeout(
                                &*self.runtime,
                                Duration::from_secs(10),
                                self.initial_keys_synced_notifier.listen(),
                            )
                            .await
                            .is_err()
                            {
                                warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
                            }
                        }
                        continue;
                    }
                    // Case 2: transient "locked"/"busy" database errors, surfaced either
                    // directly as a StoreError or wrapped in AppStateSyncError::Store.
                    let is_db_locked = e.downcast_ref::<wacore::store::error::StoreError>()
                        .is_some_and(|se| matches!(se, wacore::store::error::StoreError::Database(msg) if msg.contains("locked") || msg.contains("busy")))
                        || e.downcast_ref::<crate::appstate_sync::AppStateSyncError>()
                            .is_some_and(|ase| matches!(ase, crate::appstate_sync::AppStateSyncError::Store(wacore::store::error::StoreError::Database(msg)) if msg.contains("locked") || msg.contains("busy")));
                    if is_db_locked && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
                        // Linear backoff: 350ms, 550ms, 750ms, ...
                        let backoff = Duration::from_millis(200 * attempt as u64 + 150);
                        warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
                        self.runtime.sleep(backoff).await;
                        continue;
                    }
                    return Err(e);
                }
            }
        }
    }
2052
2053    /// Sync multiple collections in a single IQ request, re-fetching those with `has_more_patches`.
2054    /// Matches WA Web's `serverSync()` outer loop (`3JJWKHeu5-P.js:54278-54305`).
2055    /// Max 5 iterations (WA Web's `C=5` constant).
2056    pub(crate) async fn sync_collections_batched(
2057        &self,
2058        collections: Vec<WAPatchName>,
2059    ) -> anyhow::Result<()> {
2060        if collections.is_empty() {
2061            return Ok(());
2062        }
2063
2064        // In-flight dedup: filter out collections already being synced
2065        let pending = {
2066            let mut syncing = self.app_state_syncing.lock().await;
2067            let mut filtered = Vec::with_capacity(collections.len());
2068            for name in collections {
2069                if syncing.insert(name) {
2070                    filtered.push(name);
2071                } else {
2072                    debug!(target: "Client/AppState", "Skipping {:?} in batch: already in flight", name);
2073                }
2074            }
2075            filtered
2076        };
2077
2078        if pending.is_empty() {
2079            return Ok(());
2080        }
2081
2082        // Track all collections for cleanup
2083        let all_collections: Vec<WAPatchName> = pending.clone();
2084
2085        let result = self.sync_collections_batched_inner(pending).await;
2086
2087        // Always clean up in-flight set
2088        {
2089            let mut syncing = self.app_state_syncing.lock().await;
2090            for name in &all_collections {
2091                syncing.remove(name);
2092            }
2093        }
2094
2095        result
2096    }
2097
    /// Inner body of `sync_collections_batched`: runs up to `MAX_ITERATIONS`
    /// rounds of a multi-collection `w:sync:app:state` IQ, processes every
    /// collection in each response, and carries collections that still have
    /// patches (or hit a retryable error) into the next round.
    ///
    /// In-flight bookkeeping is the caller's responsibility; this function only
    /// talks to the server and applies what comes back.
    async fn sync_collections_batched_inner(
        &self,
        mut pending: Vec<WAPatchName>,
    ) -> anyhow::Result<()> {
        use wacore::appstate::patch_decode::CollectionSyncError;
        // WA Web's serverSync() outer loop uses the same cap (its `C=5`).
        const MAX_ITERATIONS: usize = 5;
        let mut iteration = 0;

        while !pending.is_empty() && iteration < MAX_ITERATIONS {
            iteration += 1;
            debug!(
                target: "Client/AppState",
                "Batched sync iteration {}/{}: {:?}",
                iteration, MAX_ITERATIONS, pending
            );

            let backend = self.persistence_manager.backend();

            // Build multi-collection IQ, tracking which collections need a snapshot
            // (version 0 → request a snapshot; otherwise request incremental
            // patches from the stored version).
            let mut collection_nodes = Vec::with_capacity(pending.len());
            let mut was_snapshot = std::collections::HashSet::new();
            for &name in &pending {
                let state = backend.get_version(name.as_str()).await?;
                let want_snapshot = state.version == 0;
                if want_snapshot {
                    was_snapshot.insert(name);
                }
                let mut builder = NodeBuilder::new("collection")
                    .attr("name", name.as_str())
                    .attr(
                        "return_snapshot",
                        if want_snapshot { "true" } else { "false" },
                    );
                if !want_snapshot {
                    builder = builder.attr("version", state.version.to_string());
                }
                collection_nodes.push(builder.build());
            }

            let sync_node = NodeBuilder::new("sync").children(collection_nodes).build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: server_jid().clone(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: Some(Duration::from_secs(30)),
            };

            let resp = self.send_iq(iq).await?;

            // Pre-download all external blobs for all collections in the response.
            // Keyed by directPath so the synchronous `download` closure below can
            // serve them without doing network I/O during decode.
            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
                std::collections::HashMap::new();

            // Download failures here are logged but not fatal: decode of the
            // affected collection will fail later via the closure's error path.
            if let Ok(patch_lists) = wacore::appstate::patch_decode::parse_patch_lists(&resp) {
                for pl in &patch_lists {
                    // Download external snapshot
                    if let Some(ext) = &pl.snapshot_ref
                        && let Some(path) = &ext.direct_path
                    {
                        match self.download(ext).await {
                            Ok(bytes) => {
                                pre_downloaded.insert(path.clone(), bytes);
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to download external snapshot for {:?}: {e}",
                                    pl.name
                                );
                            }
                        }
                    }

                    // Download external mutations
                    for patch in &pl.patches {
                        if let Some(ext) = &patch.external_mutations
                            && let Some(path) = &ext.direct_path
                        {
                            match self.download(ext).await {
                                Ok(bytes) => {
                                    pre_downloaded.insert(path.clone(), bytes);
                                }
                                Err(e) => {
                                    let v =
                                        patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
                                    warn!(
                                        "Failed to download external mutations for patch v{}: {e}",
                                        v
                                    );
                                }
                            }
                        }
                    }
                }
            }

            // Synchronous lookup into the pre-downloaded map; errors out for any
            // blob that failed (or was never) downloaded above.
            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(path) = &ext.direct_path {
                    if let Some(bytes) = pre_downloaded.get(path) {
                        Ok(bytes.clone())
                    } else {
                        Err(anyhow::anyhow!(
                            "external blob not pre-downloaded: {}",
                            path
                        ))
                    }
                } else {
                    Err(anyhow::anyhow!("external blob has no directPath"))
                }
            };

            // Parse and process all collections from the response
            let proc = self.get_app_state_processor().await;
            let results = proc.decode_multi_patch_list(&resp, &download, true).await?;

            // Collections that must be requested again in the next round.
            let mut needs_refetch = Vec::new();

            for (mutations, new_state, list) in results {
                let name = list.name;

                // Handle per-collection errors. Any error path `continue`s, so
                // mutations are not dispatched and the version is not saved for
                // that collection this round.
                if let Some(ref err) = list.error {
                    match err {
                        CollectionSyncError::Conflict { has_more } => {
                            if *has_more {
                                // ConflictHasMore: server has more patches, must refetch.
                                warn!(target: "Client/AppState", "Collection {:?} conflict (has_more=true), will refetch", name);
                                needs_refetch.push(name);
                            } else {
                                // Conflict without has_more: WA Web treats this as success
                                // when there are no pending mutations to push (which is
                                // always the case for us since we don't push app state).
                                debug!(target: "Client/AppState", "Collection {:?} conflict (has_more=false), treating as success (no pending mutations)", name);
                            }
                            continue;
                        }
                        CollectionSyncError::Fatal { code, text } => {
                            // Fatal errors are dropped (no refetch): retrying
                            // would just reproduce the same failure.
                            warn!(target: "Client/AppState", "Collection {:?} fatal error {}: {}", name, code, text);
                            continue;
                        }
                        CollectionSyncError::Retry { code, text } => {
                            warn!(target: "Client/AppState", "Collection {:?} retryable error {}: {}, will refetch", name, code, text);
                            needs_refetch.push(name);
                            continue;
                        }
                    }
                }

                // Handle missing keys: ask our other devices for any app-state
                // keys this collection referenced that we don't have, throttled
                // to one request per key per 24h.
                let missing = match proc.get_missing_key_ids(&list).await {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to get missing key IDs for {:?}: {}", name, e);
                        Vec::new()
                    }
                };
                if !missing.is_empty() {
                    let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
                    let mut guard = self.app_state_key_requests.lock().await;
                    let now = wacore::time::Instant::now();
                    for key_id in missing {
                        let hex_id = hex::encode(&key_id);
                        let should = guard
                            .get(&hex_id)
                            .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                            .unwrap_or(true);
                        if should {
                            guard.insert(hex_id, now);
                            to_request.push(key_id);
                        }
                    }
                    // Evict stale entries to prevent unbounded growth over long sessions
                    guard.retain(|_, t| t.elapsed() < std::time::Duration::from_secs(24 * 3600));
                    drop(guard);
                    if !to_request.is_empty() {
                        self.request_app_state_keys(&to_request).await;
                    }
                }

                // full_sync is true only when this collection had a snapshot
                // (version was 0 before sync). This prevents server_sync-triggered
                // incremental syncs from being incorrectly marked as full syncs.
                let full_sync = was_snapshot.contains(&name);
                for m in mutations {
                    self.dispatch_app_state_mutation(&m, full_sync).await;
                }

                // Save version
                backend
                    .set_version(name.as_str(), new_state.clone())
                    .await?;

                // Check if this collection needs more patches
                if list.has_more_patches {
                    needs_refetch.push(name);
                }

                debug!(
                    target: "Client/AppState",
                    "Batched sync: {:?} done (version={}, has_more={})",
                    name, new_state.version, list.has_more_patches
                );
            }

            pending = needs_refetch;
        }

        // Collections still pending after the cap are abandoned for this run;
        // a later sync trigger will pick them up from their saved versions.
        if !pending.is_empty() {
            warn!(
                target: "Client/AppState",
                "Batched sync: max iterations ({}) reached for {:?}",
                MAX_ITERATIONS, pending
            );
        }

        Ok(())
    }
2317
    /// Sync a single app-state collection, paginating until the server reports
    /// no more patches.
    ///
    /// `full_sync=false` auto-detects the mode from the stored version: version
    /// 0 → request a snapshot (full sync), otherwise incremental patches. A
    /// snapshot is requested at most once (first batch only).
    ///
    /// The new version is persisted once, after the loop — so progress within a
    /// multi-batch sync is not saved if the loop exits early (shutdown,
    /// iteration cap); only what was reached is written.
    pub(crate) async fn process_app_state_sync_task(
        &self,
        name: WAPatchName,
        full_sync: bool,
    ) -> anyhow::Result<()> {
        if self.is_shutting_down() {
            debug!(target: "Client/AppState", "Skipping app state sync task {:?}: client is shutting down", name);
            return Ok(());
        }

        let backend = self.persistence_manager.backend();
        let mut full_sync = full_sync;

        // Version 0 means we have never synced this collection: force full sync.
        let mut state = backend.get_version(name.as_str()).await?;
        if state.version == 0 {
            full_sync = true;
        }

        let mut has_more = true;
        let mut want_snapshot = full_sync;
        // Safety cap to prevent infinite loops if the server keeps returning
        // has_more_patches=true without advancing the version (WA Web uses 500).
        const MAX_PAGINATION_ITERATIONS: u32 = 500;
        let mut iteration = 0u32;

        while has_more {
            // Shutdown is checked both before sending and after receiving, so a
            // long-running sync cannot delay client teardown.
            if self.is_shutting_down() {
                debug!(target: "Client/AppState", "Stopping app state sync task {:?}: shutdown detected", name);
                break;
            }
            iteration += 1;
            if iteration > MAX_PAGINATION_ITERATIONS {
                warn!(target: "Client/AppState", "App state sync for {:?} exceeded {} iterations, aborting", name, MAX_PAGINATION_ITERATIONS);
                break;
            }
            debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);

            // Build the single-collection <sync>/<collection> IQ. The version
            // attribute is only sent for incremental requests.
            let mut collection_builder = NodeBuilder::new("collection")
                .attr("name", name.as_str())
                .attr(
                    "return_snapshot",
                    if want_snapshot { "true" } else { "false" },
                );
            if !want_snapshot {
                collection_builder = collection_builder.attr("version", state.version.to_string());
            }
            let sync_node = NodeBuilder::new("sync")
                .children([collection_builder.build()])
                .build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: server_jid().clone(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: None,
            };

            let resp = self.send_iq(iq).await?;
            if self.is_shutting_down() {
                debug!(target: "Client/AppState", "Discarding app state sync response for {:?}: shutdown detected", name);
                break;
            }
            debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);

            let _decode_start = wacore::time::Instant::now();

            // Pre-download all external blobs (snapshot and patch mutations)
            // We use directPath as the key to identify each blob
            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
                std::collections::HashMap::new();

            // Download failures are logged but not fatal here; decode will fail
            // for the affected blob via the `download` closure's error path.
            if let Ok(pl) = wacore::appstate::patch_decode::parse_patch_list(&resp) {
                debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={} patches_count={}",
                    name, pl.snapshot_ref.is_some(), pl.has_more_patches, pl.patches.len());

                // Download external snapshot if present
                if let Some(ext) = &pl.snapshot_ref
                    && let Some(path) = &ext.direct_path
                {
                    match self.download(ext).await {
                        Ok(bytes) => {
                            debug!(target: "Client/AppState", "Downloaded external snapshot ({} bytes)", bytes.len());
                            pre_downloaded.insert(path.clone(), bytes);
                        }
                        Err(e) => {
                            warn!("Failed to download external snapshot: {e}");
                        }
                    }
                }

                // Download external mutations for each patch that has them
                for patch in &pl.patches {
                    if let Some(ext) = &patch.external_mutations
                        && let Some(path) = &ext.direct_path
                    {
                        let patch_version =
                            patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
                        match self.download(ext).await {
                            Ok(bytes) => {
                                debug!(target: "Client/AppState", "Downloaded external mutations for patch v{} ({} bytes)", patch_version, bytes.len());
                                pre_downloaded.insert(path.clone(), bytes);
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to download external mutations for patch v{}: {e}",
                                    patch_version
                                );
                            }
                        }
                    }
                }
            }

            // Synchronous lookup into the pre-downloaded map so decoding needs
            // no network access.
            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(path) = &ext.direct_path {
                    if let Some(bytes) = pre_downloaded.get(path) {
                        Ok(bytes.clone())
                    } else {
                        Err(anyhow::anyhow!(
                            "external blob not pre-downloaded: {}",
                            path
                        ))
                    }
                } else {
                    Err(anyhow::anyhow!("external blob has no directPath"))
                }
            };

            let proc = self.get_app_state_processor().await;
            let (mutations, new_state, list) =
                proc.decode_patch_list(&resp, &download, true).await?;
            // Only log decode timing when it is slow enough to matter.
            let decode_elapsed = _decode_start.elapsed();
            if decode_elapsed.as_millis() > 500 {
                debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
            }

            // Request any app-state keys we are missing, throttled to one
            // request per key per 24h (the request map is also pruned below).
            let missing = match proc.get_missing_key_ids(&list).await {
                Ok(v) => v,
                Err(e) => {
                    warn!("Failed to get missing key IDs for {:?}: {}", name, e);
                    Vec::new()
                }
            };
            if !missing.is_empty() {
                let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
                let mut guard = self.app_state_key_requests.lock().await;
                let now = wacore::time::Instant::now();
                for key_id in missing {
                    let hex_id = hex::encode(&key_id);
                    let should = guard
                        .get(&hex_id)
                        .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                        .unwrap_or(true);
                    if should {
                        guard.insert(hex_id, now);
                        to_request.push(key_id);
                    }
                }
                // Evict stale entries to prevent unbounded growth over long sessions
                guard.retain(|_, t| t.elapsed() < std::time::Duration::from_secs(24 * 3600));
                drop(guard);
                if !to_request.is_empty() {
                    self.request_app_state_keys(&to_request).await;
                }
            }

            for m in mutations {
                debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
                self.dispatch_app_state_mutation(&m, full_sync).await;
            }

            state = new_state;
            has_more = list.has_more_patches;
            // After the first batch, never request a snapshot again — only incremental patches.
            want_snapshot = false;
            debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more} new_version={}", name, state.version);
        }

        // Persist whatever version the loop reached (single write at the end).
        backend.set_version(name.as_str(), state.clone()).await?;

        debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
        Ok(())
    }
2503
2504    async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
2505        if raw_key_ids.is_empty() {
2506            return;
2507        }
2508        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2509        let own_jid = match device_snapshot.pn.clone() {
2510            Some(j) => j,
2511            None => return,
2512        };
2513        let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
2514            .iter()
2515            .map(|k| wa::message::AppStateSyncKeyId {
2516                key_id: Some(k.clone()),
2517            })
2518            .collect();
2519        let msg = wa::Message {
2520            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
2521                r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
2522                app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
2523                ..Default::default()
2524            })),
2525            ..Default::default()
2526        };
2527        if let Err(e) = self
2528            .send_message_impl(
2529                own_jid,
2530                &msg,
2531                Some(self.generate_message_id().await),
2532                true,
2533                false,
2534                None,
2535                vec![],
2536            )
2537            .await
2538        {
2539            warn!("Failed to send app state key request: {e}");
2540        }
2541    }
2542
2543    /// Send an app state patch to the server for a given collection.
2544    ///
2545    /// Builds the IQ stanza and sends it. Returns the updated hash state.
2546    pub(crate) async fn send_app_state_patch(
2547        &self,
2548        collection_name: &str,
2549        mutations: Vec<(wa::SyncdMutation, Vec<u8>)>,
2550    ) -> Result<()> {
2551        let proc = self.get_app_state_processor().await;
2552        let (patch_bytes, base_version) = proc.build_patch(collection_name, mutations).await?;
2553
2554        let collection_node = NodeBuilder::new("collection")
2555            .attr("name", collection_name)
2556            .attr("version", base_version.to_string())
2557            .attr("return_snapshot", "false")
2558            .children([NodeBuilder::new("patch").bytes(patch_bytes).build()])
2559            .build();
2560        let sync_node = NodeBuilder::new("sync").children([collection_node]).build();
2561        let iq = crate::request::InfoQuery {
2562            namespace: "w:sync:app:state",
2563            query_type: crate::request::InfoQueryType::Set,
2564            to: server_jid().clone(),
2565            target: None,
2566            id: None,
2567            content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
2568            timeout: None,
2569        };
2570
2571        self.send_iq(iq).await?;
2572
2573        // Re-sync to get the latest state from the server after our patch was accepted.
2574        // This matches whatsmeow's behavior: fetchAppState after successful send.
2575        if let Ok(patch_name) = collection_name.parse::<WAPatchName>()
2576            && let Err(e) = self.fetch_app_state_with_retry(patch_name).await
2577        {
2578            log::warn!("Failed to re-sync {collection_name} after patch send: {e}");
2579        }
2580
2581        Ok(())
2582    }
2583
2584    async fn dispatch_app_state_mutation(
2585        &self,
2586        m: &crate::appstate_sync::Mutation,
2587        full_sync: bool,
2588    ) {
2589        use wacore::types::events::Event;
2590
2591        if m.operation != wa::syncd_mutation::SyncdOperation::Set {
2592            return;
2593        }
2594        if m.index.is_empty() {
2595            return;
2596        }
2597
2598        // Delegate chat-related mutations (mute, pin, archive, star, contact, etc.)
2599        if crate::features::chat_actions::dispatch_chat_mutation(&self.core.event_bus, m, full_sync)
2600        {
2601            return;
2602        }
2603
2604        // Handle client-internal mutations that need persistence/presence access
2605        if m.index[0] == "setting_pushName"
2606            && let Some(val) = &m.action_value
2607            && let Some(act) = &val.push_name_setting
2608            && let Some(new_name) = &act.name
2609        {
2610            let new_name = new_name.clone();
2611            let bus = self.core.event_bus.clone();
2612
2613            let snapshot = self.persistence_manager.get_device_snapshot().await;
2614            let old = snapshot.push_name.clone();
2615            if old != new_name {
2616                debug!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
2617                self.persistence_manager
2618                    .process_command(DeviceCommand::SetPushName(new_name.clone()))
2619                    .await;
2620                bus.dispatch(&Event::SelfPushNameUpdated(
2621                    crate::types::events::SelfPushNameUpdated {
2622                        from_server: true,
2623                        old_name: old.clone(),
2624                        new_name: new_name.clone(),
2625                    },
2626                ));
2627
2628                // WhatsApp Web sends presence immediately when receiving pushname
2629                if old.is_empty() && !new_name.is_empty() {
2630                    debug!(target: "Client/AppState", "Sending presence after receiving initial pushname from app state sync");
2631                    if let Err(e) = self.presence().set_available().await {
2632                        warn!(target: "Client/AppState", "Failed to send presence after pushname sync: {e:?}");
2633                    }
2634                }
2635            } else {
2636                debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
2637            }
2638        }
2639    }
2640
    /// Flag the next transport disconnect as expected so it is not treated as
    /// an abnormal drop (a plain atomic store; no awaiting happens inside).
    async fn expect_disconnect(&self) {
        self.expected_disconnect.store(true, Ordering::Relaxed);
    }
2644
2645    pub(crate) async fn handle_stream_error(&self, node: &wacore_binary::node::Node) {
2646        self.is_logged_in.store(false, Ordering::Relaxed);
2647
2648        let mut attrs = node.attrs();
2649        let code_cow = attrs.optional_string("code");
2650        let code = code_cow.as_deref().unwrap_or("");
2651        let conflict_type = node
2652            .get_optional_child("conflict")
2653            .map(|n| {
2654                n.attrs()
2655                    .optional_string("type")
2656                    .as_deref()
2657                    .unwrap_or("")
2658                    .to_string()
2659            })
2660            .unwrap_or_default();
2661
2662        if !conflict_type.is_empty() {
2663            info!(
2664                "Got stream error indicating client was removed or replaced (conflict={}). Logging out.",
2665                conflict_type
2666            );
2667            self.expect_disconnect().await;
2668            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2669
2670            let event = if conflict_type == "replaced" {
2671                Event::StreamReplaced(crate::types::events::StreamReplaced)
2672            } else {
2673                Event::LoggedOut(crate::types::events::LoggedOut {
2674                    on_connect: false,
2675                    reason: ConnectFailureReason::LoggedOut,
2676                })
2677            };
2678            self.core.event_bus.dispatch(&event);
2679
2680            let transport_opt = self.transport.lock().await.clone();
2681            if let Some(transport) = transport_opt {
2682                self.runtime
2683                    .spawn(Box::pin(async move {
2684                        info!("Disconnecting transport after conflict");
2685                        transport.disconnect().await;
2686                    }))
2687                    .detach();
2688            }
2689        } else {
2690            match code {
2691                "515" => {
2692                    // 515 is expected during registration/pairing phase - server closes stream after pairing
2693                    info!(
2694                        "Got 515 stream error, server is closing stream (expected after pairing). Will auto-reconnect."
2695                    );
2696                    self.expect_disconnect().await;
2697                    // Proactively disconnect transport since server may not close the connection
2698                    // Clone the transport Arc before spawning to avoid holding the lock
2699                    let transport_opt = self.transport.lock().await.clone();
2700                    if let Some(transport) = transport_opt {
2701                        // Spawn disconnect in background so we don't block the message loop
2702                        self.runtime
2703                            .spawn(Box::pin(async move {
2704                                info!("Disconnecting transport after 515");
2705                                transport.disconnect().await;
2706                            }))
2707                            .detach();
2708                    }
2709                }
2710                "516" => {
2711                    info!("Got 516 stream error (device removed). Logging out.");
2712                    self.expect_disconnect().await;
2713                    self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2714                    self.core.event_bus.dispatch(&Event::LoggedOut(
2715                        crate::types::events::LoggedOut {
2716                            on_connect: false,
2717                            reason: ConnectFailureReason::LoggedOut,
2718                        },
2719                    ));
2720
2721                    let transport_opt = self.transport.lock().await.clone();
2722                    if let Some(transport) = transport_opt {
2723                        self.runtime
2724                            .spawn(Box::pin(async move {
2725                                info!("Disconnecting transport after 516");
2726                                transport.disconnect().await;
2727                            }))
2728                            .detach();
2729                    }
2730                }
2731                "401" => {
2732                    // 401: unauthorized — session invalid, needs re-authentication.
2733                    // Matches WA Web's handling of unauthorized stream errors.
2734                    info!("Got 401 stream error (unauthorized). Logging out.");
2735                    self.expect_disconnect().await;
2736                    self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2737                    self.core.event_bus.dispatch(&Event::LoggedOut(
2738                        crate::types::events::LoggedOut {
2739                            on_connect: false,
2740                            reason: ConnectFailureReason::LoggedOut,
2741                        },
2742                    ));
2743
2744                    let transport_opt = self.transport.lock().await.clone();
2745                    if let Some(transport) = transport_opt {
2746                        self.runtime
2747                            .spawn(Box::pin(async move {
2748                                info!("Disconnecting transport after 401");
2749                                transport.disconnect().await;
2750                            }))
2751                            .detach();
2752                    }
2753                }
2754                "409" => {
2755                    // 409: conflict — another client instance connected.
2756                    // Same semantics as conflict child element but via code.
2757                    info!("Got 409 stream error (conflict). Another session replaced this one.");
2758                    self.expect_disconnect().await;
2759                    self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2760                    self.core
2761                        .event_bus
2762                        .dispatch(&Event::StreamReplaced(crate::types::events::StreamReplaced));
2763
2764                    let transport_opt = self.transport.lock().await.clone();
2765                    if let Some(transport) = transport_opt {
2766                        self.runtime
2767                            .spawn(Box::pin(async move {
2768                                info!("Disconnecting transport after 409");
2769                                transport.disconnect().await;
2770                            }))
2771                            .detach();
2772                    }
2773                }
2774                "429" => {
2775                    // 429: rate limited — server is throttling connections.
2776                    // Auto-reconnect with extended backoff.
2777                    warn!(
2778                        "Got 429 stream error (rate limited). Will auto-reconnect with extended backoff."
2779                    );
2780                    self.auto_reconnect_errors.fetch_add(5, Ordering::Relaxed);
2781                }
2782                "503" => {
2783                    info!("Got 503 service unavailable, will auto-reconnect.");
2784                }
2785                _ => {
2786                    error!("Unknown stream error: {}", DisplayableNode(node));
2787                    self.expect_disconnect().await;
2788                    self.core.event_bus.dispatch(&Event::StreamError(
2789                        crate::types::events::StreamError {
2790                            code: code.to_string(),
2791                            raw: Some(node.clone()),
2792                        },
2793                    ));
2794                }
2795            }
2796        }
2797
2798        info!("Notifying shutdown from stream error handler");
2799        self.shutdown_notifier.notify(usize::MAX);
2800    }
2801
2802    pub(crate) async fn handle_connect_failure(&self, node: &wacore_binary::node::Node) {
2803        self.expected_disconnect.store(true, Ordering::Relaxed);
2804        self.shutdown_notifier.notify(usize::MAX);
2805
2806        let mut attrs = node.attrs();
2807        let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
2808        let reason = ConnectFailureReason::from(reason_code);
2809
2810        if reason.should_reconnect() {
2811            self.expected_disconnect.store(false, Ordering::Relaxed);
2812        } else {
2813            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2814        }
2815
2816        if reason.is_logged_out() {
2817            info!("Got {reason:?} connect failure, logging out.");
2818            self.core
2819                .event_bus
2820                .dispatch(&wacore::types::events::Event::LoggedOut(
2821                    crate::types::events::LoggedOut {
2822                        on_connect: true,
2823                        reason,
2824                    },
2825                ));
2826        } else if let ConnectFailureReason::TempBanned = reason {
2827            let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
2828            let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
2829            let expire_duration =
2830                chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
2831            warn!("Temporary ban connect failure: {}", DisplayableNode(node));
2832            self.core.event_bus.dispatch(&Event::TemporaryBan(
2833                crate::types::events::TemporaryBan {
2834                    code: crate::types::events::TempBanReason::from(ban_code),
2835                    expire: expire_duration,
2836                },
2837            ));
2838        } else if let ConnectFailureReason::ClientOutdated = reason {
2839            error!("Client is outdated and was rejected by server.");
2840            self.core
2841                .event_bus
2842                .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
2843        } else {
2844            warn!("Unknown connect failure: {}", DisplayableNode(node));
2845            self.core.event_bus.dispatch(&Event::ConnectFailure(
2846                crate::types::events::ConnectFailure {
2847                    reason,
2848                    message: attrs
2849                        .optional_string("message")
2850                        .as_deref()
2851                        .unwrap_or("")
2852                        .to_string(),
2853                    raw: Some(node.clone()),
2854                },
2855            ));
2856        }
2857    }
2858
2859    pub(crate) async fn handle_iq(self: &Arc<Self>, node: &wacore_binary::node::Node) -> bool {
2860        if node.attrs.get("type").is_some_and(|s| s == "get")
2861            && (node.get_optional_child("ping").is_some()
2862                || node
2863                    .attrs
2864                    .get("xmlns")
2865                    .is_some_and(|s| s == "urn:xmpp:ping"))
2866        {
2867            info!("Received ping, sending pong.");
2868            let mut parser = node.attrs();
2869            let from_jid = parser.jid("from");
2870            let id = parser.optional_string("id").map(|s| s.to_string());
2871            let pong = build_pong(from_jid.to_string(), id.as_deref());
2872            if let Err(e) = self.send_node(pong).await {
2873                warn!("Failed to send pong: {e:?}");
2874            }
2875            return true;
2876        }
2877
2878        // Pass Node directly to pair handling
2879        if pair::handle_iq(self, node).await {
2880            return true;
2881        }
2882
2883        false
2884    }
2885
    /// Returns the current value of the `is_connected` flag.
    ///
    /// Loaded with `Acquire` so writes made before the flag was set (by a
    /// matching `Release` store on the connect path) are visible —
    /// NOTE(review): the writer is outside this chunk; confirm it uses
    /// `Release`.
    pub fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }
2889
    /// Returns the current value of the `is_logged_in` flag.
    ///
    /// NOTE(review): uses `Relaxed` while `is_connected` uses `Acquire` —
    /// confirm the asymmetry is intentional (no data is published under this
    /// flag, presumably).
    pub fn is_logged_in(&self) -> bool {
        self.is_logged_in.load(Ordering::Relaxed)
    }
2893
2894    /// Register a waiter for an incoming node matching the given filter.
2895    ///
2896    /// Returns a receiver that resolves when a matching node arrives.
2897    /// The waiter starts buffering immediately, so register it **before**
2898    /// performing the action that triggers the expected node.
2899    ///
2900    /// When multiple waiters match the same node, each matching waiter
2901    /// receives a clone of the node (broadcast within a single resolve pass).
2902    ///
2903    /// # Example
2904    /// ```ignore
2905    /// let waiter = client.wait_for_node(
2906    ///     NodeFilter::tag("notification").attr("type", "w:gp2"),
2907    /// );
2908    /// client.groups().add_participants(&group_jid, &[jid_c]).await?;
2909    /// let node = waiter.await.expect("notification arrived");
2910    /// ```
2911    pub fn wait_for_node(
2912        &self,
2913        filter: NodeFilter,
2914    ) -> futures::channel::oneshot::Receiver<Arc<Node>> {
2915        let (tx, rx) = futures::channel::oneshot::channel();
2916        self.node_waiter_count.fetch_add(1, Ordering::Release);
2917        let mut waiters = self
2918            .node_waiters
2919            .lock()
2920            .unwrap_or_else(|poisoned| poisoned.into_inner());
2921        waiters.push(NodeWaiter { filter, tx });
2922        rx
2923    }
2924
    /// Check pending node waiters against an incoming node.
    /// Only called when `node_waiter_count > 0`.
    ///
    /// Walks the waiter list once: canceled waiters are pruned, matching
    /// waiters are removed and sent a clone of the node. `swap_remove`
    /// backfills slot `i` with the last element, so `i` is only advanced
    /// when nothing was removed from that slot.
    fn resolve_node_waiters(&self, node: &Arc<Node>) {
        // Recover from a poisoned lock: waiter state stays consistent even
        // if a previous holder panicked.
        let mut waiters = self
            .node_waiters
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let mut i = 0;
        while i < waiters.len() {
            if waiters[i].tx.is_canceled() {
                // Receiver dropped — clean up
                waiters.swap_remove(i);
                self.node_waiter_count.fetch_sub(1, Ordering::Release);
            } else if waiters[i].filter.matches(node) {
                // Match found — remove and send
                let w = waiters.swap_remove(i);
                self.node_waiter_count.fetch_sub(1, Ordering::Release);
                // A send failure only means the receiver vanished meanwhile.
                let _ = w.tx.send(Arc::clone(node));
            } else {
                i += 1;
            }
        }
    }
2948
    /// Delegates to the unified-session module to update its tracked
    /// server-time offset from the given stanza.
    pub(crate) fn update_server_time_offset(&self, node: &wacore_binary::node::Node) {
        self.unified_session.update_server_time_offset(node);
    }
2952
2953    pub(crate) async fn send_unified_session(&self) {
2954        if !self.is_connected() {
2955            debug!(target: "Client/UnifiedSession", "Skipping: not connected");
2956            return;
2957        }
2958
2959        let Some((node, _sequence)) = self.unified_session.prepare_send().await else {
2960            return;
2961        };
2962
2963        if let Err(e) = self.send_node(node).await {
2964            debug!(target: "Client/UnifiedSession", "Send failed: {e}");
2965            self.unified_session.clear_last_sent().await;
2966        }
2967    }
2968
2969    /// Waits for the noise socket to be established.
2970    ///
2971    /// Returns `Ok(())` when the socket is ready, or `Err` on timeout.
2972    /// This is useful for code that needs to send messages before login,
2973    /// such as requesting a pair code during initial pairing.
2974    ///
2975    /// If the socket is already connected, returns immediately.
2976    pub async fn wait_for_socket(&self, timeout: std::time::Duration) -> Result<(), anyhow::Error> {
2977        // Fast path: already connected
2978        if self.is_connected() {
2979            return Ok(());
2980        }
2981
2982        // Register waiter and re-check to avoid race condition:
2983        // If socket becomes ready between checks, the notified future captures it.
2984        let notified = self.socket_ready_notifier.listen();
2985        if self.is_connected() {
2986            return Ok(());
2987        }
2988
2989        rt_timeout(&*self.runtime, timeout, notified)
2990            .await
2991            .map_err(|_| anyhow::anyhow!("Timeout waiting for socket"))
2992    }
2993
2994    /// Waits for the client to establish a connection and complete login.
2995    ///
2996    /// Returns `Ok(())` when connected, or `Err` on timeout.
2997    /// This is useful for code that needs to run after connection is established
2998    /// and authentication is complete.
2999    ///
3000    /// If the client is already connected and logged in, returns immediately.
3001    pub async fn wait_for_connected(
3002        &self,
3003        timeout: std::time::Duration,
3004    ) -> Result<(), anyhow::Error> {
3005        // Fast path: fully ready (connected + logged in + critical sync done).
3006        if self.is_fully_ready() {
3007            return Ok(());
3008        }
3009
3010        // Register waiter and re-check to avoid TOCTOU race:
3011        // dispatch_connected() could fire between the check above and notified() registration.
3012        let notified = self.connected_notifier.listen();
3013        if self.is_fully_ready() {
3014            return Ok(());
3015        }
3016
3017        rt_timeout(&*self.runtime, timeout, notified)
3018            .await
3019            .map_err(|_| anyhow::anyhow!("Timeout waiting for connection"))
3020    }
3021
3022    /// Get access to the PersistenceManager for this client.
3023    /// This is useful for multi-account scenarios to get the device ID.
3024    pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
3025        self.persistence_manager.clone()
3026    }
3027
3028    pub async fn edit_message(
3029        &self,
3030        to: Jid,
3031        original_id: impl Into<String>,
3032        new_content: wa::Message,
3033    ) -> Result<String, anyhow::Error> {
3034        let original_id = original_id.into();
3035
3036        // WhatsApp Web uses getMeUserLidOrJidForChat(chat, EditMessage) which
3037        // returns LID for LID-addressing groups and PN otherwise.
3038        let participant = if to.is_group() {
3039            Some(
3040                self.get_own_jid_for_group(&to)
3041                    .await?
3042                    .to_non_ad()
3043                    .to_string(),
3044            )
3045        } else {
3046            if self.get_pn().await.is_none() {
3047                return Err(anyhow::Error::from(ClientError::NotLoggedIn));
3048            }
3049            None
3050        };
3051
3052        let edit_container_message = wa::Message {
3053            edited_message: Some(Box::new(wa::message::FutureProofMessage {
3054                message: Some(Box::new(wa::Message {
3055                    protocol_message: Some(Box::new(wa::message::ProtocolMessage {
3056                        key: Some(wa::MessageKey {
3057                            remote_jid: Some(to.to_string()),
3058                            from_me: Some(true),
3059                            id: Some(original_id.clone()),
3060                            participant,
3061                        }),
3062                        r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
3063                        edited_message: Some(Box::new(new_content)),
3064                        timestamp_ms: Some(wacore::time::now_millis()),
3065                        ..Default::default()
3066                    })),
3067                    ..Default::default()
3068                })),
3069            })),
3070            ..Default::default()
3071        };
3072
3073        // Use a new stanza ID instead of reusing the original message ID.
3074        // The original message ID is already embedded in protocolMessage.key.id
3075        // inside the encrypted payload. Reusing it as the outer stanza ID causes
3076        // the server to deduplicate against the original message and silently
3077        // drop the edit.
3078        self.send_message_impl(
3079            to,
3080            &edit_container_message,
3081            None,
3082            false,
3083            false,
3084            Some(crate::types::message::EditAttribute::MessageEdit),
3085            vec![],
3086        )
3087        .await?;
3088
3089        Ok(original_id)
3090    }
3091
3092    pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
3093        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
3094        let noise_socket = match noise_socket_arc {
3095            Some(socket) => socket,
3096            None => return Err(ClientError::NotConnected),
3097        };
3098
3099        debug!(target: "Client/Send", "{}", DisplayableNode(&node));
3100
3101        let mut plaintext_buf = Vec::with_capacity(1024);
3102
3103        if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
3104            error!("Failed to marshal node: {e:?}");
3105            return Err(SocketError::Crypto("Marshal error".to_string()).into());
3106        }
3107
3108        // Size based on plaintext + encryption overhead (16 byte tag + 3 byte frame header)
3109        let encrypted_buf = Vec::with_capacity(plaintext_buf.len() + 32);
3110
3111        if let Err(e) = noise_socket
3112            .encrypt_and_send(plaintext_buf, encrypted_buf)
3113            .await
3114        {
3115            return Err(e.into());
3116        }
3117
3118        // WA Web: callStanza → deadSocketTimer.onOrBefore(deadSocketTime, socketId)
3119        self.last_data_sent_ms
3120            .store(wacore::time::now_millis() as u64, Ordering::Relaxed);
3121
3122        Ok(())
3123    }
3124
3125    pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
3126        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3127        let old_name = device_snapshot.push_name.clone();
3128
3129        if old_name == new_name {
3130            return;
3131        }
3132
3133        log::debug!("Updating push name from '{}' -> '{}'", old_name, new_name);
3134        self.persistence_manager
3135            .process_command(DeviceCommand::SetPushName(new_name.clone()))
3136            .await;
3137
3138        self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
3139            crate::types::events::SelfPushNameUpdated {
3140                from_server: true,
3141                old_name,
3142                new_name: new_name.clone(),
3143            },
3144        ));
3145
3146        let client_clone = self.clone();
3147        self.runtime
3148            .spawn(Box::pin(async move {
3149                if let Err(e) = client_clone.presence().set_available().await {
3150                    log::warn!("Failed to send presence after push name update: {:?}", e);
3151                } else {
3152                    log::debug!("Sent presence after push name update.");
3153                }
3154            }))
3155            .detach();
3156    }
3157
3158    pub async fn get_push_name(&self) -> String {
3159        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3160        device_snapshot.push_name.clone()
3161    }
3162
3163    pub async fn get_pn(&self) -> Option<Jid> {
3164        let snapshot = self.persistence_manager.get_device_snapshot().await;
3165        snapshot.pn.clone()
3166    }
3167
3168    pub async fn get_lid(&self) -> Option<Jid> {
3169        let snapshot = self.persistence_manager.get_device_snapshot().await;
3170        snapshot.lid.clone()
3171    }
3172
3173    /// Resolve our own JID for a group, respecting its addressing mode.
3174    ///
3175    /// Returns LID for LID-addressing groups, PN otherwise.
3176    /// Matches WhatsApp Web's `getMeUserLidOrJidForChat`.
3177    pub(crate) async fn get_own_jid_for_group(
3178        &self,
3179        group_jid: &Jid,
3180    ) -> Result<Jid, anyhow::Error> {
3181        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3182        let own_pn = device_snapshot
3183            .pn
3184            .clone()
3185            .ok_or_else(|| anyhow::Error::from(ClientError::NotLoggedIn))?;
3186
3187        let addressing_mode = self
3188            .groups()
3189            .query_info(group_jid)
3190            .await
3191            .map(|info| info.addressing_mode)
3192            .unwrap_or(crate::types::message::AddressingMode::Pn);
3193
3194        Ok(match addressing_mode {
3195            crate::types::message::AddressingMode::Lid => {
3196                device_snapshot.lid.clone().unwrap_or(own_pn)
3197            }
3198            crate::types::message::AddressingMode::Pn => own_pn,
3199        })
3200    }
3201
3202    /// Creates a normalized StanzaKey by resolving PN to LID JIDs.
3203    pub(crate) async fn make_stanza_key(&self, chat: Jid, id: String) -> StanzaKey {
3204        // Resolve chat JID to LID if possible
3205        let chat = self.resolve_encryption_jid(&chat).await;
3206
3207        StanzaKey { chat, id }
3208    }
3209
3210    // get_phone_number_from_lid is in client/lid_pn.rs
3211
3212    pub(crate) async fn send_protocol_receipt(
3213        &self,
3214        id: String,
3215        receipt_type: crate::types::presence::ReceiptType,
3216    ) {
3217        if id.is_empty() {
3218            return;
3219        }
3220        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3221        if let Some(own_jid) = &device_snapshot.pn {
3222            let type_str = match receipt_type {
3223                crate::types::presence::ReceiptType::HistorySync => "hist_sync",
3224                crate::types::presence::ReceiptType::Read => "read",
3225                crate::types::presence::ReceiptType::ReadSelf => "read-self",
3226                crate::types::presence::ReceiptType::Delivered => "delivery",
3227                crate::types::presence::ReceiptType::Played => "played",
3228                crate::types::presence::ReceiptType::PlayedSelf => "played-self",
3229                crate::types::presence::ReceiptType::Inactive => "inactive",
3230                crate::types::presence::ReceiptType::PeerMsg => "peer_msg",
3231                crate::types::presence::ReceiptType::Sender => "sender",
3232                crate::types::presence::ReceiptType::ServerError => "server-error",
3233                crate::types::presence::ReceiptType::Retry => "retry",
3234                crate::types::presence::ReceiptType::EncRekeyRetry => "enc_rekey_retry",
3235                crate::types::presence::ReceiptType::Other(ref s) => s.as_str(),
3236            };
3237
3238            let node = NodeBuilder::new("receipt")
3239                .attrs([
3240                    ("id", id),
3241                    ("type", type_str.to_string()),
3242                    ("to", own_jid.to_non_ad().to_string()),
3243                ])
3244                .build();
3245
3246            if let Err(e) = self.send_node(node).await {
3247                warn!(
3248                    "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
3249                    receipt_type, self.unique_id, e
3250                );
3251            }
3252        }
3253    }
3254}
3255
3256/// Builds a pong response node for a server-initiated ping.
3257///
3258/// Matches WhatsApp Web (`WAWebCommsHandleStanza`): only includes `id`
3259/// when the server ping carried one.
3260fn build_pong(to: String, id: Option<&str>) -> wacore_binary::node::Node {
3261    let mut builder = NodeBuilder::new("iq").attr("to", to).attr("type", "result");
3262    if let Some(id) = id {
3263        builder = builder.attr("id", id);
3264    }
3265    builder.build()
3266}
3267
3268/// Build an `<ack/>` for the given stanza, matching WA Web / whatsmeow behavior:
3269///
3270/// - `class` = original stanza tag
3271/// - `id`, `to` (flipped from `from`), `participant` copied from original
3272/// - `from` = own device PN, only for message acks
3273/// - `type` echoed for non-message stanzas (whatsmeow: `node.Tag != "message"`),
3274///   except `notification type="encrypt"` with `<identity/>` child (WA Web drops type there).
3275///
3276/// For receipt acks, WA Web uses `MAYBE_CUSTOM_STRING(ackString)` where
3277/// `ackString = maybeAttrString("type")` — so `type` is only included when
3278/// explicitly present on the incoming receipt (delivery receipts normally
3279/// have no type attribute, meaning the ack also has no type).
3280fn build_ack_node(node: &Node, own_device_pn: Option<&Jid>) -> Option<Node> {
3281    let id = node.attrs.get("id")?.clone();
3282    let from = node.attrs.get("from")?.clone();
3283    let participant = node.attrs.get("participant").cloned();
3284
3285    // Whatsmeow: echo type for all stanza tags EXCEPT "message".
3286    // WA Web additionally omits type for notification type="encrypt" with <identity/> child.
3287    let typ = if node.tag != "message" && !is_encrypt_identity_notification(node) {
3288        node.attrs.get("type").cloned()
3289    } else {
3290        None
3291    };
3292
3293    let mut attrs = Attrs::new();
3294    attrs.insert("class", NodeValue::String(node.tag.to_string()));
3295    attrs.insert("id", id);
3296    attrs.insert("to", from);
3297
3298    if node.tag == "message"
3299        && let Some(own_device_pn) = own_device_pn
3300    {
3301        attrs.insert("from", NodeValue::Jid(own_device_pn.clone()));
3302    }
3303    if let Some(p) = participant {
3304        attrs.insert("participant", p);
3305    }
3306    if let Some(t) = typ {
3307        attrs.insert("type", t);
3308    }
3309
3310    Some(Node {
3311        tag: Cow::Borrowed("ack"),
3312        attrs,
3313        content: None,
3314    })
3315}
3316
3317/// WA Web omits `type` when ACKing `<notification type="encrypt"><identity/></notification>`.
3318fn is_encrypt_identity_notification(node: &Node) -> bool {
3319    node.tag == "notification"
3320        && node.attrs.get("type").is_some_and(|v| v == "encrypt")
3321        && node.get_optional_child("identity").is_some()
3322}
3323
3324/// Computes a reconnect delay matching WhatsApp Web's Fibonacci backoff:
3325/// `{ algo: { type: "fibonacci", first: 1000, second: 1000 }, jitter: 0.1, max: 9e5 }`
3326///
3327/// Sequence: 1s, 1s, 2s, 3s, 5s, 8s, 13s, 21s, 34s, 55s, 89s, 144s, ... capped at 900s.
3328/// Each value gets ±10% random jitter.
3329fn fibonacci_backoff(attempt: u32) -> Duration {
3330    const MAX_MS: u64 = 900_000; // WA Web: 9e5
3331
3332    let mut a: u64 = 1000;
3333    let mut b: u64 = 1000;
3334    for _ in 0..attempt {
3335        let next = a.saturating_add(b).min(MAX_MS);
3336        a = b;
3337        b = next;
3338    }
3339    let base = a.min(MAX_MS);
3340
3341    // ±10% jitter (WA Web: jitter: 0.1)
3342    let jitter_range = base / 10;
3343    let jitter = if jitter_range > 0 {
3344        rand::make_rng::<rand::rngs::StdRng>().random_range(0..=(jitter_range * 2)) as i64
3345            - jitter_range as i64
3346    } else {
3347        0
3348    };
3349    let ms = (base as i64 + jitter).max(0) as u64;
3350    Duration::from_millis(ms)
3351}
3352
3353#[cfg(test)]
3354mod tests {
3355    use super::*;
3356    use crate::lid_pn_cache::LearningSource;
3357    use crate::test_utils::MockHttpClient;
3358    use futures::channel::oneshot;
3359    use wacore_binary::jid::SERVER_JID;
3360
3361    #[tokio::test]
3362    async fn test_ack_behavior_for_incoming_stanzas() {
3363        let backend = crate::test_utils::create_test_backend().await;
3364        let pm = Arc::new(
3365            PersistenceManager::new(backend)
3366                .await
3367                .expect("persistence manager should initialize"),
3368        );
3369        let (client, _rx) = Client::new(
3370            Arc::new(crate::runtime_impl::TokioRuntime),
3371            pm,
3372            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3373            Arc::new(MockHttpClient),
3374            None,
3375        )
3376        .await;
3377
3378        // --- Assertions ---
3379
3380        // Verify that we still ack other critical stanzas (regression check).
3381        use wacore_binary::node::{Attrs, Node, NodeContent};
3382
3383        let mut receipt_attrs = Attrs::new();
3384        receipt_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
3385        receipt_attrs.insert("id".to_string(), "RCPT-1".to_string());
3386        let receipt_node = Node::new(
3387            "receipt",
3388            receipt_attrs,
3389            Some(NodeContent::String("test".to_string())),
3390        );
3391
3392        let mut notification_attrs = Attrs::new();
3393        notification_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
3394        notification_attrs.insert("id".to_string(), "NOTIF-1".to_string());
3395        let notification_node = Node::new(
3396            "notification",
3397            notification_attrs,
3398            Some(NodeContent::String("test".to_string())),
3399        );
3400
3401        assert!(
3402            client.should_ack(&receipt_node),
3403            "should_ack must still return TRUE for <receipt> stanzas."
3404        );
3405        assert!(
3406            client.should_ack(&notification_node),
3407            "should_ack must still return TRUE for <notification> stanzas."
3408        );
3409
3410        info!(
3411            "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
3412        );
3413    }
3414
    /// End-to-end check of the ack-waiter plumbing: a waiter registered under
    /// a stanza ID must be resolved by `handle_ack_response`, receive the ack
    /// node, and be removed from the waiter map afterwards.
    #[tokio::test]
    async fn test_ack_waiter_resolves() {
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        // Fully mocked client: no real transport or HTTP traffic occurs.
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // 1. Insert a waiter for a specific ID
        let test_id = "ack-test-123".to_string();
        let (tx, rx) = oneshot::channel();
        client
            .response_waiters
            .lock()
            .await
            .insert(test_id.clone(), tx);
        assert!(
            client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be inserted before handling ack"
        );

        // 2. Create a mock <ack/> node with the test ID
        let ack_node = NodeBuilder::new("ack")
            .attr("id", test_id.clone())
            .attr("from", SERVER_JID)
            .build();

        // 3. Handle the ack
        let handled = client.handle_ack_response(ack_node).await;
        assert!(
            handled,
            "handle_ack_response should return true when waiter exists"
        );

        // 4. Await the receiver with a timeout
        // (the timeout guards against a hang if the waiter was never resolved)
        match tokio::time::timeout(Duration::from_secs(1), rx).await {
            Ok(Ok(response_node)) => {
                assert!(
                    response_node
                        .attrs
                        .get("id")
                        .is_some_and(|v| v == test_id.as_str()),
                    "Response node should have correct ID"
                );
            }
            Ok(Err(_)) => panic!("Receiver was dropped without being sent a value"),
            Err(_) => panic!("Test timed out waiting for ack response"),
        }

        // 5. Verify the waiter was removed
        assert!(
            !client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be removed after handling"
        );

        info!(
            "✅ test_ack_waiter_resolves passed: ACK response correctly resolves pending waiters"
        );
    }
3483
3484    #[tokio::test]
3485    async fn test_ack_without_matching_waiter() {
3486        let backend = crate::test_utils::create_test_backend().await;
3487        let pm = Arc::new(
3488            PersistenceManager::new(backend)
3489                .await
3490                .expect("persistence manager should initialize"),
3491        );
3492        let (client, _rx) = Client::new(
3493            Arc::new(crate::runtime_impl::TokioRuntime),
3494            pm,
3495            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3496            Arc::new(MockHttpClient),
3497            None,
3498        )
3499        .await;
3500
3501        // Create an ack without any matching waiter
3502        let ack_node = NodeBuilder::new("ack")
3503            .attr("id", "non-existent-id")
3504            .attr("from", SERVER_JID)
3505            .build();
3506
3507        // Should return false since there's no waiter
3508        let handled = client.handle_ack_response(ack_node).await;
3509        assert!(
3510            !handled,
3511            "handle_ack_response should return false when no waiter exists"
3512        );
3513
3514        info!(
3515            "✅ test_ack_without_matching_waiter passed: ACK without matching waiter handled gracefully"
3516        );
3517    }
3518
3519    /// Test that the lid_pn_cache correctly stores and retrieves LID mappings.
3520    ///
3521    /// This is critical for the LID-PN session mismatch fix. When we receive a message
3522    /// with sender_lid, we cache the phone->LID mapping so that when sending replies,
3523    /// we can reuse the existing LID session instead of creating a new PN session.
3524    #[tokio::test]
3525    async fn test_lid_pn_cache_basic_operations() {
3526        let backend = Arc::new(
3527            crate::store::SqliteStore::new("file:memdb_lid_cache_basic?mode=memory&cache=shared")
3528                .await
3529                .expect("Failed to create in-memory backend for test"),
3530        );
3531        let pm = Arc::new(
3532            PersistenceManager::new(backend)
3533                .await
3534                .expect("persistence manager should initialize"),
3535        );
3536        let (client, _rx) = Client::new(
3537            Arc::new(crate::runtime_impl::TokioRuntime),
3538            pm,
3539            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3540            Arc::new(MockHttpClient),
3541            None,
3542        )
3543        .await;
3544
3545        // Initially, the cache should be empty for a phone number
3546        let phone = "559980000001";
3547        let lid = "100000012345678";
3548
3549        assert!(
3550            client.lid_pn_cache.get_current_lid(phone).await.is_none(),
3551            "Cache should be empty initially"
3552        );
3553
3554        // Insert a phone->LID mapping using add_lid_pn_mapping
3555        client
3556            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
3557            .await
3558            .expect("Failed to persist LID-PN mapping in tests");
3559
3560        // Verify we can retrieve it (phone -> LID lookup)
3561        let cached_lid = client.lid_pn_cache.get_current_lid(phone).await;
3562        assert!(cached_lid.is_some(), "Cache should contain the mapping");
3563        assert_eq!(
3564            cached_lid.expect("cache should have LID"),
3565            lid,
3566            "Cached LID should match what we inserted"
3567        );
3568
3569        // Verify reverse lookup works (LID -> phone)
3570        let cached_phone = client.lid_pn_cache.get_phone_number(lid).await;
3571        assert!(cached_phone.is_some(), "Reverse lookup should work");
3572        assert_eq!(
3573            cached_phone.expect("reverse lookup should return phone"),
3574            phone,
3575            "Cached phone should match what we inserted"
3576        );
3577
3578        // Verify a different phone number returns None
3579        assert!(
3580            client
3581                .lid_pn_cache
3582                .get_current_lid("559980000002")
3583                .await
3584                .is_none(),
3585            "Different phone number should not have a mapping"
3586        );
3587
3588        info!("✅ test_lid_pn_cache_basic_operations passed: LID-PN cache works correctly");
3589    }
3590
3591    /// Test that the lid_pn_cache respects timestamp-based conflict resolution.
3592    ///
3593    /// When a phone number has multiple LIDs, the most recent one should be returned.
3594    #[tokio::test]
3595    async fn test_lid_pn_cache_timestamp_resolution() {
3596        let backend = Arc::new(
3597            crate::store::SqliteStore::new(
3598                "file:memdb_lid_cache_timestamp?mode=memory&cache=shared",
3599            )
3600            .await
3601            .expect("Failed to create in-memory backend for test"),
3602        );
3603        let pm = Arc::new(
3604            PersistenceManager::new(backend)
3605                .await
3606                .expect("persistence manager should initialize"),
3607        );
3608        let (client, _rx) = Client::new(
3609            Arc::new(crate::runtime_impl::TokioRuntime),
3610            pm,
3611            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3612            Arc::new(MockHttpClient),
3613            None,
3614        )
3615        .await;
3616
3617        let phone = "559980000001";
3618        let lid_old = "100000012345678";
3619        let lid_new = "100000087654321";
3620
3621        // Insert initial mapping
3622        client
3623            .add_lid_pn_mapping(lid_old, phone, LearningSource::Usync)
3624            .await
3625            .expect("Failed to persist LID-PN mapping in tests");
3626
3627        assert_eq!(
3628            client
3629                .lid_pn_cache
3630                .get_current_lid(phone)
3631                .await
3632                .expect("cache should have LID"),
3633            lid_old,
3634            "Initial LID should be stored"
3635        );
3636
3637        // Small delay to ensure different timestamp
3638        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3639
3640        // Add new mapping with newer timestamp
3641        client
3642            .add_lid_pn_mapping(lid_new, phone, LearningSource::PeerPnMessage)
3643            .await
3644            .expect("Failed to persist LID-PN mapping in tests");
3645
3646        assert_eq!(
3647            client
3648                .lid_pn_cache
3649                .get_current_lid(phone)
3650                .await
3651                .expect("cache should have newer LID"),
3652            lid_new,
3653            "Newer LID should be returned for phone lookup"
3654        );
3655
3656        // Both LIDs should still resolve to the same phone
3657        assert_eq!(
3658            client
3659                .lid_pn_cache
3660                .get_phone_number(lid_old)
3661                .await
3662                .expect("reverse lookup should return phone"),
3663            phone,
3664            "Old LID should still map to phone"
3665        );
3666        assert_eq!(
3667            client
3668                .lid_pn_cache
3669                .get_phone_number(lid_new)
3670                .await
3671                .expect("reverse lookup should return phone"),
3672            phone,
3673            "New LID should also map to phone"
3674        );
3675
3676        info!(
3677            "✅ test_lid_pn_cache_timestamp_resolution passed: Timestamp-based resolution works correctly"
3678        );
3679    }
3680
3681    /// Test that get_lid_for_phone (from SendContextResolver) returns the cached value.
3682    ///
3683    /// This is the method used by wacore::send to look up LID mappings when encrypting.
3684    #[tokio::test]
3685    async fn test_get_lid_for_phone_via_send_context_resolver() {
3686        use wacore::client::context::SendContextResolver;
3687
3688        let backend = Arc::new(
3689            crate::store::SqliteStore::new("file:memdb_get_lid_for_phone?mode=memory&cache=shared")
3690                .await
3691                .expect("Failed to create in-memory backend for test"),
3692        );
3693        let pm = Arc::new(
3694            PersistenceManager::new(backend)
3695                .await
3696                .expect("persistence manager should initialize"),
3697        );
3698        let (client, _rx) = Client::new(
3699            Arc::new(crate::runtime_impl::TokioRuntime),
3700            pm,
3701            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3702            Arc::new(MockHttpClient),
3703            None,
3704        )
3705        .await;
3706
3707        let phone = "559980000001";
3708        let lid = "100000012345678";
3709
3710        // Before caching, should return None
3711        assert!(
3712            client.get_lid_for_phone(phone).await.is_none(),
3713            "get_lid_for_phone should return None before caching"
3714        );
3715
3716        // Cache the mapping using add_lid_pn_mapping
3717        client
3718            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
3719            .await
3720            .expect("Failed to persist LID-PN mapping in tests");
3721
3722        // Now it should return the LID
3723        let result = client.get_lid_for_phone(phone).await;
3724        assert!(
3725            result.is_some(),
3726            "get_lid_for_phone should return Some after caching"
3727        );
3728        assert_eq!(
3729            result.expect("get_lid_for_phone should return Some"),
3730            lid,
3731            "get_lid_for_phone should return the cached LID"
3732        );
3733
3734        info!(
3735            "✅ test_get_lid_for_phone_via_send_context_resolver passed: SendContextResolver correctly returns cached LID"
3736        );
3737    }
3738
3739    /// Test that wait_for_offline_delivery_end returns immediately when the flag is already set.
3740    #[tokio::test]
3741    async fn test_wait_for_offline_delivery_end_returns_immediately_when_flag_set() {
3742        let backend = Arc::new(
3743            crate::store::SqliteStore::new(
3744                "file:memdb_offline_sync_flag_set?mode=memory&cache=shared",
3745            )
3746            .await
3747            .expect("Failed to create in-memory backend for test"),
3748        );
3749        let pm = Arc::new(
3750            PersistenceManager::new(backend)
3751                .await
3752                .expect("persistence manager should initialize"),
3753        );
3754        let (client, _rx) = Client::new(
3755            Arc::new(crate::runtime_impl::TokioRuntime),
3756            pm,
3757            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3758            Arc::new(MockHttpClient),
3759            None,
3760        )
3761        .await;
3762
3763        // Set the flag to true (simulating offline sync completed)
3764        client
3765            .offline_sync_completed
3766            .store(true, std::sync::atomic::Ordering::Relaxed);
3767
3768        // This should return immediately (not wait 10 seconds)
3769        let start = std::time::Instant::now();
3770        client.wait_for_offline_delivery_end().await;
3771        let elapsed = start.elapsed();
3772
3773        // Should complete in < 100ms (not 10 second timeout)
3774        assert!(
3775            elapsed.as_millis() < 100,
3776            "wait_for_offline_delivery_end should return immediately when flag is set, took {:?}",
3777            elapsed
3778        );
3779
3780        info!("✅ test_wait_for_offline_delivery_end_returns_immediately_when_flag_set passed");
3781    }
3782
    /// Test that wait_for_offline_delivery_end times out when the flag is NOT set.
    /// This verifies the 10-second timeout is working.
    ///
    /// Uses the `_with_timeout` helper with a short (50ms) deadline so the test stays
    /// fast, then checks two post-conditions of the timeout path: the completion flag
    /// is set, and the message-processing semaphore is back at full capacity.
    #[tokio::test]
    async fn test_wait_for_offline_delivery_end_times_out_when_flag_not_set() {
        let backend = Arc::new(
            crate::store::SqliteStore::new(
                "file:memdb_offline_sync_timeout?mode=memory&cache=shared",
            )
            .await
            .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Flag is false by default, so use a short timeout and verify the helper
        // marks the sync complete on timeout.
        let start = std::time::Instant::now();
        client
            .wait_for_offline_delivery_end_with_timeout(std::time::Duration::from_millis(50))
            .await;

        let elapsed = start.elapsed();
        // Count available permits by trying to acquire non-blockingly
        // Clone the semaphore handle out of the std Mutex first so the guard is not
        // held while we acquire permits; `into_inner()` recovers the value even if a
        // previous panic left the mutex poisoned.
        let semaphore = match client.message_processing_semaphore.lock() {
            Ok(guard) => guard.clone(),
            Err(poisoned) => poisoned.into_inner().clone(),
        };
        // Drain every immediately-available permit; the number drained equals the
        // semaphore's free capacity. Guards are collected so every acquisition stays
        // held until counting is done.
        let mut guards = Vec::new();
        while let Some(guard) = semaphore.try_acquire() {
            guards.push(guard);
        }
        let permits = guards.len();
        drop(guards); // release all permits again before asserting

        assert!(
            elapsed.as_millis() >= 45, // Allow small timing variance
            "Should have waited for the configured timeout duration, took {:?}",
            elapsed
        );
        assert!(
            client
                .offline_sync_completed
                .load(std::sync::atomic::Ordering::Relaxed),
            "wait_for_offline_delivery_end should mark offline sync complete on timeout"
        );
        // NOTE(review): 64 is assumed to be the client's full parallel-processing
        // permit count once offline mode ends — confirm against the Client's
        // semaphore configuration if that capacity ever changes.
        assert_eq!(
            permits, 64,
            "timeout completion should restore parallel permits"
        );

        info!("✅ test_wait_for_offline_delivery_end_times_out_when_flag_not_set passed");
    }
3846
3847    /// Test that wait_for_offline_delivery_end returns when notified.
3848    #[tokio::test]
3849    async fn test_wait_for_offline_delivery_end_returns_on_notify() {
3850        let backend = Arc::new(
3851            crate::store::SqliteStore::new("file:memdb_offline_notify?mode=memory&cache=shared")
3852                .await
3853                .expect("Failed to create in-memory backend for test"),
3854        );
3855        let pm = Arc::new(
3856            PersistenceManager::new(backend)
3857                .await
3858                .expect("persistence manager should initialize"),
3859        );
3860        let (client, _rx) = Client::new(
3861            Arc::new(crate::runtime_impl::TokioRuntime),
3862            pm,
3863            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3864            Arc::new(MockHttpClient),
3865            None,
3866        )
3867        .await;
3868
3869        let client_clone = client.clone();
3870
3871        // Spawn a task that will notify after 50ms
3872        tokio::spawn(async move {
3873            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3874            client_clone.offline_sync_notifier.notify(usize::MAX);
3875        });
3876
3877        let start = std::time::Instant::now();
3878        client.wait_for_offline_delivery_end().await;
3879        let elapsed = start.elapsed();
3880
3881        // Should complete around 50ms (when notified), not 10 seconds
3882        assert!(
3883            elapsed.as_millis() < 200,
3884            "wait_for_offline_delivery_end should return when notified, took {:?}",
3885            elapsed
3886        );
3887        assert!(
3888            elapsed.as_millis() >= 45, // Should have waited for the notify
3889            "Should have waited for the notify, only took {:?}",
3890            elapsed
3891        );
3892
3893        info!("✅ test_wait_for_offline_delivery_end_returns_on_notify passed");
3894    }
3895
3896    /// Test that the offline_sync_completed flag starts as false.
3897    #[tokio::test]
3898    async fn test_offline_sync_flag_initially_false() {
3899        let backend = Arc::new(
3900            crate::store::SqliteStore::new(
3901                "file:memdb_offline_flag_initial?mode=memory&cache=shared",
3902            )
3903            .await
3904            .expect("Failed to create in-memory backend for test"),
3905        );
3906        let pm = Arc::new(
3907            PersistenceManager::new(backend)
3908                .await
3909                .expect("persistence manager should initialize"),
3910        );
3911        let (client, _rx) = Client::new(
3912            Arc::new(crate::runtime_impl::TokioRuntime),
3913            pm,
3914            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3915            Arc::new(MockHttpClient),
3916            None,
3917        )
3918        .await;
3919
3920        // The flag should be false initially
3921        assert!(
3922            !client
3923                .offline_sync_completed
3924                .load(std::sync::atomic::Ordering::Relaxed),
3925            "offline_sync_completed should be false when Client is first created"
3926        );
3927
3928        info!("✅ test_offline_sync_flag_initially_false passed");
3929    }
3930
3931    /// Test the complete offline sync lifecycle:
3932    /// 1. Flag starts false
3933    /// 2. Flag is set true after IB offline stanza
3934    /// 3. Notify is called
3935    #[tokio::test]
3936    async fn test_offline_sync_lifecycle() {
3937        use std::sync::atomic::Ordering;
3938
3939        let backend = Arc::new(
3940            crate::store::SqliteStore::new("file:memdb_offline_lifecycle?mode=memory&cache=shared")
3941                .await
3942                .expect("Failed to create in-memory backend for test"),
3943        );
3944        let pm = Arc::new(
3945            PersistenceManager::new(backend)
3946                .await
3947                .expect("persistence manager should initialize"),
3948        );
3949        let (client, _rx) = Client::new(
3950            Arc::new(crate::runtime_impl::TokioRuntime),
3951            pm,
3952            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3953            Arc::new(MockHttpClient),
3954            None,
3955        )
3956        .await;
3957
3958        // 1. Initially false
3959        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
3960
3961        // 2. Spawn a waiter
3962        let client_waiter = client.clone();
3963        let waiter_handle = tokio::spawn(async move {
3964            client_waiter.wait_for_offline_delivery_end().await;
3965            true // Return that we completed
3966        });
3967
3968        // Give the waiter time to start waiting
3969        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3970
3971        // Verify waiter hasn't completed yet
3972        assert!(
3973            !waiter_handle.is_finished(),
3974            "Waiter should still be waiting"
3975        );
3976
3977        // 3. Simulate IB handler behavior (set flag and notify)
3978        client.offline_sync_completed.store(true, Ordering::Relaxed);
3979        client.offline_sync_notifier.notify(usize::MAX);
3980
3981        // 4. Waiter should complete
3982        let result = tokio::time::timeout(std::time::Duration::from_millis(100), waiter_handle)
3983            .await
3984            .expect("Waiter should complete after notify")
3985            .expect("Waiter task should not panic");
3986
3987        assert!(result, "Waiter should have completed successfully");
3988        assert!(client.offline_sync_completed.load(Ordering::Relaxed));
3989
3990        info!("✅ test_offline_sync_lifecycle passed");
3991    }
3992
3993    /// Test that establish_primary_phone_session_immediate returns error when no PN is set.
3994    /// This verifies the "not logged in" guard works.
3995    #[tokio::test]
3996    async fn test_establish_primary_phone_session_fails_without_pn() {
3997        let backend = Arc::new(
3998            crate::store::SqliteStore::new("file:memdb_no_pn?mode=memory&cache=shared")
3999                .await
4000                .expect("Failed to create in-memory backend for test"),
4001        );
4002        let pm = Arc::new(
4003            PersistenceManager::new(backend)
4004                .await
4005                .expect("persistence manager should initialize"),
4006        );
4007        let (client, _rx) = Client::new(
4008            Arc::new(crate::runtime_impl::TokioRuntime),
4009            pm,
4010            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4011            Arc::new(MockHttpClient),
4012            None,
4013        )
4014        .await;
4015
4016        // No PN set, so this should fail
4017        let result = client.establish_primary_phone_session_immediate().await;
4018
4019        assert!(
4020            result.is_err(),
4021            "establish_primary_phone_session_immediate should fail when no PN is set"
4022        );
4023
4024        let err = result.unwrap_err();
4025        assert!(
4026            err.downcast_ref::<ClientError>()
4027                .is_some_and(|e| matches!(e, ClientError::NotLoggedIn)),
4028            "Error should be ClientError::NotLoggedIn, got: {}",
4029            err
4030        );
4031
4032        info!("✅ test_establish_primary_phone_session_fails_without_pn passed");
4033    }
4034
    /// Test that ensure_e2e_sessions waits for offline sync to complete.
    /// This is the CRITICAL difference between ensure_e2e_sessions and
    /// establish_primary_phone_session_immediate.
    ///
    /// Phase 1 checks the empty-input fast path (returns without waiting); phase 2
    /// checks that a non-empty request blocks until the offline-sync flag is set and
    /// the notifier fires.
    #[tokio::test]
    async fn test_ensure_e2e_sessions_waits_for_offline_sync() {
        use std::sync::atomic::Ordering;
        use wacore_binary::jid::Jid;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_ensure_e2e_waits?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Flag is false (offline sync not complete)
        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));

        // Call ensure_e2e_sessions with an empty list (so it returns early after the wait)
        // This lets us test the waiting behavior without needing network
        let client_clone = client.clone();
        let ensure_handle = tokio::spawn(async move {
            // Start with some JIDs - but since we're testing the wait, we use empty
            // to avoid needing actual session establishment
            client_clone.ensure_e2e_sessions(vec![]).await
        });

        // Wait a bit - ensure_e2e_sessions should return immediately for empty list
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        assert!(
            ensure_handle.is_finished(),
            "ensure_e2e_sessions should return immediately for empty JID list"
        );

        // Now test with actual JIDs - it should wait for offline sync
        let client_clone = client.clone();
        let test_jid = Jid::pn("559999999999");
        let ensure_handle = tokio::spawn(async move {
            // This will wait for offline sync before proceeding
            let start = std::time::Instant::now();
            let _ = client_clone.ensure_e2e_sessions(vec![test_jid]).await;
            start.elapsed()
        });

        // Give it a moment to start
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;

        // It should still be waiting (offline sync not complete)
        assert!(
            !ensure_handle.is_finished(),
            "ensure_e2e_sessions should be waiting for offline sync"
        );

        // Now complete offline sync
        // (same flag-then-notify sequence the IB handler performs)
        client.offline_sync_completed.store(true, Ordering::Relaxed);
        client.offline_sync_notifier.notify(usize::MAX);

        // Now it should complete (might fail on session establishment, but that's ok)
        let result = tokio::time::timeout(std::time::Duration::from_secs(2), ensure_handle).await;

        assert!(
            result.is_ok(),
            "ensure_e2e_sessions should complete after offline sync"
        );

        info!("✅ test_ensure_e2e_sessions_waits_for_offline_sync passed");
    }
4114
4115    /// Integration test: Verify that the immediate session establishment does NOT
4116    /// wait for offline sync. This is critical for PDO to work during offline sync.
4117    ///
4118    /// The flow is:
4119    /// 1. Login -> establish_primary_phone_session_immediate() is called
4120    /// 2. This should NOT wait for offline sync (flag is false at this point)
4121    /// 3. After session is established, offline messages arrive
4122    /// 4. When decryption fails, PDO can immediately send to device 0
4123    #[tokio::test]
4124    async fn test_immediate_session_does_not_wait_for_offline_sync() {
4125        use std::sync::atomic::Ordering;
4126        use wacore_binary::jid::Jid;
4127
4128        let backend = Arc::new(
4129            crate::store::SqliteStore::new("file:memdb_immediate_no_wait?mode=memory&cache=shared")
4130                .await
4131                .expect("Failed to create in-memory backend for test"),
4132        );
4133        let pm = Arc::new(
4134            PersistenceManager::new(backend.clone())
4135                .await
4136                .expect("persistence manager should initialize"),
4137        );
4138
4139        // Set a PN so establish_primary_phone_session_immediate doesn't fail early
4140        pm.modify_device(|device| {
4141            device.pn = Some(Jid::pn("559999999999"));
4142        })
4143        .await;
4144
4145        let (client, _rx) = Client::new(
4146            Arc::new(crate::runtime_impl::TokioRuntime),
4147            pm,
4148            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4149            Arc::new(MockHttpClient),
4150            None,
4151        )
4152        .await;
4153
4154        // Flag is false (offline sync not complete - simulating login state)
4155        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
4156
4157        // Call establish_primary_phone_session_immediate
4158        // It should NOT wait for offline sync - it should proceed immediately
4159        let start = std::time::Instant::now();
4160
4161        // Note: This will fail because we can't actually fetch prekeys in tests,
4162        // but the important thing is that it doesn't WAIT for offline sync
4163        let result = tokio::time::timeout(
4164            std::time::Duration::from_millis(500),
4165            client.establish_primary_phone_session_immediate(),
4166        )
4167        .await;
4168
4169        let elapsed = start.elapsed();
4170
4171        // The call should complete (or fail) quickly, NOT wait for 10 second timeout
4172        assert!(
4173            result.is_ok(),
4174            "establish_primary_phone_session_immediate should not wait for offline sync, timed out"
4175        );
4176
4177        // It should complete in < 500ms (not 10 second wait)
4178        assert!(
4179            elapsed.as_millis() < 500,
4180            "establish_primary_phone_session_immediate should not wait, took {:?}",
4181            elapsed
4182        );
4183
4184        // The actual result might be an error (no network), but that's fine
4185        // The important thing is it didn't wait for offline sync
4186        info!(
4187            "establish_primary_phone_session_immediate completed in {:?} (result: {:?})",
4188            elapsed,
4189            result.unwrap().is_ok()
4190        );
4191
4192        info!("✅ test_immediate_session_does_not_wait_for_offline_sync passed");
4193    }
4194
    /// Integration test: Verify that establish_primary_phone_session_immediate
    /// skips establishment when a session already exists.
    ///
    /// This is the CRITICAL fix for MAC verification failures:
    /// - BUG (before fix): Called process_prekey_bundle() unconditionally,
    ///   replacing the existing session with a new one
    /// - RESULT: Remote device still uses old session state, causing MAC failures
    ///
    /// Flow: store a fresh session for the primary phone device, then call the
    /// establishment path and assert it returns Ok without disturbing the store.
    #[tokio::test]
    async fn test_establish_session_skips_when_exists() {
        use wacore::libsignal::protocol::SessionRecord;
        use wacore::libsignal::store::SessionStore;
        use wacore::types::jid::JidExt;
        use wacore_binary::jid::Jid;

        // Named shared-cache in-memory SQLite: every connection opened with this
        // URI within the test process sees the same database.
        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_skip_existing?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend.clone())
                .await
                .expect("persistence manager should initialize"),
        );

        // Set a PN so the function doesn't fail early
        let own_pn = Jid::pn("559999999999");
        pm.modify_device(|device| {
            device.pn = Some(own_pn.clone());
        })
        .await;

        // Pre-populate a session for the primary phone JID (device 0)
        let primary_phone_jid = own_pn.with_device(0);
        let signal_addr = primary_phone_jid.to_protocol_address();

        // Create a dummy session record
        let dummy_session = SessionRecord::new_fresh();
        {
            // Scoped so the device read guard is released before Client::new runs.
            let device_arc = pm.get_device_arc().await;
            let device = device_arc.read().await;
            device
                .store_session(&signal_addr, &dummy_session)
                .await
                .expect("Failed to store test session");

            // Verify session exists
            let exists = device
                .contains_session(&signal_addr)
                .await
                .expect("Failed to check session");
            assert!(exists, "Session should exist after store");
        }

        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm.clone(),
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Call establish_primary_phone_session_immediate
        // It should return Ok(()) immediately without fetching prekeys
        let result = client.establish_primary_phone_session_immediate().await;

        assert!(
            result.is_ok(),
            "establish_primary_phone_session_immediate should succeed when session exists"
        );

        // Verify the session was NOT replaced (still has the same record)
        // This is the critical assertion - if session was replaced, it would cause MAC failures
        // NOTE(review): contains_session only proves *a* session exists; it does not
        // compare record bytes, so a replacement at the same address would not be
        // caught here — TODO consider asserting on the loaded record instead.
        {
            let device_arc = pm.get_device_arc().await;
            let device = device_arc.read().await;
            let exists = device
                .contains_session(&signal_addr)
                .await
                .expect("Failed to check session");
            assert!(exists, "Session should still exist after the call");
        }

        info!("✅ test_establish_session_skips_when_exists passed");
    }
4281
4282    /// Integration test: Verify that the session check prevents MAC failures
4283    /// by documenting the exact control flow that caused the bug.
4284    #[test]
4285    fn test_mac_failure_prevention_flow_documentation() {
4286        // Simulate the decision logic
4287        fn should_establish_session(
4288            check_result: Result<bool, &'static str>,
4289        ) -> Result<bool, String> {
4290            match check_result {
4291                Ok(true) => Ok(false), // Session exists → DON'T establish
4292                Ok(false) => Ok(true), // No session → establish
4293                Err(e) => Err(format!("Cannot verify session: {}", e)), // Fail-safe
4294            }
4295        }
4296
4297        // Test Case 1: Session exists → skip (prevents MAC failure)
4298        let result = should_establish_session(Ok(true));
4299        assert_eq!(result, Ok(false), "Should skip when session exists");
4300
4301        // Test Case 2: No session → establish
4302        let result = should_establish_session(Ok(false));
4303        assert_eq!(result, Ok(true), "Should establish when no session");
4304
4305        // Test Case 3: Check fails → error (fail-safe)
4306        let result = should_establish_session(Err("database error"));
4307        assert!(result.is_err(), "Should fail when check fails");
4308
4309        info!("✅ test_mac_failure_prevention_flow_documentation passed");
4310    }
4311
4312    #[test]
4313    fn test_unified_session_id_calculation() {
4314        // Test the mathematical calculation of the unified session ID.
4315        // Formula: (now_ms + server_offset_ms + 3_days_ms) % 7_days_ms
4316
4317        const DAY_MS: i64 = 24 * 60 * 60 * 1000;
4318        const WEEK_MS: i64 = 7 * DAY_MS;
4319        const OFFSET_MS: i64 = 3 * DAY_MS;
4320
4321        // Helper function matching the implementation
4322        fn calculate_session_id(now_ms: i64, server_offset_ms: i64) -> i64 {
4323            let adjusted_now = now_ms + server_offset_ms;
4324            (adjusted_now + OFFSET_MS) % WEEK_MS
4325        }
4326
4327        // Test 1: Zero offset
4328        let now_ms = 1706000000000_i64; // Some arbitrary timestamp
4329        let id = calculate_session_id(now_ms, 0);
4330        assert!(
4331            (0..WEEK_MS).contains(&id),
4332            "Session ID should be in [0, WEEK_MS)"
4333        );
4334
4335        // Test 2: Positive server offset (server is ahead)
4336        let id_with_positive_offset = calculate_session_id(now_ms, 5000);
4337        assert!(
4338            (0..WEEK_MS).contains(&id_with_positive_offset),
4339            "Session ID should be in [0, WEEK_MS)"
4340        );
4341        // The ID should be different from zero offset (unless wrap-around)
4342        // Not testing exact value as it depends on the offset
4343
4344        // Test 3: Negative server offset (server is behind)
4345        let id_with_negative_offset = calculate_session_id(now_ms, -5000);
4346        assert!(
4347            (0..WEEK_MS).contains(&id_with_negative_offset),
4348            "Session ID should be in [0, WEEK_MS)"
4349        );
4350
4351        // Test 4: Verify modulo wrap-around
4352        // If adjusted_now + OFFSET_MS >= WEEK_MS, it should wrap
4353        let wrap_test_now = WEEK_MS - OFFSET_MS + 1000; // Should produce small result
4354        let wrapped_id = calculate_session_id(wrap_test_now, 0);
4355        assert_eq!(wrapped_id, 1000, "Should wrap around correctly");
4356
4357        // Test 5: Edge case - at exact boundary
4358        let boundary_now = WEEK_MS - OFFSET_MS;
4359        let boundary_id = calculate_session_id(boundary_now, 0);
4360        assert_eq!(boundary_id, 0, "At exact boundary should be 0");
4361    }
4362
    /// Verify that `update_server_time_offset` derives the clock offset from a
    /// stanza's `t` attribute, and leaves the stored offset untouched when `t`
    /// is missing, non-numeric, or zero.
    #[tokio::test]
    async fn test_server_time_offset_extraction() {
        use wacore_binary::builder::NodeBuilder;

        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Initially, offset should be 0
        assert_eq!(
            client.unified_session.server_time_offset_ms(),
            0,
            "Initial offset should be 0"
        );

        // Create a node with a 't' attribute
        let server_time = wacore::time::now_secs() + 10; // Server is 10 seconds ahead
        let node = NodeBuilder::new("success")
            .attr("t", server_time.to_string())
            .build();

        // Update the offset
        client.update_server_time_offset(&node);

        // The offset should be approximately 10 * 1000 = 10000 ms
        // Allow some tolerance for timing differences during the test
        let offset = client.unified_session.server_time_offset_ms();
        assert!(
            (offset - 10000).abs() < 1000, // Allow 1 second tolerance
            "Offset should be approximately 10000ms, got {}",
            offset
        );

        // Test with no 't' attribute - should not change offset
        let node_no_t = NodeBuilder::new("success").build();
        client.update_server_time_offset(&node_no_t);
        let offset_after = client.unified_session.server_time_offset_ms();
        assert!(
            (offset_after - offset).abs() < 100, // Should be same (or very close)
            "Offset should not change when 't' is missing"
        );

        // Test with invalid 't' attribute - should not change offset
        let node_invalid = NodeBuilder::new("success")
            .attr("t", "not_a_number")
            .build();
        client.update_server_time_offset(&node_invalid);
        let offset_after_invalid = client.unified_session.server_time_offset_ms();
        assert!(
            (offset_after_invalid - offset).abs() < 100,
            "Offset should not change when 't' is invalid"
        );

        // Test with negative/zero 't' - should not change offset
        // NOTE(review): only t="0" is exercised here; a genuinely negative 't'
        // is never sent — TODO add a negative-value case if the attr can carry one.
        let node_zero = NodeBuilder::new("success").attr("t", "0").build();
        client.update_server_time_offset(&node_zero);
        let offset_after_zero = client.unified_session.server_time_offset_ms();
        assert!(
            (offset_after_zero - offset).abs() < 100,
            "Offset should not change when 't' is 0"
        );

        info!("✅ test_server_time_offset_extraction passed");
    }
4438
    /// Exercise the unified-session manager end to end through the client:
    /// sequence numbering, duplicate-send prevention within one session ID,
    /// and the sequence reset after `clear_last_sent`.
    #[tokio::test]
    async fn test_unified_session_manager_integration() {
        // Test the unified session manager through the client

        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            Arc::new(crate::runtime_impl::TokioRuntime),
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Initially, sequence should be 0
        assert_eq!(
            client.unified_session.sequence(),
            0,
            "Initial sequence should be 0"
        );

        // Duplicate prevention depends on the session ID staying the same between calls.
        // Since the session ID is millisecond-based, use a retry loop to handle
        // the rare case where we cross a millisecond boundary between calls.
        loop {
            // Start each attempt from a clean slate so sequence assertions hold.
            client.unified_session.reset().await;

            let result = client.unified_session.prepare_send().await;
            assert!(result.is_some(), "First send should succeed");
            let (node, seq) = result.unwrap();
            assert_eq!(node.tag, "ib", "Should be an IB stanza");
            assert_eq!(seq, 1, "First sequence should be 1 (pre-increment)");
            assert_eq!(client.unified_session.sequence(), 1);

            let result2 = client.unified_session.prepare_send().await;
            if result2.is_none() {
                // Duplicate was prevented within the same millisecond
                assert_eq!(client.unified_session.sequence(), 1);
                break;
            }
            // Millisecond boundary crossed, retry
            tokio::task::yield_now().await;
        }

        // Clear last sent and try again - sequence resets on "new" session ID
        client.unified_session.clear_last_sent().await;
        let result3 = client.unified_session.prepare_send().await;
        assert!(result3.is_some(), "Should succeed after clearing");
        let (_, seq3) = result3.unwrap();
        assert_eq!(seq3, 1, "Sequence resets when session ID changes");
        assert_eq!(client.unified_session.sequence(), 1);

        info!("✅ test_unified_session_manager_integration passed");
    }
4498
4499    #[test]
4500    fn test_unified_session_protocol_node() {
4501        // Test the type-safe protocol node implementation
4502        use wacore::ib::{IbStanza, UnifiedSession};
4503        use wacore::protocol::ProtocolNode;
4504
4505        // Create a unified session
4506        let session = UnifiedSession::new("123456789");
4507        assert_eq!(session.id, "123456789");
4508        assert_eq!(session.tag(), "unified_session");
4509
4510        // Convert to node
4511        let node = session.into_node();
4512        assert_eq!(node.tag, "unified_session");
4513        assert!(node.attrs.get("id").is_some_and(|v| v == "123456789"));
4514
4515        // Create an IB stanza
4516        let stanza = IbStanza::unified_session(UnifiedSession::new("987654321"));
4517        assert_eq!(stanza.tag(), "ib");
4518
4519        // Convert to node and verify structure
4520        let ib_node = stanza.into_node();
4521        assert_eq!(ib_node.tag, "ib");
4522        let children = ib_node.children().expect("IB stanza should have children");
4523        assert_eq!(children.len(), 1);
4524        assert_eq!(children[0].tag, "unified_session");
4525        assert!(
4526            children[0]
4527                .attrs
4528                .get("id")
4529                .is_some_and(|v| v == "987654321")
4530        );
4531
4532        info!("✅ test_unified_session_protocol_node passed");
4533    }
4534
4535    /// Helper to create a test client for offline sync tests
4536    async fn create_offline_sync_test_client() -> Arc<Client> {
4537        let backend = crate::test_utils::create_test_backend().await;
4538        let pm = Arc::new(
4539            PersistenceManager::new(backend)
4540                .await
4541                .expect("persistence manager should initialize"),
4542        );
4543        let (client, _rx) = Client::new(
4544            Arc::new(crate::runtime_impl::TokioRuntime),
4545            pm,
4546            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4547            Arc::new(MockHttpClient),
4548            None,
4549        )
4550        .await;
4551        client
4552    }
4553
4554    #[tokio::test]
4555    async fn test_ib_thread_metadata_does_not_end_sync() {
4556        let client = create_offline_sync_test_client().await;
4557        client
4558            .offline_sync_metrics
4559            .active
4560            .store(true, Ordering::Release);
4561
4562        let node = NodeBuilder::new("ib")
4563            .children([NodeBuilder::new("thread_metadata")
4564                .children([NodeBuilder::new("item").build()])
4565                .build()])
4566            .build();
4567
4568        client.process_node(Arc::new(node)).await;
4569        assert!(
4570            client.offline_sync_metrics.active.load(Ordering::Acquire),
4571            "<ib><thread_metadata> should NOT end offline sync"
4572        );
4573    }
4574
4575    #[tokio::test]
4576    async fn test_ib_edge_routing_does_not_end_sync() {
4577        let client = create_offline_sync_test_client().await;
4578        client
4579            .offline_sync_metrics
4580            .active
4581            .store(true, Ordering::Release);
4582
4583        let node = NodeBuilder::new("ib")
4584            .children([NodeBuilder::new("edge_routing")
4585                .children([NodeBuilder::new("routing_info")
4586                    .bytes(vec![1, 2, 3])
4587                    .build()])
4588                .build()])
4589            .build();
4590
4591        client.process_node(Arc::new(node)).await;
4592        assert!(
4593            client.offline_sync_metrics.active.load(Ordering::Acquire),
4594            "<ib><edge_routing> should NOT end offline sync"
4595        );
4596    }
4597
4598    #[tokio::test]
4599    async fn test_ib_dirty_does_not_end_sync() {
4600        let client = create_offline_sync_test_client().await;
4601        client
4602            .offline_sync_metrics
4603            .active
4604            .store(true, Ordering::Release);
4605
4606        let node = NodeBuilder::new("ib")
4607            .children([NodeBuilder::new("dirty")
4608                .attr("type", "groups")
4609                .attr("timestamp", "1234")
4610                .build()])
4611            .build();
4612
4613        client.process_node(Arc::new(node)).await;
4614        assert!(
4615            client.offline_sync_metrics.active.load(Ordering::Acquire),
4616            "<ib><dirty> should NOT end offline sync"
4617        );
4618    }
4619
4620    #[tokio::test]
4621    async fn test_ib_offline_child_ends_sync() {
4622        let client = create_offline_sync_test_client().await;
4623        client
4624            .offline_sync_metrics
4625            .active
4626            .store(true, Ordering::Release);
4627        client
4628            .offline_sync_metrics
4629            .total_messages
4630            .store(301, Ordering::Release);
4631
4632        let node = NodeBuilder::new("ib")
4633            .children([NodeBuilder::new("offline").attr("count", "301").build()])
4634            .build();
4635
4636        client.process_node(Arc::new(node)).await;
4637        assert!(
4638            !client.offline_sync_metrics.active.load(Ordering::Acquire),
4639            "<ib><offline count='301'/> should end offline sync"
4640        );
4641    }
4642
4643    #[tokio::test]
4644    async fn test_ib_offline_preview_starts_sync() {
4645        let client = create_offline_sync_test_client().await;
4646
4647        let node = NodeBuilder::new("ib")
4648            .children([NodeBuilder::new("offline_preview")
4649                .attr("count", "301")
4650                .attr("message", "168")
4651                .attr("notification", "62")
4652                .attr("receipt", "68")
4653                .attr("appdata", "0")
4654                .build()])
4655            .build();
4656
4657        client.process_node(Arc::new(node)).await;
4658        assert!(
4659            client.offline_sync_metrics.active.load(Ordering::Acquire),
4660            "offline_preview with count>0 should activate sync"
4661        );
4662        assert_eq!(
4663            client
4664                .offline_sync_metrics
4665                .total_messages
4666                .load(Ordering::Acquire),
4667            301
4668        );
4669    }
4670
4671    #[tokio::test]
4672    async fn test_offline_message_increments_processed() {
4673        let client = create_offline_sync_test_client().await;
4674        client
4675            .offline_sync_metrics
4676            .active
4677            .store(true, Ordering::Release);
4678        client
4679            .offline_sync_metrics
4680            .total_messages
4681            .store(100, Ordering::Release);
4682
4683        let node = NodeBuilder::new("message")
4684            .attr("offline", "1")
4685            .attr("from", "5551234567@s.whatsapp.net")
4686            .attr("id", "TEST123")
4687            .attr("t", "1772884671")
4688            .attr("type", "text")
4689            .build();
4690
4691        client.process_node(Arc::new(node)).await;
4692        assert_eq!(
4693            client
4694                .offline_sync_metrics
4695                .processed_messages
4696                .load(Ordering::Acquire),
4697            1,
4698            "offline message should increment processed count"
4699        );
4700    }
4701
4702    // ---------------------------------------------------------------
4703    // Server-initiated ping detection tests
4704    //
4705    // The WhatsApp server can send pings in two formats:
4706    //
4707    // 1. Child-element format (legacy/whatsmeow style):
4708    //    <iq type="get" from="s.whatsapp.net" id="...">
4709    //      <ping/>
4710    //    </iq>
4711    //
4712    // 2. xmlns-attribute format (real WhatsApp Web format):
4713    //    <iq from="s.whatsapp.net" t="..." type="get" xmlns="urn:xmpp:ping"/>
4714    //    This is a self-closing tag with NO child elements.
4715    //    Verified against captured WhatsApp Web JS (WAWebCommsHandleStanza):
4716    //      if (t.xmlns === "urn:xmpp:ping") return wap("iq", { type: "result", to: t.from });
4717    //
4718    // Both must be recognized and answered with a pong, otherwise the
4719    // server considers the client dead and stops responding to keepalive
4720    // pings — causing a timeout cascade and forced reconnect.
4721    // ---------------------------------------------------------------
4722
4723    #[tokio::test]
4724    async fn test_handle_iq_ping_with_child_element() {
4725        // Format 1: <iq type="get"><ping/></iq> — the legacy format with a <ping> child node.
4726        let backend = crate::test_utils::create_test_backend().await;
4727        let pm = Arc::new(
4728            PersistenceManager::new(backend)
4729                .await
4730                .expect("persistence manager should initialize"),
4731        );
4732        let (client, _rx) = Client::new(
4733            Arc::new(crate::runtime_impl::TokioRuntime),
4734            pm,
4735            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4736            Arc::new(MockHttpClient),
4737            None,
4738        )
4739        .await;
4740
4741        let ping_node = NodeBuilder::new("iq")
4742            .attr("type", "get")
4743            .attr("from", SERVER_JID)
4744            .attr("id", "ping-child-1")
4745            .children([NodeBuilder::new("ping").build()])
4746            .build();
4747
4748        let handled = client.handle_iq(&ping_node).await;
4749        assert!(
4750            handled,
4751            "handle_iq must recognize ping with <ping> child element"
4752        );
4753    }
4754
4755    #[tokio::test]
4756    async fn test_handle_iq_ping_with_xmlns_attribute() {
4757        // Format 2: <iq type="get" xmlns="urn:xmpp:ping"/> — the real WhatsApp Web format.
4758        // This is a self-closing IQ with NO children, only an xmlns attribute.
4759        // The server sends this format; failing to respond causes keepalive timeout cascade.
4760        let backend = crate::test_utils::create_test_backend().await;
4761        let pm = Arc::new(
4762            PersistenceManager::new(backend)
4763                .await
4764                .expect("persistence manager should initialize"),
4765        );
4766        let (client, _rx) = Client::new(
4767            Arc::new(crate::runtime_impl::TokioRuntime),
4768            pm,
4769            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4770            Arc::new(MockHttpClient),
4771            None,
4772        )
4773        .await;
4774
4775        let ping_node = NodeBuilder::new("iq")
4776            .attr("type", "get")
4777            .attr("from", SERVER_JID)
4778            .attr("id", "ping-xmlns-1")
4779            .attr("xmlns", "urn:xmpp:ping")
4780            .build();
4781
4782        let handled = client.handle_iq(&ping_node).await;
4783        assert!(
4784            handled,
4785            "handle_iq must recognize ping with xmlns=\"urn:xmpp:ping\" attribute (no children)"
4786        );
4787    }
4788
4789    #[tokio::test]
4790    async fn test_handle_iq_ping_with_both_child_and_xmlns() {
4791        // Edge case: node has BOTH a <ping> child AND xmlns="urn:xmpp:ping".
4792        // Should still be handled (OR condition).
4793        let backend = crate::test_utils::create_test_backend().await;
4794        let pm = Arc::new(
4795            PersistenceManager::new(backend)
4796                .await
4797                .expect("persistence manager should initialize"),
4798        );
4799        let (client, _rx) = Client::new(
4800            Arc::new(crate::runtime_impl::TokioRuntime),
4801            pm,
4802            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4803            Arc::new(MockHttpClient),
4804            None,
4805        )
4806        .await;
4807
4808        let ping_node = NodeBuilder::new("iq")
4809            .attr("type", "get")
4810            .attr("from", SERVER_JID)
4811            .attr("id", "ping-both-1")
4812            .attr("xmlns", "urn:xmpp:ping")
4813            .children([NodeBuilder::new("ping").build()])
4814            .build();
4815
4816        let handled = client.handle_iq(&ping_node).await;
4817        assert!(
4818            handled,
4819            "handle_iq must handle ping with both child and xmlns"
4820        );
4821    }
4822
4823    #[tokio::test]
4824    async fn test_handle_iq_non_ping_returns_false() {
4825        // A type="get" IQ without ping child or xmlns should NOT be handled as ping.
4826        let backend = crate::test_utils::create_test_backend().await;
4827        let pm = Arc::new(
4828            PersistenceManager::new(backend)
4829                .await
4830                .expect("persistence manager should initialize"),
4831        );
4832        let (client, _rx) = Client::new(
4833            Arc::new(crate::runtime_impl::TokioRuntime),
4834            pm,
4835            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4836            Arc::new(MockHttpClient),
4837            None,
4838        )
4839        .await;
4840
4841        let non_ping_node = NodeBuilder::new("iq")
4842            .attr("type", "get")
4843            .attr("from", SERVER_JID)
4844            .attr("id", "not-a-ping")
4845            .attr("xmlns", "some:other:namespace")
4846            .build();
4847
4848        let handled = client.handle_iq(&non_ping_node).await;
4849        assert!(
4850            !handled,
4851            "handle_iq must NOT treat non-ping xmlns as a ping"
4852        );
4853    }
4854
4855    #[tokio::test]
4856    async fn test_handle_iq_ping_wrong_type_returns_false() {
4857        // xmlns="urn:xmpp:ping" but type="result" (not "get") — should NOT be handled as ping.
4858        let backend = crate::test_utils::create_test_backend().await;
4859        let pm = Arc::new(
4860            PersistenceManager::new(backend)
4861                .await
4862                .expect("persistence manager should initialize"),
4863        );
4864        let (client, _rx) = Client::new(
4865            Arc::new(crate::runtime_impl::TokioRuntime),
4866            pm,
4867            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4868            Arc::new(MockHttpClient),
4869            None,
4870        )
4871        .await;
4872
4873        let result_node = NodeBuilder::new("iq")
4874            .attr("type", "result")
4875            .attr("from", SERVER_JID)
4876            .attr("id", "ping-result-1")
4877            .attr("xmlns", "urn:xmpp:ping")
4878            .build();
4879
4880        let handled = client.handle_iq(&result_node).await;
4881        assert!(
4882            !handled,
4883            "handle_iq must NOT respond to type=\"result\" even with ping xmlns"
4884        );
4885    }
4886
4887    // ── build_pong tests ──────────────────────────────────────────────
4888
4889    #[test]
4890    fn test_build_pong_with_id() {
4891        let pong = build_pong("s.whatsapp.net".to_string(), Some("ping-123"));
4892        assert!(
4893            pong.attrs.get("id").is_some_and(|v| v == "ping-123"),
4894            "pong should include id when server ping has one"
4895        );
4896        assert!(pong.attrs.get("type").is_some_and(|v| v == "result"));
4897        assert!(pong.attrs.get("to").is_some_and(|v| v == "s.whatsapp.net"));
4898    }
4899
4900    #[test]
4901    fn test_build_pong_without_id() {
4902        let pong = build_pong("s.whatsapp.net".to_string(), None);
4903        assert!(
4904            !pong.attrs.contains_key("id"),
4905            "pong should NOT include id when server ping has none"
4906        );
4907        assert!(pong.attrs.get("type").is_some_and(|v| v == "result"));
4908    }
4909
4910    #[test]
4911    fn test_encrypt_identity_notification_omits_type() {
4912        let node = NodeBuilder::new("notification")
4913            .attr("from", "186303081611421@lid")
4914            .attr("id", "4128735301")
4915            .attr("type", "encrypt")
4916            .children([NodeBuilder::new("identity").build()])
4917            .build();
4918
4919        assert!(
4920            is_encrypt_identity_notification(&node),
4921            "identity-change notification ACK must omit type to match WA Web"
4922        );
4923    }
4924
4925    #[test]
4926    fn test_device_notification_is_not_encrypt_identity() {
4927        let node = NodeBuilder::new("notification")
4928            .attr("from", "186303081611421@lid")
4929            .attr("id", "269488578")
4930            .attr("type", "devices")
4931            .children([NodeBuilder::new("remove").build()])
4932            .build();
4933
4934        assert!(
4935            !is_encrypt_identity_notification(&node),
4936            "device notification is not an encrypt+identity notification"
4937        );
4938    }
4939
4940    #[test]
4941    fn test_build_ack_node_for_message_omits_type_includes_from() {
4942        // Whatsmeow: message acks do NOT echo type (node.Tag != "message" guard).
4943        // They DO include `from` with own device PN.
4944        let incoming = NodeBuilder::new("message")
4945            .attr("from", "120363161500776365@g.us")
4946            .attr("id", "A5791A5392EF60E3FB0670098DE010D4")
4947            .attr("type", "text")
4948            .attr("participant", "181531758878822@lid")
4949            .build();
4950        let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4951            .parse()
4952            .expect("own device PN JID should parse");
4953
4954        let ack = build_ack_node(&incoming, Some(&own_device_pn))
4955            .expect("message ack should be buildable");
4956
4957        assert_eq!(ack.tag, "ack");
4958        // Use PartialEq<str> on NodeValue — works for both String and Jid variants
4959        // without allocation, so tests don't depend on internal representation.
4960        assert!(ack.attrs.get("class").is_some_and(|v| v == "message"));
4961        assert!(
4962            ack.attrs
4963                .get("to")
4964                .is_some_and(|v| v == "120363161500776365@g.us")
4965        );
4966        assert!(
4967            ack.attrs
4968                .get("from")
4969                .is_some_and(|v| v == "155500012345:48@s.whatsapp.net")
4970        );
4971        assert!(
4972            ack.attrs
4973                .get("participant")
4974                .is_some_and(|v| v == "181531758878822@lid")
4975        );
4976        assert!(
4977            !ack.attrs.contains_key("type"),
4978            "message ACK must NOT echo type (matches whatsmeow behavior)"
4979        );
4980    }
4981
4982    #[test]
4983    fn test_build_ack_node_for_identity_change_omits_type_and_from() {
4984        let incoming = NodeBuilder::new("notification")
4985            .attr("from", "186303081611421@lid")
4986            .attr("id", "4128735301")
4987            .attr("type", "encrypt")
4988            .children([NodeBuilder::new("identity").build()])
4989            .build();
4990        let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4991            .parse()
4992            .expect("own device PN JID should parse");
4993
4994        let ack = build_ack_node(&incoming, Some(&own_device_pn))
4995            .expect("notification ack should be buildable");
4996
4997        assert!(ack.attrs.get("class").is_some_and(|v| v == "notification"));
4998        assert!(
4999            !ack.attrs.contains_key("type"),
5000            "identity-change notification ACK must omit type"
5001        );
5002        assert!(
5003            !ack.attrs.contains_key("from"),
5004            "notification ACKs should not include our device PN"
5005        );
5006    }
5007
5008    #[test]
5009    fn test_build_ack_node_for_receipt_with_type_echoes_type() {
5010        // Receipt acks should echo the type attribute when present (e.g. "read", "played").
5011        let incoming = NodeBuilder::new("receipt")
5012            .attr("from", "156535032389744@lid")
5013            .attr("id", "RCPT-WITH-TYPE")
5014            .attr("type", "read")
5015            .build();
5016        let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
5017            .parse()
5018            .expect("own device PN JID should parse");
5019
5020        let ack = build_ack_node(&incoming, Some(&own_device_pn))
5021            .expect("receipt ack should be buildable");
5022
5023        assert!(ack.attrs.get("class").is_some_and(|v| v == "receipt"));
5024        assert!(
5025            ack.attrs.get("type").is_some_and(|v| v == "read"),
5026            "receipt ACK must echo the type attribute when present"
5027        );
5028        assert!(
5029            !ack.attrs.contains_key("from"),
5030            "receipt ACKs should not include our device PN"
5031        );
5032    }
5033
5034    #[test]
5035    fn test_build_ack_node_for_receipt_without_type_omits_type() {
5036        // Delivery receipts have no type attribute — the ack must also omit it.
5037        // Sending type="delivery" in the ack causes stream:error disconnections.
5038        let incoming = NodeBuilder::new("receipt")
5039            .attr("from", "156535032389744@lid")
5040            .attr("id", "RCPT-NO-TYPE")
5041            .build();
5042        let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
5043            .parse()
5044            .expect("own device PN JID should parse");
5045
5046        let ack = build_ack_node(&incoming, Some(&own_device_pn))
5047            .expect("receipt ack should be buildable");
5048
5049        assert!(ack.attrs.get("class").is_some_and(|v| v == "receipt"));
5050        assert!(
5051            !ack.attrs.contains_key("type"),
5052            "receipt ACK must NOT contain type when the incoming receipt has no type attribute"
5053        );
5054        assert!(
5055            !ack.attrs.contains_key("from"),
5056            "receipt ACKs should not include our device PN"
5057        );
5058    }
5059
5060    /// Smoke test: server ping with xmlns but no id attribute is handled.
5061    #[tokio::test]
5062    async fn test_handle_iq_ping_without_id() {
5063        let backend = crate::test_utils::create_test_backend().await;
5064        let pm = Arc::new(
5065            PersistenceManager::new(backend)
5066                .await
5067                .expect("persistence manager should initialize"),
5068        );
5069        let (client, _rx) = Client::new(
5070            Arc::new(crate::runtime_impl::TokioRuntime),
5071            pm,
5072            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5073            Arc::new(MockHttpClient),
5074            None,
5075        )
5076        .await;
5077
5078        // Server ping without id — real format observed in production logs
5079        let ping_node = NodeBuilder::new("iq")
5080            .attr("type", "get")
5081            .attr("from", SERVER_JID)
5082            .attr("xmlns", "urn:xmpp:ping")
5083            .build();
5084
5085        let handled = client.handle_iq(&ping_node).await;
5086        assert!(
5087            handled,
5088            "handle_iq must recognize ping without id attribute"
5089        );
5090    }
5091
5092    // ── fibonacci_backoff tests ────────────────────────────────────────
5093
5094    #[test]
5095    fn test_fibonacci_backoff_sequence() {
5096        // WA Web: first=1000, second=1000 → 1,1,2,3,5,8,13,21,34,55,89,144...s
5097        // We test base values without jitter by checking the range (±10%).
5098        let expected_base_ms = [1000, 1000, 2000, 3000, 5000, 8000, 13000, 21000];
5099        for (attempt, &base) in expected_base_ms.iter().enumerate() {
5100            let delay = fibonacci_backoff(attempt as u32);
5101            let ms = delay.as_millis() as u64;
5102            let low = base - base / 10;
5103            let high = base + base / 10;
5104            assert!(
5105                ms >= low && ms <= high,
5106                "attempt {attempt}: expected {low}..={high}ms, got {ms}ms"
5107            );
5108        }
5109    }
5110
5111    #[test]
5112    fn test_fibonacci_backoff_max_900s() {
5113        // After many attempts, should cap at 900s (±10%)
5114        let delay = fibonacci_backoff(100);
5115        let ms = delay.as_millis() as u64;
5116        assert!(
5117            ms <= 990_000,
5118            "should never exceed 900s + 10% jitter, got {ms}ms"
5119        );
5120        assert!(
5121            ms >= 810_000,
5122            "should be at least 900s - 10% jitter, got {ms}ms"
5123        );
5124    }
5125
5126    #[test]
5127    fn test_fibonacci_backoff_first_attempt_is_1s() {
5128        let delay = fibonacci_backoff(0);
5129        let ms = delay.as_millis() as u64;
5130        assert!(
5131            (900..=1100).contains(&ms),
5132            "first attempt should be ~1s (±10%), got {ms}ms"
5133        );
5134    }
5135
5136    // ── stream error tests ─────────────────────────────────────────────
5137
5138    #[tokio::test]
5139    async fn test_stream_error_401_disables_reconnect() {
5140        let client = create_offline_sync_test_client().await;
5141        let node = NodeBuilder::new("stream:error").attr("code", "401").build();
5142        client.handle_stream_error(&node).await;
5143        assert!(
5144            !client.enable_auto_reconnect.load(Ordering::Relaxed),
5145            "401 should disable auto-reconnect"
5146        );
5147    }
5148
5149    #[tokio::test]
5150    async fn test_stream_error_409_disables_reconnect() {
5151        let client = create_offline_sync_test_client().await;
5152        let node = NodeBuilder::new("stream:error").attr("code", "409").build();
5153        client.handle_stream_error(&node).await;
5154        assert!(
5155            !client.enable_auto_reconnect.load(Ordering::Relaxed),
5156            "409 should disable auto-reconnect"
5157        );
5158    }
5159
5160    #[tokio::test]
5161    async fn test_stream_error_429_keeps_reconnect_with_backoff() {
5162        let client = create_offline_sync_test_client().await;
5163        let before = client.auto_reconnect_errors.load(Ordering::Relaxed);
5164        let node = NodeBuilder::new("stream:error").attr("code", "429").build();
5165        client.handle_stream_error(&node).await;
5166        assert!(
5167            client.enable_auto_reconnect.load(Ordering::Relaxed),
5168            "429 should keep auto-reconnect enabled"
5169        );
5170        let after = client.auto_reconnect_errors.load(Ordering::Relaxed);
5171        assert_eq!(
5172            after,
5173            before + 5,
5174            "429 should increase backoff by exactly 5: before={before}, after={after}"
5175        );
5176    }
5177
5178    #[tokio::test]
5179    async fn test_stream_error_503_keeps_reconnect() {
5180        let client = create_offline_sync_test_client().await;
5181        let node = NodeBuilder::new("stream:error").attr("code", "503").build();
5182        client.handle_stream_error(&node).await;
5183        assert!(
5184            client.enable_auto_reconnect.load(Ordering::Relaxed),
5185            "503 should keep auto-reconnect enabled"
5186        );
5187    }
5188
5189    #[tokio::test]
5190    async fn test_custom_cache_config_is_respected() {
5191        use crate::cache_config::{CacheConfig, CacheEntryConfig};
5192        use std::time::Duration;
5193
5194        let backend = crate::test_utils::create_test_backend().await;
5195        let pm = Arc::new(
5196            PersistenceManager::new(backend)
5197                .await
5198                .expect("persistence manager should initialize"),
5199        );
5200
5201        let custom_config = CacheConfig {
5202            group_cache: CacheEntryConfig::new(Some(Duration::from_secs(60)), 10),
5203            device_cache: CacheEntryConfig::new(Some(Duration::from_secs(60)), 10),
5204            ..CacheConfig::default()
5205        };
5206
5207        // Verify that constructing a client with a custom config does not panic
5208        // and the client is usable.
5209        let (client, _rx) = Client::new_with_cache_config(
5210            Arc::new(crate::runtime_impl::TokioRuntime),
5211            pm,
5212            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5213            Arc::new(MockHttpClient),
5214            None,
5215            custom_config,
5216        )
5217        .await;
5218
5219        assert!(!client.is_logged_in());
5220    }
5221
5222    /// Proves that `is_connected()` no longer gives false negatives under mutex
5223    /// contention. Before the fix, `try_lock()` would fail when another task held
5224    /// the noise_socket mutex, causing `is_connected()` to return `false` even
5225    /// though the connection was alive — silently dropping receipt acks.
5226    ///
5227    /// This test sets up a real NoiseSocket (same as socket unit tests) so it
5228    /// accurately models the pre-fix scenario: socket is Some + mutex is held
5229    /// by another task = old is_connected() returned false.
5230    #[tokio::test]
5231    async fn test_is_connected_not_affected_by_mutex_contention() {
5232        use crate::socket::NoiseSocket;
5233        use wacore::handshake::NoiseCipher;
5234
5235        let backend = crate::test_utils::create_test_backend().await;
5236        let pm = Arc::new(
5237            PersistenceManager::new(backend)
5238                .await
5239                .expect("persistence manager should initialize"),
5240        );
5241        let (client, _rx) = Client::new(
5242            Arc::new(crate::runtime_impl::TokioRuntime),
5243            pm,
5244            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5245            Arc::new(MockHttpClient),
5246            None,
5247        )
5248        .await;
5249
5250        // Initially not connected
5251        assert!(!client.is_connected(), "should start disconnected");
5252
5253        // Simulate a real connection: create a NoiseSocket and store it
5254        let transport: Arc<dyn crate::transport::Transport> =
5255            Arc::new(crate::transport::mock::MockTransport);
5256        let key = [0u8; 32];
5257        let write_key = NoiseCipher::new(&key).expect("valid key");
5258        let read_key = NoiseCipher::new(&key).expect("valid key");
5259        let noise_socket = NoiseSocket::new(
5260            Arc::new(crate::runtime_impl::TokioRuntime),
5261            transport,
5262            write_key,
5263            read_key,
5264        );
5265        *client.noise_socket.lock().await = Some(Arc::new(noise_socket));
5266        client.is_connected.store(true, Ordering::Release);
5267
5268        assert!(client.is_connected(), "should report connected");
5269
5270        // Hold the noise_socket mutex — this used to make is_connected() return
5271        // false via try_lock() even though the socket was Some(...)
5272        let _guard = client.noise_socket.lock().await;
5273        assert!(
5274            client.is_connected(),
5275            "is_connected() must return true even while noise_socket mutex is held"
5276        );
5277    }
5278
5279    /// Verifies that `send_ack_for` returns an error (not silent Ok) when
5280    /// disconnected. This ensures the caller's `warn!` fires so dropped acks
5281    /// are visible in logs.
5282    #[tokio::test]
5283    async fn test_send_ack_for_returns_error_when_disconnected() {
5284        let backend = crate::test_utils::create_test_backend().await;
5285        let pm = Arc::new(
5286            PersistenceManager::new(backend)
5287                .await
5288                .expect("persistence manager should initialize"),
5289        );
5290        let (client, _rx) = Client::new(
5291            Arc::new(crate::runtime_impl::TokioRuntime),
5292            pm,
5293            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5294            Arc::new(MockHttpClient),
5295            None,
5296        )
5297        .await;
5298
5299        // Not connected — send_ack_for should return Err, not Ok
5300        let receipt = NodeBuilder::new("receipt")
5301            .attr("from", "120363040237990503@g.us")
5302            .attr("id", "TEST-RECEIPT-ID")
5303            .attr("participant", "236395184570386@lid")
5304            .build();
5305
5306        let result = client.send_ack_for(&receipt).await;
5307        assert!(
5308            matches!(result, Err(ClientError::NotConnected)),
5309            "send_ack_for must return Err(NotConnected) when disconnected, got: {result:?}"
5310        );
5311    }
5312
5313    /// Verifies that `send_ack_for` returns Ok when expected_disconnect is set,
5314    /// since this is an intentional shutdown path.
5315    #[tokio::test]
5316    async fn test_send_ack_for_returns_ok_on_expected_disconnect() {
5317        let backend = crate::test_utils::create_test_backend().await;
5318        let pm = Arc::new(
5319            PersistenceManager::new(backend)
5320                .await
5321                .expect("persistence manager should initialize"),
5322        );
5323        let (client, _rx) = Client::new(
5324            Arc::new(crate::runtime_impl::TokioRuntime),
5325            pm,
5326            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5327            Arc::new(MockHttpClient),
5328            None,
5329        )
5330        .await;
5331
5332        // Set expected disconnect — send_ack_for should gracefully return Ok
5333        client.expected_disconnect.store(true, Ordering::Relaxed);
5334
5335        let receipt = NodeBuilder::new("receipt")
5336            .attr("from", "120363040237990503@g.us")
5337            .attr("id", "TEST-RECEIPT-ID")
5338            .build();
5339
5340        let result = client.send_ack_for(&receipt).await;
5341        assert!(
5342            result.is_ok(),
5343            "send_ack_for should return Ok during expected disconnect"
5344        );
5345    }
5346}