//! WhatsApp client core (`client.rs`): connection lifecycle, caches,
//! sync state, and stanza routing.
1mod context_impl;
2mod device_registry;
3mod lid_pn;
4mod sender_keys;
5mod sessions;
6
7use crate::handshake;
8use crate::lid_pn_cache::LidPnCache;
9use crate::pair;
10use anyhow::{Result, anyhow};
11use dashmap::DashMap;
12use moka::future::Cache;
13use tokio::sync::watch;
14use wacore::xml::DisplayableNode;
15use wacore_binary::builder::NodeBuilder;
16use wacore_binary::jid::JidExt;
17use wacore_binary::node::{Attrs, Node};
18
19use crate::appstate_sync::AppStateProcessor;
20use crate::handlers::chatstate::ChatStateEvent;
21use crate::jid_utils::server_jid;
22use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
23use crate::types::enc_handler::EncHandler;
24use crate::types::events::{ConnectFailureReason, Event};
25
26use log::{debug, error, info, trace, warn};
27
28use rand::RngCore;
29use scopeguard;
30use std::collections::{HashMap, HashSet};
31use wacore_binary::jid::Jid;
32
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
35
36use thiserror::Error;
37use tokio::sync::{Mutex, Notify, OnceCell, RwLock, mpsc};
38use tokio::time::{Duration, sleep};
39use wacore::appstate::patch_decode::WAPatchName;
40use wacore::client::context::GroupInfo;
41use waproto::whatsapp as wa;
42
43use crate::socket::{NoiseSocket, SocketError, error::EncryptSendError};
44use crate::sync_task::MajorSyncTask;
45
/// Type alias for chatstate event handler functions.
/// A shared, thread-safe callback invoked once per parsed `<chatstate>`
/// stanza (see `register_chatstate_handler` / `dispatch_chatstate_event`).
type ChatStateHandler = Arc<dyn Fn(ChatStateEvent) + Send + Sync>;
48
/// Maximum number of attempts when retrying app-state (patch) syncs.
const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;

