1mod context_impl;
2mod device_registry;
3mod lid_pn;
4mod sender_keys;
5mod sessions;
6
7use crate::handshake;
8use crate::lid_pn_cache::LidPnCache;
9use crate::pair;
10use anyhow::{Result, anyhow};
11use dashmap::DashMap;
12use indexmap::IndexMap;
13use moka::future::Cache;
14use tokio::sync::watch;
15use wacore::xml::DisplayableNode;
16use wacore_binary::builder::NodeBuilder;
17use wacore_binary::jid::JidExt;
18use wacore_binary::node::Node;
19
20use crate::appstate_sync::AppStateProcessor;
21use crate::jid_utils::server_jid;
22use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
23use crate::types::enc_handler::EncHandler;
24use crate::types::events::{ConnectFailureReason, Event};
25
26use log::{debug, error, info, warn};
27
28use rand::RngCore;
29use scopeguard;
30use std::collections::{HashMap, HashSet};
31use wacore_binary::jid::Jid;
32
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
35
36use thiserror::Error;
37use tokio::sync::{Mutex, Notify, OnceCell, RwLock, mpsc};
38use tokio::time::{Duration, sleep};
39use wacore::appstate::patch_decode::WAPatchName;
40use wacore::client::context::GroupInfo;
41use waproto::whatsapp as wa;
42
43use crate::socket::{NoiseSocket, SocketError, error::EncryptSendError};
44use crate::sync_task::MajorSyncTask;
45
/// Maximum attempts for an app-state patch fetch that fails with a retryable
/// error (see `fetch_app_state_with_retry`, which backs off on "database is
/// locked" until this many attempts have been made).
const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;

/// Size cap (bytes) for buffers retained in `send_buffer_pool`; presumably
/// larger buffers are dropped rather than pooled — TODO confirm at the pool's
/// usage site (not visible in this chunk).
const MAX_POOLED_BUFFER_CAP: usize = 512 * 1024;
49
50#[derive(Debug, Error)]
51pub enum ClientError {
52 #[error("client is not connected")]
53 NotConnected,
54 #[error("socket error: {0}")]
55 Socket(#[from] SocketError),
56 #[error("encrypt/send error: {0}")]
57 EncryptSend(#[from] EncryptSendError),
58 #[error("client is already connected")]
59 AlreadyConnected,
60 #[error("client is not logged in")]
61 NotLoggedIn,
62}
63
/// Cache key for a recently-sent message: destination JID plus message ID.
///
/// Used as the key type of `Client::recent_messages`, which maps it to the
/// serialized payload bytes — presumably so the message can be re-sent when a
/// retry receipt arrives (TODO confirm at the retry-handling site, not
/// visible in this chunk).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RecentMessageKey {
    /// Destination JID the message was addressed to.
    pub to: Jid,
    /// The message's unique stanza ID.
    pub id: String,
}
70
/// High-level WhatsApp client.
///
/// Owns the transport + Noise connection, routes inbound stanzas through
/// `stanza_router`, tracks login/reconnect state, and holds the caches used
/// by the messaging, group and app-state subsystems. Construct with
/// [`Client::new`] and drive with [`Client::run`].
pub struct Client {
    /// Platform-independent core; includes the event bus used to dispatch
    /// [`Event`]s to subscribers.
    pub(crate) core: wacore::client::CoreClient,

    /// Durable device/identity state, mutated via [`DeviceCommand`]s.
    pub(crate) persistence_manager: Arc<PersistenceManager>,
    /// Cached media-connection info (`None` until first established).
    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,

    /// Set after a `<success>` stanza is processed; cleared on disconnect.
    pub(crate) is_logged_in: Arc<AtomicBool>,
    /// Guards against concurrent `connect` attempts (swap-checked).
    pub(crate) is_connecting: Arc<AtomicBool>,
    /// True while the `run` supervision loop is active.
    pub(crate) is_running: Arc<AtomicBool>,
    /// Wakes the message loop (and other waiters) on shutdown.
    pub(crate) shutdown_notifier: Arc<Notify>,

    /// The active transport, when connected.
    pub(crate) transport: Arc<Mutex<Option<Arc<dyn crate::transport::Transport>>>>,
    /// Transport event receiver; taken by `read_messages_loop` while running.
    pub(crate) transport_events:
        Arc<Mutex<Option<async_channel::Receiver<crate::transport::TransportEvent>>>>,
    /// Factory used to (re)create a transport on every connect.
    pub(crate) transport_factory: Arc<dyn crate::transport::TransportFactory>,
    /// Noise-encrypted framing layer established by the handshake.
    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,

    /// Oneshot senders keyed by request ID, resolved by IQ responses/acks.
    pub(crate) response_waiters:
        Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<wacore_binary::Node>>>>,
    /// Random per-process ID component ("a.b" from two random bytes);
    /// presumably prefixes generated request IDs — confirm at
    /// `generate_request_id` (not visible in this chunk).
    pub(crate) unique_id: String,
    /// Monotonic counter, presumably for request-ID generation.
    pub(crate) id_counter: Arc<AtomicU64>,

    /// Per-key mutexes (5 min TTL) — presumably serialize Signal session
    /// operations per address; confirm at usage sites.
    pub(crate) session_locks: Cache<String, Arc<tokio::sync::Mutex<()>>>,

    /// Per-key inbound message queues (5 min TTL).
    pub(crate) message_queues: Cache<String, mpsc::Sender<Arc<Node>>>,

    /// In-memory LID <-> phone-number mapping cache, warmed at startup.
    pub(crate) lid_pn_cache: Arc<LidPnCache>,

    /// Per-key mutexes guarding enqueue into `message_queues` (5 min TTL).
    pub(crate) message_enqueue_locks: Cache<String, Arc<tokio::sync::Mutex<()>>>,

    /// Lazily-initialized group metadata cache (see `get_group_cache`).
    pub group_cache: OnceCell<Cache<Jid, GroupInfo>>,
    /// Lazily-initialized user -> device-list cache (see `get_device_cache`).
    pub device_cache: OnceCell<Cache<Jid, Vec<Jid>>>,

    /// Group messages already retried; invalidated on connection cleanup.
    pub(crate) retried_group_messages: Cache<String, ()>,
    /// Set when a disconnect is intentional so loops exit without errors.
    pub(crate) expected_disconnect: Arc<AtomicBool>,

    /// Incremented on each successful login; post-login tasks compare
    /// against it to cancel themselves after a reconnect.
    pub(crate) connection_generation: Arc<AtomicU64>,

    /// Recently-sent message payloads keyed by [`RecentMessageKey`].
    pub(crate) recent_messages: Cache<RecentMessageKey, Vec<u8>>,

    /// IDs of messages with an in-flight retry.
    pub(crate) pending_retries: Arc<Mutex<HashSet<String>>>,

    /// Per-message retry counters (5 min TTL).
    pub(crate) message_retry_counts: Cache<String, u8>,

    /// When false, `run` exits instead of reconnecting after a drop.
    pub enable_auto_reconnect: Arc<AtomicBool>,
    /// Consecutive failed reconnect attempts; drives backoff in `run`.
    pub auto_reconnect_errors: Arc<AtomicU32>,
    /// Timestamp of the last successful authentication.
    pub last_successful_connect: Arc<Mutex<Option<chrono::DateTime<chrono::Utc>>>>,

    /// Requests a full app-state sync after login (see `handle_success`).
    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,

    /// Lazily-initialized app-state sync processor.
    pub(crate) app_state_processor: OnceCell<AppStateProcessor>,
    /// Outstanding app-state key requests and when they were issued.
    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, std::time::Instant>>>,
    /// Notified when the initial app-state keys arrive.
    pub(crate) initial_keys_synced_notifier: Arc<Notify>,
    /// True once the initial app-state keys have been received.
    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,

    /// Notified when the server's offline (catch-up) sync finishes.
    pub(crate) offline_sync_notifier: Arc<Notify>,
    /// True once offline sync has completed for this connection.
    pub(crate) offline_sync_completed: Arc<AtomicBool>,
    /// Notified once the Noise socket is installed and usable.
    pub(crate) socket_ready_notifier: Arc<Notify>,
    /// Notified when the `Connected` event is dispatched post-login.
    pub(crate) connected_notifier: Arc<Notify>,
    /// Queue for heavyweight sync work handled outside the client.
    pub(crate) major_sync_task_sender: mpsc::Sender<MajorSyncTask>,
    /// Cancels an in-progress pairing flow, when one exists.
    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<watch::Sender<()>>>>,

    /// State machine for phone-number pair-code linking.
    pub(crate) pair_code_state: Arc<Mutex<wacore::pair_code::PairCodeState>>,

    /// Reusable send buffers (see `MAX_POOLED_BUFFER_CAP`).
    pub(crate) send_buffer_pool: Arc<Mutex<Vec<Vec<u8>>>>,

    /// User-registered handlers for custom `<enc>` payload types.
    pub custom_enc_handlers: Arc<DashMap<String, Arc<dyn EncHandler>>>,

    /// Pending peer-data-operation requests keyed by request ID.
    pub(crate) pdo_pending_requests: Cache<String, crate::pdo::PendingPdoRequest>,

    /// Cached device-list records (1 h TTL), periodically cleaned by
    /// `device_registry_cleanup_loop`.
    pub(crate) device_registry_cache: Cache<String, wacore::store::traits::DeviceListRecord>,

    /// Dispatches inbound stanzas to their handlers (see `create_stanza_router`).
    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,

    /// When true, acks are sent inline instead of on a spawned task.
    pub(crate) synchronous_ack: bool,

    /// HTTP client used e.g. for version resolution and media.
    pub http_client: Arc<dyn crate::http::HttpClient>,

    /// Optional client-version override; skips/overrides remote resolution.
    pub(crate) override_version: Option<(u32, u32, u32)>,
}
192
193impl Client {
    /// Construct a new client and the receiver for major (heavyweight) sync
    /// tasks.
    ///
    /// Wires up persistence, transport factory, caches and notifiers, then
    /// spawns two background tasks: one that warms the LID-PN mapping cache
    /// and one that runs the device-registry cleanup loop. The caller must
    /// drain the returned [`MajorSyncTask`] receiver.
    pub async fn new(
        persistence_manager: Arc<PersistenceManager>,
        transport_factory: Arc<dyn crate::transport::TransportFactory>,
        http_client: Arc<dyn crate::http::HttpClient>,
        override_version: Option<(u32, u32, u32)>,
    ) -> (Arc<Self>, mpsc::Receiver<MajorSyncTask>) {
        // Two random bytes give this process a unique "a.b" ID component.
        let mut unique_id_bytes = [0u8; 2];
        rand::rng().fill_bytes(&mut unique_id_bytes);

        let device_snapshot = persistence_manager.get_device_snapshot().await;
        let core = wacore::client::CoreClient::new(device_snapshot.core.clone());

        // Channel carrying heavyweight sync work out of the client.
        let (tx, rx) = mpsc::channel(32);

        let this = Self {
            core,
            persistence_manager: persistence_manager.clone(),
            media_conn: Arc::new(RwLock::new(None)),
            is_logged_in: Arc::new(AtomicBool::new(false)),
            is_connecting: Arc::new(AtomicBool::new(false)),
            is_running: Arc::new(AtomicBool::new(false)),
            shutdown_notifier: Arc::new(Notify::new()),

            transport: Arc::new(Mutex::new(None)),
            transport_events: Arc::new(Mutex::new(None)),
            transport_factory,
            noise_socket: Arc::new(Mutex::new(None)),

            response_waiters: Arc::new(Mutex::new(HashMap::new())),
            unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
            id_counter: Arc::new(AtomicU64::new(0)),

            // Short-lived (5 min) bounded caches for per-key locks/queues.
            session_locks: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(10_000)
                .build(),
            message_queues: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(10_000)
                .build(),
            lid_pn_cache: Arc::new(LidPnCache::new()),
            message_enqueue_locks: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(10_000)
                .build(),
            group_cache: OnceCell::new(),
            device_cache: OnceCell::new(),
            retried_group_messages: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(2_000)
                .build(),

            expected_disconnect: Arc::new(AtomicBool::new(false)),
            connection_generation: Arc::new(AtomicU64::new(0)),

            recent_messages: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(1_000)
                .build(),

            pending_retries: Arc::new(Mutex::new(HashSet::new())),

            message_retry_counts: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(5_000)
                .build(),

            enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
            auto_reconnect_errors: Arc::new(AtomicU32::new(0)),
            last_successful_connect: Arc::new(Mutex::new(None)),

            needs_initial_full_sync: Arc::new(AtomicBool::new(false)),

            app_state_processor: OnceCell::new(),
            app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
            initial_keys_synced_notifier: Arc::new(Notify::new()),
            initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
            offline_sync_notifier: Arc::new(Notify::new()),
            offline_sync_completed: Arc::new(AtomicBool::new(false)),
            socket_ready_notifier: Arc::new(Notify::new()),
            connected_notifier: Arc::new(Notify::new()),
            major_sync_task_sender: tx,
            pairing_cancellation_tx: Arc::new(Mutex::new(None)),
            pair_code_state: Arc::new(Mutex::new(wacore::pair_code::PairCodeState::default())),
            send_buffer_pool: Arc::new(Mutex::new(Vec::with_capacity(4))),
            custom_enc_handlers: Arc::new(DashMap::new()),
            pdo_pending_requests: crate::pdo::new_pdo_cache(),
            // Device lists live longer (1 h) than the per-key lock caches.
            device_registry_cache: Cache::builder()
                .max_capacity(5_000)
                .time_to_live(Duration::from_secs(3600))
                .build(),
            stanza_router: Self::create_stanza_router(),
            synchronous_ack: false,
            http_client,
            override_version,
        };

        let arc = Arc::new(this);

        // Best-effort background warm-up of the LID-PN mapping cache.
        let warm_up_arc = arc.clone();
        tokio::spawn(async move {
            if let Err(e) = warm_up_arc.warm_up_lid_pn_cache().await {
                warn!("Failed to warm up LID-PN cache: {e}");
            }
        });

        // Periodic cleanup of stale device-registry entries.
        let cleanup_arc = arc.clone();
        tokio::spawn(async move {
            cleanup_arc.device_registry_cleanup_loop().await;
        });

        (arc, rx)
    }
314
315 pub(crate) async fn get_group_cache(&self) -> &Cache<Jid, GroupInfo> {
316 self.group_cache
317 .get_or_init(|| async {
318 info!("Initializing Group Cache for the first time.");
319 Cache::builder()
320 .time_to_live(Duration::from_secs(3600))
321 .max_capacity(1_000)
322 .build()
323 })
324 .await
325 }
326
327 pub(crate) async fn get_device_cache(&self) -> &Cache<Jid, Vec<Jid>> {
328 self.device_cache
329 .get_or_init(|| async {
330 info!("Initializing Device Cache for the first time.");
331 Cache::builder()
332 .time_to_live(Duration::from_secs(3600))
333 .max_capacity(5_000)
334 .build()
335 })
336 .await
337 }
338
339 pub(crate) async fn get_app_state_processor(&self) -> &AppStateProcessor {
340 self.app_state_processor
341 .get_or_init(|| async {
342 info!("Initializing AppStateProcessor for the first time.");
343 AppStateProcessor::new(self.persistence_manager.backend())
344 })
345 .await
346 }
347
    /// Build the stanza router with all known top-level handlers registered.
    ///
    /// Registration order is preserved as written; unhandled stanza classes
    /// (`call`, `presence`, `chatstate`) get explicit "unimplemented"
    /// placeholders so they are acknowledged rather than logged as unknown.
    fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
        use crate::handlers::{
            basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
            ib::IbHandler,
            iq::IqHandler,
            message::MessageHandler,
            notification::NotificationHandler,
            receipt::ReceiptHandler,
            router::StanzaRouter,
            unimplemented::UnimplementedHandler,
        };

        let mut router = StanzaRouter::new();

        // Real handlers for stanza types the client fully supports.
        router.register(Arc::new(MessageHandler));
        router.register(Arc::new(ReceiptHandler));
        router.register(Arc::new(IqHandler));
        router.register(Arc::new(SuccessHandler));
        router.register(Arc::new(FailureHandler));
        router.register(Arc::new(StreamErrorHandler));
        router.register(Arc::new(IbHandler));
        router.register(Arc::new(NotificationHandler));
        router.register(Arc::new(AckHandler));

        // Recognized-but-unimplemented stanza classes.
        router.register(Arc::new(UnimplementedHandler::for_call()));
        router.register(Arc::new(UnimplementedHandler::for_presence()));
        router.register(Arc::new(UnimplementedHandler::for_chatstate()));

        router
    }
381
382 pub async fn run(self: &Arc<Self>) {
383 if self.is_running.swap(true, Ordering::SeqCst) {
384 warn!("Client `run` method called while already running.");
385 return;
386 }
387 while self.is_running.load(Ordering::Relaxed) {
388 self.expected_disconnect.store(false, Ordering::Relaxed);
389
390 if self.connect().await.is_err() {
391 error!("Failed to connect, will retry...");
392 } else {
393 if self.read_messages_loop().await.is_err() {
394 warn!(
395 "Message loop exited with an error. Will attempt to reconnect if enabled."
396 );
397 } else if self.expected_disconnect.load(Ordering::Relaxed) {
398 debug!("Message loop exited gracefully (expected disconnect).");
399 } else {
400 info!("Message loop exited gracefully.");
401 }
402
403 self.cleanup_connection_state().await;
404 }
405
406 if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
407 info!("Auto-reconnect disabled, shutting down.");
408 self.is_running.store(false, Ordering::Relaxed);
409 break;
410 }
411
412 if self.expected_disconnect.load(Ordering::Relaxed) {
414 self.auto_reconnect_errors.store(0, Ordering::Relaxed);
415 info!("Expected disconnect (e.g., 515), reconnecting immediately...");
416 continue;
417 }
418
419 let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
420 let delay_secs = u64::from(error_count * 2).min(30);
421 let delay = Duration::from_secs(delay_secs);
422 info!(
423 "Will attempt to reconnect in {:?} (attempt {})",
424 delay,
425 error_count + 1
426 );
427 sleep(delay).await;
428 }
429 info!("Client run loop has shut down.");
430 }
431
    /// Establish a connection: resolve the client version and open the
    /// transport in parallel, perform the Noise handshake, install the
    /// resulting socket, and start the keepalive loop.
    ///
    /// # Errors
    /// Returns [`ClientError::AlreadyConnected`] if a connect is in progress
    /// or a connection exists; otherwise propagates version-resolution,
    /// transport, or handshake failures.
    pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        // Swap-guard: only one connect attempt may run at a time.
        if self.is_connecting.swap(true, Ordering::SeqCst) {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Clear the connecting flag on every exit path, including errors.
        let _guard = scopeguard::guard((), |_| {
            self.is_connecting.store(false, Ordering::Relaxed);
        });

        if self.is_connected() {
            return Err(ClientError::AlreadyConnected.into());
        }

        self.is_logged_in.store(false, Ordering::Relaxed);
        self.offline_sync_completed.store(false, Ordering::Relaxed);

        let version_future = crate::version::resolve_and_update_version(
            &self.persistence_manager,
            &self.http_client,
            self.override_version,
        );

        let transport_future = self.transport_factory.create_transport();

        // Version fetch and transport dial are independent; run them together.
        info!("Connecting WebSocket and fetching latest client version in parallel...");
        let (version_result, transport_result) = tokio::join!(version_future, transport_future);

        version_result.map_err(|e| anyhow!("Failed to resolve app version: {}", e))?;
        let (transport, mut transport_events) = transport_result?;
        info!("Version fetch and transport connection established.");

        let device_snapshot = self.persistence_manager.get_device_snapshot().await;

        // Noise handshake consumes transport events until the channel is secure.
        let noise_socket =
            handshake::do_handshake(&device_snapshot, transport.clone(), &mut transport_events)
                .await?;

        // Install connection state only after the handshake succeeds.
        *self.transport.lock().await = Some(transport);
        *self.transport_events.lock().await = Some(transport_events);
        *self.noise_socket.lock().await = Some(noise_socket);

        self.socket_ready_notifier.notify_waiters();

        let client_clone = self.clone();
        tokio::spawn(async move { client_clone.keepalive_loop().await });

        Ok(())
    }
