whatsapp_rust/
client.rs

1mod context_impl;
2
3use crate::handshake;
4use crate::pair;
5use anyhow::anyhow;
6use dashmap::DashMap;
7use tokio::sync::watch;
8use wacore::xml::DisplayableNode;
9use wacore_binary::builder::NodeBuilder;
10use wacore_binary::jid::JidExt;
11use wacore_binary::node::Node;
12
13use crate::appstate_sync::AppStateProcessor;
14use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
15use crate::types::enc_handler::EncHandler;
16use crate::types::events::{ConnectFailureReason, Event};
17use crate::types::presence::Presence;
18
19// keep single DashMap import above
20
21use log::{debug, error, info, warn};
22
23use rand::RngCore;
24use scopeguard;
25use std::collections::{HashMap, HashSet, VecDeque};
26use wacore_binary::jid::Jid;
27use wacore_binary::jid::SERVER_JID;
28
29use std::sync::Arc;
30use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
31
32use thiserror::Error;
33use tokio::sync::{Mutex, Notify, RwLock, mpsc, oneshot};
34use tokio::time::{Duration, sleep};
35use wacore::appstate::patch_decode::WAPatchName;
36use wacore::client::context::GroupInfo;
37use waproto::whatsapp as wa;
38
39use crate::socket::{FrameSocket, NoiseSocket, SocketError};
40use crate::sync_task::MajorSyncTask;
41
/// How long the post-login task waits for the initial app state keys before
/// starting the full sync anyway.
const APP_STATE_KEY_WAIT_TIMEOUT: Duration = Duration::from_secs(15);
/// Attempt ceiling for app state fetches that keep failing on a locked database.
const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;