/// Upper bound (512 KiB) on the capacity of plaintext marshal buffers kept in
/// `plaintext_buffer_pool`; presumably larger buffers are dropped rather than
/// pooled to bound memory — confirm at the pool's return site.
const MAX_POOLED_BUFFER_CAP: usize = 512 * 1024;
52
/// Errors surfaced by client connection and send operations.
#[derive(Debug, Error)]
pub enum ClientError {
    /// The operation requires an active connection, but none exists.
    #[error("client is not connected")]
    NotConnected,
    /// Underlying socket failure, converted from [`SocketError`].
    #[error("socket error: {0}")]
    Socket(#[from] SocketError),
    /// Failure while encrypting or sending a frame.
    #[error("encrypt/send error: {0}")]
    EncryptSend(#[from] EncryptSendError),
    /// A connect was attempted while already connected or connecting.
    #[error("client is already connected")]
    AlreadyConnected,
    /// The operation requires a logged-in session.
    #[error("client is not logged in")]
    NotLoggedIn,
}
66
67use wacore::types::message::StanzaKey;
68
/// Metrics for tracking offline sync progress
#[derive(Debug)]
pub(crate) struct OfflineSyncMetrics {
    /// True while an offline sync is in progress.
    pub active: AtomicBool,
    /// Total messages counted for the current offline sync batch.
    pub total_messages: AtomicUsize,
    /// Messages processed so far in the current batch.
    pub processed_messages: AtomicUsize,
    // Using simple std Mutex for timestamp as it's rarely contended and non-async
    pub start_time: std::sync::Mutex<Option<std::time::Instant>>,
}
78
/// Metrics for tracking the effectiveness of the local re-queue optimization.
/// This helps monitor whether the 500ms delay and cache TTL are well-tuned.
// Derive Debug for consistency with `OfflineSyncMetrics` so these counters
// can be logged directly.
#[derive(Debug)]
pub(crate) struct RetryMetrics {
    /// Number of times messages triggered local re-queue (NoSession on first attempt)
    pub local_requeue_attempts: AtomicUsize,
    /// Number of times re-queued messages successfully decrypted
    pub local_requeue_success: AtomicUsize,
    /// Number of times re-queued messages still failed (fell back to network retry)
    pub local_requeue_fallback: AtomicUsize,
}
89
/// The main WhatsApp client: owns connection state, caches, sync bookkeeping
/// and event dispatch. Shared across background tasks via `Arc`.
pub struct Client {
    /// Platform-independent core state/event bus from the `wacore` crate.
    pub(crate) core: wacore::client::CoreClient,

    /// Persistence layer for device and session state.
    pub(crate) persistence_manager: Arc<PersistenceManager>,
    /// Media connection info; `None` until first established.
    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,

    // Connection lifecycle flags.
    pub(crate) is_logged_in: Arc<AtomicBool>,
    pub(crate) is_connecting: Arc<AtomicBool>,
    pub(crate) is_running: Arc<AtomicBool>,
    /// Signals background loops (message loop, keepalive) to stop.
    pub(crate) shutdown_notifier: Arc<Notify>,

    /// Active transport, if connected.
    pub(crate) transport: Arc<Mutex<Option<Arc<dyn crate::transport::Transport>>>>,
    /// Receiver of transport events; taken by `read_messages_loop`.
    pub(crate) transport_events:
        Arc<Mutex<Option<async_channel::Receiver<crate::transport::TransportEvent>>>>,
    pub(crate) transport_factory: Arc<dyn crate::transport::TransportFactory>,
    /// Noise-encrypted socket established by the handshake.
    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,

    /// One-shot senders keyed by IQ id, completed when the response arrives.
    pub(crate) response_waiters:
        Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<wacore_binary::Node>>>>,
    /// Short per-process id ("a.b" from two random bytes) — presumably used to
    /// namespace generated stanza ids; confirm at the id-generation site.
    pub(crate) unique_id: String,
    /// Monotonic counter for generating request ids.
    pub(crate) id_counter: Arc<AtomicU64>,

    pub(crate) unified_session: crate::unified_session::UnifiedSessionManager,

    /// Per-device session locks for Signal protocol operations.
    /// Prevents race conditions when multiple messages from the same sender
    /// are processed concurrently across different chats.
    /// Keys are Signal protocol address strings (e.g., "user@s.whatsapp.net:0")
    /// to match the SignalProtocolStoreAdapter's internal locking.
    pub(crate) session_locks: Cache<String, Arc<tokio::sync::Mutex<()>>>,

    /// Per-chat message queues for sequential message processing.
    /// Prevents race conditions where a later message is processed before
    /// the PreKey message that establishes the Signal session.
    pub(crate) message_queues: Cache<String, mpsc::Sender<Arc<Node>>>,

    /// Cache for LID to Phone Number mappings (bidirectional).
    /// When we receive a message with sender_lid/sender_pn attributes, we store the mapping here.
    /// This allows us to reuse existing LID-based sessions when sending replies.
    /// The cache is backed by persistent storage and warmed up on client initialization.
    pub(crate) lid_pn_cache: Arc<LidPnCache>,

    /// Per-chat mutex for serializing message enqueue operations.
    /// This ensures messages are enqueued in the order they arrive,
    /// preventing race conditions during queue initialization.
    pub(crate) message_enqueue_locks: Cache<String, Arc<tokio::sync::Mutex<()>>>,

    /// Lazily-initialized group metadata cache (see `get_group_cache`).
    pub group_cache: OnceCell<Cache<Jid, GroupInfo>>,
    /// Lazily-initialized per-user device list cache (see `get_device_cache`).
    pub device_cache: OnceCell<Cache<Jid, Vec<Jid>>>,

    /// Group message ids that were already retried (5 min TTL; cleared on disconnect).
    pub(crate) retried_group_messages: Cache<String, ()>,
    /// Set when a disconnect is intentional so reconnect logic can react.
    pub(crate) expected_disconnect: Arc<AtomicBool>,

    /// Connection generation counter - incremented on each new connection.
    /// Used to detect stale post-login tasks from previous connections.
    pub(crate) connection_generation: Arc<AtomicU64>,

    /// Cache for recent messages (serialized bytes) for retry functionality.
    /// Uses moka cache with TTL and max capacity for automatic eviction.
    pub(crate) recent_messages: Cache<StanzaKey, Vec<u8>>,

    /// Keys of messages with an in-flight retry, to avoid duplicates.
    pub(crate) pending_retries: Arc<Mutex<HashSet<String>>>,

    /// Track retry attempts per message to prevent infinite retry loops.
    /// Key: "{chat}:{msg_id}:{sender}", Value: retry count
    /// Matches WhatsApp Web's MAX_RETRY = 5 behavior.
    pub(crate) message_retry_counts: Cache<String, u8>,

    /// Cache for tracking local re-queue attempts for NoSession errors.
    /// Used to tolerate out-of-order delivery where skmsg arrives before pkmsg.
    /// Key: Message ID, Value: ()
    /// TTL: 10 seconds (short-lived buffer)
    pub(crate) local_retry_cache: Cache<String, ()>,

    // Auto-reconnect configuration and backoff state.
    pub enable_auto_reconnect: Arc<AtomicBool>,
    pub auto_reconnect_errors: Arc<AtomicU32>,
    pub last_successful_connect: Arc<Mutex<Option<chrono::DateTime<chrono::Utc>>>>,

    /// Set when the next login should trigger a full app-state sync.
    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,

    /// Lazily-initialized app-state processor (see `get_app_state_processor`).
    pub(crate) app_state_processor: OnceCell<AppStateProcessor>,
    /// Timestamps of outstanding app-state key requests, keyed by key id.
    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, std::time::Instant>>>,
    /// Tracks collections currently being synced to prevent duplicate sync tasks.
    /// Matches WA Web's in-flight tracking set in WAWebSyncdCollectionsStateMachine.
    pub(crate) app_state_syncing: Arc<Mutex<HashSet<WAPatchName>>>,
    pub(crate) initial_keys_synced_notifier: Arc<Notify>,
    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,

    /// Notifier for when offline sync (ib offline stanza) is received.
    /// WhatsApp Web waits for this before sending passive tasks (prekey upload, active IQ, presence).
    pub(crate) offline_sync_notifier: Arc<Notify>,
    /// Flag indicating offline sync has completed (received ib offline stanza).
    pub(crate) offline_sync_completed: Arc<AtomicBool>,
    /// Metrics for granular offline sync logging
    pub(crate) offline_sync_metrics: Arc<OfflineSyncMetrics>,
    /// Metrics for tracking the effectiveness of local re-queue optimization
    pub(crate) retry_metrics: Arc<RetryMetrics>,
    /// Notifier for when the noise socket is established (before login).
    /// Use this to wait for the socket to be ready for sending messages.
    pub(crate) socket_ready_notifier: Arc<Notify>,
    /// Notifier for when the client is fully connected and logged in.
    /// Triggered after Event::Connected is dispatched.
    pub(crate) connected_notifier: Arc<Notify>,
    /// Sender half of the major-sync task channel returned from `new`.
    pub(crate) major_sync_task_sender: mpsc::Sender<MajorSyncTask>,
    /// Watch sender used to cancel an in-flight pairing attempt.
    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<watch::Sender<()>>>>,

    /// State machine for pair code authentication flow.
    /// Tracks the pending pair code request and ephemeral keys.
    pub(crate) pair_code_state: Arc<Mutex<wacore::pair_code::PairCodeState>>,

    /// Pool for reusing plaintext marshal buffers.
    /// Note: encrypted buffers are not pooled since they're moved to transport (zero-copy).
    pub(crate) plaintext_buffer_pool: Arc<Mutex<Vec<Vec<u8>>>>,

    /// Custom handlers for encrypted message types
    pub custom_enc_handlers: Arc<DashMap<String, Arc<dyn EncHandler>>>,

    /// Chat state (typing indicator) handlers registered by external consumers.
    /// Each handler receives a `ChatStateEvent` describing the chat, optional participant and state.
    pub(crate) chatstate_handlers: Arc<RwLock<Vec<ChatStateHandler>>>,

    /// Cache for pending PDO (Peer Data Operation) requests.
    /// Maps message cache keys (chat:id) to pending request info.
    pub(crate) pdo_pending_requests: Cache<String, crate::pdo::PendingPdoRequest>,

    /// LRU cache for device registry (matches WhatsApp Web's 5000 entry limit).
    /// Maps user ID to DeviceListRecord for fast device existence checks.
    /// Backed by persistent storage.
    pub(crate) device_registry_cache: Cache<String, wacore::store::traits::DeviceListRecord>,

    /// Router for dispatching stanzas to their appropriate handlers
    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,

    /// Whether to send ACKs synchronously or in a background task
    pub(crate) synchronous_ack: bool,

    /// HTTP client for making HTTP requests (media upload/download, version fetching)
    pub http_client: Arc<dyn crate::http::HttpClient>,

    /// Version override for testing or manual specification
    pub(crate) override_version: Option<(u32, u32, u32)>,

    /// When true, history sync notifications are acknowledged but not downloaded
    /// or processed. Set via `BotBuilder::skip_history_sync()`.
    pub(crate) skip_history_sync: AtomicBool,
}
236
237impl Client {
238    /// Enable or disable skipping of history sync notifications at runtime.
239    ///
240    /// When enabled, the client will acknowledge incoming history sync
241    /// notifications but will not download or process the data.
242    pub fn set_skip_history_sync(&self, enabled: bool) {
243        self.skip_history_sync.store(enabled, Ordering::Relaxed);
244    }
245
246    /// Returns `true` if history sync notifications are currently being skipped.
247    pub fn skip_history_sync_enabled(&self) -> bool {
248        self.skip_history_sync.load(Ordering::Relaxed)
249    }
250
    /// Builds a new `Client` wired to the given persistence, transport and
    /// HTTP backends.
    ///
    /// Returns the shared client handle together with the receiver half of the
    /// major-sync task channel (capacity 32); the caller is expected to drive
    /// that receiver. Also spawns two detached background tasks on the new
    /// `Arc`: LID-PN cache warm-up and device-registry cleanup.
    pub async fn new(
        persistence_manager: Arc<PersistenceManager>,
        transport_factory: Arc<dyn crate::transport::TransportFactory>,
        http_client: Arc<dyn crate::http::HttpClient>,
        override_version: Option<(u32, u32, u32)>,
    ) -> (Arc<Self>, mpsc::Receiver<MajorSyncTask>) {
        // Two random bytes form a short per-process unique id (e.g. "12.34").
        let mut unique_id_bytes = [0u8; 2];
        rand::rng().fill_bytes(&mut unique_id_bytes);

        let device_snapshot = persistence_manager.get_device_snapshot().await;
        let core = wacore::client::CoreClient::new(device_snapshot.core.clone());

        // Major-sync task channel; `tx` is stored on the client, `rx` returned.
        let (tx, rx) = mpsc::channel(32);

        let this = Self {
            core,
            persistence_manager: persistence_manager.clone(),
            media_conn: Arc::new(RwLock::new(None)),
            is_logged_in: Arc::new(AtomicBool::new(false)),
            is_connecting: Arc::new(AtomicBool::new(false)),
            is_running: Arc::new(AtomicBool::new(false)),
            shutdown_notifier: Arc::new(Notify::new()),

            transport: Arc::new(Mutex::new(None)),
            transport_events: Arc::new(Mutex::new(None)),
            transport_factory,
            noise_socket: Arc::new(Mutex::new(None)),

            response_waiters: Arc::new(Mutex::new(HashMap::new())),
            unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
            id_counter: Arc::new(AtomicU64::new(0)),
            unified_session: crate::unified_session::UnifiedSessionManager::new(),

            session_locks: Cache::builder()
                .time_to_live(Duration::from_secs(300)) // 5 minute TTL
                .max_capacity(10_000) // Limit to 10k concurrent sessions
                .build(),
            message_queues: Cache::builder()
                .time_to_live(Duration::from_secs(300)) // Idle queues expire after 5 mins
                .max_capacity(10_000) // Limit to 10k concurrent chats
                .build(),
            lid_pn_cache: Arc::new(LidPnCache::new()),
            message_enqueue_locks: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(10_000)
                .build(),
            group_cache: OnceCell::new(),
            device_cache: OnceCell::new(),
            retried_group_messages: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(2_000)
                .build(),

            expected_disconnect: Arc::new(AtomicBool::new(false)),
            connection_generation: Arc::new(AtomicU64::new(0)),

            // Recent messages cache for retry functionality
            // TTL of 5 minutes (retries don't happen after that)
            // Max 1000 messages to bound memory usage
            recent_messages: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(1_000)
                .build(),

            pending_retries: Arc::new(Mutex::new(HashSet::new())),

            // Retry count tracking cache for preventing infinite retry loops.
            // TTL of 5 minutes to match retry functionality, max 5000 entries.
            message_retry_counts: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(5_000)
                .build(),

            // Local retry cache for out-of-order message tolerance
            // 10s TTL is sufficient for packet reordering
            local_retry_cache: Cache::builder()
                .time_to_live(Duration::from_secs(10))
                .max_capacity(5_000)
                .build(),

            offline_sync_metrics: Arc::new(OfflineSyncMetrics {
                active: AtomicBool::new(false),
                total_messages: AtomicUsize::new(0),
                processed_messages: AtomicUsize::new(0),
                start_time: std::sync::Mutex::new(None),
            }),

            retry_metrics: Arc::new(RetryMetrics {
                local_requeue_attempts: AtomicUsize::new(0),
                local_requeue_success: AtomicUsize::new(0),
                local_requeue_fallback: AtomicUsize::new(0),
            }),

            enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
            auto_reconnect_errors: Arc::new(AtomicU32::new(0)),
            last_successful_connect: Arc::new(Mutex::new(None)),

            needs_initial_full_sync: Arc::new(AtomicBool::new(false)),

            app_state_processor: OnceCell::new(),
            app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
            app_state_syncing: Arc::new(Mutex::new(HashSet::new())),
            initial_keys_synced_notifier: Arc::new(Notify::new()),
            initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
            offline_sync_notifier: Arc::new(Notify::new()),
            offline_sync_completed: Arc::new(AtomicBool::new(false)),
            socket_ready_notifier: Arc::new(Notify::new()),
            connected_notifier: Arc::new(Notify::new()),
            major_sync_task_sender: tx,
            pairing_cancellation_tx: Arc::new(Mutex::new(None)),
            pair_code_state: Arc::new(Mutex::new(wacore::pair_code::PairCodeState::default())),
            plaintext_buffer_pool: Arc::new(Mutex::new(Vec::with_capacity(4))),
            custom_enc_handlers: Arc::new(DashMap::new()),
            chatstate_handlers: Arc::new(RwLock::new(Vec::new())),
            pdo_pending_requests: crate::pdo::new_pdo_cache(),
            device_registry_cache: Cache::builder()
                .max_capacity(5_000) // Match WhatsApp Web's 5000 entry limit
                .time_to_live(Duration::from_secs(3600)) // 1 hour TTL
                .build(),
            stanza_router: Self::create_stanza_router(),
            synchronous_ack: false,
            http_client,
            override_version,
            skip_history_sync: AtomicBool::new(false),
        };

        let arc = Arc::new(this);

        // Warm up the LID-PN cache from persistent storage
        let warm_up_arc = arc.clone();
        tokio::spawn(async move {
            if let Err(e) = warm_up_arc.warm_up_lid_pn_cache().await {
                warn!("Failed to warm up LID-PN cache: {e}");
            }
        });

        // Start background task to clean up stale device registry entries
        let cleanup_arc = arc.clone();
        tokio::spawn(async move {
            cleanup_arc.device_registry_cleanup_loop().await;
        });

        (arc, rx)
    }
395
396    pub(crate) async fn get_group_cache(&self) -> &Cache<Jid, GroupInfo> {
397        self.group_cache
398            .get_or_init(|| async {
399                debug!("Initializing Group Cache for the first time.");
400                Cache::builder()
401                    .time_to_live(Duration::from_secs(3600))
402                    .max_capacity(1_000)
403                    .build()
404            })
405            .await
406    }
407
408    pub(crate) async fn get_device_cache(&self) -> &Cache<Jid, Vec<Jid>> {
409        self.device_cache
410            .get_or_init(|| async {
411                debug!("Initializing Device Cache for the first time.");
412                Cache::builder()
413                    .time_to_live(Duration::from_secs(3600))
414                    .max_capacity(5_000)
415                    .build()
416            })
417            .await
418    }
419
420    pub(crate) async fn get_app_state_processor(&self) -> &AppStateProcessor {
421        self.app_state_processor
422            .get_or_init(|| async {
423                debug!("Initializing AppStateProcessor for the first time.");
424                AppStateProcessor::new(self.persistence_manager.backend())
425            })
426            .await
427    }
428
    /// Create and configure the stanza router with all the handlers.
    ///
    /// NOTE(review): registration order is preserved exactly as written;
    /// routing is presumably keyed by stanza tag so order should not matter,
    /// but confirm against `StanzaRouter::register` before reordering.
    fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
        use crate::handlers::{
            basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
            chatstate::ChatstateHandler,
            ib::IbHandler,
            iq::IqHandler,
            message::MessageHandler,
            notification::NotificationHandler,
            receipt::ReceiptHandler,
            router::StanzaRouter,
            unimplemented::UnimplementedHandler,
        };

        let mut router = StanzaRouter::new();

        // Register all handlers
        router.register(Arc::new(MessageHandler));
        router.register(Arc::new(ReceiptHandler));
        router.register(Arc::new(IqHandler));
        router.register(Arc::new(SuccessHandler));
        router.register(Arc::new(FailureHandler));
        router.register(Arc::new(StreamErrorHandler));
        router.register(Arc::new(IbHandler));
        router.register(Arc::new(NotificationHandler));
        router.register(Arc::new(AckHandler));
        router.register(Arc::new(ChatstateHandler));

        // Register unimplemented handlers (stanzas acknowledged but not acted on)
        router.register(Arc::new(UnimplementedHandler::for_call()));
        router.register(Arc::new(UnimplementedHandler::for_presence()));

        router
    }
463
464    /// Registers an external event handler to the core event bus.
465    pub fn register_handler(&self, handler: Arc<dyn wacore::types::events::EventHandler>) {
466        self.core.event_bus.add_handler(handler);
467    }
468
469    /// Register a chatstate handler which will be invoked when a `<chatstate>` stanza is received.
470    ///
471    /// The handler receives a `ChatStateEvent` with the parsed chat state information.
472    pub async fn register_chatstate_handler(
473        &self,
474        handler: Arc<dyn Fn(ChatStateEvent) + Send + Sync>,
475    ) {
476        self.chatstate_handlers.write().await.push(handler);
477    }
478
479    /// Dispatch a parsed chatstate stanza to registered handlers.
480    ///
481    /// Called by `ChatstateHandler` after parsing the incoming stanza.
482    pub(crate) async fn dispatch_chatstate_event(
483        &self,
484        stanza: wacore::iq::chatstate::ChatstateStanza,
485    ) {
486        let event = ChatStateEvent::from_stanza(stanza);
487
488        // Invoke handlers asynchronously
489        let handlers = self.chatstate_handlers.read().await.clone();
490        for handler in handlers {
491            let event_clone = event.clone();
492            let handler_clone = handler.clone();
493            tokio::spawn(async move {
494                (handler_clone)(event_clone);
495            });
496        }
497    }
498
499    pub async fn run(self: &Arc<Self>) {
500        if self.is_running.swap(true, Ordering::SeqCst) {
501            warn!("Client `run` method called while already running.");
502            return;
503        }
504        while self.is_running.load(Ordering::Relaxed) {
505            self.expected_disconnect.store(false, Ordering::Relaxed);
506
507            if self.connect().await.is_err() {
508                error!("Failed to connect, will retry...");
509            } else {
510                if self.read_messages_loop().await.is_err() {
511                    warn!(
512                        "Message loop exited with an error. Will attempt to reconnect if enabled."
513                    );
514                } else if self.expected_disconnect.load(Ordering::Relaxed) {
515                    debug!("Message loop exited gracefully (expected disconnect).");
516                } else {
517                    info!("Message loop exited gracefully.");
518                }
519
520                self.cleanup_connection_state().await;
521            }
522
523            if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
524                info!("Auto-reconnect disabled, shutting down.");
525                self.is_running.store(false, Ordering::Relaxed);
526                break;
527            }
528
529            // If this was an expected disconnect (e.g., 515 after pairing), reconnect immediately
530            if self.expected_disconnect.load(Ordering::Relaxed) {
531                self.auto_reconnect_errors.store(0, Ordering::Relaxed);
532                info!("Expected disconnect (e.g., 515), reconnecting immediately...");
533                continue;
534            }
535
536            let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
537            let delay_secs = u64::from(error_count * 2).min(30);
538            let delay = Duration::from_secs(delay_secs);
539            info!(
540                "Will attempt to reconnect in {:?} (attempt {})",
541                delay,
542                error_count + 1
543            );
544            sleep(delay).await;
545        }
546        info!("Client run loop has shut down.");
547    }
548
    /// Establishes a transport connection and performs the Noise handshake.
    ///
    /// Resolves the client version and dials the transport in parallel, stores
    /// the resulting transport/socket handles, notifies socket-ready waiters,
    /// and spawns the keepalive loop. Returns `ClientError::AlreadyConnected`
    /// if a connect is already in progress or the client is connected.
    pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        if self.is_connecting.swap(true, Ordering::SeqCst) {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Guard clears the `is_connecting` flag on every exit path,
        // including early error returns and `?` propagation below.
        let _guard = scopeguard::guard((), |_| {
            self.is_connecting.store(false, Ordering::Relaxed);
        });

        if self.is_connected() {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Reset login state for new connection attempt. This ensures that
        // handle_success will properly process the <success> stanza even if
        // a previous connection's post-login task bailed out early.
        self.is_logged_in.store(false, Ordering::Relaxed);
        self.offline_sync_completed.store(false, Ordering::Relaxed);

        let version_future = crate::version::resolve_and_update_version(
            &self.persistence_manager,
            &self.http_client,
            self.override_version,
        );

        let transport_future = self.transport_factory.create_transport();

        debug!("Connecting WebSocket and fetching latest client version in parallel...");
        let (version_result, transport_result) = tokio::join!(version_future, transport_future);

        version_result.map_err(|e| anyhow!("Failed to resolve app version: {}", e))?;
        let (transport, mut transport_events) = transport_result?;
        debug!("Version fetch and transport connection established.");

        let device_snapshot = self.persistence_manager.get_device_snapshot().await;

        let noise_socket =
            handshake::do_handshake(&device_snapshot, transport.clone(), &mut transport_events)
                .await?;

        // Publish connection handles only after the handshake succeeds.
        *self.transport.lock().await = Some(transport);
        *self.transport_events.lock().await = Some(transport_events);
        *self.noise_socket.lock().await = Some(noise_socket);

        // Notify waiters that socket is ready (before login)
        self.socket_ready_notifier.notify_waiters();

        let client_clone = self.clone();
        tokio::spawn(async move { client_clone.keepalive_loop().await });

        Ok(())
    }
601
602    pub async fn disconnect(self: &Arc<Self>) {
603        info!("Disconnecting client intentionally.");
604        self.expected_disconnect.store(true, Ordering::Relaxed);
605        self.is_running.store(false, Ordering::Relaxed);
606        self.shutdown_notifier.notify_waiters();
607
608        if let Some(transport) = self.transport.lock().await.as_ref() {
609            transport.disconnect().await;
610        }
611        self.cleanup_connection_state().await;
612    }
613
    /// Tears down per-connection state so a fresh connection can be made:
    /// clears the login flag, drops transport/socket handles, and resets
    /// per-connection caches and flags.
    async fn cleanup_connection_state(&self) {
        self.is_logged_in.store(false, Ordering::Relaxed);
        *self.transport.lock().await = None;
        *self.transport_events.lock().await = None;
        *self.noise_socket.lock().await = None;
        // Retry dedup is per-connection; a new connection starts fresh.
        self.retried_group_messages.invalidate_all();
        // Reset offline sync state for next connection
        self.offline_sync_completed.store(false, Ordering::Relaxed);
    }
623
    /// Core receive loop: takes ownership of the transport event receiver,
    /// decodes and decrypts incoming frames, and dispatches nodes.
    ///
    /// Returns `Ok(())` on graceful shutdown or expected disconnect, and an
    /// error when the transport drops unexpectedly. Frame decryption stays
    /// sequential (Noise counter ordering); non-critical node processing is
    /// spawned onto separate tasks.
    async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        debug!("Starting message processing loop...");

        // Take (not clone) the receiver so only this loop consumes events.
        let mut rx_guard = self.transport_events.lock().await;
        let transport_events = rx_guard
            .take()
            .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
        drop(rx_guard);

        // Frame decoder to parse incoming data
        let mut frame_decoder = wacore::framing::FrameDecoder::new();

        loop {
            tokio::select! {
                    // `biased` so shutdown is always checked before new data.
                    biased;
                    _ = self.shutdown_notifier.notified() => {
                        debug!("Shutdown signaled in message loop. Exiting message loop.");
                        return Ok(());
                    },
                    event_result = transport_events.recv() => {
                        match event_result {
                            Ok(crate::transport::TransportEvent::DataReceived(data)) => {
                                // Feed data into the frame decoder
                                frame_decoder.feed(&data);

                                // Process all complete frames
                                // Note: Frame decryption must be sequential (noise protocol counter),
                                // but we spawn node processing concurrently after decryption
                                while let Some(encrypted_frame) = frame_decoder.decode_frame() {
                                    // Decrypt the frame synchronously (required for noise counter ordering)
                                    if let Some(node) = self.decrypt_frame(&encrypted_frame).await {
                                        // Handle critical nodes synchronously to avoid race conditions.
                                        // <success> must be processed inline to ensure is_logged_in state
                                        // is set before checking expected_disconnect or spawning other tasks.
                                        let is_critical = matches!(node.tag.as_str(), "success" | "failure" | "stream:error");

                                        if is_critical {
                                            // Process critical nodes inline
                                            self.process_decrypted_node(node).await;
                                        } else {
                                            // Spawn non-critical node processing as a separate task
                                            // to allow concurrent handling (Signal protocol work, etc.)
                                            let client = self.clone();
                                            tokio::spawn(async move {
                                                client.process_decrypted_node(node).await;
                                            });
                                        }
                                    }

                                    // Check if we should exit after processing (e.g., after 515 stream error)
                                    if self.expected_disconnect.load(Ordering::Relaxed) {
                                        debug!("Expected disconnect signaled during frame processing. Exiting message loop.");
                                        return Ok(());
                                    }
                                }
                            },
                            Ok(crate::transport::TransportEvent::Disconnected) | Err(_) => {
                                self.cleanup_connection_state().await;
                                 if !self.expected_disconnect.load(Ordering::Relaxed) {
                                    self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
                                    debug!("Transport disconnected unexpectedly.");
                                    return Err(anyhow::anyhow!("Transport disconnected unexpectedly"));
                                } else {
                                    debug!("Transport disconnected as expected.");
                                    return Ok(());
                                }
                            }
                            Ok(crate::transport::TransportEvent::Connected) => {
                                // Already handled during handshake, but could be useful for logging
                                debug!("Transport connected event received");
                            }
                    }
                }
            }
        }
    }
700
701    /// Decrypt a frame and return the parsed node.
702    /// This must be called sequentially due to noise protocol counter requirements.
703    pub(crate) async fn decrypt_frame(
704        self: &Arc<Self>,
705        encrypted_frame: &bytes::Bytes,
706    ) -> Option<wacore_binary::node::Node> {
707        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
708        let noise_socket = match noise_socket_arc {
709            Some(s) => s,
710            None => {
711                log::error!("Cannot process frame: not connected (no noise socket)");
712                return None;
713            }
714        };
715
716        let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
717            Ok(p) => p,
718            Err(e) => {
719                log::error!("Failed to decrypt frame: {e}");
720                return None;
721            }
722        };
723
724        let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
725            Ok(data) => data,
726            Err(e) => {
727                log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
728                return None;
729            }
730        };
731
732        match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
733            Ok(node_ref) => Some(node_ref.to_owned()),
734            Err(e) => {
735                log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}");
736                None
737            }
738        }
739    }
740
741    /// Process an already-decrypted node.
742    /// This can be spawned concurrently since it doesn't depend on noise protocol state.
743    /// The node is wrapped in Arc to avoid cloning when passing through handlers.
744    pub(crate) async fn process_decrypted_node(self: &Arc<Self>, node: wacore_binary::node::Node) {
745        // Wrap in Arc once - all handlers will share this same allocation
746        let node_arc = Arc::new(node);
747        self.process_node(node_arc).await;
748    }
749
    /// Process a node wrapped in Arc. Handlers receive the Arc and can share/store it cheaply.
    ///
    /// Responsibilities, in order:
    /// 1. Offline-sync bookkeeping driven by `<ib>` stanzas (start/end markers)
    ///    and per-stanza `offline` attributes.
    /// 2. Receive logging — app state sync responses are summarized rather than
    ///    dumped in full.
    /// 3. `<xmlstreamend/>` shutdown, IQ response-waiter resolution, then
    ///    dispatch through the stanza router.
    /// 4. A deferred `<ack/>` for ack-able stanzas, unless a handler set the
    ///    `cancelled` flag during dispatch.
    pub(crate) async fn process_node(self: &Arc<Self>, node: Arc<Node>) {
        use wacore::xml::DisplayableNode;

        // --- Offline Sync Tracking ---
        if node.tag.as_str() == "ib" {
            // Check for offline_preview child to get expected count
            if let Some(preview) = node.get_optional_child("offline_preview") {
                let count: usize = preview
                    .attrs
                    .get("count")
                    .and_then(|v| v.as_str())
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(0);

                if count == 0 {
                    self.offline_sync_metrics
                        .active
                        .store(false, Ordering::Release);
                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: 0 items.");
                } else {
                    // Use stronger memory ordering for state transitions
                    self.offline_sync_metrics
                        .total_messages
                        .store(count, Ordering::Release);
                    self.offline_sync_metrics
                        .processed_messages
                        .store(0, Ordering::Release);
                    self.offline_sync_metrics
                        .active
                        .store(true, Ordering::Release);
                    // std Mutex here (not tokio): held only for a plain store,
                    // never across an await. A poisoned lock is recovered since
                    // the value is immediately overwritten anyway.
                    match self.offline_sync_metrics.start_time.lock() {
                        Ok(mut guard) => *guard = Some(std::time::Instant::now()),
                        Err(poison) => *poison.into_inner() = Some(std::time::Instant::now()),
                    }
                    debug!(target: "Client/OfflineSync", "Sync STARTED: Expecting {} items.", count);
                }
            } else if self.offline_sync_metrics.active.load(Ordering::Acquire)
                && node.get_optional_child("offline").is_some()
            {
                // Handle end marker: <ib><offline count="N"/> signals sync completion
                // Only <ib> with an <offline> child is a real end marker.
                // Other <ib> children (thread_metadata, edge_routing, dirty) are NOT end markers.
                let processed = self
                    .offline_sync_metrics
                    .processed_messages
                    .load(Ordering::Acquire);
                let elapsed = match self.offline_sync_metrics.start_time.lock() {
                    Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
                    Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
                };
                debug!(target: "Client/OfflineSync", "Sync COMPLETED: End marker received. Processed {} items in {:.2?}.", processed, elapsed);
                self.offline_sync_metrics
                    .active
                    .store(false, Ordering::Release);
            }
        }

        // Track progress if active
        if self.offline_sync_metrics.active.load(Ordering::Acquire) {
            // Check for 'offline' attribute on relevant stanzas
            if node.attrs.contains_key("offline") {
                // fetch_add returns the PREVIOUS value, so +1 yields this
                // stanza's 1-based position in the sync.
                let processed = self
                    .offline_sync_metrics
                    .processed_messages
                    .fetch_add(1, Ordering::Release)
                    + 1;
                let total = self
                    .offline_sync_metrics
                    .total_messages
                    .load(Ordering::Acquire);

                // Throttle progress logging to every 50th item (plus the last).
                if processed.is_multiple_of(50) || processed == total {
                    trace!(target: "Client/OfflineSync", "Sync Progress: {}/{}", processed, total);
                }

                if processed >= total {
                    let elapsed = match self.offline_sync_metrics.start_time.lock() {
                        Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
                        Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
                    };
                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: Processed {} items in {:.2?}.", processed, elapsed);
                    self.offline_sync_metrics
                        .active
                        .store(false, Ordering::Release);
                }
            }
        }
        // --- End Tracking ---

        // App state sync responses can be huge; log only the collection name.
        if node.tag.as_str() == "iq"
            && let Some(sync_node) = node.get_optional_child("sync")
            && let Some(collection_node) = sync_node.get_optional_child("collection")
        {
            let name = collection_node
                .attrs()
                .optional_string("name")
                .unwrap_or("<unknown>");
            debug!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
        } else {
            debug!(target: "Client/Recv","{}", DisplayableNode(&node));
        }

        // Prepare deferred ACK cancellation flag (sent after dispatch unless cancelled)
        let mut cancelled = false;

        if node.tag.as_str() == "xmlstreamend" {
            if self.expected_disconnect.load(Ordering::Relaxed) {
                debug!("Received <xmlstreamend/>, expected disconnect.");
            } else {
                warn!("Received <xmlstreamend/>, treating as disconnect.");
            }
            self.shutdown_notifier.notify_waiters();
            return;
        }

        // Fast-path IQ responses to any task awaiting them by request ID.
        if node.tag.as_str() == "iq"
            && let Some(id) = node.attrs.get("id").and_then(|v| v.as_str())
        {
            let has_waiter = self.response_waiters.lock().await.contains_key(id);
            if has_waiter && self.handle_iq_response(Arc::clone(&node)).await {
                return;
            }
        }

        // Dispatch to appropriate handler using the router
        // Clone Arc (cheap - just reference count) not the Node itself
        if !self
            .stanza_router
            .dispatch(self.clone(), Arc::clone(&node), &mut cancelled)
            .await
        {
            warn!(
                "Received unknown top-level node: {}",
                DisplayableNode(&node)
            );
        }

        // Send the deferred ACK if applicable and not cancelled by handler
        if self.should_ack(&node) && !cancelled {
            self.maybe_deferred_ack(node).await;
        }
    }