484
485 pub async fn disconnect(&self) {
486 info!("Disconnecting client intentionally.");
487 self.expected_disconnect.store(true, Ordering::Relaxed);
488 self.is_running.store(false, Ordering::Relaxed);
489 self.shutdown_notifier.notify_waiters();
490
491 if let Some(transport) = self.transport.lock().await.as_ref() {
492 transport.disconnect().await;
493 }
494 self.cleanup_connection_state().await;
495 }
496
    /// Reset transient per-connection state after the socket goes away.
    ///
    /// Clears the login flag, drops the transport/event/Noise handles, and
    /// invalidates connection-scoped retry bookkeeping. Does not touch
    /// identity or persisted state.
    async fn cleanup_connection_state(&self) {
        self.is_logged_in.store(false, Ordering::Relaxed);
        *self.transport.lock().await = None;
        *self.transport_events.lock().await = None;
        *self.noise_socket.lock().await = None;
        // Retried-message tracking is per-connection; a new session starts fresh.
        self.retried_group_messages.invalidate_all();
        self.offline_sync_completed.store(false, Ordering::Relaxed);
    }
506
    /// Core receive loop: consumes transport events, reassembles and
    /// decrypts frames, and dispatches decoded stanzas.
    ///
    /// Critical stanzas (`success`/`failure`/`stream:error`) are processed
    /// inline to preserve ordering; everything else is handled on a spawned
    /// task. Returns `Ok` on expected shutdown/disconnect and `Err` when the
    /// transport drops unexpectedly (triggering reconnect in `run`).
    async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        info!(target: "Client", "Starting message processing loop...");

        // Take exclusive ownership of the event receiver for this loop.
        let mut rx_guard = self.transport_events.lock().await;
        let transport_events = rx_guard
            .take()
            .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
        drop(rx_guard);

        let mut frame_decoder = wacore::framing::FrameDecoder::new();

        loop {
            tokio::select! {
                // `biased` gives the shutdown branch priority over new data.
                biased;
                _ = self.shutdown_notifier.notified() => {
                    info!(target: "Client", "Shutdown signaled in message loop. Exiting message loop.");
                    return Ok(());
                },
                event_result = transport_events.recv() => {
                    match event_result {
                        Ok(crate::transport::TransportEvent::DataReceived(data)) => {
                            // One chunk may carry zero or more complete frames.
                            frame_decoder.feed(&data);

                            while let Some(encrypted_frame) = frame_decoder.decode_frame() {
                                if let Some(node) = self.decrypt_frame(&encrypted_frame).await {
                                    let is_critical = matches!(node.tag.as_str(), "success" | "failure" | "stream:error");

                                    if is_critical {
                                        // Must be handled in-order, inline.
                                        self.process_decrypted_node(node).await;
                                    } else {
                                        // Ordinary stanzas process concurrently.
                                        let client = self.clone();
                                        tokio::spawn(async move {
                                            client.process_decrypted_node(node).await;
                                        });
                                    }
                                }

                                if self.expected_disconnect.load(Ordering::Relaxed) {
                                    info!(target: "Client", "Expected disconnect signaled during frame processing. Exiting message loop.");
                                    return Ok(());
                                }
                            }
                        },
                        Ok(crate::transport::TransportEvent::Disconnected) | Err(_) => {
                            self.cleanup_connection_state().await;
                            if !self.expected_disconnect.load(Ordering::Relaxed) {
                                self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
                                info!("Transport disconnected unexpectedly.");
                                return Err(anyhow::anyhow!("Transport disconnected unexpectedly"));
                            } else {
                                info!("Transport disconnected as expected.");
                                return Ok(());
                            }
                        }
                        Ok(crate::transport::TransportEvent::Connected) => {
                            // Informational; the handshake already completed.
                            debug!("Transport connected event received");
                        }
                    }
                }
            }
        }
    }
