1mod context_impl;
2
3use crate::handshake;
4use crate::pair;
5use anyhow::anyhow;
6use dashmap::DashMap;
7use tokio::sync::watch;
8use wacore::xml::DisplayableNode;
9use wacore_binary::builder::NodeBuilder;
10use wacore_binary::jid::JidExt;
11use wacore_binary::node::Node;
12
13use crate::appstate_sync::AppStateProcessor;
14use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
15use crate::types::enc_handler::EncHandler;
16use crate::types::events::{ConnectFailureReason, Event};
17use crate::types::presence::Presence;
18
19use log::{debug, error, info, warn};
22
23use rand::RngCore;
24use scopeguard;
25use std::collections::{HashMap, HashSet, VecDeque};
26use wacore_binary::jid::Jid;
27use wacore_binary::jid::SERVER_JID;
28
29use std::sync::Arc;
30use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
31
32use thiserror::Error;
33use tokio::sync::{Mutex, Notify, RwLock, mpsc, oneshot};
34use tokio::time::{Duration, sleep};
35use wacore::appstate::patch_decode::WAPatchName;
36use wacore::client::context::GroupInfo;
37use waproto::whatsapp as wa;
38
39use crate::socket::{FrameSocket, NoiseSocket, SocketError};
40use crate::sync_task::MajorSyncTask;
41
/// How long `handle_success` waits for the initial app state keys before starting a full sync.
const APP_STATE_KEY_WAIT_TIMEOUT: Duration = Duration::from_secs(15);
/// Attempt ceiling for the "database is locked" retry path in `fetch_app_state_with_retry`.
const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;

/// 512 KiB.
/// NOTE(review): not referenced in the visible part of this file; presumably an upper bound on
/// the capacity of buffers kept in `send_buffer_pool` — confirm at the use site.
const MAX_POOLED_BUFFER_CAP: usize = 512 * 1024;
46
/// Errors describing the client's connection/login state.
#[derive(Debug, Error)]
pub enum ClientError {
    /// The operation needs an active connection and there is none.
    #[error("client is not connected")]
    NotConnected,
    /// A lower-level socket failure, converted automatically via `From<SocketError>`.
    #[error("socket error: {0}")]
    Socket(#[from] SocketError),
    /// Returned by `Client::connect` when a connection attempt is already in progress or the
    /// client is already connected.
    #[error("client is already connected")]
    AlreadyConnected,
    /// The operation needs an authenticated session and there is none.
    #[error("client is not logged in")]
    NotLoggedIn,
}
58
/// Lookup key for the recent-message cache: the JID stored under `to` plus the message id.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct RecentMessageKey {
    /// JID half of the key.
    pub to: Jid,
    /// Message id half of the key.
    pub id: String,
}
64
/// Cloneable handle wrapping the command channel of the recent-message manager task that
/// `Client::new` spawns.
#[derive(Debug, Clone)]
pub(crate) struct RecentMessageManagerHandle(pub mpsc::Sender<RecentMessageCommand>);
67
68impl RecentMessageManagerHandle {
69 pub(crate) async fn send_insert(
70 &self,
71 key: RecentMessageKey,
72 msg: Arc<wa::Message>,
73 ) -> Result<(), mpsc::error::SendError<RecentMessageCommand>> {
74 self.0.send(RecentMessageCommand::Insert(key, msg)).await
75 }
76
77 pub(crate) async fn shutdown(
78 &self,
79 ) -> Result<(), mpsc::error::SendError<RecentMessageCommand>> {
80 self.0.send(RecentMessageCommand::Shutdown).await
81 }
82}
83
/// Commands understood by the recent-message manager task spawned in `Client::new`.
#[derive(Debug)]
pub enum RecentMessageCommand {
    /// Store a message under the given key.
    Insert(RecentMessageKey, Arc<wa::Message>),
    /// Remove the message stored under the key (if any) and send it back through the
    /// one-shot channel.
    Take(RecentMessageKey, oneshot::Sender<Option<Arc<wa::Message>>>),
    /// Stop the manager task's command loop.
    Shutdown,
}
90
/// Failure modes when talking to the recent-message manager task.
#[derive(Debug, thiserror::Error)]
pub enum RecentMessageError {
    /// The command could not be queued because the channel send failed.
    #[error("Manager task unavailable - channel send failed")]
    ManagerUnavailable,
    /// The manager did not answer a `Take` request within the caller's timeout.
    #[error("Manager task did not respond within timeout")]
    ResponseTimeout,
    /// The one-shot reply channel was dropped before a reply was sent.
    #[error("Manager task panicked or was dropped")]
    TaskDropped,
}
100
/// Connection and session state for one client instance.
///
/// Most fields are wrapped in `Arc` plus a lock or atomic; `Client::new` hands the whole
/// struct out as `Arc<Client>`.
pub struct Client {
    /// Core client, built in `new` from the device snapshot's `core`.
    pub(crate) core: wacore::client::CoreClient,

    /// Source of device snapshots and sink for `DeviceCommand`s.
    pub(crate) persistence_manager: Arc<PersistenceManager>,
    /// Starts as `None`. NOTE(review): presumably a cached media connection — confirm at the
    /// use site.
    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,

    /// Set by `handle_success`; cleared by `cleanup_connection_state` and `handle_stream_error`.
    pub(crate) is_logged_in: Arc<AtomicBool>,
    /// Re-entrancy flag for `connect`; reset when `connect` returns.
    pub(crate) is_connecting: Arc<AtomicBool>,
    /// True while the `run` loop is active; cleared by `disconnect`.
    pub(crate) is_running: Arc<AtomicBool>,
    /// Signalled by `disconnect`, `<xmlstreamend/>` and stream errors; awaited by `run` and the
    /// message loop.
    pub(crate) shutdown_notifier: Arc<Notify>,

    /// Populated by `connect`, cleared by `cleanup_connection_state`.
    pub(crate) frame_socket: Arc<Mutex<Option<FrameSocket>>>,
    /// Populated by `connect` after the handshake, cleared by `cleanup_connection_state`.
    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,
    /// Receiver of encrypted frames; taken out by `read_messages_loop` when it starts.
    pub(crate) frames_rx: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<bytes::Bytes>>>>,