/// Capacity limit (512 KiB) for pooled buffers.
// NOTE(review): presumably buffers that grew beyond this are not returned to the
// send buffer pool — confirm at the use site, which is not in this part of the file.
const MAX_POOLED_BUFFER_CAP: usize = 512 * 1024;
46
47#[derive(Debug, Error)]
48pub enum ClientError {
49    #[error("client is not connected")]
50    NotConnected,
51    #[error("socket error: {0}")]
52    Socket(#[from] SocketError),
53    #[error("client is already connected")]
54    AlreadyConnected,
55    #[error("client is not logged in")]
56    NotLoggedIn,
57}
58
59#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
60pub struct RecentMessageKey {
61    pub to: Jid,
62    pub id: String,
63}
64
65#[derive(Debug, Clone)]
66pub(crate) struct RecentMessageManagerHandle(pub mpsc::Sender<RecentMessageCommand>);
67
68impl RecentMessageManagerHandle {
69    pub(crate) async fn send_insert(
70        &self,
71        key: RecentMessageKey,
72        msg: Arc<wa::Message>,
73    ) -> Result<(), mpsc::error::SendError<RecentMessageCommand>> {
74        self.0.send(RecentMessageCommand::Insert(key, msg)).await
75    }
76
77    pub(crate) async fn shutdown(
78        &self,
79    ) -> Result<(), mpsc::error::SendError<RecentMessageCommand>> {
80        self.0.send(RecentMessageCommand::Shutdown).await
81    }
82}
83
84#[derive(Debug)]
85pub enum RecentMessageCommand {
86    Insert(RecentMessageKey, Arc<wa::Message>),
87    Take(RecentMessageKey, oneshot::Sender<Option<Arc<wa::Message>>>),
88    Shutdown,
89}
90
91#[derive(Debug, thiserror::Error)]
92pub enum RecentMessageError {
93    #[error("Manager task unavailable - channel send failed")]
94    ManagerUnavailable,
95    #[error("Manager task did not respond within timeout")]
96    ResponseTimeout,
97    #[error("Manager task panicked or was dropped")]
98    TaskDropped,
99}
100
101pub struct Client {
102    pub(crate) core: wacore::client::CoreClient,
103
104    pub(crate) persistence_manager: Arc<PersistenceManager>,
105    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,
106
107    pub(crate) is_logged_in: Arc<AtomicBool>,
108    pub(crate) is_connecting: Arc<AtomicBool>,
109    pub(crate) is_running: Arc<AtomicBool>,
110    pub(crate) shutdown_notifier: Arc<Notify>,
111
112    pub(crate) frame_socket: Arc<Mutex<Option<FrameSocket>>>,
113    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,
114    pub(crate) frames_rx: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<bytes::Bytes>>>>,
115
116    pub(crate) response_waiters:
117        Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<wacore_binary::Node>>>>,
118    pub(crate) unique_id: String,
119    pub(crate) id_counter: Arc<AtomicU64>,
120
121    pub(crate) chat_locks: Arc<DashMap<Jid, Arc<tokio::sync::Mutex<()>>>>,
122    pub group_cache: Arc<DashMap<Jid, GroupInfo>>,
123    pub device_cache: Arc<DashMap<Jid, (Vec<Jid>, std::time::Instant)>>,
124
125    pub(crate) retried_group_messages: Arc<DashMap<String, ()>>,
126    pub(crate) expected_disconnect: Arc<AtomicBool>,
127
128    pub(crate) recent_msg_tx: RecentMessageManagerHandle,
129
130    pub(crate) pending_retries: Arc<Mutex<HashSet<String>>>,
131
132    pub enable_auto_reconnect: Arc<AtomicBool>,
133    pub auto_reconnect_errors: Arc<AtomicU32>,
134    pub last_successful_connect: Arc<Mutex<Option<chrono::DateTime<chrono::Utc>>>>,
135
136    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,
137
138    pub(crate) app_state_processor: Option<AppStateProcessor>,
139    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, std::time::Instant>>>,
140    pub(crate) initial_keys_synced_notifier: Arc<Notify>,
141    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,
142    pub(crate) major_sync_task_sender: mpsc::Sender<MajorSyncTask>,
143    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<watch::Sender<()>>>>,
144
145    pub(crate) send_buffer_pool: Arc<Mutex<Vec<Vec<u8>>>>,
146
147    /// Custom handlers for encrypted message types
148    pub custom_enc_handlers: Arc<DashMap<String, Arc<dyn EncHandler>>>,
149
150    /// Router for dispatching stanzas to their appropriate handlers
151    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,
152
153    /// Whether to send ACKs synchronously or in a background task
154    pub(crate) synchronous_ack: bool,
155}
156
157impl Client {
158    pub async fn new(
159        persistence_manager: Arc<PersistenceManager>,
160    ) -> (Arc<Self>, mpsc::Receiver<MajorSyncTask>) {
161        let mut unique_id_bytes = [0u8; 2];
162        rand::rng().fill_bytes(&mut unique_id_bytes);
163
164        let device_snapshot = persistence_manager.get_device_snapshot().await;
165        let core = wacore::client::CoreClient::new(device_snapshot.core.clone());
166
167        let (tx, rx) = mpsc::channel(32);
168
169        let (recent_tx, mut recent_rx) = mpsc::channel(256);
170        let recent_handle = RecentMessageManagerHandle(recent_tx);
171
172        let map_inner = Arc::new(Mutex::new(HashMap::with_capacity(256)));
173        let list_inner = Arc::new(Mutex::new(VecDeque::with_capacity(256)));
174        let map_clone = map_inner.clone();
175        let list_clone = list_inner.clone();
176        tokio::spawn(async move {
177            while let Some(cmd) = recent_rx.recv().await {
178                match cmd {
179                    RecentMessageCommand::Insert(key, msg) => {
180                        let mut map = map_clone.lock().await;
181                        let mut list = list_clone.lock().await;
182                        map.insert(key.clone(), msg);
183                        list.push_back(key);
184                        if list.len() > 256
185                            && let Some(old_key) = list.pop_front()
186                        {
187                            map.remove(&old_key);
188                        }
189                    }
190                    RecentMessageCommand::Take(key, responder) => {
191                        let mut map = map_clone.lock().await;
192                        let mut list = list_clone.lock().await;
193                        let msg = map.remove(&key);
194                        list.retain(|k| k != &key);
195                        let _ = responder.send(msg);
196                    }
197                    RecentMessageCommand::Shutdown => {
198                        info!("RecentMessageManager shutting down");
199                        break;
200                    }
201                }
202            }
203        });
204
205        let this = Self {
206            core,
207            persistence_manager: persistence_manager.clone(),
208            media_conn: Arc::new(RwLock::new(None)),
209            is_logged_in: Arc::new(AtomicBool::new(false)),
210            is_connecting: Arc::new(AtomicBool::new(false)),
211            is_running: Arc::new(AtomicBool::new(false)),
212            shutdown_notifier: Arc::new(Notify::new()),
213
214            frame_socket: Arc::new(Mutex::new(None)),
215            noise_socket: Arc::new(Mutex::new(None)),
216            frames_rx: Arc::new(Mutex::new(None)),
217
218            response_waiters: Arc::new(Mutex::new(HashMap::new())),
219            unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
220            id_counter: Arc::new(AtomicU64::new(0)),
221            chat_locks: Arc::new(DashMap::new()),
222            group_cache: Arc::new(DashMap::new()),
223            device_cache: Arc::new(DashMap::new()),
224            retried_group_messages: Arc::new(DashMap::new()),
225
226            expected_disconnect: Arc::new(AtomicBool::new(false)),
227
228            recent_msg_tx: recent_handle,
229
230            pending_retries: Arc::new(Mutex::new(HashSet::new())),
231
232            enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
233            auto_reconnect_errors: Arc::new(AtomicU32::new(0)),
234            last_successful_connect: Arc::new(Mutex::new(None)),
235
236            needs_initial_full_sync: Arc::new(AtomicBool::new(false)),
237
238            app_state_processor: Some(AppStateProcessor::new(persistence_manager.backend())),
239            app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
240            initial_keys_synced_notifier: Arc::new(Notify::new()),
241            initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
242            major_sync_task_sender: tx,
243            pairing_cancellation_tx: Arc::new(Mutex::new(None)),
244            send_buffer_pool: Arc::new(Mutex::new(Vec::with_capacity(4))),
245            custom_enc_handlers: Arc::new(DashMap::new()),
246            stanza_router: Self::create_stanza_router(),
247            synchronous_ack: false,
248        };
249
250        let arc = Arc::new(this);
251        (arc, rx)
252    }
253
254    /// Create and configure the stanza router with all the handlers.
255    fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
256        use crate::handlers::{
257            basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
258            ib::IbHandler,
259            iq::IqHandler,
260            message::MessageHandler,
261            notification::NotificationHandler,
262            receipt::ReceiptHandler,
263            router::StanzaRouter,
264            unimplemented::UnimplementedHandler,
265        };
266
267        let mut router = StanzaRouter::new();
268
269        // Register all handlers
270        router.register(Arc::new(MessageHandler::new()));
271        router.register(Arc::new(ReceiptHandler::new()));
272        router.register(Arc::new(IqHandler::new()));
273        router.register(Arc::new(SuccessHandler::new()));
274        router.register(Arc::new(FailureHandler::new()));
275        router.register(Arc::new(StreamErrorHandler::new()));
276        router.register(Arc::new(IbHandler::new()));
277        router.register(Arc::new(NotificationHandler::new()));
278        router.register(Arc::new(AckHandler::new()));
279
280        // Register unimplemented handlers
281        router.register(Arc::new(UnimplementedHandler::for_call()));
282        router.register(Arc::new(UnimplementedHandler::for_presence()));
283        router.register(Arc::new(UnimplementedHandler::for_chatstate()));
284
285        router
286    }
287
288    pub async fn run(self: &Arc<Self>) {
289        if self.is_running.swap(true, Ordering::SeqCst) {
290            warn!("Client `run` method called while already running.");
291            return;
292        }
293        while self.is_running.load(Ordering::Relaxed) {
294            self.expected_disconnect.store(false, Ordering::Relaxed);
295
296            if self.connect().await.is_err() {
297                error!("Failed to connect, will retry...");
298            } else {
299                if self.read_messages_loop().await.is_err() {
300                    warn!(
301                        "Message loop exited with an error. Will attempt to reconnect if enabled."
302                    );
303                } else {
304                    warn!("Message loop exited gracefully.");
305                }
306
307                self.cleanup_connection_state().await;
308            }
309
310            if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
311                self.is_running.store(false, Ordering::Relaxed);
312                break;
313            }
314
315            let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
316            let delay_secs = u64::from(error_count * 2).min(30);
317            let delay = Duration::from_secs(delay_secs);
318            info!(
319                "Will attempt to reconnect in {:?} (attempt {})",
320                delay,
321                error_count + 1
322            );
323            tokio::select! {
324                _ = sleep(delay) => {},
325                _ = self.shutdown_notifier.notified() => {
326                    self.is_running.store(false, Ordering::Relaxed);
327                    break;
328                }
329            }
330        }
331        info!("Client run loop has shut down.");
332    }
333
334    pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
335        if self.is_connecting.swap(true, Ordering::SeqCst) {
336            return Err(ClientError::AlreadyConnected.into());
337        }
338
339        let _guard = scopeguard::guard((), |_| {
340            self.is_connecting.store(false, Ordering::Relaxed);
341        });
342
343        if self.is_connected() {
344            return Err(ClientError::AlreadyConnected.into());
345        }
346
347        let (mut frame_socket, mut frames_rx) = FrameSocket::new();
348        frame_socket.connect().await?;
349
350        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
351        let noise_socket =
352            handshake::do_handshake(&device_snapshot, &mut frame_socket, &mut frames_rx).await?;
353
354        *self.frame_socket.lock().await = Some(frame_socket);
355        *self.frames_rx.lock().await = Some(frames_rx);
356        *self.noise_socket.lock().await = Some(noise_socket);
357
358        let client_clone = self.clone();
359        tokio::spawn(async move { client_clone.keepalive_loop().await });
360
361        Ok(())
362    }
363
364    pub async fn disconnect(&self) {
365        info!("Disconnecting client intentionally.");
366        self.expected_disconnect.store(true, Ordering::Relaxed);
367        self.is_running.store(false, Ordering::Relaxed);
368        self.shutdown_notifier.notify_waiters();
369
370        // Shutdown recent message manager
371        if let Err(e) = self.recent_msg_tx.shutdown().await {
372            warn!("Failed to shutdown recent message manager: {}", e);
373        }
374
375        if let Some(fs) = self.frame_socket.lock().await.as_mut() {
376            fs.close().await;
377        }
378        self.cleanup_connection_state().await;
379    }
380
381    async fn cleanup_connection_state(&self) {
382        self.is_logged_in.store(false, Ordering::Relaxed);
383        *self.frame_socket.lock().await = None;
384        *self.noise_socket.lock().await = None;
385        *self.frames_rx.lock().await = None;
386        self.retried_group_messages.clear();
387    }
388
389    async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
390        info!(target: "Client", "Starting message processing loop...");
391
392        let mut rx_guard = self.frames_rx.lock().await;
393        let mut frames_rx = rx_guard
394            .take()
395            .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
396        drop(rx_guard);
397
398        loop {
399            tokio::select! {
400                    biased;
401                    _ = self.shutdown_notifier.notified() => {
402                        info!(target: "Client", "Shutdown signaled. Exiting message loop.");
403                        return Ok(());
404                    },
405                    frame_opt = frames_rx.recv() => {
406                        match frame_opt {
407                            Some(encrypted_frame) => {
408                                self.process_encrypted_frame(&encrypted_frame).await;
409                            },
410                            None => {
411                                self.cleanup_connection_state().await;
412                                 if !self.expected_disconnect.load(Ordering::Relaxed) {
413                                    self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
414                                    info!("Socket disconnected unexpectedly.");
415                                    return Err(anyhow::anyhow!("Socket disconnected unexpectedly"));
416                                } else {
417                                    info!("Socket disconnected as expected.");
418                                    return Ok(());
419                                }
420                            }
421                    }
422                }
423            }
424        }
425    }
426
427    pub(crate) async fn take_recent_message(
428        &self,
429        to: Jid,
430        id: String,
431    ) -> Result<Option<Arc<wa::Message>>, RecentMessageError> {
432        let key = RecentMessageKey { to, id };
433        let (oneshot_tx, oneshot_rx) = oneshot::channel();
434
435        // Use a timeout to prevent hanging if the task is unresponsive
436        if (self
437            .recent_msg_tx
438            .0
439            .send(RecentMessageCommand::Take(key, oneshot_tx))
440            .await)
441            .is_err()
442        {
443            return Err(RecentMessageError::ManagerUnavailable);
444        }
445
446        // Wait for response with timeout
447        match tokio::time::timeout(Duration::from_secs(5), oneshot_rx).await {
448            Ok(Ok(msg)) => Ok(msg),
449            Ok(Err(_)) => Err(RecentMessageError::TaskDropped),
450            Err(_) => Err(RecentMessageError::ResponseTimeout),
451        }
452    }
453
454    pub(crate) async fn add_recent_message(
455        &self,
456        to: Jid,
457        id: String,
458        msg: Arc<wa::Message>,
459    ) -> Result<(), RecentMessageError> {
460        let key = RecentMessageKey { to, id };
461        self.recent_msg_tx
462            .send_insert(key, msg)
463            .await
464            .map_err(|_| RecentMessageError::ManagerUnavailable)
465    }
466
467    pub(crate) async fn process_encrypted_frame(self: &Arc<Self>, encrypted_frame: &bytes::Bytes) {
468        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
469        let noise_socket = match noise_socket_arc {
470            Some(s) => s,
471            None => {
472                log::error!("Cannot process frame: not connected (no noise socket)");
473                return;
474            }
475        };
476
477        let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
478            Ok(p) => p,
479            Err(e) => {
480                log::error!(target: "Client", "Failed to decrypt frame: {e}");
481                return;
482            }
483        };
484
485        let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
486            Ok(data) => data,
487            Err(e) => {
488                log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
489                return;
490            }
491        };
492
493        match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
494            Ok(node_ref) => {
495                let node = node_ref.to_owned();
496                self.process_node(&node).await;
497            }
498            Err(e) => log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}"),
499        };
500    }
501
502    pub(crate) async fn process_node(self: &Arc<Self>, node: &Node) {
503        if node.tag == "iq"
504            && let Some(sync_node) = node.get_optional_child("sync")
505            && let Some(collection_node) = sync_node.get_optional_child("collection")
506        {
507            let name = collection_node.attrs().string("name");
508            info!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
509        } else {
510            info!(target: "Client/Recv","{}", DisplayableNode(node));
511        }
512
513        // Prepare deferred ACK cancellation flag (sent after dispatch unless cancelled)
514        let mut cancelled = false;
515
516        if node.tag == "xmlstreamend" {
517            warn!(target: "Client", "Received <xmlstreamend/>, treating as disconnect.");
518            self.shutdown_notifier.notify_one();
519            return;
520        }
521
522        if node.tag == "iq" {
523            let id_opt = node.attrs.get("id");
524            if let Some(id) = id_opt {
525                let has_waiter = self.response_waiters.lock().await.contains_key(id);
526                if has_waiter && self.handle_iq_response(node.clone()).await {
527                    return;
528                }
529            }
530        }
531
532        // Dispatch to appropriate handler using the router
533        if !self
534            .stanza_router
535            .dispatch(self.clone(), node, &mut cancelled)
536            .await
537        {
538            warn!(target: "Client", "Received unknown top-level node: {}", DisplayableNode(node));
539        }
540
541        // Send the deferred ACK if applicable and not cancelled by handler
542        if self.should_ack(node) && !cancelled {
543            self.maybe_deferred_ack(node).await;
544        }
545    }
546
547    /// Determine if a node should be acknowledged with <ack/>.
548    fn should_ack(&self, node: &Node) -> bool {
549        matches!(
550            node.tag.as_str(),
551            "message" | "receipt" | "notification" | "call"
552        ) && node.attrs.contains_key("id")
553            && node.attrs.contains_key("from")
554    }
555
556    /// Possibly send a deferred ack: either immediately or via spawned task.
557    /// Handlers can cancel by setting `cancelled` to true.
558    async fn maybe_deferred_ack(self: &Arc<Self>, node: &Node) {
559        if self.synchronous_ack {
560            if let Err(e) = self.send_ack_for(node).await {
561                warn!(target: "Client", "Failed to send ack: {e:?}");
562            }
563        } else {
564            let this = self.clone();
565            let node_clone = node.clone();
566            tokio::spawn(async move {
567                if let Err(e) = this.send_ack_for(&node_clone).await {
568                    warn!(target: "Client", "Failed to send ack: {e:?}");
569                }
570            });
571        }
572    }
573
574    /// Build and send an <ack/> node corresponding to the given stanza.
575    async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
576        let id = match node.attrs.get("id") {
577            Some(v) => v.clone(),
578            None => return Ok(()),
579        };
580        let from = match node.attrs.get("from") {
581            Some(v) => v.clone(),
582            None => return Ok(()),
583        };
584        let participant = node.attrs.get("participant").cloned();
585        let typ = if node.tag != "message" {
586            node.attrs.get("type").cloned()
587        } else {
588            None
589        };
590        let mut attrs = std::collections::HashMap::new();
591        attrs.insert("class".to_string(), node.tag.clone());
592        attrs.insert("id".to_string(), id);
593        attrs.insert("to".to_string(), from);
594        if let Some(p) = participant {
595            attrs.insert("participant".to_string(), p);
596        }
597        if let Some(t) = typ {
598            attrs.insert("type".to_string(), t);
599        }
600        let ack = Node {
601            tag: "ack".to_string(),
602            attrs,
603            content: None,
604        };
605        self.send_node(ack).await
606    }
607
608    pub(crate) async fn handle_unimplemented(&self, tag: &str) {
609        warn!(target: "Client", "TODO: Implement handler for <{tag}>");
610    }
611
612    pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
613        use crate::request::{InfoQuery, InfoQueryType};
614        use SERVER_JID;
615
616        let tag = if passive { "passive" } else { "active" };
617
618        let query = InfoQuery {
619            namespace: "passive",
620            query_type: InfoQueryType::Set,
621            to: SERVER_JID.parse().unwrap(),
622            target: None,
623            id: None,
624            content: Some(wacore_binary::node::NodeContent::Nodes(vec![
625                NodeBuilder::new(tag).build(),
626            ])),
627            timeout: None,
628        };
629
630        self.send_iq(query).await.map(|_| ())
631    }
632
633    pub(crate) async fn handle_success(self: &Arc<Self>, node: &Node) {
634        info!("Successfully authenticated with WhatsApp servers!");
635        self.is_logged_in.store(true, Ordering::Relaxed);
636        *self.last_successful_connect.lock().await = Some(chrono::Utc::now());
637        self.auto_reconnect_errors.store(0, Ordering::Relaxed);
638
639        if let Some(lid_str) = node.attrs.get("lid") {
640            if let Ok(lid) = lid_str.parse::<Jid>() {
641                let device_snapshot = self.persistence_manager.get_device_snapshot().await;
642                if device_snapshot.lid.as_ref() != Some(&lid) {
643                    info!(target: "Client", "Updating LID from server to '{lid}'");
644                    self.persistence_manager
645                        .process_command(DeviceCommand::SetLid(Some(lid)))
646                        .await;
647                }
648            } else {
649                warn!(target: "Client", "Failed to parse LID from success stanza: {lid_str}");
650            }
651        } else {
652            warn!(target: "Client", "LID not found in <success> stanza. Group messaging may fail.");
653        }
654
655        let client_clone = self.clone();
656        tokio::spawn(async move {
657            if let Err(e) = client_clone.set_passive(false).await {
658                warn!("Failed to send post-connect passive IQ: {e:?}");
659            }
660
661            if let Err(e) = client_clone.send_presence(Presence::Available).await {
662                warn!(
663                    "Could not send initial presence: {e:?}. This is expected if push_name is not yet known."
664                );
665            }
666
667            client_clone
668                .core
669                .event_bus
670                .dispatch(&Event::Connected(crate::types::events::Connected));
671
672            if client_clone.needs_initial_full_sync.load(Ordering::Relaxed) {
673                if !client_clone
674                    .initial_app_state_keys_received
675                    .load(Ordering::Relaxed)
676                {
677                    info!(target: "Client/AppState", "Waiting for initial app state keys before starting full sync (15s timeout)...");
678                    match tokio::time::timeout(
679                        APP_STATE_KEY_WAIT_TIMEOUT,
680                        client_clone.initial_keys_synced_notifier.notified(),
681                    )
682                    .await
683                    {
684                        Ok(_) => {
685                            info!(target: "Client/AppState", "Initial app state keys received; proceeding with full sync.")
686                        }
687                        Err(_) => {
688                            warn!(target: "Client/AppState", "Timed out waiting for initial app state keys; continuing anyway (may see 'app state key not found' warnings).")
689                        }
690                    }
691                } else {
692                    info!(target: "Client/AppState", "Initial app state keys already present; starting full sync immediately.");
693                }
694                let names = [
695                    WAPatchName::CriticalBlock,
696                    WAPatchName::CriticalUnblockLow,
697                    WAPatchName::RegularLow,
698                    WAPatchName::RegularHigh,
699                    WAPatchName::Regular,
700                ];
701                for name in names {
702                    if let Err(e) = client_clone.fetch_app_state_with_retry(name).await {
703                        warn!(
704                            "Failed to full sync app state {:?} after retry logic: {e}",
705                            name
706                        );
707                    }
708                }
709                client_clone
710                    .needs_initial_full_sync
711                    .store(false, Ordering::Relaxed);
712            }
713        });
714    }
715
716    async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
717        let mut attempt = 0u32;
718        loop {
719            attempt += 1;
720            let res = self.process_app_state_sync_task(name, true).await;
721            match res {
722                Ok(()) => return Ok(()),
723                Err(e) => {
724                    let es = e.to_string();
725                    if es.contains("app state key not found") && attempt == 1 {
726                        if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
727                            info!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
728                            if tokio::time::timeout(
729                                Duration::from_secs(10),
730                                self.initial_keys_synced_notifier.notified(),
731                            )
732                            .await
733                            .is_err()
734                            {
735                                warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
736                            }
737                        }
738                        continue;
739                    }
740                    if es.contains("database is locked") && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
741                        let backoff = Duration::from_millis(200 * attempt as u64 + 150);
742                        warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
743                        tokio::time::sleep(backoff).await;
744                        continue;
745                    }
746                    return Err(e);
747                }
748            }
749        }
750    }
751
    /// Runs one app state sync round for the collection `name`.
    ///
    /// Loads the stored collection version, sends a `w:sync:app:state` IQ for
    /// it, decodes the returned patch list, requests any app state keys the
    /// decoder reports as missing, dispatches the decoded mutations, and
    /// finally writes the resulting state back through the persistence
    /// backend.
    ///
    /// `full_sync` asks the server for a snapshot; it is forced to `true` when
    /// the stored version is 0.
    ///
    /// # Errors
    ///
    /// Propagates failures from reading or writing the stored version, from
    /// sending the IQ, and from decoding the patch list.
    pub(crate) async fn process_app_state_sync_task(
        &self,
        name: WAPatchName,
        full_sync: bool,
    ) -> anyhow::Result<()> {
        let backend = self.persistence_manager.backend();
        let mut full_sync = full_sync;

        let mut state = backend.get_app_state_version(name.as_str()).await?;
        // A stored version of 0 always escalates to a full sync.
        if state.version == 0 {
            full_sync = true;
        }

        // NOTE(review): `has_more` is tested exactly once below (an `if`, not a
        // loop), so a single batch is fetched per call even when the response
        // reports more patches. Confirm whether callers re-invoke this task or
        // whether a loop was intended.
        let mut has_more = true;
        let want_snapshot = full_sync;

        if has_more {
            debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);

            let mut collection_builder = NodeBuilder::new("collection")
                .attr("name", name.as_str())
                .attr(
                    "return_snapshot",
                    if want_snapshot { "true" } else { "false" },
                );
            // The current version is only sent when no snapshot is requested.
            if !want_snapshot {
                collection_builder = collection_builder.attr("version", state.version.to_string());
            }
            let sync_node = NodeBuilder::new("sync")
                .children([collection_builder.build()])
                .build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: SERVER_JID.parse().unwrap(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: None,
            };

            let resp = self.send_iq(iq).await?;
            debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);

            // Despite the leading underscore this timer is read further down;
            // the measured span also includes the snapshot download.
            let _decode_start = std::time::Instant::now();
            // The external snapshot (if the response references one) is
            // downloaded up front because the `download` callback handed to the
            // decoder below is a plain synchronous closure.
            let pre_downloaded_snapshot: Option<Vec<u8>> =
                match wacore::appstate::patch_decode::parse_patch_list(&resp) {
                    Ok(pl) => {
                        debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={}", name, pl.snapshot_ref.is_some(), pl.has_more_patches);
                        if let Some(ext) = &pl.snapshot_ref {
                            match self.download(ext).await {
                                Ok(bytes) => Some(bytes),
                                Err(e) => {
                                    // A failed download is only logged; the
                                    // callback below then reports the snapshot
                                    // as unavailable.
                                    warn!("Failed to download external snapshot: {e}");
                                    None
                                }
                            }
                        } else {
                            None
                        }
                    }
                    // Parse errors are ignored here; the same response is
                    // handed to `decode_patch_list` below.
                    Err(_) => None,
                };

            // Serves the pre-downloaded bytes regardless of which blob
            // reference is asked for.
            let download = |_: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(bytes) = &pre_downloaded_snapshot {
                    Ok(bytes.clone())
                } else {
                    Err(anyhow::anyhow!("snapshot not pre-downloaded"))
                }
            };

            // Without a configured processor the response is not decoded and
            // the state loaded above is written back unchanged.
            if let Some(proc) = &self.app_state_processor {
                let (mutations, new_state, list) =
                    proc.decode_patch_list(&resp, &download, true).await?;
                let decode_elapsed = _decode_start.elapsed();
                if decode_elapsed.as_millis() > 500 {
                    debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
                }

                // A failure to determine missing keys is treated as "none missing".
                let missing = proc.get_missing_key_ids(&list).await.unwrap_or_default();
                if !missing.is_empty() {
                    let mut to_request: Vec<Vec<u8>> = Vec::new();
                    let mut guard = self.app_state_key_requests.lock().await;
                    let now = std::time::Instant::now();
                    for key_id in missing {
                        let hex_id = hex::encode(&key_id);
                        // Request a key only if it was never requested or the
                        // last request is more than 24 hours old.
                        let should = guard
                            .get(&hex_id)
                            .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                            .unwrap_or(true);
                        if should {
                            guard.insert(hex_id, now);
                            to_request.push(key_id);
                        }
                    }
                    // Release the request-tracking lock before awaiting the send.
                    drop(guard);
                    if !to_request.is_empty() {
                        self.request_app_state_keys(&to_request).await;
                    }
                }

                for m in mutations {
                    debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
                    self.dispatch_app_state_mutation(&m, full_sync).await;
                }

                state = new_state;
                has_more = list.has_more_patches;
                debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more}", name);
            }
        }

        // Persist whatever state we ended up with for this collection.
        backend
            .set_app_state_version(name.as_str(), state.clone())
            .await?;

        debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
        Ok(())
    }