583
584 pub(crate) async fn decrypt_frame(
587 self: &Arc<Self>,
588 encrypted_frame: &bytes::Bytes,
589 ) -> Option<wacore_binary::node::Node> {
590 let noise_socket_arc = { self.noise_socket.lock().await.clone() };
591 let noise_socket = match noise_socket_arc {
592 Some(s) => s,
593 None => {
594 log::error!("Cannot process frame: not connected (no noise socket)");
595 return None;
596 }
597 };
598
599 let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
600 Ok(p) => p,
601 Err(e) => {
602 log::error!(target: "Client", "Failed to decrypt frame: {e}");
603 return None;
604 }
605 };
606
607 let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
608 Ok(data) => data,
609 Err(e) => {
610 log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
611 return None;
612 }
613 };
614
615 match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
616 Ok(node_ref) => Some(node_ref.to_owned()),
617 Err(e) => {
618 log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}");
619 None
620 }
621 }
622 }
623
624 pub(crate) async fn process_decrypted_node(self: &Arc<Self>, node: wacore_binary::node::Node) {
628 let node_arc = Arc::new(node);
630 self.process_node(node_arc).await;
631 }
632
    /// Dispatch one decoded stanza: log it, resolve pending IQ waiters,
    /// route it through the stanza router, and ack it when required.
    pub(crate) async fn process_node(self: &Arc<Self>, node: Arc<Node>) {
        use wacore::xml::DisplayableNode;

        // App-state sync responses are large/sensitive; log a summary only.
        if node.tag.as_str() == "iq"
            && let Some(sync_node) = node.get_optional_child("sync")
            && let Some(collection_node) = sync_node.get_optional_child("collection")
        {
            let name = collection_node.attrs().string("name");
            info!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
        } else {
            info!(target: "Client/Recv","{}", DisplayableNode(&node));
        }

        // Handlers may set this to suppress the automatic ack below.
        let mut cancelled = false;

        // Stream end: wake the message loop to exit; nothing else to do.
        if node.tag.as_str() == "xmlstreamend" {
            if self.expected_disconnect.load(Ordering::Relaxed) {
                debug!(target: "Client", "Received <xmlstreamend/>, expected disconnect.");
            } else {
                warn!(target: "Client", "Received <xmlstreamend/>, treating as disconnect.");
            }
            self.shutdown_notifier.notify_waiters();
            return;
        }

        // IQ responses with a registered waiter are resolved directly and
        // skip the router entirely.
        if node.tag.as_str() == "iq" {
            let id_opt = node.attrs.get("id");
            if let Some(id) = id_opt {
                let has_waiter = self.response_waiters.lock().await.contains_key(id.as_str());
                if has_waiter && self.handle_iq_response(Arc::clone(&node)).await {
                    return;
                }
            }
        }

        if !self
            .stanza_router
            .dispatch(self.clone(), Arc::clone(&node), &mut cancelled)
            .await
        {
            warn!(target: "Client", "Received unknown top-level node: {}", DisplayableNode(&node));
        }

        // Ack unless a handler explicitly cancelled it.
        if self.should_ack(&node) && !cancelled {
            self.maybe_deferred_ack(node).await;
        }
    }
685
686 fn should_ack(&self, node: &Node) -> bool {
688 matches!(
689 node.tag.as_str(),
690 "message" | "receipt" | "notification" | "call"
691 ) && node.attrs.contains_key("id")
692 && node.attrs.contains_key("from")
693 }
694
695 async fn maybe_deferred_ack(self: &Arc<Self>, node: Arc<Node>) {
699 if self.synchronous_ack {
700 if let Err(e) = self.send_ack_for(&node).await {
701 warn!(target: "Client", "Failed to send ack: {e:?}");
702 }
703 } else {
704 let this = self.clone();
705 tokio::spawn(async move {
707 if let Err(e) = this.send_ack_for(&node).await {
708 warn!(target: "Client", "Failed to send ack: {e:?}");
709 }
710 });
711 }
712 }
713
714 async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
716 let id = match node.attrs.get("id") {
717 Some(v) => v.clone(),
718 None => return Ok(()),
719 };
720 let from = match node.attrs.get("from") {
721 Some(v) => v.clone(),
722 None => return Ok(()),
723 };
724 let participant = node.attrs.get("participant").cloned();
725 let typ = if node.tag != "message" {
726 node.attrs.get("type").cloned()
727 } else {
728 None
729 };
730 let mut attrs = IndexMap::new();
731 attrs.insert("class".to_string(), node.tag.clone());
732 attrs.insert("id".to_string(), id);
733 attrs.insert("to".to_string(), from);
734 if let Some(p) = participant {
735 attrs.insert("participant".to_string(), p);
736 }
737 if let Some(t) = typ {
738 attrs.insert("type".to_string(), t);
739 }
740 let ack = Node {
741 tag: "ack".to_string(),
742 attrs,
743 content: None,
744 };
745 self.send_node(ack).await
746 }
747
    /// Placeholder handler for stanza types that are recognized but not yet
    /// implemented; logs the tag so gaps are visible at runtime.
    pub(crate) async fn handle_unimplemented(&self, tag: &str) {
        warn!(target: "Client", "TODO: Implement handler for <{tag}>");
    }