    /// Pending responders keyed by stanza `id`; `process_node` checks it before routing an `iq`.
    pub(crate) response_waiters:
        Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<wacore_binary::Node>>>>,
    /// Two random bytes formatted as `"<b0>.<b1>"`, generated in `new`.
    pub(crate) unique_id: String,
    /// Starts at 0. NOTE(review): presumably combined with `unique_id` to build request ids —
    /// confirm.
    pub(crate) id_counter: Arc<AtomicU64>,

    /// NOTE(review): presumably one async mutex per chat to serialise work — confirm.
    pub(crate) chat_locks: Arc<DashMap<Jid, Arc<tokio::sync::Mutex<()>>>>,
    /// Cache of group info keyed by JID.
    pub group_cache: Arc<DashMap<Jid, GroupInfo>>,
    /// Cache of JID lists keyed by JID, each stored with an `Instant`.
    /// NOTE(review): presumably device lists with their fetch time — confirm.
    pub device_cache: Arc<DashMap<Jid, (Vec<Jid>, std::time::Instant)>>,

    /// Cleared on every `cleanup_connection_state`.
    pub(crate) retried_group_messages: Arc<DashMap<String, ()>>,
    /// When set, a closed socket is treated as intentional by the message loop.
    pub(crate) expected_disconnect: Arc<AtomicBool>,

    /// Handle to the recent-message manager task spawned in `new`.
    pub(crate) recent_msg_tx: RecentMessageManagerHandle,

    /// NOTE(review): not used in the visible part of this file — confirm purpose.
    pub(crate) pending_retries: Arc<Mutex<HashSet<String>>>,

    /// When false, `run` stops instead of reconnecting. Defaults to true.
    pub enable_auto_reconnect: Arc<AtomicBool>,
    /// Incremented by `run` before each reconnect delay; reset to 0 by `handle_success`.
    pub auto_reconnect_errors: Arc<AtomicU32>,
    /// Timestamp recorded by `handle_success`.
    pub last_successful_connect: Arc<Mutex<Option<chrono::DateTime<chrono::Utc>>>>,

    /// When set, `handle_success` performs a full app state sync and then clears the flag.
    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,

    /// Decoder for app state patches; `new` always sets it to `Some`.
    pub(crate) app_state_processor: Option<AppStateProcessor>,
    /// Hex key id -> time it was last requested; used to avoid re-requesting within 24 hours.
    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, std::time::Instant>>>,
    /// Awaited (with a timeout) while waiting for app state keys.
    pub(crate) initial_keys_synced_notifier: Arc<Notify>,
    /// When set, the waits on `initial_keys_synced_notifier` are skipped.
    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,
    /// Sending half of the channel whose receiver `new` returns to the caller.
    pub(crate) major_sync_task_sender: mpsc::Sender<MajorSyncTask>,
    /// NOTE(review): not used in the visible part of this file; presumably cancels an
    /// in-progress pairing — confirm.
    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<watch::Sender<()>>>>,

    /// Pool of byte buffers, created with capacity for 4 entries.
    pub(crate) send_buffer_pool: Arc<Mutex<Vec<Vec<u8>>>>,

    /// Custom `EncHandler`s keyed by string.
    /// NOTE(review): presumably keyed by enc type — confirm.
    pub custom_enc_handlers: Arc<DashMap<String, Arc<dyn EncHandler>>>,

    /// Router that `process_node` dispatches incoming stanzas through.
    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,