872
873    #[allow(dead_code)]
874    async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
875        if raw_key_ids.is_empty() {
876            return;
877        }
878        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
879        let own_jid = match device_snapshot.pn.clone() {
880            Some(j) => j,
881            None => return,
882        };
883        let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
884            .iter()
885            .map(|k| wa::message::AppStateSyncKeyId {
886                key_id: Some(k.clone()),
887            })
888            .collect();
889        let msg = wa::Message {
890            protocol_message: Some(Box::new(wa::message::ProtocolMessage {
891                r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
892                app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
893                ..Default::default()
894            })),
895            ..Default::default()
896        };
897        if let Err(e) = self
898            .send_message_impl(
899                own_jid,
900                Arc::new(msg),
901                Some(self.generate_message_id().await),
902                true,
903                false,
904                None,
905            )
906            .await
907        {
908            warn!("Failed to send app state key request: {e}");
909        }
910    }
911
912    #[allow(dead_code)]
913    async fn dispatch_app_state_mutation(
914        &self,
915        m: &crate::appstate_sync::Mutation,
916        full_sync: bool,
917    ) {
918        use wacore::types::events::{
919            ArchiveUpdate, ContactUpdate, Event, MarkChatAsReadUpdate, MuteUpdate, PinUpdate,
920        };
921        if m.operation != wa::syncd_mutation::SyncdOperation::Set {
922            return;
923        }
924        if m.index.is_empty() {
925            return;
926        }
927        let kind = &m.index[0];
928        let ts = m
929            .action_value
930            .as_ref()
931            .and_then(|v| v.timestamp)
932            .unwrap_or(0);
933        let time = chrono::DateTime::from_timestamp_millis(ts).unwrap_or_else(chrono::Utc::now);
934        let jid = if m.index.len() > 1 {
935            m.index[1].parse().unwrap_or_default()
936        } else {
937            Jid::default()
938        };
939        match kind.as_str() {
940            "setting_pushName" => {
941                if let Some(val) = &m.action_value
942                    && let Some(act) = &val.push_name_setting
943                    && let Some(new_name) = &act.name
944                {
945                    let new_name = new_name.clone();
946                    let bus = self.core.event_bus.clone();
947
948                    let snapshot = self.persistence_manager.get_device_snapshot().await;
949                    let old = snapshot.push_name.clone();
950                    if old != new_name {
951                        info!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
952                        self.persistence_manager
953                            .process_command(DeviceCommand::SetPushName(new_name.clone()))
954                            .await;
955                        bus.dispatch(&Event::SelfPushNameUpdated(
956                            crate::types::events::SelfPushNameUpdated {
957                                from_server: true,
958                                old_name: old,
959                                new_name: new_name.clone(),
960                            },
961                        ));
962                    } else {
963                        debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
964                    }
965                }
966            }
967            "mute" => {
968                if let Some(val) = &m.action_value
969                    && let Some(act) = &val.mute_action
970                {
971                    self.core.event_bus.dispatch(&Event::MuteUpdate(MuteUpdate {
972                        jid,
973                        timestamp: time,
974                        action: Box::new(*act),
975                        from_full_sync: full_sync,
976                    }));
977                }
978            }
979            "pin" | "pin_v1" => {
980                if let Some(val) = &m.action_value
981                    && let Some(act) = &val.pin_action
982                {
983                    self.core.event_bus.dispatch(&Event::PinUpdate(PinUpdate {
984                        jid,
985                        timestamp: time,
986                        action: Box::new(*act),
987                        from_full_sync: full_sync,
988                    }));
989                }
990            }
991            "archive" => {
992                if let Some(val) = &m.action_value
993                    && let Some(act) = &val.archive_chat_action
994                {
995                    self.core
996                        .event_bus
997                        .dispatch(&Event::ArchiveUpdate(ArchiveUpdate {
998                            jid,
999                            timestamp: time,
1000                            action: Box::new(act.clone()),
1001                            from_full_sync: full_sync,
1002                        }));
1003                }
1004            }
1005            "contact" => {
1006                if let Some(val) = &m.action_value
1007                    && let Some(act) = &val.contact_action
1008                {
1009                    self.core
1010                        .event_bus
1011                        .dispatch(&Event::ContactUpdate(ContactUpdate {
1012                            jid,
1013                            timestamp: time,
1014                            action: Box::new(act.clone()),
1015                            from_full_sync: full_sync,
1016                        }));
1017                }
1018            }
1019            "mark_chat_as_read" | "markChatAsRead" => {
1020                if let Some(val) = &m.action_value
1021                    && let Some(act) = &val.mark_chat_as_read_action
1022                {
1023                    self.core.event_bus.dispatch(&Event::MarkChatAsReadUpdate(
1024                        MarkChatAsReadUpdate {
1025                            jid,
1026                            timestamp: time,
1027                            action: Box::new(act.clone()),
1028                            from_full_sync: full_sync,
1029                        },
1030                    ));
1031                }
1032            }
1033            _ => {}
1034        }
1035    }
1036
    /// Sets the `expected_disconnect` flag to `true`.
    ///
    /// NOTE(review): presumably the connection loop consults this flag to tell
    /// an intentional disconnect from an unexpected one — confirm at the
    /// reader's side. The function is `async` but contains no await point.
    async fn expect_disconnect(&self) {
        self.expected_disconnect.store(true, Ordering::Relaxed);
    }