893
894    /// Determine if a Node should be acknowledged with <ack/>.
895    fn should_ack(&self, node: &Node) -> bool {
896        matches!(
897            node.tag.as_str(),
898            "message" | "receipt" | "notification" | "call"
899        ) && node.attrs.contains_key("id")
900            && node.attrs.contains_key("from")
901    }
902
903    /// Possibly send a deferred ack: either immediately or via spawned task.
904    /// Handlers can cancel by setting `cancelled` to true.
905    /// Uses Arc<Node> to avoid cloning when spawning the async task.
906    async fn maybe_deferred_ack(self: &Arc<Self>, node: Arc<Node>) {
907        if self.synchronous_ack {
908            if let Err(e) = self.send_ack_for(&node).await {
909                warn!("Failed to send ack: {e:?}");
910            }
911        } else {
912            let this = self.clone();
913            // Node is already in Arc - just clone the Arc (cheap), not the Node
914            tokio::spawn(async move {
915                if let Err(e) = this.send_ack_for(&node).await {
916                    warn!("Failed to send ack: {e:?}");
917                }
918            });
919        }
920    }
921
922    /// Build and send an <ack/> node corresponding to the given stanza.
923    async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
924        if !self.is_connected() || self.expected_disconnect.load(Ordering::Relaxed) {
925            return Ok(());
926        }
927        let id = match node.attrs.get("id") {
928            Some(v) => v.clone(),
929            None => return Ok(()),
930        };
931        let from = match node.attrs.get("from") {
932            Some(v) => v.clone(),
933            None => return Ok(()),
934        };
935        let participant = node.attrs.get("participant").cloned();
936        let typ = if node.tag != "message" {
937            node.attrs.get("type").cloned()
938        } else {
939            None
940        };
941        let mut attrs = Attrs::new();
942        attrs.insert("class".to_string(), node.tag.clone());
943        attrs.insert("id".to_string(), id);
944        attrs.insert("to".to_string(), from);
945        if let Some(p) = participant {
946            attrs.insert("participant".to_string(), p);
947        }
948        if let Some(t) = typ {
949            attrs.insert("type".to_string(), t);
950        }
951        let ack = Node {
952            tag: "ack".to_string(),
953            attrs,
954            content: None,
955        };
956        self.send_node(ack).await
957    }
958
    /// Placeholder handler: logs a warning for stanza tags that the router
    /// recognizes but that have no real implementation yet.
    pub(crate) async fn handle_unimplemented(&self, tag: &str) {
        warn!("TODO: Implement handler for <{tag}>");
    }
962
963    pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
964        use wacore::iq::passive::PassiveModeSpec;
965        self.execute(PassiveModeSpec::new(passive)).await
966    }
967
968    pub async fn clean_dirty_bits(
969        &self,
970        type_: &str,
971        timestamp: Option<&str>,
972    ) -> Result<(), crate::request::IqError> {
973        use wacore::iq::dirty::CleanDirtyBitsSpec;
974
975        let spec = CleanDirtyBitsSpec::single(type_, timestamp)?;
976        self.execute(spec).await
977    }
978
979    pub async fn fetch_props(&self) -> Result<(), crate::request::IqError> {
980        use wacore::iq::props::PropsSpec;
981        use wacore::store::commands::DeviceCommand;
982
983        let stored_hash = self
984            .persistence_manager
985            .get_device_snapshot()
986            .await
987            .props_hash
988            .clone();
989
990        let spec = match &stored_hash {
991            Some(hash) => {
992                debug!("Fetching props with hash for delta update...");
993                PropsSpec::with_hash(hash)
994            }
995            None => {
996                debug!("Fetching props (full, no stored hash)...");
997                PropsSpec::new()
998            }
999        };
1000
1001        let response = self.execute(spec).await?;
1002
1003        if response.delta_update {
1004            debug!(
1005                "Props delta update received ({} changed props)",
1006                response.props.len()
1007            );
1008        } else {
1009            debug!(
1010                "Props full update received ({} props, hash={:?})",
1011                response.props.len(),
1012                response.hash
1013            );
1014        }
1015
1016        if let Some(new_hash) = response.hash {
1017            self.persistence_manager
1018                .process_command(DeviceCommand::SetPropsHash(Some(new_hash)))
1019                .await;
1020        }
1021
1022        Ok(())
1023    }
1024
1025    pub async fn fetch_privacy_settings(
1026        &self,
1027    ) -> Result<wacore::iq::privacy::PrivacySettingsResponse, crate::request::IqError> {
1028        use wacore::iq::privacy::PrivacySettingsSpec;
1029
1030        debug!("Fetching privacy settings...");
1031
1032        self.execute(PrivacySettingsSpec::new()).await
1033    }
1034
1035    pub async fn send_digest_key_bundle(&self) -> Result<(), crate::request::IqError> {
1036        use wacore::iq::prekeys::DigestKeyBundleSpec;
1037
1038        debug!("Sending digest key bundle...");
1039
1040        self.execute(DigestKeyBundleSpec::new()).await.map(|_| ())
1041    }
1042
    /// Handle the `<success>` stanza that completes authentication.
    ///
    /// Deduplicates repeated `<success>` stanzas, bumps the connection
    /// generation, updates LID/server-time state, then spawns the post-login
    /// initialization sequence (session setup, pre-key upload, active IQ,
    /// offline sync wait, background init queries, and app state sync).
    /// Every spawned step re-checks the generation after awaits so stale tasks
    /// from a replaced connection (e.g. a 515 reconnect) abort early.
    pub(crate) async fn handle_success(self: &Arc<Self>, node: &wacore_binary::node::Node) {
        // Skip processing if an expected disconnect is pending (e.g., 515 received).
        // This prevents race conditions where a spawned success handler runs after
        // cleanup_connection_state has already reset is_logged_in.
        if self.expected_disconnect.load(Ordering::Relaxed) {
            debug!("Ignoring <success> stanza: expected disconnect pending");
            return;
        }

        // Guard against multiple <success> stanzas (WhatsApp may send more than one during
        // routing/reconnection). Only process the first one per connection.
        if self.is_logged_in.swap(true, Ordering::SeqCst) {
            debug!("Ignoring duplicate <success> stanza (already logged in)");
            return;
        }

        // Increment connection generation to invalidate any stale post-login tasks
        // from previous connections (e.g., during 515 reconnect cycles).
        let current_generation = self.connection_generation.fetch_add(1, Ordering::SeqCst) + 1;

        info!(
            "Successfully authenticated with WhatsApp servers! (gen={})",
            current_generation
        );
        *self.last_successful_connect.lock().await = Some(chrono::Utc::now());
        self.auto_reconnect_errors.store(0, Ordering::Relaxed);

        self.update_server_time_offset(node);

        if let Some(lid_value) = node.attrs.get("lid") {
            if let Some(lid) = lid_value.to_jid() {
                let device_snapshot = self.persistence_manager.get_device_snapshot().await;
                if device_snapshot.lid.as_ref() != Some(&lid) {
                    debug!("Updating LID from server to '{lid}'");
                    self.persistence_manager
                        .process_command(DeviceCommand::SetLid(Some(lid)))
                        .await;
                }
            } else {
                warn!("Failed to parse LID from success stanza: {lid_value}");
            }
        } else {
            warn!("LID not found in <success> stanza. Group messaging may fail.");
        }

        let client_clone = self.clone();
        let task_generation = current_generation;
        tokio::spawn(async move {
            // Macro to check if this task is still valid (connection hasn't been replaced)
            macro_rules! check_generation {
                () => {
                    if client_clone.connection_generation.load(Ordering::SeqCst) != task_generation
                    {
                        debug!("Post-login task cancelled: connection generation changed");
                        return;
                    }
                };
            }

            debug!(
                "Starting post-login initialization sequence (gen={})...",
                task_generation
            );

            // Check if we need initial app state sync (empty pushname indicates fresh pairing
            // where pushname will come from app state sync's setting_pushName mutation)
            let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
            let needs_pushname_from_sync = device_snapshot.push_name.is_empty();
            if needs_pushname_from_sync {
                debug!("Push name is empty - will be set from app state sync (setting_pushName)");
            }

            // Check connection before network operations.
            // During pairing, a 515 disconnect happens quickly after success,
            // so the socket may already be gone.
            if !client_clone.is_connected() {
                debug!(
                    "Skipping post-login init: connection closed (likely pairing phase reconnect)"
                );
                return;
            }

            check_generation!();
            client_clone.send_unified_session().await;

            // === Establish session with primary phone for PDO ===
            // This must happen BEFORE we exit passive mode (before offline messages arrive).
            // PDO needs a session with device 0 to request decrypted content from our phone.
            // Matches WhatsApp Web's bootstrapDeviceCapabilities() pattern.
            check_generation!();
            if let Err(e) = client_clone
                .establish_primary_phone_session_immediate()
                .await
            {
                warn!(target: "Client/PDO", "Failed to establish session with primary phone on login: {:?}", e);
                // Don't fail login - PDO will retry via ensure_e2e_sessions fallback
            }

            // === Passive Tasks (mimics WhatsApp Web's PassiveTaskManager) ===
            // WhatsApp Web executes passive tasks (like PreKey upload) BEFORE sending the active IQ.
            check_generation!();
            if let Err(e) = client_clone.upload_pre_keys().await {
                warn!("Failed to upload pre-keys during startup: {e:?}");
            }

            // === Send active IQ ===
            // The server sends <ib><offline count="X"/></ib> AFTER we exit passive mode.
            // This matches WhatsApp Web's behavior: executePassiveTasks() -> sendPassiveModeProtocol("active")
            check_generation!();
            if let Err(e) = client_clone.set_passive(false).await {
                warn!("Failed to send post-connect active IQ: {e:?}");
            }

            // === Wait for offline sync to complete ===
            // The server sends <ib><offline count="X"/></ib> after we exit passive mode.
            // Use a timeout to handle cases where the server doesn't send offline ib
            // (e.g., during initial pairing or if there are no offline messages).
            const OFFLINE_SYNC_TIMEOUT_SECS: u64 = 5;

            if !client_clone.offline_sync_completed.load(Ordering::Relaxed) {
                debug!(
                    "Waiting for offline sync to complete (up to {}s)...",
                    OFFLINE_SYNC_TIMEOUT_SECS
                );
                let wait_result = tokio::time::timeout(
                    Duration::from_secs(OFFLINE_SYNC_TIMEOUT_SECS),
                    client_clone.offline_sync_notifier.notified(),
                )
                .await;

                // Check if connection was replaced while waiting
                check_generation!();

                if wait_result.is_err() {
                    debug!("Offline sync wait timed out, proceeding with passive tasks");
                } else {
                    debug!("Offline sync completed, proceeding with passive tasks");
                }
            }

            // Re-check connection and generation before sending presence
            check_generation!();
            if !client_clone.is_connected() {
                debug!("Skipping presence: connection closed");
                return;
            }

            // Background initialization queries (can run in parallel, non-blocking)
            let bg_client = client_clone.clone();
            let bg_generation = task_generation;
            tokio::spawn(async move {
                // Check connection and generation before starting background queries
                if bg_client.connection_generation.load(Ordering::SeqCst) != bg_generation {
                    debug!("Skipping background init queries: connection generation changed");
                    return;
                }
                if !bg_client.is_connected() {
                    debug!("Skipping background init queries: connection closed");
                    return;
                }

                debug!(
                    "Sending background initialization queries (Props, Blocklist, Privacy, Digest)..."
                );

                let props_fut = bg_client.fetch_props();
                let binding = bg_client.blocking();
                let blocklist_fut = binding.get_blocklist();
                let privacy_fut = bg_client.fetch_privacy_settings();
                let digest_fut = bg_client.send_digest_key_bundle();

                // All four queries are independent; run them concurrently.
                let (r_props, r_block, r_priv, r_digest) =
                    tokio::join!(props_fut, blocklist_fut, privacy_fut, digest_fut);

                if let Err(e) = r_props {
                    warn!("Background init: Failed to fetch props: {e:?}");
                }
                if let Err(e) = r_block {
                    warn!("Background init: Failed to fetch blocklist: {e:?}");
                }
                if let Err(e) = r_priv {
                    warn!("Background init: Failed to fetch privacy settings: {e:?}");
                }
                if let Err(e) = r_digest {
                    warn!("Background init: Failed to send digest: {e:?}");
                }

                // Prune expired tcTokens on connect (matches WhatsApp Web's PrivacyTokenJob)
                if let Err(e) = bg_client.tc_token().prune_expired().await {
                    warn!("Background init: Failed to prune expired tc_tokens: {e:?}");
                }
            });

            check_generation!();

            let flag_set = client_clone.needs_initial_full_sync.load(Ordering::Relaxed);
            let needs_initial_sync = flag_set || needs_pushname_from_sync;

            if needs_initial_sync {
                // === Fresh pairing path ===
                // Like WhatsApp Web's syncCriticalData(): await critical collections before
                // dispatching Connected, so blocklist/privacy settings are applied first.
                debug!(
                    target: "Client/AppState",
                    "Starting Initial App State Sync (flag_set={flag_set}, needs_pushname={needs_pushname_from_sync})"
                );

                if !client_clone
                    .initial_app_state_keys_received
                    .load(Ordering::Relaxed)
                {
                    debug!(
                        target: "Client/AppState",
                        "Waiting up to 5s for app state keys..."
                    );
                    let _ = tokio::time::timeout(
                        Duration::from_secs(5),
                        client_clone.initial_keys_synced_notifier.notified(),
                    )
                    .await;

                    // Check if connection was replaced while waiting
                    check_generation!();
                }

                // Await critical collections via batched IQ before dispatching Connected.
                check_generation!();
                if let Err(e) = client_clone
                    .sync_collections_batched(vec![
                        WAPatchName::CriticalBlock,
                        WAPatchName::CriticalUnblockLow,
                    ])
                    .await
                {
                    warn!("Failed to sync critical app state: {e}");
                }

                check_generation!();

                // Dispatch Connected after critical sync completes.
                // Presence is NOT sent here — WhatsApp Web sends presence from the
                // setting_pushName mutation handler (WAWebPushNameSync), not from
                // criticalSyncDone. Our setting_pushName handler already does this.
                client_clone
                    .core
                    .event_bus
                    .dispatch(&Event::Connected(crate::types::events::Connected));
                client_clone.connected_notifier.notify_waiters();

                // Spawn remaining non-critical collections in background
                let sync_client = client_clone.clone();
                let sync_generation = task_generation;
                tokio::spawn(async move {
                    if sync_client.connection_generation.load(Ordering::SeqCst) != sync_generation {
                        debug!("App state sync cancelled: connection generation changed");
                        return;
                    }

                    if let Err(e) = sync_client
                        .sync_collections_batched(vec![
                            WAPatchName::RegularLow,
                            WAPatchName::RegularHigh,
                            WAPatchName::Regular,
                        ])
                        .await
                    {
                        warn!("Failed to batch sync non-critical app state: {e}");
                    }

                    sync_client
                        .needs_initial_full_sync
                        .store(false, Ordering::Relaxed);
                    debug!(target: "Client/AppState", "Initial App State Sync Completed.");
                });
            } else {
                // === Reconnection path ===
                // Pushname is already known, send presence and Connected immediately.
                let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
                if !device_snapshot.push_name.is_empty() {
                    if let Err(e) = client_clone.presence().set_available().await {
                        warn!("Failed to send initial presence: {e:?}");
                    } else {
                        debug!("Initial presence sent successfully.");
                    }
                }

                // Re-check generation after awaits to avoid dispatching Connected
                // for an outdated connection that was replaced mid-await.
                check_generation!();

                client_clone
                    .core
                    .event_bus
                    .dispatch(&Event::Connected(crate::types::events::Connected));
                client_clone.connected_notifier.notify_waiters();
            }
        });
    }