751
752 pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
753 use crate::request::InfoQuery;
754
755 let tag = if passive { "passive" } else { "active" };
756
757 let query = InfoQuery::set(
758 "passive",
759 server_jid(),
760 Some(wacore_binary::node::NodeContent::Nodes(vec![
761 NodeBuilder::new(tag).build(),
762 ])),
763 );
764
765 self.send_iq(query).await.map(|_| ())
766 }
767
768 pub async fn clean_dirty_bits(
769 &self,
770 type_: &str,
771 timestamp: Option<&str>,
772 ) -> Result<(), ClientError> {
773 let id = self.generate_request_id();
774 let mut clean_builder = NodeBuilder::new("clean").attr("type", type_);
775 if let Some(ts) = timestamp {
776 clean_builder = clean_builder.attr("timestamp", ts);
777 }
778
779 let node = NodeBuilder::new("iq")
780 .attr("to", server_jid().to_string())
781 .attr("type", "set")
782 .attr("xmlns", "urn:xmpp:whatsapp:dirty")
783 .attr("id", id)
784 .children([clean_builder.build()])
785 .build();
786
787 self.send_node(node).await
788 }
789
790 pub async fn fetch_props(&self) -> Result<(), crate::request::IqError> {
791 use crate::request::InfoQuery;
792
793 debug!(target: "Client", "Fetching properties (props)...");
794
795 let props_node = NodeBuilder::new("props")
796 .attr("protocol", "2")
797 .attr("hash", "") .build();
799
800 let iq = InfoQuery::get(
801 "w",
802 server_jid(),
803 Some(wacore_binary::node::NodeContent::Nodes(vec![props_node])),
804 );
805
806 self.send_iq(iq).await.map(|_| ())
807 }
808
809 pub async fn fetch_privacy_settings(&self) -> Result<(), crate::request::IqError> {
810 use crate::request::InfoQuery;
811
812 debug!(target: "Client", "Fetching privacy settings...");
813
814 let iq = InfoQuery::get(
815 "privacy",
816 server_jid(),
817 Some(wacore_binary::node::NodeContent::Nodes(vec![
818 NodeBuilder::new("privacy").build(),
819 ])),
820 );
821
822 self.send_iq(iq).await.map(|_| ())
823 }
824
825 pub async fn send_digest_key_bundle(&self) -> Result<(), crate::request::IqError> {
826 use crate::request::InfoQuery;
827
828 debug!(target: "Client", "Sending digest key bundle...");
829
830 let digest_node = NodeBuilder::new("digest").build();
831 let iq = InfoQuery::get(
832 "encrypt",
833 server_jid(),
834 Some(wacore_binary::node::NodeContent::Nodes(vec![digest_node])),
835 );
836
837 self.send_iq(iq).await.map(|_| ())
838 }
839
    /// Handle the `<success>` stanza: mark the session logged in, bump the
    /// connection generation, sync the LID from the server, and spawn the
    /// post-login initialization sequence.
    ///
    /// The spawned sequence is guarded by generation checks so that a
    /// reconnect (which bumps `connection_generation`) cancels any stale
    /// in-flight initialization from the previous connection.
    pub(crate) async fn handle_success(self: &Arc<Self>, node: &wacore_binary::node::Node) {
        if self.expected_disconnect.load(Ordering::Relaxed) {
            debug!(target: "Client", "Ignoring <success> stanza: expected disconnect pending");
            return;
        }

        // swap() makes duplicate <success> stanzas idempotent.
        if self.is_logged_in.swap(true, Ordering::SeqCst) {
            debug!(target: "Client", "Ignoring duplicate <success> stanza (already logged in)");
            return;
        }

        // New generation: stale post-login tasks from prior connections will
        // observe the mismatch and cancel themselves.
        let current_generation = self.connection_generation.fetch_add(1, Ordering::SeqCst) + 1;

        info!(
            "Successfully authenticated with WhatsApp servers! (gen={})",
            current_generation
        );
        *self.last_successful_connect.lock().await = Some(chrono::Utc::now());
        self.auto_reconnect_errors.store(0, Ordering::Relaxed);

        // Persist the server-provided LID if it changed.
        if let Some(lid_str) = node.attrs.get("lid") {
            if let Ok(lid) = lid_str.parse::<Jid>() {
                let device_snapshot = self.persistence_manager.get_device_snapshot().await;
                if device_snapshot.lid.as_ref() != Some(&lid) {
                    info!(target: "Client", "Updating LID from server to '{lid}'");
                    self.persistence_manager
                        .process_command(DeviceCommand::SetLid(Some(lid)))
                        .await;
                }
            } else {
                warn!(target: "Client", "Failed to parse LID from success stanza: {lid_str}");
            }
        } else {
            warn!(target: "Client", "LID not found in <success> stanza. Group messaging may fail.");
        }

        let client_clone = self.clone();
        let task_generation = current_generation;
        tokio::spawn(async move {
            // Early-returns from the task when the connection has been
            // replaced since this task was spawned.
            macro_rules! check_generation {
                () => {
                    if client_clone.connection_generation.load(Ordering::SeqCst) != task_generation
                    {
                        debug!("Post-login task cancelled: connection generation changed");
                        return;
                    }
                };
            }

            info!(target: "Client", "Starting post-login initialization sequence (gen={})...", task_generation);

            let mut force_initial_sync = false;
            let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
            if device_snapshot.push_name.is_empty() {
                // An empty push name would block presence; set a default and
                // force a full app-state sync to pick up the real one.
                const DEFAULT_PUSH_NAME: &str = "WhatsApp Rust";
                warn!(
                    target: "Client",
                    "Push name is empty! Setting default to '{DEFAULT_PUSH_NAME}' to allow presence."
                );
                client_clone
                    .persistence_manager
                    .process_command(DeviceCommand::SetPushName(DEFAULT_PUSH_NAME.to_string()))
                    .await;
                force_initial_sync = true;
            }

            if !client_clone.is_connected() {
                debug!(
                    "Skipping post-login init: connection closed (likely pairing phase reconnect)"
                );
                return;
            }

            check_generation!();
            if let Err(e) = client_clone
                .establish_primary_phone_session_immediate()
                .await
            {
                warn!(target: "Client/PDO", "Failed to establish session with primary phone on login: {:?}", e);
            }

            check_generation!();
            // Leave passive mode so the server starts delivering stanzas.
            if let Err(e) = client_clone.set_passive(false).await {
                warn!("Failed to send post-connect active IQ: {e:?}");
            }

            const OFFLINE_SYNC_TIMEOUT_SECS: u64 = 5;

            // Give the server's offline (catch-up) sync a bounded window
            // before continuing with the remaining startup tasks.
            if !client_clone.offline_sync_completed.load(Ordering::Relaxed) {
                info!(target: "Client", "Waiting for offline sync to complete (up to {}s)...", OFFLINE_SYNC_TIMEOUT_SECS);
                let wait_result = tokio::time::timeout(
                    Duration::from_secs(OFFLINE_SYNC_TIMEOUT_SECS),
                    client_clone.offline_sync_notifier.notified(),
                )
                .await;

                check_generation!();

                if wait_result.is_err() {
                    info!(target: "Client", "Offline sync wait timed out, proceeding with passive tasks");
                } else {
                    info!(target: "Client", "Offline sync completed, proceeding with passive tasks");
                }
            }

            check_generation!();
            if let Err(e) = client_clone.upload_pre_keys().await {
                warn!("Failed to upload pre-keys during startup: {e:?}");
            }

            check_generation!();
            if !client_clone.is_connected() {
                debug!("Skipping presence: connection closed");
                return;
            }

            if let Err(e) = client_clone.presence().set_available().await {
                warn!("Failed to send initial presence: {e:?}");
            } else {
                info!("Initial presence sent successfully.");
            }

            check_generation!();

            // Non-critical queries run concurrently on their own task.
            let bg_client = client_clone.clone();
            let bg_generation = task_generation;
            tokio::spawn(async move {
                if bg_client.connection_generation.load(Ordering::SeqCst) != bg_generation {
                    debug!("Skipping background init queries: connection generation changed");
                    return;
                }
                if !bg_client.is_connected() {
                    debug!("Skipping background init queries: connection closed");
                    return;
                }

                info!(
                    target: "Client",
                    "Sending background initialization queries (Props, Blocklist, Privacy, Digest)..."
                );

                let props_fut = bg_client.fetch_props();
                let binding = bg_client.blocking();
                let blocklist_fut = binding.get_blocklist();
                let privacy_fut = bg_client.fetch_privacy_settings();
                let digest_fut = bg_client.send_digest_key_bundle();

                let (r_props, r_block, r_priv, r_digest) =
                    tokio::join!(props_fut, blocklist_fut, privacy_fut, digest_fut);

                if let Err(e) = r_props {
                    warn!("Background init: Failed to fetch props: {e:?}");
                }
                if let Err(e) = r_block {
                    warn!("Background init: Failed to fetch blocklist: {e:?}");
                }
                if let Err(e) = r_priv {
                    warn!("Background init: Failed to fetch privacy settings: {e:?}");
                }
                if let Err(e) = r_digest {
                    warn!("Background init: Failed to send digest: {e:?}");
                }
            });

            // Announce readiness to subscribers and waiters.
            client_clone
                .core
                .event_bus
                .dispatch(&Event::Connected(crate::types::events::Connected));
            client_clone.connected_notifier.notify_waiters();

            check_generation!();

            let flag_set = client_clone.needs_initial_full_sync.load(Ordering::Relaxed);
            if flag_set || force_initial_sync {
                info!(
                    target: "Client/AppState",
                    "Starting Initial App State Sync (flag_set={flag_set}, force={force_initial_sync})"
                );

                // Without keys the sync would fail immediately; wait briefly.
                if !client_clone
                    .initial_app_state_keys_received
                    .load(Ordering::Relaxed)
                {
                    info!(
                        target: "Client/AppState",
                        "Waiting up to 5s for app state keys..."
                    );
                    let _ = tokio::time::timeout(
                        Duration::from_secs(5),
                        client_clone.initial_keys_synced_notifier.notified(),
                    )
                    .await;

                    check_generation!();
                }

                let sync_client = client_clone.clone();
                let sync_generation = task_generation;
                tokio::spawn(async move {
                    // Sync all patch collections, most critical first.
                    let names = [
                        WAPatchName::CriticalBlock,
                        WAPatchName::CriticalUnblockLow,
                        WAPatchName::RegularLow,
                        WAPatchName::RegularHigh,
                        WAPatchName::Regular,
                    ];

                    for name in names {
                        if sync_client.connection_generation.load(Ordering::SeqCst)
                            != sync_generation
                        {
                            debug!("App state sync cancelled: connection generation changed");
                            return;
                        }

                        if let Err(e) = sync_client.fetch_app_state_with_retry(name).await {
                            warn!("Failed to full sync app state {:?}: {e}", name);
                        }
                    }

                    sync_client
                        .needs_initial_full_sync
                        .store(false, Ordering::Relaxed);
                    info!(target: "Client/AppState", "Initial App State Sync Completed.");
                });
            }
        });
    }
1104
1105 pub(crate) async fn handle_ack_response(&self, node: Node) -> bool {
1110 let id_opt = node.attrs.get("id").cloned();
1111 if let Some(id) = id_opt
1112 && let Some(waiter) = self.response_waiters.lock().await.remove(&id)
1113 {
1114 if waiter.send(node).is_err() {
1115 warn!(target: "Client/Ack", "Failed to send ACK response to waiter for ID {id}. Receiver was likely dropped.");
1116 }
1117 return true;
1118 }
1119 false
1120 }
1121
    /// Run a full app-state sync for `name`, retrying on two known-transient
    /// failures.
    ///
    /// Retry policy (matched on the error's string form):
    /// - "app state key not found", first attempt only: wait up to 10s for
    ///   the key-share notification, then retry once unconditionally.
    /// - "database is locked": linear backoff, up to
    ///   [`APP_STATE_RETRY_MAX_ATTEMPTS`] attempts total.
    /// Any other error is returned immediately.
    async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
        let mut attempt = 0u32;
        loop {
            attempt += 1;
            let res = self.process_app_state_sync_task(name, true).await;
            match res {
                Ok(()) => return Ok(()),
                Err(e) => {
                    // NOTE(review): error classification relies on substring
                    // matching of the error message.
                    let es = e.to_string();
                    if es.contains("app state key not found") && attempt == 1 {
                        if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
                            info!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
                            if tokio::time::timeout(
                                Duration::from_secs(10),
                                self.initial_keys_synced_notifier.notified(),
                            )
                            .await
                            .is_err()
                            {
                                warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
                            }
                        }
                        continue;
                    }
                    if es.contains("database is locked") && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
                        // Linear backoff: 350ms, 550ms, 750ms, ...
                        let backoff = Duration::from_millis(200 * attempt as u64 + 150);
                        warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
                        tokio::time::sleep(backoff).await;
                        continue;
                    }
                    return Err(e);
                }
            }
        }
    }