1040
1041    pub(crate) async fn handle_stream_error(&self, node: &Node) {
1042        self.is_logged_in.store(false, Ordering::Relaxed);
1043
1044        let mut attrs = node.attrs();
1045        let code = attrs.optional_string("code").unwrap_or("");
1046        let conflict_type = node
1047            .get_optional_child("conflict")
1048            .map(|n| n.attrs().optional_string("type").unwrap_or("").to_string())
1049            .unwrap_or_default();
1050
1051        match (code, conflict_type.as_str()) {
1052            ("515", _) => {
1053                info!(target: "Client", "Got 515 stream error, server is closing stream. Will auto-reconnect.");
1054            }
1055            ("401", "device_removed") | (_, "replaced") => {
1056                info!(target: "Client", "Got stream error indicating client was removed or replaced. Logging out.");
1057                self.expect_disconnect().await;
1058                self.enable_auto_reconnect.store(false, Ordering::Relaxed);
1059
1060                let event = if conflict_type == "replaced" {
1061                    Event::StreamReplaced(crate::types::events::StreamReplaced)
1062                } else {
1063                    Event::LoggedOut(crate::types::events::LoggedOut {
1064                        on_connect: false,
1065                        reason: ConnectFailureReason::LoggedOut,
1066                    })
1067                };
1068                self.core.event_bus.dispatch(&event);
1069            }
1070            ("503", _) => {
1071                info!(target: "Client", "Got 503 service unavailable, will auto-reconnect.");
1072            }
1073            _ => {
1074                error!(target: "Client", "Unknown stream error: {}", DisplayableNode(node));
1075                self.expect_disconnect().await;
1076                self.core.event_bus.dispatch(&Event::StreamError(
1077                    crate::types::events::StreamError {
1078                        code: code.to_string(),
1079                        raw: Some(node.clone()),
1080                    },
1081                ));
1082            }
1083        }
1084
1085        self.shutdown_notifier.notify_one();
1086    }
1087
1088    pub(crate) async fn handle_connect_failure(&self, node: &Node) {
1089        self.expected_disconnect.store(true, Ordering::Relaxed);
1090        self.shutdown_notifier.notify_one();
1091
1092        let mut attrs = node.attrs();
1093        let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
1094        let reason = ConnectFailureReason::from(reason_code);
1095
1096        if reason.should_reconnect() {
1097            self.expected_disconnect.store(false, Ordering::Relaxed);
1098        } else {
1099            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
1100        }
1101
1102        if reason.is_logged_out() {
1103            info!(target: "Client", "Got {reason:?} connect failure, logging out.");
1104            self.core
1105                .event_bus
1106                .dispatch(&wacore::types::events::Event::LoggedOut(
1107                    crate::types::events::LoggedOut {
1108                        on_connect: true,
1109                        reason,
1110                    },
1111                ));
1112        } else if let ConnectFailureReason::TempBanned = reason {
1113            let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
1114            let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
1115            let expire_duration =
1116                chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
1117            warn!(target: "Client", "Temporary ban connect failure: {}", DisplayableNode(node));
1118            self.core.event_bus.dispatch(&Event::TemporaryBan(
1119                crate::types::events::TemporaryBan {
1120                    code: crate::types::events::TempBanReason::from(ban_code),
1121                    expire: expire_duration,
1122                },
1123            ));
1124        } else if let ConnectFailureReason::ClientOutdated = reason {
1125            error!(target: "Client", "Client is outdated and was rejected by server.");
1126            self.core
1127                .event_bus
1128                .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
1129        } else {
1130            warn!(target: "Client", "Unknown connect failure: {}", DisplayableNode(node));
1131            self.core.event_bus.dispatch(&Event::ConnectFailure(
1132                crate::types::events::ConnectFailure {
1133                    reason,
1134                    message: attrs.optional_string("message").unwrap_or("").to_string(),
1135                    raw: Some(node.clone()),
1136                },
1137            ));
1138        }
1139    }
1140
1141    pub(crate) async fn handle_iq(self: &Arc<Self>, node: &Node) -> bool {
1142        if let Some("get") = node.attrs().optional_string("type")
1143            && let Some(_ping_node) = node.get_optional_child("ping")
1144        {
1145            info!(target: "Client", "Received ping, sending pong.");
1146            let mut parser = node.attrs();
1147            let from_jid = parser.jid("from");
1148            let id = parser.string("id");
1149            let pong = NodeBuilder::new("iq")
1150                .attrs([
1151                    ("to", from_jid.to_string()),
1152                    ("id", id),
1153                    ("type", "result".to_string()),
1154                ])
1155                .build();
1156            if let Err(e) = self.send_node(pong).await {
1157                warn!("Failed to send pong: {e:?}");
1158            }
1159            return true;
1160        }
1161
1162        if pair::handle_iq(self, node).await {
1163            return true;
1164        }
1165
1166        false
1167    }
1168
    /// Reports whether the noise socket slot currently holds a socket.
    ///
    /// The check uses a non-blocking `try_lock`, so it also returns `false`
    /// whenever the `noise_socket` lock happens to be held elsewhere at the
    /// moment of the call. Treat a `false` result as best-effort rather than
    /// as proof that the client is disconnected.
    pub fn is_connected(&self) -> bool {
        self.noise_socket
            .try_lock()
            .is_ok_and(|guard| guard.is_some())
    }