1341
1342    /// Handles incoming `<ack/>` stanzas by resolving pending response waiters.
1343    ///
1344    /// If an ack with an ID that matches a pending task in `response_waiters`,
1345    /// the task is resolved and the function returns `true`. Otherwise, returns `false`.
1346    pub(crate) async fn handle_ack_response(&self, node: Node) -> bool {
1347        let id_opt = node.attrs.get("id").map(|v| v.to_string_value());
1348        if let Some(id) = id_opt
1349            && let Some(waiter) = self.response_waiters.lock().await.remove(&id)
1350        {
1351            if waiter.send(node).is_err() {
1352                warn!(target: "Client/Ack", "Failed to send ACK response to waiter for ID {id}. Receiver was likely dropped.");
1353            }
1354            return true;
1355        }
1356        false
1357    }
1358
1359    #[allow(dead_code)] // Used by per-collection callers (e.g., critical sync gating)
1360    pub(crate) async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
1361        // In-flight dedup: skip if this collection is already being synced.
1362        // Matches WA Web's WAWebSyncdCollectionsStateMachine which tracks in-flight syncs
1363        // and queues new requests to a pending set.
1364        {
1365            let mut syncing = self.app_state_syncing.lock().await;
1366            if !syncing.insert(name) {
1367                debug!(target: "Client/AppState", "Skipping sync for {:?}: already in flight", name);
1368                return Ok(());
1369            }
1370        }
1371
1372        let result = self.fetch_app_state_with_retry_inner(name).await;
1373
1374        // Always remove from in-flight set when done
1375        self.app_state_syncing.lock().await.remove(&name);
1376
1377        result
1378    }
1379
    /// Retry loop around [`process_app_state_sync_task`] for a single collection.
    ///
    /// Two failure classes get special handling (matched on the error's
    /// display text, since the underlying errors are stringly propagated):
    /// - "app state key not found" on the *first* attempt: optionally wait up
    ///   to 10s for the initial key-share notification, then retry once.
    /// - "database is locked": linear backoff, up to
    ///   `APP_STATE_RETRY_MAX_ATTEMPTS` attempts.
    /// Any other error (or exhausted retries) is returned to the caller.
    #[allow(dead_code)]
    async fn fetch_app_state_with_retry_inner(&self, name: WAPatchName) -> anyhow::Result<()> {
        let mut attempt = 0u32;
        loop {
            attempt += 1;
            // full_sync=false lets process_app_state_sync_task auto-detect:
            // version 0 → snapshot (full sync), version > 0 → incremental patches.
            // Matches WA Web which only requests snapshot when version is undefined.
            let res = self.process_app_state_sync_task(name, false).await;
            match res {
                Ok(()) => return Ok(()),
                Err(e) => {
                    let es = e.to_string();
                    // Missing app state key: retried only once (attempt == 1),
                    // optionally waiting for the primary to share keys first.
                    if es.contains("app state key not found") && attempt == 1 {
                        if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
                            debug!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
                            // Wake up early if the key-share notifier fires;
                            // on timeout we retry anyway (best effort).
                            if tokio::time::timeout(
                                Duration::from_secs(10),
                                self.initial_keys_synced_notifier.notified(),
                            )
                            .await
                            .is_err()
                            {
                                warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
                            }
                        }
                        continue;
                    }
                    // SQLite write contention: back off linearly (350ms, 550ms, …)
                    // and retry until the attempt budget is spent.
                    if es.contains("database is locked") && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
                        let backoff = Duration::from_millis(200 * attempt as u64 + 150);
                        warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
                        tokio::time::sleep(backoff).await;
                        continue;
                    }
                    // Not a retryable error — propagate.
                    return Err(e);
                }
            }
        }
    }
1419
1420    /// Sync multiple collections in a single IQ request, re-fetching those with `has_more_patches`.
1421    /// Matches WA Web's `serverSync()` outer loop (`3JJWKHeu5-P.js:54278-54305`).
1422    /// Max 5 iterations (WA Web's `C=5` constant).
1423    pub(crate) async fn sync_collections_batched(
1424        &self,
1425        collections: Vec<WAPatchName>,
1426    ) -> anyhow::Result<()> {
1427        if collections.is_empty() {
1428            return Ok(());
1429        }
1430
1431        // In-flight dedup: filter out collections already being synced
1432        let pending = {
1433            let mut syncing = self.app_state_syncing.lock().await;
1434            let mut filtered = Vec::with_capacity(collections.len());
1435            for name in collections {
1436                if syncing.insert(name) {
1437                    filtered.push(name);
1438                } else {
1439                    debug!(target: "Client/AppState", "Skipping {:?} in batch: already in flight", name);
1440                }
1441            }
1442            filtered
1443        };
1444
1445        if pending.is_empty() {
1446            return Ok(());
1447        }
1448
1449        // Track all collections for cleanup
1450        let all_collections: Vec<WAPatchName> = pending.clone();
1451
1452        let result = self.sync_collections_batched_inner(pending).await;
1453
1454        // Always clean up in-flight set
1455        {
1456            let mut syncing = self.app_state_syncing.lock().await;
1457            for name in &all_collections {
1458                syncing.remove(name);
1459            }
1460        }
1461
1462        result
1463    }
1464
    /// Inner loop for [`sync_collections_batched`]: builds one multi-collection
    /// `w:sync:app:state` IQ per iteration, decodes every collection in the
    /// response, dispatches mutations, persists versions, and re-queues any
    /// collection that reported `has_more_patches` or a retryable error.
    ///
    /// The caller owns in-flight bookkeeping; this function only processes.
    async fn sync_collections_batched_inner(
        &self,
        mut pending: Vec<WAPatchName>,
    ) -> anyhow::Result<()> {
        use wacore::appstate::patch_decode::CollectionSyncError;
        // WA Web's outer-loop cap (its `C=5` constant).
        const MAX_ITERATIONS: usize = 5;
        let mut iteration = 0;

        while !pending.is_empty() && iteration < MAX_ITERATIONS {
            iteration += 1;
            debug!(
                target: "Client/AppState",
                "Batched sync iteration {}/{}: {:?}",
                iteration, MAX_ITERATIONS, pending
            );

            let backend = self.persistence_manager.backend();

            // Build multi-collection IQ, tracking which collections need a snapshot
            let mut collection_nodes = Vec::with_capacity(pending.len());
            let mut was_snapshot = std::collections::HashSet::new();
            for &name in &pending {
                let state = backend.get_version(name.as_str()).await?;
                // Version 0 means no local state: request a snapshot.
                let want_snapshot = state.version == 0;
                if want_snapshot {
                    was_snapshot.insert(name);
                }
                let mut builder = NodeBuilder::new("collection")
                    .attr("name", name.as_str())
                    .attr(
                        "return_snapshot",
                        if want_snapshot { "true" } else { "false" },
                    );
                // Incremental requests carry the current version; snapshot
                // requests omit it.
                if !want_snapshot {
                    builder = builder.attr("version", state.version.to_string());
                }
                collection_nodes.push(builder.build());
            }

            let sync_node = NodeBuilder::new("sync").children(collection_nodes).build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: server_jid(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: None,
            };

            let resp = self.send_iq(iq).await?;

            // Pre-download all external blobs for all collections in the response
            // (keyed by directPath) so the decode closure below can stay
            // synchronous. Download failures are logged but not fatal here;
            // the decoder will report the missing blob.
            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
                std::collections::HashMap::new();

            if let Ok(patch_lists) = wacore::appstate::patch_decode::parse_patch_lists(&resp) {
                for pl in &patch_lists {
                    // Download external snapshot
                    if let Some(ext) = &pl.snapshot_ref
                        && let Some(path) = &ext.direct_path
                    {
                        match self.download(ext).await {
                            Ok(bytes) => {
                                pre_downloaded.insert(path.clone(), bytes);
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to download external snapshot for {:?}: {e}",
                                    pl.name
                                );
                            }
                        }
                    }

                    // Download external mutations
                    for patch in &pl.patches {
                        if let Some(ext) = &patch.external_mutations
                            && let Some(path) = &ext.direct_path
                        {
                            match self.download(ext).await {
                                Ok(bytes) => {
                                    pre_downloaded.insert(path.clone(), bytes);
                                }
                                Err(e) => {
                                    let v =
                                        patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
                                    warn!(
                                        "Failed to download external mutations for patch v{}: {e}",
                                        v
                                    );
                                }
                            }
                        }
                    }
                }
            }

            // Synchronous lookup into the pre-downloaded blob map; the decoder
            // calls this instead of performing network I/O itself.
            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(path) = &ext.direct_path {
                    if let Some(bytes) = pre_downloaded.get(path) {
                        Ok(bytes.clone())
                    } else {
                        Err(anyhow::anyhow!(
                            "external blob not pre-downloaded: {}",
                            path
                        ))
                    }
                } else {
                    Err(anyhow::anyhow!("external blob has no directPath"))
                }
            };

            // Parse and process all collections from the response
            let proc = self.get_app_state_processor().await;
            let results = proc.decode_multi_patch_list(&resp, &download, true).await?;

            let mut needs_refetch = Vec::new();

            for (mutations, new_state, list) in results {
                let name = list.name;

                // Handle per-collection errors: conflicts and retryable errors
                // re-queue the collection; fatal errors are dropped.
                if let Some(ref err) = list.error {
                    match err {
                        CollectionSyncError::Conflict { has_more } => {
                            warn!(target: "Client/AppState", "Collection {:?} conflict (has_more={}), will refetch", name, has_more);
                            needs_refetch.push(name);
                            continue;
                        }
                        CollectionSyncError::Fatal { code, text } => {
                            warn!(target: "Client/AppState", "Collection {:?} fatal error {}: {}", name, code, text);
                            continue;
                        }
                        CollectionSyncError::Retry { code, text } => {
                            warn!(target: "Client/AppState", "Collection {:?} retryable error {}: {}, will refetch", name, code, text);
                            needs_refetch.push(name);
                            continue;
                        }
                    }
                }

                // Handle missing keys: request them from the primary device,
                // throttled to once per key per 24h via app_state_key_requests.
                let missing = match proc.get_missing_key_ids(&list).await {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to get missing key IDs for {:?}: {}", name, e);
                        Vec::new()
                    }
                };
                if !missing.is_empty() {
                    let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
                    let mut guard = self.app_state_key_requests.lock().await;
                    let now = std::time::Instant::now();
                    for key_id in missing {
                        let hex_id = hex::encode(&key_id);
                        let should = guard
                            .get(&hex_id)
                            .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                            .unwrap_or(true);
                        if should {
                            guard.insert(hex_id, now);
                            to_request.push(key_id);
                        }
                    }
                    drop(guard);
                    if !to_request.is_empty() {
                        self.request_app_state_keys(&to_request).await;
                    }
                }

                // full_sync is true only when this collection had a snapshot
                // (version was 0 before sync). This prevents server_sync-triggered
                // incremental syncs from being incorrectly marked as full syncs.
                let full_sync = was_snapshot.contains(&name);
                for m in mutations {
                    self.dispatch_app_state_mutation(&m, full_sync).await;
                }

                // Save version
                backend
                    .set_version(name.as_str(), new_state.clone())
                    .await?;

                // Check if this collection needs more patches
                if list.has_more_patches {
                    needs_refetch.push(name);
                }

                debug!(
                    target: "Client/AppState",
                    "Batched sync: {:?} done (version={}, has_more={})",
                    name, new_state.version, list.has_more_patches
                );
            }

            pending = needs_refetch;
        }

        // Collections still pending after the cap will be retried by a
        // future sync trigger; this is best-effort, not an error.
        if !pending.is_empty() {
            warn!(
                target: "Client/AppState",
                "Batched sync: max iterations ({}) reached for {:?}",
                MAX_ITERATIONS, pending
            );
        }

        Ok(())
    }