1157
1158 pub(crate) async fn process_app_state_sync_task(
1159 &self,
1160 name: WAPatchName,
1161 full_sync: bool,
1162 ) -> anyhow::Result<()> {
1163 let backend = self.persistence_manager.backend();
1164 let mut full_sync = full_sync;
1165
1166 let mut state = backend.get_version(name.as_str()).await?;
1167 if state.version == 0 {
1168 full_sync = true;
1169 }
1170
1171 let mut has_more = true;
1172 let want_snapshot = full_sync;
1173
1174 if has_more {
1175 debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);
1176
1177 let mut collection_builder = NodeBuilder::new("collection")
1178 .attr("name", name.as_str())
1179 .attr(
1180 "return_snapshot",
1181 if want_snapshot { "true" } else { "false" },
1182 );
1183 if !want_snapshot {
1184 collection_builder = collection_builder.attr("version", state.version.to_string());
1185 }
1186 let sync_node = NodeBuilder::new("sync")
1187 .children([collection_builder.build()])
1188 .build();
1189 let iq = crate::request::InfoQuery {
1190 namespace: "w:sync:app:state",
1191 query_type: crate::request::InfoQueryType::Set,
1192 to: server_jid(),
1193 target: None,
1194 id: None,
1195 content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
1196 timeout: None,
1197 };
1198
1199 let resp = self.send_iq(iq).await?;
1200 debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);
1201
1202 let _decode_start = std::time::Instant::now();
1203 let pre_downloaded_snapshot: Option<Vec<u8>> =
1204 match wacore::appstate::patch_decode::parse_patch_list(&resp) {
1205 Ok(pl) => {
1206 debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={}", name, pl.snapshot_ref.is_some(), pl.has_more_patches);
1207 if let Some(ext) = &pl.snapshot_ref {
1208 match self.download(ext).await {
1209 Ok(bytes) => Some(bytes),
1210 Err(e) => {
1211 warn!("Failed to download external snapshot: {e}");
1212 None
1213 }
1214 }
1215 } else {
1216 None
1217 }
1218 }
1219 Err(_) => None,
1220 };
1221
1222 let download = |_: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
1223 if let Some(bytes) = &pre_downloaded_snapshot {
1224 Ok(bytes.clone())
1225 } else {
1226 Err(anyhow::anyhow!("snapshot not pre-downloaded"))
1227 }
1228 };
1229
1230 let proc = self.get_app_state_processor().await;
1231 let (mutations, new_state, list) =
1232 proc.decode_patch_list(&resp, &download, true).await?;
1233 let decode_elapsed = _decode_start.elapsed();
1234 if decode_elapsed.as_millis() > 500 {
1235 debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
1236 }
1237
1238 let missing = match proc.get_missing_key_ids(&list).await {
1239 Ok(v) => v,
1240 Err(e) => {
1241 warn!("Failed to get missing key IDs for {:?}: {}", name, e);
1242 Vec::new()
1243 }
1244 };
1245 if !missing.is_empty() {
1246 let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
1247 let mut guard = self.app_state_key_requests.lock().await;
1248 let now = std::time::Instant::now();
1249 for key_id in missing {
1250 let hex_id = hex::encode(&key_id);
1251 let should = guard
1252 .get(&hex_id)
1253 .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
1254 .unwrap_or(true);
1255 if should {
1256 guard.insert(hex_id, now);
1257 to_request.push(key_id);
1258 }
1259 }
1260 drop(guard);
1261 if !to_request.is_empty() {
1262 self.request_app_state_keys(&to_request).await;
1263 }
1264 }
1265
1266 for m in mutations {
1267 debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
1268 self.dispatch_app_state_mutation(&m, full_sync).await;
1269 }
1270
1271 state = new_state;
1272 has_more = list.has_more_patches;
1273 debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more}", name);
1274 }
1275
1276 backend.set_version(name.as_str(), state.clone()).await?;
1277
1278 debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
1279 Ok(())
1280 }
1281
1282 #[allow(dead_code)]
1283 async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
1284 if raw_key_ids.is_empty() {
1285 return;
1286 }
1287 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1288 let own_jid = match device_snapshot.pn.clone() {
1289 Some(j) => j,
1290 None => return,
1291 };
1292 let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
1293 .iter()
1294 .map(|k| wa::message::AppStateSyncKeyId {
1295 key_id: Some(k.clone()),
1296 })
1297 .collect();
1298 let msg = wa::Message {
1299 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1300 r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
1301 app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
1302 ..Default::default()
1303 })),
1304 ..Default::default()
1305 };
1306 if let Err(e) = self
1307 .send_message_impl(
1308 own_jid,
1309 &msg,
1310 Some(self.generate_message_id().await),
1311 true,
1312 false,
1313 None,
1314 )
1315 .await
1316 {
1317 warn!("Failed to send app state key request: {e}");
1318 }
1319 }
1320
    /// Translates a decoded app-state mutation into a client event.
    ///
    /// Only `Set` operations with a non-empty index are considered. The first
    /// index element names the mutation kind; the second (when present) is the
    /// affected chat/contact JID. `full_sync` is forwarded on each event so
    /// subscribers can distinguish initial-sync replay from live updates.
    #[allow(dead_code)]
    async fn dispatch_app_state_mutation(
        &self,
        m: &crate::appstate_sync::Mutation,
        full_sync: bool,
    ) {
        use wacore::types::events::{
            ArchiveUpdate, ContactUpdate, Event, MarkChatAsReadUpdate, MuteUpdate, PinUpdate,
        };
        if m.operation != wa::syncd_mutation::SyncdOperation::Set {
            return;
        }
        if m.index.is_empty() {
            return;
        }
        let kind = &m.index[0];
        // A missing timestamp becomes 0 (epoch); `from_timestamp_millis` only
        // fails for out-of-range values, in which case "now" is used instead.
        let ts = m
            .action_value
            .as_ref()
            .and_then(|v| v.timestamp)
            .unwrap_or(0);
        let time = chrono::DateTime::from_timestamp_millis(ts).unwrap_or_else(chrono::Utc::now);
        // Unparseable JIDs fall back to the default JID rather than erroring.
        let jid = if m.index.len() > 1 {
            m.index[1].parse().unwrap_or_default()
        } else {
            Jid::default()
        };
        match kind.as_str() {
            "setting_pushName" => {
                // Our own push name changed on another device: persist it and
                // notify, but only when it actually differs from what we have.
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.push_name_setting
                    && let Some(new_name) = &act.name
                {
                    let new_name = new_name.clone();
                    let bus = self.core.event_bus.clone();

                    let snapshot = self.persistence_manager.get_device_snapshot().await;
                    let old = snapshot.push_name.clone();
                    if old != new_name {
                        info!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
                        self.persistence_manager
                            .process_command(DeviceCommand::SetPushName(new_name.clone()))
                            .await;
                        bus.dispatch(&Event::SelfPushNameUpdated(
                            crate::types::events::SelfPushNameUpdated {
                                from_server: true,
                                old_name: old,
                                new_name: new_name.clone(),
                            },
                        ));
                    } else {
                        debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
                    }
                }
            }
            "mute" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.mute_action
                {
                    self.core.event_bus.dispatch(&Event::MuteUpdate(MuteUpdate {
                        jid,
                        timestamp: time,
                        action: Box::new(*act),
                        from_full_sync: full_sync,
                    }));
                }
            }
            // "pin_v1" is the legacy index name for the same action.
            "pin" | "pin_v1" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.pin_action
                {
                    self.core.event_bus.dispatch(&Event::PinUpdate(PinUpdate {
                        jid,
                        timestamp: time,
                        action: Box::new(*act),
                        from_full_sync: full_sync,
                    }));
                }
            }
            "archive" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.archive_chat_action
                {
                    self.core
                        .event_bus
                        .dispatch(&Event::ArchiveUpdate(ArchiveUpdate {
                            jid,
                            timestamp: time,
                            action: Box::new(act.clone()),
                            from_full_sync: full_sync,
                        }));
                }
            }
            "contact" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.contact_action
                {
                    self.core
                        .event_bus
                        .dispatch(&Event::ContactUpdate(ContactUpdate {
                            jid,
                            timestamp: time,
                            action: Box::new(act.clone()),
                            from_full_sync: full_sync,
                        }));
                }
            }
            // Both index spellings have been observed on the wire.
            "mark_chat_as_read" | "markChatAsRead" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.mark_chat_as_read_action
                {
                    self.core.event_bus.dispatch(&Event::MarkChatAsReadUpdate(
                        MarkChatAsReadUpdate {
                            jid,
                            timestamp: time,
                            action: Box::new(act.clone()),
                            from_full_sync: full_sync,
                        },
                    ));
                }
            }
            // Unknown mutation kinds are ignored silently.
            _ => {}
        }
    }
1445
    /// Marks the upcoming disconnect as expected so reconnect logic does not
    /// treat it as an error. (Async only for call-site uniformity; it awaits
    /// nothing itself.)
    async fn expect_disconnect(&self) {
        self.expected_disconnect.store(true, Ordering::Relaxed);
    }
1449
    /// Handles a stream error stanza from the server.
    ///
    /// Marks the client logged-out, interprets the error code / conflict type,
    /// dispatches the matching event, and finally wakes the shutdown notifier
    /// so the read loop tears down. Per-code policy:
    /// - 515: server-initiated restart; expected disconnect, transport closed
    ///   in the background, auto-reconnect stays on.
    /// - 401 device_removed / conflict "replaced": logout; auto-reconnect off.
    /// - 503: service unavailable; auto-reconnect stays on.
    /// - anything else: generic StreamError event, expected disconnect.
    pub(crate) async fn handle_stream_error(&self, node: &wacore_binary::node::Node) {
        self.is_logged_in.store(false, Ordering::Relaxed);

        let mut attrs = node.attrs();
        let code = attrs.optional_string("code").unwrap_or("");
        let conflict_type = node
            .get_optional_child("conflict")
            .map(|n| n.attrs().optional_string("type").unwrap_or("").to_string())
            .unwrap_or_default();

        match (code, conflict_type.as_str()) {
            ("515", _) => {
                info!(target: "Client", "Got 515 stream error, server is closing stream. Will auto-reconnect.");
                self.expect_disconnect().await;
                // Clone the transport handle out of the lock, then disconnect
                // it off-task so this handler returns promptly.
                let transport_opt = self.transport.lock().await.clone();
                if let Some(transport) = transport_opt {
                    tokio::spawn(async move {
                        info!(target: "Client", "Disconnecting transport after 515");
                        transport.disconnect().await;
                    });
                }
            }
            ("401", "device_removed") | (_, "replaced") => {
                info!(target: "Client", "Got stream error indicating client was removed or replaced. Logging out.");
                self.expect_disconnect().await;
                self.enable_auto_reconnect.store(false, Ordering::Relaxed);

                // "replaced" means another client took over the session;
                // otherwise the device was removed and we are logged out.
                let event = if conflict_type == "replaced" {
                    Event::StreamReplaced(crate::types::events::StreamReplaced)
                } else {
                    Event::LoggedOut(crate::types::events::LoggedOut {
                        on_connect: false,
                        reason: ConnectFailureReason::LoggedOut,
                    })
                };
                self.core.event_bus.dispatch(&event);
            }
            ("503", _) => {
                info!(target: "Client", "Got 503 service unavailable, will auto-reconnect.");
            }
            _ => {
                error!(target: "Client", "Unknown stream error: {}", DisplayableNode(node));
                self.expect_disconnect().await;
                self.core.event_bus.dispatch(&Event::StreamError(
                    crate::types::events::StreamError {
                        code: code.to_string(),
                        raw: Some(node.clone()),
                    },
                ));
            }
        }

        info!(target: "Client", "Notifying shutdown from stream error handler");
        self.shutdown_notifier.notify_waiters();
    }
1509
    /// Handles a connect-failure stanza received during connection setup.
    ///
    /// Marks the disconnect as expected and wakes the shutdown notifier, then
    /// maps the numeric "reason" attribute onto a `ConnectFailureReason` and
    /// dispatches the matching event (LoggedOut, TemporaryBan, ClientOutdated,
    /// or a generic ConnectFailure). Auto-reconnect is kept only for reasons
    /// the reason type reports as retryable.
    pub(crate) async fn handle_connect_failure(&self, node: &wacore_binary::node::Node) {
        self.expected_disconnect.store(true, Ordering::Relaxed);
        self.shutdown_notifier.notify_waiters();

        let mut attrs = node.attrs();
        let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
        let reason = ConnectFailureReason::from(reason_code);

        if reason.should_reconnect() {
            // Retryable failure: undo the "expected disconnect" mark so the
            // reconnect logic kicks in.
            self.expected_disconnect.store(false, Ordering::Relaxed);
        } else {
            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
        }

        if reason.is_logged_out() {
            info!(target: "Client", "Got {reason:?} connect failure, logging out.");
            self.core
                .event_bus
                .dispatch(&wacore::types::events::Event::LoggedOut(
                    crate::types::events::LoggedOut {
                        on_connect: true,
                        reason,
                    },
                ));
        } else if let ConnectFailureReason::TempBanned = reason {
            // Ban details come in extra attributes: a ban-reason code and an
            // expiry in seconds.
            let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
            let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
            let expire_duration =
                chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
            warn!(target: "Client", "Temporary ban connect failure: {}", DisplayableNode(node));
            self.core.event_bus.dispatch(&Event::TemporaryBan(
                crate::types::events::TemporaryBan {
                    code: crate::types::events::TempBanReason::from(ban_code),
                    expire: expire_duration,
                },
            ));
        } else if let ConnectFailureReason::ClientOutdated = reason {
            error!(target: "Client", "Client is outdated and was rejected by server.");
            self.core
                .event_bus
                .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
        } else {
            warn!(target: "Client", "Unknown connect failure: {}", DisplayableNode(node));
            self.core.event_bus.dispatch(&Event::ConnectFailure(
                crate::types::events::ConnectFailure {
                    reason,
                    message: attrs.optional_string("message").unwrap_or("").to_string(),
                    raw: Some(node.clone()),
                },
            ));
        }
    }
