whatsapp_rust/
client.rs

1mod context_impl;
2mod device_registry;
3mod lid_pn;
4mod sender_keys;
5mod sessions;
6
7use crate::handshake;
8use crate::lid_pn_cache::LidPnCache;
9use crate::pair;
10use anyhow::{Result, anyhow};
11use dashmap::DashMap;
12use indexmap::IndexMap;
13use moka::future::Cache;
14use tokio::sync::watch;
15use wacore::xml::DisplayableNode;
16use wacore_binary::builder::NodeBuilder;
17use wacore_binary::jid::JidExt;
18use wacore_binary::node::Node;
19
20use crate::appstate_sync::AppStateProcessor;
21use crate::jid_utils::server_jid;
22use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
23use crate::types::enc_handler::EncHandler;
24use crate::types::events::{ConnectFailureReason, Event};
25
26use log::{debug, error, info, warn};
27
28use rand::RngCore;
29use scopeguard;
30use std::collections::{HashMap, HashSet};
31use wacore_binary::jid::Jid;
32
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
35
36use thiserror::Error;
37use tokio::sync::{Mutex, Notify, OnceCell, RwLock, mpsc};
38use tokio::time::{Duration, sleep};
39use wacore::appstate::patch_decode::WAPatchName;
40use wacore::client::context::GroupInfo;
41use waproto::whatsapp as wa;
42
43use crate::socket::{NoiseSocket, SocketError, error::EncryptSendError};
44use crate::sync_task::MajorSyncTask;
45
46const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;
47
48const MAX_POOLED_BUFFER_CAP: usize = 512 * 1024;
49
50#[derive(Debug, Error)]
51pub enum ClientError {
52    #[error("client is not connected")]
53    NotConnected,
54    #[error("socket error: {0}")]
55    Socket(#[from] SocketError),
56    #[error("encrypt/send error: {0}")]
57    EncryptSend(#[from] EncryptSendError),
58    #[error("client is already connected")]
59    AlreadyConnected,
60    #[error("client is not logged in")]
61    NotLoggedIn,
62}
63
64/// Key for looking up recent messages for retry functionality.
65#[derive(Debug, Clone, PartialEq, Eq, Hash)]
66pub struct RecentMessageKey {
67    pub to: Jid,
68    pub id: String,
69}
70
71pub struct Client {
72    pub(crate) core: wacore::client::CoreClient,
73
74    pub(crate) persistence_manager: Arc<PersistenceManager>,
75    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,
76
77    pub(crate) is_logged_in: Arc<AtomicBool>,
78    pub(crate) is_connecting: Arc<AtomicBool>,
79    pub(crate) is_running: Arc<AtomicBool>,
80    pub(crate) shutdown_notifier: Arc<Notify>,
81
82    pub(crate) transport: Arc<Mutex<Option<Arc<dyn crate::transport::Transport>>>>,
83    pub(crate) transport_events:
84        Arc<Mutex<Option<async_channel::Receiver<crate::transport::TransportEvent>>>>,
85    pub(crate) transport_factory: Arc<dyn crate::transport::TransportFactory>,
86    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,
87
88    pub(crate) response_waiters:
89        Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<wacore_binary::Node>>>>,
90    pub(crate) unique_id: String,
91    pub(crate) id_counter: Arc<AtomicU64>,
92
93    /// Per-device session locks for Signal protocol operations.
94    /// Prevents race conditions when multiple messages from the same sender
95    /// are processed concurrently across different chats.
96    /// Keys are Signal protocol address strings (e.g., "user@s.whatsapp.net:0")
97    /// to match the SignalProtocolStoreAdapter's internal locking.
98    pub(crate) session_locks: Cache<String, Arc<tokio::sync::Mutex<()>>>,
99
100    /// Per-chat message queues for sequential message processing.
101    /// Prevents race conditions where a later message is processed before
102    /// the PreKey message that establishes the Signal session.
103    pub(crate) message_queues: Cache<String, mpsc::Sender<Arc<Node>>>,
104
105    /// Cache for LID to Phone Number mappings (bidirectional).
106    /// When we receive a message with sender_lid/sender_pn attributes, we store the mapping here.
107    /// This allows us to reuse existing LID-based sessions when sending replies.
108    /// The cache is backed by persistent storage and warmed up on client initialization.
109    pub(crate) lid_pn_cache: Arc<LidPnCache>,
110
111    /// Per-chat mutex for serializing message enqueue operations.
112    /// This ensures messages are enqueued in the order they arrive,
113    /// preventing race conditions during queue initialization.
114    pub(crate) message_enqueue_locks: Cache<String, Arc<tokio::sync::Mutex<()>>>,
115
116    pub group_cache: OnceCell<Cache<Jid, GroupInfo>>,
117    pub device_cache: OnceCell<Cache<Jid, Vec<Jid>>>,
118
119    pub(crate) retried_group_messages: Cache<String, ()>,
120    pub(crate) expected_disconnect: Arc<AtomicBool>,
121
122    /// Connection generation counter - incremented on each new connection.
123    /// Used to detect stale post-login tasks from previous connections.
124    pub(crate) connection_generation: Arc<AtomicU64>,
125
126    /// Cache for recent messages (serialized bytes) for retry functionality.
127    /// Uses moka cache with TTL and max capacity for automatic eviction.
128    pub(crate) recent_messages: Cache<RecentMessageKey, Vec<u8>>,
129
130    pub(crate) pending_retries: Arc<Mutex<HashSet<String>>>,
131
132    /// Track retry attempts per message to prevent infinite retry loops.
133    /// Key: "{chat}:{msg_id}:{sender}", Value: retry count
134    /// Matches WhatsApp Web's MAX_RETRY = 5 behavior.
135    pub(crate) message_retry_counts: Cache<String, u8>,
136
137    pub enable_auto_reconnect: Arc<AtomicBool>,
138    pub auto_reconnect_errors: Arc<AtomicU32>,
139    pub last_successful_connect: Arc<Mutex<Option<chrono::DateTime<chrono::Utc>>>>,
140
141    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,
142
143    pub(crate) app_state_processor: OnceCell<AppStateProcessor>,
144    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, std::time::Instant>>>,
145    pub(crate) initial_keys_synced_notifier: Arc<Notify>,
146    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,
147
148    /// Notifier for when offline sync (ib offline stanza) is received.
149    /// WhatsApp Web waits for this before sending passive tasks (prekey upload, active IQ, presence).
150    pub(crate) offline_sync_notifier: Arc<Notify>,
151    /// Flag indicating offline sync has completed (received ib offline stanza).
152    pub(crate) offline_sync_completed: Arc<AtomicBool>,
153    /// Notifier for when the noise socket is established (before login).
154    /// Use this to wait for the socket to be ready for sending messages.
155    pub(crate) socket_ready_notifier: Arc<Notify>,
156    /// Notifier for when the client is fully connected and logged in.
157    /// Triggered after Event::Connected is dispatched.
158    pub(crate) connected_notifier: Arc<Notify>,
159    pub(crate) major_sync_task_sender: mpsc::Sender<MajorSyncTask>,
160    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<watch::Sender<()>>>>,
161
162    /// State machine for pair code authentication flow.
163    /// Tracks the pending pair code request and ephemeral keys.
164    pub(crate) pair_code_state: Arc<Mutex<wacore::pair_code::PairCodeState>>,
165
166    pub(crate) send_buffer_pool: Arc<Mutex<Vec<Vec<u8>>>>,
167
168    /// Custom handlers for encrypted message types
169    pub custom_enc_handlers: Arc<DashMap<String, Arc<dyn EncHandler>>>,
170
171    /// Cache for pending PDO (Peer Data Operation) requests.
172    /// Maps message cache keys (chat:id) to pending request info.
173    pub(crate) pdo_pending_requests: Cache<String, crate::pdo::PendingPdoRequest>,
174
175    /// LRU cache for device registry (matches WhatsApp Web's 5000 entry limit).
176    /// Maps user ID to DeviceListRecord for fast device existence checks.
177    /// Backed by persistent storage.
178    pub(crate) device_registry_cache: Cache<String, wacore::store::traits::DeviceListRecord>,
179
180    /// Router for dispatching stanzas to their appropriate handlers
181    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,
182
183    /// Whether to send ACKs synchronously or in a background task
184    pub(crate) synchronous_ack: bool,
185
186    /// HTTP client for making HTTP requests (media upload/download, version fetching)
187    pub http_client: Arc<dyn crate::http::HttpClient>,
188
189    /// Version override for testing or manual specification
190    pub(crate) override_version: Option<(u32, u32, u32)>,
191}
192
193impl Client {
194    pub async fn new(
195        persistence_manager: Arc<PersistenceManager>,
196        transport_factory: Arc<dyn crate::transport::TransportFactory>,
197        http_client: Arc<dyn crate::http::HttpClient>,
198        override_version: Option<(u32, u32, u32)>,
199    ) -> (Arc<Self>, mpsc::Receiver<MajorSyncTask>) {
200        let mut unique_id_bytes = [0u8; 2];
201        rand::rng().fill_bytes(&mut unique_id_bytes);
202
203        let device_snapshot = persistence_manager.get_device_snapshot().await;
204        let core = wacore::client::CoreClient::new(device_snapshot.core.clone());
205
206        let (tx, rx) = mpsc::channel(32);
207
208        let this = Self {
209            core,
210            persistence_manager: persistence_manager.clone(),
211            media_conn: Arc::new(RwLock::new(None)),
212            is_logged_in: Arc::new(AtomicBool::new(false)),
213            is_connecting: Arc::new(AtomicBool::new(false)),
214            is_running: Arc::new(AtomicBool::new(false)),
215            shutdown_notifier: Arc::new(Notify::new()),
216
217            transport: Arc::new(Mutex::new(None)),
218            transport_events: Arc::new(Mutex::new(None)),
219            transport_factory,
220            noise_socket: Arc::new(Mutex::new(None)),
221
222            response_waiters: Arc::new(Mutex::new(HashMap::new())),
223            unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
224            id_counter: Arc::new(AtomicU64::new(0)),
225
226            session_locks: Cache::builder()
227                .time_to_live(Duration::from_secs(300)) // 5 minute TTL
228                .max_capacity(10_000) // Limit to 10k concurrent sessions
229                .build(),
230            message_queues: Cache::builder()
231                .time_to_live(Duration::from_secs(300)) // Idle queues expire after 5 mins
232                .max_capacity(10_000) // Limit to 10k concurrent chats
233                .build(),
234            lid_pn_cache: Arc::new(LidPnCache::new()),
235            message_enqueue_locks: Cache::builder()
236                .time_to_live(Duration::from_secs(300))
237                .max_capacity(10_000)
238                .build(),
239            group_cache: OnceCell::new(),
240            device_cache: OnceCell::new(),
241            retried_group_messages: Cache::builder()
242                .time_to_live(Duration::from_secs(300))
243                .max_capacity(2_000)
244                .build(),
245
246            expected_disconnect: Arc::new(AtomicBool::new(false)),
247            connection_generation: Arc::new(AtomicU64::new(0)),
248
249            // Recent messages cache for retry functionality
250            // TTL of 5 minutes (retries don't happen after that)
251            // Max 1000 messages to bound memory usage
252            recent_messages: Cache::builder()
253                .time_to_live(Duration::from_secs(300))
254                .max_capacity(1_000)
255                .build(),
256
257            pending_retries: Arc::new(Mutex::new(HashSet::new())),
258
259            // Retry count tracking cache for preventing infinite retry loops.
260            // TTL of 5 minutes to match retry functionality, max 5000 entries.
261            message_retry_counts: Cache::builder()
262                .time_to_live(Duration::from_secs(300))
263                .max_capacity(5_000)
264                .build(),
265
266            enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
267            auto_reconnect_errors: Arc::new(AtomicU32::new(0)),
268            last_successful_connect: Arc::new(Mutex::new(None)),
269
270            needs_initial_full_sync: Arc::new(AtomicBool::new(false)),
271
272            app_state_processor: OnceCell::new(),
273            app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
274            initial_keys_synced_notifier: Arc::new(Notify::new()),
275            initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
276            offline_sync_notifier: Arc::new(Notify::new()),
277            offline_sync_completed: Arc::new(AtomicBool::new(false)),
278            socket_ready_notifier: Arc::new(Notify::new()),
279            connected_notifier: Arc::new(Notify::new()),
280            major_sync_task_sender: tx,
281            pairing_cancellation_tx: Arc::new(Mutex::new(None)),
282            pair_code_state: Arc::new(Mutex::new(wacore::pair_code::PairCodeState::default())),
283            send_buffer_pool: Arc::new(Mutex::new(Vec::with_capacity(4))),
284            custom_enc_handlers: Arc::new(DashMap::new()),
285            pdo_pending_requests: crate::pdo::new_pdo_cache(),
286            device_registry_cache: Cache::builder()
287                .max_capacity(5_000) // Match WhatsApp Web's 5000 entry limit
288                .time_to_live(Duration::from_secs(3600)) // 1 hour TTL
289                .build(),
290            stanza_router: Self::create_stanza_router(),
291            synchronous_ack: false,
292            http_client,
293            override_version,
294        };
295
296        let arc = Arc::new(this);
297
298        // Warm up the LID-PN cache from persistent storage
299        let warm_up_arc = arc.clone();
300        tokio::spawn(async move {
301            if let Err(e) = warm_up_arc.warm_up_lid_pn_cache().await {
302                warn!("Failed to warm up LID-PN cache: {e}");
303            }
304        });
305
306        // Start background task to clean up stale device registry entries
307        let cleanup_arc = arc.clone();
308        tokio::spawn(async move {
309            cleanup_arc.device_registry_cleanup_loop().await;
310        });
311
312        (arc, rx)
313    }
314
315    pub(crate) async fn get_group_cache(&self) -> &Cache<Jid, GroupInfo> {
316        self.group_cache
317            .get_or_init(|| async {
318                info!("Initializing Group Cache for the first time.");
319                Cache::builder()
320                    .time_to_live(Duration::from_secs(3600))
321                    .max_capacity(1_000)
322                    .build()
323            })
324            .await
325    }
326
327    pub(crate) async fn get_device_cache(&self) -> &Cache<Jid, Vec<Jid>> {
328        self.device_cache
329            .get_or_init(|| async {
330                info!("Initializing Device Cache for the first time.");
331                Cache::builder()
332                    .time_to_live(Duration::from_secs(3600))
333                    .max_capacity(5_000)
334                    .build()
335            })
336            .await
337    }
338
339    pub(crate) async fn get_app_state_processor(&self) -> &AppStateProcessor {
340        self.app_state_processor
341            .get_or_init(|| async {
342                info!("Initializing AppStateProcessor for the first time.");
343                AppStateProcessor::new(self.persistence_manager.backend())
344            })
345            .await
346    }
347
348    /// Create and configure the stanza router with all the handlers.
349    fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
350        use crate::handlers::{
351            basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
352            ib::IbHandler,
353            iq::IqHandler,
354            message::MessageHandler,
355            notification::NotificationHandler,
356            receipt::ReceiptHandler,
357            router::StanzaRouter,
358            unimplemented::UnimplementedHandler,
359        };
360
361        let mut router = StanzaRouter::new();
362
363        // Register all handlers
364        router.register(Arc::new(MessageHandler));
365        router.register(Arc::new(ReceiptHandler));
366        router.register(Arc::new(IqHandler));
367        router.register(Arc::new(SuccessHandler));
368        router.register(Arc::new(FailureHandler));
369        router.register(Arc::new(StreamErrorHandler));
370        router.register(Arc::new(IbHandler));
371        router.register(Arc::new(NotificationHandler));
372        router.register(Arc::new(AckHandler));
373
374        // Register unimplemented handlers
375        router.register(Arc::new(UnimplementedHandler::for_call()));
376        router.register(Arc::new(UnimplementedHandler::for_presence()));
377        router.register(Arc::new(UnimplementedHandler::for_chatstate()));
378
379        router
380    }
381
382    pub async fn run(self: &Arc<Self>) {
383        if self.is_running.swap(true, Ordering::SeqCst) {
384            warn!("Client `run` method called while already running.");
385            return;
386        }
387        while self.is_running.load(Ordering::Relaxed) {
388            self.expected_disconnect.store(false, Ordering::Relaxed);
389
390            if self.connect().await.is_err() {
391                error!("Failed to connect, will retry...");
392            } else {
393                if self.read_messages_loop().await.is_err() {
394                    warn!(
395                        "Message loop exited with an error. Will attempt to reconnect if enabled."
396                    );
397                } else if self.expected_disconnect.load(Ordering::Relaxed) {
398                    debug!("Message loop exited gracefully (expected disconnect).");
399                } else {
400                    info!("Message loop exited gracefully.");
401                }
402
403                self.cleanup_connection_state().await;
404            }
405
406            if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
407                info!("Auto-reconnect disabled, shutting down.");
408                self.is_running.store(false, Ordering::Relaxed);
409                break;
410            }
411
412            // If this was an expected disconnect (e.g., 515 after pairing), reconnect immediately
413            if self.expected_disconnect.load(Ordering::Relaxed) {
414                self.auto_reconnect_errors.store(0, Ordering::Relaxed);
415                info!("Expected disconnect (e.g., 515), reconnecting immediately...");
416                continue;
417            }
418
419            let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
420            let delay_secs = u64::from(error_count * 2).min(30);
421            let delay = Duration::from_secs(delay_secs);
422            info!(
423                "Will attempt to reconnect in {:?} (attempt {})",
424                delay,
425                error_count + 1
426            );
427            sleep(delay).await;
428        }
429        info!("Client run loop has shut down.");
430    }
431
432    pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
433        if self.is_connecting.swap(true, Ordering::SeqCst) {
434            return Err(ClientError::AlreadyConnected.into());
435        }
436
437        let _guard = scopeguard::guard((), |_| {
438            self.is_connecting.store(false, Ordering::Relaxed);
439        });
440
441        if self.is_connected() {
442            return Err(ClientError::AlreadyConnected.into());
443        }
444
445        // Reset login state for new connection attempt. This ensures that
446        // handle_success will properly process the <success> stanza even if
447        // a previous connection's post-login task bailed out early.
448        self.is_logged_in.store(false, Ordering::Relaxed);
449        self.offline_sync_completed.store(false, Ordering::Relaxed);
450
451        let version_future = crate::version::resolve_and_update_version(
452            &self.persistence_manager,
453            &self.http_client,
454            self.override_version,
455        );
456
457        let transport_future = self.transport_factory.create_transport();
458
459        info!("Connecting WebSocket and fetching latest client version in parallel...");
460        let (version_result, transport_result) = tokio::join!(version_future, transport_future);
461
462        version_result.map_err(|e| anyhow!("Failed to resolve app version: {}", e))?;
463        let (transport, mut transport_events) = transport_result?;
464        info!("Version fetch and transport connection established.");
465
466        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
467
468        let noise_socket =
469            handshake::do_handshake(&device_snapshot, transport.clone(), &mut transport_events)
470                .await?;
471
472        *self.transport.lock().await = Some(transport);
473        *self.transport_events.lock().await = Some(transport_events);
474        *self.noise_socket.lock().await = Some(noise_socket);
475
476        // Notify waiters that socket is ready (before login)
477        self.socket_ready_notifier.notify_waiters();
478
479        let client_clone = self.clone();
480        tokio::spawn(async move { client_clone.keepalive_loop().await });
481
482        Ok(())
483    }
484
485    pub async fn disconnect(&self) {
486        info!("Disconnecting client intentionally.");
487        self.expected_disconnect.store(true, Ordering::Relaxed);
488        self.is_running.store(false, Ordering::Relaxed);
489        self.shutdown_notifier.notify_waiters();
490
491        if let Some(transport) = self.transport.lock().await.as_ref() {
492            transport.disconnect().await;
493        }
494        self.cleanup_connection_state().await;
495    }
496
497    async fn cleanup_connection_state(&self) {
498        self.is_logged_in.store(false, Ordering::Relaxed);
499        *self.transport.lock().await = None;
500        *self.transport_events.lock().await = None;
501        *self.noise_socket.lock().await = None;
502        self.retried_group_messages.invalidate_all();
503        // Reset offline sync state for next connection
504        self.offline_sync_completed.store(false, Ordering::Relaxed);
505    }
506
507    async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
508        info!(target: "Client", "Starting message processing loop...");
509
510        let mut rx_guard = self.transport_events.lock().await;
511        let transport_events = rx_guard
512            .take()
513            .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
514        drop(rx_guard);
515
516        // Frame decoder to parse incoming data
517        let mut frame_decoder = wacore::framing::FrameDecoder::new();
518
519        loop {
520            tokio::select! {
521                    biased;
522                    _ = self.shutdown_notifier.notified() => {
523                        info!(target: "Client", "Shutdown signaled in message loop. Exiting message loop.");
524                        return Ok(());
525                    },
526                    event_result = transport_events.recv() => {
527                        match event_result {
528                            Ok(crate::transport::TransportEvent::DataReceived(data)) => {
529                                // Feed data into the frame decoder
530                                frame_decoder.feed(&data);
531
532                                // Process all complete frames
533                                // Note: Frame decryption must be sequential (noise protocol counter),
534                                // but we spawn node processing concurrently after decryption
535                                while let Some(encrypted_frame) = frame_decoder.decode_frame() {
536                                    // Decrypt the frame synchronously (required for noise counter ordering)
537                                    if let Some(node) = self.decrypt_frame(&encrypted_frame).await {
538                                        // Handle critical nodes synchronously to avoid race conditions.
539                                        // <success> must be processed inline to ensure is_logged_in state
540                                        // is set before checking expected_disconnect or spawning other tasks.
541                                        let is_critical = matches!(node.tag.as_str(), "success" | "failure" | "stream:error");
542
543                                        if is_critical {
544                                            // Process critical nodes inline
545                                            self.process_decrypted_node(node).await;
546                                        } else {
547                                            // Spawn non-critical node processing as a separate task
548                                            // to allow concurrent handling (Signal protocol work, etc.)
549                                            let client = self.clone();
550                                            tokio::spawn(async move {
551                                                client.process_decrypted_node(node).await;
552                                            });
553                                        }
554                                    }
555
556                                    // Check if we should exit after processing (e.g., after 515 stream error)
557                                    if self.expected_disconnect.load(Ordering::Relaxed) {
558                                        info!(target: "Client", "Expected disconnect signaled during frame processing. Exiting message loop.");
559                                        return Ok(());
560                                    }
561                                }
562                            },
563                            Ok(crate::transport::TransportEvent::Disconnected) | Err(_) => {
564                                self.cleanup_connection_state().await;
565                                 if !self.expected_disconnect.load(Ordering::Relaxed) {
566                                    self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
567                                    info!("Transport disconnected unexpectedly.");
568                                    return Err(anyhow::anyhow!("Transport disconnected unexpectedly"));
569                                } else {
570                                    info!("Transport disconnected as expected.");
571                                    return Ok(());
572                                }
573                            }
574                            Ok(crate::transport::TransportEvent::Connected) => {
575                                // Already handled during handshake, but could be useful for logging
576                                debug!("Transport connected event received");
577                            }
578                    }
579                }
580            }
581        }
582    }
583
584    /// Decrypt a frame and return the parsed node.
585    /// This must be called sequentially due to noise protocol counter requirements.
586    pub(crate) async fn decrypt_frame(
587        self: &Arc<Self>,
588        encrypted_frame: &bytes::Bytes,
589    ) -> Option<wacore_binary::node::Node> {
590        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
591        let noise_socket = match noise_socket_arc {
592            Some(s) => s,
593            None => {
594                log::error!("Cannot process frame: not connected (no noise socket)");
595                return None;
596            }
597        };
598
599        let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
600            Ok(p) => p,
601            Err(e) => {
602                log::error!(target: "Client", "Failed to decrypt frame: {e}");
603                return None;
604            }
605        };
606
607        let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
608            Ok(data) => data,
609            Err(e) => {
610                log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
611                return None;
612            }
613        };
614
615        match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
616            Ok(node_ref) => Some(node_ref.to_owned()),
617            Err(e) => {
618                log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}");
619                None
620            }
621        }
622    }
623
624    /// Process an already-decrypted node.
625    /// This can be spawned concurrently since it doesn't depend on noise protocol state.
626    /// The node is wrapped in Arc to avoid cloning when passing through handlers.
627    pub(crate) async fn process_decrypted_node(self: &Arc<Self>, node: wacore_binary::node::Node) {
628        // Wrap in Arc once - all handlers will share this same allocation
629        let node_arc = Arc::new(node);
630        self.process_node(node_arc).await;
631    }
632
633    /// Process a node wrapped in Arc. Handlers receive the Arc and can share/store it cheaply.
634    pub(crate) async fn process_node(self: &Arc<Self>, node: Arc<Node>) {
635        use wacore::xml::DisplayableNode;
636
637        if node.tag.as_str() == "iq"
638            && let Some(sync_node) = node.get_optional_child("sync")
639            && let Some(collection_node) = sync_node.get_optional_child("collection")
640        {
641            let name = collection_node.attrs().string("name");
642            info!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
643        } else {
644            info!(target: "Client/Recv","{}", DisplayableNode(&node));
645        }
646
647        // Prepare deferred ACK cancellation flag (sent after dispatch unless cancelled)
648        let mut cancelled = false;
649
650        if node.tag.as_str() == "xmlstreamend" {
651            if self.expected_disconnect.load(Ordering::Relaxed) {
652                debug!(target: "Client", "Received <xmlstreamend/>, expected disconnect.");
653            } else {
654                warn!(target: "Client", "Received <xmlstreamend/>, treating as disconnect.");
655            }
656            self.shutdown_notifier.notify_waiters();
657            return;
658        }
659
660        if node.tag.as_str() == "iq" {
661            let id_opt = node.attrs.get("id");
662            if let Some(id) = id_opt {
663                let has_waiter = self.response_waiters.lock().await.contains_key(id.as_str());
664                if has_waiter && self.handle_iq_response(Arc::clone(&node)).await {
665                    return;
666                }
667            }
668        }
669
670        // Dispatch to appropriate handler using the router
671        // Clone Arc (cheap - just reference count) not the Node itself
672        if !self
673            .stanza_router
674            .dispatch(self.clone(), Arc::clone(&node), &mut cancelled)
675            .await
676        {
677            warn!(target: "Client", "Received unknown top-level node: {}", DisplayableNode(&node));
678        }
679
680        // Send the deferred ACK if applicable and not cancelled by handler
681        if self.should_ack(&node) && !cancelled {
682            self.maybe_deferred_ack(node).await;
683        }
684    }
685
686    /// Determine if a Node should be acknowledged with <ack/>.
687    fn should_ack(&self, node: &Node) -> bool {
688        matches!(
689            node.tag.as_str(),
690            "message" | "receipt" | "notification" | "call"
691        ) && node.attrs.contains_key("id")
692            && node.attrs.contains_key("from")
693    }
694
695    /// Possibly send a deferred ack: either immediately or via spawned task.
696    /// Handlers can cancel by setting `cancelled` to true.
697    /// Uses Arc<Node> to avoid cloning when spawning the async task.
698    async fn maybe_deferred_ack(self: &Arc<Self>, node: Arc<Node>) {
699        if self.synchronous_ack {
700            if let Err(e) = self.send_ack_for(&node).await {
701                warn!(target: "Client", "Failed to send ack: {e:?}");
702            }
703        } else {
704            let this = self.clone();
705            // Node is already in Arc - just clone the Arc (cheap), not the Node
706            tokio::spawn(async move {
707                if let Err(e) = this.send_ack_for(&node).await {
708                    warn!(target: "Client", "Failed to send ack: {e:?}");
709                }
710            });
711        }
712    }
713
714    /// Build and send an <ack/> node corresponding to the given stanza.
715    async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
716        let id = match node.attrs.get("id") {
717            Some(v) => v.clone(),
718            None => return Ok(()),
719        };
720        let from = match node.attrs.get("from") {
721            Some(v) => v.clone(),
722            None => return Ok(()),
723        };
724        let participant = node.attrs.get("participant").cloned();
725        let typ = if node.tag != "message" {
726            node.attrs.get("type").cloned()
727        } else {
728            None
729        };
730        let mut attrs = IndexMap::new();
731        attrs.insert("class".to_string(), node.tag.clone());
732        attrs.insert("id".to_string(), id);
733        attrs.insert("to".to_string(), from);
734        if let Some(p) = participant {
735            attrs.insert("participant".to_string(), p);
736        }
737        if let Some(t) = typ {
738            attrs.insert("type".to_string(), t);
739        }
740        let ack = Node {
741            tag: "ack".to_string(),
742            attrs,
743            content: None,
744        };
745        self.send_node(ack).await
746    }
747
748    pub(crate) async fn handle_unimplemented(&self, tag: &str) {
749        warn!(target: "Client", "TODO: Implement handler for <{tag}>");
750    }
751
752    pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
753        use crate::request::InfoQuery;
754
755        let tag = if passive { "passive" } else { "active" };
756
757        let query = InfoQuery::set(
758            "passive",
759            server_jid(),
760            Some(wacore_binary::node::NodeContent::Nodes(vec![
761                NodeBuilder::new(tag).build(),
762            ])),
763        );
764
765        self.send_iq(query).await.map(|_| ())
766    }
767
768    pub async fn clean_dirty_bits(
769        &self,
770        type_: &str,
771        timestamp: Option<&str>,
772    ) -> Result<(), ClientError> {
773        let id = self.generate_request_id();
774        let mut clean_builder = NodeBuilder::new("clean").attr("type", type_);
775        if let Some(ts) = timestamp {
776            clean_builder = clean_builder.attr("timestamp", ts);
777        }
778
779        let node = NodeBuilder::new("iq")
780            .attr("to", server_jid().to_string())
781            .attr("type", "set")
782            .attr("xmlns", "urn:xmpp:whatsapp:dirty")
783            .attr("id", id)
784            .children([clean_builder.build()])
785            .build();
786
787        self.send_node(node).await
788    }
789
790    pub async fn fetch_props(&self) -> Result<(), crate::request::IqError> {
791        use crate::request::InfoQuery;
792
793        debug!(target: "Client", "Fetching properties (props)...");
794
795        let props_node = NodeBuilder::new("props")
796            .attr("protocol", "2")
797            .attr("hash", "") // TODO: load hash from persistence
798            .build();
799
800        let iq = InfoQuery::get(
801            "w",
802            server_jid(),
803            Some(wacore_binary::node::NodeContent::Nodes(vec![props_node])),
804        );
805
806        self.send_iq(iq).await.map(|_| ())
807    }
808
809    pub async fn fetch_privacy_settings(&self) -> Result<(), crate::request::IqError> {
810        use crate::request::InfoQuery;
811
812        debug!(target: "Client", "Fetching privacy settings...");
813
814        let iq = InfoQuery::get(
815            "privacy",
816            server_jid(),
817            Some(wacore_binary::node::NodeContent::Nodes(vec![
818                NodeBuilder::new("privacy").build(),
819            ])),
820        );
821
822        self.send_iq(iq).await.map(|_| ())
823    }
824
825    pub async fn send_digest_key_bundle(&self) -> Result<(), crate::request::IqError> {
826        use crate::request::InfoQuery;
827
828        debug!(target: "Client", "Sending digest key bundle...");
829
830        let digest_node = NodeBuilder::new("digest").build();
831        let iq = InfoQuery::get(
832            "encrypt",
833            server_jid(),
834            Some(wacore_binary::node::NodeContent::Nodes(vec![digest_node])),
835        );
836
837        self.send_iq(iq).await.map(|_| ())
838    }
839
840    pub(crate) async fn handle_success(self: &Arc<Self>, node: &wacore_binary::node::Node) {
841        // Skip processing if an expected disconnect is pending (e.g., 515 received).
842        // This prevents race conditions where a spawned success handler runs after
843        // cleanup_connection_state has already reset is_logged_in.
844        if self.expected_disconnect.load(Ordering::Relaxed) {
845            debug!(target: "Client", "Ignoring <success> stanza: expected disconnect pending");
846            return;
847        }
848
849        // Guard against multiple <success> stanzas (WhatsApp may send more than one during
850        // routing/reconnection). Only process the first one per connection.
851        if self.is_logged_in.swap(true, Ordering::SeqCst) {
852            debug!(target: "Client", "Ignoring duplicate <success> stanza (already logged in)");
853            return;
854        }
855
856        // Increment connection generation to invalidate any stale post-login tasks
857        // from previous connections (e.g., during 515 reconnect cycles).
858        let current_generation = self.connection_generation.fetch_add(1, Ordering::SeqCst) + 1;
859
860        info!(
861            "Successfully authenticated with WhatsApp servers! (gen={})",
862            current_generation
863        );
864        *self.last_successful_connect.lock().await = Some(chrono::Utc::now());
865        self.auto_reconnect_errors.store(0, Ordering::Relaxed);
866
867        if let Some(lid_str) = node.attrs.get("lid") {
868            if let Ok(lid) = lid_str.parse::<Jid>() {
869                let device_snapshot = self.persistence_manager.get_device_snapshot().await;
870                if device_snapshot.lid.as_ref() != Some(&lid) {
871                    info!(target: "Client", "Updating LID from server to '{lid}'");
872                    self.persistence_manager
873                        .process_command(DeviceCommand::SetLid(Some(lid)))
874                        .await;
875                }
876            } else {
877                warn!(target: "Client", "Failed to parse LID from success stanza: {lid_str}");
878            }
879        } else {
880            warn!(target: "Client", "LID not found in <success> stanza. Group messaging may fail.");
881        }
882
883        let client_clone = self.clone();
884        let task_generation = current_generation;
885        tokio::spawn(async move {
886            // Macro to check if this task is still valid (connection hasn't been replaced)
887            macro_rules! check_generation {
888                () => {
889                    if client_clone.connection_generation.load(Ordering::SeqCst) != task_generation
890                    {
891                        debug!("Post-login task cancelled: connection generation changed");
892                        return;
893                    }
894                };
895            }
896
897            info!(target: "Client", "Starting post-login initialization sequence (gen={})...", task_generation);
898
899            let mut force_initial_sync = false;
900            let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
901            if device_snapshot.push_name.is_empty() {
902                const DEFAULT_PUSH_NAME: &str = "WhatsApp Rust";
903                warn!(
904                    target: "Client",
905                    "Push name is empty! Setting default to '{DEFAULT_PUSH_NAME}' to allow presence."
906                );
907                client_clone
908                    .persistence_manager
909                    .process_command(DeviceCommand::SetPushName(DEFAULT_PUSH_NAME.to_string()))
910                    .await;
911                force_initial_sync = true;
912            }
913
914            // Check connection before network operations.
915            // During pairing, a 515 disconnect happens quickly after success,
916            // so the socket may already be gone.
917            if !client_clone.is_connected() {
918                debug!(
919                    "Skipping post-login init: connection closed (likely pairing phase reconnect)"
920                );
921                return;
922            }
923
924            // === Establish session with primary phone for PDO ===
925            // This must happen BEFORE we exit passive mode (before offline messages arrive).
926            // PDO needs a session with device 0 to request decrypted content from our phone.
927            // Matches WhatsApp Web's bootstrapDeviceCapabilities() pattern.
928            check_generation!();
929            if let Err(e) = client_clone
930                .establish_primary_phone_session_immediate()
931                .await
932            {
933                warn!(target: "Client/PDO", "Failed to establish session with primary phone on login: {:?}", e);
934                // Don't fail login - PDO will retry via ensure_e2e_sessions fallback
935            }
936
937            // === Send active IQ ===
938            // The server sends <ib><offline count="X"/></ib> AFTER we exit passive mode.
939            // This matches WhatsApp Web's behavior: sendPassiveModeProtocol("active") first,
940            // then wait for offlineDeliveryEnd.
941            check_generation!();
942            if let Err(e) = client_clone.set_passive(false).await {
943                warn!("Failed to send post-connect active IQ: {e:?}");
944            }
945
946            // === Wait for offline sync to complete ===
947            // The server sends <ib><offline count="X"/></ib> after we exit passive mode.
948            // Use a timeout to handle cases where the server doesn't send offline ib
949            // (e.g., during initial pairing or if there are no offline messages).
950            const OFFLINE_SYNC_TIMEOUT_SECS: u64 = 5;
951
952            if !client_clone.offline_sync_completed.load(Ordering::Relaxed) {
953                info!(target: "Client", "Waiting for offline sync to complete (up to {}s)...", OFFLINE_SYNC_TIMEOUT_SECS);
954                let wait_result = tokio::time::timeout(
955                    Duration::from_secs(OFFLINE_SYNC_TIMEOUT_SECS),
956                    client_clone.offline_sync_notifier.notified(),
957                )
958                .await;
959
960                // Check if connection was replaced while waiting
961                check_generation!();
962
963                if wait_result.is_err() {
964                    info!(target: "Client", "Offline sync wait timed out, proceeding with passive tasks");
965                } else {
966                    info!(target: "Client", "Offline sync completed, proceeding with passive tasks");
967                }
968            }
969
970            // === Passive Tasks (mimics WhatsApp Web's PassiveTaskManager) ===
971            // These tasks run after offline delivery ends.
972
973            check_generation!();
974            if let Err(e) = client_clone.upload_pre_keys().await {
975                warn!("Failed to upload pre-keys during startup: {e:?}");
976            }
977
978            // Re-check connection and generation before sending presence
979            check_generation!();
980            if !client_clone.is_connected() {
981                debug!("Skipping presence: connection closed");
982                return;
983            }
984
985            // Send presence (like WhatsApp Web's sendPresenceAvailable after passive tasks)
986            if let Err(e) = client_clone.presence().set_available().await {
987                warn!("Failed to send initial presence: {e:?}");
988            } else {
989                info!("Initial presence sent successfully.");
990            }
991
992            // === End of Passive Tasks ===
993
994            check_generation!();
995
996            // Background initialization queries (can run in parallel, non-blocking)
997            let bg_client = client_clone.clone();
998            let bg_generation = task_generation;
999            tokio::spawn(async move {
1000                // Check connection and generation before starting background queries
1001                if bg_client.connection_generation.load(Ordering::SeqCst) != bg_generation {
1002                    debug!("Skipping background init queries: connection generation changed");
1003                    return;
1004                }
1005                if !bg_client.is_connected() {
1006                    debug!("Skipping background init queries: connection closed");
1007                    return;
1008                }
1009
1010                info!(
1011                    target: "Client",
1012                    "Sending background initialization queries (Props, Blocklist, Privacy, Digest)..."
1013                );
1014
1015                let props_fut = bg_client.fetch_props();
1016                let binding = bg_client.blocking();
1017                let blocklist_fut = binding.get_blocklist();
1018                let privacy_fut = bg_client.fetch_privacy_settings();
1019                let digest_fut = bg_client.send_digest_key_bundle();
1020
1021                let (r_props, r_block, r_priv, r_digest) =
1022                    tokio::join!(props_fut, blocklist_fut, privacy_fut, digest_fut);
1023
1024                if let Err(e) = r_props {
1025                    warn!("Background init: Failed to fetch props: {e:?}");
1026                }
1027                if let Err(e) = r_block {
1028                    warn!("Background init: Failed to fetch blocklist: {e:?}");
1029                }
1030                if let Err(e) = r_priv {
1031                    warn!("Background init: Failed to fetch privacy settings: {e:?}");
1032                }
1033                if let Err(e) = r_digest {
1034                    warn!("Background init: Failed to send digest: {e:?}");
1035                }
1036            });
1037
1038            client_clone
1039                .core
1040                .event_bus
1041                .dispatch(&Event::Connected(crate::types::events::Connected));
1042            client_clone.connected_notifier.notify_waiters();
1043
1044            check_generation!();
1045
1046            let flag_set = client_clone.needs_initial_full_sync.load(Ordering::Relaxed);
1047            if flag_set || force_initial_sync {
1048                info!(
1049                    target: "Client/AppState",
1050                    "Starting Initial App State Sync (flag_set={flag_set}, force={force_initial_sync})"
1051                );
1052
1053                if !client_clone
1054                    .initial_app_state_keys_received
1055                    .load(Ordering::Relaxed)
1056                {
1057                    info!(
1058                        target: "Client/AppState",
1059                        "Waiting up to 5s for app state keys..."
1060                    );
1061                    let _ = tokio::time::timeout(
1062                        Duration::from_secs(5),
1063                        client_clone.initial_keys_synced_notifier.notified(),
1064                    )
1065                    .await;
1066
1067                    // Check if connection was replaced while waiting
1068                    check_generation!();
1069                }
1070
1071                let sync_client = client_clone.clone();
1072                let sync_generation = task_generation;
1073                tokio::spawn(async move {
1074                    let names = [
1075                        WAPatchName::CriticalBlock,
1076                        WAPatchName::CriticalUnblockLow,
1077                        WAPatchName::RegularLow,
1078                        WAPatchName::RegularHigh,
1079                        WAPatchName::Regular,
1080                    ];
1081
1082                    for name in names {
1083                        // Check generation before each sync to avoid racing with new connections
1084                        if sync_client.connection_generation.load(Ordering::SeqCst)
1085                            != sync_generation
1086                        {
1087                            debug!("App state sync cancelled: connection generation changed");
1088                            return;
1089                        }
1090
1091                        if let Err(e) = sync_client.fetch_app_state_with_retry(name).await {
1092                            warn!("Failed to full sync app state {:?}: {e}", name);
1093                        }
1094                    }
1095
1096                    sync_client
1097                        .needs_initial_full_sync
1098                        .store(false, Ordering::Relaxed);
1099                    info!(target: "Client/AppState", "Initial App State Sync Completed.");
1100                });
1101            }
1102        });
1103    }
1104
1105    /// Handles incoming `<ack/>` stanzas by resolving pending response waiters.
1106    ///
1107    /// If an ack with an ID that matches a pending task in `response_waiters`,
1108    /// the task is resolved and the function returns `true`. Otherwise, returns `false`.
1109    pub(crate) async fn handle_ack_response(&self, node: Node) -> bool {
1110        let id_opt = node.attrs.get("id").cloned();
1111        if let Some(id) = id_opt
1112            && let Some(waiter) = self.response_waiters.lock().await.remove(&id)
1113        {
1114            if waiter.send(node).is_err() {
1115                warn!(target: "Client/Ack", "Failed to send ACK response to waiter for ID {id}. Receiver was likely dropped.");
1116            }
1117            return true;
1118        }
1119        false
1120    }
1121
1122    async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
1123        let mut attempt = 0u32;
1124        loop {
1125            attempt += 1;
1126            let res = self.process_app_state_sync_task(name, true).await;
1127            match res {
1128                Ok(()) => return Ok(()),
1129                Err(e) => {
1130                    let es = e.to_string();
1131                    if es.contains("app state key not found") && attempt == 1 {
1132                        if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
1133                            info!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
1134                            if tokio::time::timeout(
1135                                Duration::from_secs(10),
1136                                self.initial_keys_synced_notifier.notified(),
1137                            )
1138                            .await
1139                            .is_err()
1140                            {
1141                                warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
1142                            }
1143                        }
1144                        continue;
1145                    }
1146                    if es.contains("database is locked") && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
1147                        let backoff = Duration::from_millis(200 * attempt as u64 + 150);
1148                        warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
1149                        tokio::time::sleep(backoff).await;
1150                        continue;
1151                    }
1152                    return Err(e);
1153                }
1154            }
1155        }
1156    }
1157
1158    pub(crate) async fn process_app_state_sync_task(
1159        &self,
1160        name: WAPatchName,
1161        full_sync: bool,
1162    ) -> anyhow::Result<()> {
1163        let backend = self.persistence_manager.backend();
1164        let mut full_sync = full_sync;
1165
1166        let mut state = backend.get_version(name.as_str()).await?;
1167        if state.version == 0 {
1168            full_sync = true;
1169        }
1170
1171        let mut has_more = true;
1172        let want_snapshot = full_sync;
1173
1174        if has_more {
1175            debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);
1176
1177            let mut collection_builder = NodeBuilder::new("collection")
1178                .attr("name", name.as_str())
1179                .attr(
1180                    "return_snapshot",
1181                    if want_snapshot { "true" } else { "false" },
1182                );
1183            if !want_snapshot {
1184                collection_builder = collection_builder.attr("version", state.version.to_string());
1185            }
1186            let sync_node = NodeBuilder::new("sync")
1187                .children([collection_builder.build()])
1188                .build();
1189            let iq = crate::request::InfoQuery {
1190                namespace: "w:sync:app:state",
1191                query_type: crate::request::InfoQueryType::Set,
1192                to: server_jid(),
1193                target: None,
1194                id: None,
1195                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
1196                timeout: None,
1197            };
1198
1199            let resp = self.send_iq(iq).await?;
1200            debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);
1201
1202            let _decode_start = std::time::Instant::now();
1203            let pre_downloaded_snapshot: Option<Vec<u8>> =
1204                match wacore::appstate::patch_decode::parse_patch_list(&resp) {
1205                    Ok(pl) => {
1206                        debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={}", name, pl.snapshot_ref.is_some(), pl.has_more_patches);
1207                        if let Some(ext) = &pl.snapshot_ref {
1208                            match self.download(ext).await {
1209                                Ok(bytes) => Some(bytes),
1210                                Err(e) => {
1211                                    warn!("Failed to download external snapshot: {e}");
1212                                    None
1213                                }
1214                            }
1215                        } else {
1216                            None
1217                        }
1218                    }
1219                    Err(_) => None,
1220                };
1221
1222            let download = |_: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
1223                if let Some(bytes) = &pre_downloaded_snapshot {
1224                    Ok(bytes.clone())
1225                } else {
1226                    Err(anyhow::anyhow!("snapshot not pre-downloaded"))
1227                }
1228            };
1229
1230            let proc = self.get_app_state_processor().await;
1231            let (mutations, new_state, list) =
1232                proc.decode_patch_list(&resp, &download, true).await?;
1233            let decode_elapsed = _decode_start.elapsed();
1234            if decode_elapsed.as_millis() > 500 {
1235                debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
1236            }
1237
1238            let missing = match proc.get_missing_key_ids(&list).await {
1239                Ok(v) => v,
1240                Err(e) => {
1241                    warn!("Failed to get missing key IDs for {:?}: {}", name, e);
1242                    Vec::new()
1243                }
1244            };
1245            if !missing.is_empty() {
1246                let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
1247                let mut guard = self.app_state_key_requests.lock().await;
1248                let now = std::time::Instant::now();
1249                for key_id in missing {
1250                    let hex_id = hex::encode(&key_id);
1251                    let should = guard
1252                        .get(&hex_id)
1253                        .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
1254                        .unwrap_or(true);
1255                    if should {
1256                        guard.insert(hex_id, now);
1257                        to_request.push(key_id);
1258                    }
1259                }
1260                drop(guard);
1261                if !to_request.is_empty() {
1262                    self.request_app_state_keys(&to_request).await;
1263                }
1264            }
1265
1266            for m in mutations {
1267                debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
1268                self.dispatch_app_state_mutation(&m, full_sync).await;
1269            }
1270
1271            state = new_state;
1272            has_more = list.has_more_patches;
1273            debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more}", name);
1274        }
1275
1276        backend.set_version(name.as_str(), state.clone()).await?;
1277
1278        debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
1279        Ok(())
1280    }
1281
1282    #[allow(dead_code)]
1283    async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
1284        if raw_key_ids.is_empty() {
1285            return;
1286        }
1287        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1288        let own_jid = match device_snapshot.pn.clone() {
1289            Some(j) => j,
1290            None => return,
1291        };
1292        let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
1293            .iter()
1294            .map(|k| wa::message::AppStateSyncKeyId {
1295                key_id: Some(k.clone()),
1296            })
1297            .collect();
1298        let msg = wa::Message {
1299            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1300                r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
1301                app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
1302                ..Default::default()
1303            })),
1304            ..Default::default()
1305        };
1306        if let Err(e) = self
1307            .send_message_impl(
1308                own_jid,
1309                &msg,
1310                Some(self.generate_message_id().await),
1311                true,
1312                false,
1313                None,
1314            )
1315            .await
1316        {
1317            warn!("Failed to send app state key request: {e}");
1318        }
1319    }
1320
1321    #[allow(dead_code)]
1322    async fn dispatch_app_state_mutation(
1323        &self,
1324        m: &crate::appstate_sync::Mutation,
1325        full_sync: bool,
1326    ) {
1327        use wacore::types::events::{
1328            ArchiveUpdate, ContactUpdate, Event, MarkChatAsReadUpdate, MuteUpdate, PinUpdate,
1329        };
1330        if m.operation != wa::syncd_mutation::SyncdOperation::Set {
1331            return;
1332        }
1333        if m.index.is_empty() {
1334            return;
1335        }
1336        let kind = &m.index[0];
1337        let ts = m
1338            .action_value
1339            .as_ref()
1340            .and_then(|v| v.timestamp)
1341            .unwrap_or(0);
1342        let time = chrono::DateTime::from_timestamp_millis(ts).unwrap_or_else(chrono::Utc::now);
1343        let jid = if m.index.len() > 1 {
1344            m.index[1].parse().unwrap_or_default()
1345        } else {
1346            Jid::default()
1347        };
1348        match kind.as_str() {
1349            "setting_pushName" => {
1350                if let Some(val) = &m.action_value
1351                    && let Some(act) = &val.push_name_setting
1352                    && let Some(new_name) = &act.name
1353                {
1354                    let new_name = new_name.clone();
1355                    let bus = self.core.event_bus.clone();
1356
1357                    let snapshot = self.persistence_manager.get_device_snapshot().await;
1358                    let old = snapshot.push_name.clone();
1359                    if old != new_name {
1360                        info!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
1361                        self.persistence_manager
1362                            .process_command(DeviceCommand::SetPushName(new_name.clone()))
1363                            .await;
1364                        bus.dispatch(&Event::SelfPushNameUpdated(
1365                            crate::types::events::SelfPushNameUpdated {
1366                                from_server: true,
1367                                old_name: old,
1368                                new_name: new_name.clone(),
1369                            },
1370                        ));
1371                    } else {
1372                        debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
1373                    }
1374                }
1375            }
1376            "mute" => {
1377                if let Some(val) = &m.action_value
1378                    && let Some(act) = &val.mute_action
1379                {
1380                    self.core.event_bus.dispatch(&Event::MuteUpdate(MuteUpdate {
1381                        jid,
1382                        timestamp: time,
1383                        action: Box::new(*act),
1384                        from_full_sync: full_sync,
1385                    }));
1386                }
1387            }
1388            "pin" | "pin_v1" => {
1389                if let Some(val) = &m.action_value
1390                    && let Some(act) = &val.pin_action
1391                {
1392                    self.core.event_bus.dispatch(&Event::PinUpdate(PinUpdate {
1393                        jid,
1394                        timestamp: time,
1395                        action: Box::new(*act),
1396                        from_full_sync: full_sync,
1397                    }));
1398                }
1399            }
1400            "archive" => {
1401                if let Some(val) = &m.action_value
1402                    && let Some(act) = &val.archive_chat_action
1403                {
1404                    self.core
1405                        .event_bus
1406                        .dispatch(&Event::ArchiveUpdate(ArchiveUpdate {
1407                            jid,
1408                            timestamp: time,
1409                            action: Box::new(act.clone()),
1410                            from_full_sync: full_sync,
1411                        }));
1412                }
1413            }
1414            "contact" => {
1415                if let Some(val) = &m.action_value
1416                    && let Some(act) = &val.contact_action
1417                {
1418                    self.core
1419                        .event_bus
1420                        .dispatch(&Event::ContactUpdate(ContactUpdate {
1421                            jid,
1422                            timestamp: time,
1423                            action: Box::new(act.clone()),
1424                            from_full_sync: full_sync,
1425                        }));
1426                }
1427            }
1428            "mark_chat_as_read" | "markChatAsRead" => {
1429                if let Some(val) = &m.action_value
1430                    && let Some(act) = &val.mark_chat_as_read_action
1431                {
1432                    self.core.event_bus.dispatch(&Event::MarkChatAsReadUpdate(
1433                        MarkChatAsReadUpdate {
1434                            jid,
1435                            timestamp: time,
1436                            action: Box::new(act.clone()),
1437                            from_full_sync: full_sync,
1438                        },
1439                    ));
1440                }
1441            }
1442            _ => {}
1443        }
1444    }
1445
1446    async fn expect_disconnect(&self) {
1447        self.expected_disconnect.store(true, Ordering::Relaxed);
1448    }
1449
1450    pub(crate) async fn handle_stream_error(&self, node: &wacore_binary::node::Node) {
1451        self.is_logged_in.store(false, Ordering::Relaxed);
1452
1453        let mut attrs = node.attrs();
1454        let code = attrs.optional_string("code").unwrap_or("");
1455        let conflict_type = node
1456            .get_optional_child("conflict")
1457            .map(|n| n.attrs().optional_string("type").unwrap_or("").to_string())
1458            .unwrap_or_default();
1459
1460        match (code, conflict_type.as_str()) {
1461            ("515", _) => {
1462                // 515 is expected during registration/pairing phase - server closes stream after pairing
1463                info!(target: "Client", "Got 515 stream error, server is closing stream. Will auto-reconnect.");
1464                self.expect_disconnect().await;
1465                // Proactively disconnect transport since server may not close the connection
1466                // Clone the transport Arc before spawning to avoid holding the lock
1467                let transport_opt = self.transport.lock().await.clone();
1468                if let Some(transport) = transport_opt {
1469                    // Spawn disconnect in background so we don't block the message loop
1470                    tokio::spawn(async move {
1471                        info!(target: "Client", "Disconnecting transport after 515");
1472                        transport.disconnect().await;
1473                    });
1474                }
1475            }
1476            ("401", "device_removed") | (_, "replaced") => {
1477                info!(target: "Client", "Got stream error indicating client was removed or replaced. Logging out.");
1478                self.expect_disconnect().await;
1479                self.enable_auto_reconnect.store(false, Ordering::Relaxed);
1480
1481                let event = if conflict_type == "replaced" {
1482                    Event::StreamReplaced(crate::types::events::StreamReplaced)
1483                } else {
1484                    Event::LoggedOut(crate::types::events::LoggedOut {
1485                        on_connect: false,
1486                        reason: ConnectFailureReason::LoggedOut,
1487                    })
1488                };
1489                self.core.event_bus.dispatch(&event);
1490            }
1491            ("503", _) => {
1492                info!(target: "Client", "Got 503 service unavailable, will auto-reconnect.");
1493            }
1494            _ => {
1495                error!(target: "Client", "Unknown stream error: {}", DisplayableNode(node));
1496                self.expect_disconnect().await;
1497                self.core.event_bus.dispatch(&Event::StreamError(
1498                    crate::types::events::StreamError {
1499                        code: code.to_string(),
1500                        raw: Some(node.clone()),
1501                    },
1502                ));
1503            }
1504        }
1505
1506        info!(target: "Client", "Notifying shutdown from stream error handler");
1507        self.shutdown_notifier.notify_waiters();
1508    }
1509
1510    pub(crate) async fn handle_connect_failure(&self, node: &wacore_binary::node::Node) {
1511        self.expected_disconnect.store(true, Ordering::Relaxed);
1512        self.shutdown_notifier.notify_waiters();
1513
1514        let mut attrs = node.attrs();
1515        let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
1516        let reason = ConnectFailureReason::from(reason_code);
1517
1518        if reason.should_reconnect() {
1519            self.expected_disconnect.store(false, Ordering::Relaxed);
1520        } else {
1521            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
1522        }
1523
1524        if reason.is_logged_out() {
1525            info!(target: "Client", "Got {reason:?} connect failure, logging out.");
1526            self.core
1527                .event_bus
1528                .dispatch(&wacore::types::events::Event::LoggedOut(
1529                    crate::types::events::LoggedOut {
1530                        on_connect: true,
1531                        reason,
1532                    },
1533                ));
1534        } else if let ConnectFailureReason::TempBanned = reason {
1535            let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
1536            let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
1537            let expire_duration =
1538                chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
1539            warn!(target: "Client", "Temporary ban connect failure: {}", DisplayableNode(node));
1540            self.core.event_bus.dispatch(&Event::TemporaryBan(
1541                crate::types::events::TemporaryBan {
1542                    code: crate::types::events::TempBanReason::from(ban_code),
1543                    expire: expire_duration,
1544                },
1545            ));
1546        } else if let ConnectFailureReason::ClientOutdated = reason {
1547            error!(target: "Client", "Client is outdated and was rejected by server.");
1548            self.core
1549                .event_bus
1550                .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
1551        } else {
1552            warn!(target: "Client", "Unknown connect failure: {}", DisplayableNode(node));
1553            self.core.event_bus.dispatch(&Event::ConnectFailure(
1554                crate::types::events::ConnectFailure {
1555                    reason,
1556                    message: attrs.optional_string("message").unwrap_or("").to_string(),
1557                    raw: Some(node.clone()),
1558                },
1559            ));
1560        }
1561    }
1562
1563    pub(crate) async fn handle_iq(self: &Arc<Self>, node: &wacore_binary::node::Node) -> bool {
1564        if let Some("get") = node.attrs.get("type").map(|s| s.as_str())
1565            && node.get_optional_child("ping").is_some()
1566        {
1567            info!(target: "Client", "Received ping, sending pong.");
1568            let mut parser = node.attrs();
1569            let from_jid = parser.jid("from");
1570            let id = parser.string("id");
1571            let pong = NodeBuilder::new("iq")
1572                .attrs([
1573                    ("to", from_jid.to_string()),
1574                    ("id", id),
1575                    ("type", "result".to_string()),
1576                ])
1577                .build();
1578            if let Err(e) = self.send_node(pong).await {
1579                warn!("Failed to send pong: {e:?}");
1580            }
1581            return true;
1582        }
1583
1584        // Pass Node directly to pair handling
1585        if pair::handle_iq(self, node).await {
1586            return true;
1587        }
1588
1589        false
1590    }
1591
1592    pub fn is_connected(&self) -> bool {
1593        self.noise_socket
1594            .try_lock()
1595            .is_ok_and(|guard| guard.is_some())
1596    }
1597
1598    pub fn is_logged_in(&self) -> bool {
1599        self.is_logged_in.load(Ordering::Relaxed)
1600    }
1601
1602    /// Waits for the noise socket to be established.
1603    ///
1604    /// Returns `Ok(())` when the socket is ready, or `Err` on timeout.
1605    /// This is useful for code that needs to send messages before login,
1606    /// such as requesting a pair code during initial pairing.
1607    ///
1608    /// If the socket is already connected, returns immediately.
1609    pub async fn wait_for_socket(&self, timeout: std::time::Duration) -> Result<(), anyhow::Error> {
1610        // Fast path: already connected
1611        if self.is_connected() {
1612            return Ok(());
1613        }
1614
1615        // Register waiter and re-check to avoid race condition:
1616        // If socket becomes ready between checks, the notified future captures it.
1617        let notified = self.socket_ready_notifier.notified();
1618        if self.is_connected() {
1619            return Ok(());
1620        }
1621
1622        tokio::time::timeout(timeout, notified)
1623            .await
1624            .map_err(|_| anyhow::anyhow!("Timeout waiting for socket"))
1625    }
1626
1627    /// Waits for the client to establish a connection and complete login.
1628    ///
1629    /// Returns `Ok(())` when connected, or `Err` on timeout.
1630    /// This is useful for code that needs to run after connection is established
1631    /// and authentication is complete.
1632    ///
1633    /// If the client is already connected and logged in, returns immediately.
1634    pub async fn wait_for_connected(
1635        &self,
1636        timeout: std::time::Duration,
1637    ) -> Result<(), anyhow::Error> {
1638        // Fast path: already connected and logged in
1639        if self.is_connected() && self.is_logged_in() {
1640            return Ok(());
1641        }
1642
1643        // Register waiter and re-check to avoid race condition:
1644        // If connection completes between checks, the notified future captures it.
1645        let notified = self.connected_notifier.notified();
1646        if self.is_connected() && self.is_logged_in() {
1647            return Ok(());
1648        }
1649
1650        tokio::time::timeout(timeout, notified)
1651            .await
1652            .map_err(|_| anyhow::anyhow!("Timeout waiting for connection"))
1653    }
1654
1655    /// Get access to the PersistenceManager for this client.
1656    /// This is useful for multi-account scenarios to get the device ID.
1657    pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
1658        self.persistence_manager.clone()
1659    }
1660
1661    pub async fn edit_message(
1662        &self,
1663        to: Jid,
1664        original_id: String,
1665        new_content: wa::Message,
1666    ) -> Result<String, anyhow::Error> {
1667        let own_jid = self
1668            .get_pn()
1669            .await
1670            .ok_or_else(|| anyhow!("Not logged in"))?;
1671
1672        let edit_container_message = wa::Message {
1673            edited_message: Some(Box::new(wa::message::FutureProofMessage {
1674                message: Some(Box::new(wa::Message {
1675                    protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1676                        key: Some(wa::MessageKey {
1677                            remote_jid: Some(to.to_string()),
1678                            from_me: Some(true),
1679                            id: Some(original_id.clone()),
1680                            participant: if to.is_group() {
1681                                Some(own_jid.to_non_ad().to_string())
1682                            } else {
1683                                None
1684                            },
1685                        }),
1686                        r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
1687                        edited_message: Some(Box::new(new_content)),
1688                        timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
1689                        ..Default::default()
1690                    })),
1691                    ..Default::default()
1692                })),
1693            })),
1694            ..Default::default()
1695        };
1696
1697        self.send_message_impl(
1698            to,
1699            &edit_container_message,
1700            Some(original_id.clone()),
1701            false,
1702            false,
1703            Some(crate::types::message::EditAttribute::MessageEdit),
1704        )
1705        .await?;
1706
1707        Ok(original_id)
1708    }
1709
1710    pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
1711        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
1712        let noise_socket = match noise_socket_arc {
1713            Some(socket) => socket,
1714            None => return Err(ClientError::NotConnected),
1715        };
1716
1717        info!(target: "Client/Send", "{}", DisplayableNode(&node));
1718        let mut pool_guard = self.send_buffer_pool.lock().await;
1719        let mut plaintext_buf = pool_guard.pop().unwrap_or_else(|| Vec::with_capacity(1024));
1720        let mut encrypted_buf = pool_guard.pop().unwrap_or_else(|| Vec::with_capacity(1024));
1721        drop(pool_guard);
1722
1723        plaintext_buf.clear();
1724        encrypted_buf.clear();
1725
1726        if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
1727            error!("Failed to marshal node: {e:?}");
1728            let mut g = self.send_buffer_pool.lock().await;
1729            if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
1730                g.push(plaintext_buf);
1731            }
1732            if encrypted_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
1733                g.push(encrypted_buf);
1734            }
1735            return Err(SocketError::Crypto("Marshal error".to_string()).into());
1736        }
1737
1738        let (plaintext_buf, encrypted_buf) = match noise_socket
1739            .encrypt_and_send(plaintext_buf, encrypted_buf)
1740            .await
1741        {
1742            Ok(bufs) => bufs,
1743            Err(mut e) => {
1744                let p_buf = std::mem::take(&mut e.plaintext_buf);
1745                let o_buf = std::mem::take(&mut e.out_buf);
1746                let mut g = self.send_buffer_pool.lock().await;
1747                if p_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
1748                    g.push(p_buf);
1749                }
1750                if o_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
1751                    g.push(o_buf);
1752                }
1753                return Err(e.into());
1754            }
1755        };
1756
1757        let mut g = self.send_buffer_pool.lock().await;
1758        if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
1759            g.push(plaintext_buf);
1760        }
1761        if encrypted_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
1762            g.push(encrypted_buf);
1763        }
1764        Ok(())
1765    }
1766
1767    pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
1768        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1769        let old_name = device_snapshot.push_name.clone();
1770
1771        if old_name == new_name {
1772            return;
1773        }
1774
1775        log::info!("Updating push name from '{}' -> '{}'", old_name, new_name);
1776        self.persistence_manager
1777            .process_command(DeviceCommand::SetPushName(new_name.clone()))
1778            .await;
1779
1780        self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
1781            crate::types::events::SelfPushNameUpdated {
1782                from_server: true,
1783                old_name,
1784                new_name: new_name.clone(),
1785            },
1786        ));
1787
1788        let client_clone = self.clone();
1789        tokio::spawn(async move {
1790            if let Err(e) = client_clone.presence().set_available().await {
1791                log::warn!("Failed to send presence after push name update: {:?}", e);
1792            } else {
1793                log::info!("Sent presence after push name update.");
1794            }
1795        });
1796    }
1797
1798    pub async fn get_push_name(&self) -> String {
1799        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1800        device_snapshot.push_name.clone()
1801    }
1802
1803    pub async fn get_pn(&self) -> Option<Jid> {
1804        let snapshot = self.persistence_manager.get_device_snapshot().await;
1805        snapshot.pn.clone()
1806    }
1807
1808    pub async fn get_lid(&self) -> Option<Jid> {
1809        let snapshot = self.persistence_manager.get_device_snapshot().await;
1810        snapshot.lid.clone()
1811    }
1812
1813    // get_phone_number_from_lid is in client/lid_pn.rs
1814
1815    pub(crate) async fn send_protocol_receipt(
1816        &self,
1817        id: String,
1818        receipt_type: crate::types::presence::ReceiptType,
1819    ) {
1820        if id.is_empty() {
1821            return;
1822        }
1823        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1824        if let Some(own_jid) = &device_snapshot.pn {
1825            let type_str = match receipt_type {
1826                crate::types::presence::ReceiptType::HistorySync => "hist_sync",
1827                crate::types::presence::ReceiptType::Read => "read",
1828                crate::types::presence::ReceiptType::ReadSelf => "read-self",
1829                crate::types::presence::ReceiptType::Delivered => "delivery",
1830                crate::types::presence::ReceiptType::Played => "played",
1831                crate::types::presence::ReceiptType::PlayedSelf => "played-self",
1832                crate::types::presence::ReceiptType::Inactive => "inactive",
1833                crate::types::presence::ReceiptType::PeerMsg => "peer_msg",
1834                crate::types::presence::ReceiptType::Sender => "sender",
1835                crate::types::presence::ReceiptType::ServerError => "server-error",
1836                crate::types::presence::ReceiptType::Retry => "retry",
1837                crate::types::presence::ReceiptType::Other(ref s) => s.as_str(),
1838            };
1839
1840            let node = NodeBuilder::new("receipt")
1841                .attrs([
1842                    ("id", id),
1843                    ("type", type_str.to_string()),
1844                    ("to", own_jid.to_non_ad().to_string()),
1845                ])
1846                .build();
1847
1848            if let Err(e) = self.send_node(node).await {
1849                warn!(
1850                    "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
1851                    receipt_type, self.unique_id, e
1852                );
1853            }
1854        }
1855    }
1856}
1857
1858#[cfg(test)]
1859mod tests {
1860    use super::*;
1861    use crate::lid_pn_cache::LearningSource;
1862    use crate::test_utils::MockHttpClient;
1863    use tokio::sync::oneshot;
1864    use wacore_binary::jid::SERVER_JID;
1865
1866    #[tokio::test]
1867    async fn test_ack_behavior_for_incoming_stanzas() {
1868        let backend = Arc::new(
1869            crate::store::SqliteStore::new(":memory:")
1870                .await
1871                .expect("Failed to create in-memory backend for test"),
1872        );
1873        let pm = Arc::new(
1874            PersistenceManager::new(backend)
1875                .await
1876                .expect("persistence manager should initialize"),
1877        );
1878        let (client, _rx) = Client::new(
1879            pm,
1880            Arc::new(crate::transport::mock::MockTransportFactory::new()),
1881            Arc::new(MockHttpClient),
1882            None,
1883        )
1884        .await;
1885
1886        // --- Assertions ---
1887
1888        // Verify that we still ack other critical stanzas (regression check).
1889        use indexmap::IndexMap;
1890        use wacore_binary::node::{Node, NodeContent};
1891
1892        let mut receipt_attrs = IndexMap::new();
1893        receipt_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
1894        receipt_attrs.insert("id".to_string(), "RCPT-1".to_string());
1895        let receipt_node = Node::new(
1896            "receipt",
1897            receipt_attrs,
1898            Some(NodeContent::String("test".to_string())),
1899        );
1900
1901        let mut notification_attrs = IndexMap::new();
1902        notification_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
1903        notification_attrs.insert("id".to_string(), "NOTIF-1".to_string());
1904        let notification_node = Node::new(
1905            "notification",
1906            notification_attrs,
1907            Some(NodeContent::String("test".to_string())),
1908        );
1909
1910        assert!(
1911            client.should_ack(&receipt_node),
1912            "should_ack must still return TRUE for <receipt> stanzas."
1913        );
1914        assert!(
1915            client.should_ack(&notification_node),
1916            "should_ack must still return TRUE for <notification> stanzas."
1917        );
1918
1919        info!(
1920            "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
1921        );
1922    }
1923
1924    #[tokio::test]
1925    async fn test_send_buffer_pool_reuses_both_buffers() {
1926        let backend = Arc::new(
1927            crate::store::SqliteStore::new(":memory:")
1928                .await
1929                .expect("Failed to create in-memory backend for test"),
1930        );
1931        let pm = Arc::new(
1932            PersistenceManager::new(backend)
1933                .await
1934                .expect("persistence manager should initialize"),
1935        );
1936        let (client, _rx) = Client::new(
1937            pm,
1938            Arc::new(crate::transport::mock::MockTransportFactory::new()),
1939            Arc::new(MockHttpClient),
1940            None,
1941        )
1942        .await;
1943
1944        // Check initial pool size
1945        let initial_pool_size = {
1946            let pool = client.send_buffer_pool.lock().await;
1947            pool.len()
1948        };
1949
1950        // Attempt to send a node (this will fail because we're not connected, but that's okay)
1951        let test_node = NodeBuilder::new("test").attr("id", "test-123").build();
1952
1953        let _ = client.send_node(test_node).await;
1954
1955        // After the send attempt, the pool should have the same or more buffers
1956        // (depending on whether buffers were consumed and returned)
1957        let final_pool_size = {
1958            let pool = client.send_buffer_pool.lock().await;
1959            pool.len()
1960        };
1961
1962        // The key assertion: we should not be leaking buffers
1963        // If the fix works, buffers should be returned to the pool
1964        // (or at least not allocating new ones unnecessarily)
1965        assert!(
1966            final_pool_size >= initial_pool_size,
1967            "Buffer pool should not shrink after send operations"
1968        );
1969
1970        info!(
1971            "✅ test_send_buffer_pool_reuses_both_buffers passed: Buffer pool properly manages buffers"
1972        );
1973    }
1974
1975    #[tokio::test]
1976    async fn test_ack_waiter_resolves() {
1977        let backend = Arc::new(
1978            crate::store::SqliteStore::new(":memory:")
1979                .await
1980                .expect("Failed to create in-memory backend for test"),
1981        );
1982        let pm = Arc::new(
1983            PersistenceManager::new(backend)
1984                .await
1985                .expect("persistence manager should initialize"),
1986        );
1987        let (client, _rx) = Client::new(
1988            pm,
1989            Arc::new(crate::transport::mock::MockTransportFactory::new()),
1990            Arc::new(MockHttpClient),
1991            None,
1992        )
1993        .await;
1994
1995        // 1. Insert a waiter for a specific ID
1996        let test_id = "ack-test-123".to_string();
1997        let (tx, rx) = oneshot::channel();
1998        client
1999            .response_waiters
2000            .lock()
2001            .await
2002            .insert(test_id.clone(), tx);
2003        assert!(
2004            client.response_waiters.lock().await.contains_key(&test_id),
2005            "Waiter should be inserted before handling ack"
2006        );
2007
2008        // 2. Create a mock <ack/> node with the test ID
2009        let ack_node = NodeBuilder::new("ack")
2010            .attr("id", test_id.clone())
2011            .attr("from", SERVER_JID)
2012            .build();
2013
2014        // 3. Handle the ack
2015        let handled = client.handle_ack_response(ack_node).await;
2016        assert!(
2017            handled,
2018            "handle_ack_response should return true when waiter exists"
2019        );
2020
2021        // 4. Await the receiver with a timeout
2022        match tokio::time::timeout(Duration::from_secs(1), rx).await {
2023            Ok(Ok(response_node)) => {
2024                assert_eq!(
2025                    response_node.attrs.get("id"),
2026                    Some(&test_id),
2027                    "Response node should have correct ID"
2028                );
2029            }
2030            Ok(Err(_)) => panic!("Receiver was dropped without being sent a value"),
2031            Err(_) => panic!("Test timed out waiting for ack response"),
2032        }
2033
2034        // 5. Verify the waiter was removed
2035        assert!(
2036            !client.response_waiters.lock().await.contains_key(&test_id),
2037            "Waiter should be removed after handling"
2038        );
2039
2040        info!(
2041            "✅ test_ack_waiter_resolves passed: ACK response correctly resolves pending waiters"
2042        );
2043    }
2044
2045    #[tokio::test]
2046    async fn test_ack_without_matching_waiter() {
2047        let backend = Arc::new(
2048            crate::store::SqliteStore::new(":memory:")
2049                .await
2050                .expect("Failed to create in-memory backend for test"),
2051        );
2052        let pm = Arc::new(
2053            PersistenceManager::new(backend)
2054                .await
2055                .expect("persistence manager should initialize"),
2056        );
2057        let (client, _rx) = Client::new(
2058            pm,
2059            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2060            Arc::new(MockHttpClient),
2061            None,
2062        )
2063        .await;
2064
2065        // Create an ack without any matching waiter
2066        let ack_node = NodeBuilder::new("ack")
2067            .attr("id", "non-existent-id")
2068            .attr("from", SERVER_JID)
2069            .build();
2070
2071        // Should return false since there's no waiter
2072        let handled = client.handle_ack_response(ack_node).await;
2073        assert!(
2074            !handled,
2075            "handle_ack_response should return false when no waiter exists"
2076        );
2077
2078        info!(
2079            "✅ test_ack_without_matching_waiter passed: ACK without matching waiter handled gracefully"
2080        );
2081    }
2082
2083    /// Test that the lid_pn_cache correctly stores and retrieves LID mappings.
2084    ///
2085    /// This is critical for the LID-PN session mismatch fix. When we receive a message
2086    /// with sender_lid, we cache the phone->LID mapping so that when sending replies,
2087    /// we can reuse the existing LID session instead of creating a new PN session.
2088    #[tokio::test]
2089    async fn test_lid_pn_cache_basic_operations() {
2090        let backend = Arc::new(
2091            crate::store::SqliteStore::new("file:memdb_lid_cache_basic?mode=memory&cache=shared")
2092                .await
2093                .expect("Failed to create in-memory backend for test"),
2094        );
2095        let pm = Arc::new(
2096            PersistenceManager::new(backend)
2097                .await
2098                .expect("persistence manager should initialize"),
2099        );
2100        let (client, _rx) = Client::new(
2101            pm,
2102            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2103            Arc::new(MockHttpClient),
2104            None,
2105        )
2106        .await;
2107
2108        // Initially, the cache should be empty for a phone number
2109        let phone = "559980000001";
2110        let lid = "100000012345678";
2111
2112        assert!(
2113            client.lid_pn_cache.get_current_lid(phone).await.is_none(),
2114            "Cache should be empty initially"
2115        );
2116
2117        // Insert a phone->LID mapping using add_lid_pn_mapping
2118        client
2119            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
2120            .await
2121            .expect("Failed to persist LID-PN mapping in tests");
2122
2123        // Verify we can retrieve it (phone -> LID lookup)
2124        let cached_lid = client.lid_pn_cache.get_current_lid(phone).await;
2125        assert!(cached_lid.is_some(), "Cache should contain the mapping");
2126        assert_eq!(
2127            cached_lid.expect("cache should have LID"),
2128            lid,
2129            "Cached LID should match what we inserted"
2130        );
2131
2132        // Verify reverse lookup works (LID -> phone)
2133        let cached_phone = client.lid_pn_cache.get_phone_number(lid).await;
2134        assert!(cached_phone.is_some(), "Reverse lookup should work");
2135        assert_eq!(
2136            cached_phone.expect("reverse lookup should return phone"),
2137            phone,
2138            "Cached phone should match what we inserted"
2139        );
2140
2141        // Verify a different phone number returns None
2142        assert!(
2143            client
2144                .lid_pn_cache
2145                .get_current_lid("559980000002")
2146                .await
2147                .is_none(),
2148            "Different phone number should not have a mapping"
2149        );
2150
2151        info!("✅ test_lid_pn_cache_basic_operations passed: LID-PN cache works correctly");
2152    }
2153
2154    /// Test that the lid_pn_cache respects timestamp-based conflict resolution.
2155    ///
2156    /// When a phone number has multiple LIDs, the most recent one should be returned.
2157    #[tokio::test]
2158    async fn test_lid_pn_cache_timestamp_resolution() {
2159        let backend = Arc::new(
2160            crate::store::SqliteStore::new(
2161                "file:memdb_lid_cache_timestamp?mode=memory&cache=shared",
2162            )
2163            .await
2164            .expect("Failed to create in-memory backend for test"),
2165        );
2166        let pm = Arc::new(
2167            PersistenceManager::new(backend)
2168                .await
2169                .expect("persistence manager should initialize"),
2170        );
2171        let (client, _rx) = Client::new(
2172            pm,
2173            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2174            Arc::new(MockHttpClient),
2175            None,
2176        )
2177        .await;
2178
2179        let phone = "559980000001";
2180        let lid_old = "100000012345678";
2181        let lid_new = "100000087654321";
2182
2183        // Insert initial mapping
2184        client
2185            .add_lid_pn_mapping(lid_old, phone, LearningSource::Usync)
2186            .await
2187            .expect("Failed to persist LID-PN mapping in tests");
2188
2189        assert_eq!(
2190            client
2191                .lid_pn_cache
2192                .get_current_lid(phone)
2193                .await
2194                .expect("cache should have LID"),
2195            lid_old,
2196            "Initial LID should be stored"
2197        );
2198
2199        // Small delay to ensure different timestamp
2200        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2201
2202        // Add new mapping with newer timestamp
2203        client
2204            .add_lid_pn_mapping(lid_new, phone, LearningSource::PeerPnMessage)
2205            .await
2206            .expect("Failed to persist LID-PN mapping in tests");
2207
2208        assert_eq!(
2209            client
2210                .lid_pn_cache
2211                .get_current_lid(phone)
2212                .await
2213                .expect("cache should have newer LID"),
2214            lid_new,
2215            "Newer LID should be returned for phone lookup"
2216        );
2217
2218        // Both LIDs should still resolve to the same phone
2219        assert_eq!(
2220            client
2221                .lid_pn_cache
2222                .get_phone_number(lid_old)
2223                .await
2224                .expect("reverse lookup should return phone"),
2225            phone,
2226            "Old LID should still map to phone"
2227        );
2228        assert_eq!(
2229            client
2230                .lid_pn_cache
2231                .get_phone_number(lid_new)
2232                .await
2233                .expect("reverse lookup should return phone"),
2234            phone,
2235            "New LID should also map to phone"
2236        );
2237
2238        info!(
2239            "✅ test_lid_pn_cache_timestamp_resolution passed: Timestamp-based resolution works correctly"
2240        );
2241    }
2242
2243    /// Test that get_lid_for_phone (from SendContextResolver) returns the cached value.
2244    ///
2245    /// This is the method used by wacore::send to look up LID mappings when encrypting.
2246    #[tokio::test]
2247    async fn test_get_lid_for_phone_via_send_context_resolver() {
2248        use wacore::client::context::SendContextResolver;
2249
2250        let backend = Arc::new(
2251            crate::store::SqliteStore::new("file:memdb_get_lid_for_phone?mode=memory&cache=shared")
2252                .await
2253                .expect("Failed to create in-memory backend for test"),
2254        );
2255        let pm = Arc::new(
2256            PersistenceManager::new(backend)
2257                .await
2258                .expect("persistence manager should initialize"),
2259        );
2260        let (client, _rx) = Client::new(
2261            pm,
2262            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2263            Arc::new(MockHttpClient),
2264            None,
2265        )
2266        .await;
2267
2268        let phone = "559980000001";
2269        let lid = "100000012345678";
2270
2271        // Before caching, should return None
2272        assert!(
2273            client.get_lid_for_phone(phone).await.is_none(),
2274            "get_lid_for_phone should return None before caching"
2275        );
2276
2277        // Cache the mapping using add_lid_pn_mapping
2278        client
2279            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
2280            .await
2281            .expect("Failed to persist LID-PN mapping in tests");
2282
2283        // Now it should return the LID
2284        let result = client.get_lid_for_phone(phone).await;
2285        assert!(
2286            result.is_some(),
2287            "get_lid_for_phone should return Some after caching"
2288        );
2289        assert_eq!(
2290            result.expect("get_lid_for_phone should return Some"),
2291            lid,
2292            "get_lid_for_phone should return the cached LID"
2293        );
2294
2295        info!(
2296            "✅ test_get_lid_for_phone_via_send_context_resolver passed: SendContextResolver correctly returns cached LID"
2297        );
2298    }
2299
2300    // =========================================================================
2301    // PDO Session Establishment Timing Tests
2302    // =========================================================================
2303    // These tests verify the critical timing behavior for PDO:
2304    // - Session with device 0 must be established BEFORE offline messages arrive
2305    // - ensure_e2e_sessions() waits for offline sync (for normal message sending)
2306    // - establish_primary_phone_session_immediate() does NOT wait (for login)
2307    // =========================================================================
2308
2309    /// Test that wait_for_offline_delivery_end returns immediately when the flag is already set.
2310    #[tokio::test]
2311    async fn test_wait_for_offline_delivery_end_returns_immediately_when_flag_set() {
2312        let backend = Arc::new(
2313            crate::store::SqliteStore::new(
2314                "file:memdb_offline_sync_flag_set?mode=memory&cache=shared",
2315            )
2316            .await
2317            .expect("Failed to create in-memory backend for test"),
2318        );
2319        let pm = Arc::new(
2320            PersistenceManager::new(backend)
2321                .await
2322                .expect("persistence manager should initialize"),
2323        );
2324        let (client, _rx) = Client::new(
2325            pm,
2326            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2327            Arc::new(MockHttpClient),
2328            None,
2329        )
2330        .await;
2331
2332        // Set the flag to true (simulating offline sync completed)
2333        client
2334            .offline_sync_completed
2335            .store(true, std::sync::atomic::Ordering::Relaxed);
2336
2337        // This should return immediately (not wait 10 seconds)
2338        let start = std::time::Instant::now();
2339        client.wait_for_offline_delivery_end().await;
2340        let elapsed = start.elapsed();
2341
2342        // Should complete in < 100ms (not 10 second timeout)
2343        assert!(
2344            elapsed.as_millis() < 100,
2345            "wait_for_offline_delivery_end should return immediately when flag is set, took {:?}",
2346            elapsed
2347        );
2348
2349        info!("✅ test_wait_for_offline_delivery_end_returns_immediately_when_flag_set passed");
2350    }
2351
2352    /// Test that wait_for_offline_delivery_end times out when the flag is NOT set.
2353    /// This verifies the 10-second timeout is working.
2354    #[tokio::test]
2355    async fn test_wait_for_offline_delivery_end_times_out_when_flag_not_set() {
2356        let backend = Arc::new(
2357            crate::store::SqliteStore::new(
2358                "file:memdb_offline_sync_timeout?mode=memory&cache=shared",
2359            )
2360            .await
2361            .expect("Failed to create in-memory backend for test"),
2362        );
2363        let pm = Arc::new(
2364            PersistenceManager::new(backend)
2365                .await
2366                .expect("persistence manager should initialize"),
2367        );
2368        let (client, _rx) = Client::new(
2369            pm,
2370            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2371            Arc::new(MockHttpClient),
2372            None,
2373        )
2374        .await;
2375
2376        // Flag is false by default, so we need to use a shorter timeout for the test
2377        // We'll verify behavior by using tokio timeout
2378        let start = std::time::Instant::now();
2379
2380        // Use a short timeout to test the behavior without waiting 10 seconds
2381        let result = tokio::time::timeout(
2382            std::time::Duration::from_millis(100),
2383            client.wait_for_offline_delivery_end(),
2384        )
2385        .await;
2386
2387        let elapsed = start.elapsed();
2388
2389        // The wait should NOT complete immediately - it should timeout
2390        // (because the flag is false and no one is notifying)
2391        assert!(
2392            result.is_err(),
2393            "wait_for_offline_delivery_end should not return immediately when flag is false"
2394        );
2395        assert!(
2396            elapsed.as_millis() >= 95, // Allow small timing variance
2397            "Should have waited for the timeout duration, took {:?}",
2398            elapsed
2399        );
2400
2401        info!("✅ test_wait_for_offline_delivery_end_times_out_when_flag_not_set passed");
2402    }
2403
2404    /// Test that wait_for_offline_delivery_end returns when notified.
2405    #[tokio::test]
2406    async fn test_wait_for_offline_delivery_end_returns_on_notify() {
2407        let backend = Arc::new(
2408            crate::store::SqliteStore::new("file:memdb_offline_notify?mode=memory&cache=shared")
2409                .await
2410                .expect("Failed to create in-memory backend for test"),
2411        );
2412        let pm = Arc::new(
2413            PersistenceManager::new(backend)
2414                .await
2415                .expect("persistence manager should initialize"),
2416        );
2417        let (client, _rx) = Client::new(
2418            pm,
2419            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2420            Arc::new(MockHttpClient),
2421            None,
2422        )
2423        .await;
2424
2425        let client_clone = client.clone();
2426
2427        // Spawn a task that will notify after 50ms
2428        tokio::spawn(async move {
2429            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2430            client_clone.offline_sync_notifier.notify_waiters();
2431        });
2432
2433        let start = std::time::Instant::now();
2434        client.wait_for_offline_delivery_end().await;
2435        let elapsed = start.elapsed();
2436
2437        // Should complete around 50ms (when notified), not 10 seconds
2438        assert!(
2439            elapsed.as_millis() < 200,
2440            "wait_for_offline_delivery_end should return when notified, took {:?}",
2441            elapsed
2442        );
2443        assert!(
2444            elapsed.as_millis() >= 45, // Should have waited for the notify
2445            "Should have waited for the notify, only took {:?}",
2446            elapsed
2447        );
2448
2449        info!("✅ test_wait_for_offline_delivery_end_returns_on_notify passed");
2450    }
2451
2452    /// Test that the offline_sync_completed flag starts as false.
2453    #[tokio::test]
2454    async fn test_offline_sync_flag_initially_false() {
2455        let backend = Arc::new(
2456            crate::store::SqliteStore::new(
2457                "file:memdb_offline_flag_initial?mode=memory&cache=shared",
2458            )
2459            .await
2460            .expect("Failed to create in-memory backend for test"),
2461        );
2462        let pm = Arc::new(
2463            PersistenceManager::new(backend)
2464                .await
2465                .expect("persistence manager should initialize"),
2466        );
2467        let (client, _rx) = Client::new(
2468            pm,
2469            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2470            Arc::new(MockHttpClient),
2471            None,
2472        )
2473        .await;
2474
2475        // The flag should be false initially
2476        assert!(
2477            !client
2478                .offline_sync_completed
2479                .load(std::sync::atomic::Ordering::Relaxed),
2480            "offline_sync_completed should be false when Client is first created"
2481        );
2482
2483        info!("✅ test_offline_sync_flag_initially_false passed");
2484    }
2485
2486    /// Test the complete offline sync lifecycle:
2487    /// 1. Flag starts false
2488    /// 2. Flag is set true after IB offline stanza
2489    /// 3. Notify is called
2490    #[tokio::test]
2491    async fn test_offline_sync_lifecycle() {
2492        use std::sync::atomic::Ordering;
2493
2494        let backend = Arc::new(
2495            crate::store::SqliteStore::new("file:memdb_offline_lifecycle?mode=memory&cache=shared")
2496                .await
2497                .expect("Failed to create in-memory backend for test"),
2498        );
2499        let pm = Arc::new(
2500            PersistenceManager::new(backend)
2501                .await
2502                .expect("persistence manager should initialize"),
2503        );
2504        let (client, _rx) = Client::new(
2505            pm,
2506            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2507            Arc::new(MockHttpClient),
2508            None,
2509        )
2510        .await;
2511
2512        // 1. Initially false
2513        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
2514
2515        // 2. Spawn a waiter
2516        let client_waiter = client.clone();
2517        let waiter_handle = tokio::spawn(async move {
2518            client_waiter.wait_for_offline_delivery_end().await;
2519            true // Return that we completed
2520        });
2521
2522        // Give the waiter time to start waiting
2523        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2524
2525        // Verify waiter hasn't completed yet
2526        assert!(
2527            !waiter_handle.is_finished(),
2528            "Waiter should still be waiting"
2529        );
2530
2531        // 3. Simulate IB handler behavior (set flag and notify)
2532        client.offline_sync_completed.store(true, Ordering::Relaxed);
2533        client.offline_sync_notifier.notify_waiters();
2534
2535        // 4. Waiter should complete
2536        let result = tokio::time::timeout(std::time::Duration::from_millis(100), waiter_handle)
2537            .await
2538            .expect("Waiter should complete after notify")
2539            .expect("Waiter task should not panic");
2540
2541        assert!(result, "Waiter should have completed successfully");
2542        assert!(client.offline_sync_completed.load(Ordering::Relaxed));
2543
2544        info!("✅ test_offline_sync_lifecycle passed");
2545    }
2546
2547    /// Test that establish_primary_phone_session_immediate returns error when no PN is set.
2548    /// This verifies the "not logged in" guard works.
2549    #[tokio::test]
2550    async fn test_establish_primary_phone_session_fails_without_pn() {
2551        let backend = Arc::new(
2552            crate::store::SqliteStore::new("file:memdb_no_pn?mode=memory&cache=shared")
2553                .await
2554                .expect("Failed to create in-memory backend for test"),
2555        );
2556        let pm = Arc::new(
2557            PersistenceManager::new(backend)
2558                .await
2559                .expect("persistence manager should initialize"),
2560        );
2561        let (client, _rx) = Client::new(
2562            pm,
2563            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2564            Arc::new(MockHttpClient),
2565            None,
2566        )
2567        .await;
2568
2569        // No PN set, so this should fail
2570        let result = client.establish_primary_phone_session_immediate().await;
2571
2572        assert!(
2573            result.is_err(),
2574            "establish_primary_phone_session_immediate should fail when no PN is set"
2575        );
2576
2577        let error_msg = result.unwrap_err().to_string();
2578        assert!(
2579            error_msg.contains("Not logged in"),
2580            "Error should mention 'Not logged in', got: {}",
2581            error_msg
2582        );
2583
2584        info!("✅ test_establish_primary_phone_session_fails_without_pn passed");
2585    }
2586
2587    /// Test that ensure_e2e_sessions waits for offline sync to complete.
2588    /// This is the CRITICAL difference between ensure_e2e_sessions and
2589    /// establish_primary_phone_session_immediate.
2590    #[tokio::test]
2591    async fn test_ensure_e2e_sessions_waits_for_offline_sync() {
2592        use std::sync::atomic::Ordering;
2593        use wacore_binary::jid::Jid;
2594
2595        let backend = Arc::new(
2596            crate::store::SqliteStore::new("file:memdb_ensure_e2e_waits?mode=memory&cache=shared")
2597                .await
2598                .expect("Failed to create in-memory backend for test"),
2599        );
2600        let pm = Arc::new(
2601            PersistenceManager::new(backend)
2602                .await
2603                .expect("persistence manager should initialize"),
2604        );
2605        let (client, _rx) = Client::new(
2606            pm,
2607            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2608            Arc::new(MockHttpClient),
2609            None,
2610        )
2611        .await;
2612
2613        // Flag is false (offline sync not complete)
2614        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
2615
2616        // Call ensure_e2e_sessions with an empty list (so it returns early after the wait)
2617        // This lets us test the waiting behavior without needing network
2618        let client_clone = client.clone();
2619        let ensure_handle = tokio::spawn(async move {
2620            // Start with some JIDs - but since we're testing the wait, we use empty
2621            // to avoid needing actual session establishment
2622            client_clone.ensure_e2e_sessions(vec![]).await
2623        });
2624
2625        // Wait a bit - ensure_e2e_sessions should return immediately for empty list
2626        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2627        assert!(
2628            ensure_handle.is_finished(),
2629            "ensure_e2e_sessions should return immediately for empty JID list"
2630        );
2631
2632        // Now test with actual JIDs - it should wait for offline sync
2633        let client_clone = client.clone();
2634        let test_jid = Jid::pn("559999999999");
2635        let ensure_handle = tokio::spawn(async move {
2636            // This will wait for offline sync before proceeding
2637            let start = std::time::Instant::now();
2638            let _ = client_clone.ensure_e2e_sessions(vec![test_jid]).await;
2639            start.elapsed()
2640        });
2641
2642        // Give it a moment to start
2643        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2644
2645        // It should still be waiting (offline sync not complete)
2646        assert!(
2647            !ensure_handle.is_finished(),
2648            "ensure_e2e_sessions should be waiting for offline sync"
2649        );
2650
2651        // Now complete offline sync
2652        client.offline_sync_completed.store(true, Ordering::Relaxed);
2653        client.offline_sync_notifier.notify_waiters();
2654
2655        // Now it should complete (might fail on session establishment, but that's ok)
2656        let result = tokio::time::timeout(std::time::Duration::from_secs(2), ensure_handle).await;
2657
2658        assert!(
2659            result.is_ok(),
2660            "ensure_e2e_sessions should complete after offline sync"
2661        );
2662
2663        info!("✅ test_ensure_e2e_sessions_waits_for_offline_sync passed");
2664    }
2665
2666    /// Integration test: Verify that the immediate session establishment does NOT
2667    /// wait for offline sync. This is critical for PDO to work during offline sync.
2668    ///
2669    /// The flow is:
2670    /// 1. Login -> establish_primary_phone_session_immediate() is called
2671    /// 2. This should NOT wait for offline sync (flag is false at this point)
2672    /// 3. After session is established, offline messages arrive
2673    /// 4. When decryption fails, PDO can immediately send to device 0
2674    #[tokio::test]
2675    async fn test_immediate_session_does_not_wait_for_offline_sync() {
2676        use std::sync::atomic::Ordering;
2677        use wacore_binary::jid::Jid;
2678
2679        let backend = Arc::new(
2680            crate::store::SqliteStore::new("file:memdb_immediate_no_wait?mode=memory&cache=shared")
2681                .await
2682                .expect("Failed to create in-memory backend for test"),
2683        );
2684        let pm = Arc::new(
2685            PersistenceManager::new(backend.clone())
2686                .await
2687                .expect("persistence manager should initialize"),
2688        );
2689
2690        // Set a PN so establish_primary_phone_session_immediate doesn't fail early
2691        pm.modify_device(|device| {
2692            device.pn = Some(Jid::pn("559999999999"));
2693        })
2694        .await;
2695
2696        let (client, _rx) = Client::new(
2697            pm,
2698            Arc::new(crate::transport::mock::MockTransportFactory::new()),
2699            Arc::new(MockHttpClient),
2700            None,
2701        )
2702        .await;
2703
2704        // Flag is false (offline sync not complete - simulating login state)
2705        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
2706
2707        // Call establish_primary_phone_session_immediate
2708        // It should NOT wait for offline sync - it should proceed immediately
2709        let start = std::time::Instant::now();
2710
2711        // Note: This will fail because we can't actually fetch prekeys in tests,
2712        // but the important thing is that it doesn't WAIT for offline sync
2713        let result = tokio::time::timeout(
2714            std::time::Duration::from_millis(500),
2715            client.establish_primary_phone_session_immediate(),
2716        )
2717        .await;
2718
2719        let elapsed = start.elapsed();
2720
2721        // The call should complete (or fail) quickly, NOT wait for 10 second timeout
2722        assert!(
2723            result.is_ok(),
2724            "establish_primary_phone_session_immediate should not wait for offline sync, timed out"
2725        );
2726
2727        // It should complete in < 500ms (not 10 second wait)
2728        assert!(
2729            elapsed.as_millis() < 500,
2730            "establish_primary_phone_session_immediate should not wait, took {:?}",
2731            elapsed
2732        );
2733
2734        // The actual result might be an error (no network), but that's fine
2735        // The important thing is it didn't wait for offline sync
2736        info!(
2737            "establish_primary_phone_session_immediate completed in {:?} (result: {:?})",
2738            elapsed,
2739            result.unwrap().is_ok()
2740        );
2741
2742        info!("✅ test_immediate_session_does_not_wait_for_offline_sync passed");
2743    }
2744}