1674
1675    pub(crate) async fn process_app_state_sync_task(
1676        &self,
1677        name: WAPatchName,
1678        full_sync: bool,
1679    ) -> anyhow::Result<()> {
1680        let backend = self.persistence_manager.backend();
1681        let mut full_sync = full_sync;
1682
1683        let mut state = backend.get_version(name.as_str()).await?;
1684        if state.version == 0 {
1685            full_sync = true;
1686        }
1687
1688        let mut has_more = true;
1689        let mut want_snapshot = full_sync;
1690        // Safety cap to prevent infinite loops if the server keeps returning
1691        // has_more_patches=true without advancing the version (WA Web uses 500).
1692        const MAX_PAGINATION_ITERATIONS: u32 = 500;
1693        let mut iteration = 0u32;
1694
1695        while has_more {
1696            iteration += 1;
1697            if iteration > MAX_PAGINATION_ITERATIONS {
1698                warn!(target: "Client/AppState", "App state sync for {:?} exceeded {} iterations, aborting", name, MAX_PAGINATION_ITERATIONS);
1699                break;
1700            }
1701            debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);
1702
1703            let mut collection_builder = NodeBuilder::new("collection")
1704                .attr("name", name.as_str())
1705                .attr(
1706                    "return_snapshot",
1707                    if want_snapshot { "true" } else { "false" },
1708                );
1709            if !want_snapshot {
1710                collection_builder = collection_builder.attr("version", state.version.to_string());
1711            }
1712            let sync_node = NodeBuilder::new("sync")
1713                .children([collection_builder.build()])
1714                .build();
1715            let iq = crate::request::InfoQuery {
1716                namespace: "w:sync:app:state",
1717                query_type: crate::request::InfoQueryType::Set,
1718                to: server_jid(),
1719                target: None,
1720                id: None,
1721                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
1722                timeout: None,
1723            };
1724
1725            let resp = self.send_iq(iq).await?;
1726            debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);
1727
1728            let _decode_start = std::time::Instant::now();
1729
1730            // Pre-download all external blobs (snapshot and patch mutations)
1731            // We use directPath as the key to identify each blob
1732            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
1733                std::collections::HashMap::new();
1734
1735            if let Ok(pl) = wacore::appstate::patch_decode::parse_patch_list(&resp) {
1736                debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={} patches_count={}",
1737                    name, pl.snapshot_ref.is_some(), pl.has_more_patches, pl.patches.len());
1738
1739                // Download external snapshot if present
1740                if let Some(ext) = &pl.snapshot_ref
1741                    && let Some(path) = &ext.direct_path
1742                {
1743                    match self.download(ext).await {
1744                        Ok(bytes) => {
1745                            debug!(target: "Client/AppState", "Downloaded external snapshot ({} bytes)", bytes.len());
1746                            pre_downloaded.insert(path.clone(), bytes);
1747                        }
1748                        Err(e) => {
1749                            warn!("Failed to download external snapshot: {e}");
1750                        }
1751                    }
1752                }
1753
1754                // Download external mutations for each patch that has them
1755                for patch in &pl.patches {
1756                    if let Some(ext) = &patch.external_mutations
1757                        && let Some(path) = &ext.direct_path
1758                    {
1759                        let patch_version =
1760                            patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
1761                        match self.download(ext).await {
1762                            Ok(bytes) => {
1763                                debug!(target: "Client/AppState", "Downloaded external mutations for patch v{} ({} bytes)", patch_version, bytes.len());
1764                                pre_downloaded.insert(path.clone(), bytes);
1765                            }
1766                            Err(e) => {
1767                                warn!(
1768                                    "Failed to download external mutations for patch v{}: {e}",
1769                                    patch_version
1770                                );
1771                            }
1772                        }
1773                    }
1774                }
1775            }
1776
1777            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
1778                if let Some(path) = &ext.direct_path {
1779                    if let Some(bytes) = pre_downloaded.get(path) {
1780                        Ok(bytes.clone())
1781                    } else {
1782                        Err(anyhow::anyhow!(
1783                            "external blob not pre-downloaded: {}",
1784                            path
1785                        ))
1786                    }
1787                } else {
1788                    Err(anyhow::anyhow!("external blob has no directPath"))
1789                }
1790            };
1791
1792            let proc = self.get_app_state_processor().await;
1793            let (mutations, new_state, list) =
1794                proc.decode_patch_list(&resp, &download, true).await?;
1795            let decode_elapsed = _decode_start.elapsed();
1796            if decode_elapsed.as_millis() > 500 {
1797                debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
1798            }
1799
1800            let missing = match proc.get_missing_key_ids(&list).await {
1801                Ok(v) => v,
1802                Err(e) => {
1803                    warn!("Failed to get missing key IDs for {:?}: {}", name, e);
1804                    Vec::new()
1805                }
1806            };
1807            if !missing.is_empty() {
1808                let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
1809                let mut guard = self.app_state_key_requests.lock().await;
1810                let now = std::time::Instant::now();
1811                for key_id in missing {
1812                    let hex_id = hex::encode(&key_id);
1813                    let should = guard
1814                        .get(&hex_id)
1815                        .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
1816                        .unwrap_or(true);
1817                    if should {
1818                        guard.insert(hex_id, now);
1819                        to_request.push(key_id);
1820                    }
1821                }
1822                drop(guard);
1823                if !to_request.is_empty() {
1824                    self.request_app_state_keys(&to_request).await;
1825                }
1826            }
1827
1828            for m in mutations {
1829                debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
1830                self.dispatch_app_state_mutation(&m, full_sync).await;
1831            }
1832
1833            state = new_state;
1834            has_more = list.has_more_patches;
1835            // After the first batch, never request a snapshot again — only incremental patches.
1836            want_snapshot = false;
1837            debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more} new_version={}", name, state.version);
1838        }
1839
1840        backend.set_version(name.as_str(), state.clone()).await?;
1841
1842        debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
1843        Ok(())
1844    }
1845
1846    async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
1847        if raw_key_ids.is_empty() {
1848            return;
1849        }
1850        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1851        let own_jid = match device_snapshot.pn.clone() {
1852            Some(j) => j,
1853            None => return,
1854        };
1855        let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
1856            .iter()
1857            .map(|k| wa::message::AppStateSyncKeyId {
1858                key_id: Some(k.clone()),
1859            })
1860            .collect();
1861        let msg = wa::Message {
1862            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1863                r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
1864                app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
1865                ..Default::default()
1866            })),
1867            ..Default::default()
1868        };
1869        if let Err(e) = self
1870            .send_message_impl(
1871                own_jid,
1872                &msg,
1873                Some(self.generate_message_id().await),
1874                true,
1875                false,
1876                None,
1877                vec![],
1878            )
1879            .await
1880        {
1881            warn!("Failed to send app state key request: {e}");
1882        }
1883    }
1884
    /// Translates a decoded app state mutation into the corresponding
    /// client event and dispatches it on the event bus.
    ///
    /// Only `Set` operations with a non-empty index are handled; the first
    /// index element selects the mutation kind, and (for chat-scoped kinds)
    /// the second element is parsed as the chat JID. Unknown kinds are
    /// silently ignored.
    async fn dispatch_app_state_mutation(
        &self,
        m: &crate::appstate_sync::Mutation,
        full_sync: bool,
    ) {
        use wacore::types::events::{
            ArchiveUpdate, ContactUpdate, Event, MarkChatAsReadUpdate, MuteUpdate, PinUpdate,
        };
        // Deletions and other non-Set operations carry no actionable payload here.
        if m.operation != wa::syncd_mutation::SyncdOperation::Set {
            return;
        }
        if m.index.is_empty() {
            return;
        }
        let kind = &m.index[0];
        let ts = m
            .action_value
            .as_ref()
            .and_then(|v| v.timestamp)
            .unwrap_or(0);
        // NOTE(review): assumes action timestamps are milliseconds (hence
        // from_timestamp_millis) — confirm against the syncd proto. Falls
        // back to "now" when the value is out of range.
        let time = chrono::DateTime::from_timestamp_millis(ts).unwrap_or_else(chrono::Utc::now);
        // index[1], when present, is the chat/contact JID the action targets.
        let jid = if m.index.len() > 1 {
            m.index[1].parse().unwrap_or_default()
        } else {
            Jid::default()
        };
        match kind.as_str() {
            "setting_pushName" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.push_name_setting
                    && let Some(new_name) = &act.name
                {
                    let new_name = new_name.clone();
                    let bus = self.core.event_bus.clone();

                    // Only persist and announce when the name actually changed.
                    let snapshot = self.persistence_manager.get_device_snapshot().await;
                    let old = snapshot.push_name.clone();
                    if old != new_name {
                        debug!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
                        self.persistence_manager
                            .process_command(DeviceCommand::SetPushName(new_name.clone()))
                            .await;
                        bus.dispatch(&Event::SelfPushNameUpdated(
                            crate::types::events::SelfPushNameUpdated {
                                from_server: true,
                                old_name: old.clone(),
                                new_name: new_name.clone(),
                            },
                        ));

                        // WhatsApp Web sends presence immediately when receiving pushname from
                        if old.is_empty() && !new_name.is_empty() {
                            debug!(target: "Client/AppState", "Sending presence after receiving initial pushname from app state sync");
                            if let Err(e) = self.presence().set_available().await {
                                warn!(target: "Client/AppState", "Failed to send presence after pushname sync: {e:?}");
                            }
                        }
                    } else {
                        debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
                    }
                }
            }
            "mute" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.mute_action
                {
                    self.core.event_bus.dispatch(&Event::MuteUpdate(MuteUpdate {
                        jid,
                        timestamp: time,
                        action: Box::new(*act),
                        from_full_sync: full_sync,
                    }));
                }
            }
            // "pin_v1" is the older index name for the same pin action.
            "pin" | "pin_v1" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.pin_action
                {
                    self.core.event_bus.dispatch(&Event::PinUpdate(PinUpdate {
                        jid,
                        timestamp: time,
                        action: Box::new(*act),
                        from_full_sync: full_sync,
                    }));
                }
            }
            "archive" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.archive_chat_action
                {
                    self.core
                        .event_bus
                        .dispatch(&Event::ArchiveUpdate(ArchiveUpdate {
                            jid,
                            timestamp: time,
                            action: Box::new(act.clone()),
                            from_full_sync: full_sync,
                        }));
                }
            }
            "contact" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.contact_action
                {
                    self.core
                        .event_bus
                        .dispatch(&Event::ContactUpdate(ContactUpdate {
                            jid,
                            timestamp: time,
                            action: Box::new(act.clone()),
                            from_full_sync: full_sync,
                        }));
                }
            }
            // Both spellings have been observed for this index key.
            "mark_chat_as_read" | "markChatAsRead" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.mark_chat_as_read_action
                {
                    self.core.event_bus.dispatch(&Event::MarkChatAsReadUpdate(
                        MarkChatAsReadUpdate {
                            jid,
                            timestamp: time,
                            action: Box::new(act.clone()),
                            from_full_sync: full_sync,
                        },
                    ));
                }
            }
            _ => {}
        }
    }
2016
    /// Flags the next transport disconnect as expected, so downstream
    /// disconnect handling does not treat it as an unexpected drop.
    async fn expect_disconnect(&self) {
        self.expected_disconnect.store(true, Ordering::Relaxed);
    }
2020
2021    pub(crate) async fn handle_stream_error(&self, node: &wacore_binary::node::Node) {
2022        self.is_logged_in.store(false, Ordering::Relaxed);
2023
2024        let mut attrs = node.attrs();
2025        let code = attrs.optional_string("code").unwrap_or("");
2026        let conflict_type = node
2027            .get_optional_child("conflict")
2028            .map(|n| n.attrs().optional_string("type").unwrap_or("").to_string())
2029            .unwrap_or_default();
2030
2031        if !conflict_type.is_empty() {
2032            info!(
2033                "Got stream error indicating client was removed or replaced (conflict={}). Logging out.",
2034                conflict_type
2035            );
2036            self.expect_disconnect().await;
2037            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2038
2039            let event = if conflict_type == "replaced" {
2040                Event::StreamReplaced(crate::types::events::StreamReplaced)
2041            } else {
2042                Event::LoggedOut(crate::types::events::LoggedOut {
2043                    on_connect: false,
2044                    reason: ConnectFailureReason::LoggedOut,
2045                })
2046            };
2047            self.core.event_bus.dispatch(&event);
2048
2049            let transport_opt = self.transport.lock().await.clone();
2050            if let Some(transport) = transport_opt {
2051                tokio::spawn(async move {
2052                    info!("Disconnecting transport after conflict");
2053                    transport.disconnect().await;
2054                });
2055            }
2056        } else {
2057            match code {
2058                "515" => {
2059                    // 515 is expected during registration/pairing phase - server closes stream after pairing
2060                    info!(
2061                        "Got 515 stream error, server is closing stream (expected after pairing). Will auto-reconnect."
2062                    );
2063                    self.expect_disconnect().await;
2064                    // Proactively disconnect transport since server may not close the connection
2065                    // Clone the transport Arc before spawning to avoid holding the lock
2066                    let transport_opt = self.transport.lock().await.clone();
2067                    if let Some(transport) = transport_opt {
2068                        // Spawn disconnect in background so we don't block the message loop
2069                        tokio::spawn(async move {
2070                            info!("Disconnecting transport after 515");
2071                            transport.disconnect().await;
2072                        });
2073                    }
2074                }
2075                "516" => {
2076                    info!("Got 516 stream error (device removed). Logging out.");
2077                    self.expect_disconnect().await;
2078                    self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2079                    self.core.event_bus.dispatch(&Event::LoggedOut(
2080                        crate::types::events::LoggedOut {
2081                            on_connect: false,
2082                            reason: ConnectFailureReason::LoggedOut,
2083                        },
2084                    ));
2085
2086                    let transport_opt = self.transport.lock().await.clone();
2087                    if let Some(transport) = transport_opt {
2088                        tokio::spawn(async move {
2089                            info!("Disconnecting transport after 516");
2090                            transport.disconnect().await;
2091                        });
2092                    }
2093                }
2094                "503" => {
2095                    info!("Got 503 service unavailable, will auto-reconnect.");
2096                }
2097                _ => {
2098                    error!("Unknown stream error: {}", DisplayableNode(node));
2099                    self.expect_disconnect().await;
2100                    self.core.event_bus.dispatch(&Event::StreamError(
2101                        crate::types::events::StreamError {
2102                            code: code.to_string(),
2103                            raw: Some(node.clone()),
2104                        },
2105                    ));
2106                }
2107            }
2108        }
2109
2110        info!("Notifying shutdown from stream error handler");
2111        self.shutdown_notifier.notify_waiters();
2112    }
2113
2114    pub(crate) async fn handle_connect_failure(&self, node: &wacore_binary::node::Node) {
2115        self.expected_disconnect.store(true, Ordering::Relaxed);
2116        self.shutdown_notifier.notify_waiters();
2117
2118        let mut attrs = node.attrs();
2119        let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
2120        let reason = ConnectFailureReason::from(reason_code);
2121
2122        if reason.should_reconnect() {
2123            self.expected_disconnect.store(false, Ordering::Relaxed);
2124        } else {
2125            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2126        }
2127
2128        if reason.is_logged_out() {
2129            info!("Got {reason:?} connect failure, logging out.");
2130            self.core
2131                .event_bus
2132                .dispatch(&wacore::types::events::Event::LoggedOut(
2133                    crate::types::events::LoggedOut {
2134                        on_connect: true,
2135                        reason,
2136                    },
2137                ));
2138        } else if let ConnectFailureReason::TempBanned = reason {
2139            let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
2140            let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
2141            let expire_duration =
2142                chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
2143            warn!("Temporary ban connect failure: {}", DisplayableNode(node));
2144            self.core.event_bus.dispatch(&Event::TemporaryBan(
2145                crate::types::events::TemporaryBan {
2146                    code: crate::types::events::TempBanReason::from(ban_code),
2147                    expire: expire_duration,
2148                },
2149            ));
2150        } else if let ConnectFailureReason::ClientOutdated = reason {
2151            error!("Client is outdated and was rejected by server.");
2152            self.core
2153                .event_bus
2154                .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
2155        } else {
2156            warn!("Unknown connect failure: {}", DisplayableNode(node));
2157            self.core.event_bus.dispatch(&Event::ConnectFailure(
2158                crate::types::events::ConnectFailure {
2159                    reason,
2160                    message: attrs.optional_string("message").unwrap_or("").to_string(),
2161                    raw: Some(node.clone()),
2162                },
2163            ));
2164        }
2165    }
2166
2167    pub(crate) async fn handle_iq(self: &Arc<Self>, node: &wacore_binary::node::Node) -> bool {
2168        if let Some("get") = node.attrs.get("type").and_then(|s| s.as_str())
2169            && node.get_optional_child("ping").is_some()
2170        {
2171            info!("Received ping, sending pong.");
2172            let mut parser = node.attrs();
2173            let from_jid = parser.jid("from");
2174            let id = parser.optional_string("id").unwrap_or("").to_string();
2175            let pong = NodeBuilder::new("iq")
2176                .attrs([
2177                    ("to", from_jid.to_string()),
2178                    ("id", id),
2179                    ("type", "result".to_string()),
2180                ])
2181                .build();
2182            if let Err(e) = self.send_node(pong).await {
2183                warn!("Failed to send pong: {e:?}");
2184            }
2185            return true;
2186        }
2187
2188        // Pass Node directly to pair handling
2189        if pair::handle_iq(self, node).await {
2190            return true;
2191        }
2192
2193        false
2194    }
2195
2196    pub fn is_connected(&self) -> bool {
2197        self.noise_socket
2198            .try_lock()
2199            .is_ok_and(|guard| guard.is_some())
2200    }
2201
    /// Returns the current value of the `is_logged_in` flag (relaxed load;
    /// callers should treat it as a best-effort snapshot).
    pub fn is_logged_in(&self) -> bool {
        self.is_logged_in.load(Ordering::Relaxed)
    }
2205
    /// Delegates an incoming stanza to the unified session tracker so it can
    /// update its notion of the server time offset from the node's attributes.
    pub(crate) fn update_server_time_offset(&self, node: &wacore_binary::node::Node) {
        self.unified_session.update_server_time_offset(node);
    }
2209
2210    pub(crate) async fn send_unified_session(&self) {
2211        if !self.is_connected() {
2212            debug!(target: "Client/UnifiedSession", "Skipping: not connected");
2213            return;
2214        }
2215
2216        let Some((node, _sequence)) = self.unified_session.prepare_send().await else {
2217            return;
2218        };
2219
2220        if let Err(e) = self.send_node(node).await {
2221            debug!(target: "Client/UnifiedSession", "Send failed: {e}");
2222            self.unified_session.clear_last_sent().await;
2223        }
2224    }
2225
2226    /// Waits for the noise socket to be established.
2227    ///
2228    /// Returns `Ok(())` when the socket is ready, or `Err` on timeout.
2229    /// This is useful for code that needs to send messages before login,
2230    /// such as requesting a pair code during initial pairing.
2231    ///
2232    /// If the socket is already connected, returns immediately.
2233    pub async fn wait_for_socket(&self, timeout: std::time::Duration) -> Result<(), anyhow::Error> {
2234        // Fast path: already connected
2235        if self.is_connected() {
2236            return Ok(());
2237        }
2238
2239        // Register waiter and re-check to avoid race condition:
2240        // If socket becomes ready between checks, the notified future captures it.
2241        let notified = self.socket_ready_notifier.notified();
2242        if self.is_connected() {
2243            return Ok(());
2244        }
2245
2246        tokio::time::timeout(timeout, notified)
2247            .await
2248            .map_err(|_| anyhow::anyhow!("Timeout waiting for socket"))
2249    }
2250
2251    /// Waits for the client to establish a connection and complete login.
2252    ///
2253    /// Returns `Ok(())` when connected, or `Err` on timeout.
2254    /// This is useful for code that needs to run after connection is established
2255    /// and authentication is complete.
2256    ///
2257    /// If the client is already connected and logged in, returns immediately.
2258    pub async fn wait_for_connected(
2259        &self,
2260        timeout: std::time::Duration,
2261    ) -> Result<(), anyhow::Error> {
2262        // Fast path: already connected and logged in
2263        if self.is_connected() && self.is_logged_in() {
2264            return Ok(());
2265        }
2266
2267        // Register waiter and re-check to avoid race condition:
2268        // If connection completes between checks, the notified future captures it.
2269        let notified = self.connected_notifier.notified();
2270        if self.is_connected() && self.is_logged_in() {
2271            return Ok(());
2272        }
2273
2274        tokio::time::timeout(timeout, notified)
2275            .await
2276            .map_err(|_| anyhow::anyhow!("Timeout waiting for connection"))
2277    }
2278
2279    /// Get access to the PersistenceManager for this client.
2280    /// This is useful for multi-account scenarios to get the device ID.
2281    pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
2282        self.persistence_manager.clone()
2283    }
2284
2285    pub async fn edit_message(
2286        &self,
2287        to: Jid,
2288        original_id: impl Into<String>,
2289        new_content: wa::Message,
2290    ) -> Result<String, anyhow::Error> {
2291        let original_id = original_id.into();
2292
2293        // WhatsApp Web uses getMeUserLidOrJidForChat(chat, EditMessage) which
2294        // returns LID for LID-addressing groups and PN otherwise.
2295        let participant = if to.is_group() {
2296            Some(
2297                self.get_own_jid_for_group(&to)
2298                    .await?
2299                    .to_non_ad()
2300                    .to_string(),
2301            )
2302        } else {
2303            if self.get_pn().await.is_none() {
2304                return Err(anyhow!("Not logged in"));
2305            }
2306            None
2307        };
2308
2309        let edit_container_message = wa::Message {
2310            edited_message: Some(Box::new(wa::message::FutureProofMessage {
2311                message: Some(Box::new(wa::Message {
2312                    protocol_message: Some(Box::new(wa::message::ProtocolMessage {
2313                        key: Some(wa::MessageKey {
2314                            remote_jid: Some(to.to_string()),
2315                            from_me: Some(true),
2316                            id: Some(original_id.clone()),
2317                            participant,
2318                        }),
2319                        r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
2320                        edited_message: Some(Box::new(new_content)),
2321                        timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
2322                        ..Default::default()
2323                    })),
2324                    ..Default::default()
2325                })),
2326            })),
2327            ..Default::default()
2328        };
2329
2330        // Use a new stanza ID instead of reusing the original message ID.
2331        // The original message ID is already embedded in protocolMessage.key.id
2332        // inside the encrypted payload. Reusing it as the outer stanza ID causes
2333        // the server to deduplicate against the original message and silently
2334        // drop the edit.
2335        self.send_message_impl(
2336            to,
2337            &edit_container_message,
2338            None,
2339            false,
2340            false,
2341            Some(crate::types::message::EditAttribute::MessageEdit),
2342            vec![],
2343        )
2344        .await?;
2345
2346        Ok(original_id)
2347    }
2348
    /// Marshals `node`, encrypts it over the current noise socket, and sends it.
    ///
    /// Plaintext buffers are recycled through `plaintext_buffer_pool` on every
    /// exit path (success, marshal failure, encrypt/send failure), but only if
    /// they have not grown past `MAX_POOLED_BUFFER_CAP` — oversized buffers are
    /// dropped to keep the pool's memory footprint bounded.
    ///
    /// # Errors
    /// `ClientError::NotConnected` when no noise socket is attached, or the
    /// marshal/encrypt/send error otherwise.
    pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
        // Clone the socket Arc out of the mutex in a tight scope so the lock
        // is not held across the marshal/encrypt awaits below.
        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
        let noise_socket = match noise_socket_arc {
            Some(socket) => socket,
            None => return Err(ClientError::NotConnected),
        };

        debug!(target: "Client/Send", "{}", DisplayableNode(&node));

        // Take a pooled buffer (or allocate a fresh 1 KiB one) and clear any
        // stale contents before marshaling into it.
        let mut plaintext_buf = {
            let mut pool = self.plaintext_buffer_pool.lock().await;
            pool.pop().unwrap_or_else(|| Vec::with_capacity(1024))
        };
        plaintext_buf.clear();

        if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
            error!("Failed to marshal node: {e:?}");
            // Return the buffer to the pool before bailing out.
            let mut pool = self.plaintext_buffer_pool.lock().await;
            if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
                pool.push(plaintext_buf);
            }
            return Err(SocketError::Crypto("Marshal error".to_string()).into());
        }

        // Size based on plaintext + encryption overhead (16 byte tag + 3 byte frame header)
        let encrypted_buf = Vec::with_capacity(plaintext_buf.len() + 32);

        // encrypt_and_send takes ownership of both buffers and hands the
        // plaintext buffer back on both success and failure (the error type
        // carries it in `plaintext_buf`) so it can be pooled again.
        let (plaintext_buf, _) = match noise_socket
            .encrypt_and_send(plaintext_buf, encrypted_buf)
            .await
        {
            Ok(bufs) => bufs,
            Err(mut e) => {
                // Recover the buffer from the error before propagating it.
                let p_buf = std::mem::take(&mut e.plaintext_buf);
                let mut pool = self.plaintext_buffer_pool.lock().await;
                if p_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
                    pool.push(p_buf);
                }
                return Err(e.into());
            }
        };

        // Success path: recycle the plaintext buffer.
        let mut pool = self.plaintext_buffer_pool.lock().await;
        if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
            pool.push(plaintext_buf);
        }
        Ok(())
    }