1562
1563 pub(crate) async fn handle_iq(self: &Arc<Self>, node: &wacore_binary::node::Node) -> bool {
1564 if let Some("get") = node.attrs.get("type").map(|s| s.as_str())
1565 && node.get_optional_child("ping").is_some()
1566 {
1567 info!(target: "Client", "Received ping, sending pong.");
1568 let mut parser = node.attrs();
1569 let from_jid = parser.jid("from");
1570 let id = parser.string("id");
1571 let pong = NodeBuilder::new("iq")
1572 .attrs([
1573 ("to", from_jid.to_string()),
1574 ("id", id),
1575 ("type", "result".to_string()),
1576 ])
1577 .build();
1578 if let Err(e) = self.send_node(pong).await {
1579 warn!("Failed to send pong: {e:?}");
1580 }
1581 return true;
1582 }
1583
1584 if pair::handle_iq(self, node).await {
1586 return true;
1587 }
1588
1589 false
1590 }
1591
    /// Returns `true` when a noise socket is currently attached.
    ///
    /// Uses `try_lock`: if the socket mutex is momentarily held elsewhere this
    /// conservatively reports `false` rather than blocking.
    pub fn is_connected(&self) -> bool {
        self.noise_socket
            .try_lock()
            .is_ok_and(|guard| guard.is_some())
    }
1597
    /// Returns whether login has completed on the current connection.
    pub fn is_logged_in(&self) -> bool {
        self.is_logged_in.load(Ordering::Relaxed)
    }
1601
    /// Waits until a socket is attached, or `timeout` elapses.
    ///
    /// The `Notified` future is created *before* the second `is_connected`
    /// check so a notification fired between the check and the await is not
    /// lost (tokio `Notify` stores a permit for an already-registered waiter).
    ///
    /// # Errors
    /// Returns an error when the timeout expires first.
    pub async fn wait_for_socket(&self, timeout: std::time::Duration) -> Result<(), anyhow::Error> {
        if self.is_connected() {
            return Ok(());
        }

        let notified = self.socket_ready_notifier.notified();
        // Re-check after registering the waiter to close the race window.
        if self.is_connected() {
            return Ok(());
        }

        tokio::time::timeout(timeout, notified)
            .await
            .map_err(|_| anyhow::anyhow!("Timeout waiting for socket"))
    }
1626
    /// Waits until the client is both connected and logged in, or `timeout`
    /// elapses.
    ///
    /// Like [`wait_for_socket`], the `Notified` future is registered before
    /// the second state check so a notification in between is not lost.
    ///
    /// # Errors
    /// Returns an error when the timeout expires first.
    pub async fn wait_for_connected(
        &self,
        timeout: std::time::Duration,
    ) -> Result<(), anyhow::Error> {
        if self.is_connected() && self.is_logged_in() {
            return Ok(());
        }

        let notified = self.connected_notifier.notified();
        // Re-check after registering the waiter to close the race window.
        if self.is_connected() && self.is_logged_in() {
            return Ok(());
        }

        tokio::time::timeout(timeout, notified)
            .await
            .map_err(|_| anyhow::anyhow!("Timeout waiting for connection"))
    }
1654
    /// Returns a shared handle to the persistence manager backing this client.
    pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
        self.persistence_manager.clone()
    }
1660
1661 pub async fn edit_message(
1662 &self,
1663 to: Jid,
1664 original_id: String,
1665 new_content: wa::Message,
1666 ) -> Result<String, anyhow::Error> {
1667 let own_jid = self
1668 .get_pn()
1669 .await
1670 .ok_or_else(|| anyhow!("Not logged in"))?;
1671
1672 let edit_container_message = wa::Message {
1673 edited_message: Some(Box::new(wa::message::FutureProofMessage {
1674 message: Some(Box::new(wa::Message {
1675 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1676 key: Some(wa::MessageKey {
1677 remote_jid: Some(to.to_string()),
1678 from_me: Some(true),
1679 id: Some(original_id.clone()),
1680 participant: if to.is_group() {
1681 Some(own_jid.to_non_ad().to_string())
1682 } else {
1683 None
1684 },
1685 }),
1686 r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
1687 edited_message: Some(Box::new(new_content)),
1688 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
1689 ..Default::default()
1690 })),
1691 ..Default::default()
1692 })),
1693 })),
1694 ..Default::default()
1695 };
1696
1697 self.send_message_impl(
1698 to,
1699 &edit_container_message,
1700 Some(original_id.clone()),
1701 false,
1702 false,
1703 Some(crate::types::message::EditAttribute::MessageEdit),
1704 )
1705 .await?;
1706
1707 Ok(original_id)
1708 }
1709
    /// Marshals `node` and sends it over the current noise socket.
    ///
    /// Uses a small pool of reusable byte buffers (`send_buffer_pool`) for
    /// both the plaintext and the encrypted frame. Buffers are returned to
    /// the pool on every exit path, but only when their capacity stayed
    /// within `MAX_POOLED_BUFFER_CAP`, so one huge message cannot pin memory.
    ///
    /// # Errors
    /// `ClientError::NotConnected` when no socket is attached; socket/crypto
    /// errors are converted via the `From` impls on `ClientError`.
    pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
        // Clone the Arc out of the mutex so the lock is not held across I/O.
        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
        let noise_socket = match noise_socket_arc {
            Some(socket) => socket,
            None => return Err(ClientError::NotConnected),
        };

        info!(target: "Client/Send", "{}", DisplayableNode(&node));
        // Grab two pooled buffers (or allocate fresh ones) and release the
        // pool lock before doing any work.
        let mut pool_guard = self.send_buffer_pool.lock().await;
        let mut plaintext_buf = pool_guard.pop().unwrap_or_else(|| Vec::with_capacity(1024));
        let mut encrypted_buf = pool_guard.pop().unwrap_or_else(|| Vec::with_capacity(1024));
        drop(pool_guard);

        plaintext_buf.clear();
        encrypted_buf.clear();

        if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
            error!("Failed to marshal node: {e:?}");
            // Return both buffers to the pool before bailing out.
            let mut g = self.send_buffer_pool.lock().await;
            if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
                g.push(plaintext_buf);
            }
            if encrypted_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
                g.push(encrypted_buf);
            }
            return Err(SocketError::Crypto("Marshal error".to_string()).into());
        }

        // encrypt_and_send hands the buffers back on both success and error
        // so they can be pooled again.
        let (plaintext_buf, encrypted_buf) = match noise_socket
            .encrypt_and_send(plaintext_buf, encrypted_buf)
            .await
        {
            Ok(bufs) => bufs,
            Err(mut e) => {
                // The error type carries the buffers; take them out so they
                // can be recycled before propagating the error.
                let p_buf = std::mem::take(&mut e.plaintext_buf);
                let o_buf = std::mem::take(&mut e.out_buf);
                let mut g = self.send_buffer_pool.lock().await;
                if p_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
                    g.push(p_buf);
                }
                if o_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
                    g.push(o_buf);
                }
                return Err(e.into());
            }
        };

        // Success path: recycle both buffers.
        let mut g = self.send_buffer_pool.lock().await;
        if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
            g.push(plaintext_buf);
        }
        if encrypted_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
            g.push(encrypted_buf);
        }
        Ok(())
    }
