Skip to main content

whatsapp_rust/
client.rs

1mod context_impl;
2mod device_registry;
3mod lid_pn;
4mod sender_keys;
5mod sessions;
6
7use crate::cache::Cache;
8use crate::cache_store::TypedCache;
9use crate::handshake;
10use crate::lid_pn_cache::LidPnCache;
11use crate::pair;
12use anyhow::{Result, anyhow};
13use futures::FutureExt;
14use std::borrow::Cow;
15use std::collections::{HashMap, HashSet};
16
17use wacore::xml::DisplayableNode;
18use wacore_binary::builder::NodeBuilder;
19use wacore_binary::jid::JidExt;
20use wacore_binary::node::{Attrs, Node, NodeValue};
21
22use crate::appstate_sync::AppStateProcessor;
23use crate::handlers::chatstate::ChatStateEvent;
24use crate::jid_utils::server_jid;
25use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
26use crate::types::enc_handler::EncHandler;
27use crate::types::events::{ConnectFailureReason, Event};
28
29use log::{debug, error, info, trace, warn};
30
31use rand::{Rng, RngExt};
32use scopeguard;
33use wacore_binary::jid::Jid;
34
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
37
38/// Filter for matching incoming stanzas (nodes) by tag and attributes.
39///
40/// Used with [`Client::wait_for_node`] to wait for specific stanzas.
41/// Zero-cost when no waiters are active (single atomic load per node).
42///
43/// # Example
44/// ```ignore
45/// // Wait for a w:gp2 notification from a specific group
46/// let waiter = client.wait_for_node(
47///     NodeFilter::tag("notification")
48///         .attr("type", "w:gp2")
49///         .attr("from", "group@g.us"),
50/// );
51/// // ... trigger the action ...
52/// let node = waiter.await?;
53/// ```
54#[derive(Debug, Clone)]
55pub struct NodeFilter {
56    tag: String,
57    attrs: Vec<(String, String)>,
58}
59
60impl NodeFilter {
61    /// Create a filter matching nodes with the given tag.
62    pub fn tag(tag: impl Into<String>) -> Self {
63        Self {
64            tag: tag.into(),
65            attrs: Vec::new(),
66        }
67    }
68
69    /// Add an attribute constraint. All attributes must match.
70    pub fn attr(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
71        self.attrs.push((key.into(), value.into()));
72        self
73    }
74
75    /// Shorthand for `.attr("from", jid.to_string())`.
76    pub fn from_jid(self, jid: &Jid) -> Self {
77        self.attr("from", jid.to_string())
78    }
79
80    fn matches(&self, node: &Node) -> bool {
81        node.tag == self.tag
82            && self
83                .attrs
84                .iter()
85                .all(|(k, v)| node.attrs.get(k.as_str()).is_some_and(|attr| *attr == *v))
86    }
87}
88
89struct NodeWaiter {
90    filter: NodeFilter,
91    tx: futures::channel::oneshot::Sender<Arc<Node>>,
92}
93
94use async_lock::Mutex;
95use async_lock::RwLock;
96use std::time::Duration;
97use thiserror::Error;
98
99use wacore::appstate::patch_decode::WAPatchName;
100use wacore::client::context::GroupInfo;
101use wacore::runtime::timeout as rt_timeout;
102use waproto::whatsapp as wa;
103
104use crate::cache_config::CacheConfig;
105use crate::socket::{NoiseSocket, SocketError, error::EncryptSendError};
106use crate::sync_task::MajorSyncTask;
107use wacore::runtime::Runtime;
108
109/// Type alias for chatstate event handler functions.
110type ChatStateHandler = Arc<dyn Fn(ChatStateEvent) + Send + Sync>;
111
112const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;
113
114/// WA Web: MQTT `MqttProtocolClient.connect()` uses `CONNECT_TIMEOUT = 20s`,
115/// DGW `connectTimeoutMs` defaults to `20000ms`.
116const TRANSPORT_CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
117
118/// Snapshot of internal collection sizes for memory leak detection.
119///
120/// All counts are approximate (moka caches may have pending evictions).
121/// Call [`Client::memory_diagnostics`] to obtain a snapshot.
122///
123/// Requires the `debug-diagnostics` feature.
124#[cfg(feature = "debug-diagnostics")]
125#[derive(Debug, Clone)]
126pub struct MemoryDiagnostics {
127    // -- Moka caches (TTL/capacity-bounded) --
128    pub group_cache: u64,
129    pub device_cache: u64,
130    pub device_registry_cache: u64,
131    pub lid_pn_lid_entries: u64,
132    pub lid_pn_pn_entries: u64,
133    pub retried_group_messages: u64,
134    pub recent_messages: u64,
135    pub message_retry_counts: u64,
136    pub pdo_pending_requests: u64,
137    // -- Moka caches (capacity-only, no TTL) --
138    pub session_locks: u64,
139    pub message_queues: u64,
140    pub message_enqueue_locks: u64,
141    // -- Unbounded collections --
142    pub response_waiters: usize,
143    pub node_waiters: usize,
144    pub pending_retries: usize,
145    pub presence_subscriptions: usize,
146    pub app_state_key_requests: usize,
147    pub app_state_syncing: usize,
148    pub signal_cache_sessions: usize,
149    pub signal_cache_identities: usize,
150    pub signal_cache_sender_keys: usize,
151    // -- Misc --
152    pub chatstate_handlers: usize,
153    pub custom_enc_handlers: usize,
154}
155
156#[cfg(feature = "debug-diagnostics")]
157impl std::fmt::Display for MemoryDiagnostics {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        writeln!(f, "=== Memory Diagnostics ===")?;
160        writeln!(f, "--- Moka caches (TTL-bounded) ---")?;
161        writeln!(f, "  group_cache:            {}", self.group_cache)?;
162        writeln!(f, "  device_cache:           {}", self.device_cache)?;
163        writeln!(
164            f,
165            "  device_registry_cache:  {}",
166            self.device_registry_cache
167        )?;
168        writeln!(f, "  lid_pn (lid):           {}", self.lid_pn_lid_entries)?;
169        writeln!(f, "  lid_pn (pn):            {}", self.lid_pn_pn_entries)?;
170        writeln!(
171            f,
172            "  retried_group_messages: {}",
173            self.retried_group_messages
174        )?;
175        writeln!(f, "  recent_messages:        {}", self.recent_messages)?;
176        writeln!(f, "  message_retry_counts:   {}", self.message_retry_counts)?;
177        writeln!(f, "  pdo_pending_requests:   {}", self.pdo_pending_requests)?;
178        writeln!(f, "--- Moka caches (capacity-only) ---")?;
179        writeln!(f, "  session_locks:          {}", self.session_locks)?;
180        writeln!(f, "  message_queues:         {}", self.message_queues)?;
181        writeln!(
182            f,
183            "  message_enqueue_locks:  {}",
184            self.message_enqueue_locks
185        )?;
186        writeln!(f, "--- Unbounded collections ---")?;
187        writeln!(f, "  response_waiters:       {}", self.response_waiters)?;
188        writeln!(f, "  node_waiters:           {}", self.node_waiters)?;
189        writeln!(f, "  pending_retries:        {}", self.pending_retries)?;
190        writeln!(
191            f,
192            "  presence_subscriptions: {}",
193            self.presence_subscriptions
194        )?;
195        writeln!(
196            f,
197            "  app_state_key_requests: {}",
198            self.app_state_key_requests
199        )?;
200        writeln!(f, "  app_state_syncing:      {}", self.app_state_syncing)?;
201        writeln!(
202            f,
203            "  signal_sessions:        {}",
204            self.signal_cache_sessions
205        )?;
206        writeln!(
207            f,
208            "  signal_identities:      {}",
209            self.signal_cache_identities
210        )?;
211        writeln!(
212            f,
213            "  signal_sender_keys:     {}",
214            self.signal_cache_sender_keys
215        )?;
216        writeln!(f, "--- Misc ---")?;
217        writeln!(f, "  chatstate_handlers:     {}", self.chatstate_handlers)?;
218        writeln!(f, "  custom_enc_handlers:    {}", self.custom_enc_handlers)?;
219        Ok(())
220    }
221}
222
223#[derive(Debug, Error)]
224pub enum ClientError {
225    #[error("client is not connected")]
226    NotConnected,
227    #[error("socket error: {0}")]
228    Socket(#[from] SocketError),
229    #[error("encrypt/send error: {0}")]
230    EncryptSend(#[from] EncryptSendError),
231    #[error("client is already connected")]
232    AlreadyConnected,
233    #[error("client is not logged in")]
234    NotLoggedIn,
235}
236
237use wacore::types::message::StanzaKey;
238
239/// Metrics for tracking offline sync progress
240#[derive(Debug)]
241pub(crate) struct OfflineSyncMetrics {
242    pub active: AtomicBool,
243    pub total_messages: AtomicUsize,
244    pub processed_messages: AtomicUsize,
245    // Using simple std Mutex for timestamp as it's rarely contended and non-async
246    pub start_time: std::sync::Mutex<Option<wacore::time::Instant>>,
247}
248
249pub struct Client {
250    pub(crate) runtime: Arc<dyn Runtime>,
251    pub(crate) core: wacore::client::CoreClient,
252
253    pub(crate) persistence_manager: Arc<PersistenceManager>,
254    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,
255
256    pub(crate) is_logged_in: Arc<AtomicBool>,
257    pub(crate) is_connecting: Arc<AtomicBool>,
258    pub(crate) is_running: Arc<AtomicBool>,
259    /// Whether the noise socket is established (connected to WhatsApp servers).
260    /// Uses an AtomicBool instead of probing the noise_socket mutex to avoid
261    /// TOCTOU races where `try_lock()` fails due to contention, not disconnection.
262    is_connected: Arc<AtomicBool>,
263    pub(crate) shutdown_notifier: Arc<event_listener::Event>,
264    /// Timestamp (ms since UNIX epoch) of the last received WebSocket data.
265    /// Updated on every `DataReceived` transport event.
266    /// WA Web: `parseAndHandleStanza` → `deadSocketTimer.cancel()`.
267    pub(crate) last_data_received_ms: Arc<AtomicU64>,
268    /// Timestamp (ms since UNIX epoch) of the last sent WebSocket data.
269    /// Updated on every `send_node` call.
270    /// WA Web: `callStanza` → `deadSocketTimer.onOrBefore(deadSocketTime)`.
271    pub(crate) last_data_sent_ms: Arc<AtomicU64>,
272
273    pub(crate) transport: Arc<Mutex<Option<Arc<dyn crate::transport::Transport>>>>,
274    pub(crate) transport_events:
275        Arc<Mutex<Option<async_channel::Receiver<crate::transport::TransportEvent>>>>,
276    pub(crate) transport_factory: Arc<dyn crate::transport::TransportFactory>,
277    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,
278
279    pub(crate) response_waiters:
280        Arc<Mutex<HashMap<String, futures::channel::oneshot::Sender<wacore_binary::Node>>>>,
281
282    /// Generic node waiters for waiting on specific stanzas by tag/attributes.
283    /// Uses std::sync::Mutex (not tokio) since the critical section is trivial.
284    /// Guarded by `node_waiter_count` for zero-cost when no waiters are active.
285    node_waiters: std::sync::Mutex<Vec<NodeWaiter>>,
286    node_waiter_count: AtomicUsize,
287
288    pub(crate) unique_id: String,
289    pub(crate) id_counter: Arc<AtomicU64>,
290
291    pub(crate) unified_session: crate::unified_session::UnifiedSessionManager,
292
293    /// In-memory cache for Signal protocol state (sessions, identities, sender keys).
294    /// Matches WhatsApp Web's SignalStoreCache pattern: crypto ops read/write this cache,
295    /// and DB writes are deferred to flush() after each message is processed.
296    pub(crate) signal_cache: Arc<crate::store::signal_cache::SignalStoreCache>,
297
298    /// Global semaphore that limits message processing concurrency.
299    /// During offline sync: permits=1 (sequential, like WA Web's allChatQueue)
300    /// After offline sync: permits=N (parallel per-chat processing)
301    /// Wrapped in std::sync::Mutex to allow replacing on reconnect.
302    pub(crate) message_processing_semaphore: std::sync::Mutex<Arc<async_lock::Semaphore>>,
303
304    /// Per-device session locks for Signal protocol operations.
305    /// Prevents race conditions when multiple messages from the same sender
306    /// are processed concurrently across different chats.
307    /// Keys are Signal protocol address strings (e.g., "user@s.whatsapp.net:0")
308    /// to match the SignalProtocolStoreAdapter's internal locking.
309    pub(crate) session_locks: Cache<String, Arc<async_lock::Mutex<()>>>,
310
311    /// Per-chat message queues for sequential message processing.
312    /// Prevents race conditions where a later message is processed before
313    /// the PreKey message that establishes the Signal session.
314    pub(crate) message_queues: Cache<String, async_channel::Sender<Arc<Node>>>,
315
316    /// Cache for LID to Phone Number mappings (bidirectional).
317    /// When we receive a message with sender_lid/sender_pn attributes, we store the mapping here.
318    /// This allows us to reuse existing LID-based sessions when sending replies.
319    /// The cache is backed by persistent storage and warmed up on client initialization.
320    pub(crate) lid_pn_cache: Arc<LidPnCache>,
321
322    /// Per-chat mutex for serializing message enqueue operations.
323    /// This ensures messages are enqueued in the order they arrive,
324    /// preventing race conditions during queue initialization.
325    pub(crate) message_enqueue_locks: Cache<String, Arc<async_lock::Mutex<()>>>,
326
327    pub group_cache: async_lock::Mutex<Option<Arc<TypedCache<Jid, GroupInfo>>>>,
328    #[allow(clippy::type_complexity)]
329    pub device_cache: async_lock::Mutex<Option<Arc<TypedCache<Jid, Vec<Jid>>>>>,
330
331    pub(crate) retried_group_messages: Cache<String, ()>,
332    pub(crate) expected_disconnect: Arc<AtomicBool>,
333
334    /// Connection generation counter - incremented on each new connection.
335    /// Used to detect stale post-login tasks from previous connections.
336    pub(crate) connection_generation: Arc<AtomicU64>,
337
338    /// Cache for recent messages (serialized bytes) for retry functionality.
339    /// Uses moka cache with TTL and max capacity for automatic eviction.
340    pub(crate) recent_messages: Cache<StanzaKey, Vec<u8>>,
341
342    pub(crate) pending_retries: Arc<async_lock::Mutex<HashSet<String>>>,
343
344    /// Track retry attempts per message to prevent infinite retry loops.
345    /// Key: "{chat}:{msg_id}:{sender}", Value: retry count
346    /// Matches WhatsApp Web's MAX_RETRY = 5 behavior.
347    pub(crate) message_retry_counts: Cache<String, u8>,
348
349    pub enable_auto_reconnect: Arc<AtomicBool>,
350    pub auto_reconnect_errors: Arc<AtomicU32>,
351
352    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,
353
354    pub(crate) app_state_processor: async_lock::Mutex<Option<Arc<AppStateProcessor>>>,
355    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, wacore::time::Instant>>>,
356    /// Tracks collections currently being synced to prevent duplicate sync tasks.
357    /// Matches WA Web's in-flight tracking set in WAWebSyncdCollectionsStateMachine.
358    pub(crate) app_state_syncing: Arc<Mutex<HashSet<WAPatchName>>>,
359    pub(crate) initial_keys_synced_notifier: Arc<event_listener::Event>,
360    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,
361
362    /// Tracks whether the server has our prekeys (matches WA Web's `setServerHasPreKeys`).
363    /// Set to `false` when encrypt/count notification arrives, `true` after successful upload.
364    pub(crate) server_has_prekeys: Arc<AtomicBool>,
365    /// Prevents concurrent prekey upload operations (matches WA Web's dedup set in `handlePreKeyLow`).
366    pub(crate) prekey_upload_lock: Arc<async_lock::Mutex<()>>,
367    /// Notifier for when offline sync (ib offline stanza) is received.
368    /// WhatsApp Web waits for this before sending passive tasks (prekey upload, active IQ, presence).
369    pub(crate) offline_sync_notifier: Arc<event_listener::Event>,
370    /// Flag indicating offline sync has completed (received ib offline stanza).
371    pub(crate) offline_sync_completed: Arc<AtomicBool>,
372    /// Number of history sync tasks currently queued or running.
373    pub(crate) history_sync_tasks_in_flight: Arc<AtomicUsize>,
374    /// Notifier triggered when history sync work becomes idle.
375    pub(crate) history_sync_idle_notifier: Arc<event_listener::Event>,
376    /// Contacts with active presence subscriptions that must be re-subscribed on reconnect.
377    pub(crate) presence_subscriptions: Arc<async_lock::Mutex<HashSet<Jid>>>,
378    /// Metrics for granular offline sync logging
379    pub(crate) offline_sync_metrics: Arc<OfflineSyncMetrics>,
380    /// Notifier for when the noise socket is established (before login).
381    /// Use this to wait for the socket to be ready for sending messages.
382    pub(crate) socket_ready_notifier: Arc<event_listener::Event>,
383    /// Set to `true` only when `dispatch_connected()` fires (after critical sync
384    /// completes). Reset on each new connection attempt. Used by
385    /// `wait_for_connected()` to avoid a false-positive fast path when the
386    /// client is logged in but critical app state hasn't synced yet.
387    pub(crate) is_ready: Arc<AtomicBool>,
388    /// Notifier for when the client is fully connected and logged in.
389    /// Triggered after Event::Connected is dispatched.
390    pub(crate) connected_notifier: Arc<event_listener::Event>,
391    pub(crate) major_sync_task_sender: async_channel::Sender<MajorSyncTask>,
392    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<async_channel::Sender<()>>>>,
393
394    /// State machine for pair code authentication flow.
395    /// Tracks the pending pair code request and ephemeral keys.
396    pub(crate) pair_code_state: Arc<Mutex<wacore::pair_code::PairCodeState>>,
397
398    /// Custom handlers for encrypted message types
399    pub custom_enc_handlers: Arc<async_lock::RwLock<HashMap<String, Arc<dyn EncHandler>>>>,
400
401    /// Chat state (typing indicator) handlers registered by external consumers.
402    /// Each handler receives a `ChatStateEvent` describing the chat, optional participant and state.
403    pub(crate) chatstate_handlers: Arc<RwLock<Vec<ChatStateHandler>>>,
404
405    /// Cache for pending PDO (Peer Data Operation) requests.
406    /// Maps message cache keys (chat:id) to pending request info.
407    pub(crate) pdo_pending_requests: Cache<String, crate::pdo::PendingPdoRequest>,
408
409    /// LRU cache for device registry (matches WhatsApp Web's 5000 entry limit).
410    /// Maps user ID to DeviceListRecord for fast device existence checks.
411    /// Backed by persistent storage.
412    pub(crate) device_registry_cache: TypedCache<String, wacore::store::traits::DeviceListRecord>,
413
414    /// Router for dispatching stanzas to their appropriate handlers
415    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,
416
417    /// Whether to send ACKs synchronously or in a background task
418    pub(crate) synchronous_ack: bool,
419
420    /// HTTP client for making HTTP requests (media upload/download, version fetching)
421    pub http_client: Arc<dyn crate::http::HttpClient>,
422
423    /// Version override for testing or manual specification
424    pub(crate) override_version: Option<(u32, u32, u32)>,
425
426    /// When true, history sync notifications are acknowledged but not downloaded
427    /// or processed. Set via `BotBuilder::skip_history_sync()`.
428    pub(crate) skip_history_sync: AtomicBool,
429
430    /// Cache configuration for TTL and capacity of all caches.
431    /// Stored for use by lazily-initialized caches (group_cache, device_cache).
432    pub(crate) cache_config: CacheConfig,
433}
434
435impl Client {
436    fn should_downgrade_sync_error(&self, err: &anyhow::Error) -> bool {
437        if self.is_shutting_down() {
438            return true;
439        }
440
441        matches!(
442            err.downcast_ref::<crate::request::IqError>(),
443            Some(
444                crate::request::IqError::NotConnected
445                    | crate::request::IqError::InternalChannelClosed
446            )
447        )
448    }
449
450    /// Log a sync error, downgrading to debug level during shutdown/disconnect.
451    fn log_sync_error(&self, context: &str, err: &anyhow::Error) {
452        if self.should_downgrade_sync_error(err) {
453            debug!("Skipping {context} during shutdown: {err}");
454        } else {
455            warn!("Failed {context}: {err}");
456        }
457    }
458
459    /// Returns `true` when the client has completed its full startup:
460    /// transport connected, server authenticated, and critical app state synced.
461    /// This is the condition `wait_for_connected` uses to resolve.
462    fn is_fully_ready(&self) -> bool {
463        self.is_connected() && self.is_logged_in() && self.is_ready.load(Ordering::Relaxed)
464    }
465
466    /// Dispatch the Connected event and notify waiters.
467    fn dispatch_connected(&self) {
468        self.is_ready.store(true, Ordering::Relaxed);
469        self.core
470            .event_bus
471            .dispatch(&Event::Connected(crate::types::events::Connected));
472        self.connected_notifier.notify(usize::MAX);
473    }
474
475    /// Enable or disable skipping of history sync notifications at runtime.
476    ///
477    /// When enabled, the client will acknowledge incoming history sync
478    /// notifications but will not download or process the data.
479    pub fn set_skip_history_sync(&self, enabled: bool) {
480        self.skip_history_sync.store(enabled, Ordering::Relaxed);
481    }
482
483    /// Returns `true` if history sync notifications are currently being skipped.
484    pub fn skip_history_sync_enabled(&self) -> bool {
485        self.skip_history_sync.load(Ordering::Relaxed)
486    }
487
488    pub(crate) fn is_shutting_down(&self) -> bool {
489        self.expected_disconnect.load(Ordering::Relaxed) || !self.is_running.load(Ordering::Relaxed)
490    }
491
492    /// Create a new `Client` with default cache configuration.
493    ///
494    /// This is the standard constructor. Use [`Client::new_with_cache_config`]
495    /// if you need to customise cache TTL / capacity.
496    pub async fn new(
497        runtime: Arc<dyn Runtime>,
498        persistence_manager: Arc<PersistenceManager>,
499        transport_factory: Arc<dyn crate::transport::TransportFactory>,
500        http_client: Arc<dyn crate::http::HttpClient>,
501        override_version: Option<(u32, u32, u32)>,
502    ) -> (Arc<Self>, async_channel::Receiver<MajorSyncTask>) {
503        Self::new_with_cache_config(
504            runtime,
505            persistence_manager,
506            transport_factory,
507            http_client,
508            override_version,
509            CacheConfig::default(),
510        )
511        .await
512    }
513
514    /// Create a new `Client` with a custom [`CacheConfig`].
515    pub async fn new_with_cache_config(
516        runtime: Arc<dyn Runtime>,
517        persistence_manager: Arc<PersistenceManager>,
518        transport_factory: Arc<dyn crate::transport::TransportFactory>,
519        http_client: Arc<dyn crate::http::HttpClient>,
520        override_version: Option<(u32, u32, u32)>,
521        cache_config: CacheConfig,
522    ) -> (Arc<Self>, async_channel::Receiver<MajorSyncTask>) {
523        let mut unique_id_bytes = [0u8; 2];
524        rand::make_rng::<rand::rngs::StdRng>().fill_bytes(&mut unique_id_bytes);
525
526        let device_snapshot = persistence_manager.get_device_snapshot().await;
527        let core = wacore::client::CoreClient::new(device_snapshot.core.clone());
528
529        let (tx, rx) = async_channel::bounded(32);
530
531        let this = Self {
532            runtime: runtime.clone(),
533            core,
534            persistence_manager: persistence_manager.clone(),
535            media_conn: Arc::new(RwLock::new(None)),
536            is_logged_in: Arc::new(AtomicBool::new(false)),
537            is_connecting: Arc::new(AtomicBool::new(false)),
538            is_running: Arc::new(AtomicBool::new(false)),
539            is_connected: Arc::new(AtomicBool::new(false)),
540            shutdown_notifier: Arc::new(event_listener::Event::new()),
541            last_data_received_ms: Arc::new(AtomicU64::new(0)),
542            last_data_sent_ms: Arc::new(AtomicU64::new(0)),
543
544            transport: Arc::new(Mutex::new(None)),
545            transport_events: Arc::new(Mutex::new(None)),
546            transport_factory,
547            noise_socket: Arc::new(Mutex::new(None)),
548
549            response_waiters: Arc::new(Mutex::new(HashMap::new())),
550            node_waiters: std::sync::Mutex::new(Vec::new()),
551            node_waiter_count: AtomicUsize::new(0),
552            unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
553            id_counter: Arc::new(AtomicU64::new(0)),
554            unified_session: crate::unified_session::UnifiedSessionManager::new(),
555
556            signal_cache: Arc::new(crate::store::signal_cache::SignalStoreCache::new()),
557            message_processing_semaphore: std::sync::Mutex::new(Arc::new(
558                async_lock::Semaphore::new(1),
559            )),
560            // Coordination caches: capacity-only eviction, no TTL/TTI.
561            // These hold live mutexes and channel senders; time-based eviction
562            // while tasks hold references would silently break serialisation.
563            session_locks: Cache::builder()
564                .max_capacity(cache_config.session_locks_capacity.max(1))
565                .build(),
566            message_queues: Cache::builder()
567                .max_capacity(cache_config.message_queues_capacity.max(1))
568                .build(),
569            lid_pn_cache: Arc::new(LidPnCache::with_config(
570                &cache_config.lid_pn_cache,
571                cache_config.cache_stores.lid_pn_cache.clone(),
572            )),
573            message_enqueue_locks: Cache::builder()
574                .max_capacity(cache_config.message_enqueue_locks_capacity.max(1))
575                .build(),
576            group_cache: async_lock::Mutex::new(None),
577            device_cache: async_lock::Mutex::new(None),
578            retried_group_messages: cache_config.retried_group_messages.build_with_ttl(),
579
580            expected_disconnect: Arc::new(AtomicBool::new(false)),
581            connection_generation: Arc::new(AtomicU64::new(0)),
582
583            recent_messages: cache_config.recent_messages.build_with_ttl(),
584
585            pending_retries: Arc::new(async_lock::Mutex::new(HashSet::new())),
586
587            message_retry_counts: cache_config.message_retry_counts.build_with_ttl(),
588
589            offline_sync_metrics: Arc::new(OfflineSyncMetrics {
590                active: AtomicBool::new(false),
591                total_messages: AtomicUsize::new(0),
592                processed_messages: AtomicUsize::new(0),
593                start_time: std::sync::Mutex::new(None),
594            }),
595
596            enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
597            auto_reconnect_errors: Arc::new(AtomicU32::new(0)),
598
599            needs_initial_full_sync: Arc::new(AtomicBool::new(false)),
600
601            app_state_processor: async_lock::Mutex::new(None),
602            app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
603            app_state_syncing: Arc::new(Mutex::new(HashSet::new())),
604            initial_keys_synced_notifier: Arc::new(event_listener::Event::new()),
605            initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
606            server_has_prekeys: Arc::new(AtomicBool::new(true)),
607            prekey_upload_lock: Arc::new(async_lock::Mutex::new(())),
608            offline_sync_notifier: Arc::new(event_listener::Event::new()),
609            offline_sync_completed: Arc::new(AtomicBool::new(false)),
610            history_sync_tasks_in_flight: Arc::new(AtomicUsize::new(0)),
611            history_sync_idle_notifier: Arc::new(event_listener::Event::new()),
612            presence_subscriptions: Arc::new(async_lock::Mutex::new(HashSet::new())),
613            socket_ready_notifier: Arc::new(event_listener::Event::new()),
614            is_ready: Arc::new(AtomicBool::new(false)),
615            connected_notifier: Arc::new(event_listener::Event::new()),
616            major_sync_task_sender: tx,
617            pairing_cancellation_tx: Arc::new(Mutex::new(None)),
618            pair_code_state: Arc::new(Mutex::new(wacore::pair_code::PairCodeState::default())),
619            custom_enc_handlers: Arc::new(async_lock::RwLock::new(HashMap::new())),
620            chatstate_handlers: Arc::new(RwLock::new(Vec::new())),
621            pdo_pending_requests: cache_config.pdo_pending_requests.build_with_ttl(),
622            device_registry_cache: cache_config.device_registry_cache.build_typed_ttl(
623                cache_config.cache_stores.device_registry_cache.clone(),
624                "device_registry",
625            ),
626            stanza_router: Self::create_stanza_router(),
627            synchronous_ack: false,
628            http_client,
629            override_version,
630            skip_history_sync: AtomicBool::new(false),
631            cache_config,
632        };
633
634        let arc = Arc::new(this);
635
636        // Warm up the LID-PN cache from persistent storage
637        let warm_up_arc = arc.clone();
638        arc.runtime
639            .spawn(Box::pin(async move {
640                if let Err(e) = warm_up_arc.warm_up_lid_pn_cache().await {
641                    warn!("Failed to warm up LID-PN cache: {e}");
642                }
643            }))
644            .detach();
645
646        // Start background task to clean up stale device registry entries
647        let cleanup_arc = arc.clone();
648        arc.runtime
649            .spawn(Box::pin(async move {
650                cleanup_arc.device_registry_cleanup_loop().await;
651            }))
652            .detach();
653
654        (arc, rx)
655    }
656
657    pub(crate) async fn get_group_cache(&self) -> Arc<TypedCache<Jid, GroupInfo>> {
658        let mut guard = self.group_cache.lock().await;
659        if let Some(cache) = guard.as_ref() {
660            return cache.clone();
661        }
662        debug!("Initializing Group Cache for the first time.");
663        let cache = Arc::new(
664            self.cache_config
665                .group_cache
666                .build_typed_ttl(self.cache_config.cache_stores.group_cache.clone(), "group"),
667        );
668        *guard = Some(cache.clone());
669        cache
670    }
671
672    pub(crate) async fn get_device_cache(&self) -> Arc<TypedCache<Jid, Vec<Jid>>> {
673        let mut guard = self.device_cache.lock().await;
674        if let Some(cache) = guard.as_ref() {
675            return cache.clone();
676        }
677        debug!("Initializing Device Cache for the first time.");
678        let cache = Arc::new(self.cache_config.device_cache.build_typed_ttl(
679            self.cache_config.cache_stores.device_cache.clone(),
680            "device",
681        ));
682        *guard = Some(cache.clone());
683        cache
684    }
685
686    pub(crate) async fn get_app_state_processor(&self) -> Arc<AppStateProcessor> {
687        let mut guard = self.app_state_processor.lock().await;
688        if let Some(proc) = guard.as_ref() {
689            return proc.clone();
690        }
691        debug!("Initializing AppStateProcessor for the first time.");
692        let proc = Arc::new(AppStateProcessor::new(
693            self.persistence_manager.backend(),
694            self.runtime.clone(),
695        ));
696        *guard = Some(proc.clone());
697        proc
698    }
699
700    /// Create and configure the stanza router with all the handlers.
701    fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
702        use crate::handlers::{
703            basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
704            chatstate::ChatstateHandler,
705            ib::IbHandler,
706            iq::IqHandler,
707            message::MessageHandler,
708            notification::NotificationHandler,
709            receipt::ReceiptHandler,
710            router::StanzaRouter,
711            unimplemented::UnimplementedHandler,
712        };
713
714        let mut router = StanzaRouter::new();
715
716        // Register all handlers
717        router.register(Arc::new(MessageHandler));
718        router.register(Arc::new(ReceiptHandler));
719        router.register(Arc::new(IqHandler));
720        router.register(Arc::new(SuccessHandler));
721        router.register(Arc::new(FailureHandler));
722        router.register(Arc::new(StreamErrorHandler));
723        router.register(Arc::new(IbHandler));
724        router.register(Arc::new(NotificationHandler));
725        router.register(Arc::new(AckHandler));
726        router.register(Arc::new(ChatstateHandler));
727
728        // Register unimplemented handlers
729        router.register(Arc::new(UnimplementedHandler::for_call()));
730        router.register(Arc::new(crate::handlers::presence::PresenceHandler));
731
732        router
733    }
734
735    /// Registers an external event handler to the core event bus.
736    pub fn register_handler(&self, handler: Arc<dyn wacore::types::events::EventHandler>) {
737        self.core.event_bus.add_handler(handler);
738    }
739
740    /// Register a chatstate handler which will be invoked when a `<chatstate>` stanza is received.
741    ///
742    /// The handler receives a `ChatStateEvent` with the parsed chat state information.
743    pub async fn register_chatstate_handler(
744        &self,
745        handler: Arc<dyn Fn(ChatStateEvent) + Send + Sync>,
746    ) {
747        self.chatstate_handlers.write().await.push(handler);
748    }
749
750    /// Dispatch a parsed chatstate stanza to registered handlers.
751    ///
752    /// Called by `ChatstateHandler` after parsing the incoming stanza.
753    pub(crate) async fn dispatch_chatstate_event(
754        &self,
755        stanza: wacore::iq::chatstate::ChatstateStanza,
756    ) {
757        use wacore::iq::chatstate::{ChatstateSource, ReceivedChatState};
758        use wacore::types::events::ChatPresenceUpdate;
759        use wacore::types::message::MessageSource;
760        use wacore::types::presence::{ChatPresence, ChatPresenceMedia};
761
762        // Dispatch via event bus
763        let (chat, sender, is_group) = match &stanza.source {
764            ChatstateSource::User { from } => (from.clone(), from.clone(), false),
765            ChatstateSource::Group { from, participant } => {
766                (from.clone(), participant.clone(), true)
767            }
768        };
769
770        let (state, media) = match stanza.state {
771            ReceivedChatState::Typing => (ChatPresence::Composing, ChatPresenceMedia::Text),
772            ReceivedChatState::RecordingAudio => {
773                (ChatPresence::Composing, ChatPresenceMedia::Audio)
774            }
775            ReceivedChatState::Idle => (ChatPresence::Paused, ChatPresenceMedia::Text),
776        };
777
778        self.core
779            .event_bus
780            .dispatch(&Event::ChatPresence(ChatPresenceUpdate {
781                source: MessageSource {
782                    chat,
783                    sender,
784                    is_from_me: false,
785                    is_group,
786                    addressing_mode: None,
787                    sender_alt: None,
788                    recipient_alt: None,
789                    broadcast_list_owner: None,
790                    recipient: None,
791                },
792                state,
793                media,
794            }));
795
796        // Invoke legacy callback handlers
797        let event = ChatStateEvent::from_stanza(stanza);
798        let handlers = self.chatstate_handlers.read().await.clone();
799        for handler in handlers {
800            let event_clone = event.clone();
801            let handler_clone = handler.clone();
802            self.runtime
803                .spawn(Box::pin(async move {
804                    (handler_clone)(event_clone);
805                }))
806                .detach();
807        }
808    }
809
810    pub async fn run(self: &Arc<Self>) {
811        if self.is_running.swap(true, Ordering::SeqCst) {
812            warn!("Client `run` method called while already running.");
813            return;
814        }
815        while self.is_running.load(Ordering::Relaxed) {
816            self.expected_disconnect.store(false, Ordering::Relaxed);
817
818            if let Err(connect_err) = self.connect().await {
819                error!("Failed to connect: {connect_err:#}. Will retry...");
820            } else {
821                if self.read_messages_loop().await.is_err() {
822                    warn!(
823                        "Message loop exited with an error. Will attempt to reconnect if enabled."
824                    );
825                } else if self.expected_disconnect.load(Ordering::Relaxed) {
826                    debug!("Message loop exited gracefully (expected disconnect).");
827                } else {
828                    info!("Message loop exited gracefully.");
829                }
830
831                self.cleanup_connection_state().await;
832            }
833
834            if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
835                info!("Auto-reconnect disabled, shutting down.");
836                self.is_running.store(false, Ordering::Relaxed);
837                break;
838            }
839
840            // If this was an expected disconnect (e.g., 515 after pairing), reconnect immediately
841            if self.expected_disconnect.load(Ordering::Relaxed) {
842                self.auto_reconnect_errors.store(0, Ordering::Relaxed);
843                info!("Expected disconnect (e.g., 515), reconnecting immediately...");
844                continue;
845            }
846
847            let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
848            // WA Web: Fibonacci backoff with 10% jitter, max 900s.
849            // algo: { type: "fibonacci", first: 1000, second: 1000 }
850            // jitter: 0.1, max: 9e5
851            let delay = fibonacci_backoff(error_count);
852            info!(
853                "Will attempt to reconnect in {:?} (attempt {})",
854                delay,
855                error_count + 1
856            );
857            self.runtime.sleep(delay).await;
858        }
859        info!("Client run loop has shut down.");
860    }
861
862    pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
863        if self.is_connecting.swap(true, Ordering::SeqCst) {
864            return Err(ClientError::AlreadyConnected.into());
865        }
866
867        let _guard = scopeguard::guard((), |_| {
868            self.is_connecting.store(false, Ordering::Relaxed);
869        });
870
871        if self.is_connected() {
872            return Err(ClientError::AlreadyConnected.into());
873        }
874
875        // Reset login state for new connection attempt. This ensures that
876        // handle_success will properly process the <success> stanza even if
877        // a previous connection's post-login task bailed out early.
878        self.is_logged_in.store(false, Ordering::Relaxed);
879        self.is_ready.store(false, Ordering::Relaxed);
880        self.is_connected.store(false, Ordering::Relaxed);
881        self.offline_sync_completed.store(false, Ordering::Relaxed);
882        self.server_has_prekeys.store(true, Ordering::Relaxed);
883
884        // WA Web: both MQTT and DGW transports use a 20s connect timeout.
885        // Without this, a dead network blocks on the OS TCP SYN timeout (~60-75s).
886        // Version fetch is also wrapped so a hung HTTP request doesn't block connect().
887        let version_future = rt_timeout(
888            &*self.runtime,
889            TRANSPORT_CONNECT_TIMEOUT,
890            crate::version::resolve_and_update_version(
891                &self.persistence_manager,
892                &self.http_client,
893                self.override_version,
894            ),
895        );
896        let transport_future = rt_timeout(
897            &*self.runtime,
898            TRANSPORT_CONNECT_TIMEOUT,
899            self.transport_factory.create_transport(),
900        );
901
902        debug!("Connecting WebSocket and fetching latest client version in parallel...");
903        let (version_result, transport_result) = futures::join!(version_future, transport_future);
904
905        version_result
906            .map_err(|_| anyhow!("Version fetch timed out after {TRANSPORT_CONNECT_TIMEOUT:?}"))?
907            .map_err(|e| anyhow!("Failed to resolve app version: {}", e))?;
908        let (transport, mut transport_events) = transport_result.map_err(|_| {
909            anyhow!("Transport connect timed out after {TRANSPORT_CONNECT_TIMEOUT:?}")
910        })??;
911        debug!("Version fetch and transport connection established.");
912
913        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
914
915        let noise_socket = handshake::do_handshake(
916            self.runtime.clone(),
917            &device_snapshot,
918            transport.clone(),
919            &mut transport_events,
920        )
921        .await?;
922
923        *self.transport.lock().await = Some(transport);
924        *self.transport_events.lock().await = Some(transport_events);
925        *self.noise_socket.lock().await = Some(noise_socket);
926        self.is_connected.store(true, Ordering::Release);
927
928        // Notify waiters that socket is ready (before login)
929        self.socket_ready_notifier.notify(usize::MAX);
930
931        let client_clone = self.clone();
932        self.runtime
933            .spawn(Box::pin(async move { client_clone.keepalive_loop().await }))
934            .detach();
935
936        Ok(())
937    }
938
939    pub async fn disconnect(self: &Arc<Self>) {
940        info!("Disconnecting client intentionally.");
941        self.expected_disconnect.store(true, Ordering::Relaxed);
942        self.is_running.store(false, Ordering::Relaxed);
943        self.shutdown_notifier.notify(usize::MAX);
944
945        // Flush dirty device state before tearing down the connection.
946        if let Err(e) = self.persistence_manager.flush().await {
947            log::error!("Failed to flush device state during disconnect: {e}");
948        }
949
950        if let Some(transport) = self.transport.lock().await.as_ref() {
951            transport.disconnect().await;
952        }
953        self.cleanup_connection_state().await;
954    }
955
956    /// Backoff step used by [`reconnect()`] to create an offline window.
957    ///
958    /// `fibonacci_backoff(RECONNECT_BACKOFF_STEP)` determines the delay before
959    /// the run loop re-connects.  This must be longer than the mock server's
960    /// chatstate TTL (`CHATSTATE_TTL_SECS=3`) so TTL-expiry tests pass.
961    ///
962    /// Sequence: fib(0)=1s, fib(1)=1s, fib(2)=2s, fib(3)=3s, **fib(4)=5s**.
963    pub const RECONNECT_BACKOFF_STEP: u32 = 4;
964
965    /// Drop the current connection and trigger the auto-reconnect loop.
966    ///
967    /// Unlike [`disconnect`], this does **not** stop the run loop. The client
968    /// will reconnect automatically using the same persisted identity/store,
969    /// just as it would after a network interruption. Use
970    /// [`wait_for_connected`] to wait for the new connection to be ready.
971    ///
972    /// This is useful for:
973    /// - Handling network changes (e.g., Wi-Fi → cellular)
974    /// - Forcing a fresh server session
975    /// - Testing offline message delivery
976    pub async fn reconnect(self: &Arc<Self>) {
977        info!("Reconnecting: dropping transport for auto-reconnect.");
978        self.auto_reconnect_errors
979            .store(Self::RECONNECT_BACKOFF_STEP, Ordering::Relaxed);
980        if let Some(transport) = self.transport.lock().await.as_ref() {
981            transport.disconnect().await;
982        }
983    }
984
985    /// Drop the current connection and reconnect immediately with no delay.
986    ///
987    /// Unlike [`reconnect`], which introduces a deliberate offline window,
988    /// this method sets the `expected_disconnect` flag so the run loop
989    /// skips the backoff delay and reconnects as fast as possible.
990    pub async fn reconnect_immediately(self: &Arc<Self>) {
991        info!("Reconnecting immediately (expected disconnect).");
992        self.expected_disconnect.store(true, Ordering::Relaxed);
993        if let Some(transport) = self.transport.lock().await.as_ref() {
994            transport.disconnect().await;
995        }
996    }
997
998    async fn cleanup_connection_state(&self) {
999        self.is_logged_in.store(false, Ordering::Relaxed);
1000        self.is_ready.store(false, Ordering::Relaxed);
1001        // Signal the keepalive loop (and any other tasks) to exit promptly.
1002        // Without this, a stale keepalive loop can overlap with the next one
1003        // after reconnect, causing duplicate pings.
1004        self.shutdown_notifier.notify(usize::MAX);
1005        *self.transport.lock().await = None;
1006        *self.transport_events.lock().await = None;
1007        *self.noise_socket.lock().await = None;
1008        // Clear is_connected AFTER noise_socket is None, so no task can see
1009        // is_connected==true with a cleared socket. send_node() independently
1010        // checks the socket, but this ordering avoids a confusing state window.
1011        self.is_connected.store(false, Ordering::Release);
1012        self.retried_group_messages.invalidate_all();
1013        // Clear signal cache so stale state doesn't leak across connections
1014        self.signal_cache.clear().await;
1015        // Reset message processing semaphore to 1 permit (sequential mode for next offline sync).
1016        // Old workers holding the previous semaphore Arc will finish normally.
1017        *self.message_processing_semaphore.lock().unwrap() =
1018            Arc::new(async_lock::Semaphore::new(1));
1019        // Reset dead-socket timestamps so stale values from the previous
1020        // connection don't trigger an immediate reconnect on the next one.
1021        self.last_data_received_ms.store(0, Ordering::Relaxed);
1022        self.last_data_sent_ms.store(0, Ordering::Relaxed);
1023        // Reset offline sync state for next connection
1024        self.offline_sync_completed.store(false, Ordering::Relaxed);
1025        self.offline_sync_metrics
1026            .active
1027            .store(false, Ordering::Release);
1028        self.offline_sync_metrics
1029            .total_messages
1030            .store(0, Ordering::Release);
1031        self.offline_sync_metrics
1032            .processed_messages
1033            .store(0, Ordering::Release);
1034        match self.offline_sync_metrics.start_time.lock() {
1035            Ok(mut guard) => *guard = None,
1036            Err(poison) => *poison.into_inner() = None,
1037        }
1038        self.server_has_prekeys.store(true, Ordering::Relaxed);
1039        self.history_sync_tasks_in_flight
1040            .store(0, Ordering::Relaxed);
1041        self.history_sync_idle_notifier.notify(usize::MAX);
1042        // Drain all pending IQ waiters so they fail fast with InternalChannelClosed
1043        // instead of hanging until the 75s timeout.
1044        let mut waiters_map = self.response_waiters.lock().await;
1045        let waiter_count = waiters_map.len();
1046        // Replace with new map to release backing storage; old senders drop here,
1047        // causing receivers to get RecvError → IqError::InternalChannelClosed
1048        *waiters_map = HashMap::new();
1049        drop(waiters_map);
1050        if waiter_count > 0 {
1051            debug!(
1052                "Dropping {} orphaned IQ response waiter(s) on disconnect",
1053                waiter_count
1054            );
1055        }
1056
1057        // Clear app state tracking maps to prevent unbounded growth across reconnections.
1058        // Replace with new collections to release backing storage.
1059        *self.app_state_key_requests.lock().await = HashMap::new();
1060        *self.app_state_syncing.lock().await = HashSet::new();
1061
1062        // Drop stale media connection (auth tokens become invalid on reconnect)
1063        *self.media_conn.write().await = None;
1064
1065        // Clear app state key cache — keys will be re-fetched from DB on demand
1066        if let Some(proc) = self.app_state_processor.lock().await.as_ref() {
1067            proc.clear_key_cache().await;
1068        }
1069    }
1070
1071    /// Returns a snapshot of all internal collection sizes for memory leak detection.
1072    ///
1073    /// Moka caches report approximate counts (pending evictions may not be reflected).
1074    /// Call `run_pending_tasks()` on individual caches first if you need exact counts.
1075    ///
1076    /// Requires the `debug-diagnostics` feature.
1077    #[cfg(feature = "debug-diagnostics")]
1078    pub async fn memory_diagnostics(&self) -> MemoryDiagnostics {
1079        let (sig_sessions, sig_identities, sig_sender_keys) =
1080            self.signal_cache.entry_counts().await;
1081        let (lid_lid, lid_pn) = self.lid_pn_cache.entry_counts();
1082
1083        MemoryDiagnostics {
1084            group_cache: self
1085                .group_cache
1086                .lock()
1087                .await
1088                .as_ref()
1089                .map_or(0, |c| c.entry_count()),
1090            device_cache: self
1091                .device_cache
1092                .lock()
1093                .await
1094                .as_ref()
1095                .map_or(0, |c| c.entry_count()),
1096            device_registry_cache: self.device_registry_cache.entry_count(),
1097            lid_pn_lid_entries: lid_lid,
1098            lid_pn_pn_entries: lid_pn,
1099            retried_group_messages: self.retried_group_messages.entry_count(),
1100            recent_messages: self.recent_messages.entry_count(),
1101            message_retry_counts: self.message_retry_counts.entry_count(),
1102            pdo_pending_requests: self.pdo_pending_requests.entry_count(),
1103            session_locks: self.session_locks.entry_count(),
1104            message_queues: self.message_queues.entry_count(),
1105            message_enqueue_locks: self.message_enqueue_locks.entry_count(),
1106            response_waiters: self.response_waiters.lock().await.len(),
1107            node_waiters: self.node_waiter_count.load(Ordering::Relaxed),
1108            pending_retries: self.pending_retries.lock().await.len(),
1109            presence_subscriptions: self.presence_subscriptions.lock().await.len(),
1110            app_state_key_requests: self.app_state_key_requests.lock().await.len(),
1111            app_state_syncing: self.app_state_syncing.lock().await.len(),
1112            signal_cache_sessions: sig_sessions,
1113            signal_cache_identities: sig_identities,
1114            signal_cache_sender_keys: sig_sender_keys,
1115            chatstate_handlers: self.chatstate_handlers.read().await.len(),
1116            custom_enc_handlers: self.custom_enc_handlers.read().await.len(),
1117        }
1118    }
1119
1120    /// Flush the in-memory signal cache to the database backend.
1121    /// Called after each message is decrypted or after encryption operations.
1122    pub(crate) async fn flush_signal_cache(&self) -> Result<(), anyhow::Error> {
1123        let device = self.persistence_manager.get_device_arc().await;
1124        let device_guard = device.read().await;
1125        self.signal_cache
1126            .flush(&*device_guard.backend)
1127            .await
1128            .map_err(|e| anyhow::anyhow!("Failed to flush signal cache: {e}"))
1129    }
1130
1131    async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
1132        debug!("Starting message processing loop...");
1133
1134        let mut rx_guard = self.transport_events.lock().await;
1135        let transport_events = rx_guard
1136            .take()
1137            .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
1138        drop(rx_guard);
1139
1140        // Frame decoder to parse incoming data
1141        let mut frame_decoder = wacore::framing::FrameDecoder::new();
1142
1143        loop {
1144            futures::select_biased! {
1145                    _ = self.shutdown_notifier.listen().fuse() => {
1146                        debug!("Shutdown signaled in message loop. Exiting message loop.");
1147                        return Ok(());
1148                    },
1149                    event_result = transport_events.recv().fuse() => {
1150                        match event_result {
1151                            Ok(crate::transport::TransportEvent::DataReceived(data)) => {
1152                                // Update dead-socket timer (WA Web: deadSocketTimer reset)
1153                                self.last_data_received_ms.store(
1154                                    wacore::time::now_millis() as u64,
1155                                    Ordering::Relaxed,
1156                                );
1157
1158                                // Feed data into the frame decoder
1159                                frame_decoder.feed(&data);
1160
1161                                // Process all complete frames.
1162                                // Frame decryption must be sequential (noise protocol counter),
1163                                // but we spawn node processing concurrently after decryption.
1164                                let mut frames_in_batch: u32 = 0;
1165
1166                                while let Some(encrypted_frame) = frame_decoder.decode_frame() {
1167                                    // Decrypt the frame synchronously (required for noise counter ordering)
1168                                    if let Some(node) = self.decrypt_frame(&encrypted_frame).await {
1169                                        // Determine processing mode for this node:
1170                                        // - Critical nodes (success/failure/stream:error): inline, required for state
1171                                        // - Message nodes: inline, preserves arrival order for per-chat queues
1172                                        //   (MessageHandler just enqueues + ACKs, heavy crypto runs in workers)
1173                                        // - ib (in-band): inline, ensures offline sync tracking (expected count)
1174                                        //   is set up before offline messages are processed
1175                                        // - Everything else: spawned concurrently for parallelism
1176                                        let process_inline = matches!(
1177                                            node.tag.as_ref(),
1178                                            "success" | "failure" | "stream:error" | "message" | "ib"
1179                                        );
1180
1181                                        if process_inline {
1182                                            self.process_decrypted_node(node).await;
1183                                        } else {
1184                                            let client = self.clone();
1185                                            self.runtime.spawn(Box::pin(async move {
1186                                                client.process_decrypted_node(node).await;
1187                                            })).detach();
1188                                        }
1189                                    }
1190
1191                                    // Check if we should exit after processing (e.g., after 515 stream error)
1192                                    if self.expected_disconnect.load(Ordering::Relaxed) {
1193                                        debug!("Expected disconnect signaled during frame processing. Exiting message loop.");
1194                                        return Ok(());
1195                                    }
1196
1197                                    // Cooperative yield: give other tasks a chance to run.
1198                                    // The runtime decides whether yielding is needed — returns
1199                                    // None (zero-cost) when unnecessary, Some(fut) otherwise.
1200                                    frames_in_batch += 1;
1201                                    if frames_in_batch.is_multiple_of(10)
1202                                        && let Some(yield_fut) = self.runtime.yield_now()
1203                                    {
1204                                        yield_fut.await;
1205                                    }
1206                                }
1207                            },
1208                            Ok(crate::transport::TransportEvent::Disconnected) | Err(_) => {
1209                                self.cleanup_connection_state().await;
1210                                 if !self.expected_disconnect.load(Ordering::Relaxed) {
1211                                    self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
1212                                    debug!("Transport disconnected unexpectedly.");
1213                                    return Err(anyhow::anyhow!("Transport disconnected unexpectedly"));
1214                                } else {
1215                                    debug!("Transport disconnected as expected.");
1216                                    return Ok(());
1217                                }
1218                            }
1219                            Ok(crate::transport::TransportEvent::Connected) => {
1220                                // Already handled during handshake, but could be useful for logging
1221                                debug!("Transport connected event received");
1222                            }
1223                    }
1224                }
1225            }
1226        }
1227    }
1228
1229    /// Decrypt a frame and return the parsed node.
1230    /// This must be called sequentially due to noise protocol counter requirements.
1231    pub(crate) async fn decrypt_frame(
1232        self: &Arc<Self>,
1233        encrypted_frame: &bytes::Bytes,
1234    ) -> Option<wacore_binary::node::Node> {
1235        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
1236        let noise_socket = match noise_socket_arc {
1237            Some(s) => s,
1238            None => {
1239                log::error!("Cannot process frame: not connected (no noise socket)");
1240                return None;
1241            }
1242        };
1243
1244        let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
1245            Ok(p) => p,
1246            Err(e) => {
1247                log::error!("Failed to decrypt frame: {e}");
1248                return None;
1249            }
1250        };
1251
1252        let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
1253            Ok(data) => data,
1254            Err(e) => {
1255                log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
1256                return None;
1257            }
1258        };
1259
1260        match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
1261            Ok(node_ref) => Some(node_ref.to_owned()),
1262            Err(e) => {
1263                log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}");
1264                None
1265            }
1266        }
1267    }
1268
1269    /// Process an already-decrypted node.
1270    /// This can be spawned concurrently since it doesn't depend on noise protocol state.
1271    /// The node is wrapped in Arc to avoid cloning when passing through handlers.
1272    pub(crate) async fn process_decrypted_node(self: &Arc<Self>, node: wacore_binary::node::Node) {
1273        // Wrap in Arc once - all handlers will share this same allocation
1274        let node_arc = Arc::new(node);
1275        self.process_node(node_arc).await;
1276    }
1277
1278    /// Process a node wrapped in Arc. Handlers receive the Arc and can share/store it cheaply.
1279    pub(crate) async fn process_node(self: &Arc<Self>, node: Arc<Node>) {
1280        use wacore::xml::DisplayableNode;
1281
1282        // --- Offline Sync Tracking ---
1283        if node.tag.as_ref() == "ib" {
1284            // Check for offline_preview child to get expected count
1285            if let Some(preview) = node.get_optional_child("offline_preview") {
1286                let count: usize = preview
1287                    .attrs
1288                    .get("count")
1289                    .and_then(|v| v.as_str().parse().ok())
1290                    .unwrap_or(0);
1291
1292                if count == 0 {
1293                    self.offline_sync_metrics
1294                        .active
1295                        .store(false, Ordering::Release);
1296                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: 0 items.");
1297                } else {
1298                    // Use stronger memory ordering for state transitions
1299                    self.offline_sync_metrics
1300                        .total_messages
1301                        .store(count, Ordering::Release);
1302                    self.offline_sync_metrics
1303                        .processed_messages
1304                        .store(0, Ordering::Release);
1305                    self.offline_sync_metrics
1306                        .active
1307                        .store(true, Ordering::Release);
1308                    match self.offline_sync_metrics.start_time.lock() {
1309                        Ok(mut guard) => *guard = Some(wacore::time::Instant::now()),
1310                        Err(poison) => *poison.into_inner() = Some(wacore::time::Instant::now()),
1311                    }
1312                    debug!(target: "Client/OfflineSync", "Sync STARTED: Expecting {} items.", count);
1313                }
1314            } else if self.offline_sync_metrics.active.load(Ordering::Acquire)
1315                && node.get_optional_child("offline").is_some()
1316            {
1317                // Handle end marker: <ib><offline count="N"/> signals sync completion
1318                // Only <ib> with an <offline> child is a real end marker.
1319                // Other <ib> children (thread_metadata, edge_routing, dirty) are NOT end markers.
1320                let processed = self
1321                    .offline_sync_metrics
1322                    .processed_messages
1323                    .load(Ordering::Acquire);
1324                let elapsed = match self.offline_sync_metrics.start_time.lock() {
1325                    Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
1326                    Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
1327                };
1328                debug!(target: "Client/OfflineSync", "Sync COMPLETED: End marker received. Processed {} items in {:.2?}.", processed, elapsed);
1329                self.offline_sync_metrics
1330                    .active
1331                    .store(false, Ordering::Release);
1332            }
1333        }
1334
1335        // Track progress if active
1336        if self.offline_sync_metrics.active.load(Ordering::Acquire) {
1337            // Check for 'offline' attribute on relevant stanzas
1338            if node.attrs.contains_key("offline") {
1339                let processed = self
1340                    .offline_sync_metrics
1341                    .processed_messages
1342                    .fetch_add(1, Ordering::Release)
1343                    + 1;
1344                let total = self
1345                    .offline_sync_metrics
1346                    .total_messages
1347                    .load(Ordering::Acquire);
1348
1349                if processed.is_multiple_of(50) || processed == total {
1350                    trace!(target: "Client/OfflineSync", "Sync Progress: {}/{}", processed, total);
1351                }
1352
1353                if processed >= total {
1354                    let elapsed = match self.offline_sync_metrics.start_time.lock() {
1355                        Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
1356                        Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
1357                    };
1358                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: Processed {} items in {:.2?}.", processed, elapsed);
1359                    self.offline_sync_metrics
1360                        .active
1361                        .store(false, Ordering::Release);
1362                }
1363            }
1364        }
1365        // --- End Tracking ---
1366
1367        if node.tag.as_ref() == "iq"
1368            && let Some(sync_node) = node.get_optional_child("sync")
1369            && let Some(collection_node) = sync_node.get_optional_child("collection")
1370        {
1371            let name = collection_node.attrs().optional_string("name");
1372            let name = name.as_deref().unwrap_or("<unknown>");
1373            debug!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
1374        } else {
1375            debug!(target: "Client/Recv","{}", DisplayableNode(&node));
1376        }
1377
1378        // Prepare deferred ACK cancellation flag (sent after dispatch unless cancelled)
1379        let mut cancelled = false;
1380
1381        if node.tag.as_ref() == "xmlstreamend" {
1382            if self.expected_disconnect.load(Ordering::Relaxed) {
1383                debug!("Received <xmlstreamend/>, expected disconnect.");
1384            } else {
1385                warn!("Received <xmlstreamend/>, treating as disconnect.");
1386            }
1387            self.shutdown_notifier.notify(usize::MAX);
1388            return;
1389        }
1390
1391        // Check generic node waiters (zero-cost when none registered)
1392        if self.node_waiter_count.load(Ordering::Relaxed) > 0 {
1393            self.resolve_node_waiters(&node);
1394        }
1395
1396        if node.tag.as_ref() == "iq"
1397            && let Some(id) = node.attrs.get("id").map(|v| v.as_str())
1398        {
1399            let has_waiter = self.response_waiters.lock().await.contains_key(id.as_ref());
1400            if has_waiter && self.handle_iq_response(Arc::clone(&node)).await {
1401                return;
1402            }
1403        }
1404
1405        // Dispatch to appropriate handler using the router
1406        // Clone Arc (cheap - just reference count) not the Node itself
1407        if !self
1408            .stanza_router
1409            .dispatch(self.clone(), Arc::clone(&node), &mut cancelled)
1410            .await
1411        {
1412            warn!(
1413                "Received unknown top-level node: {}",
1414                DisplayableNode(&node)
1415            );
1416        }
1417
1418        // Send the deferred ACK if applicable and not cancelled by handler
1419        if self.should_ack(&node) && !cancelled {
1420            self.maybe_deferred_ack(node).await;
1421        }
1422    }
1423
1424    /// Determine if a Node should be acknowledged with <ack/>.
1425    fn should_ack(&self, node: &Node) -> bool {
1426        matches!(
1427            node.tag.as_ref(),
1428            "message" | "receipt" | "notification" | "call"
1429        ) && node.attrs.contains_key("id")
1430            && node.attrs.contains_key("from")
1431    }
1432
1433    /// Possibly send a deferred ack: either immediately or via spawned task.
1434    /// Handlers can cancel by setting `cancelled` to true.
1435    /// Uses Arc<Node> to avoid cloning when spawning the async task.
1436    async fn maybe_deferred_ack(self: &Arc<Self>, node: Arc<Node>) {
1437        if self.synchronous_ack {
1438            if let Err(e) = self.send_ack_for(&node).await {
1439                warn!("Failed to send ack: {e:?}");
1440            }
1441        } else {
1442            let this = self.clone();
1443            // Node is already in Arc - just clone the Arc (cheap), not the Node
1444            self.runtime
1445                .spawn(Box::pin(async move {
1446                    if let Err(e) = this.send_ack_for(&node).await {
1447                        warn!("Failed to send ack: {e:?}");
1448                    }
1449                }))
1450                .detach();
1451        }
1452    }
1453
1454    /// Build and send an <ack/> node corresponding to the given stanza.
1455    async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
1456        if self.expected_disconnect.load(Ordering::Relaxed) {
1457            return Ok(());
1458        }
1459        if !self.is_connected() {
1460            return Err(ClientError::NotConnected);
1461        }
1462        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1463        let ack = match build_ack_node(node, device_snapshot.pn.as_ref()) {
1464            Some(ack) => ack,
1465            None => return Ok(()),
1466        };
1467        self.send_node(ack).await
1468    }
1469
1470    pub(crate) async fn handle_unimplemented(&self, tag: &str) {
1471        warn!("TODO: Implement handler for <{tag}>");
1472    }
1473
1474    pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
1475        use wacore::iq::passive::PassiveModeSpec;
1476        self.execute(PassiveModeSpec::new(passive)).await
1477    }
1478
1479    pub async fn clean_dirty_bits(
1480        &self,
1481        type_: &str,
1482        timestamp: Option<&str>,
1483    ) -> Result<(), crate::request::IqError> {
1484        use wacore::iq::dirty::CleanDirtyBitsSpec;
1485
1486        let spec = CleanDirtyBitsSpec::single(type_, timestamp)?;
1487        self.execute(spec).await
1488    }
1489
1490    pub async fn fetch_props(&self) -> Result<(), crate::request::IqError> {
1491        use wacore::iq::props::PropsSpec;
1492        use wacore::store::commands::DeviceCommand;
1493
1494        let stored_hash = self
1495            .persistence_manager
1496            .get_device_snapshot()
1497            .await
1498            .props_hash
1499            .clone();
1500
1501        let spec = match &stored_hash {
1502            Some(hash) => {
1503                debug!("Fetching props with hash for delta update...");
1504                PropsSpec::with_hash(hash)
1505            }
1506            None => {
1507                debug!("Fetching props (full, no stored hash)...");
1508                PropsSpec::new()
1509            }
1510        };
1511
1512        let response = self.execute(spec).await?;
1513
1514        if response.delta_update {
1515            debug!(
1516                "Props delta update received ({} changed props)",
1517                response.props.len()
1518            );
1519        } else {
1520            debug!(
1521                "Props full update received ({} props, hash={:?})",
1522                response.props.len(),
1523                response.hash
1524            );
1525        }
1526
1527        if let Some(new_hash) = response.hash {
1528            self.persistence_manager
1529                .process_command(DeviceCommand::SetPropsHash(Some(new_hash)))
1530                .await;
1531        }
1532
1533        Ok(())
1534    }
1535
1536    pub async fn fetch_privacy_settings(
1537        &self,
1538    ) -> Result<wacore::iq::privacy::PrivacySettingsResponse, crate::request::IqError> {
1539        use wacore::iq::privacy::PrivacySettingsSpec;
1540
1541        debug!("Fetching privacy settings...");
1542
1543        self.execute(PrivacySettingsSpec::new()).await
1544    }
1545
1546    /// Set a privacy setting (e.g. "last" → "contacts").
1547    pub async fn set_privacy_setting(
1548        &self,
1549        category: &str,
1550        value: &str,
1551    ) -> Result<(), crate::request::IqError> {
1552        use wacore::iq::privacy::SetPrivacySettingSpec;
1553        self.execute(SetPrivacySettingSpec::new(category, value))
1554            .await
1555    }
1556
1557    /// Set the default disappearing messages duration (seconds). Pass 0 to disable.
1558    pub async fn set_default_disappearing_mode(
1559        &self,
1560        duration: u32,
1561    ) -> Result<(), crate::request::IqError> {
1562        use wacore::iq::privacy::SetDefaultDisappearingModeSpec;
1563        self.execute(SetDefaultDisappearingModeSpec::new(duration))
1564            .await
1565    }
1566
1567    /// Get business profile for a WhatsApp Business account.
1568    pub async fn get_business_profile(
1569        &self,
1570        jid: &wacore_binary::jid::Jid,
1571    ) -> Result<Option<wacore::iq::business::BusinessProfile>, crate::request::IqError> {
1572        use wacore::iq::business::BusinessProfileSpec;
1573        self.execute(BusinessProfileSpec::new(jid)).await
1574    }
1575
1576    /// Reject an incoming call. Fire-and-forget — no server response is expected.
1577    pub async fn reject_call(
1578        &self,
1579        call_id: &str,
1580        call_from: &wacore_binary::jid::Jid,
1581    ) -> Result<(), anyhow::Error> {
1582        anyhow::ensure!(!call_id.is_empty(), "call_id cannot be empty");
1583        let id = self.generate_request_id();
1584
1585        let stanza = wacore_binary::builder::NodeBuilder::new("call")
1586            .attr("to", call_from.clone())
1587            .attr("id", id)
1588            .children([wacore_binary::builder::NodeBuilder::new("reject")
1589                .attr("call-id", call_id)
1590                .attr("call-creator", call_from.clone())
1591                .attr("count", "0")
1592                .build()])
1593            .build();
1594
1595        self.send_node(stanza).await?;
1596        Ok(())
1597    }
1598
1599    pub async fn send_digest_key_bundle(&self) -> Result<(), crate::request::IqError> {
1600        use wacore::iq::prekeys::DigestKeyBundleSpec;
1601
1602        debug!("Sending digest key bundle...");
1603
1604        self.execute(DigestKeyBundleSpec::new()).await.map(|_| ())
1605    }
1606
1607    pub(crate) async fn handle_success(self: &Arc<Self>, node: &wacore_binary::node::Node) {
1608        // Skip processing if an expected disconnect is pending (e.g., 515 received).
1609        // This prevents race conditions where a spawned success handler runs after
1610        // cleanup_connection_state has already reset is_logged_in.
1611        if self.expected_disconnect.load(Ordering::Relaxed) {
1612            debug!("Ignoring <success> stanza: expected disconnect pending");
1613            return;
1614        }
1615
1616        // Guard against multiple <success> stanzas (WhatsApp may send more than one during
1617        // routing/reconnection). Only process the first one per connection.
1618        if self.is_logged_in.swap(true, Ordering::SeqCst) {
1619            debug!("Ignoring duplicate <success> stanza (already logged in)");
1620            return;
1621        }
1622
1623        // Increment connection generation to invalidate any stale post-login tasks
1624        // from previous connections (e.g., during 515 reconnect cycles).
1625        let current_generation = self.connection_generation.fetch_add(1, Ordering::SeqCst) + 1;
1626
1627        info!(
1628            "Successfully authenticated with WhatsApp servers! (gen={})",
1629            current_generation
1630        );
1631        self.auto_reconnect_errors.store(0, Ordering::Relaxed);
1632
1633        self.update_server_time_offset(node);
1634
1635        if let Some(lid_value) = node.attrs.get("lid") {
1636            if let Some(lid) = lid_value.to_jid() {
1637                let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1638                if device_snapshot.lid.as_ref() != Some(&lid) {
1639                    debug!("Updating LID from server to '{lid}'");
1640                    self.persistence_manager
1641                        .process_command(DeviceCommand::SetLid(Some(lid)))
1642                        .await;
1643                }
1644            } else {
1645                warn!("Failed to parse LID from success stanza: {lid_value}");
1646            }
1647        } else {
1648            warn!("LID not found in <success> stanza. Group messaging may fail.");
1649        }
1650
1651        let client_clone = self.clone();
1652        let task_generation = current_generation;
1653        self.runtime.spawn(Box::pin(async move {
1654            // Macro to check if this task is still valid (connection hasn't been replaced)
1655            macro_rules! check_generation {
1656                () => {
1657                    if client_clone.connection_generation.load(Ordering::SeqCst) != task_generation
1658                    {
1659                        debug!("Post-login task cancelled: connection generation changed");
1660                        return;
1661                    }
1662                };
1663            }
1664
1665            debug!(
1666                "Starting post-login initialization sequence (gen={})...",
1667                task_generation
1668            );
1669
1670            // Check if we need initial app state sync (empty pushname indicates fresh pairing
1671            // where pushname will come from app state sync's setting_pushName mutation)
1672            let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
1673            let needs_pushname_from_sync = device_snapshot.push_name.is_empty();
1674            if needs_pushname_from_sync {
1675                debug!("Push name is empty - will be set from app state sync (setting_pushName)");
1676            }
1677
1678            // Check connection before network operations.
1679            // During pairing, a 515 disconnect happens quickly after success,
1680            // so the socket may already be gone.
1681            if !client_clone.is_connected() {
1682                debug!(
1683                    "Skipping post-login init: connection closed (likely pairing phase reconnect)"
1684                );
1685                return;
1686            }
1687
1688            check_generation!();
1689            client_clone.send_unified_session().await;
1690
1691            // === Establish session with primary phone for PDO ===
1692            // This must happen BEFORE we exit passive mode (before offline messages arrive).
1693            // PDO needs a session with device 0 to request decrypted content from our phone.
1694            // Matches WhatsApp Web's bootstrapDeviceCapabilities() pattern.
1695            check_generation!();
1696            if let Err(e) = client_clone
1697                .establish_primary_phone_session_immediate()
1698                .await
1699            {
1700                warn!(target: "Client/PDO", "Failed to establish session with primary phone on login: {:?}", e);
1701                // Don't fail login - PDO will retry via ensure_e2e_sessions fallback
1702            }
1703
1704            // === Passive Tasks (mimics WhatsApp Web's PassiveTaskManager) ===
1705            // WhatsApp Web executes passive tasks (like PreKey upload) BEFORE sending the active IQ.
1706            check_generation!();
1707            if let Err(e) = client_clone.upload_pre_keys(false).await {
1708                warn!("Failed to upload pre-keys during startup: {e:?}");
1709            }
1710
1711            // === Send active IQ ===
1712            // The server sends <ib><offline count="X"/></ib> AFTER we exit passive mode.
1713            // This matches WhatsApp Web's behavior: executePassiveTasks() -> sendPassiveModeProtocol("active")
1714            check_generation!();
1715            if let Err(e) = client_clone.set_passive(false).await {
1716                warn!("Failed to send post-connect active IQ: {e:?}");
1717            }
1718
1719            // === Wait for offline sync to complete ===
1720            // The server sends <ib><offline count="X"/></ib> after we exit passive mode.
1721            client_clone.wait_for_offline_delivery_end().await;
1722
1723            // Check if connection was replaced while waiting
1724            check_generation!();
1725
1726            // Re-check connection and generation before sending presence
1727            check_generation!();
1728            if !client_clone.is_connected() {
1729                debug!("Skipping presence: connection closed");
1730                return;
1731            }
1732
1733            // Background initialization queries (can run in parallel, non-blocking)
1734            let bg_client = client_clone.clone();
1735            let bg_generation = task_generation;
1736            client_clone.runtime.spawn(Box::pin(async move {
1737                // Check connection and generation before starting background queries
1738                if bg_client.connection_generation.load(Ordering::SeqCst) != bg_generation {
1739                    debug!("Skipping background init queries: connection generation changed");
1740                    return;
1741                }
1742                if !bg_client.is_connected() {
1743                    debug!("Skipping background init queries: connection closed");
1744                    return;
1745                }
1746
1747                debug!(
1748                    "Sending background initialization queries (Props, Blocklist, Privacy, Digest)..."
1749                );
1750
1751                let props_fut = bg_client.fetch_props();
1752                let binding = bg_client.blocking();
1753                let blocklist_fut = binding.get_blocklist();
1754                let privacy_fut = bg_client.fetch_privacy_settings();
1755                let digest_fut = bg_client.send_digest_key_bundle();
1756
1757                let (r_props, r_block, r_priv, r_digest) =
1758                    futures::join!(props_fut, blocklist_fut, privacy_fut, digest_fut);
1759
1760                if let Err(e) = r_props {
1761                    warn!("Background init: Failed to fetch props: {e:?}");
1762                }
1763                if let Err(e) = r_block {
1764                    warn!("Background init: Failed to fetch blocklist: {e:?}");
1765                }
1766                if let Err(e) = r_priv {
1767                    warn!("Background init: Failed to fetch privacy settings: {e:?}");
1768                }
1769                if let Err(e) = r_digest {
1770                    warn!("Background init: Failed to send digest: {e:?}");
1771                }
1772
1773                // Prune expired tcTokens on connect (matches WhatsApp Web's PrivacyTokenJob)
1774                if let Err(e) = bg_client.tc_token().prune_expired().await {
1775                    warn!("Background init: Failed to prune expired tc_tokens: {e:?}");
1776                }
1777            })).detach();
1778
1779            check_generation!();
1780
1781            let flag_set = client_clone.needs_initial_full_sync.load(Ordering::Relaxed);
1782            let needs_initial_sync = flag_set || needs_pushname_from_sync;
1783
1784            if needs_initial_sync {
1785                // === Fresh pairing path ===
1786                // Like WhatsApp Web's syncCriticalData(): await critical collections before
1787                // dispatching Connected, so blocklist/privacy settings are applied first.
1788                debug!(
1789                    target: "Client/AppState",
1790                    "Starting Initial App State Sync (flag_set={flag_set}, needs_pushname={needs_pushname_from_sync})"
1791                );
1792
1793                if !client_clone
1794                    .initial_app_state_keys_received
1795                    .load(Ordering::Relaxed)
1796                {
1797                    debug!(
1798                        target: "Client/AppState",
1799                        "Waiting up to 5s for app state keys..."
1800                    );
1801                    let _ = rt_timeout(
1802                        &*client_clone.runtime,
1803                        Duration::from_secs(5),
1804                        client_clone.initial_keys_synced_notifier.listen(),
1805                    )
1806                    .await;
1807
1808                    // Check if connection was replaced while waiting
1809                    check_generation!();
1810                }
1811
1812                // Start the critical sync timeout timer matching WhatsApp Web's
1813                // WAWebSyncBootstrap.$15 (setSyncDCriticalDataSyncTimeout).
1814                // WhatsApp Web uses 180s and calls socketLogout(SyncdTimeout) if
1815                // the critical data hasn't synced by then.
1816                const CRITICAL_SYNC_TIMEOUT_SECS: u64 = 180;
1817                let timeout_client = client_clone.clone();
1818                let timeout_generation = task_generation;
1819                let timeout_rt = client_clone.runtime.clone();
1820                let critical_sync_timeout_handle = timeout_rt.spawn(Box::pin(async move {
1821                    timeout_client.runtime.sleep(Duration::from_secs(CRITICAL_SYNC_TIMEOUT_SECS)).await;
1822                    // Check generation — if connection was replaced, this timeout is stale
1823                    if timeout_client.connection_generation.load(Ordering::SeqCst)
1824                        != timeout_generation
1825                    {
1826                        return;
1827                    }
1828                    // Matches WhatsApp Web's $16(): check if SettingPushName was synced.
1829                    // If push_name is still empty after 180s, critical sync failed.
1830                    let push_name = timeout_client.get_push_name().await;
1831                    if push_name.is_empty() {
1832                        warn!(
1833                            target: "Client/AppState",
1834                            "Critical app state sync timed out after {CRITICAL_SYNC_TIMEOUT_SECS}s \
1835                             (push_name not synced). Reconnecting to retry."
1836                        );
1837                        // WhatsApp Web does socketLogout here which clears device identity.
1838                        // We reconnect instead — preserving credentials and keeping the
1839                        // run loop active so auto-reconnect can retry the sync.
1840                        timeout_client.reconnect_immediately().await;
1841                    } else {
1842                        debug!(
1843                            target: "Client/AppState",
1844                            "Critical sync timeout fired but push_name was already synced"
1845                        );
1846                    }
1847                }));
1848
1849                // Await critical collections via batched IQ before dispatching Connected.
1850                check_generation!();
1851                match client_clone
1852                    .sync_collections_batched(vec![
1853                        WAPatchName::CriticalBlock,
1854                        WAPatchName::CriticalUnblockLow,
1855                    ])
1856                    .await
1857                {
1858                    Ok(()) => {
1859                        // Critical sync completed — cancel the timeout timer
1860                        critical_sync_timeout_handle.abort();
1861
1862                        check_generation!();
1863
1864                        client_clone
1865                            .resubscribe_presence_subscriptions(task_generation)
1866                            .await;
1867
1868                        check_generation!();
1869
1870                        // Dispatch Connected after critical sync completes.
1871                        // Presence is NOT sent here — WhatsApp Web sends presence from the
1872                        // setting_pushName mutation handler (WAWebPushNameSync), not from
1873                        // criticalSyncDone. Our setting_pushName handler already does this.
1874                        client_clone.dispatch_connected();
1875                    }
1876                    Err(e) => {
1877                        client_clone.log_sync_error("critical app state sync", &e);
1878                        // Don't abort the timeout or dispatch Connected — the sync failed,
1879                        // so the timeout watchdog should remain active to force a reconnect
1880                        // if needed. Return early to avoid emitting a spurious Connected event.
1881                        return;
1882                    }
1883                }
1884
1885                // Spawn remaining non-critical collections in background
1886                let sync_client = client_clone.clone();
1887                let sync_generation = task_generation;
1888                client_clone.runtime.spawn(Box::pin(async move {
1889                    if sync_client.connection_generation.load(Ordering::SeqCst) != sync_generation {
1890                        debug!("App state sync cancelled: connection generation changed");
1891                        return;
1892                    }
1893
1894                    if let Err(e) = sync_client
1895                        .sync_collections_batched(vec![
1896                            WAPatchName::RegularLow,
1897                            WAPatchName::RegularHigh,
1898                            WAPatchName::Regular,
1899                        ])
1900                        .await
1901                    {
1902                        sync_client.log_sync_error("non-critical app state sync", &e);
1903                    }
1904
1905                    sync_client
1906                        .needs_initial_full_sync
1907                        .store(false, Ordering::Relaxed);
1908                    debug!(target: "Client/AppState", "Initial App State Sync Completed.");
1909                })).detach();
1910            } else {
1911                // === Reconnection path ===
1912                // Pushname is already known, send presence and Connected immediately.
1913                let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
1914                if !device_snapshot.push_name.is_empty() {
1915                    if let Err(e) = client_clone.presence().set_available().await {
1916                        warn!("Failed to send initial presence: {e:?}");
1917                    } else {
1918                        debug!("Initial presence sent successfully.");
1919                    }
1920                }
1921
1922                client_clone
1923                    .resubscribe_presence_subscriptions(task_generation)
1924                    .await;
1925
1926                // Re-check generation after awaits to avoid dispatching Connected
1927                // for an outdated connection that was replaced mid-await.
1928                check_generation!();
1929
1930                client_clone.dispatch_connected();
1931            }
1932        })).detach();
1933    }
1934
1935    /// Handles incoming `<ack/>` stanzas by resolving pending response waiters.
1936    ///
1937    /// If an ack with an ID that matches a pending task in `response_waiters`,
1938    /// the task is resolved and the function returns `true`. Otherwise, returns `false`.
1939    pub(crate) async fn handle_ack_response(&self, node: Node) -> bool {
1940        let id_opt = node.attrs.get("id").map(|v| v.as_str().into_owned());
1941        if let Some(id) = id_opt
1942            && let Some(waiter) = self.response_waiters.lock().await.remove(&id)
1943        {
1944            if waiter.send(node).is_err() {
1945                warn!(target: "Client/Ack", "Failed to send ACK response to waiter for ID {id}. Receiver was likely dropped.");
1946            }
1947            return true;
1948        }
1949        false
1950    }
1951
1952    #[allow(dead_code)] // Used by per-collection callers (e.g., critical sync gating)
1953    pub(crate) async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
1954        // In-flight dedup: skip if this collection is already being synced.
1955        // Matches WA Web's WAWebSyncdCollectionsStateMachine which tracks in-flight syncs
1956        // and queues new requests to a pending set.
1957        {
1958            let mut syncing = self.app_state_syncing.lock().await;
1959            if !syncing.insert(name) {
1960                debug!(target: "Client/AppState", "Skipping sync for {:?}: already in flight", name);
1961                return Ok(());
1962            }
1963        }
1964
1965        let result = self.fetch_app_state_with_retry_inner(name).await;
1966
1967        // Always remove from in-flight set when done
1968        self.app_state_syncing.lock().await.remove(&name);
1969
1970        result
1971    }
1972
1973    #[allow(dead_code)]
1974    async fn fetch_app_state_with_retry_inner(&self, name: WAPatchName) -> anyhow::Result<()> {
1975        let mut attempt = 0u32;
1976        loop {
1977            attempt += 1;
1978            // full_sync=false lets process_app_state_sync_task auto-detect:
1979            // version 0 → snapshot (full sync), version > 0 → incremental patches.
1980            // Matches WA Web which only requests snapshot when version is undefined.
1981            let res = self.process_app_state_sync_task(name, false).await;
1982            match res {
1983                Ok(()) => return Ok(()),
1984                Err(e) => {
1985                    if e.downcast_ref::<crate::appstate_sync::AppStateSyncError>()
1986                        .is_some_and(|ase| {
1987                            matches!(ase, crate::appstate_sync::AppStateSyncError::KeyNotFound(_))
1988                        })
1989                        && attempt == 1
1990                    {
1991                        if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
1992                            debug!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
1993                            if rt_timeout(
1994                                &*self.runtime,
1995                                Duration::from_secs(10),
1996                                self.initial_keys_synced_notifier.listen(),
1997                            )
1998                            .await
1999                            .is_err()
2000                            {
2001                                warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
2002                            }
2003                        }
2004                        continue;
2005                    }
2006                    let is_db_locked = e.downcast_ref::<wacore::store::error::StoreError>()
2007                        .is_some_and(|se| matches!(se, wacore::store::error::StoreError::Database(msg) if msg.contains("locked") || msg.contains("busy")))
2008                        || e.downcast_ref::<crate::appstate_sync::AppStateSyncError>()
2009                            .is_some_and(|ase| matches!(ase, crate::appstate_sync::AppStateSyncError::Store(wacore::store::error::StoreError::Database(msg)) if msg.contains("locked") || msg.contains("busy")));
2010                    if is_db_locked && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
2011                        let backoff = Duration::from_millis(200 * attempt as u64 + 150);
2012                        warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
2013                        self.runtime.sleep(backoff).await;
2014                        continue;
2015                    }
2016                    return Err(e);
2017                }
2018            }
2019        }
2020    }
2021
2022    /// Sync multiple collections in a single IQ request, re-fetching those with `has_more_patches`.
2023    /// Matches WA Web's `serverSync()` outer loop (`3JJWKHeu5-P.js:54278-54305`).
2024    /// Max 5 iterations (WA Web's `C=5` constant).
2025    pub(crate) async fn sync_collections_batched(
2026        &self,
2027        collections: Vec<WAPatchName>,
2028    ) -> anyhow::Result<()> {
2029        if collections.is_empty() {
2030            return Ok(());
2031        }
2032
2033        // In-flight dedup: filter out collections already being synced
2034        let pending = {
2035            let mut syncing = self.app_state_syncing.lock().await;
2036            let mut filtered = Vec::with_capacity(collections.len());
2037            for name in collections {
2038                if syncing.insert(name) {
2039                    filtered.push(name);
2040                } else {
2041                    debug!(target: "Client/AppState", "Skipping {:?} in batch: already in flight", name);
2042                }
2043            }
2044            filtered
2045        };
2046
2047        if pending.is_empty() {
2048            return Ok(());
2049        }
2050
2051        // Track all collections for cleanup
2052        let all_collections: Vec<WAPatchName> = pending.clone();
2053
2054        let result = self.sync_collections_batched_inner(pending).await;
2055
2056        // Always clean up in-flight set
2057        {
2058            let mut syncing = self.app_state_syncing.lock().await;
2059            for name in &all_collections {
2060                syncing.remove(name);
2061            }
2062        }
2063
2064        result
2065    }
2066
2067    async fn sync_collections_batched_inner(
2068        &self,
2069        mut pending: Vec<WAPatchName>,
2070    ) -> anyhow::Result<()> {
2071        use wacore::appstate::patch_decode::CollectionSyncError;
2072        const MAX_ITERATIONS: usize = 5;
2073        let mut iteration = 0;
2074
2075        while !pending.is_empty() && iteration < MAX_ITERATIONS {
2076            iteration += 1;
2077            debug!(
2078                target: "Client/AppState",
2079                "Batched sync iteration {}/{}: {:?}",
2080                iteration, MAX_ITERATIONS, pending
2081            );
2082
2083            let backend = self.persistence_manager.backend();
2084
2085            // Build multi-collection IQ, tracking which collections need a snapshot
2086            let mut collection_nodes = Vec::with_capacity(pending.len());
2087            let mut was_snapshot = std::collections::HashSet::new();
2088            for &name in &pending {
2089                let state = backend.get_version(name.as_str()).await?;
2090                let want_snapshot = state.version == 0;
2091                if want_snapshot {
2092                    was_snapshot.insert(name);
2093                }
2094                let mut builder = NodeBuilder::new("collection")
2095                    .attr("name", name.as_str())
2096                    .attr(
2097                        "return_snapshot",
2098                        if want_snapshot { "true" } else { "false" },
2099                    );
2100                if !want_snapshot {
2101                    builder = builder.attr("version", state.version.to_string());
2102                }
2103                collection_nodes.push(builder.build());
2104            }
2105
2106            let sync_node = NodeBuilder::new("sync").children(collection_nodes).build();
2107            let iq = crate::request::InfoQuery {
2108                namespace: "w:sync:app:state",
2109                query_type: crate::request::InfoQueryType::Set,
2110                to: server_jid().clone(),
2111                target: None,
2112                id: None,
2113                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
2114                timeout: Some(Duration::from_secs(30)),
2115            };
2116
2117            let resp = self.send_iq(iq).await?;
2118
2119            // Pre-download all external blobs for all collections in the response
2120            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
2121                std::collections::HashMap::new();
2122
2123            if let Ok(patch_lists) = wacore::appstate::patch_decode::parse_patch_lists(&resp) {
2124                for pl in &patch_lists {
2125                    // Download external snapshot
2126                    if let Some(ext) = &pl.snapshot_ref
2127                        && let Some(path) = &ext.direct_path
2128                    {
2129                        match self.download(ext).await {
2130                            Ok(bytes) => {
2131                                pre_downloaded.insert(path.clone(), bytes);
2132                            }
2133                            Err(e) => {
2134                                warn!(
2135                                    "Failed to download external snapshot for {:?}: {e}",
2136                                    pl.name
2137                                );
2138                            }
2139                        }
2140                    }
2141
2142                    // Download external mutations
2143                    for patch in &pl.patches {
2144                        if let Some(ext) = &patch.external_mutations
2145                            && let Some(path) = &ext.direct_path
2146                        {
2147                            match self.download(ext).await {
2148                                Ok(bytes) => {
2149                                    pre_downloaded.insert(path.clone(), bytes);
2150                                }
2151                                Err(e) => {
2152                                    let v =
2153                                        patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
2154                                    warn!(
2155                                        "Failed to download external mutations for patch v{}: {e}",
2156                                        v
2157                                    );
2158                                }
2159                            }
2160                        }
2161                    }
2162                }
2163            }
2164
2165            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
2166                if let Some(path) = &ext.direct_path {
2167                    if let Some(bytes) = pre_downloaded.get(path) {
2168                        Ok(bytes.clone())
2169                    } else {
2170                        Err(anyhow::anyhow!(
2171                            "external blob not pre-downloaded: {}",
2172                            path
2173                        ))
2174                    }
2175                } else {
2176                    Err(anyhow::anyhow!("external blob has no directPath"))
2177                }
2178            };
2179
2180            // Parse and process all collections from the response
2181            let proc = self.get_app_state_processor().await;
2182            let results = proc.decode_multi_patch_list(&resp, &download, true).await?;
2183
2184            let mut needs_refetch = Vec::new();
2185
2186            for (mutations, new_state, list) in results {
2187                let name = list.name;
2188
2189                // Handle per-collection errors
2190                if let Some(ref err) = list.error {
2191                    match err {
2192                        CollectionSyncError::Conflict { has_more } => {
2193                            if *has_more {
2194                                // ConflictHasMore: server has more patches, must refetch.
2195                                warn!(target: "Client/AppState", "Collection {:?} conflict (has_more=true), will refetch", name);
2196                                needs_refetch.push(name);
2197                            } else {
2198                                // Conflict without has_more: WA Web treats this as success
2199                                // when there are no pending mutations to push (which is
2200                                // always the case for us since we don't push app state).
2201                                debug!(target: "Client/AppState", "Collection {:?} conflict (has_more=false), treating as success (no pending mutations)", name);
2202                            }
2203                            continue;
2204                        }
2205                        CollectionSyncError::Fatal { code, text } => {
2206                            warn!(target: "Client/AppState", "Collection {:?} fatal error {}: {}", name, code, text);
2207                            continue;
2208                        }
2209                        CollectionSyncError::Retry { code, text } => {
2210                            warn!(target: "Client/AppState", "Collection {:?} retryable error {}: {}, will refetch", name, code, text);
2211                            needs_refetch.push(name);
2212                            continue;
2213                        }
2214                    }
2215                }
2216
2217                // Handle missing keys
2218                let missing = match proc.get_missing_key_ids(&list).await {
2219                    Ok(v) => v,
2220                    Err(e) => {
2221                        warn!("Failed to get missing key IDs for {:?}: {}", name, e);
2222                        Vec::new()
2223                    }
2224                };
2225                if !missing.is_empty() {
2226                    let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
2227                    let mut guard = self.app_state_key_requests.lock().await;
2228                    let now = wacore::time::Instant::now();
2229                    for key_id in missing {
2230                        let hex_id = hex::encode(&key_id);
2231                        let should = guard
2232                            .get(&hex_id)
2233                            .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
2234                            .unwrap_or(true);
2235                        if should {
2236                            guard.insert(hex_id, now);
2237                            to_request.push(key_id);
2238                        }
2239                    }
2240                    // Evict stale entries to prevent unbounded growth over long sessions
2241                    guard.retain(|_, t| t.elapsed() < std::time::Duration::from_secs(24 * 3600));
2242                    drop(guard);
2243                    if !to_request.is_empty() {
2244                        self.request_app_state_keys(&to_request).await;
2245                    }
2246                }
2247
2248                // full_sync is true only when this collection had a snapshot
2249                // (version was 0 before sync). This prevents server_sync-triggered
2250                // incremental syncs from being incorrectly marked as full syncs.
2251                let full_sync = was_snapshot.contains(&name);
2252                for m in mutations {
2253                    self.dispatch_app_state_mutation(&m, full_sync).await;
2254                }
2255
2256                // Save version
2257                backend
2258                    .set_version(name.as_str(), new_state.clone())
2259                    .await?;
2260
2261                // Check if this collection needs more patches
2262                if list.has_more_patches {
2263                    needs_refetch.push(name);
2264                }
2265
2266                debug!(
2267                    target: "Client/AppState",
2268                    "Batched sync: {:?} done (version={}, has_more={})",
2269                    name, new_state.version, list.has_more_patches
2270                );
2271            }
2272
2273            pending = needs_refetch;
2274        }
2275
2276        if !pending.is_empty() {
2277            warn!(
2278                target: "Client/AppState",
2279                "Batched sync: max iterations ({}) reached for {:?}",
2280                MAX_ITERATIONS, pending
2281            );
2282        }
2283
2284        Ok(())
2285    }
2286
2287    pub(crate) async fn process_app_state_sync_task(
2288        &self,
2289        name: WAPatchName,
2290        full_sync: bool,
2291    ) -> anyhow::Result<()> {
2292        if self.is_shutting_down() {
2293            debug!(target: "Client/AppState", "Skipping app state sync task {:?}: client is shutting down", name);
2294            return Ok(());
2295        }
2296
2297        let backend = self.persistence_manager.backend();
2298        let mut full_sync = full_sync;
2299
2300        let mut state = backend.get_version(name.as_str()).await?;
2301        if state.version == 0 {
2302            full_sync = true;
2303        }
2304
2305        let mut has_more = true;
2306        let mut want_snapshot = full_sync;
2307        // Safety cap to prevent infinite loops if the server keeps returning
2308        // has_more_patches=true without advancing the version (WA Web uses 500).
2309        const MAX_PAGINATION_ITERATIONS: u32 = 500;
2310        let mut iteration = 0u32;
2311
2312        while has_more {
2313            if self.is_shutting_down() {
2314                debug!(target: "Client/AppState", "Stopping app state sync task {:?}: shutdown detected", name);
2315                break;
2316            }
2317            iteration += 1;
2318            if iteration > MAX_PAGINATION_ITERATIONS {
2319                warn!(target: "Client/AppState", "App state sync for {:?} exceeded {} iterations, aborting", name, MAX_PAGINATION_ITERATIONS);
2320                break;
2321            }
2322            debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);
2323
2324            let mut collection_builder = NodeBuilder::new("collection")
2325                .attr("name", name.as_str())
2326                .attr(
2327                    "return_snapshot",
2328                    if want_snapshot { "true" } else { "false" },
2329                );
2330            if !want_snapshot {
2331                collection_builder = collection_builder.attr("version", state.version.to_string());
2332            }
2333            let sync_node = NodeBuilder::new("sync")
2334                .children([collection_builder.build()])
2335                .build();
2336            let iq = crate::request::InfoQuery {
2337                namespace: "w:sync:app:state",
2338                query_type: crate::request::InfoQueryType::Set,
2339                to: server_jid().clone(),
2340                target: None,
2341                id: None,
2342                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
2343                timeout: None,
2344            };
2345
2346            let resp = self.send_iq(iq).await?;
2347            if self.is_shutting_down() {
2348                debug!(target: "Client/AppState", "Discarding app state sync response for {:?}: shutdown detected", name);
2349                break;
2350            }
2351            debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);
2352
2353            let _decode_start = wacore::time::Instant::now();
2354
2355            // Pre-download all external blobs (snapshot and patch mutations)
2356            // We use directPath as the key to identify each blob
2357            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
2358                std::collections::HashMap::new();
2359
2360            if let Ok(pl) = wacore::appstate::patch_decode::parse_patch_list(&resp) {
2361                debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={} patches_count={}",
2362                    name, pl.snapshot_ref.is_some(), pl.has_more_patches, pl.patches.len());
2363
2364                // Download external snapshot if present
2365                if let Some(ext) = &pl.snapshot_ref
2366                    && let Some(path) = &ext.direct_path
2367                {
2368                    match self.download(ext).await {
2369                        Ok(bytes) => {
2370                            debug!(target: "Client/AppState", "Downloaded external snapshot ({} bytes)", bytes.len());
2371                            pre_downloaded.insert(path.clone(), bytes);
2372                        }
2373                        Err(e) => {
2374                            warn!("Failed to download external snapshot: {e}");
2375                        }
2376                    }
2377                }
2378
2379                // Download external mutations for each patch that has them
2380                for patch in &pl.patches {
2381                    if let Some(ext) = &patch.external_mutations
2382                        && let Some(path) = &ext.direct_path
2383                    {
2384                        let patch_version =
2385                            patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
2386                        match self.download(ext).await {
2387                            Ok(bytes) => {
2388                                debug!(target: "Client/AppState", "Downloaded external mutations for patch v{} ({} bytes)", patch_version, bytes.len());
2389                                pre_downloaded.insert(path.clone(), bytes);
2390                            }
2391                            Err(e) => {
2392                                warn!(
2393                                    "Failed to download external mutations for patch v{}: {e}",
2394                                    patch_version
2395                                );
2396                            }
2397                        }
2398                    }
2399                }
2400            }
2401
2402            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
2403                if let Some(path) = &ext.direct_path {
2404                    if let Some(bytes) = pre_downloaded.get(path) {
2405                        Ok(bytes.clone())
2406                    } else {
2407                        Err(anyhow::anyhow!(
2408                            "external blob not pre-downloaded: {}",
2409                            path
2410                        ))
2411                    }
2412                } else {
2413                    Err(anyhow::anyhow!("external blob has no directPath"))
2414                }
2415            };
2416
2417            let proc = self.get_app_state_processor().await;
2418            let (mutations, new_state, list) =
2419                proc.decode_patch_list(&resp, &download, true).await?;
2420            let decode_elapsed = _decode_start.elapsed();
2421            if decode_elapsed.as_millis() > 500 {
2422                debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
2423            }
2424
2425            let missing = match proc.get_missing_key_ids(&list).await {
2426                Ok(v) => v,
2427                Err(e) => {
2428                    warn!("Failed to get missing key IDs for {:?}: {}", name, e);
2429                    Vec::new()
2430                }
2431            };
2432            if !missing.is_empty() {
2433                let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
2434                let mut guard = self.app_state_key_requests.lock().await;
2435                let now = wacore::time::Instant::now();
2436                for key_id in missing {
2437                    let hex_id = hex::encode(&key_id);
2438                    let should = guard
2439                        .get(&hex_id)
2440                        .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
2441                        .unwrap_or(true);
2442                    if should {
2443                        guard.insert(hex_id, now);
2444                        to_request.push(key_id);
2445                    }
2446                }
2447                // Evict stale entries to prevent unbounded growth over long sessions
2448                guard.retain(|_, t| t.elapsed() < std::time::Duration::from_secs(24 * 3600));
2449                drop(guard);
2450                if !to_request.is_empty() {
2451                    self.request_app_state_keys(&to_request).await;
2452                }
2453            }
2454
2455            for m in mutations {
2456                debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
2457                self.dispatch_app_state_mutation(&m, full_sync).await;
2458            }
2459
2460            state = new_state;
2461            has_more = list.has_more_patches;
2462            // After the first batch, never request a snapshot again — only incremental patches.
2463            want_snapshot = false;
2464            debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more} new_version={}", name, state.version);
2465        }
2466
2467        backend.set_version(name.as_str(), state.clone()).await?;
2468
2469        debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
2470        Ok(())
2471    }
2472
2473    async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
2474        if raw_key_ids.is_empty() {
2475            return;
2476        }
2477        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2478        let own_jid = match device_snapshot.pn.clone() {
2479            Some(j) => j,
2480            None => return,
2481        };
2482        let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
2483            .iter()
2484            .map(|k| wa::message::AppStateSyncKeyId {
2485                key_id: Some(k.clone()),
2486            })
2487            .collect();
2488        let msg = wa::Message {
2489            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
2490                r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
2491                app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
2492                ..Default::default()
2493            })),
2494            ..Default::default()
2495        };
2496        if let Err(e) = self
2497            .send_message_impl(
2498                own_jid,
2499                &msg,
2500                Some(self.generate_message_id().await),
2501                true,
2502                false,
2503                None,
2504                vec![],
2505            )
2506            .await
2507        {
2508            warn!("Failed to send app state key request: {e}");
2509        }
2510    }
2511
2512    /// Send an app state patch to the server for a given collection.
2513    ///
2514    /// Builds the IQ stanza and sends it. Returns the updated hash state.
2515    pub(crate) async fn send_app_state_patch(
2516        &self,
2517        collection_name: &str,
2518        mutations: Vec<(wa::SyncdMutation, Vec<u8>)>,
2519    ) -> Result<()> {
2520        let proc = self.get_app_state_processor().await;
2521        let (patch_bytes, base_version) = proc.build_patch(collection_name, mutations).await?;
2522
2523        let collection_node = NodeBuilder::new("collection")
2524            .attr("name", collection_name)
2525            .attr("version", base_version.to_string())
2526            .attr("return_snapshot", "false")
2527            .children([NodeBuilder::new("patch").bytes(patch_bytes).build()])
2528            .build();
2529        let sync_node = NodeBuilder::new("sync").children([collection_node]).build();
2530        let iq = crate::request::InfoQuery {
2531            namespace: "w:sync:app:state",
2532            query_type: crate::request::InfoQueryType::Set,
2533            to: server_jid().clone(),
2534            target: None,
2535            id: None,
2536            content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
2537            timeout: None,
2538        };
2539
2540        self.send_iq(iq).await?;
2541
2542        // Re-sync to get the latest state from the server after our patch was accepted.
2543        // This matches whatsmeow's behavior: fetchAppState after successful send.
2544        if let Ok(patch_name) = collection_name.parse::<WAPatchName>()
2545            && let Err(e) = self.fetch_app_state_with_retry(patch_name).await
2546        {
2547            log::warn!("Failed to re-sync {collection_name} after patch send: {e}");
2548        }
2549
2550        Ok(())
2551    }
2552
2553    async fn dispatch_app_state_mutation(
2554        &self,
2555        m: &crate::appstate_sync::Mutation,
2556        full_sync: bool,
2557    ) {
2558        use wacore::types::events::Event;
2559
2560        if m.operation != wa::syncd_mutation::SyncdOperation::Set {
2561            return;
2562        }
2563        if m.index.is_empty() {
2564            return;
2565        }
2566
2567        // Delegate chat-related mutations (mute, pin, archive, star, contact, etc.)
2568        if crate::features::chat_actions::dispatch_chat_mutation(&self.core.event_bus, m, full_sync)
2569        {
2570            return;
2571        }
2572
2573        // Handle client-internal mutations that need persistence/presence access
2574        if m.index[0] == "setting_pushName"
2575            && let Some(val) = &m.action_value
2576            && let Some(act) = &val.push_name_setting
2577            && let Some(new_name) = &act.name
2578        {
2579            let new_name = new_name.clone();
2580            let bus = self.core.event_bus.clone();
2581
2582            let snapshot = self.persistence_manager.get_device_snapshot().await;
2583            let old = snapshot.push_name.clone();
2584            if old != new_name {
2585                debug!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
2586                self.persistence_manager
2587                    .process_command(DeviceCommand::SetPushName(new_name.clone()))
2588                    .await;
2589                bus.dispatch(&Event::SelfPushNameUpdated(
2590                    crate::types::events::SelfPushNameUpdated {
2591                        from_server: true,
2592                        old_name: old.clone(),
2593                        new_name: new_name.clone(),
2594                    },
2595                ));
2596
2597                // WhatsApp Web sends presence immediately when receiving pushname
2598                if old.is_empty() && !new_name.is_empty() {
2599                    debug!(target: "Client/AppState", "Sending presence after receiving initial pushname from app state sync");
2600                    if let Err(e) = self.presence().set_available().await {
2601                        warn!(target: "Client/AppState", "Failed to send presence after pushname sync: {e:?}");
2602                    }
2603                }
2604            } else {
2605                debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
2606            }
2607        }
2608    }
2609
2610    async fn expect_disconnect(&self) {
2611        self.expected_disconnect.store(true, Ordering::Relaxed);
2612    }
2613
2614    pub(crate) async fn handle_stream_error(&self, node: &wacore_binary::node::Node) {
2615        self.is_logged_in.store(false, Ordering::Relaxed);
2616
2617        let mut attrs = node.attrs();
2618        let code_cow = attrs.optional_string("code");
2619        let code = code_cow.as_deref().unwrap_or("");
2620        let conflict_type = node
2621            .get_optional_child("conflict")
2622            .map(|n| {
2623                n.attrs()
2624                    .optional_string("type")
2625                    .as_deref()
2626                    .unwrap_or("")
2627                    .to_string()
2628            })
2629            .unwrap_or_default();
2630
2631        if !conflict_type.is_empty() {
2632            info!(
2633                "Got stream error indicating client was removed or replaced (conflict={}). Logging out.",
2634                conflict_type
2635            );
2636            self.expect_disconnect().await;
2637            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2638
2639            let event = if conflict_type == "replaced" {
2640                Event::StreamReplaced(crate::types::events::StreamReplaced)
2641            } else {
2642                Event::LoggedOut(crate::types::events::LoggedOut {
2643                    on_connect: false,
2644                    reason: ConnectFailureReason::LoggedOut,
2645                })
2646            };
2647            self.core.event_bus.dispatch(&event);
2648
2649            let transport_opt = self.transport.lock().await.clone();
2650            if let Some(transport) = transport_opt {
2651                self.runtime
2652                    .spawn(Box::pin(async move {
2653                        info!("Disconnecting transport after conflict");
2654                        transport.disconnect().await;
2655                    }))
2656                    .detach();
2657            }
2658        } else {
2659            match code {
2660                "515" => {
2661                    // 515 is expected during registration/pairing phase - server closes stream after pairing
2662                    info!(
2663                        "Got 515 stream error, server is closing stream (expected after pairing). Will auto-reconnect."
2664                    );
2665                    self.expect_disconnect().await;
2666                    // Proactively disconnect transport since server may not close the connection
2667                    // Clone the transport Arc before spawning to avoid holding the lock
2668                    let transport_opt = self.transport.lock().await.clone();
2669                    if let Some(transport) = transport_opt {
2670                        // Spawn disconnect in background so we don't block the message loop
2671                        self.runtime
2672                            .spawn(Box::pin(async move {
2673                                info!("Disconnecting transport after 515");
2674                                transport.disconnect().await;
2675                            }))
2676                            .detach();
2677                    }
2678                }
2679                "516" => {
2680                    info!("Got 516 stream error (device removed). Logging out.");
2681                    self.expect_disconnect().await;
2682                    self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2683                    self.core.event_bus.dispatch(&Event::LoggedOut(
2684                        crate::types::events::LoggedOut {
2685                            on_connect: false,
2686                            reason: ConnectFailureReason::LoggedOut,
2687                        },
2688                    ));
2689
2690                    let transport_opt = self.transport.lock().await.clone();
2691                    if let Some(transport) = transport_opt {
2692                        self.runtime
2693                            .spawn(Box::pin(async move {
2694                                info!("Disconnecting transport after 516");
2695                                transport.disconnect().await;
2696                            }))
2697                            .detach();
2698                    }
2699                }
2700                "401" => {
2701                    // 401: unauthorized — session invalid, needs re-authentication.
2702                    // Matches WA Web's handling of unauthorized stream errors.
2703                    info!("Got 401 stream error (unauthorized). Logging out.");
2704                    self.expect_disconnect().await;
2705                    self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2706                    self.core.event_bus.dispatch(&Event::LoggedOut(
2707                        crate::types::events::LoggedOut {
2708                            on_connect: false,
2709                            reason: ConnectFailureReason::LoggedOut,
2710                        },
2711                    ));
2712
2713                    let transport_opt = self.transport.lock().await.clone();
2714                    if let Some(transport) = transport_opt {
2715                        self.runtime
2716                            .spawn(Box::pin(async move {
2717                                info!("Disconnecting transport after 401");
2718                                transport.disconnect().await;
2719                            }))
2720                            .detach();
2721                    }
2722                }
2723                "409" => {
2724                    // 409: conflict — another client instance connected.
2725                    // Same semantics as conflict child element but via code.
2726                    info!("Got 409 stream error (conflict). Another session replaced this one.");
2727                    self.expect_disconnect().await;
2728                    self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2729                    self.core
2730                        .event_bus
2731                        .dispatch(&Event::StreamReplaced(crate::types::events::StreamReplaced));
2732
2733                    let transport_opt = self.transport.lock().await.clone();
2734                    if let Some(transport) = transport_opt {
2735                        self.runtime
2736                            .spawn(Box::pin(async move {
2737                                info!("Disconnecting transport after 409");
2738                                transport.disconnect().await;
2739                            }))
2740                            .detach();
2741                    }
2742                }
2743                "429" => {
2744                    // 429: rate limited — server is throttling connections.
2745                    // Auto-reconnect with extended backoff.
2746                    warn!(
2747                        "Got 429 stream error (rate limited). Will auto-reconnect with extended backoff."
2748                    );
2749                    self.auto_reconnect_errors.fetch_add(5, Ordering::Relaxed);
2750                }
2751                "503" => {
2752                    info!("Got 503 service unavailable, will auto-reconnect.");
2753                }
2754                _ => {
2755                    error!("Unknown stream error: {}", DisplayableNode(node));
2756                    self.expect_disconnect().await;
2757                    self.core.event_bus.dispatch(&Event::StreamError(
2758                        crate::types::events::StreamError {
2759                            code: code.to_string(),
2760                            raw: Some(node.clone()),
2761                        },
2762                    ));
2763                }
2764            }
2765        }
2766
2767        info!("Notifying shutdown from stream error handler");
2768        self.shutdown_notifier.notify(usize::MAX);
2769    }
2770
2771    pub(crate) async fn handle_connect_failure(&self, node: &wacore_binary::node::Node) {
2772        self.expected_disconnect.store(true, Ordering::Relaxed);
2773        self.shutdown_notifier.notify(usize::MAX);
2774
2775        let mut attrs = node.attrs();
2776        let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
2777        let reason = ConnectFailureReason::from(reason_code);
2778
2779        if reason.should_reconnect() {
2780            self.expected_disconnect.store(false, Ordering::Relaxed);
2781        } else {
2782            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2783        }
2784
2785        if reason.is_logged_out() {
2786            info!("Got {reason:?} connect failure, logging out.");
2787            self.core
2788                .event_bus
2789                .dispatch(&wacore::types::events::Event::LoggedOut(
2790                    crate::types::events::LoggedOut {
2791                        on_connect: true,
2792                        reason,
2793                    },
2794                ));
2795        } else if let ConnectFailureReason::TempBanned = reason {
2796            let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
2797            let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
2798            let expire_duration =
2799                chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
2800            warn!("Temporary ban connect failure: {}", DisplayableNode(node));
2801            self.core.event_bus.dispatch(&Event::TemporaryBan(
2802                crate::types::events::TemporaryBan {
2803                    code: crate::types::events::TempBanReason::from(ban_code),
2804                    expire: expire_duration,
2805                },
2806            ));
2807        } else if let ConnectFailureReason::ClientOutdated = reason {
2808            error!("Client is outdated and was rejected by server.");
2809            self.core
2810                .event_bus
2811                .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
2812        } else {
2813            warn!("Unknown connect failure: {}", DisplayableNode(node));
2814            self.core.event_bus.dispatch(&Event::ConnectFailure(
2815                crate::types::events::ConnectFailure {
2816                    reason,
2817                    message: attrs
2818                        .optional_string("message")
2819                        .as_deref()
2820                        .unwrap_or("")
2821                        .to_string(),
2822                    raw: Some(node.clone()),
2823                },
2824            ));
2825        }
2826    }
2827
2828    pub(crate) async fn handle_iq(self: &Arc<Self>, node: &wacore_binary::node::Node) -> bool {
2829        if node.attrs.get("type").is_some_and(|s| s == "get")
2830            && (node.get_optional_child("ping").is_some()
2831                || node
2832                    .attrs
2833                    .get("xmlns")
2834                    .is_some_and(|s| s == "urn:xmpp:ping"))
2835        {
2836            info!("Received ping, sending pong.");
2837            let mut parser = node.attrs();
2838            let from_jid = parser.jid("from");
2839            let id = parser.optional_string("id").map(|s| s.to_string());
2840            let pong = build_pong(from_jid.to_string(), id.as_deref());
2841            if let Err(e) = self.send_node(pong).await {
2842                warn!("Failed to send pong: {e:?}");
2843            }
2844            return true;
2845        }
2846
2847        // Pass Node directly to pair handling
2848        if pair::handle_iq(self, node).await {
2849            return true;
2850        }
2851
2852        false
2853    }
2854
2855    pub fn is_connected(&self) -> bool {
2856        self.is_connected.load(Ordering::Acquire)
2857    }
2858
2859    pub fn is_logged_in(&self) -> bool {
2860        self.is_logged_in.load(Ordering::Relaxed)
2861    }
2862
2863    /// Register a waiter for an incoming node matching the given filter.
2864    ///
2865    /// Returns a receiver that resolves when a matching node arrives.
2866    /// The waiter starts buffering immediately, so register it **before**
2867    /// performing the action that triggers the expected node.
2868    ///
2869    /// When multiple waiters match the same node, each matching waiter
2870    /// receives a clone of the node (broadcast within a single resolve pass).
2871    ///
2872    /// # Example
2873    /// ```ignore
2874    /// let waiter = client.wait_for_node(
2875    ///     NodeFilter::tag("notification").attr("type", "w:gp2"),
2876    /// );
2877    /// client.groups().add_participants(&group_jid, &[jid_c]).await?;
2878    /// let node = waiter.await.expect("notification arrived");
2879    /// ```
2880    pub fn wait_for_node(
2881        &self,
2882        filter: NodeFilter,
2883    ) -> futures::channel::oneshot::Receiver<Arc<Node>> {
2884        let (tx, rx) = futures::channel::oneshot::channel();
2885        self.node_waiter_count.fetch_add(1, Ordering::Release);
2886        let mut waiters = self
2887            .node_waiters
2888            .lock()
2889            .unwrap_or_else(|poisoned| poisoned.into_inner());
2890        waiters.push(NodeWaiter { filter, tx });
2891        rx
2892    }
2893
2894    /// Check pending node waiters against an incoming node.
2895    /// Only called when `node_waiter_count > 0`.
2896    fn resolve_node_waiters(&self, node: &Arc<Node>) {
2897        let mut waiters = self
2898            .node_waiters
2899            .lock()
2900            .unwrap_or_else(|poisoned| poisoned.into_inner());
2901        let mut i = 0;
2902        while i < waiters.len() {
2903            if waiters[i].tx.is_canceled() {
2904                // Receiver dropped — clean up
2905                waiters.swap_remove(i);
2906                self.node_waiter_count.fetch_sub(1, Ordering::Release);
2907            } else if waiters[i].filter.matches(node) {
2908                // Match found — remove and send
2909                let w = waiters.swap_remove(i);
2910                self.node_waiter_count.fetch_sub(1, Ordering::Release);
2911                let _ = w.tx.send(Arc::clone(node));
2912            } else {
2913                i += 1;
2914            }
2915        }
2916    }
2917
2918    pub(crate) fn update_server_time_offset(&self, node: &wacore_binary::node::Node) {
2919        self.unified_session.update_server_time_offset(node);
2920    }
2921
2922    pub(crate) async fn send_unified_session(&self) {
2923        if !self.is_connected() {
2924            debug!(target: "Client/UnifiedSession", "Skipping: not connected");
2925            return;
2926        }
2927
2928        let Some((node, _sequence)) = self.unified_session.prepare_send().await else {
2929            return;
2930        };
2931
2932        if let Err(e) = self.send_node(node).await {
2933            debug!(target: "Client/UnifiedSession", "Send failed: {e}");
2934            self.unified_session.clear_last_sent().await;
2935        }
2936    }
2937
2938    /// Waits for the noise socket to be established.
2939    ///
2940    /// Returns `Ok(())` when the socket is ready, or `Err` on timeout.
2941    /// This is useful for code that needs to send messages before login,
2942    /// such as requesting a pair code during initial pairing.
2943    ///
2944    /// If the socket is already connected, returns immediately.
2945    pub async fn wait_for_socket(&self, timeout: std::time::Duration) -> Result<(), anyhow::Error> {
2946        // Fast path: already connected
2947        if self.is_connected() {
2948            return Ok(());
2949        }
2950
2951        // Register waiter and re-check to avoid race condition:
2952        // If socket becomes ready between checks, the notified future captures it.
2953        let notified = self.socket_ready_notifier.listen();
2954        if self.is_connected() {
2955            return Ok(());
2956        }
2957
2958        rt_timeout(&*self.runtime, timeout, notified)
2959            .await
2960            .map_err(|_| anyhow::anyhow!("Timeout waiting for socket"))
2961    }
2962
2963    /// Waits for the client to establish a connection and complete login.
2964    ///
2965    /// Returns `Ok(())` when connected, or `Err` on timeout.
2966    /// This is useful for code that needs to run after connection is established
2967    /// and authentication is complete.
2968    ///
2969    /// If the client is already connected and logged in, returns immediately.
2970    pub async fn wait_for_connected(
2971        &self,
2972        timeout: std::time::Duration,
2973    ) -> Result<(), anyhow::Error> {
2974        // Fast path: fully ready (connected + logged in + critical sync done).
2975        if self.is_fully_ready() {
2976            return Ok(());
2977        }
2978
2979        // Register waiter and re-check to avoid TOCTOU race:
2980        // dispatch_connected() could fire between the check above and notified() registration.
2981        let notified = self.connected_notifier.listen();
2982        if self.is_fully_ready() {
2983            return Ok(());
2984        }
2985
2986        rt_timeout(&*self.runtime, timeout, notified)
2987            .await
2988            .map_err(|_| anyhow::anyhow!("Timeout waiting for connection"))
2989    }
2990
2991    /// Get access to the PersistenceManager for this client.
2992    /// This is useful for multi-account scenarios to get the device ID.
2993    pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
2994        self.persistence_manager.clone()
2995    }
2996
2997    pub async fn edit_message(
2998        &self,
2999        to: Jid,
3000        original_id: impl Into<String>,
3001        new_content: wa::Message,
3002    ) -> Result<String, anyhow::Error> {
3003        let original_id = original_id.into();
3004
3005        // WhatsApp Web uses getMeUserLidOrJidForChat(chat, EditMessage) which
3006        // returns LID for LID-addressing groups and PN otherwise.
3007        let participant = if to.is_group() {
3008            Some(
3009                self.get_own_jid_for_group(&to)
3010                    .await?
3011                    .to_non_ad()
3012                    .to_string(),
3013            )
3014        } else {
3015            if self.get_pn().await.is_none() {
3016                return Err(anyhow::Error::from(ClientError::NotLoggedIn));
3017            }
3018            None
3019        };
3020
3021        let edit_container_message = wa::Message {
3022            edited_message: Some(Box::new(wa::message::FutureProofMessage {
3023                message: Some(Box::new(wa::Message {
3024                    protocol_message: Some(Box::new(wa::message::ProtocolMessage {
3025                        key: Some(wa::MessageKey {
3026                            remote_jid: Some(to.to_string()),
3027                            from_me: Some(true),
3028                            id: Some(original_id.clone()),
3029                            participant,
3030                        }),
3031                        r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
3032                        edited_message: Some(Box::new(new_content)),
3033                        timestamp_ms: Some(wacore::time::now_millis()),
3034                        ..Default::default()
3035                    })),
3036                    ..Default::default()
3037                })),
3038            })),
3039            ..Default::default()
3040        };
3041
3042        // Use a new stanza ID instead of reusing the original message ID.
3043        // The original message ID is already embedded in protocolMessage.key.id
3044        // inside the encrypted payload. Reusing it as the outer stanza ID causes
3045        // the server to deduplicate against the original message and silently
3046        // drop the edit.
3047        self.send_message_impl(
3048            to,
3049            &edit_container_message,
3050            None,
3051            false,
3052            false,
3053            Some(crate::types::message::EditAttribute::MessageEdit),
3054            vec![],
3055        )
3056        .await?;
3057
3058        Ok(original_id)
3059    }
3060
3061    pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
3062        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
3063        let noise_socket = match noise_socket_arc {
3064            Some(socket) => socket,
3065            None => return Err(ClientError::NotConnected),
3066        };
3067
3068        debug!(target: "Client/Send", "{}", DisplayableNode(&node));
3069
3070        let mut plaintext_buf = Vec::with_capacity(1024);
3071
3072        if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
3073            error!("Failed to marshal node: {e:?}");
3074            return Err(SocketError::Crypto("Marshal error".to_string()).into());
3075        }
3076
3077        // Size based on plaintext + encryption overhead (16 byte tag + 3 byte frame header)
3078        let encrypted_buf = Vec::with_capacity(plaintext_buf.len() + 32);
3079
3080        if let Err(e) = noise_socket
3081            .encrypt_and_send(plaintext_buf, encrypted_buf)
3082            .await
3083        {
3084            return Err(e.into());
3085        }
3086
3087        // WA Web: callStanza → deadSocketTimer.onOrBefore(deadSocketTime, socketId)
3088        self.last_data_sent_ms
3089            .store(wacore::time::now_millis() as u64, Ordering::Relaxed);
3090
3091        Ok(())
3092    }
3093
3094    pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
3095        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3096        let old_name = device_snapshot.push_name.clone();
3097
3098        if old_name == new_name {
3099            return;
3100        }
3101
3102        log::debug!("Updating push name from '{}' -> '{}'", old_name, new_name);
3103        self.persistence_manager
3104            .process_command(DeviceCommand::SetPushName(new_name.clone()))
3105            .await;
3106
3107        self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
3108            crate::types::events::SelfPushNameUpdated {
3109                from_server: true,
3110                old_name,
3111                new_name: new_name.clone(),
3112            },
3113        ));
3114
3115        let client_clone = self.clone();
3116        self.runtime
3117            .spawn(Box::pin(async move {
3118                if let Err(e) = client_clone.presence().set_available().await {
3119                    log::warn!("Failed to send presence after push name update: {:?}", e);
3120                } else {
3121                    log::debug!("Sent presence after push name update.");
3122                }
3123            }))
3124            .detach();
3125    }
3126
3127    pub async fn get_push_name(&self) -> String {
3128        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3129        device_snapshot.push_name.clone()
3130    }
3131
3132    pub async fn get_pn(&self) -> Option<Jid> {
3133        let snapshot = self.persistence_manager.get_device_snapshot().await;
3134        snapshot.pn.clone()
3135    }
3136
3137    pub async fn get_lid(&self) -> Option<Jid> {
3138        let snapshot = self.persistence_manager.get_device_snapshot().await;
3139        snapshot.lid.clone()
3140    }
3141
3142    /// Resolve our own JID for a group, respecting its addressing mode.
3143    ///
3144    /// Returns LID for LID-addressing groups, PN otherwise.
3145    /// Matches WhatsApp Web's `getMeUserLidOrJidForChat`.
3146    pub(crate) async fn get_own_jid_for_group(
3147        &self,
3148        group_jid: &Jid,
3149    ) -> Result<Jid, anyhow::Error> {
3150        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3151        let own_pn = device_snapshot
3152            .pn
3153            .clone()
3154            .ok_or_else(|| anyhow::Error::from(ClientError::NotLoggedIn))?;
3155
3156        let addressing_mode = self
3157            .groups()
3158            .query_info(group_jid)
3159            .await
3160            .map(|info| info.addressing_mode)
3161            .unwrap_or(crate::types::message::AddressingMode::Pn);
3162
3163        Ok(match addressing_mode {
3164            crate::types::message::AddressingMode::Lid => {
3165                device_snapshot.lid.clone().unwrap_or(own_pn)
3166            }
3167            crate::types::message::AddressingMode::Pn => own_pn,
3168        })
3169    }
3170
3171    /// Creates a normalized StanzaKey by resolving PN to LID JIDs.
3172    pub(crate) async fn make_stanza_key(&self, chat: Jid, id: String) -> StanzaKey {
3173        // Resolve chat JID to LID if possible
3174        let chat = self.resolve_encryption_jid(&chat).await;
3175
3176        StanzaKey { chat, id }
3177    }
3178
3179    // get_phone_number_from_lid is in client/lid_pn.rs
3180
3181    pub(crate) async fn send_protocol_receipt(
3182        &self,
3183        id: String,
3184        receipt_type: crate::types::presence::ReceiptType,
3185    ) {
3186        if id.is_empty() {
3187            return;
3188        }
3189        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
3190        if let Some(own_jid) = &device_snapshot.pn {
3191            let type_str = match receipt_type {
3192                crate::types::presence::ReceiptType::HistorySync => "hist_sync",
3193                crate::types::presence::ReceiptType::Read => "read",
3194                crate::types::presence::ReceiptType::ReadSelf => "read-self",
3195                crate::types::presence::ReceiptType::Delivered => "delivery",
3196                crate::types::presence::ReceiptType::Played => "played",
3197                crate::types::presence::ReceiptType::PlayedSelf => "played-self",
3198                crate::types::presence::ReceiptType::Inactive => "inactive",
3199                crate::types::presence::ReceiptType::PeerMsg => "peer_msg",
3200                crate::types::presence::ReceiptType::Sender => "sender",
3201                crate::types::presence::ReceiptType::ServerError => "server-error",
3202                crate::types::presence::ReceiptType::Retry => "retry",
3203                crate::types::presence::ReceiptType::EncRekeyRetry => "enc_rekey_retry",
3204                crate::types::presence::ReceiptType::Other(ref s) => s.as_str(),
3205            };
3206
3207            let node = NodeBuilder::new("receipt")
3208                .attrs([
3209                    ("id", id),
3210                    ("type", type_str.to_string()),
3211                    ("to", own_jid.to_non_ad().to_string()),
3212                ])
3213                .build();
3214
3215            if let Err(e) = self.send_node(node).await {
3216                warn!(
3217                    "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
3218                    receipt_type, self.unique_id, e
3219                );
3220            }
3221        }
3222    }
3223}
3224
3225/// Builds a pong response node for a server-initiated ping.
3226///
3227/// Matches WhatsApp Web (`WAWebCommsHandleStanza`): only includes `id`
3228/// when the server ping carried one.
3229fn build_pong(to: String, id: Option<&str>) -> wacore_binary::node::Node {
3230    let mut builder = NodeBuilder::new("iq").attr("to", to).attr("type", "result");
3231    if let Some(id) = id {
3232        builder = builder.attr("id", id);
3233    }
3234    builder.build()
3235}
3236
3237/// Build an `<ack/>` for the given stanza, matching WA Web / whatsmeow behavior:
3238///
3239/// - `class` = original stanza tag
3240/// - `id`, `to` (flipped from `from`), `participant` copied from original
3241/// - `from` = own device PN, only for message acks
3242/// - `type` echoed for non-message stanzas (whatsmeow: `node.Tag != "message"`),
3243///   except `notification type="encrypt"` with `<identity/>` child (WA Web drops type there).
3244///
3245/// For receipt acks, WA Web uses `MAYBE_CUSTOM_STRING(ackString)` where
3246/// `ackString = maybeAttrString("type")` — so `type` is only included when
3247/// explicitly present on the incoming receipt (delivery receipts normally
3248/// have no type attribute, meaning the ack also has no type).
3249fn build_ack_node(node: &Node, own_device_pn: Option<&Jid>) -> Option<Node> {
3250    let id = node.attrs.get("id")?.clone();
3251    let from = node.attrs.get("from")?.clone();
3252    let participant = node.attrs.get("participant").cloned();
3253
3254    // Whatsmeow: echo type for all stanza tags EXCEPT "message".
3255    // WA Web additionally omits type for notification type="encrypt" with <identity/> child.
3256    let typ = if node.tag != "message" && !is_encrypt_identity_notification(node) {
3257        node.attrs.get("type").cloned()
3258    } else {
3259        None
3260    };
3261
3262    let mut attrs = Attrs::new();
3263    attrs.insert("class", NodeValue::String(node.tag.to_string()));
3264    attrs.insert("id", id);
3265    attrs.insert("to", from);
3266
3267    if node.tag == "message"
3268        && let Some(own_device_pn) = own_device_pn
3269    {
3270        attrs.insert("from", NodeValue::Jid(own_device_pn.clone()));
3271    }
3272    if let Some(p) = participant {
3273        attrs.insert("participant", p);
3274    }
3275    if let Some(t) = typ {
3276        attrs.insert("type", t);
3277    }
3278
3279    Some(Node {
3280        tag: Cow::Borrowed("ack"),
3281        attrs,
3282        content: None,
3283    })
3284}
3285
3286/// WA Web omits `type` when ACKing `<notification type="encrypt"><identity/></notification>`.
3287fn is_encrypt_identity_notification(node: &Node) -> bool {
3288    node.tag == "notification"
3289        && node.attrs.get("type").is_some_and(|v| v == "encrypt")
3290        && node.get_optional_child("identity").is_some()
3291}
3292
3293/// Computes a reconnect delay matching WhatsApp Web's Fibonacci backoff:
3294/// `{ algo: { type: "fibonacci", first: 1000, second: 1000 }, jitter: 0.1, max: 9e5 }`
3295///
3296/// Sequence: 1s, 1s, 2s, 3s, 5s, 8s, 13s, 21s, 34s, 55s, 89s, 144s, ... capped at 900s.
3297/// Each value gets ±10% random jitter.
3298fn fibonacci_backoff(attempt: u32) -> Duration {
3299    const MAX_MS: u64 = 900_000; // WA Web: 9e5
3300
3301    let mut a: u64 = 1000;
3302    let mut b: u64 = 1000;
3303    for _ in 0..attempt {
3304        let next = a.saturating_add(b).min(MAX_MS);
3305        a = b;
3306        b = next;
3307    }
3308    let base = a.min(MAX_MS);
3309
3310    // ±10% jitter (WA Web: jitter: 0.1)
3311    let jitter_range = base / 10;
3312    let jitter = if jitter_range > 0 {
3313        rand::make_rng::<rand::rngs::StdRng>().random_range(0..=(jitter_range * 2)) as i64
3314            - jitter_range as i64
3315    } else {
3316        0
3317    };
3318    let ms = (base as i64 + jitter).max(0) as u64;
3319    Duration::from_millis(ms)
3320}
3321
3322#[cfg(test)]
3323mod tests {
3324    use super::*;
3325    use crate::lid_pn_cache::LearningSource;
3326    use crate::test_utils::MockHttpClient;
3327    use futures::channel::oneshot;
3328    use wacore_binary::jid::SERVER_JID;
3329
3330    #[tokio::test]
3331    async fn test_ack_behavior_for_incoming_stanzas() {
3332        let backend = crate::test_utils::create_test_backend().await;
3333        let pm = Arc::new(
3334            PersistenceManager::new(backend)
3335                .await
3336                .expect("persistence manager should initialize"),
3337        );
3338        let (client, _rx) = Client::new(
3339            Arc::new(crate::runtime_impl::TokioRuntime),
3340            pm,
3341            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3342            Arc::new(MockHttpClient),
3343            None,
3344        )
3345        .await;
3346
3347        // --- Assertions ---
3348
3349        // Verify that we still ack other critical stanzas (regression check).
3350        use wacore_binary::node::{Attrs, Node, NodeContent};
3351
3352        let mut receipt_attrs = Attrs::new();
3353        receipt_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
3354        receipt_attrs.insert("id".to_string(), "RCPT-1".to_string());
3355        let receipt_node = Node::new(
3356            "receipt",
3357            receipt_attrs,
3358            Some(NodeContent::String("test".to_string())),
3359        );
3360
3361        let mut notification_attrs = Attrs::new();
3362        notification_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
3363        notification_attrs.insert("id".to_string(), "NOTIF-1".to_string());
3364        let notification_node = Node::new(
3365            "notification",
3366            notification_attrs,
3367            Some(NodeContent::String("test".to_string())),
3368        );
3369
3370        assert!(
3371            client.should_ack(&receipt_node),
3372            "should_ack must still return TRUE for <receipt> stanzas."
3373        );
3374        assert!(
3375            client.should_ack(&notification_node),
3376            "should_ack must still return TRUE for <notification> stanzas."
3377        );
3378
3379        info!(
3380            "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
3381        );
3382    }
3383
3384    #[tokio::test]
3385    async fn test_ack_waiter_resolves() {
3386        let backend = crate::test_utils::create_test_backend().await;
3387        let pm = Arc::new(
3388            PersistenceManager::new(backend)
3389                .await
3390                .expect("persistence manager should initialize"),
3391        );
3392        let (client, _rx) = Client::new(
3393            Arc::new(crate::runtime_impl::TokioRuntime),
3394            pm,
3395            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3396            Arc::new(MockHttpClient),
3397            None,
3398        )
3399        .await;
3400
3401        // 1. Insert a waiter for a specific ID
3402        let test_id = "ack-test-123".to_string();
3403        let (tx, rx) = oneshot::channel();
3404        client
3405            .response_waiters
3406            .lock()
3407            .await
3408            .insert(test_id.clone(), tx);
3409        assert!(
3410            client.response_waiters.lock().await.contains_key(&test_id),
3411            "Waiter should be inserted before handling ack"
3412        );
3413
3414        // 2. Create a mock <ack/> node with the test ID
3415        let ack_node = NodeBuilder::new("ack")
3416            .attr("id", test_id.clone())
3417            .attr("from", SERVER_JID)
3418            .build();
3419
3420        // 3. Handle the ack
3421        let handled = client.handle_ack_response(ack_node).await;
3422        assert!(
3423            handled,
3424            "handle_ack_response should return true when waiter exists"
3425        );
3426
3427        // 4. Await the receiver with a timeout
3428        match tokio::time::timeout(Duration::from_secs(1), rx).await {
3429            Ok(Ok(response_node)) => {
3430                assert!(
3431                    response_node
3432                        .attrs
3433                        .get("id")
3434                        .is_some_and(|v| v == test_id.as_str()),
3435                    "Response node should have correct ID"
3436                );
3437            }
3438            Ok(Err(_)) => panic!("Receiver was dropped without being sent a value"),
3439            Err(_) => panic!("Test timed out waiting for ack response"),
3440        }
3441
3442        // 5. Verify the waiter was removed
3443        assert!(
3444            !client.response_waiters.lock().await.contains_key(&test_id),
3445            "Waiter should be removed after handling"
3446        );
3447
3448        info!(
3449            "✅ test_ack_waiter_resolves passed: ACK response correctly resolves pending waiters"
3450        );
3451    }
3452
3453    #[tokio::test]
3454    async fn test_ack_without_matching_waiter() {
3455        let backend = crate::test_utils::create_test_backend().await;
3456        let pm = Arc::new(
3457            PersistenceManager::new(backend)
3458                .await
3459                .expect("persistence manager should initialize"),
3460        );
3461        let (client, _rx) = Client::new(
3462            Arc::new(crate::runtime_impl::TokioRuntime),
3463            pm,
3464            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3465            Arc::new(MockHttpClient),
3466            None,
3467        )
3468        .await;
3469
3470        // Create an ack without any matching waiter
3471        let ack_node = NodeBuilder::new("ack")
3472            .attr("id", "non-existent-id")
3473            .attr("from", SERVER_JID)
3474            .build();
3475
3476        // Should return false since there's no waiter
3477        let handled = client.handle_ack_response(ack_node).await;
3478        assert!(
3479            !handled,
3480            "handle_ack_response should return false when no waiter exists"
3481        );
3482
3483        info!(
3484            "✅ test_ack_without_matching_waiter passed: ACK without matching waiter handled gracefully"
3485        );
3486    }
3487
3488    /// Test that the lid_pn_cache correctly stores and retrieves LID mappings.
3489    ///
3490    /// This is critical for the LID-PN session mismatch fix. When we receive a message
3491    /// with sender_lid, we cache the phone->LID mapping so that when sending replies,
3492    /// we can reuse the existing LID session instead of creating a new PN session.
3493    #[tokio::test]
3494    async fn test_lid_pn_cache_basic_operations() {
3495        let backend = Arc::new(
3496            crate::store::SqliteStore::new("file:memdb_lid_cache_basic?mode=memory&cache=shared")
3497                .await
3498                .expect("Failed to create in-memory backend for test"),
3499        );
3500        let pm = Arc::new(
3501            PersistenceManager::new(backend)
3502                .await
3503                .expect("persistence manager should initialize"),
3504        );
3505        let (client, _rx) = Client::new(
3506            Arc::new(crate::runtime_impl::TokioRuntime),
3507            pm,
3508            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3509            Arc::new(MockHttpClient),
3510            None,
3511        )
3512        .await;
3513
3514        // Initially, the cache should be empty for a phone number
3515        let phone = "559980000001";
3516        let lid = "100000012345678";
3517
3518        assert!(
3519            client.lid_pn_cache.get_current_lid(phone).await.is_none(),
3520            "Cache should be empty initially"
3521        );
3522
3523        // Insert a phone->LID mapping using add_lid_pn_mapping
3524        client
3525            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
3526            .await
3527            .expect("Failed to persist LID-PN mapping in tests");
3528
3529        // Verify we can retrieve it (phone -> LID lookup)
3530        let cached_lid = client.lid_pn_cache.get_current_lid(phone).await;
3531        assert!(cached_lid.is_some(), "Cache should contain the mapping");
3532        assert_eq!(
3533            cached_lid.expect("cache should have LID"),
3534            lid,
3535            "Cached LID should match what we inserted"
3536        );
3537
3538        // Verify reverse lookup works (LID -> phone)
3539        let cached_phone = client.lid_pn_cache.get_phone_number(lid).await;
3540        assert!(cached_phone.is_some(), "Reverse lookup should work");
3541        assert_eq!(
3542            cached_phone.expect("reverse lookup should return phone"),
3543            phone,
3544            "Cached phone should match what we inserted"
3545        );
3546
3547        // Verify a different phone number returns None
3548        assert!(
3549            client
3550                .lid_pn_cache
3551                .get_current_lid("559980000002")
3552                .await
3553                .is_none(),
3554            "Different phone number should not have a mapping"
3555        );
3556
3557        info!("✅ test_lid_pn_cache_basic_operations passed: LID-PN cache works correctly");
3558    }
3559
3560    /// Test that the lid_pn_cache respects timestamp-based conflict resolution.
3561    ///
3562    /// When a phone number has multiple LIDs, the most recent one should be returned.
3563    #[tokio::test]
3564    async fn test_lid_pn_cache_timestamp_resolution() {
3565        let backend = Arc::new(
3566            crate::store::SqliteStore::new(
3567                "file:memdb_lid_cache_timestamp?mode=memory&cache=shared",
3568            )
3569            .await
3570            .expect("Failed to create in-memory backend for test"),
3571        );
3572        let pm = Arc::new(
3573            PersistenceManager::new(backend)
3574                .await
3575                .expect("persistence manager should initialize"),
3576        );
3577        let (client, _rx) = Client::new(
3578            Arc::new(crate::runtime_impl::TokioRuntime),
3579            pm,
3580            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3581            Arc::new(MockHttpClient),
3582            None,
3583        )
3584        .await;
3585
3586        let phone = "559980000001";
3587        let lid_old = "100000012345678";
3588        let lid_new = "100000087654321";
3589
3590        // Insert initial mapping
3591        client
3592            .add_lid_pn_mapping(lid_old, phone, LearningSource::Usync)
3593            .await
3594            .expect("Failed to persist LID-PN mapping in tests");
3595
3596        assert_eq!(
3597            client
3598                .lid_pn_cache
3599                .get_current_lid(phone)
3600                .await
3601                .expect("cache should have LID"),
3602            lid_old,
3603            "Initial LID should be stored"
3604        );
3605
3606        // Small delay to ensure different timestamp
3607        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3608
3609        // Add new mapping with newer timestamp
3610        client
3611            .add_lid_pn_mapping(lid_new, phone, LearningSource::PeerPnMessage)
3612            .await
3613            .expect("Failed to persist LID-PN mapping in tests");
3614
3615        assert_eq!(
3616            client
3617                .lid_pn_cache
3618                .get_current_lid(phone)
3619                .await
3620                .expect("cache should have newer LID"),
3621            lid_new,
3622            "Newer LID should be returned for phone lookup"
3623        );
3624
3625        // Both LIDs should still resolve to the same phone
3626        assert_eq!(
3627            client
3628                .lid_pn_cache
3629                .get_phone_number(lid_old)
3630                .await
3631                .expect("reverse lookup should return phone"),
3632            phone,
3633            "Old LID should still map to phone"
3634        );
3635        assert_eq!(
3636            client
3637                .lid_pn_cache
3638                .get_phone_number(lid_new)
3639                .await
3640                .expect("reverse lookup should return phone"),
3641            phone,
3642            "New LID should also map to phone"
3643        );
3644
3645        info!(
3646            "✅ test_lid_pn_cache_timestamp_resolution passed: Timestamp-based resolution works correctly"
3647        );
3648    }
3649
3650    /// Test that get_lid_for_phone (from SendContextResolver) returns the cached value.
3651    ///
3652    /// This is the method used by wacore::send to look up LID mappings when encrypting.
3653    #[tokio::test]
3654    async fn test_get_lid_for_phone_via_send_context_resolver() {
3655        use wacore::client::context::SendContextResolver;
3656
3657        let backend = Arc::new(
3658            crate::store::SqliteStore::new("file:memdb_get_lid_for_phone?mode=memory&cache=shared")
3659                .await
3660                .expect("Failed to create in-memory backend for test"),
3661        );
3662        let pm = Arc::new(
3663            PersistenceManager::new(backend)
3664                .await
3665                .expect("persistence manager should initialize"),
3666        );
3667        let (client, _rx) = Client::new(
3668            Arc::new(crate::runtime_impl::TokioRuntime),
3669            pm,
3670            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3671            Arc::new(MockHttpClient),
3672            None,
3673        )
3674        .await;
3675
3676        let phone = "559980000001";
3677        let lid = "100000012345678";
3678
3679        // Before caching, should return None
3680        assert!(
3681            client.get_lid_for_phone(phone).await.is_none(),
3682            "get_lid_for_phone should return None before caching"
3683        );
3684
3685        // Cache the mapping using add_lid_pn_mapping
3686        client
3687            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
3688            .await
3689            .expect("Failed to persist LID-PN mapping in tests");
3690
3691        // Now it should return the LID
3692        let result = client.get_lid_for_phone(phone).await;
3693        assert!(
3694            result.is_some(),
3695            "get_lid_for_phone should return Some after caching"
3696        );
3697        assert_eq!(
3698            result.expect("get_lid_for_phone should return Some"),
3699            lid,
3700            "get_lid_for_phone should return the cached LID"
3701        );
3702
3703        info!(
3704            "✅ test_get_lid_for_phone_via_send_context_resolver passed: SendContextResolver correctly returns cached LID"
3705        );
3706    }
3707
3708    /// Test that wait_for_offline_delivery_end returns immediately when the flag is already set.
3709    #[tokio::test]
3710    async fn test_wait_for_offline_delivery_end_returns_immediately_when_flag_set() {
3711        let backend = Arc::new(
3712            crate::store::SqliteStore::new(
3713                "file:memdb_offline_sync_flag_set?mode=memory&cache=shared",
3714            )
3715            .await
3716            .expect("Failed to create in-memory backend for test"),
3717        );
3718        let pm = Arc::new(
3719            PersistenceManager::new(backend)
3720                .await
3721                .expect("persistence manager should initialize"),
3722        );
3723        let (client, _rx) = Client::new(
3724            Arc::new(crate::runtime_impl::TokioRuntime),
3725            pm,
3726            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3727            Arc::new(MockHttpClient),
3728            None,
3729        )
3730        .await;
3731
3732        // Set the flag to true (simulating offline sync completed)
3733        client
3734            .offline_sync_completed
3735            .store(true, std::sync::atomic::Ordering::Relaxed);
3736
3737        // This should return immediately (not wait 10 seconds)
3738        let start = std::time::Instant::now();
3739        client.wait_for_offline_delivery_end().await;
3740        let elapsed = start.elapsed();
3741
3742        // Should complete in < 100ms (not 10 second timeout)
3743        assert!(
3744            elapsed.as_millis() < 100,
3745            "wait_for_offline_delivery_end should return immediately when flag is set, took {:?}",
3746            elapsed
3747        );
3748
3749        info!("✅ test_wait_for_offline_delivery_end_returns_immediately_when_flag_set passed");
3750    }
3751
3752    /// Test that wait_for_offline_delivery_end times out when the flag is NOT set.
3753    /// This verifies the 10-second timeout is working.
3754    #[tokio::test]
3755    async fn test_wait_for_offline_delivery_end_times_out_when_flag_not_set() {
3756        let backend = Arc::new(
3757            crate::store::SqliteStore::new(
3758                "file:memdb_offline_sync_timeout?mode=memory&cache=shared",
3759            )
3760            .await
3761            .expect("Failed to create in-memory backend for test"),
3762        );
3763        let pm = Arc::new(
3764            PersistenceManager::new(backend)
3765                .await
3766                .expect("persistence manager should initialize"),
3767        );
3768        let (client, _rx) = Client::new(
3769            Arc::new(crate::runtime_impl::TokioRuntime),
3770            pm,
3771            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3772            Arc::new(MockHttpClient),
3773            None,
3774        )
3775        .await;
3776
3777        // Flag is false by default, so use a short timeout and verify the helper
3778        // marks the sync complete on timeout.
3779        let start = std::time::Instant::now();
3780        client
3781            .wait_for_offline_delivery_end_with_timeout(std::time::Duration::from_millis(50))
3782            .await;
3783
3784        let elapsed = start.elapsed();
3785        // Count available permits by trying to acquire non-blockingly
3786        let semaphore = client
3787            .message_processing_semaphore
3788            .lock()
3789            .expect("message_processing_semaphore poisoned")
3790            .clone();
3791        let mut guards = Vec::new();
3792        while let Some(guard) = semaphore.try_acquire() {
3793            guards.push(guard);
3794        }
3795        let permits = guards.len();
3796        drop(guards);
3797
3798        assert!(
3799            elapsed.as_millis() >= 45, // Allow small timing variance
3800            "Should have waited for the configured timeout duration, took {:?}",
3801            elapsed
3802        );
3803        assert!(
3804            client
3805                .offline_sync_completed
3806                .load(std::sync::atomic::Ordering::Relaxed),
3807            "wait_for_offline_delivery_end should mark offline sync complete on timeout"
3808        );
3809        assert_eq!(
3810            permits, 64,
3811            "timeout completion should restore parallel permits"
3812        );
3813
3814        info!("✅ test_wait_for_offline_delivery_end_times_out_when_flag_not_set passed");
3815    }
3816
3817    /// Test that wait_for_offline_delivery_end returns when notified.
3818    #[tokio::test]
3819    async fn test_wait_for_offline_delivery_end_returns_on_notify() {
3820        let backend = Arc::new(
3821            crate::store::SqliteStore::new("file:memdb_offline_notify?mode=memory&cache=shared")
3822                .await
3823                .expect("Failed to create in-memory backend for test"),
3824        );
3825        let pm = Arc::new(
3826            PersistenceManager::new(backend)
3827                .await
3828                .expect("persistence manager should initialize"),
3829        );
3830        let (client, _rx) = Client::new(
3831            Arc::new(crate::runtime_impl::TokioRuntime),
3832            pm,
3833            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3834            Arc::new(MockHttpClient),
3835            None,
3836        )
3837        .await;
3838
3839        let client_clone = client.clone();
3840
3841        // Spawn a task that will notify after 50ms
3842        tokio::spawn(async move {
3843            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3844            client_clone.offline_sync_notifier.notify(usize::MAX);
3845        });
3846
3847        let start = std::time::Instant::now();
3848        client.wait_for_offline_delivery_end().await;
3849        let elapsed = start.elapsed();
3850
3851        // Should complete around 50ms (when notified), not 10 seconds
3852        assert!(
3853            elapsed.as_millis() < 200,
3854            "wait_for_offline_delivery_end should return when notified, took {:?}",
3855            elapsed
3856        );
3857        assert!(
3858            elapsed.as_millis() >= 45, // Should have waited for the notify
3859            "Should have waited for the notify, only took {:?}",
3860            elapsed
3861        );
3862
3863        info!("✅ test_wait_for_offline_delivery_end_returns_on_notify passed");
3864    }
3865
3866    /// Test that the offline_sync_completed flag starts as false.
3867    #[tokio::test]
3868    async fn test_offline_sync_flag_initially_false() {
3869        let backend = Arc::new(
3870            crate::store::SqliteStore::new(
3871                "file:memdb_offline_flag_initial?mode=memory&cache=shared",
3872            )
3873            .await
3874            .expect("Failed to create in-memory backend for test"),
3875        );
3876        let pm = Arc::new(
3877            PersistenceManager::new(backend)
3878                .await
3879                .expect("persistence manager should initialize"),
3880        );
3881        let (client, _rx) = Client::new(
3882            Arc::new(crate::runtime_impl::TokioRuntime),
3883            pm,
3884            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3885            Arc::new(MockHttpClient),
3886            None,
3887        )
3888        .await;
3889
3890        // The flag should be false initially
3891        assert!(
3892            !client
3893                .offline_sync_completed
3894                .load(std::sync::atomic::Ordering::Relaxed),
3895            "offline_sync_completed should be false when Client is first created"
3896        );
3897
3898        info!("✅ test_offline_sync_flag_initially_false passed");
3899    }
3900
3901    /// Test the complete offline sync lifecycle:
3902    /// 1. Flag starts false
3903    /// 2. Flag is set true after IB offline stanza
3904    /// 3. Notify is called
3905    #[tokio::test]
3906    async fn test_offline_sync_lifecycle() {
3907        use std::sync::atomic::Ordering;
3908
3909        let backend = Arc::new(
3910            crate::store::SqliteStore::new("file:memdb_offline_lifecycle?mode=memory&cache=shared")
3911                .await
3912                .expect("Failed to create in-memory backend for test"),
3913        );
3914        let pm = Arc::new(
3915            PersistenceManager::new(backend)
3916                .await
3917                .expect("persistence manager should initialize"),
3918        );
3919        let (client, _rx) = Client::new(
3920            Arc::new(crate::runtime_impl::TokioRuntime),
3921            pm,
3922            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3923            Arc::new(MockHttpClient),
3924            None,
3925        )
3926        .await;
3927
3928        // 1. Initially false
3929        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
3930
3931        // 2. Spawn a waiter
3932        let client_waiter = client.clone();
3933        let waiter_handle = tokio::spawn(async move {
3934            client_waiter.wait_for_offline_delivery_end().await;
3935            true // Return that we completed
3936        });
3937
3938        // Give the waiter time to start waiting
3939        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3940
3941        // Verify waiter hasn't completed yet
3942        assert!(
3943            !waiter_handle.is_finished(),
3944            "Waiter should still be waiting"
3945        );
3946
3947        // 3. Simulate IB handler behavior (set flag and notify)
3948        client.offline_sync_completed.store(true, Ordering::Relaxed);
3949        client.offline_sync_notifier.notify(usize::MAX);
3950
3951        // 4. Waiter should complete
3952        let result = tokio::time::timeout(std::time::Duration::from_millis(100), waiter_handle)
3953            .await
3954            .expect("Waiter should complete after notify")
3955            .expect("Waiter task should not panic");
3956
3957        assert!(result, "Waiter should have completed successfully");
3958        assert!(client.offline_sync_completed.load(Ordering::Relaxed));
3959
3960        info!("✅ test_offline_sync_lifecycle passed");
3961    }
3962
3963    /// Test that establish_primary_phone_session_immediate returns error when no PN is set.
3964    /// This verifies the "not logged in" guard works.
3965    #[tokio::test]
3966    async fn test_establish_primary_phone_session_fails_without_pn() {
3967        let backend = Arc::new(
3968            crate::store::SqliteStore::new("file:memdb_no_pn?mode=memory&cache=shared")
3969                .await
3970                .expect("Failed to create in-memory backend for test"),
3971        );
3972        let pm = Arc::new(
3973            PersistenceManager::new(backend)
3974                .await
3975                .expect("persistence manager should initialize"),
3976        );
3977        let (client, _rx) = Client::new(
3978            Arc::new(crate::runtime_impl::TokioRuntime),
3979            pm,
3980            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3981            Arc::new(MockHttpClient),
3982            None,
3983        )
3984        .await;
3985
3986        // No PN set, so this should fail
3987        let result = client.establish_primary_phone_session_immediate().await;
3988
3989        assert!(
3990            result.is_err(),
3991            "establish_primary_phone_session_immediate should fail when no PN is set"
3992        );
3993
3994        let err = result.unwrap_err();
3995        assert!(
3996            err.downcast_ref::<ClientError>()
3997                .is_some_and(|e| matches!(e, ClientError::NotLoggedIn)),
3998            "Error should be ClientError::NotLoggedIn, got: {}",
3999            err
4000        );
4001
4002        info!("✅ test_establish_primary_phone_session_fails_without_pn passed");
4003    }
4004
4005    /// Test that ensure_e2e_sessions waits for offline sync to complete.
4006    /// This is the CRITICAL difference between ensure_e2e_sessions and
4007    /// establish_primary_phone_session_immediate.
4008    #[tokio::test]
4009    async fn test_ensure_e2e_sessions_waits_for_offline_sync() {
4010        use std::sync::atomic::Ordering;
4011        use wacore_binary::jid::Jid;
4012
4013        let backend = Arc::new(
4014            crate::store::SqliteStore::new("file:memdb_ensure_e2e_waits?mode=memory&cache=shared")
4015                .await
4016                .expect("Failed to create in-memory backend for test"),
4017        );
4018        let pm = Arc::new(
4019            PersistenceManager::new(backend)
4020                .await
4021                .expect("persistence manager should initialize"),
4022        );
4023        let (client, _rx) = Client::new(
4024            Arc::new(crate::runtime_impl::TokioRuntime),
4025            pm,
4026            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4027            Arc::new(MockHttpClient),
4028            None,
4029        )
4030        .await;
4031
4032        // Flag is false (offline sync not complete)
4033        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
4034
4035        // Call ensure_e2e_sessions with an empty list (so it returns early after the wait)
4036        // This lets us test the waiting behavior without needing network
4037        let client_clone = client.clone();
4038        let ensure_handle = tokio::spawn(async move {
4039            // Start with some JIDs - but since we're testing the wait, we use empty
4040            // to avoid needing actual session establishment
4041            client_clone.ensure_e2e_sessions(vec![]).await
4042        });
4043
4044        // Wait a bit - ensure_e2e_sessions should return immediately for empty list
4045        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4046        assert!(
4047            ensure_handle.is_finished(),
4048            "ensure_e2e_sessions should return immediately for empty JID list"
4049        );
4050
4051        // Now test with actual JIDs - it should wait for offline sync
4052        let client_clone = client.clone();
4053        let test_jid = Jid::pn("559999999999");
4054        let ensure_handle = tokio::spawn(async move {
4055            // This will wait for offline sync before proceeding
4056            let start = std::time::Instant::now();
4057            let _ = client_clone.ensure_e2e_sessions(vec![test_jid]).await;
4058            start.elapsed()
4059        });
4060
4061        // Give it a moment to start
4062        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
4063
4064        // It should still be waiting (offline sync not complete)
4065        assert!(
4066            !ensure_handle.is_finished(),
4067            "ensure_e2e_sessions should be waiting for offline sync"
4068        );
4069
4070        // Now complete offline sync
4071        client.offline_sync_completed.store(true, Ordering::Relaxed);
4072        client.offline_sync_notifier.notify(usize::MAX);
4073
4074        // Now it should complete (might fail on session establishment, but that's ok)
4075        let result = tokio::time::timeout(std::time::Duration::from_secs(2), ensure_handle).await;
4076
4077        assert!(
4078            result.is_ok(),
4079            "ensure_e2e_sessions should complete after offline sync"
4080        );
4081
4082        info!("✅ test_ensure_e2e_sessions_waits_for_offline_sync passed");
4083    }
4084
4085    /// Integration test: Verify that the immediate session establishment does NOT
4086    /// wait for offline sync. This is critical for PDO to work during offline sync.
4087    ///
4088    /// The flow is:
4089    /// 1. Login -> establish_primary_phone_session_immediate() is called
4090    /// 2. This should NOT wait for offline sync (flag is false at this point)
4091    /// 3. After session is established, offline messages arrive
4092    /// 4. When decryption fails, PDO can immediately send to device 0
4093    #[tokio::test]
4094    async fn test_immediate_session_does_not_wait_for_offline_sync() {
4095        use std::sync::atomic::Ordering;
4096        use wacore_binary::jid::Jid;
4097
4098        let backend = Arc::new(
4099            crate::store::SqliteStore::new("file:memdb_immediate_no_wait?mode=memory&cache=shared")
4100                .await
4101                .expect("Failed to create in-memory backend for test"),
4102        );
4103        let pm = Arc::new(
4104            PersistenceManager::new(backend.clone())
4105                .await
4106                .expect("persistence manager should initialize"),
4107        );
4108
4109        // Set a PN so establish_primary_phone_session_immediate doesn't fail early
4110        pm.modify_device(|device| {
4111            device.pn = Some(Jid::pn("559999999999"));
4112        })
4113        .await;
4114
4115        let (client, _rx) = Client::new(
4116            Arc::new(crate::runtime_impl::TokioRuntime),
4117            pm,
4118            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4119            Arc::new(MockHttpClient),
4120            None,
4121        )
4122        .await;
4123
4124        // Flag is false (offline sync not complete - simulating login state)
4125        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
4126
4127        // Call establish_primary_phone_session_immediate
4128        // It should NOT wait for offline sync - it should proceed immediately
4129        let start = std::time::Instant::now();
4130
4131        // Note: This will fail because we can't actually fetch prekeys in tests,
4132        // but the important thing is that it doesn't WAIT for offline sync
4133        let result = tokio::time::timeout(
4134            std::time::Duration::from_millis(500),
4135            client.establish_primary_phone_session_immediate(),
4136        )
4137        .await;
4138
4139        let elapsed = start.elapsed();
4140
4141        // The call should complete (or fail) quickly, NOT wait for 10 second timeout
4142        assert!(
4143            result.is_ok(),
4144            "establish_primary_phone_session_immediate should not wait for offline sync, timed out"
4145        );
4146
4147        // It should complete in < 500ms (not 10 second wait)
4148        assert!(
4149            elapsed.as_millis() < 500,
4150            "establish_primary_phone_session_immediate should not wait, took {:?}",
4151            elapsed
4152        );
4153
4154        // The actual result might be an error (no network), but that's fine
4155        // The important thing is it didn't wait for offline sync
4156        info!(
4157            "establish_primary_phone_session_immediate completed in {:?} (result: {:?})",
4158            elapsed,
4159            result.unwrap().is_ok()
4160        );
4161
4162        info!("✅ test_immediate_session_does_not_wait_for_offline_sync passed");
4163    }
4164
4165    /// Integration test: Verify that establish_primary_phone_session_immediate
4166    /// skips establishment when a session already exists.
4167    ///
4168    /// This is the CRITICAL fix for MAC verification failures:
4169    /// - BUG (before fix): Called process_prekey_bundle() unconditionally,
4170    ///   replacing the existing session with a new one
4171    /// - RESULT: Remote device still uses old session state, causing MAC failures
4172    #[tokio::test]
4173    async fn test_establish_session_skips_when_exists() {
4174        use wacore::libsignal::protocol::SessionRecord;
4175        use wacore::libsignal::store::SessionStore;
4176        use wacore::types::jid::JidExt;
4177        use wacore_binary::jid::Jid;
4178
4179        let backend = Arc::new(
4180            crate::store::SqliteStore::new("file:memdb_skip_existing?mode=memory&cache=shared")
4181                .await
4182                .expect("Failed to create in-memory backend for test"),
4183        );
4184        let pm = Arc::new(
4185            PersistenceManager::new(backend.clone())
4186                .await
4187                .expect("persistence manager should initialize"),
4188        );
4189
4190        // Set a PN so the function doesn't fail early
4191        let own_pn = Jid::pn("559999999999");
4192        pm.modify_device(|device| {
4193            device.pn = Some(own_pn.clone());
4194        })
4195        .await;
4196
4197        // Pre-populate a session for the primary phone JID (device 0)
4198        let primary_phone_jid = own_pn.with_device(0);
4199        let signal_addr = primary_phone_jid.to_protocol_address();
4200
4201        // Create a dummy session record
4202        let dummy_session = SessionRecord::new_fresh();
4203        {
4204            let device_arc = pm.get_device_arc().await;
4205            let device = device_arc.read().await;
4206            device
4207                .store_session(&signal_addr, &dummy_session)
4208                .await
4209                .expect("Failed to store test session");
4210
4211            // Verify session exists
4212            let exists = device
4213                .contains_session(&signal_addr)
4214                .await
4215                .expect("Failed to check session");
4216            assert!(exists, "Session should exist after store");
4217        }
4218
4219        let (client, _rx) = Client::new(
4220            Arc::new(crate::runtime_impl::TokioRuntime),
4221            pm.clone(),
4222            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4223            Arc::new(MockHttpClient),
4224            None,
4225        )
4226        .await;
4227
4228        // Call establish_primary_phone_session_immediate
4229        // It should return Ok(()) immediately without fetching prekeys
4230        let result = client.establish_primary_phone_session_immediate().await;
4231
4232        assert!(
4233            result.is_ok(),
4234            "establish_primary_phone_session_immediate should succeed when session exists"
4235        );
4236
4237        // Verify the session was NOT replaced (still has the same record)
4238        // This is the critical assertion - if session was replaced, it would cause MAC failures
4239        {
4240            let device_arc = pm.get_device_arc().await;
4241            let device = device_arc.read().await;
4242            let exists = device
4243                .contains_session(&signal_addr)
4244                .await
4245                .expect("Failed to check session");
4246            assert!(exists, "Session should still exist after the call");
4247        }
4248
4249        info!("✅ test_establish_session_skips_when_exists passed");
4250    }
4251
4252    /// Integration test: Verify that the session check prevents MAC failures
4253    /// by documenting the exact control flow that caused the bug.
4254    #[test]
4255    fn test_mac_failure_prevention_flow_documentation() {
4256        // Simulate the decision logic
4257        fn should_establish_session(
4258            check_result: Result<bool, &'static str>,
4259        ) -> Result<bool, String> {
4260            match check_result {
4261                Ok(true) => Ok(false), // Session exists → DON'T establish
4262                Ok(false) => Ok(true), // No session → establish
4263                Err(e) => Err(format!("Cannot verify session: {}", e)), // Fail-safe
4264            }
4265        }
4266
4267        // Test Case 1: Session exists → skip (prevents MAC failure)
4268        let result = should_establish_session(Ok(true));
4269        assert_eq!(result, Ok(false), "Should skip when session exists");
4270
4271        // Test Case 2: No session → establish
4272        let result = should_establish_session(Ok(false));
4273        assert_eq!(result, Ok(true), "Should establish when no session");
4274
4275        // Test Case 3: Check fails → error (fail-safe)
4276        let result = should_establish_session(Err("database error"));
4277        assert!(result.is_err(), "Should fail when check fails");
4278
4279        info!("✅ test_mac_failure_prevention_flow_documentation passed");
4280    }
4281
4282    #[test]
4283    fn test_unified_session_id_calculation() {
4284        // Test the mathematical calculation of the unified session ID.
4285        // Formula: (now_ms + server_offset_ms + 3_days_ms) % 7_days_ms
4286
4287        const DAY_MS: i64 = 24 * 60 * 60 * 1000;
4288        const WEEK_MS: i64 = 7 * DAY_MS;
4289        const OFFSET_MS: i64 = 3 * DAY_MS;
4290
4291        // Helper function matching the implementation
4292        fn calculate_session_id(now_ms: i64, server_offset_ms: i64) -> i64 {
4293            let adjusted_now = now_ms + server_offset_ms;
4294            (adjusted_now + OFFSET_MS) % WEEK_MS
4295        }
4296
4297        // Test 1: Zero offset
4298        let now_ms = 1706000000000_i64; // Some arbitrary timestamp
4299        let id = calculate_session_id(now_ms, 0);
4300        assert!(
4301            (0..WEEK_MS).contains(&id),
4302            "Session ID should be in [0, WEEK_MS)"
4303        );
4304
4305        // Test 2: Positive server offset (server is ahead)
4306        let id_with_positive_offset = calculate_session_id(now_ms, 5000);
4307        assert!(
4308            (0..WEEK_MS).contains(&id_with_positive_offset),
4309            "Session ID should be in [0, WEEK_MS)"
4310        );
4311        // The ID should be different from zero offset (unless wrap-around)
4312        // Not testing exact value as it depends on the offset
4313
4314        // Test 3: Negative server offset (server is behind)
4315        let id_with_negative_offset = calculate_session_id(now_ms, -5000);
4316        assert!(
4317            (0..WEEK_MS).contains(&id_with_negative_offset),
4318            "Session ID should be in [0, WEEK_MS)"
4319        );
4320
4321        // Test 4: Verify modulo wrap-around
4322        // If adjusted_now + OFFSET_MS >= WEEK_MS, it should wrap
4323        let wrap_test_now = WEEK_MS - OFFSET_MS + 1000; // Should produce small result
4324        let wrapped_id = calculate_session_id(wrap_test_now, 0);
4325        assert_eq!(wrapped_id, 1000, "Should wrap around correctly");
4326
4327        // Test 5: Edge case - at exact boundary
4328        let boundary_now = WEEK_MS - OFFSET_MS;
4329        let boundary_id = calculate_session_id(boundary_now, 0);
4330        assert_eq!(boundary_id, 0, "At exact boundary should be 0");
4331    }
4332
4333    #[tokio::test]
4334    async fn test_server_time_offset_extraction() {
4335        use wacore_binary::builder::NodeBuilder;
4336
4337        let backend = crate::test_utils::create_test_backend().await;
4338        let pm = Arc::new(
4339            PersistenceManager::new(backend)
4340                .await
4341                .expect("persistence manager should initialize"),
4342        );
4343        let (client, _rx) = Client::new(
4344            Arc::new(crate::runtime_impl::TokioRuntime),
4345            pm,
4346            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4347            Arc::new(MockHttpClient),
4348            None,
4349        )
4350        .await;
4351
4352        // Initially, offset should be 0
4353        assert_eq!(
4354            client.unified_session.server_time_offset_ms(),
4355            0,
4356            "Initial offset should be 0"
4357        );
4358
4359        // Create a node with a 't' attribute
4360        let server_time = wacore::time::now_secs() + 10; // Server is 10 seconds ahead
4361        let node = NodeBuilder::new("success")
4362            .attr("t", server_time.to_string())
4363            .build();
4364
4365        // Update the offset
4366        client.update_server_time_offset(&node);
4367
4368        // The offset should be approximately 10 * 1000 = 10000 ms
4369        // Allow some tolerance for timing differences during the test
4370        let offset = client.unified_session.server_time_offset_ms();
4371        assert!(
4372            (offset - 10000).abs() < 1000, // Allow 1 second tolerance
4373            "Offset should be approximately 10000ms, got {}",
4374            offset
4375        );
4376
4377        // Test with no 't' attribute - should not change offset
4378        let node_no_t = NodeBuilder::new("success").build();
4379        client.update_server_time_offset(&node_no_t);
4380        let offset_after = client.unified_session.server_time_offset_ms();
4381        assert!(
4382            (offset_after - offset).abs() < 100, // Should be same (or very close)
4383            "Offset should not change when 't' is missing"
4384        );
4385
4386        // Test with invalid 't' attribute - should not change offset
4387        let node_invalid = NodeBuilder::new("success")
4388            .attr("t", "not_a_number")
4389            .build();
4390        client.update_server_time_offset(&node_invalid);
4391        let offset_after_invalid = client.unified_session.server_time_offset_ms();
4392        assert!(
4393            (offset_after_invalid - offset).abs() < 100,
4394            "Offset should not change when 't' is invalid"
4395        );
4396
4397        // Test with negative/zero 't' - should not change offset
4398        let node_zero = NodeBuilder::new("success").attr("t", "0").build();
4399        client.update_server_time_offset(&node_zero);
4400        let offset_after_zero = client.unified_session.server_time_offset_ms();
4401        assert!(
4402            (offset_after_zero - offset).abs() < 100,
4403            "Offset should not change when 't' is 0"
4404        );
4405
4406        info!("✅ test_server_time_offset_extraction passed");
4407    }
4408
4409    #[tokio::test]
4410    async fn test_unified_session_manager_integration() {
4411        // Test the unified session manager through the client
4412
4413        let backend = crate::test_utils::create_test_backend().await;
4414        let pm = Arc::new(
4415            PersistenceManager::new(backend)
4416                .await
4417                .expect("persistence manager should initialize"),
4418        );
4419        let (client, _rx) = Client::new(
4420            Arc::new(crate::runtime_impl::TokioRuntime),
4421            pm,
4422            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4423            Arc::new(MockHttpClient),
4424            None,
4425        )
4426        .await;
4427
4428        // Initially, sequence should be 0
4429        assert_eq!(
4430            client.unified_session.sequence(),
4431            0,
4432            "Initial sequence should be 0"
4433        );
4434
4435        // Duplicate prevention depends on the session ID staying the same between calls.
4436        // Since the session ID is millisecond-based, use a retry loop to handle
4437        // the rare case where we cross a millisecond boundary between calls.
4438        loop {
4439            client.unified_session.reset().await;
4440
4441            let result = client.unified_session.prepare_send().await;
4442            assert!(result.is_some(), "First send should succeed");
4443            let (node, seq) = result.unwrap();
4444            assert_eq!(node.tag, "ib", "Should be an IB stanza");
4445            assert_eq!(seq, 1, "First sequence should be 1 (pre-increment)");
4446            assert_eq!(client.unified_session.sequence(), 1);
4447
4448            let result2 = client.unified_session.prepare_send().await;
4449            if result2.is_none() {
4450                // Duplicate was prevented within the same millisecond
4451                assert_eq!(client.unified_session.sequence(), 1);
4452                break;
4453            }
4454            // Millisecond boundary crossed, retry
4455            tokio::task::yield_now().await;
4456        }
4457
4458        // Clear last sent and try again - sequence resets on "new" session ID
4459        client.unified_session.clear_last_sent().await;
4460        let result3 = client.unified_session.prepare_send().await;
4461        assert!(result3.is_some(), "Should succeed after clearing");
4462        let (_, seq3) = result3.unwrap();
4463        assert_eq!(seq3, 1, "Sequence resets when session ID changes");
4464        assert_eq!(client.unified_session.sequence(), 1);
4465
4466        info!("✅ test_unified_session_manager_integration passed");
4467    }
4468
4469    #[test]
4470    fn test_unified_session_protocol_node() {
4471        // Test the type-safe protocol node implementation
4472        use wacore::ib::{IbStanza, UnifiedSession};
4473        use wacore::protocol::ProtocolNode;
4474
4475        // Create a unified session
4476        let session = UnifiedSession::new("123456789");
4477        assert_eq!(session.id, "123456789");
4478        assert_eq!(session.tag(), "unified_session");
4479
4480        // Convert to node
4481        let node = session.into_node();
4482        assert_eq!(node.tag, "unified_session");
4483        assert!(node.attrs.get("id").is_some_and(|v| v == "123456789"));
4484
4485        // Create an IB stanza
4486        let stanza = IbStanza::unified_session(UnifiedSession::new("987654321"));
4487        assert_eq!(stanza.tag(), "ib");
4488
4489        // Convert to node and verify structure
4490        let ib_node = stanza.into_node();
4491        assert_eq!(ib_node.tag, "ib");
4492        let children = ib_node.children().expect("IB stanza should have children");
4493        assert_eq!(children.len(), 1);
4494        assert_eq!(children[0].tag, "unified_session");
4495        assert!(
4496            children[0]
4497                .attrs
4498                .get("id")
4499                .is_some_and(|v| v == "987654321")
4500        );
4501
4502        info!("✅ test_unified_session_protocol_node passed");
4503    }
4504
4505    /// Helper to create a test client for offline sync tests
4506    async fn create_offline_sync_test_client() -> Arc<Client> {
4507        let backend = crate::test_utils::create_test_backend().await;
4508        let pm = Arc::new(
4509            PersistenceManager::new(backend)
4510                .await
4511                .expect("persistence manager should initialize"),
4512        );
4513        let (client, _rx) = Client::new(
4514            Arc::new(crate::runtime_impl::TokioRuntime),
4515            pm,
4516            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4517            Arc::new(MockHttpClient),
4518            None,
4519        )
4520        .await;
4521        client
4522    }
4523
4524    #[tokio::test]
4525    async fn test_ib_thread_metadata_does_not_end_sync() {
4526        let client = create_offline_sync_test_client().await;
4527        client
4528            .offline_sync_metrics
4529            .active
4530            .store(true, Ordering::Release);
4531
4532        let node = NodeBuilder::new("ib")
4533            .children([NodeBuilder::new("thread_metadata")
4534                .children([NodeBuilder::new("item").build()])
4535                .build()])
4536            .build();
4537
4538        client.process_node(Arc::new(node)).await;
4539        assert!(
4540            client.offline_sync_metrics.active.load(Ordering::Acquire),
4541            "<ib><thread_metadata> should NOT end offline sync"
4542        );
4543    }
4544
4545    #[tokio::test]
4546    async fn test_ib_edge_routing_does_not_end_sync() {
4547        let client = create_offline_sync_test_client().await;
4548        client
4549            .offline_sync_metrics
4550            .active
4551            .store(true, Ordering::Release);
4552
4553        let node = NodeBuilder::new("ib")
4554            .children([NodeBuilder::new("edge_routing")
4555                .children([NodeBuilder::new("routing_info")
4556                    .bytes(vec![1, 2, 3])
4557                    .build()])
4558                .build()])
4559            .build();
4560
4561        client.process_node(Arc::new(node)).await;
4562        assert!(
4563            client.offline_sync_metrics.active.load(Ordering::Acquire),
4564            "<ib><edge_routing> should NOT end offline sync"
4565        );
4566    }
4567
4568    #[tokio::test]
4569    async fn test_ib_dirty_does_not_end_sync() {
4570        let client = create_offline_sync_test_client().await;
4571        client
4572            .offline_sync_metrics
4573            .active
4574            .store(true, Ordering::Release);
4575
4576        let node = NodeBuilder::new("ib")
4577            .children([NodeBuilder::new("dirty")
4578                .attr("type", "groups")
4579                .attr("timestamp", "1234")
4580                .build()])
4581            .build();
4582
4583        client.process_node(Arc::new(node)).await;
4584        assert!(
4585            client.offline_sync_metrics.active.load(Ordering::Acquire),
4586            "<ib><dirty> should NOT end offline sync"
4587        );
4588    }
4589
4590    #[tokio::test]
4591    async fn test_ib_offline_child_ends_sync() {
4592        let client = create_offline_sync_test_client().await;
4593        client
4594            .offline_sync_metrics
4595            .active
4596            .store(true, Ordering::Release);
4597        client
4598            .offline_sync_metrics
4599            .total_messages
4600            .store(301, Ordering::Release);
4601
4602        let node = NodeBuilder::new("ib")
4603            .children([NodeBuilder::new("offline").attr("count", "301").build()])
4604            .build();
4605
4606        client.process_node(Arc::new(node)).await;
4607        assert!(
4608            !client.offline_sync_metrics.active.load(Ordering::Acquire),
4609            "<ib><offline count='301'/> should end offline sync"
4610        );
4611    }
4612
4613    #[tokio::test]
4614    async fn test_ib_offline_preview_starts_sync() {
4615        let client = create_offline_sync_test_client().await;
4616
4617        let node = NodeBuilder::new("ib")
4618            .children([NodeBuilder::new("offline_preview")
4619                .attr("count", "301")
4620                .attr("message", "168")
4621                .attr("notification", "62")
4622                .attr("receipt", "68")
4623                .attr("appdata", "0")
4624                .build()])
4625            .build();
4626
4627        client.process_node(Arc::new(node)).await;
4628        assert!(
4629            client.offline_sync_metrics.active.load(Ordering::Acquire),
4630            "offline_preview with count>0 should activate sync"
4631        );
4632        assert_eq!(
4633            client
4634                .offline_sync_metrics
4635                .total_messages
4636                .load(Ordering::Acquire),
4637            301
4638        );
4639    }
4640
4641    #[tokio::test]
4642    async fn test_offline_message_increments_processed() {
4643        let client = create_offline_sync_test_client().await;
4644        client
4645            .offline_sync_metrics
4646            .active
4647            .store(true, Ordering::Release);
4648        client
4649            .offline_sync_metrics
4650            .total_messages
4651            .store(100, Ordering::Release);
4652
4653        let node = NodeBuilder::new("message")
4654            .attr("offline", "1")
4655            .attr("from", "5551234567@s.whatsapp.net")
4656            .attr("id", "TEST123")
4657            .attr("t", "1772884671")
4658            .attr("type", "text")
4659            .build();
4660
4661        client.process_node(Arc::new(node)).await;
4662        assert_eq!(
4663            client
4664                .offline_sync_metrics
4665                .processed_messages
4666                .load(Ordering::Acquire),
4667            1,
4668            "offline message should increment processed count"
4669        );
4670    }
4671
4672    // ---------------------------------------------------------------
4673    // Server-initiated ping detection tests
4674    //
4675    // The WhatsApp server can send pings in two formats:
4676    //
4677    // 1. Child-element format (legacy/whatsmeow style):
4678    //    <iq type="get" from="s.whatsapp.net" id="...">
4679    //      <ping/>
4680    //    </iq>
4681    //
4682    // 2. xmlns-attribute format (real WhatsApp Web format):
4683    //    <iq from="s.whatsapp.net" t="..." type="get" xmlns="urn:xmpp:ping"/>
4684    //    This is a self-closing tag with NO child elements.
4685    //    Verified against captured WhatsApp Web JS (WAWebCommsHandleStanza):
4686    //      if (t.xmlns === "urn:xmpp:ping") return wap("iq", { type: "result", to: t.from });
4687    //
4688    // Both must be recognized and answered with a pong, otherwise the
4689    // server considers the client dead and stops responding to keepalive
4690    // pings — causing a timeout cascade and forced reconnect.
4691    // ---------------------------------------------------------------
4692
4693    #[tokio::test]
4694    async fn test_handle_iq_ping_with_child_element() {
4695        // Format 1: <iq type="get"><ping/></iq> — the legacy format with a <ping> child node.
4696        let backend = crate::test_utils::create_test_backend().await;
4697        let pm = Arc::new(
4698            PersistenceManager::new(backend)
4699                .await
4700                .expect("persistence manager should initialize"),
4701        );
4702        let (client, _rx) = Client::new(
4703            Arc::new(crate::runtime_impl::TokioRuntime),
4704            pm,
4705            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4706            Arc::new(MockHttpClient),
4707            None,
4708        )
4709        .await;
4710
4711        let ping_node = NodeBuilder::new("iq")
4712            .attr("type", "get")
4713            .attr("from", SERVER_JID)
4714            .attr("id", "ping-child-1")
4715            .children([NodeBuilder::new("ping").build()])
4716            .build();
4717
4718        let handled = client.handle_iq(&ping_node).await;
4719        assert!(
4720            handled,
4721            "handle_iq must recognize ping with <ping> child element"
4722        );
4723    }
4724
4725    #[tokio::test]
4726    async fn test_handle_iq_ping_with_xmlns_attribute() {
4727        // Format 2: <iq type="get" xmlns="urn:xmpp:ping"/> — the real WhatsApp Web format.
4728        // This is a self-closing IQ with NO children, only an xmlns attribute.
4729        // The server sends this format; failing to respond causes keepalive timeout cascade.
4730        let backend = crate::test_utils::create_test_backend().await;
4731        let pm = Arc::new(
4732            PersistenceManager::new(backend)
4733                .await
4734                .expect("persistence manager should initialize"),
4735        );
4736        let (client, _rx) = Client::new(
4737            Arc::new(crate::runtime_impl::TokioRuntime),
4738            pm,
4739            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4740            Arc::new(MockHttpClient),
4741            None,
4742        )
4743        .await;
4744
4745        let ping_node = NodeBuilder::new("iq")
4746            .attr("type", "get")
4747            .attr("from", SERVER_JID)
4748            .attr("id", "ping-xmlns-1")
4749            .attr("xmlns", "urn:xmpp:ping")
4750            .build();
4751
4752        let handled = client.handle_iq(&ping_node).await;
4753        assert!(
4754            handled,
4755            "handle_iq must recognize ping with xmlns=\"urn:xmpp:ping\" attribute (no children)"
4756        );
4757    }
4758
4759    #[tokio::test]
4760    async fn test_handle_iq_ping_with_both_child_and_xmlns() {
4761        // Edge case: node has BOTH a <ping> child AND xmlns="urn:xmpp:ping".
4762        // Should still be handled (OR condition).
4763        let backend = crate::test_utils::create_test_backend().await;
4764        let pm = Arc::new(
4765            PersistenceManager::new(backend)
4766                .await
4767                .expect("persistence manager should initialize"),
4768        );
4769        let (client, _rx) = Client::new(
4770            Arc::new(crate::runtime_impl::TokioRuntime),
4771            pm,
4772            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4773            Arc::new(MockHttpClient),
4774            None,
4775        )
4776        .await;
4777
4778        let ping_node = NodeBuilder::new("iq")
4779            .attr("type", "get")
4780            .attr("from", SERVER_JID)
4781            .attr("id", "ping-both-1")
4782            .attr("xmlns", "urn:xmpp:ping")
4783            .children([NodeBuilder::new("ping").build()])
4784            .build();
4785
4786        let handled = client.handle_iq(&ping_node).await;
4787        assert!(
4788            handled,
4789            "handle_iq must handle ping with both child and xmlns"
4790        );
4791    }
4792
4793    #[tokio::test]
4794    async fn test_handle_iq_non_ping_returns_false() {
4795        // A type="get" IQ without ping child or xmlns should NOT be handled as ping.
4796        let backend = crate::test_utils::create_test_backend().await;
4797        let pm = Arc::new(
4798            PersistenceManager::new(backend)
4799                .await
4800                .expect("persistence manager should initialize"),
4801        );
4802        let (client, _rx) = Client::new(
4803            Arc::new(crate::runtime_impl::TokioRuntime),
4804            pm,
4805            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4806            Arc::new(MockHttpClient),
4807            None,
4808        )
4809        .await;
4810
4811        let non_ping_node = NodeBuilder::new("iq")
4812            .attr("type", "get")
4813            .attr("from", SERVER_JID)
4814            .attr("id", "not-a-ping")
4815            .attr("xmlns", "some:other:namespace")
4816            .build();
4817
4818        let handled = client.handle_iq(&non_ping_node).await;
4819        assert!(
4820            !handled,
4821            "handle_iq must NOT treat non-ping xmlns as a ping"
4822        );
4823    }
4824
4825    #[tokio::test]
4826    async fn test_handle_iq_ping_wrong_type_returns_false() {
4827        // xmlns="urn:xmpp:ping" but type="result" (not "get") — should NOT be handled as ping.
4828        let backend = crate::test_utils::create_test_backend().await;
4829        let pm = Arc::new(
4830            PersistenceManager::new(backend)
4831                .await
4832                .expect("persistence manager should initialize"),
4833        );
4834        let (client, _rx) = Client::new(
4835            Arc::new(crate::runtime_impl::TokioRuntime),
4836            pm,
4837            Arc::new(crate::transport::mock::MockTransportFactory::new()),
4838            Arc::new(MockHttpClient),
4839            None,
4840        )
4841        .await;
4842
4843        let result_node = NodeBuilder::new("iq")
4844            .attr("type", "result")
4845            .attr("from", SERVER_JID)
4846            .attr("id", "ping-result-1")
4847            .attr("xmlns", "urn:xmpp:ping")
4848            .build();
4849
4850        let handled = client.handle_iq(&result_node).await;
4851        assert!(
4852            !handled,
4853            "handle_iq must NOT respond to type=\"result\" even with ping xmlns"
4854        );
4855    }
4856
4857    // ── build_pong tests ──────────────────────────────────────────────
4858
4859    #[test]
4860    fn test_build_pong_with_id() {
4861        let pong = build_pong("s.whatsapp.net".to_string(), Some("ping-123"));
4862        assert!(
4863            pong.attrs.get("id").is_some_and(|v| v == "ping-123"),
4864            "pong should include id when server ping has one"
4865        );
4866        assert!(pong.attrs.get("type").is_some_and(|v| v == "result"));
4867        assert!(pong.attrs.get("to").is_some_and(|v| v == "s.whatsapp.net"));
4868    }
4869
4870    #[test]
4871    fn test_build_pong_without_id() {
4872        let pong = build_pong("s.whatsapp.net".to_string(), None);
4873        assert!(
4874            !pong.attrs.contains_key("id"),
4875            "pong should NOT include id when server ping has none"
4876        );
4877        assert!(pong.attrs.get("type").is_some_and(|v| v == "result"));
4878    }
4879
4880    #[test]
4881    fn test_encrypt_identity_notification_omits_type() {
4882        let node = NodeBuilder::new("notification")
4883            .attr("from", "186303081611421@lid")
4884            .attr("id", "4128735301")
4885            .attr("type", "encrypt")
4886            .children([NodeBuilder::new("identity").build()])
4887            .build();
4888
4889        assert!(
4890            is_encrypt_identity_notification(&node),
4891            "identity-change notification ACK must omit type to match WA Web"
4892        );
4893    }
4894
4895    #[test]
4896    fn test_device_notification_is_not_encrypt_identity() {
4897        let node = NodeBuilder::new("notification")
4898            .attr("from", "186303081611421@lid")
4899            .attr("id", "269488578")
4900            .attr("type", "devices")
4901            .children([NodeBuilder::new("remove").build()])
4902            .build();
4903
4904        assert!(
4905            !is_encrypt_identity_notification(&node),
4906            "device notification is not an encrypt+identity notification"
4907        );
4908    }
4909
4910    #[test]
4911    fn test_build_ack_node_for_message_omits_type_includes_from() {
4912        // Whatsmeow: message acks do NOT echo type (node.Tag != "message" guard).
4913        // They DO include `from` with own device PN.
4914        let incoming = NodeBuilder::new("message")
4915            .attr("from", "120363161500776365@g.us")
4916            .attr("id", "A5791A5392EF60E3FB0670098DE010D4")
4917            .attr("type", "text")
4918            .attr("participant", "181531758878822@lid")
4919            .build();
4920        let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4921            .parse()
4922            .expect("own device PN JID should parse");
4923
4924        let ack = build_ack_node(&incoming, Some(&own_device_pn))
4925            .expect("message ack should be buildable");
4926
4927        assert_eq!(ack.tag, "ack");
4928        // Use PartialEq<str> on NodeValue — works for both String and Jid variants
4929        // without allocation, so tests don't depend on internal representation.
4930        assert!(ack.attrs.get("class").is_some_and(|v| v == "message"));
4931        assert!(
4932            ack.attrs
4933                .get("to")
4934                .is_some_and(|v| v == "120363161500776365@g.us")
4935        );
4936        assert!(
4937            ack.attrs
4938                .get("from")
4939                .is_some_and(|v| v == "155500012345:48@s.whatsapp.net")
4940        );
4941        assert!(
4942            ack.attrs
4943                .get("participant")
4944                .is_some_and(|v| v == "181531758878822@lid")
4945        );
4946        assert!(
4947            !ack.attrs.contains_key("type"),
4948            "message ACK must NOT echo type (matches whatsmeow behavior)"
4949        );
4950    }
4951
4952    #[test]
4953    fn test_build_ack_node_for_identity_change_omits_type_and_from() {
4954        let incoming = NodeBuilder::new("notification")
4955            .attr("from", "186303081611421@lid")
4956            .attr("id", "4128735301")
4957            .attr("type", "encrypt")
4958            .children([NodeBuilder::new("identity").build()])
4959            .build();
4960        let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4961            .parse()
4962            .expect("own device PN JID should parse");
4963
4964        let ack = build_ack_node(&incoming, Some(&own_device_pn))
4965            .expect("notification ack should be buildable");
4966
4967        assert!(ack.attrs.get("class").is_some_and(|v| v == "notification"));
4968        assert!(
4969            !ack.attrs.contains_key("type"),
4970            "identity-change notification ACK must omit type"
4971        );
4972        assert!(
4973            !ack.attrs.contains_key("from"),
4974            "notification ACKs should not include our device PN"
4975        );
4976    }
4977
4978    #[test]
4979    fn test_build_ack_node_for_receipt_with_type_echoes_type() {
4980        // Receipt acks should echo the type attribute when present (e.g. "read", "played").
4981        let incoming = NodeBuilder::new("receipt")
4982            .attr("from", "156535032389744@lid")
4983            .attr("id", "RCPT-WITH-TYPE")
4984            .attr("type", "read")
4985            .build();
4986        let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
4987            .parse()
4988            .expect("own device PN JID should parse");
4989
4990        let ack = build_ack_node(&incoming, Some(&own_device_pn))
4991            .expect("receipt ack should be buildable");
4992
4993        assert!(ack.attrs.get("class").is_some_and(|v| v == "receipt"));
4994        assert!(
4995            ack.attrs.get("type").is_some_and(|v| v == "read"),
4996            "receipt ACK must echo the type attribute when present"
4997        );
4998        assert!(
4999            !ack.attrs.contains_key("from"),
5000            "receipt ACKs should not include our device PN"
5001        );
5002    }
5003
5004    #[test]
5005    fn test_build_ack_node_for_receipt_without_type_omits_type() {
5006        // Delivery receipts have no type attribute — the ack must also omit it.
5007        // Sending type="delivery" in the ack causes stream:error disconnections.
5008        let incoming = NodeBuilder::new("receipt")
5009            .attr("from", "156535032389744@lid")
5010            .attr("id", "RCPT-NO-TYPE")
5011            .build();
5012        let own_device_pn: Jid = "155500012345:48@s.whatsapp.net"
5013            .parse()
5014            .expect("own device PN JID should parse");
5015
5016        let ack = build_ack_node(&incoming, Some(&own_device_pn))
5017            .expect("receipt ack should be buildable");
5018
5019        assert!(ack.attrs.get("class").is_some_and(|v| v == "receipt"));
5020        assert!(
5021            !ack.attrs.contains_key("type"),
5022            "receipt ACK must NOT contain type when the incoming receipt has no type attribute"
5023        );
5024        assert!(
5025            !ack.attrs.contains_key("from"),
5026            "receipt ACKs should not include our device PN"
5027        );
5028    }
5029
5030    /// Smoke test: server ping with xmlns but no id attribute is handled.
5031    #[tokio::test]
5032    async fn test_handle_iq_ping_without_id() {
5033        let backend = crate::test_utils::create_test_backend().await;
5034        let pm = Arc::new(
5035            PersistenceManager::new(backend)
5036                .await
5037                .expect("persistence manager should initialize"),
5038        );
5039        let (client, _rx) = Client::new(
5040            Arc::new(crate::runtime_impl::TokioRuntime),
5041            pm,
5042            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5043            Arc::new(MockHttpClient),
5044            None,
5045        )
5046        .await;
5047
5048        // Server ping without id — real format observed in production logs
5049        let ping_node = NodeBuilder::new("iq")
5050            .attr("type", "get")
5051            .attr("from", SERVER_JID)
5052            .attr("xmlns", "urn:xmpp:ping")
5053            .build();
5054
5055        let handled = client.handle_iq(&ping_node).await;
5056        assert!(
5057            handled,
5058            "handle_iq must recognize ping without id attribute"
5059        );
5060    }
5061
5062    // ── fibonacci_backoff tests ────────────────────────────────────────
5063
5064    #[test]
5065    fn test_fibonacci_backoff_sequence() {
5066        // WA Web: first=1000, second=1000 → 1,1,2,3,5,8,13,21,34,55,89,144...s
5067        // We test base values without jitter by checking the range (±10%).
5068        let expected_base_ms = [1000, 1000, 2000, 3000, 5000, 8000, 13000, 21000];
5069        for (attempt, &base) in expected_base_ms.iter().enumerate() {
5070            let delay = fibonacci_backoff(attempt as u32);
5071            let ms = delay.as_millis() as u64;
5072            let low = base - base / 10;
5073            let high = base + base / 10;
5074            assert!(
5075                ms >= low && ms <= high,
5076                "attempt {attempt}: expected {low}..={high}ms, got {ms}ms"
5077            );
5078        }
5079    }
5080
5081    #[test]
5082    fn test_fibonacci_backoff_max_900s() {
5083        // After many attempts, should cap at 900s (±10%)
5084        let delay = fibonacci_backoff(100);
5085        let ms = delay.as_millis() as u64;
5086        assert!(
5087            ms <= 990_000,
5088            "should never exceed 900s + 10% jitter, got {ms}ms"
5089        );
5090        assert!(
5091            ms >= 810_000,
5092            "should be at least 900s - 10% jitter, got {ms}ms"
5093        );
5094    }
5095
5096    #[test]
5097    fn test_fibonacci_backoff_first_attempt_is_1s() {
5098        let delay = fibonacci_backoff(0);
5099        let ms = delay.as_millis() as u64;
5100        assert!(
5101            (900..=1100).contains(&ms),
5102            "first attempt should be ~1s (±10%), got {ms}ms"
5103        );
5104    }
5105
5106    // ── stream error tests ─────────────────────────────────────────────
5107
5108    #[tokio::test]
5109    async fn test_stream_error_401_disables_reconnect() {
5110        let client = create_offline_sync_test_client().await;
5111        let node = NodeBuilder::new("stream:error").attr("code", "401").build();
5112        client.handle_stream_error(&node).await;
5113        assert!(
5114            !client.enable_auto_reconnect.load(Ordering::Relaxed),
5115            "401 should disable auto-reconnect"
5116        );
5117    }
5118
5119    #[tokio::test]
5120    async fn test_stream_error_409_disables_reconnect() {
5121        let client = create_offline_sync_test_client().await;
5122        let node = NodeBuilder::new("stream:error").attr("code", "409").build();
5123        client.handle_stream_error(&node).await;
5124        assert!(
5125            !client.enable_auto_reconnect.load(Ordering::Relaxed),
5126            "409 should disable auto-reconnect"
5127        );
5128    }
5129
5130    #[tokio::test]
5131    async fn test_stream_error_429_keeps_reconnect_with_backoff() {
5132        let client = create_offline_sync_test_client().await;
5133        let before = client.auto_reconnect_errors.load(Ordering::Relaxed);
5134        let node = NodeBuilder::new("stream:error").attr("code", "429").build();
5135        client.handle_stream_error(&node).await;
5136        assert!(
5137            client.enable_auto_reconnect.load(Ordering::Relaxed),
5138            "429 should keep auto-reconnect enabled"
5139        );
5140        let after = client.auto_reconnect_errors.load(Ordering::Relaxed);
5141        assert_eq!(
5142            after,
5143            before + 5,
5144            "429 should increase backoff by exactly 5: before={before}, after={after}"
5145        );
5146    }
5147
5148    #[tokio::test]
5149    async fn test_stream_error_503_keeps_reconnect() {
5150        let client = create_offline_sync_test_client().await;
5151        let node = NodeBuilder::new("stream:error").attr("code", "503").build();
5152        client.handle_stream_error(&node).await;
5153        assert!(
5154            client.enable_auto_reconnect.load(Ordering::Relaxed),
5155            "503 should keep auto-reconnect enabled"
5156        );
5157    }
5158
5159    #[tokio::test]
5160    async fn test_custom_cache_config_is_respected() {
5161        use crate::cache_config::{CacheConfig, CacheEntryConfig};
5162        use std::time::Duration;
5163
5164        let backend = crate::test_utils::create_test_backend().await;
5165        let pm = Arc::new(
5166            PersistenceManager::new(backend)
5167                .await
5168                .expect("persistence manager should initialize"),
5169        );
5170
5171        let custom_config = CacheConfig {
5172            group_cache: CacheEntryConfig::new(Some(Duration::from_secs(60)), 10),
5173            device_cache: CacheEntryConfig::new(Some(Duration::from_secs(60)), 10),
5174            ..CacheConfig::default()
5175        };
5176
5177        // Verify that constructing a client with a custom config does not panic
5178        // and the client is usable.
5179        let (client, _rx) = Client::new_with_cache_config(
5180            Arc::new(crate::runtime_impl::TokioRuntime),
5181            pm,
5182            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5183            Arc::new(MockHttpClient),
5184            None,
5185            custom_config,
5186        )
5187        .await;
5188
5189        assert!(!client.is_logged_in());
5190    }
5191
5192    /// Proves that `is_connected()` no longer gives false negatives under mutex
5193    /// contention. Before the fix, `try_lock()` would fail when another task held
5194    /// the noise_socket mutex, causing `is_connected()` to return `false` even
5195    /// though the connection was alive — silently dropping receipt acks.
5196    ///
5197    /// This test sets up a real NoiseSocket (same as socket unit tests) so it
5198    /// accurately models the pre-fix scenario: socket is Some + mutex is held
5199    /// by another task = old is_connected() returned false.
5200    #[tokio::test]
5201    async fn test_is_connected_not_affected_by_mutex_contention() {
5202        use crate::socket::NoiseSocket;
5203        use wacore::handshake::NoiseCipher;
5204
5205        let backend = crate::test_utils::create_test_backend().await;
5206        let pm = Arc::new(
5207            PersistenceManager::new(backend)
5208                .await
5209                .expect("persistence manager should initialize"),
5210        );
5211        let (client, _rx) = Client::new(
5212            Arc::new(crate::runtime_impl::TokioRuntime),
5213            pm,
5214            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5215            Arc::new(MockHttpClient),
5216            None,
5217        )
5218        .await;
5219
5220        // Initially not connected
5221        assert!(!client.is_connected(), "should start disconnected");
5222
5223        // Simulate a real connection: create a NoiseSocket and store it
5224        let transport: Arc<dyn crate::transport::Transport> =
5225            Arc::new(crate::transport::mock::MockTransport);
5226        let key = [0u8; 32];
5227        let write_key = NoiseCipher::new(&key).expect("valid key");
5228        let read_key = NoiseCipher::new(&key).expect("valid key");
5229        let noise_socket = NoiseSocket::new(
5230            Arc::new(crate::runtime_impl::TokioRuntime),
5231            transport,
5232            write_key,
5233            read_key,
5234        );
5235        *client.noise_socket.lock().await = Some(Arc::new(noise_socket));
5236        client.is_connected.store(true, Ordering::Release);
5237
5238        assert!(client.is_connected(), "should report connected");
5239
5240        // Hold the noise_socket mutex — this used to make is_connected() return
5241        // false via try_lock() even though the socket was Some(...)
5242        let _guard = client.noise_socket.lock().await;
5243        assert!(
5244            client.is_connected(),
5245            "is_connected() must return true even while noise_socket mutex is held"
5246        );
5247    }
5248
5249    /// Verifies that `send_ack_for` returns an error (not silent Ok) when
5250    /// disconnected. This ensures the caller's `warn!` fires so dropped acks
5251    /// are visible in logs.
5252    #[tokio::test]
5253    async fn test_send_ack_for_returns_error_when_disconnected() {
5254        let backend = crate::test_utils::create_test_backend().await;
5255        let pm = Arc::new(
5256            PersistenceManager::new(backend)
5257                .await
5258                .expect("persistence manager should initialize"),
5259        );
5260        let (client, _rx) = Client::new(
5261            Arc::new(crate::runtime_impl::TokioRuntime),
5262            pm,
5263            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5264            Arc::new(MockHttpClient),
5265            None,
5266        )
5267        .await;
5268
5269        // Not connected — send_ack_for should return Err, not Ok
5270        let receipt = NodeBuilder::new("receipt")
5271            .attr("from", "120363040237990503@g.us")
5272            .attr("id", "TEST-RECEIPT-ID")
5273            .attr("participant", "236395184570386@lid")
5274            .build();
5275
5276        let result = client.send_ack_for(&receipt).await;
5277        assert!(
5278            matches!(result, Err(ClientError::NotConnected)),
5279            "send_ack_for must return Err(NotConnected) when disconnected, got: {result:?}"
5280        );
5281    }
5282
5283    /// Verifies that `send_ack_for` returns Ok when expected_disconnect is set,
5284    /// since this is an intentional shutdown path.
5285    #[tokio::test]
5286    async fn test_send_ack_for_returns_ok_on_expected_disconnect() {
5287        let backend = crate::test_utils::create_test_backend().await;
5288        let pm = Arc::new(
5289            PersistenceManager::new(backend)
5290                .await
5291                .expect("persistence manager should initialize"),
5292        );
5293        let (client, _rx) = Client::new(
5294            Arc::new(crate::runtime_impl::TokioRuntime),
5295            pm,
5296            Arc::new(crate::transport::mock::MockTransportFactory::new()),
5297            Arc::new(MockHttpClient),
5298            None,
5299        )
5300        .await;
5301
5302        // Set expected disconnect — send_ack_for should gracefully return Ok
5303        client.expected_disconnect.store(true, Ordering::Relaxed);
5304
5305        let receipt = NodeBuilder::new("receipt")
5306            .attr("from", "120363040237990503@g.us")
5307            .attr("id", "TEST-RECEIPT-ID")
5308            .build();
5309
5310        let result = client.send_ack_for(&receipt).await;
5311        assert!(
5312            result.is_ok(),
5313            "send_ack_for should return Ok during expected disconnect"
5314        );
5315    }
5316}