2397
2398    pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
2399        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2400        let old_name = device_snapshot.push_name.clone();
2401
2402        if old_name == new_name {
2403            return;
2404        }
2405
2406        log::debug!("Updating push name from '{}' -> '{}'", old_name, new_name);
2407        self.persistence_manager
2408            .process_command(DeviceCommand::SetPushName(new_name.clone()))
2409            .await;
2410
2411        self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
2412            crate::types::events::SelfPushNameUpdated {
2413                from_server: true,
2414                old_name,
2415                new_name: new_name.clone(),
2416            },
2417        ));
2418
2419        let client_clone = self.clone();
2420        tokio::spawn(async move {
2421            if let Err(e) = client_clone.presence().set_available().await {
2422                log::warn!("Failed to send presence after push name update: {:?}", e);
2423            } else {
2424                log::debug!("Sent presence after push name update.");
2425            }
2426        });
2427    }
2428
2429    pub async fn get_push_name(&self) -> String {
2430        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2431        device_snapshot.push_name.clone()
2432    }
2433
2434    pub async fn get_pn(&self) -> Option<Jid> {
2435        let snapshot = self.persistence_manager.get_device_snapshot().await;
2436        snapshot.pn.clone()
2437    }
2438
2439    pub async fn get_lid(&self) -> Option<Jid> {
2440        let snapshot = self.persistence_manager.get_device_snapshot().await;
2441        snapshot.lid.clone()
2442    }
2443
2444    /// Resolve our own JID for a group, respecting its addressing mode.
2445    ///
2446    /// Returns LID for LID-addressing groups, PN otherwise.
2447    /// Matches WhatsApp Web's `getMeUserLidOrJidForChat`.
2448    pub(crate) async fn get_own_jid_for_group(
2449        &self,
2450        group_jid: &Jid,
2451    ) -> Result<Jid, anyhow::Error> {
2452        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2453        let own_pn = device_snapshot
2454            .pn
2455            .clone()
2456            .ok_or_else(|| anyhow!("Not logged in"))?;
2457
2458        let addressing_mode = self
2459            .groups()
2460            .query_info(group_jid)
2461            .await
2462            .map(|info| info.addressing_mode)
2463            .unwrap_or(crate::types::message::AddressingMode::Pn);
2464
2465        Ok(match addressing_mode {
2466            crate::types::message::AddressingMode::Lid => {
2467                device_snapshot.lid.clone().unwrap_or(own_pn)
2468            }
2469            crate::types::message::AddressingMode::Pn => own_pn,
2470        })
2471    }
2472
2473    /// Creates a normalized StanzaKey by resolving PN to LID JIDs.
2474    pub(crate) async fn make_stanza_key(&self, chat: Jid, id: String) -> StanzaKey {
2475        // Resolve chat JID to LID if possible
2476        let chat = self.resolve_encryption_jid(&chat).await;
2477
2478        StanzaKey { chat, id }
2479    }
2480
2481    // get_phone_number_from_lid is in client/lid_pn.rs
2482
2483    pub(crate) async fn send_protocol_receipt(
2484        &self,
2485        id: String,
2486        receipt_type: crate::types::presence::ReceiptType,
2487    ) {
2488        if id.is_empty() {
2489            return;
2490        }
2491        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2492        if let Some(own_jid) = &device_snapshot.pn {
2493            let type_str = match receipt_type {
2494                crate::types::presence::ReceiptType::HistorySync => "hist_sync",
2495                crate::types::presence::ReceiptType::Read => "read",
2496                crate::types::presence::ReceiptType::ReadSelf => "read-self",
2497                crate::types::presence::ReceiptType::Delivered => "delivery",
2498                crate::types::presence::ReceiptType::Played => "played",
2499                crate::types::presence::ReceiptType::PlayedSelf => "played-self",
2500                crate::types::presence::ReceiptType::Inactive => "inactive",
2501                crate::types::presence::ReceiptType::PeerMsg => "peer_msg",
2502                crate::types::presence::ReceiptType::Sender => "sender",
2503                crate::types::presence::ReceiptType::ServerError => "server-error",
2504                crate::types::presence::ReceiptType::Retry => "retry",
2505                crate::types::presence::ReceiptType::Other(ref s) => s.as_str(),
2506            };
2507
2508            let node = NodeBuilder::new("receipt")
2509                .attrs([
2510                    ("id", id),
2511                    ("type", type_str.to_string()),
2512                    ("to", own_jid.to_non_ad().to_string()),
2513                ])
2514                .build();
2515
2516            if let Err(e) = self.send_node(node).await {
2517                warn!(
2518                    "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
2519                    receipt_type, self.unique_id, e
2520                );
2521            }
2522        }
2523    }
2524}
2525
2526#[cfg(test)]
2527mod tests {
2528    use super::*;
2529    use crate::lid_pn_cache::LearningSource;
2530    use crate::test_utils::MockHttpClient;
2531    use tokio::sync::oneshot;
2532    use wacore_binary::jid::SERVER_JID;
2533
2534    #[tokio::test]
2535    async fn test_ack_behavior_for_incoming_stanzas() {
2536        let backend = crate::test_utils::create_test_backend().await;
2537        let pm = Arc::new(
2538            PersistenceManager::new(backend)
2539                .await
2540                .expect("persistence manager should initialize"),
2541        );
2542        let (client, _rx) = Client::new(
2543            pm,
2544            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2545            Arc::new(MockHttpClient),
2546            None,
2547        )
2548        .await;
2549
2550        // --- Assertions ---
2551
2552        // Verify that we still ack other critical stanzas (regression check).
2553        use wacore_binary::node::{Attrs, Node, NodeContent};
2554
2555        let mut receipt_attrs = Attrs::new();
2556        receipt_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
2557        receipt_attrs.insert("id".to_string(), "RCPT-1".to_string());
2558        let receipt_node = Node::new(
2559            "receipt",
2560            receipt_attrs,
2561            Some(NodeContent::String("test".to_string())),
2562        );
2563
2564        let mut notification_attrs = Attrs::new();
2565        notification_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
2566        notification_attrs.insert("id".to_string(), "NOTIF-1".to_string());
2567        let notification_node = Node::new(
2568            "notification",
2569            notification_attrs,
2570            Some(NodeContent::String("test".to_string())),
2571        );
2572
2573        assert!(
2574            client.should_ack(&receipt_node),
2575            "should_ack must still return TRUE for <receipt> stanzas."
2576        );
2577        assert!(
2578            client.should_ack(&notification_node),
2579            "should_ack must still return TRUE for <notification> stanzas."
2580        );
2581
2582        info!(
2583            "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
2584        );
2585    }
2586
2587    #[tokio::test]
2588    async fn test_plaintext_buffer_pool_reuses_buffers() {
2589        let backend = crate::test_utils::create_test_backend().await;
2590        let pm = Arc::new(
2591            PersistenceManager::new(backend)
2592                .await
2593                .expect("persistence manager should initialize"),
2594        );
2595        let (client, _rx) = Client::new(
2596            pm,
2597            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2598            Arc::new(MockHttpClient),
2599            None,
2600        )
2601        .await;
2602
2603        // Check initial pool size
2604        let initial_pool_size = {
2605            let pool = client.plaintext_buffer_pool.lock().await;
2606            pool.len()
2607        };
2608
2609        // Attempt to send a node (this will fail because we're not connected, but that's okay)
2610        let test_node = NodeBuilder::new("test").attr("id", "test-123").build();
2611
2612        let _ = client.send_node(test_node).await;
2613
2614        // After the send attempt, the pool should have the same or more buffers
2615        // (depending on whether buffers were consumed and returned)
2616        let final_pool_size = {
2617            let pool = client.plaintext_buffer_pool.lock().await;
2618            pool.len()
2619        };
2620
2621        assert!(
2622            final_pool_size >= initial_pool_size,
2623            "Plaintext buffer pool should not shrink after send operations"
2624        );
2625
2626        info!(
2627            "✅ test_plaintext_buffer_pool_reuses_buffers passed: Buffer pool properly manages plaintext buffers"
2628        );
2629    }
2630
2631    #[tokio::test]
2632    async fn test_ack_waiter_resolves() {
2633        let backend = crate::test_utils::create_test_backend().await;
2634        let pm = Arc::new(
2635            PersistenceManager::new(backend)
2636                .await
2637                .expect("persistence manager should initialize"),
2638        );
2639        let (client, _rx) = Client::new(
2640            pm,
2641            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2642            Arc::new(MockHttpClient),
2643            None,
2644        )
2645        .await;
2646
2647        // 1. Insert a waiter for a specific ID
2648        let test_id = "ack-test-123".to_string();
2649        let (tx, rx) = oneshot::channel();
2650        client
2651            .response_waiters
2652            .lock()
2653            .await
2654            .insert(test_id.clone(), tx);
2655        assert!(
2656            client.response_waiters.lock().await.contains_key(&test_id),
2657            "Waiter should be inserted before handling ack"
2658        );
2659
2660        // 2. Create a mock <ack/> node with the test ID
2661        let ack_node = NodeBuilder::new("ack")
2662            .attr("id", test_id.clone())
2663            .attr("from", SERVER_JID)
2664            .build();
2665
2666        // 3. Handle the ack
2667        let handled = client.handle_ack_response(ack_node).await;
2668        assert!(
2669            handled,
2670            "handle_ack_response should return true when waiter exists"
2671        );
2672
2673        // 4. Await the receiver with a timeout
2674        match tokio::time::timeout(Duration::from_secs(1), rx).await {
2675            Ok(Ok(response_node)) => {
2676                assert_eq!(
2677                    response_node.attrs.get("id").and_then(|v| v.as_str()),
2678                    Some(test_id.as_str()),
2679                    "Response node should have correct ID"
2680                );
2681            }
2682            Ok(Err(_)) => panic!("Receiver was dropped without being sent a value"),
2683            Err(_) => panic!("Test timed out waiting for ack response"),
2684        }
2685
2686        // 5. Verify the waiter was removed
2687        assert!(
2688            !client.response_waiters.lock().await.contains_key(&test_id),
2689            "Waiter should be removed after handling"
2690        );
2691
2692        info!(
2693            "✅ test_ack_waiter_resolves passed: ACK response correctly resolves pending waiters"
2694        );
2695    }
2696
2697    #[tokio::test]
2698    async fn test_ack_without_matching_waiter() {
2699        let backend = crate::test_utils::create_test_backend().await;
2700        let pm = Arc::new(
2701            PersistenceManager::new(backend)
2702                .await
2703                .expect("persistence manager should initialize"),
2704        );
2705        let (client, _rx) = Client::new(
2706            pm,
2707            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2708            Arc::new(MockHttpClient),
2709            None,
2710        )
2711        .await;
2712
2713        // Create an ack without any matching waiter
2714        let ack_node = NodeBuilder::new("ack")
2715            .attr("id", "non-existent-id")
2716            .attr("from", SERVER_JID)
2717            .build();
2718
2719        // Should return false since there's no waiter
2720        let handled = client.handle_ack_response(ack_node).await;
2721        assert!(
2722            !handled,
2723            "handle_ack_response should return false when no waiter exists"
2724        );
2725
2726        info!(
2727            "✅ test_ack_without_matching_waiter passed: ACK without matching waiter handled gracefully"
2728        );
2729    }
2730
2731    /// Test that the lid_pn_cache correctly stores and retrieves LID mappings.
2732    ///
2733    /// This is critical for the LID-PN session mismatch fix. When we receive a message
2734    /// with sender_lid, we cache the phone->LID mapping so that when sending replies,
2735    /// we can reuse the existing LID session instead of creating a new PN session.
2736    #[tokio::test]
2737    async fn test_lid_pn_cache_basic_operations() {
2738        let backend = Arc::new(
2739            crate::store::SqliteStore::new("file:memdb_lid_cache_basic?mode=memory&cache=shared")
2740                .await
2741                .expect("Failed to create in-memory backend for test"),
2742        );
2743        let pm = Arc::new(
2744            PersistenceManager::new(backend)
2745                .await
2746                .expect("persistence manager should initialize"),
2747        );
2748        let (client, _rx) = Client::new(
2749            pm,
2750            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2751            Arc::new(MockHttpClient),
2752            None,
2753        )
2754        .await;
2755
2756        // Initially, the cache should be empty for a phone number
2757        let phone = "559980000001";
2758        let lid = "100000012345678";
2759
2760        assert!(
2761            client.lid_pn_cache.get_current_lid(phone).await.is_none(),
2762            "Cache should be empty initially"
2763        );
2764
2765        // Insert a phone->LID mapping using add_lid_pn_mapping
2766        client
2767            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
2768            .await
2769            .expect("Failed to persist LID-PN mapping in tests");
2770
2771        // Verify we can retrieve it (phone -> LID lookup)
2772        let cached_lid = client.lid_pn_cache.get_current_lid(phone).await;
2773        assert!(cached_lid.is_some(), "Cache should contain the mapping");
2774        assert_eq!(
2775            cached_lid.expect("cache should have LID"),
2776            lid,
2777            "Cached LID should match what we inserted"
2778        );
2779
2780        // Verify reverse lookup works (LID -> phone)
2781        let cached_phone = client.lid_pn_cache.get_phone_number(lid).await;
2782        assert!(cached_phone.is_some(), "Reverse lookup should work");
2783        assert_eq!(
2784            cached_phone.expect("reverse lookup should return phone"),
2785            phone,
2786            "Cached phone should match what we inserted"
2787        );
2788
2789        // Verify a different phone number returns None
2790        assert!(
2791            client
2792                .lid_pn_cache
2793                .get_current_lid("559980000002")
2794                .await
2795                .is_none(),
2796            "Different phone number should not have a mapping"
2797        );
2798
2799        info!("✅ test_lid_pn_cache_basic_operations passed: LID-PN cache works correctly");
2800    }
2801
2802    /// Test that the lid_pn_cache respects timestamp-based conflict resolution.
2803    ///
2804    /// When a phone number has multiple LIDs, the most recent one should be returned.
2805    #[tokio::test]
2806    async fn test_lid_pn_cache_timestamp_resolution() {
2807        let backend = Arc::new(
2808            crate::store::SqliteStore::new(
2809                "file:memdb_lid_cache_timestamp?mode=memory&cache=shared",
2810            )
2811            .await
2812            .expect("Failed to create in-memory backend for test"),
2813        );
2814        let pm = Arc::new(
2815            PersistenceManager::new(backend)
2816                .await
2817                .expect("persistence manager should initialize"),
2818        );
2819        let (client, _rx) = Client::new(
2820            pm,
2821            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2822            Arc::new(MockHttpClient),
2823            None,
2824        )
2825        .await;
2826
2827        let phone = "559980000001";
2828        let lid_old = "100000012345678";
2829        let lid_new = "100000087654321";
2830
2831        // Insert initial mapping
2832        client
2833            .add_lid_pn_mapping(lid_old, phone, LearningSource::Usync)
2834            .await
2835            .expect("Failed to persist LID-PN mapping in tests");
2836
2837        assert_eq!(
2838            client
2839                .lid_pn_cache
2840                .get_current_lid(phone)
2841                .await
2842                .expect("cache should have LID"),
2843            lid_old,
2844            "Initial LID should be stored"
2845        );
2846
2847        // Small delay to ensure different timestamp
2848        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2849
2850        // Add new mapping with newer timestamp
2851        client
2852            .add_lid_pn_mapping(lid_new, phone, LearningSource::PeerPnMessage)
2853            .await
2854            .expect("Failed to persist LID-PN mapping in tests");
2855
2856        assert_eq!(
2857            client
2858                .lid_pn_cache
2859                .get_current_lid(phone)
2860                .await
2861                .expect("cache should have newer LID"),
2862            lid_new,
2863            "Newer LID should be returned for phone lookup"
2864        );
2865
2866        // Both LIDs should still resolve to the same phone
2867        assert_eq!(
2868            client
2869                .lid_pn_cache
2870                .get_phone_number(lid_old)
2871                .await
2872                .expect("reverse lookup should return phone"),
2873            phone,
2874            "Old LID should still map to phone"
2875        );
2876        assert_eq!(
2877            client
2878                .lid_pn_cache
2879                .get_phone_number(lid_new)
2880                .await
2881                .expect("reverse lookup should return phone"),
2882            phone,
2883            "New LID should also map to phone"
2884        );
2885
2886        info!(
2887            "✅ test_lid_pn_cache_timestamp_resolution passed: Timestamp-based resolution works correctly"
2888        );
2889    }
2890
2891    /// Test that get_lid_for_phone (from SendContextResolver) returns the cached value.
2892    ///
2893    /// This is the method used by wacore::send to look up LID mappings when encrypting.
2894    #[tokio::test]
2895    async fn test_get_lid_for_phone_via_send_context_resolver() {
2896        use wacore::client::context::SendContextResolver;
2897
2898        let backend = Arc::new(
2899            crate::store::SqliteStore::new("file:memdb_get_lid_for_phone?mode=memory&cache=shared")
2900                .await
2901                .expect("Failed to create in-memory backend for test"),
2902        );
2903        let pm = Arc::new(
2904            PersistenceManager::new(backend)
2905                .await
2906                .expect("persistence manager should initialize"),
2907        );
2908        let (client, _rx) = Client::new(
2909            pm,
2910            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2911            Arc::new(MockHttpClient),
2912            None,
2913        )
2914        .await;
2915
2916        let phone = "559980000001";
2917        let lid = "100000012345678";
2918
2919        // Before caching, should return None
2920        assert!(
2921            client.get_lid_for_phone(phone).await.is_none(),
2922            "get_lid_for_phone should return None before caching"
2923        );
2924
2925        // Cache the mapping using add_lid_pn_mapping
2926        client
2927            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
2928            .await
2929            .expect("Failed to persist LID-PN mapping in tests");
2930
2931        // Now it should return the LID
2932        let result = client.get_lid_for_phone(phone).await;
2933        assert!(
2934            result.is_some(),
2935            "get_lid_for_phone should return Some after caching"
2936        );
2937        assert_eq!(
2938            result.expect("get_lid_for_phone should return Some"),
2939            lid,
2940            "get_lid_for_phone should return the cached LID"
2941        );
2942
2943        info!(
2944            "✅ test_get_lid_for_phone_via_send_context_resolver passed: SendContextResolver correctly returns cached LID"
2945        );
2946    }
2947
2948    /// Test that wait_for_offline_delivery_end returns immediately when the flag is already set.
2949    #[tokio::test]
2950    async fn test_wait_for_offline_delivery_end_returns_immediately_when_flag_set() {
2951        let backend = Arc::new(
2952            crate::store::SqliteStore::new(
2953                "file:memdb_offline_sync_flag_set?mode=memory&cache=shared",
2954            )
2955            .await
2956            .expect("Failed to create in-memory backend for test"),
2957        );
2958        let pm = Arc::new(
2959            PersistenceManager::new(backend)
2960                .await
2961                .expect("persistence manager should initialize"),
2962        );
2963        let (client, _rx) = Client::new(
2964            pm,
2965            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2966            Arc::new(MockHttpClient),
2967            None,
2968        )
2969        .await;
2970
2971        // Set the flag to true (simulating offline sync completed)
2972        client
2973            .offline_sync_completed
2974            .store(true, std::sync::atomic::Ordering::Relaxed);
2975
2976        // This should return immediately (not wait 10 seconds)
2977        let start = std::time::Instant::now();
2978        client.wait_for_offline_delivery_end().await;
2979        let elapsed = start.elapsed();
2980
2981        // Should complete in < 100ms (not 10 second timeout)
2982        assert!(
2983            elapsed.as_millis() < 100,
2984            "wait_for_offline_delivery_end should return immediately when flag is set, took {:?}",
2985            elapsed
2986        );
2987
2988        info!("✅ test_wait_for_offline_delivery_end_returns_immediately_when_flag_set passed");
2989    }
2990
2991    /// Test that wait_for_offline_delivery_end times out when the flag is NOT set.
2992    /// This verifies the 10-second timeout is working.
2993    #[tokio::test]
2994    async fn test_wait_for_offline_delivery_end_times_out_when_flag_not_set() {
2995        let backend = Arc::new(
2996            crate::store::SqliteStore::new(
2997                "file:memdb_offline_sync_timeout?mode=memory&cache=shared",
2998            )
2999            .await
3000            .expect("Failed to create in-memory backend for test"),
3001        );
3002        let pm = Arc::new(
3003            PersistenceManager::new(backend)
3004                .await
3005                .expect("persistence manager should initialize"),
3006        );
3007        let (client, _rx) = Client::new(
3008            pm,
3009            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3010            Arc::new(MockHttpClient),
3011            None,
3012        )
3013        .await;
3014
3015        // Flag is false by default, so we need to use a shorter timeout for the test
3016        // We'll verify behavior by using tokio timeout
3017        let start = std::time::Instant::now();
3018
3019        // Use a short timeout to test the behavior without waiting 10 seconds
3020        let result = tokio::time::timeout(
3021            std::time::Duration::from_millis(100),
3022            client.wait_for_offline_delivery_end(),
3023        )
3024        .await;
3025
3026        let elapsed = start.elapsed();
3027
3028        // The wait should NOT complete immediately - it should timeout
3029        // (because the flag is false and no one is notifying)
3030        assert!(
3031            result.is_err(),
3032            "wait_for_offline_delivery_end should not return immediately when flag is false"
3033        );
3034        assert!(
3035            elapsed.as_millis() >= 95, // Allow small timing variance
3036            "Should have waited for the timeout duration, took {:?}",
3037            elapsed
3038        );
3039
3040        info!("✅ test_wait_for_offline_delivery_end_times_out_when_flag_not_set passed");
3041    }
3042
    /// Test that wait_for_offline_delivery_end returns when notified.
    ///
    /// A background task fires `offline_sync_notifier.notify_waiters()` after
    /// 50ms; the waiting task must wake on that notification rather than ride
    /// out its internal timeout.
    #[tokio::test]
    async fn test_wait_for_offline_delivery_end_returns_on_notify() {
        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_offline_notify?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let client_clone = client.clone();

        // Spawn a task that will notify after 50ms
        // NOTE(review): notify_waiters only wakes tasks that are already
        // registered with the Notify; the 50ms sleep is what gives the main
        // task time to reach the wait below. Under extreme scheduler delay
        // this could in principle race — confirm the waiter re-checks the
        // completed flag or relies on its timeout as a backstop.
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            client_clone.offline_sync_notifier.notify_waiters();
        });

        let start = std::time::Instant::now();
        client.wait_for_offline_delivery_end().await;
        let elapsed = start.elapsed();

        // Should complete around 50ms (when notified), not 10 seconds
        assert!(
            elapsed.as_millis() < 200,
            "wait_for_offline_delivery_end should return when notified, took {:?}",
            elapsed
        );
        assert!(
            elapsed.as_millis() >= 45, // Should have waited for the notify
            "Should have waited for the notify, only took {:?}",
            elapsed
        );

        info!("✅ test_wait_for_offline_delivery_end_returns_on_notify passed");
    }