1174
    /// Returns the current value of the `is_logged_in` flag (relaxed load).
    pub fn is_logged_in(&self) -> bool {
        self.is_logged_in.load(Ordering::Relaxed)
    }
1178
1179    /// Get access to the PersistenceManager for this client.
1180    /// This is useful for multi-account scenarios to get the device ID.
1181    pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
1182        self.persistence_manager.clone()
1183    }
1184
1185    pub async fn edit_message(
1186        &self,
1187        to: Jid,
1188        original_id: String,
1189        new_content: wa::Message,
1190    ) -> Result<String, anyhow::Error> {
1191        let own_jid = self
1192            .get_pn()
1193            .await
1194            .ok_or_else(|| anyhow!("Not logged in"))?;
1195
1196        let edit_container_message = wa::Message {
1197            edited_message: Some(Box::new(wa::message::FutureProofMessage {
1198                message: Some(Box::new(wa::Message {
1199                    protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1200                        key: Some(wa::MessageKey {
1201                            remote_jid: Some(to.to_string()),
1202                            from_me: Some(true),
1203                            id: Some(original_id.clone()),
1204                            participant: if to.is_group() {
1205                                Some(own_jid.to_non_ad().to_string())
1206                            } else {
1207                                None
1208                            },
1209                        }),
1210                        r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
1211                        edited_message: Some(Box::new(new_content)),
1212                        timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
1213                        ..Default::default()
1214                    })),
1215                    ..Default::default()
1216                })),
1217            })),
1218            ..Default::default()
1219        };
1220
1221        self.send_message_impl(
1222            to,
1223            Arc::new(edit_container_message),
1224            Some(original_id.clone()),
1225            false,
1226            false,
1227            Some(crate::types::message::EditAttribute::MessageEdit),
1228        )
1229        .await?;
1230
1231        Ok(original_id)
1232    }
1233
1234    pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
1235        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
1236        let noise_socket = match noise_socket_arc {
1237            Some(socket) => socket,
1238            None => return Err(ClientError::NotConnected),
1239        };
1240
1241        info!(target: "Client/Send", "{}", DisplayableNode(&node));
1242        let mut pool_guard = self.send_buffer_pool.lock().await;
1243        let mut plaintext_buf = pool_guard.pop().unwrap_or_else(|| Vec::with_capacity(1024));
1244        let mut encrypted_buf = pool_guard.pop().unwrap_or_else(|| Vec::with_capacity(1024));
1245        drop(pool_guard);
1246
1247        plaintext_buf.clear();
1248        encrypted_buf.clear();
1249
1250        if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
1251            error!("Failed to marshal node: {e:?}");
1252            let mut g = self.send_buffer_pool.lock().await;
1253            g.push(plaintext_buf);
1254            g.push(encrypted_buf);
1255            return Err(SocketError::Crypto("Marshal error".to_string()).into());
1256        }
1257
1258        let send_res = noise_socket
1259            .encrypt_and_send(plaintext_buf, encrypted_buf)
1260            .await;
1261
1262        let plaintext_buf = match send_res {
1263            Ok(buf) => buf,
1264            Err(e) => {
1265                return Err(e.into());
1266            }
1267        };
1268
1269        let mut g = self.send_buffer_pool.lock().await;
1270        if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
1271            g.push(plaintext_buf);
1272        }
1273        g.push(Vec::new());
1274        Ok(())
1275    }
1276
1277    pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
1278        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1279        let old_name = device_snapshot.push_name.clone();
1280
1281        if old_name == new_name {
1282            return;
1283        }
1284
1285        log::info!("Updating push name from '{}' -> '{}'", old_name, new_name);
1286        self.persistence_manager
1287            .process_command(DeviceCommand::SetPushName(new_name.clone()))
1288            .await;
1289
1290        self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
1291            crate::types::events::SelfPushNameUpdated {
1292                from_server: true,
1293                old_name,
1294                new_name: new_name.clone(),
1295            },
1296        ));
1297
1298        let client_clone = self.clone();
1299        tokio::spawn(async move {
1300            if let Err(e) = client_clone.send_presence(Presence::Available).await {
1301                log::warn!("Failed to send presence after push name update: {:?}", e);
1302            } else {
1303                log::info!("Sent presence after push name update.");
1304            }
1305        });
1306    }
1307
1308    pub async fn get_push_name(&self) -> String {
1309        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1310        device_snapshot.push_name.clone()
1311    }
1312
1313    pub async fn get_pn(&self) -> Option<Jid> {
1314        let snapshot = self.persistence_manager.get_device_snapshot().await;
1315        snapshot.pn.clone()
1316    }
1317
1318    pub async fn get_lid(&self) -> Option<Jid> {
1319        let snapshot = self.persistence_manager.get_device_snapshot().await;
1320        snapshot.lid.clone()
1321    }
1322
1323    pub(crate) async fn send_protocol_receipt(
1324        &self,
1325        id: String,
1326        receipt_type: crate::types::presence::ReceiptType,
1327    ) {
1328        if id.is_empty() {
1329            return;
1330        }
1331        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1332        if let Some(own_jid) = &device_snapshot.pn {
1333            let node = NodeBuilder::new("receipt")
1334                .attrs([
1335                    ("id", id),
1336                    ("type", format!("{:?}", receipt_type).to_lowercase()),
1337                    ("to", own_jid.to_non_ad().to_string()),
1338                ])
1339                .build();
1340
1341            if let Err(e) = self.send_node(node).await {
1342                warn!(
1343                    "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
1344                    receipt_type, self.unique_id, e
1345                );
1346            }
1347        }
1348    }
1349}
1350
#[cfg(test)]
mod tests {
    use super::*;
    use wacore_binary::builder::NodeBuilder;