    /// When true, acks are sent inline; otherwise they are sent from a spawned task.
    pub(crate) synchronous_ack: bool,
}
156
157impl Client {
158 pub async fn new(
159 persistence_manager: Arc<PersistenceManager>,
160 ) -> (Arc<Self>, mpsc::Receiver<MajorSyncTask>) {
161 let mut unique_id_bytes = [0u8; 2];
162 rand::rng().fill_bytes(&mut unique_id_bytes);
163
164 let device_snapshot = persistence_manager.get_device_snapshot().await;
165 let core = wacore::client::CoreClient::new(device_snapshot.core.clone());
166
167 let (tx, rx) = mpsc::channel(32);
168
169 let (recent_tx, mut recent_rx) = mpsc::channel(256);
170 let recent_handle = RecentMessageManagerHandle(recent_tx);
171
172 let map_inner = Arc::new(Mutex::new(HashMap::with_capacity(256)));
173 let list_inner = Arc::new(Mutex::new(VecDeque::with_capacity(256)));
174 let map_clone = map_inner.clone();
175 let list_clone = list_inner.clone();
176 tokio::spawn(async move {
177 while let Some(cmd) = recent_rx.recv().await {
178 match cmd {
179 RecentMessageCommand::Insert(key, msg) => {
180 let mut map = map_clone.lock().await;
181 let mut list = list_clone.lock().await;
182 map.insert(key.clone(), msg);
183 list.push_back(key);
184 if list.len() > 256
185 && let Some(old_key) = list.pop_front()
186 {
187 map.remove(&old_key);
188 }
189 }
190 RecentMessageCommand::Take(key, responder) => {
191 let mut map = map_clone.lock().await;
192 let mut list = list_clone.lock().await;
193 let msg = map.remove(&key);
194 list.retain(|k| k != &key);
195 let _ = responder.send(msg);
196 }
197 RecentMessageCommand::Shutdown => {
198 info!("RecentMessageManager shutting down");
199 break;
200 }
201 }
202 }
203 });
204
205 let this = Self {
206 core,
207 persistence_manager: persistence_manager.clone(),
208 media_conn: Arc::new(RwLock::new(None)),
209 is_logged_in: Arc::new(AtomicBool::new(false)),
210 is_connecting: Arc::new(AtomicBool::new(false)),
211 is_running: Arc::new(AtomicBool::new(false)),
212 shutdown_notifier: Arc::new(Notify::new()),
213
214 frame_socket: Arc::new(Mutex::new(None)),
215 noise_socket: Arc::new(Mutex::new(None)),
216 frames_rx: Arc::new(Mutex::new(None)),
217
218 response_waiters: Arc::new(Mutex::new(HashMap::new())),
219 unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
220 id_counter: Arc::new(AtomicU64::new(0)),
221 chat_locks: Arc::new(DashMap::new()),
222 group_cache: Arc::new(DashMap::new()),
223 device_cache: Arc::new(DashMap::new()),
224 retried_group_messages: Arc::new(DashMap::new()),
225
226 expected_disconnect: Arc::new(AtomicBool::new(false)),
227
228 recent_msg_tx: recent_handle,
229
230 pending_retries: Arc::new(Mutex::new(HashSet::new())),
231
232 enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
233 auto_reconnect_errors: Arc::new(AtomicU32::new(0)),
234 last_successful_connect: Arc::new(Mutex::new(None)),
235
236 needs_initial_full_sync: Arc::new(AtomicBool::new(false)),
237
238 app_state_processor: Some(AppStateProcessor::new(persistence_manager.backend())),
239 app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
240 initial_keys_synced_notifier: Arc::new(Notify::new()),
241 initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
242 major_sync_task_sender: tx,
243 pairing_cancellation_tx: Arc::new(Mutex::new(None)),
244 send_buffer_pool: Arc::new(Mutex::new(Vec::with_capacity(4))),
245 custom_enc_handlers: Arc::new(DashMap::new()),
246 stanza_router: Self::create_stanza_router(),
247 synchronous_ack: false,
248 };
249
250 let arc = Arc::new(this);
251 (arc, rx)
252 }
253
254 fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
256 use crate::handlers::{
257 basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
258 ib::IbHandler,
259 iq::IqHandler,
260 message::MessageHandler,
261 notification::NotificationHandler,
262 receipt::ReceiptHandler,
263 router::StanzaRouter,
264 unimplemented::UnimplementedHandler,
265 };
266
267 let mut router = StanzaRouter::new();
268
269 router.register(Arc::new(MessageHandler::new()));
271 router.register(Arc::new(ReceiptHandler::new()));
272 router.register(Arc::new(IqHandler::new()));
273 router.register(Arc::new(SuccessHandler::new()));
274 router.register(Arc::new(FailureHandler::new()));
275 router.register(Arc::new(StreamErrorHandler::new()));
276 router.register(Arc::new(IbHandler::new()));
277 router.register(Arc::new(NotificationHandler::new()));
278 router.register(Arc::new(AckHandler::new()));
279
280 router.register(Arc::new(UnimplementedHandler::for_call()));
282 router.register(Arc::new(UnimplementedHandler::for_presence()));
283 router.register(Arc::new(UnimplementedHandler::for_chatstate()));
284
285 router
286 }
287
288 pub async fn run(self: &Arc<Self>) {
289 if self.is_running.swap(true, Ordering::SeqCst) {
290 warn!("Client `run` method called while already running.");
291 return;
292 }
293 while self.is_running.load(Ordering::Relaxed) {
294 self.expected_disconnect.store(false, Ordering::Relaxed);
295
296 if self.connect().await.is_err() {
297 error!("Failed to connect, will retry...");
298 } else {
299 if self.read_messages_loop().await.is_err() {
300 warn!(
301 "Message loop exited with an error. Will attempt to reconnect if enabled."
302 );
303 } else {
304 warn!("Message loop exited gracefully.");
305 }
306
307 self.cleanup_connection_state().await;
308 }
309
310 if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
311 self.is_running.store(false, Ordering::Relaxed);
312 break;
313 }
314
315 let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
316 let delay_secs = u64::from(error_count * 2).min(30);
317 let delay = Duration::from_secs(delay_secs);
318 info!(
319 "Will attempt to reconnect in {:?} (attempt {})",
320 delay,
321 error_count + 1
322 );
323 tokio::select! {
324 _ = sleep(delay) => {},
325 _ = self.shutdown_notifier.notified() => {
326 self.is_running.store(false, Ordering::Relaxed);
327 break;
328 }
329 }
330 }
331 info!("Client run loop has shut down.");
332 }
333
334 pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
335 if self.is_connecting.swap(true, Ordering::SeqCst) {
336 return Err(ClientError::AlreadyConnected.into());
337 }
338
339 let _guard = scopeguard::guard((), |_| {
340 self.is_connecting.store(false, Ordering::Relaxed);
341 });
342
343 if self.is_connected() {
344 return Err(ClientError::AlreadyConnected.into());
345 }
346
347 let (mut frame_socket, mut frames_rx) = FrameSocket::new();
348 frame_socket.connect().await?;
349
350 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
351 let noise_socket =
352 handshake::do_handshake(&device_snapshot, &mut frame_socket, &mut frames_rx).await?;
353
354 *self.frame_socket.lock().await = Some(frame_socket);
355 *self.frames_rx.lock().await = Some(frames_rx);
356 *self.noise_socket.lock().await = Some(noise_socket);
357
358 let client_clone = self.clone();
359 tokio::spawn(async move { client_clone.keepalive_loop().await });
360
361 Ok(())
362 }
363
364 pub async fn disconnect(&self) {
365 info!("Disconnecting client intentionally.");
366 self.expected_disconnect.store(true, Ordering::Relaxed);
367 self.is_running.store(false, Ordering::Relaxed);
368 self.shutdown_notifier.notify_waiters();
369
370 if let Err(e) = self.recent_msg_tx.shutdown().await {
372 warn!("Failed to shutdown recent message manager: {}", e);
373 }
374
375 if let Some(fs) = self.frame_socket.lock().await.as_mut() {
376 fs.close().await;
377 }
378 self.cleanup_connection_state().await;
379 }
380
381 async fn cleanup_connection_state(&self) {
382 self.is_logged_in.store(false, Ordering::Relaxed);
383 *self.frame_socket.lock().await = None;
384 *self.noise_socket.lock().await = None;
385 *self.frames_rx.lock().await = None;
386 self.retried_group_messages.clear();
387 }
388
389 async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
390 info!(target: "Client", "Starting message processing loop...");
391
392 let mut rx_guard = self.frames_rx.lock().await;
393 let mut frames_rx = rx_guard
394 .take()
395 .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
396 drop(rx_guard);
397
398 loop {
399 tokio::select! {
400 biased;
401 _ = self.shutdown_notifier.notified() => {
402 info!(target: "Client", "Shutdown signaled. Exiting message loop.");
403 return Ok(());
404 },
405 frame_opt = frames_rx.recv() => {
406 match frame_opt {
407 Some(encrypted_frame) => {
408 self.process_encrypted_frame(&encrypted_frame).await;
409 },
410 None => {
411 self.cleanup_connection_state().await;
412 if !self.expected_disconnect.load(Ordering::Relaxed) {
413 self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
414 info!("Socket disconnected unexpectedly.");
415 return Err(anyhow::anyhow!("Socket disconnected unexpectedly"));
416 } else {
417 info!("Socket disconnected as expected.");
418 return Ok(());
419 }
420 }
421 }
422 }
423 }
424 }
425 }
426
427 pub(crate) async fn take_recent_message(
428 &self,
429 to: Jid,
430 id: String,
431 ) -> Result<Option<Arc<wa::Message>>, RecentMessageError> {
432 let key = RecentMessageKey { to, id };
433 let (oneshot_tx, oneshot_rx) = oneshot::channel();
434
435 if (self
437 .recent_msg_tx
438 .0
439 .send(RecentMessageCommand::Take(key, oneshot_tx))
440 .await)
441 .is_err()
442 {
443 return Err(RecentMessageError::ManagerUnavailable);
444 }
445
446 match tokio::time::timeout(Duration::from_secs(5), oneshot_rx).await {
448 Ok(Ok(msg)) => Ok(msg),
449 Ok(Err(_)) => Err(RecentMessageError::TaskDropped),
450 Err(_) => Err(RecentMessageError::ResponseTimeout),
451 }
452 }
453
454 pub(crate) async fn add_recent_message(
455 &self,
456 to: Jid,
457 id: String,
458 msg: Arc<wa::Message>,
459 ) -> Result<(), RecentMessageError> {
460 let key = RecentMessageKey { to, id };
461 self.recent_msg_tx
462 .send_insert(key, msg)
463 .await
464 .map_err(|_| RecentMessageError::ManagerUnavailable)
465 }
466
467 pub(crate) async fn process_encrypted_frame(self: &Arc<Self>, encrypted_frame: &bytes::Bytes) {
468 let noise_socket_arc = { self.noise_socket.lock().await.clone() };
469 let noise_socket = match noise_socket_arc {
470 Some(s) => s,
471 None => {
472 log::error!("Cannot process frame: not connected (no noise socket)");
473 return;
474 }
475 };
476
477 let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
478 Ok(p) => p,
479 Err(e) => {
480 log::error!(target: "Client", "Failed to decrypt frame: {e}");
481 return;
482 }
483 };
484
485 let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
486 Ok(data) => data,
487 Err(e) => {
488 log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
489 return;
490 }
491 };
492
493 match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
494 Ok(node_ref) => {
495 let node = node_ref.to_owned();
496 self.process_node(&node).await;
497 }
498 Err(e) => log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}"),
499 };
500 }
501
502 pub(crate) async fn process_node(self: &Arc<Self>, node: &Node) {
503 if node.tag == "iq"
504 && let Some(sync_node) = node.get_optional_child("sync")
505 && let Some(collection_node) = sync_node.get_optional_child("collection")
506 {
507 let name = collection_node.attrs().string("name");
508 info!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
509 } else {
510 info!(target: "Client/Recv","{}", DisplayableNode(node));
511 }
512
513 let mut cancelled = false;
515
516 if node.tag == "xmlstreamend" {
517 warn!(target: "Client", "Received <xmlstreamend/>, treating as disconnect.");
518 self.shutdown_notifier.notify_one();
519 return;
520 }
521
522 if node.tag == "iq" {
523 let id_opt = node.attrs.get("id");
524 if let Some(id) = id_opt {
525 let has_waiter = self.response_waiters.lock().await.contains_key(id);
526 if has_waiter && self.handle_iq_response(node.clone()).await {
527 return;
528 }
529 }
530 }
531
532 if !self
534 .stanza_router
535 .dispatch(self.clone(), node, &mut cancelled)
536 .await
537 {
538 warn!(target: "Client", "Received unknown top-level node: {}", DisplayableNode(node));
539 }
540
541 if self.should_ack(node) && !cancelled {
543 self.maybe_deferred_ack(node).await;
544 }
545 }
546
547 fn should_ack(&self, node: &Node) -> bool {
549 matches!(
550 node.tag.as_str(),
551 "message" | "receipt" | "notification" | "call"
552 ) && node.attrs.contains_key("id")
553 && node.attrs.contains_key("from")
554 }
555
556 async fn maybe_deferred_ack(self: &Arc<Self>, node: &Node) {
559 if self.synchronous_ack {
560 if let Err(e) = self.send_ack_for(node).await {
561 warn!(target: "Client", "Failed to send ack: {e:?}");
562 }
563 } else {
564 let this = self.clone();
565 let node_clone = node.clone();
566 tokio::spawn(async move {
567 if let Err(e) = this.send_ack_for(&node_clone).await {
568 warn!(target: "Client", "Failed to send ack: {e:?}");
569 }
570 });
571 }
572 }
573
574 async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
576 let id = match node.attrs.get("id") {
577 Some(v) => v.clone(),
578 None => return Ok(()),
579 };
580 let from = match node.attrs.get("from") {
581 Some(v) => v.clone(),
582 None => return Ok(()),
583 };
584 let participant = node.attrs.get("participant").cloned();
585 let typ = if node.tag != "message" {
586 node.attrs.get("type").cloned()
587 } else {
588 None
589 };
590 let mut attrs = std::collections::HashMap::new();
591 attrs.insert("class".to_string(), node.tag.clone());
592 attrs.insert("id".to_string(), id);
593 attrs.insert("to".to_string(), from);
594 if let Some(p) = participant {
595 attrs.insert("participant".to_string(), p);
596 }
597 if let Some(t) = typ {
598 attrs.insert("type".to_string(), t);
599 }
600 let ack = Node {
601 tag: "ack".to_string(),
602 attrs,
603 content: None,
604 };
605 self.send_node(ack).await
606 }
607
/// Logs a warning that no handler exists yet for stanzas with the given `tag`.
pub(crate) async fn handle_unimplemented(&self, tag: &str) {
    warn!(target: "Client", "TODO: Implement handler for <{tag}>");
}
611
612 pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
613 use crate::request::{InfoQuery, InfoQueryType};
614 use SERVER_JID;
615
616 let tag = if passive { "passive" } else { "active" };
617
618 let query = InfoQuery {
619 namespace: "passive",
620 query_type: InfoQueryType::Set,
621 to: SERVER_JID.parse().unwrap(),
622 target: None,
623 id: None,
624 content: Some(wacore_binary::node::NodeContent::Nodes(vec![
625 NodeBuilder::new(tag).build(),
626 ])),
627 timeout: None,
628 };
629
630 self.send_iq(query).await.map(|_| ())
631 }
632
633 pub(crate) async fn handle_success(self: &Arc<Self>, node: &Node) {
634 info!("Successfully authenticated with WhatsApp servers!");
635 self.is_logged_in.store(true, Ordering::Relaxed);
636 *self.last_successful_connect.lock().await = Some(chrono::Utc::now());
637 self.auto_reconnect_errors.store(0, Ordering::Relaxed);
638
639 if let Some(lid_str) = node.attrs.get("lid") {
640 if let Ok(lid) = lid_str.parse::<Jid>() {
641 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
642 if device_snapshot.lid.as_ref() != Some(&lid) {
643 info!(target: "Client", "Updating LID from server to '{lid}'");
644 self.persistence_manager
645 .process_command(DeviceCommand::SetLid(Some(lid)))
646 .await;
647 }
648 } else {
649 warn!(target: "Client", "Failed to parse LID from success stanza: {lid_str}");
650 }
651 } else {
652 warn!(target: "Client", "LID not found in <success> stanza. Group messaging may fail.");
653 }
654
655 let client_clone = self.clone();
656 tokio::spawn(async move {
657 if let Err(e) = client_clone.set_passive(false).await {
658 warn!("Failed to send post-connect passive IQ: {e:?}");
659 }
660
661 if let Err(e) = client_clone.send_presence(Presence::Available).await {
662 warn!(
663 "Could not send initial presence: {e:?}. This is expected if push_name is not yet known."
664 );
665 }
666
667 client_clone
668 .core
669 .event_bus
670 .dispatch(&Event::Connected(crate::types::events::Connected));
671
672 if client_clone.needs_initial_full_sync.load(Ordering::Relaxed) {
673 if !client_clone
674 .initial_app_state_keys_received
675 .load(Ordering::Relaxed)
676 {
677 info!(target: "Client/AppState", "Waiting for initial app state keys before starting full sync (15s timeout)...");
678 match tokio::time::timeout(
679 APP_STATE_KEY_WAIT_TIMEOUT,
680 client_clone.initial_keys_synced_notifier.notified(),
681 )
682 .await
683 {
684 Ok(_) => {
685 info!(target: "Client/AppState", "Initial app state keys received; proceeding with full sync.")
686 }
687 Err(_) => {
688 warn!(target: "Client/AppState", "Timed out waiting for initial app state keys; continuing anyway (may see 'app state key not found' warnings).")
689 }
690 }
691 } else {
692 info!(target: "Client/AppState", "Initial app state keys already present; starting full sync immediately.");
693 }
694 let names = [
695 WAPatchName::CriticalBlock,
696 WAPatchName::CriticalUnblockLow,
697 WAPatchName::RegularLow,
698 WAPatchName::RegularHigh,
699 WAPatchName::Regular,
700 ];
701 for name in names {
702 if let Err(e) = client_clone.fetch_app_state_with_retry(name).await {
703 warn!(
704 "Failed to full sync app state {:?} after retry logic: {e}",
705 name
706 );
707 }
708 }
709 client_clone
710 .needs_initial_full_sync
711 .store(false, Ordering::Relaxed);
712 }
713 });
714 }
715
716 async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
717 let mut attempt = 0u32;
718 loop {
719 attempt += 1;
720 let res = self.process_app_state_sync_task(name, true).await;
721 match res {
722 Ok(()) => return Ok(()),
723 Err(e) => {
724 let es = e.to_string();
725 if es.contains("app state key not found") && attempt == 1 {
726 if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
727 info!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
728 if tokio::time::timeout(
729 Duration::from_secs(10),
730 self.initial_keys_synced_notifier.notified(),
731 )
732 .await
733 .is_err()
734 {
735 warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
736 }
737 }
738 continue;
739 }
740 if es.contains("database is locked") && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
741 let backoff = Duration::from_millis(200 * attempt as u64 + 150);
742 warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
743 tokio::time::sleep(backoff).await;
744 continue;
745 }
746 return Err(e);
747 }
748 }
749 }
750 }
751
/// Fetches one batch of app state patches for the collection `name`, applies the decoded
/// mutations and persists the resulting version.
///
/// `full_sync` requests a snapshot from the server; it is forced to true when the stored
/// version is 0.
///
/// # Errors
/// Propagates failures from reading or writing the stored version, from the IQ round trip and
/// from patch decoding. A failed snapshot download or a failed patch-list pre-parse is not
/// fatal here: the snapshot is simply treated as unavailable.
pub(crate) async fn process_app_state_sync_task(
    &self,
    name: WAPatchName,
    full_sync: bool,
) -> anyhow::Result<()> {
    let backend = self.persistence_manager.backend();
    let mut full_sync = full_sync;

    let mut state = backend.get_app_state_version(name.as_str()).await?;
    // Nothing stored yet for this collection: a snapshot is required.
    if state.version == 0 {
        full_sync = true;
    }

    // NOTE(review): `has_more` is assigned from the response below but only tested once by
    // this `if`, so a single batch is processed per call even when the server reports more
    // patches. Presumably this was meant to loop — confirm.
    let mut has_more = true;
    let want_snapshot = full_sync;

    if has_more {
        debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);

        // The `version` attribute is only sent when no snapshot is requested.
        let mut collection_builder = NodeBuilder::new("collection")
            .attr("name", name.as_str())
            .attr(
                "return_snapshot",
                if want_snapshot { "true" } else { "false" },
            );
        if !want_snapshot {
            collection_builder = collection_builder.attr("version", state.version.to_string());
        }
        let sync_node = NodeBuilder::new("sync")
            .children([collection_builder.build()])
            .build();
        let iq = crate::request::InfoQuery {
            namespace: "w:sync:app:state",
            query_type: crate::request::InfoQueryType::Set,
            to: SERVER_JID.parse().unwrap(),
            target: None,
            id: None,
            content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
            timeout: None,
        };

        let resp = self.send_iq(iq).await?;
        debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);

        let _decode_start = std::time::Instant::now();
        // Download an externally referenced snapshot up front, because the `download`
        // callback handed to the decoder below is synchronous and cannot await.
        let pre_downloaded_snapshot: Option<Vec<u8>> =
            match wacore::appstate::patch_decode::parse_patch_list(&resp) {
                Ok(pl) => {
                    debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={}", name, pl.snapshot_ref.is_some(), pl.has_more_patches);
                    if let Some(ext) = &pl.snapshot_ref {
                        match self.download(ext).await {
                            Ok(bytes) => Some(bytes),
                            Err(e) => {
                                warn!("Failed to download external snapshot: {e}");
                                None
                            }
                        }
                    } else {
                        None
                    }
                }
                Err(_) => None,
            };

        // Serves the pre-downloaded bytes regardless of which blob reference is asked for.
        let download = |_: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
            if let Some(bytes) = &pre_downloaded_snapshot {
                Ok(bytes.clone())
            } else {
                Err(anyhow::anyhow!("snapshot not pre-downloaded"))
            }
        };

        if let Some(proc) = &self.app_state_processor {
            let (mutations, new_state, list) =
                proc.decode_patch_list(&resp, &download, true).await?;
            let decode_elapsed = _decode_start.elapsed();
            // Only slow decodes (over 500 ms) are worth a log line.
            if decode_elapsed.as_millis() > 500 {
                debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
            }

            // Request keys the processor reports missing, at most once per key per 24 hours.
            let missing = proc.get_missing_key_ids(&list).await.unwrap_or_default();
            if !missing.is_empty() {
                let mut to_request: Vec<Vec<u8>> = Vec::new();
                let mut guard = self.app_state_key_requests.lock().await;
                let now = std::time::Instant::now();
                for key_id in missing {
                    let hex_id = hex::encode(&key_id);
                    let should = guard
                        .get(&hex_id)
                        .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                        .unwrap_or(true);
                    if should {
                        guard.insert(hex_id, now);
                        to_request.push(key_id);
                    }
                }
                // Release the request-log lock before sending the key request.
                drop(guard);
                if !to_request.is_empty() {
                    self.request_app_state_keys(&to_request).await;
                }
            }

            for m in mutations {
                debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
                self.dispatch_app_state_mutation(&m, full_sync).await;
            }

            state = new_state;
            has_more = list.has_more_patches;
            debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more}", name);
        }
    }

    // Persist whichever state we ended up with (unchanged if no processor is configured).
    backend
        .set_app_state_version(name.as_str(), state.clone())
        .await?;

    debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
    Ok(())
}
872
873 #[allow(dead_code)]
874 async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
875 if raw_key_ids.is_empty() {
876 return;
877 }
878 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
879 let own_jid = match device_snapshot.pn.clone() {
880 Some(j) => j,
881 None => return,
882 };
883 let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
884 .iter()
885 .map(|k| wa::message::AppStateSyncKeyId {
886 key_id: Some(k.clone()),
887 })
888 .collect();
889 let msg = wa::Message {
890 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
891 r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
892 app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
893 ..Default::default()
894 })),
895 ..Default::default()
896 };
897 if let Err(e) = self
898 .send_message_impl(
899 own_jid,
900 Arc::new(msg),
901 Some(self.generate_message_id().await),
902 true,
903 false,
904 None,
905 )
906 .await
907 {
908 warn!("Failed to send app state key request: {e}");
909 }
910 }
911
912 #[allow(dead_code)]
913 async fn dispatch_app_state_mutation(
914 &self,
915 m: &crate::appstate_sync::Mutation,
916 full_sync: bool,
917 ) {
918 use wacore::types::events::{
919 ArchiveUpdate, ContactUpdate, Event, MarkChatAsReadUpdate, MuteUpdate, PinUpdate,
920 };
921 if m.operation != wa::syncd_mutation::SyncdOperation::Set {
922 return;
923 }
924 if m.index.is_empty() {
925 return;
926 }
927 let kind = &m.index[0];
928 let ts = m
929 .action_value
930 .as_ref()
931 .and_then(|v| v.timestamp)
932 .unwrap_or(0);
933 let time = chrono::DateTime::from_timestamp_millis(ts).unwrap_or_else(chrono::Utc::now);
934 let jid = if m.index.len() > 1 {
935 m.index[1].parse().unwrap_or_default()
936 } else {
937 Jid::default()
938 };
939 match kind.as_str() {
940 "setting_pushName" => {
941 if let Some(val) = &m.action_value
942 && let Some(act) = &val.push_name_setting
943 && let Some(new_name) = &act.name
944 {
945 let new_name = new_name.clone();
946 let bus = self.core.event_bus.clone();
947
948 let snapshot = self.persistence_manager.get_device_snapshot().await;
949 let old = snapshot.push_name.clone();
950 if old != new_name {
951 info!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
952 self.persistence_manager
953 .process_command(DeviceCommand::SetPushName(new_name.clone()))
954 .await;
955 bus.dispatch(&Event::SelfPushNameUpdated(
956 crate::types::events::SelfPushNameUpdated {
957 from_server: true,
958 old_name: old,
959 new_name: new_name.clone(),
960 },
961 ));
962 } else {
963 debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
964 }
965 }
966 }
967 "mute" => {
968 if let Some(val) = &m.action_value
969 && let Some(act) = &val.mute_action
970 {
971 self.core.event_bus.dispatch(&Event::MuteUpdate(MuteUpdate {
972 jid,
973 timestamp: time,
974 action: Box::new(*act),
975 from_full_sync: full_sync,
976 }));
977 }
978 }
979 "pin" | "pin_v1" => {
980 if let Some(val) = &m.action_value
981 && let Some(act) = &val.pin_action
982 {
983 self.core.event_bus.dispatch(&Event::PinUpdate(PinUpdate {
984 jid,
985 timestamp: time,
986 action: Box::new(*act),
987 from_full_sync: full_sync,
988 }));
989 }
990 }
991 "archive" => {
992 if let Some(val) = &m.action_value
993 && let Some(act) = &val.archive_chat_action
994 {
995 self.core
996 .event_bus
997 .dispatch(&Event::ArchiveUpdate(ArchiveUpdate {
998 jid,
999 timestamp: time,
1000 action: Box::new(act.clone()),
1001 from_full_sync: full_sync,
1002 }));
1003 }
1004 }
1005 "contact" => {
1006 if let Some(val) = &m.action_value
1007 && let Some(act) = &val.contact_action
1008 {
1009 self.core
1010 .event_bus
1011 .dispatch(&Event::ContactUpdate(ContactUpdate {
1012 jid,
1013 timestamp: time,
1014 action: Box::new(act.clone()),
1015 from_full_sync: full_sync,
1016 }));
1017 }
1018 }
1019 "mark_chat_as_read" | "markChatAsRead" => {
1020 if let Some(val) = &m.action_value
1021 && let Some(act) = &val.mark_chat_as_read_action
1022 {
1023 self.core.event_bus.dispatch(&Event::MarkChatAsReadUpdate(
1024 MarkChatAsReadUpdate {
1025 jid,
1026 timestamp: time,
1027 action: Box::new(act.clone()),
1028 from_full_sync: full_sync,
1029 },
1030 ));
1031 }
1032 }
1033 _ => {}
1034 }
1035 }
1036
1037 async fn expect_disconnect(&self) {
1038 self.expected_disconnect.store(true, Ordering::Relaxed);
1039 }
1040
1041 pub(crate) async fn handle_stream_error(&self, node: &Node) {
1042 self.is_logged_in.store(false, Ordering::Relaxed);
1043
1044 let mut attrs = node.attrs();
1045 let code = attrs.optional_string("code").unwrap_or("");
1046 let conflict_type = node
1047 .get_optional_child("conflict")
1048 .map(|n| n.attrs().optional_string("type").unwrap_or("").to_string())
1049 .unwrap_or_default();
1050
1051 match (code, conflict_type.as_str()) {
1052 ("515", _) => {
1053 info!(target: "Client", "Got 515 stream error, server is closing stream. Will auto-reconnect.");
1054 }
1055 ("401", "device_removed") | (_, "replaced") => {
1056 info!(target: "Client", "Got stream error indicating client was removed or replaced. Logging out.");
1057 self.expect_disconnect().await;
1058 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
1059
1060 let event = if conflict_type == "replaced" {
1061 Event::StreamReplaced(crate::types::events::StreamReplaced)
1062 } else {
1063 Event::LoggedOut(crate::types::events::LoggedOut {
1064 on_connect: false,
1065 reason: ConnectFailureReason::LoggedOut,
1066 })
1067 };
1068 self.core.event_bus.dispatch(&event);
1069 }
1070 ("503", _) => {
1071 info!(target: "Client", "Got 503 service unavailable, will auto-reconnect.");
1072 }
1073 _ => {
1074 error!(target: "Client", "Unknown stream error: {}", DisplayableNode(node));
1075 self.expect_disconnect().await;
1076 self.core.event_bus.dispatch(&Event::StreamError(
1077 crate::types::events::StreamError {
1078 code: code.to_string(),
1079 raw: Some(node.clone()),
1080 },
1081 ));
1082 }
1083 }
1084
1085 self.shutdown_notifier.notify_one();
1086 }
1087
1088 pub(crate) async fn handle_connect_failure(&self, node: &Node) {
1089 self.expected_disconnect.store(true, Ordering::Relaxed);
1090 self.shutdown_notifier.notify_one();
1091
1092 let mut attrs = node.attrs();
1093 let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
1094 let reason = ConnectFailureReason::from(reason_code);
1095
1096 if reason.should_reconnect() {
1097 self.expected_disconnect.store(false, Ordering::Relaxed);
1098 } else {
1099 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
1100 }
1101
1102 if reason.is_logged_out() {
1103 info!(target: "Client", "Got {reason:?} connect failure, logging out.");
1104 self.core
1105 .event_bus
1106 .dispatch(&wacore::types::events::Event::LoggedOut(
1107 crate::types::events::LoggedOut {
1108 on_connect: true,
1109 reason,
1110 },
1111 ));
1112 } else if let ConnectFailureReason::TempBanned = reason {
1113 let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
1114 let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
1115 let expire_duration =
1116 chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
1117 warn!(target: "Client", "Temporary ban connect failure: {}", DisplayableNode(node));
1118 self.core.event_bus.dispatch(&Event::TemporaryBan(
1119 crate::types::events::TemporaryBan {
1120 code: crate::types::events::TempBanReason::from(ban_code),
1121 expire: expire_duration,
1122 },
1123 ));
1124 } else if let ConnectFailureReason::ClientOutdated = reason {
1125 error!(target: "Client", "Client is outdated and was rejected by server.");
1126 self.core
1127 .event_bus
1128 .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
1129 } else {
1130 warn!(target: "Client", "Unknown connect failure: {}", DisplayableNode(node));
1131 self.core.event_bus.dispatch(&Event::ConnectFailure(
1132 crate::types::events::ConnectFailure {
1133 reason,
1134 message: attrs.optional_string("message").unwrap_or("").to_string(),
1135 raw: Some(node.clone()),
1136 },
1137 ));
1138 }
1139 }
1140
1141 pub(crate) async fn handle_iq(self: &Arc<Self>, node: &Node) -> bool {
1142 if let Some("get") = node.attrs().optional_string("type")
1143 && let Some(_ping_node) = node.get_optional_child("ping")
1144 {
1145 info!(target: "Client", "Received ping, sending pong.");
1146 let mut parser = node.attrs();
1147 let from_jid = parser.jid("from");
1148 let id = parser.string("id");
1149 let pong = NodeBuilder::new("iq")
1150 .attrs([
1151 ("to", from_jid.to_string()),
1152 ("id", id),
1153 ("type", "result".to_string()),
1154 ])
1155 .build();
1156 if let Err(e) = self.send_node(pong).await {
1157 warn!("Failed to send pong: {e:?}");
1158 }
1159 return true;
1160 }
1161
1162 if pair::handle_iq(self, node).await {
1163 return true;
1164 }
1165
1166 false
1167 }
1168
1169 pub fn is_connected(&self) -> bool {
1170 self.noise_socket
1171 .try_lock()
1172 .is_ok_and(|guard| guard.is_some())
1173 }
1174
1175 pub fn is_logged_in(&self) -> bool {
1176 self.is_logged_in.load(Ordering::Relaxed)
1177 }
1178
1179 pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
1182 self.persistence_manager.clone()
1183 }
1184
1185 pub async fn edit_message(
1186 &self,
1187 to: Jid,
1188 original_id: String,
1189 new_content: wa::Message,
1190 ) -> Result<String, anyhow::Error> {
1191 let own_jid = self
1192 .get_pn()
1193 .await
1194 .ok_or_else(|| anyhow!("Not logged in"))?;
1195
1196 let edit_container_message = wa::Message {
1197 edited_message: Some(Box::new(wa::message::FutureProofMessage {
1198 message: Some(Box::new(wa::Message {
1199 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1200 key: Some(wa::MessageKey {
1201 remote_jid: Some(to.to_string()),
1202 from_me: Some(true),
1203 id: Some(original_id.clone()),
1204 participant: if to.is_group() {
1205 Some(own_jid.to_non_ad().to_string())
1206 } else {
1207 None
1208 },
1209 }),
1210 r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
1211 edited_message: Some(Box::new(new_content)),
1212 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
1213 ..Default::default()
1214 })),
1215 ..Default::default()
1216 })),
1217 })),
1218 ..Default::default()
1219 };
1220
1221 self.send_message_impl(
1222 to,
1223 Arc::new(edit_container_message),
1224 Some(original_id.clone()),
1225 false,
1226 false,
1227 Some(crate::types::message::EditAttribute::MessageEdit),
1228 )
1229 .await?;
1230
1231 Ok(original_id)
1232 }
1233
1234 pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
1235 let noise_socket_arc = { self.noise_socket.lock().await.clone() };
1236 let noise_socket = match noise_socket_arc {
1237 Some(socket) => socket,
1238 None => return Err(ClientError::NotConnected),
1239 };
1240
1241 info!(target: "Client/Send", "{}", DisplayableNode(&node));
1242 let mut pool_guard = self.send_buffer_pool.lock().await;
1243 let mut plaintext_buf = pool_guard.pop().unwrap_or_else(|| Vec::with_capacity(1024));
1244 let mut encrypted_buf = pool_guard.pop().unwrap_or_else(|| Vec::with_capacity(1024));
1245 drop(pool_guard);
1246
1247 plaintext_buf.clear();
1248 encrypted_buf.clear();
1249
1250 if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
1251 error!("Failed to marshal node: {e:?}");
1252 let mut g = self.send_buffer_pool.lock().await;
1253 g.push(plaintext_buf);
1254 g.push(encrypted_buf);
1255 return Err(SocketError::Crypto("Marshal error".to_string()).into());
1256 }
1257
1258 let send_res = noise_socket
1259 .encrypt_and_send(plaintext_buf, encrypted_buf)
1260 .await;
1261
1262 let plaintext_buf = match send_res {
1263 Ok(buf) => buf,
1264 Err(e) => {
1265 return Err(e.into());
1266 }
1267 };
1268
1269 let mut g = self.send_buffer_pool.lock().await;
1270 if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
1271 g.push(plaintext_buf);
1272 }
1273 g.push(Vec::new());
1274 Ok(())
1275 }
1276
1277 pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
1278 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1279 let old_name = device_snapshot.push_name.clone();
1280
1281 if old_name == new_name {
1282 return;
1283 }
1284
1285 log::info!("Updating push name from '{}' -> '{}'", old_name, new_name);
1286 self.persistence_manager
1287 .process_command(DeviceCommand::SetPushName(new_name.clone()))
1288 .await;
1289
1290 self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
1291 crate::types::events::SelfPushNameUpdated {
1292 from_server: true,
1293 old_name,
1294 new_name: new_name.clone(),
1295 },
1296 ));
1297
1298 let client_clone = self.clone();
1299 tokio::spawn(async move {
1300 if let Err(e) = client_clone.send_presence(Presence::Available).await {
1301 log::warn!("Failed to send presence after push name update: {:?}", e);
1302 } else {
1303 log::info!("Sent presence after push name update.");
1304 }
1305 });
1306 }
1307
1308 pub async fn get_push_name(&self) -> String {
1309 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1310 device_snapshot.push_name.clone()
1311 }
1312
1313 pub async fn get_pn(&self) -> Option<Jid> {
1314 let snapshot = self.persistence_manager.get_device_snapshot().await;
1315 snapshot.pn.clone()
1316 }
1317
1318 pub async fn get_lid(&self) -> Option<Jid> {
1319 let snapshot = self.persistence_manager.get_device_snapshot().await;
1320 snapshot.lid.clone()
1321 }
1322
1323 pub(crate) async fn send_protocol_receipt(
1324 &self,
1325 id: String,
1326 receipt_type: crate::types::presence::ReceiptType,
1327 ) {
1328 if id.is_empty() {
1329 return;
1330 }
1331 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1332 if let Some(own_jid) = &device_snapshot.pn {
1333 let node = NodeBuilder::new("receipt")
1334 .attrs([
1335 ("id", id),
1336 ("type", format!("{:?}", receipt_type).to_lowercase()),
1337 ("to", own_jid.to_non_ad().to_string()),
1338 ])
1339 .build();
1340
1341 if let Err(e) = self.send_node(node).await {
1342 warn!(
1343 "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
1344 receipt_type, self.unique_id, e
1345 );
1346 }
1347 }
1348 }
1349}
1350
#[cfg(test)]
mod tests {
    use super::*;
    use wacore_binary::builder::NodeBuilder;

    /// Checks that `should_ack` returns `true` for both `<receipt>` and `<notification>`
    /// stanzas coming from the server, using a client backed by an in-memory store.
    #[tokio::test]
    async fn test_ack_behavior_for_incoming_stanzas() {
        let backend = Arc::new(
            crate::store::sqlite_store::SqliteStore::new(":memory:")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(PersistenceManager::new(backend).await.unwrap());
        let (client, _rx) = Client::new(pm).await;

        let receipt_node = NodeBuilder::new("receipt")
            .attr("from", "s.whatsapp.net")
            .attr("id", "RCPT-1")
            .build();

        let notification_node = NodeBuilder::new("notification")
            .attr("from", "s.whatsapp.net")
            .attr("id", "NOTIF-1")
            .build();

        assert!(
            client.should_ack(&receipt_node),
            "should_ack must still return TRUE for <receipt> stanzas."
        );
        assert!(
            client.should_ack(&notification_node),
            "should_ack must still return TRUE for <notification> stanzas."
        );

        info!(
            "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
        );
    }
}