3090
3091    /// Test that the offline_sync_completed flag starts as false.
3092    #[tokio::test]
3093    async fn test_offline_sync_flag_initially_false() {
3094        let backend = Arc::new(
3095            crate::store::SqliteStore::new(
3096                "file:memdb_offline_flag_initial?mode=memory&cache=shared",
3097            )
3098            .await
3099            .expect("Failed to create in-memory backend for test"),
3100        );
3101        let pm = Arc::new(
3102            PersistenceManager::new(backend)
3103                .await
3104                .expect("persistence manager should initialize"),
3105        );
3106        let (client, _rx) = Client::new(
3107            pm,
3108            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3109            Arc::new(MockHttpClient),
3110            None,
3111        )
3112        .await;
3113
3114        // The flag should be false initially
3115        assert!(
3116            !client
3117                .offline_sync_completed
3118                .load(std::sync::atomic::Ordering::Relaxed),
3119            "offline_sync_completed should be false when Client is first created"
3120        );
3121
3122        info!("✅ test_offline_sync_flag_initially_false passed");
3123    }
3124
    /// Test the complete offline sync lifecycle:
    /// 1. Flag starts false
    /// 2. Flag is set true after IB offline stanza
    /// 3. Notify is called
    #[tokio::test]
    async fn test_offline_sync_lifecycle() {
        use std::sync::atomic::Ordering;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_offline_lifecycle?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // 1. Initially false
        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));

        // 2. Spawn a waiter
        let client_waiter = client.clone();
        let waiter_handle = tokio::spawn(async move {
            client_waiter.wait_for_offline_delivery_end().await;
            true // Return that we completed
        });

        // Give the waiter time to start waiting
        // NOTE(review): 10ms is assumed to be enough for the spawned task to be
        // scheduled and register with the notifier — confirm on slow CI runners.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        // Verify waiter hasn't completed yet
        assert!(
            !waiter_handle.is_finished(),
            "Waiter should still be waiting"
        );

        // 3. Simulate IB handler behavior (set flag and notify)
        // Order matters: set the flag BEFORE notifying, so any waiter that wakes
        // (or first checks) between the two lines observes the completed state.
        client.offline_sync_completed.store(true, Ordering::Relaxed);
        client.offline_sync_notifier.notify_waiters();

        // 4. Waiter should complete
        let result = tokio::time::timeout(std::time::Duration::from_millis(100), waiter_handle)
            .await
            .expect("Waiter should complete after notify")
            .expect("Waiter task should not panic");

        assert!(result, "Waiter should have completed successfully");
        assert!(client.offline_sync_completed.load(Ordering::Relaxed));

        info!("✅ test_offline_sync_lifecycle passed");
    }
3185
3186    /// Test that establish_primary_phone_session_immediate returns error when no PN is set.
3187    /// This verifies the "not logged in" guard works.
3188    #[tokio::test]
3189    async fn test_establish_primary_phone_session_fails_without_pn() {
3190        let backend = Arc::new(
3191            crate::store::SqliteStore::new("file:memdb_no_pn?mode=memory&cache=shared")
3192                .await
3193                .expect("Failed to create in-memory backend for test"),
3194        );
3195        let pm = Arc::new(
3196            PersistenceManager::new(backend)
3197                .await
3198                .expect("persistence manager should initialize"),
3199        );
3200        let (client, _rx) = Client::new(
3201            pm,
3202            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3203            Arc::new(MockHttpClient),
3204            None,
3205        )
3206        .await;
3207
3208        // No PN set, so this should fail
3209        let result = client.establish_primary_phone_session_immediate().await;
3210
3211        assert!(
3212            result.is_err(),
3213            "establish_primary_phone_session_immediate should fail when no PN is set"
3214        );
3215
3216        let error_msg = result.unwrap_err().to_string();
3217        assert!(
3218            error_msg.contains("Not logged in"),
3219            "Error should mention 'Not logged in', got: {}",
3220            error_msg
3221        );
3222
3223        info!("✅ test_establish_primary_phone_session_fails_without_pn passed");
3224    }
3225
    /// Test that ensure_e2e_sessions waits for offline sync to complete.
    /// This is the CRITICAL difference between ensure_e2e_sessions and
    /// establish_primary_phone_session_immediate.
    ///
    /// Exercised in two phases: an empty JID list (must return without waiting)
    /// and a non-empty list (must block until the offline-sync gate opens).
    #[tokio::test]
    async fn test_ensure_e2e_sessions_waits_for_offline_sync() {
        use std::sync::atomic::Ordering;
        use wacore_binary::jid::Jid;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_ensure_e2e_waits?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Flag is false (offline sync not complete)
        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));

        // Call ensure_e2e_sessions with an empty list (so it returns early after the wait)
        // This lets us test the waiting behavior without needing network
        let client_clone = client.clone();
        let ensure_handle = tokio::spawn(async move {
            // Start with some JIDs - but since we're testing the wait, we use empty
            // to avoid needing actual session establishment
            client_clone.ensure_e2e_sessions(vec![]).await
        });

        // Wait a bit - ensure_e2e_sessions should return immediately for empty list
        // NOTE(review): 10ms assumed long enough for the spawned task to run to
        // completion on the test runtime — confirm on heavily loaded CI.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        assert!(
            ensure_handle.is_finished(),
            "ensure_e2e_sessions should return immediately for empty JID list"
        );

        // Now test with actual JIDs - it should wait for offline sync
        let client_clone = client.clone();
        let test_jid = Jid::pn("559999999999");
        let ensure_handle = tokio::spawn(async move {
            // This will wait for offline sync before proceeding
            let start = std::time::Instant::now();
            let _ = client_clone.ensure_e2e_sessions(vec![test_jid]).await;
            start.elapsed()
        });

        // Give it a moment to start
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;

        // It should still be waiting (offline sync not complete)
        assert!(
            !ensure_handle.is_finished(),
            "ensure_e2e_sessions should be waiting for offline sync"
        );

        // Now complete offline sync
        // Flag first, then notify — so the waiter observes the completed state.
        client.offline_sync_completed.store(true, Ordering::Relaxed);
        client.offline_sync_notifier.notify_waiters();

        // Now it should complete (might fail on session establishment, but that's ok)
        let result = tokio::time::timeout(std::time::Duration::from_secs(2), ensure_handle).await;

        assert!(
            result.is_ok(),
            "ensure_e2e_sessions should complete after offline sync"
        );

        info!("✅ test_ensure_e2e_sessions_waits_for_offline_sync passed");
    }
3304
3305    /// Integration test: Verify that the immediate session establishment does NOT
3306    /// wait for offline sync. This is critical for PDO to work during offline sync.
3307    ///
3308    /// The flow is:
3309    /// 1. Login -> establish_primary_phone_session_immediate() is called
3310    /// 2. This should NOT wait for offline sync (flag is false at this point)
3311    /// 3. After session is established, offline messages arrive
3312    /// 4. When decryption fails, PDO can immediately send to device 0
3313    #[tokio::test]
3314    async fn test_immediate_session_does_not_wait_for_offline_sync() {
3315        use std::sync::atomic::Ordering;
3316        use wacore_binary::jid::Jid;
3317
3318        let backend = Arc::new(
3319            crate::store::SqliteStore::new("file:memdb_immediate_no_wait?mode=memory&cache=shared")
3320                .await
3321                .expect("Failed to create in-memory backend for test"),
3322        );
3323        let pm = Arc::new(
3324            PersistenceManager::new(backend.clone())
3325                .await
3326                .expect("persistence manager should initialize"),
3327        );
3328
3329        // Set a PN so establish_primary_phone_session_immediate doesn't fail early
3330        pm.modify_device(|device| {
3331            device.pn = Some(Jid::pn("559999999999"));
3332        })
3333        .await;
3334
3335        let (client, _rx) = Client::new(
3336            pm,
3337            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3338            Arc::new(MockHttpClient),
3339            None,
3340        )
3341        .await;
3342
3343        // Flag is false (offline sync not complete - simulating login state)
3344        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
3345
3346        // Call establish_primary_phone_session_immediate
3347        // It should NOT wait for offline sync - it should proceed immediately
3348        let start = std::time::Instant::now();
3349
3350        // Note: This will fail because we can't actually fetch prekeys in tests,
3351        // but the important thing is that it doesn't WAIT for offline sync
3352        let result = tokio::time::timeout(
3353            std::time::Duration::from_millis(500),
3354            client.establish_primary_phone_session_immediate(),
3355        )
3356        .await;
3357
3358        let elapsed = start.elapsed();
3359
3360        // The call should complete (or fail) quickly, NOT wait for 10 second timeout
3361        assert!(
3362            result.is_ok(),
3363            "establish_primary_phone_session_immediate should not wait for offline sync, timed out"
3364        );
3365
3366        // It should complete in < 500ms (not 10 second wait)
3367        assert!(
3368            elapsed.as_millis() < 500,
3369            "establish_primary_phone_session_immediate should not wait, took {:?}",
3370            elapsed
3371        );
3372
3373        // The actual result might be an error (no network), but that's fine
3374        // The important thing is it didn't wait for offline sync
3375        info!(
3376            "establish_primary_phone_session_immediate completed in {:?} (result: {:?})",
3377            elapsed,
3378            result.unwrap().is_ok()
3379        );
3380
3381        info!("✅ test_immediate_session_does_not_wait_for_offline_sync passed");
3382    }
3383
    /// Integration test: Verify that establish_primary_phone_session_immediate
    /// skips establishment when a session already exists.
    ///
    /// This is the CRITICAL fix for MAC verification failures:
    /// - BUG (before fix): Called process_prekey_bundle() unconditionally,
    ///   replacing the existing session with a new one
    /// - RESULT: Remote device still uses old session state, causing MAC failures
    #[tokio::test]
    async fn test_establish_session_skips_when_exists() {
        use wacore::libsignal::protocol::SessionRecord;
        use wacore::libsignal::store::SessionStore;
        use wacore::types::jid::JidExt;
        use wacore_binary::jid::Jid;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_skip_existing?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend.clone())
                .await
                .expect("persistence manager should initialize"),
        );

        // Set a PN so the function doesn't fail early
        let own_pn = Jid::pn("559999999999");
        pm.modify_device(|device| {
            device.pn = Some(own_pn.clone());
        })
        .await;

        // Pre-populate a session for the primary phone JID (device 0)
        let primary_phone_jid = own_pn.with_device(0);
        let signal_addr = primary_phone_jid.to_protocol_address();

        // Create a dummy session record
        let dummy_session = SessionRecord::new_fresh();
        // Scoped block so the device read lock is dropped before Client::new runs.
        {
            let device_arc = pm.get_device_arc().await;
            let device = device_arc.read().await;
            device
                .store_session(&signal_addr, &dummy_session)
                .await
                .expect("Failed to store test session");

            // Verify session exists
            let exists = device
                .contains_session(&signal_addr)
                .await
                .expect("Failed to check session");
            assert!(exists, "Session should exist after store");
        }

        let (client, _rx) = Client::new(
            pm.clone(),
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Call establish_primary_phone_session_immediate
        // It should return Ok(()) immediately without fetching prekeys
        // (no network is available in this test, so success implies it skipped).
        let result = client.establish_primary_phone_session_immediate().await;

        assert!(
            result.is_ok(),
            "establish_primary_phone_session_immediate should succeed when session exists"
        );

        // Verify the session was NOT replaced (still has the same record)
        // This is the critical assertion - if session was replaced, it would cause MAC failures
        // NOTE(review): contains_session only proves a session is still present,
        // not that it is byte-identical to dummy_session — a stronger check would
        // compare the loaded record, if the store exposes one.
        {
            let device_arc = pm.get_device_arc().await;
            let device = device_arc.read().await;
            let exists = device
                .contains_session(&signal_addr)
                .await
                .expect("Failed to check session");
            assert!(exists, "Session should still exist after the call");
        }

        info!("✅ test_establish_session_skips_when_exists passed");
    }