    /// Checks that `should_ack` returns `true` for `<receipt>` and
    /// `<notification>` stanzas, using a client backed by an in-memory SQLite
    /// store.
    #[tokio::test]
    async fn test_ack_behavior_for_incoming_stanzas() {
        let backend = Arc::new(
            crate::store::sqlite_store::SqliteStore::new(":memory:")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(PersistenceManager::new(backend).await.unwrap());
        let (client, _rx) = Client::new(pm).await;

        // 1. A <receipt> stanza.
        // These MUST be acknowledged for the server to know we've processed them.
        let receipt_node = NodeBuilder::new("receipt")
            .attr("from", "s.whatsapp.net")
            .attr("id", "RCPT-1")
            .build();

        // 2. A <notification> stanza.
        // These also require an acknowledgment.
        let notification_node = NodeBuilder::new("notification")
            .attr("from", "s.whatsapp.net")
            .attr("id", "NOTIF-1")
            .build();

        // --- Assertions ---

        // Both stanza kinds must be acknowledged.
        assert!(
            client.should_ack(&receipt_node),
            "should_ack must still return TRUE for <receipt> stanzas."
        );
        assert!(
            client.should_ack(&notification_node),
            "should_ack must still return TRUE for <notification> stanzas."
        );

        info!(
            "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
        );
    }
}