1766
1767 pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
1768 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1769 let old_name = device_snapshot.push_name.clone();
1770
1771 if old_name == new_name {
1772 return;
1773 }
1774
1775 log::info!("Updating push name from '{}' -> '{}'", old_name, new_name);
1776 self.persistence_manager
1777 .process_command(DeviceCommand::SetPushName(new_name.clone()))
1778 .await;
1779
1780 self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
1781 crate::types::events::SelfPushNameUpdated {
1782 from_server: true,
1783 old_name,
1784 new_name: new_name.clone(),
1785 },
1786 ));
1787
1788 let client_clone = self.clone();
1789 tokio::spawn(async move {
1790 if let Err(e) = client_clone.presence().set_available().await {
1791 log::warn!("Failed to send presence after push name update: {:?}", e);
1792 } else {
1793 log::info!("Sent presence after push name update.");
1794 }
1795 });
1796 }
1797
1798 pub async fn get_push_name(&self) -> String {
1799 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1800 device_snapshot.push_name.clone()
1801 }
1802
1803 pub async fn get_pn(&self) -> Option<Jid> {
1804 let snapshot = self.persistence_manager.get_device_snapshot().await;
1805 snapshot.pn.clone()
1806 }
1807
1808 pub async fn get_lid(&self) -> Option<Jid> {
1809 let snapshot = self.persistence_manager.get_device_snapshot().await;
1810 snapshot.lid.clone()
1811 }
1812
1813 pub(crate) async fn send_protocol_receipt(
1816 &self,
1817 id: String,
1818 receipt_type: crate::types::presence::ReceiptType,
1819 ) {
1820 if id.is_empty() {
1821 return;
1822 }
1823 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1824 if let Some(own_jid) = &device_snapshot.pn {
1825 let type_str = match receipt_type {
1826 crate::types::presence::ReceiptType::HistorySync => "hist_sync",
1827 crate::types::presence::ReceiptType::Read => "read",
1828 crate::types::presence::ReceiptType::ReadSelf => "read-self",
1829 crate::types::presence::ReceiptType::Delivered => "delivery",
1830 crate::types::presence::ReceiptType::Played => "played",
1831 crate::types::presence::ReceiptType::PlayedSelf => "played-self",
1832 crate::types::presence::ReceiptType::Inactive => "inactive",
1833 crate::types::presence::ReceiptType::PeerMsg => "peer_msg",
1834 crate::types::presence::ReceiptType::Sender => "sender",
1835 crate::types::presence::ReceiptType::ServerError => "server-error",
1836 crate::types::presence::ReceiptType::Retry => "retry",
1837 crate::types::presence::ReceiptType::Other(ref s) => s.as_str(),
1838 };
1839
1840 let node = NodeBuilder::new("receipt")
1841 .attrs([
1842 ("id", id),
1843 ("type", type_str.to_string()),
1844 ("to", own_jid.to_non_ad().to_string()),
1845 ])
1846 .build();
1847
1848 if let Err(e) = self.send_node(node).await {
1849 warn!(
1850 "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
1851 receipt_type, self.unique_id, e
1852 );
1853 }
1854 }
1855 }
1856}
1857
1858#[cfg(test)]
1859mod tests {
1860 use super::*;
1861 use crate::lid_pn_cache::LearningSource;
1862 use crate::test_utils::MockHttpClient;
1863 use tokio::sync::oneshot;
1864 use wacore_binary::jid::SERVER_JID;
1865
    /// Verifies which incoming stanzas the client acknowledges: both
    /// `<receipt>` and `<notification>` must be ack'd by `should_ack`.
    #[tokio::test]
    async fn test_ack_behavior_for_incoming_stanzas() {
        // Fresh client over an in-memory store and a mock transport.
        let backend = Arc::new(
            crate::store::SqliteStore::new(":memory:")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        use indexmap::IndexMap;
        use wacore_binary::node::{Node, NodeContent};

        // Minimal <receipt> stanza with server sender + ID.
        let mut receipt_attrs = IndexMap::new();
        receipt_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
        receipt_attrs.insert("id".to_string(), "RCPT-1".to_string());
        let receipt_node = Node::new(
            "receipt",
            receipt_attrs,
            Some(NodeContent::String("test".to_string())),
        );

        // Minimal <notification> stanza with server sender + ID.
        let mut notification_attrs = IndexMap::new();
        notification_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
        notification_attrs.insert("id".to_string(), "NOTIF-1".to_string());
        let notification_node = Node::new(
            "notification",
            notification_attrs,
            Some(NodeContent::String("test".to_string())),
        );

        assert!(
            client.should_ack(&receipt_node),
            "should_ack must still return TRUE for <receipt> stanzas."
        );
        assert!(
            client.should_ack(&notification_node),
            "should_ack must still return TRUE for <notification> stanzas."
        );

        info!(
            "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
        );
    }
1923
    /// Ensures the send-buffer pool does not leak buffers: after a send (even
    /// a failed one — the mock transport has no socket attached) the pool is
    /// at least as large as before.
    #[tokio::test]
    async fn test_send_buffer_pool_reuses_both_buffers() {
        // Fresh client over an in-memory store and a mock transport.
        let backend = Arc::new(
            crate::store::SqliteStore::new(":memory:")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Pool size before the send.
        let initial_pool_size = {
            let pool = client.send_buffer_pool.lock().await;
            pool.len()
        };

        let test_node = NodeBuilder::new("test").attr("id", "test-123").build();

        // Outcome is irrelevant here; only buffer accounting matters.
        let _ = client.send_node(test_node).await;

        // Pool size after the send.
        let final_pool_size = {
            let pool = client.send_buffer_pool.lock().await;
            pool.len()
        };

        assert!(
            final_pool_size >= initial_pool_size,
            "Buffer pool should not shrink after send operations"
        );

        info!(
            "✅ test_send_buffer_pool_reuses_both_buffers passed: Buffer pool properly manages buffers"
        );
    }
1974
    /// End-to-end check of the ack/waiter plumbing: a registered waiter is
    /// resolved with the matching <ack> node and removed from the map.
    #[tokio::test]
    async fn test_ack_waiter_resolves() {
        // Fresh client over an in-memory store and a mock transport.
        let backend = Arc::new(
            crate::store::SqliteStore::new(":memory:")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Register a oneshot waiter under a known stanza ID — this must
        // happen before handle_ack_response is invoked.
        let test_id = "ack-test-123".to_string();
        let (tx, rx) = oneshot::channel();
        client
            .response_waiters
            .lock()
            .await
            .insert(test_id.clone(), tx);
        assert!(
            client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be inserted before handling ack"
        );

        // Synthesize the matching <ack> stanza from the server.
        let ack_node = NodeBuilder::new("ack")
            .attr("id", test_id.clone())
            .attr("from", SERVER_JID)
            .build();

        let handled = client.handle_ack_response(ack_node).await;
        assert!(
            handled,
            "handle_ack_response should return true when waiter exists"
        );

        // The waiter's receiver must observe the exact node (timeout guards
        // against a hung test if the send never happened).
        match tokio::time::timeout(Duration::from_secs(1), rx).await {
            Ok(Ok(response_node)) => {
                assert_eq!(
                    response_node.attrs.get("id"),
                    Some(&test_id),
                    "Response node should have correct ID"
                );
            }
            Ok(Err(_)) => panic!("Receiver was dropped without being sent a value"),
            Err(_) => panic!("Test timed out waiting for ack response"),
        }

        // Handling an ack must consume (remove) the waiter entry.
        assert!(
            !client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be removed after handling"
        );

        info!(
            "✅ test_ack_waiter_resolves passed: ACK response correctly resolves pending waiters"
        );
    }
2044
2045 #[tokio::test]
2046 async fn test_ack_without_matching_waiter() {
2047 let backend = Arc::new(
2048 crate::store::SqliteStore::new(":memory:")
2049 .await
2050 .expect("Failed to create in-memory backend for test"),
2051 );
2052 let pm = Arc::new(
2053 PersistenceManager::new(backend)
2054 .await
2055 .expect("persistence manager should initialize"),
2056 );
2057 let (client, _rx) = Client::new(
2058 pm,
2059 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2060 Arc::new(MockHttpClient),
2061 None,
2062 )
2063 .await;
2064
2065 let ack_node = NodeBuilder::new("ack")
2067 .attr("id", "non-existent-id")
2068 .attr("from", SERVER_JID)
2069 .build();
2070
2071 let handled = client.handle_ack_response(ack_node).await;
2073 assert!(
2074 !handled,
2075 "handle_ack_response should return false when no waiter exists"
2076 );
2077
2078 info!(
2079 "✅ test_ack_without_matching_waiter passed: ACK without matching waiter handled gracefully"
2080 );
2081 }
2082
    /// Basic LID↔PN cache round trip: insert one mapping and verify both the
    /// forward (phone → LID) and reverse (LID → phone) lookups, plus a miss
    /// for an unrelated phone number.
    #[tokio::test]
    async fn test_lid_pn_cache_basic_operations() {
        // Named shared in-memory DB so all connections see the same data.
        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_lid_cache_basic?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let phone = "559980000001";
        let lid = "100000012345678";

        // No mapping before anything was learned.
        assert!(
            client.lid_pn_cache.get_current_lid(phone).await.is_none(),
            "Cache should be empty initially"
        );

        client
            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        // Forward lookup: phone → LID.
        let cached_lid = client.lid_pn_cache.get_current_lid(phone).await;
        assert!(cached_lid.is_some(), "Cache should contain the mapping");
        assert_eq!(
            cached_lid.expect("cache should have LID"),
            lid,
            "Cached LID should match what we inserted"
        );

        // Reverse lookup: LID → phone.
        let cached_phone = client.lid_pn_cache.get_phone_number(lid).await;
        assert!(cached_phone.is_some(), "Reverse lookup should work");
        assert_eq!(
            cached_phone.expect("reverse lookup should return phone"),
            phone,
            "Cached phone should match what we inserted"
        );

        // Unrelated phone numbers stay unmapped.
        assert!(
            client
                .lid_pn_cache
                .get_current_lid("559980000002")
                .await
                .is_none(),
            "Different phone number should not have a mapping"
        );

        info!("✅ test_lid_pn_cache_basic_operations passed: LID-PN cache works correctly");
    }
2153
    /// When two LIDs are learned for the same phone at different times, the
    /// forward lookup must return the most recently learned LID, while the
    /// reverse lookup keeps working for both old and new LIDs.
    #[tokio::test]
    async fn test_lid_pn_cache_timestamp_resolution() {
        // Named shared in-memory DB so all connections see the same data.
        let backend = Arc::new(
            crate::store::SqliteStore::new(
                "file:memdb_lid_cache_timestamp?mode=memory&cache=shared",
            )
            .await
            .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let phone = "559980000001";
        let lid_old = "100000012345678";
        let lid_new = "100000087654321";

        // Learn the first (older) mapping.
        client
            .add_lid_pn_mapping(lid_old, phone, LearningSource::Usync)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        assert_eq!(
            client
                .lid_pn_cache
                .get_current_lid(phone)
                .await
                .expect("cache should have LID"),
            lid_old,
            "Initial LID should be stored"
        );

        // Ensure the second mapping gets a strictly later timestamp.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        client
            .add_lid_pn_mapping(lid_new, phone, LearningSource::PeerPnMessage)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        // Forward lookup resolves to the newest mapping.
        assert_eq!(
            client
                .lid_pn_cache
                .get_current_lid(phone)
                .await
                .expect("cache should have newer LID"),
            lid_new,
            "Newer LID should be returned for phone lookup"
        );

        // Reverse lookups remain valid for both LIDs.
        assert_eq!(
            client
                .lid_pn_cache
                .get_phone_number(lid_old)
                .await
                .expect("reverse lookup should return phone"),
            phone,
            "Old LID should still map to phone"
        );
        assert_eq!(
            client
                .lid_pn_cache
                .get_phone_number(lid_new)
                .await
                .expect("reverse lookup should return phone"),
            phone,
            "New LID should also map to phone"
        );

        info!(
            "✅ test_lid_pn_cache_timestamp_resolution passed: Timestamp-based resolution works correctly"
        );
    }
2242
2243 #[tokio::test]
2247 async fn test_get_lid_for_phone_via_send_context_resolver() {
2248 use wacore::client::context::SendContextResolver;
2249
2250 let backend = Arc::new(
2251 crate::store::SqliteStore::new("file:memdb_get_lid_for_phone?mode=memory&cache=shared")
2252 .await
2253 .expect("Failed to create in-memory backend for test"),
2254 );
2255 let pm = Arc::new(
2256 PersistenceManager::new(backend)
2257 .await
2258 .expect("persistence manager should initialize"),
2259 );
2260 let (client, _rx) = Client::new(
2261 pm,
2262 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2263 Arc::new(MockHttpClient),
2264 None,
2265 )
2266 .await;
2267
2268 let phone = "559980000001";
2269 let lid = "100000012345678";
2270
2271 assert!(
2273 client.get_lid_for_phone(phone).await.is_none(),
2274 "get_lid_for_phone should return None before caching"
2275 );
2276
2277 client
2279 .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
2280 .await
2281 .expect("Failed to persist LID-PN mapping in tests");
2282
2283 let result = client.get_lid_for_phone(phone).await;
2285 assert!(
2286 result.is_some(),
2287 "get_lid_for_phone should return Some after caching"
2288 );
2289 assert_eq!(
2290 result.expect("get_lid_for_phone should return Some"),
2291 lid,
2292 "get_lid_for_phone should return the cached LID"
2293 );
2294
2295 info!(
2296 "✅ test_get_lid_for_phone_via_send_context_resolver passed: SendContextResolver correctly returns cached LID"
2297 );
2298 }
2299
2300 #[tokio::test]
2311 async fn test_wait_for_offline_delivery_end_returns_immediately_when_flag_set() {
2312 let backend = Arc::new(
2313 crate::store::SqliteStore::new(
2314 "file:memdb_offline_sync_flag_set?mode=memory&cache=shared",
2315 )
2316 .await
2317 .expect("Failed to create in-memory backend for test"),
2318 );
2319 let pm = Arc::new(
2320 PersistenceManager::new(backend)
2321 .await
2322 .expect("persistence manager should initialize"),
2323 );
2324 let (client, _rx) = Client::new(
2325 pm,
2326 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2327 Arc::new(MockHttpClient),
2328 None,
2329 )
2330 .await;
2331
2332 client
2334 .offline_sync_completed
2335 .store(true, std::sync::atomic::Ordering::Relaxed);
2336
2337 let start = std::time::Instant::now();
2339 client.wait_for_offline_delivery_end().await;
2340 let elapsed = start.elapsed();
2341
2342 assert!(
2344 elapsed.as_millis() < 100,
2345 "wait_for_offline_delivery_end should return immediately when flag is set, took {:?}",
2346 elapsed
2347 );
2348
2349 info!("✅ test_wait_for_offline_delivery_end_returns_immediately_when_flag_set passed");
2350 }
2351
2352 #[tokio::test]
2355 async fn test_wait_for_offline_delivery_end_times_out_when_flag_not_set() {
2356 let backend = Arc::new(
2357 crate::store::SqliteStore::new(
2358 "file:memdb_offline_sync_timeout?mode=memory&cache=shared",
2359 )
2360 .await
2361 .expect("Failed to create in-memory backend for test"),
2362 );
2363 let pm = Arc::new(
2364 PersistenceManager::new(backend)
2365 .await
2366 .expect("persistence manager should initialize"),
2367 );
2368 let (client, _rx) = Client::new(
2369 pm,
2370 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2371 Arc::new(MockHttpClient),
2372 None,
2373 )
2374 .await;
2375
2376 let start = std::time::Instant::now();
2379
2380 let result = tokio::time::timeout(
2382 std::time::Duration::from_millis(100),
2383 client.wait_for_offline_delivery_end(),
2384 )
2385 .await;
2386
2387 let elapsed = start.elapsed();
2388
2389 assert!(
2392 result.is_err(),
2393 "wait_for_offline_delivery_end should not return immediately when flag is false"
2394 );
2395 assert!(
2396 elapsed.as_millis() >= 95, "Should have waited for the timeout duration, took {:?}",
2398 elapsed
2399 );
2400
2401 info!("✅ test_wait_for_offline_delivery_end_times_out_when_flag_not_set passed");
2402 }
2403
2404 #[tokio::test]
2406 async fn test_wait_for_offline_delivery_end_returns_on_notify() {
2407 let backend = Arc::new(
2408 crate::store::SqliteStore::new("file:memdb_offline_notify?mode=memory&cache=shared")
2409 .await
2410 .expect("Failed to create in-memory backend for test"),
2411 );
2412 let pm = Arc::new(
2413 PersistenceManager::new(backend)
2414 .await
2415 .expect("persistence manager should initialize"),
2416 );
2417 let (client, _rx) = Client::new(
2418 pm,
2419 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2420 Arc::new(MockHttpClient),
2421 None,
2422 )
2423 .await;
2424
2425 let client_clone = client.clone();
2426
2427 tokio::spawn(async move {
2429 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2430 client_clone.offline_sync_notifier.notify_waiters();
2431 });
2432
2433 let start = std::time::Instant::now();
2434 client.wait_for_offline_delivery_end().await;
2435 let elapsed = start.elapsed();
2436
2437 assert!(
2439 elapsed.as_millis() < 200,
2440 "wait_for_offline_delivery_end should return when notified, took {:?}",
2441 elapsed
2442 );
2443 assert!(
2444 elapsed.as_millis() >= 45, "Should have waited for the notify, only took {:?}",
2446 elapsed
2447 );
2448
2449 info!("✅ test_wait_for_offline_delivery_end_returns_on_notify passed");
2450 }
2451
2452 #[tokio::test]
2454 async fn test_offline_sync_flag_initially_false() {
2455 let backend = Arc::new(
2456 crate::store::SqliteStore::new(
2457 "file:memdb_offline_flag_initial?mode=memory&cache=shared",
2458 )
2459 .await
2460 .expect("Failed to create in-memory backend for test"),
2461 );
2462 let pm = Arc::new(
2463 PersistenceManager::new(backend)
2464 .await
2465 .expect("persistence manager should initialize"),
2466 );
2467 let (client, _rx) = Client::new(
2468 pm,
2469 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2470 Arc::new(MockHttpClient),
2471 None,
2472 )
2473 .await;
2474
2475 assert!(
2477 !client
2478 .offline_sync_completed
2479 .load(std::sync::atomic::Ordering::Relaxed),
2480 "offline_sync_completed should be false when Client is first created"
2481 );
2482
2483 info!("✅ test_offline_sync_flag_initially_false passed");
2484 }
2485
2486 #[tokio::test]
2491 async fn test_offline_sync_lifecycle() {
2492 use std::sync::atomic::Ordering;
2493
2494 let backend = Arc::new(
2495 crate::store::SqliteStore::new("file:memdb_offline_lifecycle?mode=memory&cache=shared")
2496 .await
2497 .expect("Failed to create in-memory backend for test"),
2498 );
2499 let pm = Arc::new(
2500 PersistenceManager::new(backend)
2501 .await
2502 .expect("persistence manager should initialize"),
2503 );
2504 let (client, _rx) = Client::new(
2505 pm,
2506 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2507 Arc::new(MockHttpClient),
2508 None,
2509 )
2510 .await;
2511
2512 assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
2514
2515 let client_waiter = client.clone();
2517 let waiter_handle = tokio::spawn(async move {
2518 client_waiter.wait_for_offline_delivery_end().await;
2519 true });
2521
2522 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2524
2525 assert!(
2527 !waiter_handle.is_finished(),
2528 "Waiter should still be waiting"
2529 );
2530
2531 client.offline_sync_completed.store(true, Ordering::Relaxed);
2533 client.offline_sync_notifier.notify_waiters();
2534
2535 let result = tokio::time::timeout(std::time::Duration::from_millis(100), waiter_handle)
2537 .await
2538 .expect("Waiter should complete after notify")
2539 .expect("Waiter task should not panic");
2540
2541 assert!(result, "Waiter should have completed successfully");
2542 assert!(client.offline_sync_completed.load(Ordering::Relaxed));
2543
2544 info!("✅ test_offline_sync_lifecycle passed");
2545 }
2546
2547 #[tokio::test]
2550 async fn test_establish_primary_phone_session_fails_without_pn() {
2551 let backend = Arc::new(
2552 crate::store::SqliteStore::new("file:memdb_no_pn?mode=memory&cache=shared")
2553 .await
2554 .expect("Failed to create in-memory backend for test"),
2555 );
2556 let pm = Arc::new(
2557 PersistenceManager::new(backend)
2558 .await
2559 .expect("persistence manager should initialize"),
2560 );
2561 let (client, _rx) = Client::new(
2562 pm,
2563 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2564 Arc::new(MockHttpClient),
2565 None,
2566 )
2567 .await;
2568
2569 let result = client.establish_primary_phone_session_immediate().await;
2571
2572 assert!(
2573 result.is_err(),
2574 "establish_primary_phone_session_immediate should fail when no PN is set"
2575 );
2576
2577 let error_msg = result.unwrap_err().to_string();
2578 assert!(
2579 error_msg.contains("Not logged in"),
2580 "Error should mention 'Not logged in', got: {}",
2581 error_msg
2582 );
2583
2584 info!("✅ test_establish_primary_phone_session_fails_without_pn passed");
2585 }
2586
2587 #[tokio::test]
2591 async fn test_ensure_e2e_sessions_waits_for_offline_sync() {
2592 use std::sync::atomic::Ordering;
2593 use wacore_binary::jid::Jid;
2594
2595 let backend = Arc::new(
2596 crate::store::SqliteStore::new("file:memdb_ensure_e2e_waits?mode=memory&cache=shared")
2597 .await
2598 .expect("Failed to create in-memory backend for test"),
2599 );
2600 let pm = Arc::new(
2601 PersistenceManager::new(backend)
2602 .await
2603 .expect("persistence manager should initialize"),
2604 );
2605 let (client, _rx) = Client::new(
2606 pm,
2607 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2608 Arc::new(MockHttpClient),
2609 None,
2610 )
2611 .await;
2612
2613 assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
2615
2616 let client_clone = client.clone();
2619 let ensure_handle = tokio::spawn(async move {
2620 client_clone.ensure_e2e_sessions(vec![]).await
2623 });
2624
2625 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2627 assert!(
2628 ensure_handle.is_finished(),
2629 "ensure_e2e_sessions should return immediately for empty JID list"
2630 );
2631
2632 let client_clone = client.clone();
2634 let test_jid = Jid::pn("559999999999");
2635 let ensure_handle = tokio::spawn(async move {
2636 let start = std::time::Instant::now();
2638 let _ = client_clone.ensure_e2e_sessions(vec![test_jid]).await;
2639 start.elapsed()
2640 });
2641
2642 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2644
2645 assert!(
2647 !ensure_handle.is_finished(),
2648 "ensure_e2e_sessions should be waiting for offline sync"
2649 );
2650
2651 client.offline_sync_completed.store(true, Ordering::Relaxed);
2653 client.offline_sync_notifier.notify_waiters();
2654
2655 let result = tokio::time::timeout(std::time::Duration::from_secs(2), ensure_handle).await;
2657
2658 assert!(
2659 result.is_ok(),
2660 "ensure_e2e_sessions should complete after offline sync"
2661 );
2662
2663 info!("✅ test_ensure_e2e_sessions_waits_for_offline_sync passed");
2664 }
2665
2666 #[tokio::test]
2675 async fn test_immediate_session_does_not_wait_for_offline_sync() {
2676 use std::sync::atomic::Ordering;
2677 use wacore_binary::jid::Jid;
2678
2679 let backend = Arc::new(
2680 crate::store::SqliteStore::new("file:memdb_immediate_no_wait?mode=memory&cache=shared")
2681 .await
2682 .expect("Failed to create in-memory backend for test"),
2683 );
2684 let pm = Arc::new(
2685 PersistenceManager::new(backend.clone())
2686 .await
2687 .expect("persistence manager should initialize"),
2688 );
2689
2690 pm.modify_device(|device| {
2692 device.pn = Some(Jid::pn("559999999999"));
2693 })
2694 .await;
2695
2696 let (client, _rx) = Client::new(
2697 pm,
2698 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2699 Arc::new(MockHttpClient),
2700 None,
2701 )
2702 .await;
2703
2704 assert!(!client.offline_sync_completed.load(Ordering::Relaxed));
2706
2707 let start = std::time::Instant::now();
2710
2711 let result = tokio::time::timeout(
2714 std::time::Duration::from_millis(500),
2715 client.establish_primary_phone_session_immediate(),
2716 )
2717 .await;
2718
2719 let elapsed = start.elapsed();
2720
2721 assert!(
2723 result.is_ok(),
2724 "establish_primary_phone_session_immediate should not wait for offline sync, timed out"
2725 );
2726
2727 assert!(
2729 elapsed.as_millis() < 500,
2730 "establish_primary_phone_session_immediate should not wait, took {:?}",
2731 elapsed
2732 );
2733
2734 info!(
2737 "establish_primary_phone_session_immediate completed in {:?} (result: {:?})",
2738 elapsed,
2739 result.unwrap().is_ok()
2740 );
2741
2742 info!("✅ test_immediate_session_does_not_wait_for_offline_sync passed");
2743 }
2744}