3469
3470    /// Integration test: Verify that the session check prevents MAC failures
3471    /// by documenting the exact control flow that caused the bug.
3472    #[test]
3473    fn test_mac_failure_prevention_flow_documentation() {
3474        // Simulate the decision logic
3475        fn should_establish_session(
3476            check_result: Result<bool, &'static str>,
3477        ) -> Result<bool, String> {
3478            match check_result {
3479                Ok(true) => Ok(false), // Session exists → DON'T establish
3480                Ok(false) => Ok(true), // No session → establish
3481                Err(e) => Err(format!("Cannot verify session: {}", e)), // Fail-safe
3482            }
3483        }
3484
3485        // Test Case 1: Session exists → skip (prevents MAC failure)
3486        let result = should_establish_session(Ok(true));
3487        assert_eq!(result, Ok(false), "Should skip when session exists");
3488
3489        // Test Case 2: No session → establish
3490        let result = should_establish_session(Ok(false));
3491        assert_eq!(result, Ok(true), "Should establish when no session");
3492
3493        // Test Case 3: Check fails → error (fail-safe)
3494        let result = should_establish_session(Err("database error"));
3495        assert!(result.is_err(), "Should fail when check fails");
3496
3497        info!("✅ test_mac_failure_prevention_flow_documentation passed");
3498    }
3499
3500    #[test]
3501    fn test_unified_session_id_calculation() {
3502        // Test the mathematical calculation of the unified session ID.
3503        // Formula: (now_ms + server_offset_ms + 3_days_ms) % 7_days_ms
3504
3505        const DAY_MS: i64 = 24 * 60 * 60 * 1000;
3506        const WEEK_MS: i64 = 7 * DAY_MS;
3507        const OFFSET_MS: i64 = 3 * DAY_MS;
3508
3509        // Helper function matching the implementation
3510        fn calculate_session_id(now_ms: i64, server_offset_ms: i64) -> i64 {
3511            let adjusted_now = now_ms + server_offset_ms;
3512            (adjusted_now + OFFSET_MS) % WEEK_MS
3513        }
3514
3515        // Test 1: Zero offset
3516        let now_ms = 1706000000000_i64; // Some arbitrary timestamp
3517        let id = calculate_session_id(now_ms, 0);
3518        assert!(
3519            (0..WEEK_MS).contains(&id),
3520            "Session ID should be in [0, WEEK_MS)"
3521        );
3522
3523        // Test 2: Positive server offset (server is ahead)
3524        let id_with_positive_offset = calculate_session_id(now_ms, 5000);
3525        assert!(
3526            (0..WEEK_MS).contains(&id_with_positive_offset),
3527            "Session ID should be in [0, WEEK_MS)"
3528        );
3529        // The ID should be different from zero offset (unless wrap-around)
3530        // Not testing exact value as it depends on the offset
3531
3532        // Test 3: Negative server offset (server is behind)
3533        let id_with_negative_offset = calculate_session_id(now_ms, -5000);
3534        assert!(
3535            (0..WEEK_MS).contains(&id_with_negative_offset),
3536            "Session ID should be in [0, WEEK_MS)"
3537        );
3538
3539        // Test 4: Verify modulo wrap-around
3540        // If adjusted_now + OFFSET_MS >= WEEK_MS, it should wrap
3541        let wrap_test_now = WEEK_MS - OFFSET_MS + 1000; // Should produce small result
3542        let wrapped_id = calculate_session_id(wrap_test_now, 0);
3543        assert_eq!(wrapped_id, 1000, "Should wrap around correctly");
3544
3545        // Test 5: Edge case - at exact boundary
3546        let boundary_now = WEEK_MS - OFFSET_MS;
3547        let boundary_id = calculate_session_id(boundary_now, 0);
3548        assert_eq!(boundary_id, 0, "At exact boundary should be 0");
3549    }
3550
    /// Verifies how the client derives its server-time offset from a stanza's
    /// 't' attribute: a valid positive epoch-seconds value updates the offset,
    /// while a missing, non-numeric, or zero 't' leaves it untouched.
    #[tokio::test]
    async fn test_server_time_offset_extraction() {
        use wacore_binary::builder::NodeBuilder;

        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Initially, offset should be 0
        assert_eq!(
            client.unified_session.server_time_offset_ms(),
            0,
            "Initial offset should be 0"
        );

        // Create a node with a 't' attribute
        // 't' is in epoch seconds; the offset is stored in milliseconds.
        let server_time = chrono::Utc::now().timestamp() + 10; // Server is 10 seconds ahead
        let node = NodeBuilder::new("success")
            .attr("t", server_time.to_string())
            .build();

        // Update the offset
        client.update_server_time_offset(&node);

        // The offset should be approximately 10 * 1000 = 10000 ms
        // Allow some tolerance for timing differences during the test
        let offset = client.unified_session.server_time_offset_ms();
        assert!(
            (offset - 10000).abs() < 1000, // Allow 1 second tolerance
            "Offset should be approximately 10000ms, got {}",
            offset
        );

        // Test with no 't' attribute - should not change offset
        let node_no_t = NodeBuilder::new("success").build();
        client.update_server_time_offset(&node_no_t);
        let offset_after = client.unified_session.server_time_offset_ms();
        assert!(
            (offset_after - offset).abs() < 100, // Should be same (or very close)
            "Offset should not change when 't' is missing"
        );

        // Test with invalid 't' attribute - should not change offset
        let node_invalid = NodeBuilder::new("success")
            .attr("t", "not_a_number")
            .build();
        client.update_server_time_offset(&node_invalid);
        let offset_after_invalid = client.unified_session.server_time_offset_ms();
        assert!(
            (offset_after_invalid - offset).abs() < 100,
            "Offset should not change when 't' is invalid"
        );

        // Test with negative/zero 't' - should not change offset
        // (a 't' of 0 would imply an absurd negative offset, so it is rejected)
        let node_zero = NodeBuilder::new("success").attr("t", "0").build();
        client.update_server_time_offset(&node_zero);
        let offset_after_zero = client.unified_session.server_time_offset_ms();
        assert!(
            (offset_after_zero - offset).abs() < 100,
            "Offset should not change when 't' is 0"
        );

        info!("✅ test_server_time_offset_extraction passed");
    }
3625
3626    #[tokio::test]
3627    async fn test_unified_session_manager_integration() {
3628        // Test the unified session manager through the client
3629
3630        let backend = crate::test_utils::create_test_backend().await;
3631        let pm = Arc::new(
3632            PersistenceManager::new(backend)
3633                .await
3634                .expect("persistence manager should initialize"),
3635        );
3636        let (client, _rx) = Client::new(
3637            pm,
3638            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3639            Arc::new(MockHttpClient),
3640            None,
3641        )
3642        .await;
3643
3644        // Initially, sequence should be 0
3645        assert_eq!(
3646            client.unified_session.sequence(),
3647            0,
3648            "Initial sequence should be 0"
3649        );
3650
3651        // Duplicate prevention depends on the session ID staying the same between calls.
3652        // Since the session ID is millisecond-based, use a retry loop to handle
3653        // the rare case where we cross a millisecond boundary between calls.
3654        loop {
3655            client.unified_session.reset().await;
3656
3657            let result = client.unified_session.prepare_send().await;
3658            assert!(result.is_some(), "First send should succeed");
3659            let (node, seq) = result.unwrap();
3660            assert_eq!(node.tag, "ib", "Should be an IB stanza");
3661            assert_eq!(seq, 1, "First sequence should be 1 (pre-increment)");
3662            assert_eq!(client.unified_session.sequence(), 1);
3663
3664            let result2 = client.unified_session.prepare_send().await;
3665            if result2.is_none() {
3666                // Duplicate was prevented within the same millisecond
3667                assert_eq!(client.unified_session.sequence(), 1);
3668                break;
3669            }
3670            // Millisecond boundary crossed, retry
3671            tokio::task::yield_now().await;
3672        }
3673
3674        // Clear last sent and try again - sequence resets on "new" session ID
3675        client.unified_session.clear_last_sent().await;
3676        let result3 = client.unified_session.prepare_send().await;
3677        assert!(result3.is_some(), "Should succeed after clearing");
3678        let (_, seq3) = result3.unwrap();
3679        assert_eq!(seq3, 1, "Sequence resets when session ID changes");
3680        assert_eq!(client.unified_session.sequence(), 1);
3681
3682        info!("✅ test_unified_session_manager_integration passed");
3683    }
3684
3685    #[test]
3686    fn test_unified_session_protocol_node() {
3687        // Test the type-safe protocol node implementation
3688        use wacore::ib::{IbStanza, UnifiedSession};
3689        use wacore::protocol::ProtocolNode;
3690
3691        // Create a unified session
3692        let session = UnifiedSession::new("123456789");
3693        assert_eq!(session.id, "123456789");
3694        assert_eq!(session.tag(), "unified_session");
3695
3696        // Convert to node
3697        let node = session.into_node();
3698        assert_eq!(node.tag, "unified_session");
3699        assert_eq!(
3700            node.attrs.get("id").and_then(|v| v.as_str()),
3701            Some("123456789")
3702        );
3703
3704        // Create an IB stanza
3705        let stanza = IbStanza::unified_session(UnifiedSession::new("987654321"));
3706        assert_eq!(stanza.tag(), "ib");
3707
3708        // Convert to node and verify structure
3709        let ib_node = stanza.into_node();
3710        assert_eq!(ib_node.tag, "ib");
3711        let children = ib_node.children().expect("IB stanza should have children");
3712        assert_eq!(children.len(), 1);
3713        assert_eq!(children[0].tag, "unified_session");
3714        assert_eq!(
3715            children[0].attrs.get("id").and_then(|v| v.as_str()),
3716            Some("987654321")
3717        );
3718
3719        info!("✅ test_unified_session_protocol_node passed");
3720    }
3721
3722    /// Helper to create a test client for offline sync tests
3723    async fn create_offline_sync_test_client() -> Arc<Client> {
3724        let backend = crate::test_utils::create_test_backend().await;
3725        let pm = Arc::new(
3726            PersistenceManager::new(backend)
3727                .await
3728                .expect("persistence manager should initialize"),
3729        );
3730        let (client, _rx) = Client::new(
3731            pm,
3732            Arc::new(crate::transport::mock::MockTransportFactory::new()),
3733            Arc::new(MockHttpClient),
3734            None,
3735        )
3736        .await;
3737        client
3738    }
3739
3740    #[tokio::test]
3741    async fn test_ib_thread_metadata_does_not_end_sync() {
3742        let client = create_offline_sync_test_client().await;
3743        client
3744            .offline_sync_metrics
3745            .active
3746            .store(true, Ordering::Release);
3747
3748        let node = NodeBuilder::new("ib")
3749            .children([NodeBuilder::new("thread_metadata")
3750                .children([NodeBuilder::new("item").build()])
3751                .build()])
3752            .build();
3753
3754        client.process_node(Arc::new(node)).await;
3755        assert!(
3756            client.offline_sync_metrics.active.load(Ordering::Acquire),
3757            "<ib><thread_metadata> should NOT end offline sync"
3758        );
3759    }
3760
3761    #[tokio::test]
3762    async fn test_ib_edge_routing_does_not_end_sync() {
3763        let client = create_offline_sync_test_client().await;
3764        client
3765            .offline_sync_metrics
3766            .active
3767            .store(true, Ordering::Release);
3768
3769        let node = NodeBuilder::new("ib")
3770            .children([NodeBuilder::new("edge_routing")
3771                .children([NodeBuilder::new("routing_info")
3772                    .bytes(vec![1, 2, 3])
3773                    .build()])
3774                .build()])
3775            .build();
3776
3777        client.process_node(Arc::new(node)).await;
3778        assert!(
3779            client.offline_sync_metrics.active.load(Ordering::Acquire),
3780            "<ib><edge_routing> should NOT end offline sync"
3781        );
3782    }
3783
3784    #[tokio::test]
3785    async fn test_ib_dirty_does_not_end_sync() {
3786        let client = create_offline_sync_test_client().await;
3787        client
3788            .offline_sync_metrics
3789            .active
3790            .store(true, Ordering::Release);
3791
3792        let node = NodeBuilder::new("ib")
3793            .children([NodeBuilder::new("dirty")
3794                .attr("type", "groups")
3795                .attr("timestamp", "1234")
3796                .build()])
3797            .build();
3798
3799        client.process_node(Arc::new(node)).await;
3800        assert!(
3801            client.offline_sync_metrics.active.load(Ordering::Acquire),
3802            "<ib><dirty> should NOT end offline sync"
3803        );
3804    }
3805
3806    #[tokio::test]
3807    async fn test_ib_offline_child_ends_sync() {
3808        let client = create_offline_sync_test_client().await;
3809        client
3810            .offline_sync_metrics
3811            .active
3812            .store(true, Ordering::Release);
3813        client
3814            .offline_sync_metrics
3815            .total_messages
3816            .store(301, Ordering::Release);
3817
3818        let node = NodeBuilder::new("ib")
3819            .children([NodeBuilder::new("offline").attr("count", "301").build()])
3820            .build();
3821
3822        client.process_node(Arc::new(node)).await;
3823        assert!(
3824            !client.offline_sync_metrics.active.load(Ordering::Acquire),
3825            "<ib><offline count='301'/> should end offline sync"
3826        );
3827    }
3828
3829    #[tokio::test]
3830    async fn test_ib_offline_preview_starts_sync() {
3831        let client = create_offline_sync_test_client().await;
3832
3833        let node = NodeBuilder::new("ib")
3834            .children([NodeBuilder::new("offline_preview")
3835                .attr("count", "301")
3836                .attr("message", "168")
3837                .attr("notification", "62")
3838                .attr("receipt", "68")
3839                .attr("appdata", "0")
3840                .build()])
3841            .build();
3842
3843        client.process_node(Arc::new(node)).await;
3844        assert!(
3845            client.offline_sync_metrics.active.load(Ordering::Acquire),
3846            "offline_preview with count>0 should activate sync"
3847        );
3848        assert_eq!(
3849            client
3850                .offline_sync_metrics
3851                .total_messages
3852                .load(Ordering::Acquire),
3853            301
3854        );
3855    }
3856
3857    #[tokio::test]
3858    async fn test_offline_message_increments_processed() {
3859        let client = create_offline_sync_test_client().await;
3860        client
3861            .offline_sync_metrics
3862            .active
3863            .store(true, Ordering::Release);
3864        client
3865            .offline_sync_metrics
3866            .total_messages
3867            .store(100, Ordering::Release);
3868
3869        let node = NodeBuilder::new("message")
3870            .attr("offline", "1")
3871            .attr("from", "5551234567@s.whatsapp.net")
3872            .attr("id", "TEST123")
3873            .attr("t", "1772884671")
3874            .attr("type", "text")
3875            .build();
3876
3877        client.process_node(Arc::new(node)).await;
3878        assert_eq!(
3879            client
3880                .offline_sync_metrics
3881                .processed_messages
3882                .load(Ordering::Acquire),
3883            1,
3884            "offline message should increment processed count"
3885        );
3886    }
3887}