1mod context_impl;
2mod device_registry;
3mod lid_pn;
4mod sender_keys;
5mod sessions;
6
7use crate::handshake;
8use crate::lid_pn_cache::LidPnCache;
9use crate::pair;
10use anyhow::{Result, anyhow};
11use dashmap::DashMap;
12use moka::future::Cache;
13use tokio::sync::watch;
14use wacore::xml::DisplayableNode;
15use wacore_binary::builder::NodeBuilder;
16use wacore_binary::jid::JidExt;
17use wacore_binary::node::{Attrs, Node};
18
19use crate::appstate_sync::AppStateProcessor;
20use crate::handlers::chatstate::ChatStateEvent;
21use crate::jid_utils::server_jid;
22use crate::store::{commands::DeviceCommand, persistence_manager::PersistenceManager};
23use crate::types::enc_handler::EncHandler;
24use crate::types::events::{ConnectFailureReason, Event};
25
26use log::{debug, error, info, trace, warn};
27
28use rand::RngCore;
29use scopeguard;
30use std::collections::{HashMap, HashSet};
31use wacore_binary::jid::Jid;
32
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
35
36use thiserror::Error;
37use tokio::sync::{Mutex, Notify, OnceCell, RwLock, mpsc};
38use tokio::time::{Duration, sleep};
39use wacore::appstate::patch_decode::WAPatchName;
40use wacore::client::context::GroupInfo;
41use waproto::whatsapp as wa;
42
43use crate::socket::{NoiseSocket, SocketError, error::EncryptSendError};
44use crate::sync_task::MajorSyncTask;
45
/// Callback type invoked for each incoming chat-state (typing/paused) update.
type ChatStateHandler = Arc<dyn Fn(ChatStateEvent) + Send + Sync>;

/// Upper bound on retry attempts for app-state sync operations.
/// NOTE(review): consumer not visible in this chunk — name taken at face value.
const APP_STATE_RETRY_MAX_ATTEMPTS: u32 = 6;

/// Buffers larger than this are presumably not returned to
/// `plaintext_buffer_pool`, to avoid hoarding memory — TODO confirm at use site.
const MAX_POOLED_BUFFER_CAP: usize = 512 * 1024;
52
/// Errors surfaced by connection-level client operations (acks, sends,
/// connect attempts).
#[derive(Debug, Error)]
pub enum ClientError {
    /// An operation required an active connection but none exists.
    #[error("client is not connected")]
    NotConnected,
    /// Low-level socket failure, converted from the socket layer.
    #[error("socket error: {0}")]
    Socket(#[from] SocketError),
    /// Failure while encrypting or sending a frame.
    #[error("encrypt/send error: {0}")]
    EncryptSend(#[from] EncryptSendError),
    /// `connect` was called while a connection is already up or in progress.
    #[error("client is already connected")]
    AlreadyConnected,
    /// An operation required an authenticated session.
    #[error("client is not logged in")]
    NotLoggedIn,
}
66
67use wacore::types::message::StanzaKey;
68
/// Progress counters for the server's offline (history) stanza replay that
/// follows login. Driven by `process_node` based on `<offline_preview>` /
/// `<offline>` markers and the per-stanza `offline` attribute.
#[derive(Debug)]
pub(crate) struct OfflineSyncMetrics {
    // True while a replay announced via <offline_preview> is in progress.
    pub active: AtomicBool,
    // Item count announced by the server in <offline_preview count="..">.
    pub total_messages: AtomicUsize,
    // Number of stanzas carrying the "offline" attribute processed so far.
    pub processed_messages: AtomicUsize,
    // When the current replay started; used only for duration logging.
    pub start_time: std::sync::Mutex<Option<std::time::Instant>>,
}
78
/// Counters for the local message-retry ("requeue") path.
/// NOTE(review): incremented elsewhere in the crate; field meanings inferred
/// from names — confirm against the retry module.
pub(crate) struct RetryMetrics {
    // Times a local requeue was attempted.
    pub local_requeue_attempts: AtomicUsize,
    // Requeues that completed locally.
    pub local_requeue_success: AtomicUsize,
    // Requeues that fell back to another mechanism.
    pub local_requeue_fallback: AtomicUsize,
}
89
/// The high-level WhatsApp client: owns the connection lifecycle, stanza
/// routing, ack/retry bookkeeping, and app-state synchronisation glue.
pub struct Client {
    // Platform-agnostic core (event bus etc.).
    pub(crate) core: wacore::client::CoreClient,

    // Durable device/session storage.
    pub(crate) persistence_manager: Arc<PersistenceManager>,
    // Cached media-connection info; `None` until first fetched.
    pub(crate) media_conn: Arc<RwLock<Option<crate::mediaconn::MediaConn>>>,

    // Connection lifecycle flags (see `run`/`connect`).
    pub(crate) is_logged_in: Arc<AtomicBool>,
    pub(crate) is_connecting: Arc<AtomicBool>,
    pub(crate) is_running: Arc<AtomicBool>,
    // Signals the read loop (and other waiters) to shut down.
    pub(crate) shutdown_notifier: Arc<Notify>,

    // Active transport plus its event stream; both `None` while disconnected.
    pub(crate) transport: Arc<Mutex<Option<Arc<dyn crate::transport::Transport>>>>,
    pub(crate) transport_events:
        Arc<Mutex<Option<async_channel::Receiver<crate::transport::TransportEvent>>>>,
    pub(crate) transport_factory: Arc<dyn crate::transport::TransportFactory>,
    // Noise-encrypted framing layer over the transport.
    pub(crate) noise_socket: Arc<Mutex<Option<Arc<NoiseSocket>>>>,

    // IQ request/response correlation: pending waiters keyed by stanza id.
    pub(crate) response_waiters:
        Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<wacore_binary::Node>>>>,
    // Random per-process prefix ("a.b") used when generating stanza ids.
    pub(crate) unique_id: String,
    pub(crate) id_counter: Arc<AtomicU64>,

    pub(crate) unified_session: crate::unified_session::UnifiedSessionManager,

    // Per-key mutexes (5 min TTL) — presumably serialise per-session crypto;
    // confirm at use site.
    pub(crate) session_locks: Cache<String, Arc<tokio::sync::Mutex<()>>>,

    // Per-key ordered processing queues for incoming nodes (5 min TTL).
    pub(crate) message_queues: Cache<String, mpsc::Sender<Arc<Node>>>,

    // LID <-> phone-number mapping cache; warmed in the background by `new`.
    pub(crate) lid_pn_cache: Arc<LidPnCache>,

    // Guards enqueue/creation of `message_queues` entries.
    pub(crate) message_enqueue_locks: Cache<String, Arc<tokio::sync::Mutex<()>>>,

    // Lazily initialised caches (see `get_group_cache`/`get_device_cache`).
    pub group_cache: OnceCell<Cache<Jid, GroupInfo>>,
    pub device_cache: OnceCell<Cache<Jid, Vec<Jid>>>,

    // Group messages already retried; invalidated on every disconnect.
    pub(crate) retried_group_messages: Cache<String, ()>,
    // Set when a disconnect is deliberate (logout, server-requested restart).
    pub(crate) expected_disconnect: Arc<AtomicBool>,

    // Incremented per successful login; lets post-login tasks detect staleness.
    pub(crate) connection_generation: Arc<AtomicU64>,

    // Recently sent message payloads, kept (5 min) for retry requests.
    pub(crate) recent_messages: Cache<StanzaKey, Vec<u8>>,

    // Message ids with a retry currently in flight.
    pub(crate) pending_retries: Arc<Mutex<HashSet<String>>>,

    // Per-message retry counters (TTL-bounded).
    pub(crate) message_retry_counts: Cache<String, u8>,

    // Short-lived (10 s) marker cache for the local requeue path.
    pub(crate) local_retry_cache: Cache<String, ()>,

    // Reconnect policy state consumed by `run`.
    pub enable_auto_reconnect: Arc<AtomicBool>,
    pub auto_reconnect_errors: Arc<AtomicU32>,
    pub last_successful_connect: Arc<Mutex<Option<chrono::DateTime<chrono::Utc>>>>,

    pub(crate) needs_initial_full_sync: Arc<AtomicBool>,

    // App-state (WAPatch) sync machinery.
    pub(crate) app_state_processor: OnceCell<AppStateProcessor>,
    pub(crate) app_state_key_requests: Arc<Mutex<HashMap<String, std::time::Instant>>>,
    pub(crate) app_state_syncing: Arc<Mutex<HashSet<WAPatchName>>>,
    pub(crate) initial_keys_synced_notifier: Arc<Notify>,
    pub(crate) initial_app_state_keys_received: Arc<AtomicBool>,

    // Offline replay tracking; see `OfflineSyncMetrics` and `process_node`.
    pub(crate) offline_sync_notifier: Arc<Notify>,
    pub(crate) offline_sync_completed: Arc<AtomicBool>,
    pub(crate) offline_sync_metrics: Arc<OfflineSyncMetrics>,
    pub(crate) retry_metrics: Arc<RetryMetrics>,
    // Fired when the noise socket is installed (pre-auth); see `connect`.
    pub(crate) socket_ready_notifier: Arc<Notify>,
    pub(crate) connected_notifier: Arc<Notify>,
    // Hands heavyweight sync jobs to the worker whose receiver `new` returns.
    pub(crate) major_sync_task_sender: mpsc::Sender<MajorSyncTask>,
    pub(crate) pairing_cancellation_tx: Arc<Mutex<Option<watch::Sender<()>>>>,

    pub(crate) pair_code_state: Arc<Mutex<wacore::pair_code::PairCodeState>>,

    // Reusable plaintext buffers (see MAX_POOLED_BUFFER_CAP).
    pub(crate) plaintext_buffer_pool: Arc<Mutex<Vec<Vec<u8>>>>,

    // User-registered handlers for custom <enc> payload types.
    pub custom_enc_handlers: Arc<DashMap<String, Arc<dyn EncHandler>>>,

    // Callbacks for typing/chat-state updates; see dispatch_chatstate_event.
    pub(crate) chatstate_handlers: Arc<RwLock<Vec<ChatStateHandler>>>,

    // Pending PDO requests keyed by id.
    pub(crate) pdo_pending_requests: Cache<String, crate::pdo::PendingPdoRequest>,

    // Cached device-list records (1 h TTL, 5k entries).
    pub(crate) device_registry_cache: Cache<String, wacore::store::traits::DeviceListRecord>,

    // Tag-based dispatcher for incoming stanzas; built by create_stanza_router.
    pub(crate) stanza_router: crate::handlers::router::StanzaRouter,

    // When true, acks are sent inline instead of on a spawned task.
    pub(crate) synchronous_ack: bool,

    pub http_client: Arc<dyn crate::http::HttpClient>,

    // Optional forced (primary, secondary, tertiary) client version.
    pub(crate) override_version: Option<(u32, u32, u32)>,

    // When set, history syncs are skipped (see set_skip_history_sync).
    pub(crate) skip_history_sync: AtomicBool,
}
236
237impl Client {
    /// Enables or disables skipping of history syncs for this client.
    pub fn set_skip_history_sync(&self, enabled: bool) {
        self.skip_history_sync.store(enabled, Ordering::Relaxed);
    }
245
    /// Whether history syncs are currently being skipped.
    pub fn skip_history_sync_enabled(&self) -> bool {
        self.skip_history_sync.load(Ordering::Relaxed)
    }
250
    /// Constructs the client together with the receiver for heavyweight
    /// sync tasks.
    ///
    /// Immediately spawns two background tasks: one warming the LID↔PN
    /// cache and one running the device-registry cleanup loop. The returned
    /// `mpsc::Receiver<MajorSyncTask>` must be driven by the caller;
    /// `major_sync_task_sender` feeds it.
    pub async fn new(
        persistence_manager: Arc<PersistenceManager>,
        transport_factory: Arc<dyn crate::transport::TransportFactory>,
        http_client: Arc<dyn crate::http::HttpClient>,
        override_version: Option<(u32, u32, u32)>,
    ) -> (Arc<Self>, mpsc::Receiver<MajorSyncTask>) {
        // Random two-byte "a.b" prefix used for stanza-id generation.
        let mut unique_id_bytes = [0u8; 2];
        rand::rng().fill_bytes(&mut unique_id_bytes);

        let device_snapshot = persistence_manager.get_device_snapshot().await;
        let core = wacore::client::CoreClient::new(device_snapshot.core.clone());

        // Channel carrying major (heavyweight) sync work to an external worker.
        let (tx, rx) = mpsc::channel(32);

        let this = Self {
            core,
            persistence_manager: persistence_manager.clone(),
            media_conn: Arc::new(RwLock::new(None)),
            is_logged_in: Arc::new(AtomicBool::new(false)),
            is_connecting: Arc::new(AtomicBool::new(false)),
            is_running: Arc::new(AtomicBool::new(false)),
            shutdown_notifier: Arc::new(Notify::new()),

            transport: Arc::new(Mutex::new(None)),
            transport_events: Arc::new(Mutex::new(None)),
            transport_factory,
            noise_socket: Arc::new(Mutex::new(None)),

            response_waiters: Arc::new(Mutex::new(HashMap::new())),
            unique_id: format!("{}.{}", unique_id_bytes[0], unique_id_bytes[1]),
            id_counter: Arc::new(AtomicU64::new(0)),
            unified_session: crate::unified_session::UnifiedSessionManager::new(),

            // Short-lived per-key locks/queues: 5 min TTL keeps idle chats
            // from accumulating entries.
            session_locks: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(10_000)
                .build(),
            message_queues: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(10_000)
                .build(),
            lid_pn_cache: Arc::new(LidPnCache::new()),
            message_enqueue_locks: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(10_000)
                .build(),
            group_cache: OnceCell::new(),
            device_cache: OnceCell::new(),
            retried_group_messages: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(2_000)
                .build(),

            expected_disconnect: Arc::new(AtomicBool::new(false)),
            connection_generation: Arc::new(AtomicU64::new(0)),

            recent_messages: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(1_000)
                .build(),

            pending_retries: Arc::new(Mutex::new(HashSet::new())),

            message_retry_counts: Cache::builder()
                .time_to_live(Duration::from_secs(300))
                .max_capacity(5_000)
                .build(),

            // Deliberately short TTL: only bridges the local requeue window.
            local_retry_cache: Cache::builder()
                .time_to_live(Duration::from_secs(10))
                .max_capacity(5_000)
                .build(),

            offline_sync_metrics: Arc::new(OfflineSyncMetrics {
                active: AtomicBool::new(false),
                total_messages: AtomicUsize::new(0),
                processed_messages: AtomicUsize::new(0),
                start_time: std::sync::Mutex::new(None),
            }),

            retry_metrics: Arc::new(RetryMetrics {
                local_requeue_attempts: AtomicUsize::new(0),
                local_requeue_success: AtomicUsize::new(0),
                local_requeue_fallback: AtomicUsize::new(0),
            }),

            enable_auto_reconnect: Arc::new(AtomicBool::new(true)),
            auto_reconnect_errors: Arc::new(AtomicU32::new(0)),
            last_successful_connect: Arc::new(Mutex::new(None)),

            needs_initial_full_sync: Arc::new(AtomicBool::new(false)),

            app_state_processor: OnceCell::new(),
            app_state_key_requests: Arc::new(Mutex::new(HashMap::new())),
            app_state_syncing: Arc::new(Mutex::new(HashSet::new())),
            initial_keys_synced_notifier: Arc::new(Notify::new()),
            initial_app_state_keys_received: Arc::new(AtomicBool::new(false)),
            offline_sync_notifier: Arc::new(Notify::new()),
            offline_sync_completed: Arc::new(AtomicBool::new(false)),
            socket_ready_notifier: Arc::new(Notify::new()),
            connected_notifier: Arc::new(Notify::new()),
            major_sync_task_sender: tx,
            pairing_cancellation_tx: Arc::new(Mutex::new(None)),
            pair_code_state: Arc::new(Mutex::new(wacore::pair_code::PairCodeState::default())),
            plaintext_buffer_pool: Arc::new(Mutex::new(Vec::with_capacity(4))),
            custom_enc_handlers: Arc::new(DashMap::new()),
            chatstate_handlers: Arc::new(RwLock::new(Vec::new())),
            pdo_pending_requests: crate::pdo::new_pdo_cache(),
            device_registry_cache: Cache::builder()
                .max_capacity(5_000)
                .time_to_live(Duration::from_secs(3600))
                .build(),
            stanza_router: Self::create_stanza_router(),
            synchronous_ack: false,
            http_client,
            override_version,
            skip_history_sync: AtomicBool::new(false),
        };

        let arc = Arc::new(this);

        // Best-effort background warm-up of the LID↔PN mapping cache.
        let warm_up_arc = arc.clone();
        tokio::spawn(async move {
            if let Err(e) = warm_up_arc.warm_up_lid_pn_cache().await {
                warn!("Failed to warm up LID-PN cache: {e}");
            }
        });

        // Periodic cleanup of stale device-registry records.
        let cleanup_arc = arc.clone();
        tokio::spawn(async move {
            cleanup_arc.device_registry_cleanup_loop().await;
        });

        (arc, rx)
    }
395
    /// Returns the group-metadata cache, creating it (1 h TTL, 1k entries)
    /// on first use.
    pub(crate) async fn get_group_cache(&self) -> &Cache<Jid, GroupInfo> {
        self.group_cache
            .get_or_init(|| async {
                debug!("Initializing Group Cache for the first time.");
                Cache::builder()
                    .time_to_live(Duration::from_secs(3600))
                    .max_capacity(1_000)
                    .build()
            })
            .await
    }
407
    /// Returns the per-user device-list cache, creating it (1 h TTL, 5k
    /// entries) on first use.
    pub(crate) async fn get_device_cache(&self) -> &Cache<Jid, Vec<Jid>> {
        self.device_cache
            .get_or_init(|| async {
                debug!("Initializing Device Cache for the first time.");
                Cache::builder()
                    .time_to_live(Duration::from_secs(3600))
                    .max_capacity(5_000)
                    .build()
            })
            .await
    }
419
    /// Returns the app-state processor, constructing it against the
    /// persistence backend on first use.
    pub(crate) async fn get_app_state_processor(&self) -> &AppStateProcessor {
        self.app_state_processor
            .get_or_init(|| async {
                debug!("Initializing AppStateProcessor for the first time.");
                AppStateProcessor::new(self.persistence_manager.backend())
            })
            .await
    }
428
    /// Builds the stanza router with every built-in handler registered.
    fn create_stanza_router() -> crate::handlers::router::StanzaRouter {
        use crate::handlers::{
            basic::{AckHandler, FailureHandler, StreamErrorHandler, SuccessHandler},
            chatstate::ChatstateHandler,
            ib::IbHandler,
            iq::IqHandler,
            message::MessageHandler,
            notification::NotificationHandler,
            receipt::ReceiptHandler,
            router::StanzaRouter,
            unimplemented::UnimplementedHandler,
        };

        let mut router = StanzaRouter::new();

        // Fully implemented handlers.
        router.register(Arc::new(MessageHandler));
        router.register(Arc::new(ReceiptHandler));
        router.register(Arc::new(IqHandler));
        router.register(Arc::new(SuccessHandler));
        router.register(Arc::new(FailureHandler));
        router.register(Arc::new(StreamErrorHandler));
        router.register(Arc::new(IbHandler));
        router.register(Arc::new(NotificationHandler));
        router.register(Arc::new(AckHandler));
        router.register(Arc::new(ChatstateHandler));

        // Tags that are accepted but not yet processed.
        router.register(Arc::new(UnimplementedHandler::for_call()));
        router.register(Arc::new(UnimplementedHandler::for_presence()));

        router
    }
463
    /// Registers an event handler on the core event bus.
    pub fn register_handler(&self, handler: Arc<dyn wacore::types::events::EventHandler>) {
        self.core.event_bus.add_handler(handler);
    }
468
    /// Registers a callback invoked for every incoming chat-state
    /// (typing/paused) update; see `dispatch_chatstate_event`.
    pub async fn register_chatstate_handler(
        &self,
        handler: Arc<dyn Fn(ChatStateEvent) + Send + Sync>,
    ) {
        self.chatstate_handlers.write().await.push(handler);
    }
478
479 pub(crate) async fn dispatch_chatstate_event(
483 &self,
484 stanza: wacore::iq::chatstate::ChatstateStanza,
485 ) {
486 let event = ChatStateEvent::from_stanza(stanza);
487
488 let handlers = self.chatstate_handlers.read().await.clone();
490 for handler in handlers {
491 let event_clone = event.clone();
492 let handler_clone = handler.clone();
493 tokio::spawn(async move {
494 (handler_clone)(event_clone);
495 });
496 }
497 }
498
499 pub async fn run(self: &Arc<Self>) {
500 if self.is_running.swap(true, Ordering::SeqCst) {
501 warn!("Client `run` method called while already running.");
502 return;
503 }
504 while self.is_running.load(Ordering::Relaxed) {
505 self.expected_disconnect.store(false, Ordering::Relaxed);
506
507 if self.connect().await.is_err() {
508 error!("Failed to connect, will retry...");
509 } else {
510 if self.read_messages_loop().await.is_err() {
511 warn!(
512 "Message loop exited with an error. Will attempt to reconnect if enabled."
513 );
514 } else if self.expected_disconnect.load(Ordering::Relaxed) {
515 debug!("Message loop exited gracefully (expected disconnect).");
516 } else {
517 info!("Message loop exited gracefully.");
518 }
519
520 self.cleanup_connection_state().await;
521 }
522
523 if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
524 info!("Auto-reconnect disabled, shutting down.");
525 self.is_running.store(false, Ordering::Relaxed);
526 break;
527 }
528
529 if self.expected_disconnect.load(Ordering::Relaxed) {
531 self.auto_reconnect_errors.store(0, Ordering::Relaxed);
532 info!("Expected disconnect (e.g., 515), reconnecting immediately...");
533 continue;
534 }
535
536 let error_count = self.auto_reconnect_errors.fetch_add(1, Ordering::SeqCst);
537 let delay_secs = u64::from(error_count * 2).min(30);
538 let delay = Duration::from_secs(delay_secs);
539 info!(
540 "Will attempt to reconnect in {:?} (attempt {})",
541 delay,
542 error_count + 1
543 );
544 sleep(delay).await;
545 }
546 info!("Client run loop has shut down.");
547 }
548
    /// Establishes a fresh connection: resolves the client version and dials
    /// the transport in parallel, performs the Noise handshake, installs the
    /// resulting socket, and spawns the keepalive loop.
    ///
    /// # Errors
    /// Returns `ClientError::AlreadyConnected` if a connect is already in
    /// flight or a connection is up; otherwise propagates version, transport,
    /// or handshake failures.
    pub async fn connect(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        // `is_connecting` doubles as a re-entrancy guard.
        if self.is_connecting.swap(true, Ordering::SeqCst) {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Clear the connecting flag on every exit path, including `?` returns.
        let _guard = scopeguard::guard((), |_| {
            self.is_connecting.store(false, Ordering::Relaxed);
        });

        if self.is_connected() {
            return Err(ClientError::AlreadyConnected.into());
        }

        // Reset per-connection progress flags before dialing.
        self.is_logged_in.store(false, Ordering::Relaxed);
        self.offline_sync_completed.store(false, Ordering::Relaxed);

        let version_future = crate::version::resolve_and_update_version(
            &self.persistence_manager,
            &self.http_client,
            self.override_version,
        );

        let transport_future = self.transport_factory.create_transport();

        debug!("Connecting WebSocket and fetching latest client version in parallel...");
        let (version_result, transport_result) = tokio::join!(version_future, transport_future);

        version_result.map_err(|e| anyhow!("Failed to resolve app version: {}", e))?;
        let (transport, mut transport_events) = transport_result?;
        debug!("Version fetch and transport connection established.");

        let device_snapshot = self.persistence_manager.get_device_snapshot().await;

        let noise_socket =
            handshake::do_handshake(&device_snapshot, transport.clone(), &mut transport_events)
                .await?;

        // Publish the live connection handles.
        *self.transport.lock().await = Some(transport);
        *self.transport_events.lock().await = Some(transport_events);
        *self.noise_socket.lock().await = Some(noise_socket);

        // Wake anyone waiting for the (pre-auth) socket to be usable.
        self.socket_ready_notifier.notify_waiters();

        let client_clone = self.clone();
        tokio::spawn(async move { client_clone.keepalive_loop().await });

        Ok(())
    }
601
602 pub async fn disconnect(self: &Arc<Self>) {
603 info!("Disconnecting client intentionally.");
604 self.expected_disconnect.store(true, Ordering::Relaxed);
605 self.is_running.store(false, Ordering::Relaxed);
606 self.shutdown_notifier.notify_waiters();
607
608 if let Some(transport) = self.transport.lock().await.as_ref() {
609 transport.disconnect().await;
610 }
611 self.cleanup_connection_state().await;
612 }
613
    /// Clears all per-connection state after the transport goes away so the
    /// next `connect` starts from a clean slate.
    async fn cleanup_connection_state(&self) {
        self.is_logged_in.store(false, Ordering::Relaxed);
        *self.transport.lock().await = None;
        *self.transport_events.lock().await = None;
        *self.noise_socket.lock().await = None;
        // Retry bookkeeping is only meaningful within one connection.
        self.retried_group_messages.invalidate_all();
        self.offline_sync_completed.store(false, Ordering::Relaxed);
    }
623
    /// Pumps transport events until shutdown or disconnect.
    ///
    /// Feeds received bytes to the frame decoder, decrypts each complete
    /// frame into a `Node`, and processes it. Critical stanzas
    /// (`success` / `failure` / `stream:error`) are handled inline to keep
    /// connection-state changes ordered; everything else is handled on a
    /// spawned task. Returns `Ok(())` on expected shutdown and `Err` when
    /// the transport drops unexpectedly.
    async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
        debug!("Starting message processing loop...");

        // Take exclusive ownership of the event receiver for this loop.
        let mut rx_guard = self.transport_events.lock().await;
        let transport_events = rx_guard
            .take()
            .ok_or_else(|| anyhow::anyhow!("Cannot start message loop: not connected"))?;
        drop(rx_guard);

        let mut frame_decoder = wacore::framing::FrameDecoder::new();

        loop {
            tokio::select! {
                // `biased` ensures shutdown wins over pending transport data.
                biased;
                _ = self.shutdown_notifier.notified() => {
                    debug!("Shutdown signaled in message loop. Exiting message loop.");
                    return Ok(());
                },
                event_result = transport_events.recv() => {
                    match event_result {
                        Ok(crate::transport::TransportEvent::DataReceived(data)) => {
                            frame_decoder.feed(&data);

                            // A single read may carry several complete frames.
                            while let Some(encrypted_frame) = frame_decoder.decode_frame() {
                                if let Some(node) = self.decrypt_frame(&encrypted_frame).await {
                                    let is_critical = matches!(node.tag.as_str(), "success" | "failure" | "stream:error");

                                    if is_critical {
                                        // Must run inline: these mutate connection state.
                                        self.process_decrypted_node(node).await;
                                    } else {
                                        let client = self.clone();
                                        tokio::spawn(async move {
                                            client.process_decrypted_node(node).await;
                                        });
                                    }
                                }

                                // A handler may have requested shutdown mid-batch.
                                if self.expected_disconnect.load(Ordering::Relaxed) {
                                    debug!("Expected disconnect signaled during frame processing. Exiting message loop.");
                                    return Ok(());
                                }
                            }
                        },
                        Ok(crate::transport::TransportEvent::Disconnected) | Err(_) => {
                            self.cleanup_connection_state().await;
                            if !self.expected_disconnect.load(Ordering::Relaxed) {
                                self.core.event_bus.dispatch(&Event::Disconnected(crate::types::events::Disconnected));
                                debug!("Transport disconnected unexpectedly.");
                                return Err(anyhow::anyhow!("Transport disconnected unexpectedly"));
                            } else {
                                debug!("Transport disconnected as expected.");
                                return Ok(());
                            }
                        }
                        Ok(crate::transport::TransportEvent::Connected) => {
                            debug!("Transport connected event received");
                        }
                    }
                }
            }
        }
    }
700
701 pub(crate) async fn decrypt_frame(
704 self: &Arc<Self>,
705 encrypted_frame: &bytes::Bytes,
706 ) -> Option<wacore_binary::node::Node> {
707 let noise_socket_arc = { self.noise_socket.lock().await.clone() };
708 let noise_socket = match noise_socket_arc {
709 Some(s) => s,
710 None => {
711 log::error!("Cannot process frame: not connected (no noise socket)");
712 return None;
713 }
714 };
715
716 let decrypted_payload = match noise_socket.decrypt_frame(encrypted_frame) {
717 Ok(p) => p,
718 Err(e) => {
719 log::error!("Failed to decrypt frame: {e}");
720 return None;
721 }
722 };
723
724 let unpacked_data_cow = match wacore_binary::util::unpack(&decrypted_payload) {
725 Ok(data) => data,
726 Err(e) => {
727 log::warn!(target: "Client/Recv", "Failed to decompress frame: {e}");
728 return None;
729 }
730 };
731
732 match wacore_binary::marshal::unmarshal_ref(unpacked_data_cow.as_ref()) {
733 Ok(node_ref) => Some(node_ref.to_owned()),
734 Err(e) => {
735 log::warn!(target: "Client/Recv", "Failed to unmarshal node: {e}");
736 None
737 }
738 }
739 }
740
741 pub(crate) async fn process_decrypted_node(self: &Arc<Self>, node: wacore_binary::node::Node) {
745 let node_arc = Arc::new(node);
747 self.process_node(node_arc).await;
748 }
749
    /// Central entry point for every decrypted stanza.
    ///
    /// Responsibilities, in order: offline-replay progress tracking, log
    /// output (summarising bulky app-state responses), stream-end handling,
    /// resolving pending IQ waiters, dispatching to the stanza router, and
    /// finally sending the automatic ack when required and not cancelled.
    pub(crate) async fn process_node(self: &Arc<Self>, node: Arc<Node>) {
        use wacore::xml::DisplayableNode;

        // --- Offline replay bookkeeping driven by server <ib> stanzas ---
        if node.tag.as_str() == "ib" {
            if let Some(preview) = node.get_optional_child("offline_preview") {
                let count: usize = preview
                    .attrs
                    .get("count")
                    .and_then(|v| v.as_str())
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(0);

                if count == 0 {
                    self.offline_sync_metrics
                        .active
                        .store(false, Ordering::Release);
                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: 0 items.");
                } else {
                    self.offline_sync_metrics
                        .total_messages
                        .store(count, Ordering::Release);
                    self.offline_sync_metrics
                        .processed_messages
                        .store(0, Ordering::Release);
                    self.offline_sync_metrics
                        .active
                        .store(true, Ordering::Release);
                    // A poisoned mutex only means a holder panicked; the slot
                    // is still usable, so recover it instead of panicking here.
                    match self.offline_sync_metrics.start_time.lock() {
                        Ok(mut guard) => *guard = Some(std::time::Instant::now()),
                        Err(poison) => *poison.into_inner() = Some(std::time::Instant::now()),
                    }
                    debug!(target: "Client/OfflineSync", "Sync STARTED: Expecting {} items.", count);
                }
            } else if self.offline_sync_metrics.active.load(Ordering::Acquire)
                && node.get_optional_child("offline").is_some()
            {
                // Explicit end-of-replay marker from the server.
                let processed = self
                    .offline_sync_metrics
                    .processed_messages
                    .load(Ordering::Acquire);
                let elapsed = match self.offline_sync_metrics.start_time.lock() {
                    Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
                    Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
                };
                debug!(target: "Client/OfflineSync", "Sync COMPLETED: End marker received. Processed {} items in {:.2?}.", processed, elapsed);
                self.offline_sync_metrics
                    .active
                    .store(false, Ordering::Release);
            }
        }

        // Stanzas flagged with an "offline" attribute count toward progress.
        if self.offline_sync_metrics.active.load(Ordering::Acquire) {
            if node.attrs.contains_key("offline") {
                let processed = self
                    .offline_sync_metrics
                    .processed_messages
                    .fetch_add(1, Ordering::Release)
                    + 1;
                let total = self
                    .offline_sync_metrics
                    .total_messages
                    .load(Ordering::Acquire);

                // Log every 50th item plus the final one.
                if processed.is_multiple_of(50) || processed == total {
                    trace!(target: "Client/OfflineSync", "Sync Progress: {}/{}", processed, total);
                }

                if processed >= total {
                    let elapsed = match self.offline_sync_metrics.start_time.lock() {
                        Ok(guard) => guard.map(|t| t.elapsed()).unwrap_or_default(),
                        Err(poison) => poison.into_inner().map(|t| t.elapsed()).unwrap_or_default(),
                    };
                    debug!(target: "Client/OfflineSync", "Sync COMPLETED: Processed {} items in {:.2?}.", processed, elapsed);
                    self.offline_sync_metrics
                        .active
                        .store(false, Ordering::Release);
                }
            }
        }
        // App-state sync responses are huge; log a summary instead of the body.
        if node.tag.as_str() == "iq"
            && let Some(sync_node) = node.get_optional_child("sync")
            && let Some(collection_node) = sync_node.get_optional_child("collection")
        {
            let name = collection_node
                .attrs()
                .optional_string("name")
                .unwrap_or("<unknown>");
            debug!(target: "Client/Recv", "Received app state sync response for '{name}' (hiding content).");
        } else {
            debug!(target: "Client/Recv","{}", DisplayableNode(&node));
        }

        // Set by the router/handlers to suppress the automatic ack below.
        let mut cancelled = false;

        if node.tag.as_str() == "xmlstreamend" {
            if self.expected_disconnect.load(Ordering::Relaxed) {
                debug!("Received <xmlstreamend/>, expected disconnect.");
            } else {
                warn!("Received <xmlstreamend/>, treating as disconnect.");
            }
            self.shutdown_notifier.notify_waiters();
            return;
        }

        // Resolve a pending IQ waiter before falling through to the router.
        if node.tag.as_str() == "iq"
            && let Some(id) = node.attrs.get("id").and_then(|v| v.as_str())
        {
            let has_waiter = self.response_waiters.lock().await.contains_key(id);
            if has_waiter && self.handle_iq_response(Arc::clone(&node)).await {
                return;
            }
        }

        if !self
            .stanza_router
            .dispatch(self.clone(), Arc::clone(&node), &mut cancelled)
            .await
        {
            warn!(
                "Received unknown top-level node: {}",
                DisplayableNode(&node)
            );
        }

        if self.should_ack(&node) && !cancelled {
            self.maybe_deferred_ack(node).await;
        }
    }
893
894 fn should_ack(&self, node: &Node) -> bool {
896 matches!(
897 node.tag.as_str(),
898 "message" | "receipt" | "notification" | "call"
899 ) && node.attrs.contains_key("id")
900 && node.attrs.contains_key("from")
901 }
902
903 async fn maybe_deferred_ack(self: &Arc<Self>, node: Arc<Node>) {
907 if self.synchronous_ack {
908 if let Err(e) = self.send_ack_for(&node).await {
909 warn!("Failed to send ack: {e:?}");
910 }
911 } else {
912 let this = self.clone();
913 tokio::spawn(async move {
915 if let Err(e) = this.send_ack_for(&node).await {
916 warn!("Failed to send ack: {e:?}");
917 }
918 });
919 }
920 }
921
922 async fn send_ack_for(&self, node: &Node) -> Result<(), ClientError> {
924 if !self.is_connected() || self.expected_disconnect.load(Ordering::Relaxed) {
925 return Ok(());
926 }
927 let id = match node.attrs.get("id") {
928 Some(v) => v.clone(),
929 None => return Ok(()),
930 };
931 let from = match node.attrs.get("from") {
932 Some(v) => v.clone(),
933 None => return Ok(()),
934 };
935 let participant = node.attrs.get("participant").cloned();
936 let typ = if node.tag != "message" {
937 node.attrs.get("type").cloned()
938 } else {
939 None
940 };
941 let mut attrs = Attrs::new();
942 attrs.insert("class".to_string(), node.tag.clone());
943 attrs.insert("id".to_string(), id);
944 attrs.insert("to".to_string(), from);
945 if let Some(p) = participant {
946 attrs.insert("participant".to_string(), p);
947 }
948 if let Some(t) = typ {
949 attrs.insert("type".to_string(), t);
950 }
951 let ack = Node {
952 tag: "ack".to_string(),
953 attrs,
954 content: None,
955 };
956 self.send_node(ack).await
957 }
958
    /// Logs receipt of a stanza tag that has no real handler yet.
    pub(crate) async fn handle_unimplemented(&self, tag: &str) {
        warn!("TODO: Implement handler for <{tag}>");
    }
962
    /// Sends a passive-mode IQ toggling this connection's passive state.
    pub async fn set_passive(&self, passive: bool) -> Result<(), crate::request::IqError> {
        use wacore::iq::passive::PassiveModeSpec;
        self.execute(PassiveModeSpec::new(passive)).await
    }
967
    /// Sends a clean-dirty-bits IQ for a single dirty-state `type_`,
    /// optionally scoped by `timestamp`.
    pub async fn clean_dirty_bits(
        &self,
        type_: &str,
        timestamp: Option<&str>,
    ) -> Result<(), crate::request::IqError> {
        use wacore::iq::dirty::CleanDirtyBitsSpec;

        let spec = CleanDirtyBitsSpec::single(type_, timestamp)?;
        self.execute(spec).await
    }
978
979 pub async fn fetch_props(&self) -> Result<(), crate::request::IqError> {
980 use wacore::iq::props::PropsSpec;
981 use wacore::store::commands::DeviceCommand;
982
983 let stored_hash = self
984 .persistence_manager
985 .get_device_snapshot()
986 .await
987 .props_hash
988 .clone();
989
990 let spec = match &stored_hash {
991 Some(hash) => {
992 debug!("Fetching props with hash for delta update...");
993 PropsSpec::with_hash(hash)
994 }
995 None => {
996 debug!("Fetching props (full, no stored hash)...");
997 PropsSpec::new()
998 }
999 };
1000
1001 let response = self.execute(spec).await?;
1002
1003 if response.delta_update {
1004 debug!(
1005 "Props delta update received ({} changed props)",
1006 response.props.len()
1007 );
1008 } else {
1009 debug!(
1010 "Props full update received ({} props, hash={:?})",
1011 response.props.len(),
1012 response.hash
1013 );
1014 }
1015
1016 if let Some(new_hash) = response.hash {
1017 self.persistence_manager
1018 .process_command(DeviceCommand::SetPropsHash(Some(new_hash)))
1019 .await;
1020 }
1021
1022 Ok(())
1023 }
1024
    /// Fetches the account's privacy settings from the server.
    pub async fn fetch_privacy_settings(
        &self,
    ) -> Result<wacore::iq::privacy::PrivacySettingsResponse, crate::request::IqError> {
        use wacore::iq::privacy::PrivacySettingsSpec;

        debug!("Fetching privacy settings...");

        self.execute(PrivacySettingsSpec::new()).await
    }
1034
    /// Sends the digest key-bundle IQ, discarding the response payload.
    pub async fn send_digest_key_bundle(&self) -> Result<(), crate::request::IqError> {
        use wacore::iq::prekeys::DigestKeyBundleSpec;

        debug!("Sending digest key bundle...");

        self.execute(DigestKeyBundleSpec::new()).await.map(|_| ())
    }
1042
1043 pub(crate) async fn handle_success(self: &Arc<Self>, node: &wacore_binary::node::Node) {
1044 if self.expected_disconnect.load(Ordering::Relaxed) {
1048 debug!("Ignoring <success> stanza: expected disconnect pending");
1049 return;
1050 }
1051
1052 if self.is_logged_in.swap(true, Ordering::SeqCst) {
1055 debug!("Ignoring duplicate <success> stanza (already logged in)");
1056 return;
1057 }
1058
1059 let current_generation = self.connection_generation.fetch_add(1, Ordering::SeqCst) + 1;
1062
1063 info!(
1064 "Successfully authenticated with WhatsApp servers! (gen={})",
1065 current_generation
1066 );
1067 *self.last_successful_connect.lock().await = Some(chrono::Utc::now());
1068 self.auto_reconnect_errors.store(0, Ordering::Relaxed);
1069
1070 self.update_server_time_offset(node);
1071
1072 if let Some(lid_value) = node.attrs.get("lid") {
1073 if let Some(lid) = lid_value.to_jid() {
1074 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1075 if device_snapshot.lid.as_ref() != Some(&lid) {
1076 debug!("Updating LID from server to '{lid}'");
1077 self.persistence_manager
1078 .process_command(DeviceCommand::SetLid(Some(lid)))
1079 .await;
1080 }
1081 } else {
1082 warn!("Failed to parse LID from success stanza: {lid_value}");
1083 }
1084 } else {
1085 warn!("LID not found in <success> stanza. Group messaging may fail.");
1086 }
1087
1088 let client_clone = self.clone();
1089 let task_generation = current_generation;
1090 tokio::spawn(async move {
1091 macro_rules! check_generation {
1093 () => {
1094 if client_clone.connection_generation.load(Ordering::SeqCst) != task_generation
1095 {
1096 debug!("Post-login task cancelled: connection generation changed");
1097 return;
1098 }
1099 };
1100 }
1101
1102 debug!(
1103 "Starting post-login initialization sequence (gen={})...",
1104 task_generation
1105 );
1106
1107 let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
1110 let needs_pushname_from_sync = device_snapshot.push_name.is_empty();
1111 if needs_pushname_from_sync {
1112 debug!("Push name is empty - will be set from app state sync (setting_pushName)");
1113 }
1114
1115 if !client_clone.is_connected() {
1119 debug!(
1120 "Skipping post-login init: connection closed (likely pairing phase reconnect)"
1121 );
1122 return;
1123 }
1124
1125 check_generation!();
1126 client_clone.send_unified_session().await;
1127
1128 check_generation!();
1133 if let Err(e) = client_clone
1134 .establish_primary_phone_session_immediate()
1135 .await
1136 {
1137 warn!(target: "Client/PDO", "Failed to establish session with primary phone on login: {:?}", e);
1138 }
1140
1141 check_generation!();
1144 if let Err(e) = client_clone.upload_pre_keys().await {
1145 warn!("Failed to upload pre-keys during startup: {e:?}");
1146 }
1147
1148 check_generation!();
1152 if let Err(e) = client_clone.set_passive(false).await {
1153 warn!("Failed to send post-connect active IQ: {e:?}");
1154 }
1155
1156 const OFFLINE_SYNC_TIMEOUT_SECS: u64 = 5;
1161
1162 if !client_clone.offline_sync_completed.load(Ordering::Relaxed) {
1163 debug!(
1164 "Waiting for offline sync to complete (up to {}s)...",
1165 OFFLINE_SYNC_TIMEOUT_SECS
1166 );
1167 let wait_result = tokio::time::timeout(
1168 Duration::from_secs(OFFLINE_SYNC_TIMEOUT_SECS),
1169 client_clone.offline_sync_notifier.notified(),
1170 )
1171 .await;
1172
1173 check_generation!();
1175
1176 if wait_result.is_err() {
1177 debug!("Offline sync wait timed out, proceeding with passive tasks");
1178 } else {
1179 debug!("Offline sync completed, proceeding with passive tasks");
1180 }
1181 }
1182
1183 check_generation!();
1185 if !client_clone.is_connected() {
1186 debug!("Skipping presence: connection closed");
1187 return;
1188 }
1189
1190 let bg_client = client_clone.clone();
1192 let bg_generation = task_generation;
1193 tokio::spawn(async move {
1194 if bg_client.connection_generation.load(Ordering::SeqCst) != bg_generation {
1196 debug!("Skipping background init queries: connection generation changed");
1197 return;
1198 }
1199 if !bg_client.is_connected() {
1200 debug!("Skipping background init queries: connection closed");
1201 return;
1202 }
1203
1204 debug!(
1205 "Sending background initialization queries (Props, Blocklist, Privacy, Digest)..."
1206 );
1207
1208 let props_fut = bg_client.fetch_props();
1209 let binding = bg_client.blocking();
1210 let blocklist_fut = binding.get_blocklist();
1211 let privacy_fut = bg_client.fetch_privacy_settings();
1212 let digest_fut = bg_client.send_digest_key_bundle();
1213
1214 let (r_props, r_block, r_priv, r_digest) =
1215 tokio::join!(props_fut, blocklist_fut, privacy_fut, digest_fut);
1216
1217 if let Err(e) = r_props {
1218 warn!("Background init: Failed to fetch props: {e:?}");
1219 }
1220 if let Err(e) = r_block {
1221 warn!("Background init: Failed to fetch blocklist: {e:?}");
1222 }
1223 if let Err(e) = r_priv {
1224 warn!("Background init: Failed to fetch privacy settings: {e:?}");
1225 }
1226 if let Err(e) = r_digest {
1227 warn!("Background init: Failed to send digest: {e:?}");
1228 }
1229
1230 if let Err(e) = bg_client.tc_token().prune_expired().await {
1232 warn!("Background init: Failed to prune expired tc_tokens: {e:?}");
1233 }
1234 });
1235
1236 check_generation!();
1237
1238 let flag_set = client_clone.needs_initial_full_sync.load(Ordering::Relaxed);
1239 let needs_initial_sync = flag_set || needs_pushname_from_sync;
1240
1241 if needs_initial_sync {
1242 debug!(
1246 target: "Client/AppState",
1247 "Starting Initial App State Sync (flag_set={flag_set}, needs_pushname={needs_pushname_from_sync})"
1248 );
1249
1250 if !client_clone
1251 .initial_app_state_keys_received
1252 .load(Ordering::Relaxed)
1253 {
1254 debug!(
1255 target: "Client/AppState",
1256 "Waiting up to 5s for app state keys..."
1257 );
1258 let _ = tokio::time::timeout(
1259 Duration::from_secs(5),
1260 client_clone.initial_keys_synced_notifier.notified(),
1261 )
1262 .await;
1263
1264 check_generation!();
1266 }
1267
1268 check_generation!();
1270 if let Err(e) = client_clone
1271 .sync_collections_batched(vec![
1272 WAPatchName::CriticalBlock,
1273 WAPatchName::CriticalUnblockLow,
1274 ])
1275 .await
1276 {
1277 warn!("Failed to sync critical app state: {e}");
1278 }
1279
1280 check_generation!();
1281
1282 client_clone
1287 .core
1288 .event_bus
1289 .dispatch(&Event::Connected(crate::types::events::Connected));
1290 client_clone.connected_notifier.notify_waiters();
1291
1292 let sync_client = client_clone.clone();
1294 let sync_generation = task_generation;
1295 tokio::spawn(async move {
1296 if sync_client.connection_generation.load(Ordering::SeqCst) != sync_generation {
1297 debug!("App state sync cancelled: connection generation changed");
1298 return;
1299 }
1300
1301 if let Err(e) = sync_client
1302 .sync_collections_batched(vec![
1303 WAPatchName::RegularLow,
1304 WAPatchName::RegularHigh,
1305 WAPatchName::Regular,
1306 ])
1307 .await
1308 {
1309 warn!("Failed to batch sync non-critical app state: {e}");
1310 }
1311
1312 sync_client
1313 .needs_initial_full_sync
1314 .store(false, Ordering::Relaxed);
1315 debug!(target: "Client/AppState", "Initial App State Sync Completed.");
1316 });
1317 } else {
1318 let device_snapshot = client_clone.persistence_manager.get_device_snapshot().await;
1321 if !device_snapshot.push_name.is_empty() {
1322 if let Err(e) = client_clone.presence().set_available().await {
1323 warn!("Failed to send initial presence: {e:?}");
1324 } else {
1325 debug!("Initial presence sent successfully.");
1326 }
1327 }
1328
1329 check_generation!();
1332
1333 client_clone
1334 .core
1335 .event_bus
1336 .dispatch(&Event::Connected(crate::types::events::Connected));
1337 client_clone.connected_notifier.notify_waiters();
1338 }
1339 });
1340 }
1341
1342 pub(crate) async fn handle_ack_response(&self, node: Node) -> bool {
1347 let id_opt = node.attrs.get("id").map(|v| v.to_string_value());
1348 if let Some(id) = id_opt
1349 && let Some(waiter) = self.response_waiters.lock().await.remove(&id)
1350 {
1351 if waiter.send(node).is_err() {
1352 warn!(target: "Client/Ack", "Failed to send ACK response to waiter for ID {id}. Receiver was likely dropped.");
1353 }
1354 return true;
1355 }
1356 false
1357 }
1358
1359 #[allow(dead_code)] pub(crate) async fn fetch_app_state_with_retry(&self, name: WAPatchName) -> anyhow::Result<()> {
1361 {
1365 let mut syncing = self.app_state_syncing.lock().await;
1366 if !syncing.insert(name) {
1367 debug!(target: "Client/AppState", "Skipping sync for {:?}: already in flight", name);
1368 return Ok(());
1369 }
1370 }
1371
1372 let result = self.fetch_app_state_with_retry_inner(name).await;
1373
1374 self.app_state_syncing.lock().await.remove(&name);
1376
1377 result
1378 }
1379
    /// Drives one app-state collection sync with targeted retries.
    ///
    /// Retry policy:
    /// * "app state key not found" on the very first attempt: optionally wait
    ///   (up to 10s) for the initial key-share notification, then retry once.
    /// * "database is locked": linear backoff, up to
    ///   `APP_STATE_RETRY_MAX_ATTEMPTS` attempts.
    /// * Any other error is returned to the caller immediately.
    #[allow(dead_code)]
    async fn fetch_app_state_with_retry_inner(&self, name: WAPatchName) -> anyhow::Result<()> {
        let mut attempt = 0u32;
        loop {
            attempt += 1;
            let res = self.process_app_state_sync_task(name, false).await;
            match res {
                Ok(()) => return Ok(()),
                Err(e) => {
                    // NOTE(review): error classification is string-based and
                    // brittle if the underlying error messages ever change.
                    let es = e.to_string();
                    if es.contains("app state key not found") && attempt == 1 {
                        if !self.initial_app_state_keys_received.load(Ordering::Relaxed) {
                            debug!(target: "Client/AppState", "App state key missing for {:?}; waiting up to 10s for key share then retrying", name);
                            if tokio::time::timeout(
                                Duration::from_secs(10),
                                self.initial_keys_synced_notifier.notified(),
                            )
                            .await
                            .is_err()
                            {
                                warn!(target: "Client/AppState", "Timeout waiting for key share for {:?}; retrying anyway", name);
                            }
                        }
                        continue;
                    }
                    if es.contains("database is locked") && attempt < APP_STATE_RETRY_MAX_ATTEMPTS {
                        // Linear backoff: 350ms, 550ms, 750ms, ...
                        let backoff = Duration::from_millis(200 * attempt as u64 + 150);
                        warn!(target: "Client/AppState", "Attempt {} for {:?} failed due to locked DB; backing off {:?} and retrying", attempt, name, backoff);
                        tokio::time::sleep(backoff).await;
                        continue;
                    }
                    return Err(e);
                }
            }
        }
    }
1419
1420 pub(crate) async fn sync_collections_batched(
1424 &self,
1425 collections: Vec<WAPatchName>,
1426 ) -> anyhow::Result<()> {
1427 if collections.is_empty() {
1428 return Ok(());
1429 }
1430
1431 let pending = {
1433 let mut syncing = self.app_state_syncing.lock().await;
1434 let mut filtered = Vec::with_capacity(collections.len());
1435 for name in collections {
1436 if syncing.insert(name) {
1437 filtered.push(name);
1438 } else {
1439 debug!(target: "Client/AppState", "Skipping {:?} in batch: already in flight", name);
1440 }
1441 }
1442 filtered
1443 };
1444
1445 if pending.is_empty() {
1446 return Ok(());
1447 }
1448
1449 let all_collections: Vec<WAPatchName> = pending.clone();
1451
1452 let result = self.sync_collections_batched_inner(pending).await;
1453
1454 {
1456 let mut syncing = self.app_state_syncing.lock().await;
1457 for name in &all_collections {
1458 syncing.remove(name);
1459 }
1460 }
1461
1462 result
1463 }
1464
    /// Workhorse for `sync_collections_batched`: requests patches for all
    /// `pending` collections in a single `<sync>` IQ per round, applies the
    /// decoded mutations, and loops while any collection reports a conflict,
    /// a retryable error, or more patches — bounded by `MAX_ITERATIONS`.
    async fn sync_collections_batched_inner(
        &self,
        mut pending: Vec<WAPatchName>,
    ) -> anyhow::Result<()> {
        use wacore::appstate::patch_decode::CollectionSyncError;
        // Safety valve so a collection that keeps reporting has_more/conflict
        // cannot make this loop spin forever.
        const MAX_ITERATIONS: usize = 5;
        let mut iteration = 0;

        while !pending.is_empty() && iteration < MAX_ITERATIONS {
            iteration += 1;
            debug!(
                target: "Client/AppState",
                "Batched sync iteration {}/{}: {:?}",
                iteration, MAX_ITERATIONS, pending
            );

            let backend = self.persistence_manager.backend();

            // Build one <collection> node per pending collection. A stored
            // version of 0 means no local state: request a full snapshot and
            // remember that for mutation dispatch below.
            let mut collection_nodes = Vec::with_capacity(pending.len());
            let mut was_snapshot = std::collections::HashSet::new();
            for &name in &pending {
                let state = backend.get_version(name.as_str()).await?;
                let want_snapshot = state.version == 0;
                if want_snapshot {
                    was_snapshot.insert(name);
                }
                let mut builder = NodeBuilder::new("collection")
                    .attr("name", name.as_str())
                    .attr(
                        "return_snapshot",
                        if want_snapshot { "true" } else { "false" },
                    );
                if !want_snapshot {
                    builder = builder.attr("version", state.version.to_string());
                }
                collection_nodes.push(builder.build());
            }

            let sync_node = NodeBuilder::new("sync").children(collection_nodes).build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: server_jid(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: None,
            };

            let resp = self.send_iq(iq).await?;

            // Pre-download every external blob referenced by the response so
            // the synchronous download closure handed to the decoder can serve
            // them from memory (keyed by direct_path).
            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
                std::collections::HashMap::new();

            if let Ok(patch_lists) = wacore::appstate::patch_decode::parse_patch_lists(&resp) {
                for pl in &patch_lists {
                    if let Some(ext) = &pl.snapshot_ref
                        && let Some(path) = &ext.direct_path
                    {
                        match self.download(ext).await {
                            Ok(bytes) => {
                                pre_downloaded.insert(path.clone(), bytes);
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to download external snapshot for {:?}: {e}",
                                    pl.name
                                );
                            }
                        }
                    }

                    for patch in &pl.patches {
                        if let Some(ext) = &patch.external_mutations
                            && let Some(path) = &ext.direct_path
                        {
                            match self.download(ext).await {
                                Ok(bytes) => {
                                    pre_downloaded.insert(path.clone(), bytes);
                                }
                                Err(e) => {
                                    let v =
                                        patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
                                    warn!(
                                        "Failed to download external mutations for patch v{}: {e}",
                                        v
                                    );
                                }
                            }
                        }
                    }
                }
            }

            // Serves blobs from the pre-downloaded cache; errors if the
            // decoder asks for anything we failed to fetch above.
            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(path) = &ext.direct_path {
                    if let Some(bytes) = pre_downloaded.get(path) {
                        Ok(bytes.clone())
                    } else {
                        Err(anyhow::anyhow!(
                            "external blob not pre-downloaded: {}",
                            path
                        ))
                    }
                } else {
                    Err(anyhow::anyhow!("external blob has no directPath"))
                }
            };

            let proc = self.get_app_state_processor().await;
            let results = proc.decode_multi_patch_list(&resp, &download, true).await?;

            // Collections that must be requested again in the next round.
            let mut needs_refetch = Vec::new();

            for (mutations, new_state, list) in results {
                let name = list.name;

                // Per-collection error handling: conflicts and retryable
                // errors go back into the refetch queue; fatal errors are
                // logged and dropped.
                if let Some(ref err) = list.error {
                    match err {
                        CollectionSyncError::Conflict { has_more } => {
                            warn!(target: "Client/AppState", "Collection {:?} conflict (has_more={}), will refetch", name, has_more);
                            needs_refetch.push(name);
                            continue;
                        }
                        CollectionSyncError::Fatal { code, text } => {
                            warn!(target: "Client/AppState", "Collection {:?} fatal error {}: {}", name, code, text);
                            continue;
                        }
                        CollectionSyncError::Retry { code, text } => {
                            warn!(target: "Client/AppState", "Collection {:?} retryable error {}: {}, will refetch", name, code, text);
                            needs_refetch.push(name);
                            continue;
                        }
                    }
                }

                // Ask peers for any app-state keys we are missing, rate-limited
                // to one request per key per 24h via app_state_key_requests.
                let missing = match proc.get_missing_key_ids(&list).await {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to get missing key IDs for {:?}: {}", name, e);
                        Vec::new()
                    }
                };
                if !missing.is_empty() {
                    let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
                    let mut guard = self.app_state_key_requests.lock().await;
                    let now = std::time::Instant::now();
                    for key_id in missing {
                        let hex_id = hex::encode(&key_id);
                        let should = guard
                            .get(&hex_id)
                            .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                            .unwrap_or(true);
                        if should {
                            guard.insert(hex_id, now);
                            to_request.push(key_id);
                        }
                    }
                    drop(guard);
                    if !to_request.is_empty() {
                        self.request_app_state_keys(&to_request).await;
                    }
                }

                // Mutations from a snapshot round are flagged as full_sync so
                // downstream consumers can tell replay from live changes.
                let full_sync = was_snapshot.contains(&name);
                for m in mutations {
                    self.dispatch_app_state_mutation(&m, full_sync).await;
                }

                backend
                    .set_version(name.as_str(), new_state.clone())
                    .await?;

                if list.has_more_patches {
                    needs_refetch.push(name);
                }

                debug!(
                    target: "Client/AppState",
                    "Batched sync: {:?} done (version={}, has_more={})",
                    name, new_state.version, list.has_more_patches
                );
            }

            pending = needs_refetch;
        }

        if !pending.is_empty() {
            warn!(
                target: "Client/AppState",
                "Batched sync: max iterations ({}) reached for {:?}",
                MAX_ITERATIONS, pending
            );
        }

        Ok(())
    }
1674
    /// Syncs a single app-state collection, paginating until the server stops
    /// reporting more patches (bounded by `MAX_PAGINATION_ITERATIONS`).
    ///
    /// When the stored version is 0 (or `full_sync` is requested) the first
    /// request asks for a full snapshot; later pages are incremental. The new
    /// collection version is persisted only once the loop finishes.
    pub(crate) async fn process_app_state_sync_task(
        &self,
        name: WAPatchName,
        full_sync: bool,
    ) -> anyhow::Result<()> {
        let backend = self.persistence_manager.backend();
        let mut full_sync = full_sync;

        let mut state = backend.get_version(name.as_str()).await?;
        // Version 0 means we have no local state: force a snapshot sync.
        if state.version == 0 {
            full_sync = true;
        }

        let mut has_more = true;
        let mut want_snapshot = full_sync;
        // Safety valve against a server that keeps reporting has_more_patches.
        const MAX_PAGINATION_ITERATIONS: u32 = 500;
        let mut iteration = 0u32;

        while has_more {
            iteration += 1;
            if iteration > MAX_PAGINATION_ITERATIONS {
                warn!(target: "Client/AppState", "App state sync for {:?} exceeded {} iterations, aborting", name, MAX_PAGINATION_ITERATIONS);
                break;
            }
            debug!(target: "Client/AppState", "Fetching app state patch batch: name={:?} want_snapshot={want_snapshot} version={} full_sync={} has_more_previous={}", name, state.version, full_sync, has_more);

            // Build the <collection> request; incremental requests carry our
            // current version so the server only sends newer patches.
            let mut collection_builder = NodeBuilder::new("collection")
                .attr("name", name.as_str())
                .attr(
                    "return_snapshot",
                    if want_snapshot { "true" } else { "false" },
                );
            if !want_snapshot {
                collection_builder = collection_builder.attr("version", state.version.to_string());
            }
            let sync_node = NodeBuilder::new("sync")
                .children([collection_builder.build()])
                .build();
            let iq = crate::request::InfoQuery {
                namespace: "w:sync:app:state",
                query_type: crate::request::InfoQueryType::Set,
                to: server_jid(),
                target: None,
                id: None,
                content: Some(wacore_binary::node::NodeContent::Nodes(vec![sync_node])),
                timeout: None,
            };

            let resp = self.send_iq(iq).await?;
            debug!(target: "Client/AppState", "Received IQ response for {:?}; decoding patches", name);

            let _decode_start = std::time::Instant::now();

            // Pre-download every external blob referenced by the response so
            // the synchronous download closure handed to the decoder can serve
            // them from memory (keyed by direct_path).
            let mut pre_downloaded: std::collections::HashMap<String, Vec<u8>> =
                std::collections::HashMap::new();

            if let Ok(pl) = wacore::appstate::patch_decode::parse_patch_list(&resp) {
                debug!(target: "Client/AppState", "Parsed patch list for {:?}: has_snapshot_ref={} has_more_patches={} patches_count={}",
                    name, pl.snapshot_ref.is_some(), pl.has_more_patches, pl.patches.len());

                if let Some(ext) = &pl.snapshot_ref
                    && let Some(path) = &ext.direct_path
                {
                    match self.download(ext).await {
                        Ok(bytes) => {
                            debug!(target: "Client/AppState", "Downloaded external snapshot ({} bytes)", bytes.len());
                            pre_downloaded.insert(path.clone(), bytes);
                        }
                        Err(e) => {
                            warn!("Failed to download external snapshot: {e}");
                        }
                    }
                }

                for patch in &pl.patches {
                    if let Some(ext) = &patch.external_mutations
                        && let Some(path) = &ext.direct_path
                    {
                        let patch_version =
                            patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
                        match self.download(ext).await {
                            Ok(bytes) => {
                                debug!(target: "Client/AppState", "Downloaded external mutations for patch v{} ({} bytes)", patch_version, bytes.len());
                                pre_downloaded.insert(path.clone(), bytes);
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to download external mutations for patch v{}: {e}",
                                    patch_version
                                );
                            }
                        }
                    }
                }
            }

            // Serves blobs from the pre-downloaded cache; errors if the
            // decoder asks for anything we failed to fetch above.
            let download = |ext: &wa::ExternalBlobReference| -> anyhow::Result<Vec<u8>> {
                if let Some(path) = &ext.direct_path {
                    if let Some(bytes) = pre_downloaded.get(path) {
                        Ok(bytes.clone())
                    } else {
                        Err(anyhow::anyhow!(
                            "external blob not pre-downloaded: {}",
                            path
                        ))
                    }
                } else {
                    Err(anyhow::anyhow!("external blob has no directPath"))
                }
            };

            let proc = self.get_app_state_processor().await;
            let (mutations, new_state, list) =
                proc.decode_patch_list(&resp, &download, true).await?;
            let decode_elapsed = _decode_start.elapsed();
            if decode_elapsed.as_millis() > 500 {
                debug!(target: "Client/AppState", "Patch decode for {:?} took {:?}", name, decode_elapsed);
            }

            // Ask peers for any app-state keys we are missing, rate-limited to
            // one request per key per 24h via app_state_key_requests.
            let missing = match proc.get_missing_key_ids(&list).await {
                Ok(v) => v,
                Err(e) => {
                    warn!("Failed to get missing key IDs for {:?}: {}", name, e);
                    Vec::new()
                }
            };
            if !missing.is_empty() {
                let mut to_request: Vec<Vec<u8>> = Vec::with_capacity(missing.len());
                let mut guard = self.app_state_key_requests.lock().await;
                let now = std::time::Instant::now();
                for key_id in missing {
                    let hex_id = hex::encode(&key_id);
                    let should = guard
                        .get(&hex_id)
                        .map(|t| t.elapsed() > std::time::Duration::from_secs(24 * 3600))
                        .unwrap_or(true);
                    if should {
                        guard.insert(hex_id, now);
                        to_request.push(key_id);
                    }
                }
                drop(guard);
                if !to_request.is_empty() {
                    self.request_app_state_keys(&to_request).await;
                }
            }

            for m in mutations {
                debug!(target: "Client/AppState", "Dispatching mutation kind={} index_len={} full_sync={}", m.index.first().map(|s| s.as_str()).unwrap_or(""), m.index.len(), full_sync);
                self.dispatch_app_state_mutation(&m, full_sync).await;
            }

            state = new_state;
            has_more = list.has_more_patches;
            // Only the first page can be a snapshot; later pages are deltas.
            want_snapshot = false;
            debug!(target: "Client/AppState", "After processing batch name={:?} has_more={has_more} new_version={}", name, state.version);
        }

        backend.set_version(name.as_str(), state.clone()).await?;

        debug!(target: "Client/AppState", "Completed and saved app state sync for {:?} (final version={})", name, state.version);
        Ok(())
    }
1845
1846 async fn request_app_state_keys(&self, raw_key_ids: &[Vec<u8>]) {
1847 if raw_key_ids.is_empty() {
1848 return;
1849 }
1850 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
1851 let own_jid = match device_snapshot.pn.clone() {
1852 Some(j) => j,
1853 None => return,
1854 };
1855 let key_ids: Vec<wa::message::AppStateSyncKeyId> = raw_key_ids
1856 .iter()
1857 .map(|k| wa::message::AppStateSyncKeyId {
1858 key_id: Some(k.clone()),
1859 })
1860 .collect();
1861 let msg = wa::Message {
1862 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
1863 r#type: Some(wa::message::protocol_message::Type::AppStateSyncKeyRequest as i32),
1864 app_state_sync_key_request: Some(wa::message::AppStateSyncKeyRequest { key_ids }),
1865 ..Default::default()
1866 })),
1867 ..Default::default()
1868 };
1869 if let Err(e) = self
1870 .send_message_impl(
1871 own_jid,
1872 &msg,
1873 Some(self.generate_message_id().await),
1874 true,
1875 false,
1876 None,
1877 vec![],
1878 )
1879 .await
1880 {
1881 warn!("Failed to send app state key request: {e}");
1882 }
1883 }
1884
    /// Translates one decoded app-state mutation into a typed client event and
    /// dispatches it on the event bus.
    ///
    /// Only `Set` operations with a non-empty index are handled; everything
    /// else (removals, unknown kinds) is silently ignored. `full_sync` is
    /// forwarded on each event so consumers can distinguish history replay
    /// from live changes.
    async fn dispatch_app_state_mutation(
        &self,
        m: &crate::appstate_sync::Mutation,
        full_sync: bool,
    ) {
        use wacore::types::events::{
            ArchiveUpdate, ContactUpdate, Event, MarkChatAsReadUpdate, MuteUpdate, PinUpdate,
        };
        if m.operation != wa::syncd_mutation::SyncdOperation::Set {
            return;
        }
        if m.index.is_empty() {
            return;
        }
        // index[0] is the mutation kind; index[1] (when present) is the
        // target chat JID.
        let kind = &m.index[0];
        let ts = m
            .action_value
            .as_ref()
            .and_then(|v| v.timestamp)
            .unwrap_or(0);
        // Timestamps are interpreted as milliseconds; fall back to "now" when
        // absent or out of range.
        let time = chrono::DateTime::from_timestamp_millis(ts).unwrap_or_else(chrono::Utc::now);
        let jid = if m.index.len() > 1 {
            m.index[1].parse().unwrap_or_default()
        } else {
            Jid::default()
        };
        match kind.as_str() {
            "setting_pushName" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.push_name_setting
                    && let Some(new_name) = &act.name
                {
                    let new_name = new_name.clone();
                    let bus = self.core.event_bus.clone();

                    // Only persist and announce when the name actually changed.
                    let snapshot = self.persistence_manager.get_device_snapshot().await;
                    let old = snapshot.push_name.clone();
                    if old != new_name {
                        debug!(target: "Client/AppState", "Persisting push name from app state mutation: '{}' (old='{}')", new_name, old);
                        self.persistence_manager
                            .process_command(DeviceCommand::SetPushName(new_name.clone()))
                            .await;
                        bus.dispatch(&Event::SelfPushNameUpdated(
                            crate::types::events::SelfPushNameUpdated {
                                from_server: true,
                                old_name: old.clone(),
                                new_name: new_name.clone(),
                            },
                        ));

                        // First-ever push name: presence could not be sent
                        // before, so send it now.
                        if old.is_empty() && !new_name.is_empty() {
                            debug!(target: "Client/AppState", "Sending presence after receiving initial pushname from app state sync");
                            if let Err(e) = self.presence().set_available().await {
                                warn!(target: "Client/AppState", "Failed to send presence after pushname sync: {e:?}");
                            }
                        }
                    } else {
                        debug!(target: "Client/AppState", "Push name mutation received but name unchanged: '{}'", new_name);
                    }
                }
            }
            "mute" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.mute_action
                {
                    self.core.event_bus.dispatch(&Event::MuteUpdate(MuteUpdate {
                        jid,
                        timestamp: time,
                        action: Box::new(*act),
                        from_full_sync: full_sync,
                    }));
                }
            }
            "pin" | "pin_v1" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.pin_action
                {
                    self.core.event_bus.dispatch(&Event::PinUpdate(PinUpdate {
                        jid,
                        timestamp: time,
                        action: Box::new(*act),
                        from_full_sync: full_sync,
                    }));
                }
            }
            "archive" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.archive_chat_action
                {
                    self.core
                        .event_bus
                        .dispatch(&Event::ArchiveUpdate(ArchiveUpdate {
                            jid,
                            timestamp: time,
                            action: Box::new(act.clone()),
                            from_full_sync: full_sync,
                        }));
                }
            }
            "contact" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.contact_action
                {
                    self.core
                        .event_bus
                        .dispatch(&Event::ContactUpdate(ContactUpdate {
                            jid,
                            timestamp: time,
                            action: Box::new(act.clone()),
                            from_full_sync: full_sync,
                        }));
                }
            }
            "mark_chat_as_read" | "markChatAsRead" => {
                if let Some(val) = &m.action_value
                    && let Some(act) = &val.mark_chat_as_read_action
                {
                    self.core.event_bus.dispatch(&Event::MarkChatAsReadUpdate(
                        MarkChatAsReadUpdate {
                            jid,
                            timestamp: time,
                            action: Box::new(act.clone()),
                            from_full_sync: full_sync,
                        },
                    ));
                }
            }
            // Unknown mutation kinds are intentionally ignored.
            _ => {}
        }
    }
2016
    /// Marks the next disconnect as expected so reconnect/error handling does
    /// not treat it as a failure.
    async fn expect_disconnect(&self) {
        self.expected_disconnect.store(true, Ordering::Relaxed);
    }
2020
2021 pub(crate) async fn handle_stream_error(&self, node: &wacore_binary::node::Node) {
2022 self.is_logged_in.store(false, Ordering::Relaxed);
2023
2024 let mut attrs = node.attrs();
2025 let code = attrs.optional_string("code").unwrap_or("");
2026 let conflict_type = node
2027 .get_optional_child("conflict")
2028 .map(|n| n.attrs().optional_string("type").unwrap_or("").to_string())
2029 .unwrap_or_default();
2030
2031 if !conflict_type.is_empty() {
2032 info!(
2033 "Got stream error indicating client was removed or replaced (conflict={}). Logging out.",
2034 conflict_type
2035 );
2036 self.expect_disconnect().await;
2037 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2038
2039 let event = if conflict_type == "replaced" {
2040 Event::StreamReplaced(crate::types::events::StreamReplaced)
2041 } else {
2042 Event::LoggedOut(crate::types::events::LoggedOut {
2043 on_connect: false,
2044 reason: ConnectFailureReason::LoggedOut,
2045 })
2046 };
2047 self.core.event_bus.dispatch(&event);
2048
2049 let transport_opt = self.transport.lock().await.clone();
2050 if let Some(transport) = transport_opt {
2051 tokio::spawn(async move {
2052 info!("Disconnecting transport after conflict");
2053 transport.disconnect().await;
2054 });
2055 }
2056 } else {
2057 match code {
2058 "515" => {
2059 info!(
2061 "Got 515 stream error, server is closing stream (expected after pairing). Will auto-reconnect."
2062 );
2063 self.expect_disconnect().await;
2064 let transport_opt = self.transport.lock().await.clone();
2067 if let Some(transport) = transport_opt {
2068 tokio::spawn(async move {
2070 info!("Disconnecting transport after 515");
2071 transport.disconnect().await;
2072 });
2073 }
2074 }
2075 "516" => {
2076 info!("Got 516 stream error (device removed). Logging out.");
2077 self.expect_disconnect().await;
2078 self.enable_auto_reconnect.store(false, Ordering::Relaxed);
2079 self.core.event_bus.dispatch(&Event::LoggedOut(
2080 crate::types::events::LoggedOut {
2081 on_connect: false,
2082 reason: ConnectFailureReason::LoggedOut,
2083 },
2084 ));
2085
2086 let transport_opt = self.transport.lock().await.clone();
2087 if let Some(transport) = transport_opt {
2088 tokio::spawn(async move {
2089 info!("Disconnecting transport after 516");
2090 transport.disconnect().await;
2091 });
2092 }
2093 }
2094 "503" => {
2095 info!("Got 503 service unavailable, will auto-reconnect.");
2096 }
2097 _ => {
2098 error!("Unknown stream error: {}", DisplayableNode(node));
2099 self.expect_disconnect().await;
2100 self.core.event_bus.dispatch(&Event::StreamError(
2101 crate::types::events::StreamError {
2102 code: code.to_string(),
2103 raw: Some(node.clone()),
2104 },
2105 ));
2106 }
2107 }
2108 }
2109
2110 info!("Notifying shutdown from stream error handler");
2111 self.shutdown_notifier.notify_waiters();
2112 }
2113
    /// Handles a `<failure>` stanza received while connecting. Maps the
    /// numeric `reason` attribute to a `ConnectFailureReason`, decides whether
    /// auto-reconnect stays enabled, and dispatches the matching high-level
    /// event (LoggedOut / TemporaryBan / ClientOutdated / ConnectFailure).
    pub(crate) async fn handle_connect_failure(&self, node: &wacore_binary::node::Node) {
        // Assume the resulting disconnect is expected until the reason says
        // otherwise.
        self.expected_disconnect.store(true, Ordering::Relaxed);
        self.shutdown_notifier.notify_waiters();

        let mut attrs = node.attrs();
        let reason_code = attrs.optional_u64("reason").unwrap_or(0) as i32;
        let reason = ConnectFailureReason::from(reason_code);

        if reason.should_reconnect() {
            // Reconnectable failure: let the reconnect loop treat the
            // disconnect as unexpected so it retries.
            self.expected_disconnect.store(false, Ordering::Relaxed);
        } else {
            self.enable_auto_reconnect.store(false, Ordering::Relaxed);
        }

        if reason.is_logged_out() {
            info!("Got {reason:?} connect failure, logging out.");
            self.core
                .event_bus
                .dispatch(&wacore::types::events::Event::LoggedOut(
                    crate::types::events::LoggedOut {
                        on_connect: true,
                        reason,
                    },
                ));
        } else if let ConnectFailureReason::TempBanned = reason {
            let ban_code = attrs.optional_u64("code").unwrap_or(0) as i32;
            let expire_secs = attrs.optional_u64("expire").unwrap_or(0);
            let expire_duration =
                chrono::Duration::try_seconds(expire_secs as i64).unwrap_or_default();
            warn!("Temporary ban connect failure: {}", DisplayableNode(node));
            self.core.event_bus.dispatch(&Event::TemporaryBan(
                crate::types::events::TemporaryBan {
                    code: crate::types::events::TempBanReason::from(ban_code),
                    expire: expire_duration,
                },
            ));
        } else if let ConnectFailureReason::ClientOutdated = reason {
            error!("Client is outdated and was rejected by server.");
            self.core
                .event_bus
                .dispatch(&Event::ClientOutdated(crate::types::events::ClientOutdated));
        } else {
            warn!("Unknown connect failure: {}", DisplayableNode(node));
            self.core.event_bus.dispatch(&Event::ConnectFailure(
                crate::types::events::ConnectFailure {
                    reason,
                    message: attrs.optional_string("message").unwrap_or("").to_string(),
                    raw: Some(node.clone()),
                },
            ));
        }
    }
2166
2167 pub(crate) async fn handle_iq(self: &Arc<Self>, node: &wacore_binary::node::Node) -> bool {
2168 if let Some("get") = node.attrs.get("type").and_then(|s| s.as_str())
2169 && node.get_optional_child("ping").is_some()
2170 {
2171 info!("Received ping, sending pong.");
2172 let mut parser = node.attrs();
2173 let from_jid = parser.jid("from");
2174 let id = parser.optional_string("id").unwrap_or("").to_string();
2175 let pong = NodeBuilder::new("iq")
2176 .attrs([
2177 ("to", from_jid.to_string()),
2178 ("id", id),
2179 ("type", "result".to_string()),
2180 ])
2181 .build();
2182 if let Err(e) = self.send_node(pong).await {
2183 warn!("Failed to send pong: {e:?}");
2184 }
2185 return true;
2186 }
2187
2188 if pair::handle_iq(self, node).await {
2190 return true;
2191 }
2192
2193 false
2194 }
2195
2196 pub fn is_connected(&self) -> bool {
2197 self.noise_socket
2198 .try_lock()
2199 .is_ok_and(|guard| guard.is_some())
2200 }
2201
    /// Whether the client has completed authentication on the current stream.
    pub fn is_logged_in(&self) -> bool {
        self.is_logged_in.load(Ordering::Relaxed)
    }
2205
    /// Delegates to the unified-session tracker to refresh the local clock
    /// offset from a server stanza.
    pub(crate) fn update_server_time_offset(&self, node: &wacore_binary::node::Node) {
        self.unified_session.update_server_time_offset(node);
    }
2209
2210 pub(crate) async fn send_unified_session(&self) {
2211 if !self.is_connected() {
2212 debug!(target: "Client/UnifiedSession", "Skipping: not connected");
2213 return;
2214 }
2215
2216 let Some((node, _sequence)) = self.unified_session.prepare_send().await else {
2217 return;
2218 };
2219
2220 if let Err(e) = self.send_node(node).await {
2221 debug!(target: "Client/UnifiedSession", "Send failed: {e}");
2222 self.unified_session.clear_last_sent().await;
2223 }
2224 }
2225
    /// Waits until the encrypted socket is established (not necessarily logged
    /// in), or fails after `timeout`.
    ///
    /// Race-safety: the `Notified` future is created *before* the second
    /// connectivity check, so a notification that fires between the check and
    /// the await is not lost.
    pub async fn wait_for_socket(&self, timeout: std::time::Duration) -> Result<(), anyhow::Error> {
        if self.is_connected() {
            return Ok(());
        }

        let notified = self.socket_ready_notifier.notified();
        // Re-check after registering interest to close the check-then-wait race.
        if self.is_connected() {
            return Ok(());
        }

        tokio::time::timeout(timeout, notified)
            .await
            .map_err(|_| anyhow::anyhow!("Timeout waiting for socket"))
    }
2250
    /// Waits until the client is both connected and logged in, or fails after
    /// `timeout`.
    ///
    /// Same register-then-recheck pattern as `wait_for_socket`: the `Notified`
    /// future is created before the second check so a notification arriving in
    /// between is not missed.
    pub async fn wait_for_connected(
        &self,
        timeout: std::time::Duration,
    ) -> Result<(), anyhow::Error> {
        if self.is_connected() && self.is_logged_in() {
            return Ok(());
        }

        let notified = self.connected_notifier.notified();
        // Re-check after registering interest to close the check-then-wait race.
        if self.is_connected() && self.is_logged_in() {
            return Ok(());
        }

        tokio::time::timeout(timeout, notified)
            .await
            .map_err(|_| anyhow::anyhow!("Timeout waiting for connection"))
    }
2278
    /// Returns a shared handle to the persistence manager backing this client.
    pub fn persistence_manager(&self) -> Arc<PersistenceManager> {
        self.persistence_manager.clone()
    }
2284
2285 pub async fn edit_message(
2286 &self,
2287 to: Jid,
2288 original_id: impl Into<String>,
2289 new_content: wa::Message,
2290 ) -> Result<String, anyhow::Error> {
2291 let original_id = original_id.into();
2292
2293 let participant = if to.is_group() {
2296 Some(
2297 self.get_own_jid_for_group(&to)
2298 .await?
2299 .to_non_ad()
2300 .to_string(),
2301 )
2302 } else {
2303 if self.get_pn().await.is_none() {
2304 return Err(anyhow!("Not logged in"));
2305 }
2306 None
2307 };
2308
2309 let edit_container_message = wa::Message {
2310 edited_message: Some(Box::new(wa::message::FutureProofMessage {
2311 message: Some(Box::new(wa::Message {
2312 protocol_message: Some(Box::new(wa::message::ProtocolMessage {
2313 key: Some(wa::MessageKey {
2314 remote_jid: Some(to.to_string()),
2315 from_me: Some(true),
2316 id: Some(original_id.clone()),
2317 participant,
2318 }),
2319 r#type: Some(wa::message::protocol_message::Type::MessageEdit as i32),
2320 edited_message: Some(Box::new(new_content)),
2321 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
2322 ..Default::default()
2323 })),
2324 ..Default::default()
2325 })),
2326 })),
2327 ..Default::default()
2328 };
2329
2330 self.send_message_impl(
2336 to,
2337 &edit_container_message,
2338 None,
2339 false,
2340 false,
2341 Some(crate::types::message::EditAttribute::MessageEdit),
2342 vec![],
2343 )
2344 .await?;
2345
2346 Ok(original_id)
2347 }
2348
    /// Marshals `node` and sends it over the current noise socket.
    ///
    /// A plaintext scratch buffer is borrowed from `plaintext_buffer_pool`
    /// and must be returned on EVERY exit path (success, marshal failure, or
    /// encrypt/send failure); buffers that have grown past
    /// `MAX_POOLED_BUFFER_CAP` are dropped instead of pooled, capping pool
    /// memory. Errors: `NotConnected` when no socket is attached, otherwise
    /// the underlying socket/encrypt error.
    pub async fn send_node(&self, node: Node) -> Result<(), ClientError> {
        // Clone the Arc inside a short-lived lock so we don't hold the
        // socket mutex across the send.
        let noise_socket_arc = { self.noise_socket.lock().await.clone() };
        let noise_socket = match noise_socket_arc {
            Some(socket) => socket,
            None => return Err(ClientError::NotConnected),
        };

        debug!(target: "Client/Send", "{}", DisplayableNode(&node));

        // Borrow a scratch buffer from the pool (or allocate a fresh one).
        let mut plaintext_buf = {
            let mut pool = self.plaintext_buffer_pool.lock().await;
            pool.pop().unwrap_or_else(|| Vec::with_capacity(1024))
        };
        plaintext_buf.clear();

        if let Err(e) = wacore_binary::marshal::marshal_to(&node, &mut plaintext_buf) {
            error!("Failed to marshal node: {e:?}");
            // Return the buffer to the pool before bailing out.
            let mut pool = self.plaintext_buffer_pool.lock().await;
            if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
                pool.push(plaintext_buf);
            }
            return Err(SocketError::Crypto("Marshal error".to_string()).into());
        }

        let encrypted_buf = Vec::with_capacity(plaintext_buf.len() + 32);

        // encrypt_and_send takes ownership of both buffers and hands the
        // plaintext buffer back on success; on failure it is recovered from
        // the error value so it can still be pooled.
        let (plaintext_buf, _) = match noise_socket
            .encrypt_and_send(plaintext_buf, encrypted_buf)
            .await
        {
            Ok(bufs) => bufs,
            Err(mut e) => {
                let p_buf = std::mem::take(&mut e.plaintext_buf);
                let mut pool = self.plaintext_buffer_pool.lock().await;
                if p_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
                    pool.push(p_buf);
                }
                return Err(e.into());
            }
        };

        // Success path: recycle the buffer.
        let mut pool = self.plaintext_buffer_pool.lock().await;
        if plaintext_buf.capacity() <= MAX_POOLED_BUFFER_CAP {
            pool.push(plaintext_buf);
        }
        Ok(())
    }
2397
2398 pub(crate) async fn update_push_name_and_notify(self: &Arc<Self>, new_name: String) {
2399 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2400 let old_name = device_snapshot.push_name.clone();
2401
2402 if old_name == new_name {
2403 return;
2404 }
2405
2406 log::debug!("Updating push name from '{}' -> '{}'", old_name, new_name);
2407 self.persistence_manager
2408 .process_command(DeviceCommand::SetPushName(new_name.clone()))
2409 .await;
2410
2411 self.core.event_bus.dispatch(&Event::SelfPushNameUpdated(
2412 crate::types::events::SelfPushNameUpdated {
2413 from_server: true,
2414 old_name,
2415 new_name: new_name.clone(),
2416 },
2417 ));
2418
2419 let client_clone = self.clone();
2420 tokio::spawn(async move {
2421 if let Err(e) = client_clone.presence().set_available().await {
2422 log::warn!("Failed to send presence after push name update: {:?}", e);
2423 } else {
2424 log::debug!("Sent presence after push name update.");
2425 }
2426 });
2427 }
2428
    /// Returns the push name from the current device snapshot.
    pub async fn get_push_name(&self) -> String {
        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
        device_snapshot.push_name.clone()
    }
2433
    /// Returns our phone-number (PN) JID from the device snapshot, or `None`
    /// when not logged in.
    pub async fn get_pn(&self) -> Option<Jid> {
        let snapshot = self.persistence_manager.get_device_snapshot().await;
        snapshot.pn.clone()
    }
2438
    /// Returns our LID JID from the device snapshot, or `None` when no LID
    /// is known for this account.
    pub async fn get_lid(&self) -> Option<Jid> {
        let snapshot = self.persistence_manager.get_device_snapshot().await;
        snapshot.lid.clone()
    }
2443
2444 pub(crate) async fn get_own_jid_for_group(
2449 &self,
2450 group_jid: &Jid,
2451 ) -> Result<Jid, anyhow::Error> {
2452 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2453 let own_pn = device_snapshot
2454 .pn
2455 .clone()
2456 .ok_or_else(|| anyhow!("Not logged in"))?;
2457
2458 let addressing_mode = self
2459 .groups()
2460 .query_info(group_jid)
2461 .await
2462 .map(|info| info.addressing_mode)
2463 .unwrap_or(crate::types::message::AddressingMode::Pn);
2464
2465 Ok(match addressing_mode {
2466 crate::types::message::AddressingMode::Lid => {
2467 device_snapshot.lid.clone().unwrap_or(own_pn)
2468 }
2469 crate::types::message::AddressingMode::Pn => own_pn,
2470 })
2471 }
2472
    /// Builds a [`StanzaKey`] for the given chat and message id.
    ///
    /// The chat JID is first passed through `resolve_encryption_jid`, so the
    /// key reflects the JID form used for encryption (LID vs PN resolution).
    pub(crate) async fn make_stanza_key(&self, chat: Jid, id: String) -> StanzaKey {
        let chat = self.resolve_encryption_jid(&chat).await;

        StanzaKey { chat, id }
    }
2480
2481 pub(crate) async fn send_protocol_receipt(
2484 &self,
2485 id: String,
2486 receipt_type: crate::types::presence::ReceiptType,
2487 ) {
2488 if id.is_empty() {
2489 return;
2490 }
2491 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
2492 if let Some(own_jid) = &device_snapshot.pn {
2493 let type_str = match receipt_type {
2494 crate::types::presence::ReceiptType::HistorySync => "hist_sync",
2495 crate::types::presence::ReceiptType::Read => "read",
2496 crate::types::presence::ReceiptType::ReadSelf => "read-self",
2497 crate::types::presence::ReceiptType::Delivered => "delivery",
2498 crate::types::presence::ReceiptType::Played => "played",
2499 crate::types::presence::ReceiptType::PlayedSelf => "played-self",
2500 crate::types::presence::ReceiptType::Inactive => "inactive",
2501 crate::types::presence::ReceiptType::PeerMsg => "peer_msg",
2502 crate::types::presence::ReceiptType::Sender => "sender",
2503 crate::types::presence::ReceiptType::ServerError => "server-error",
2504 crate::types::presence::ReceiptType::Retry => "retry",
2505 crate::types::presence::ReceiptType::Other(ref s) => s.as_str(),
2506 };
2507
2508 let node = NodeBuilder::new("receipt")
2509 .attrs([
2510 ("id", id),
2511 ("type", type_str.to_string()),
2512 ("to", own_jid.to_non_ad().to_string()),
2513 ])
2514 .build();
2515
2516 if let Err(e) = self.send_node(node).await {
2517 warn!(
2518 "Failed to send protocol receipt of type {:?} for message ID {}: {:?}",
2519 receipt_type, self.unique_id, e
2520 );
2521 }
2522 }
2523 }
2524}
2525
2526#[cfg(test)]
2527mod tests {
2528 use super::*;
2529 use crate::lid_pn_cache::LearningSource;
2530 use crate::test_utils::MockHttpClient;
2531 use tokio::sync::oneshot;
2532 use wacore_binary::jid::SERVER_JID;
2533
2534 #[tokio::test]
2535 async fn test_ack_behavior_for_incoming_stanzas() {
2536 let backend = crate::test_utils::create_test_backend().await;
2537 let pm = Arc::new(
2538 PersistenceManager::new(backend)
2539 .await
2540 .expect("persistence manager should initialize"),
2541 );
2542 let (client, _rx) = Client::new(
2543 pm,
2544 Arc::new(crate::transport::mock::MockTransportFactory::new()),
2545 Arc::new(MockHttpClient),
2546 None,
2547 )
2548 .await;
2549
2550 use wacore_binary::node::{Attrs, Node, NodeContent};
2554
2555 let mut receipt_attrs = Attrs::new();
2556 receipt_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
2557 receipt_attrs.insert("id".to_string(), "RCPT-1".to_string());
2558 let receipt_node = Node::new(
2559 "receipt",
2560 receipt_attrs,
2561 Some(NodeContent::String("test".to_string())),
2562 );
2563
2564 let mut notification_attrs = Attrs::new();
2565 notification_attrs.insert("from".to_string(), "@s.whatsapp.net".to_string());
2566 notification_attrs.insert("id".to_string(), "NOTIF-1".to_string());
2567 let notification_node = Node::new(
2568 "notification",
2569 notification_attrs,
2570 Some(NodeContent::String("test".to_string())),
2571 );
2572
2573 assert!(
2574 client.should_ack(&receipt_node),
2575 "should_ack must still return TRUE for <receipt> stanzas."
2576 );
2577 assert!(
2578 client.should_ack(¬ification_node),
2579 "should_ack must still return TRUE for <notification> stanzas."
2580 );
2581
2582 info!(
2583 "✅ test_ack_behavior_for_incoming_stanzas passed: Client correctly differentiates which stanzas to acknowledge."
2584 );
2585 }
2586
    // Verifies that send_node() returns its scratch buffer to
    // plaintext_buffer_pool on every path — here the send fails (no socket
    // attached), yet the pool must not shrink.
    #[tokio::test]
    async fn test_plaintext_buffer_pool_reuses_buffers() {
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let initial_pool_size = {
            let pool = client.plaintext_buffer_pool.lock().await;
            pool.len()
        };

        let test_node = NodeBuilder::new("test").attr("id", "test-123").build();

        // Result intentionally ignored: the client is not connected, so the
        // send fails — the buffer accounting is what's under test.
        let _ = client.send_node(test_node).await;

        let final_pool_size = {
            let pool = client.plaintext_buffer_pool.lock().await;
            pool.len()
        };

        assert!(
            final_pool_size >= initial_pool_size,
            "Plaintext buffer pool should not shrink after send operations"
        );

        info!(
            "✅ test_plaintext_buffer_pool_reuses_buffers passed: Buffer pool properly manages plaintext buffers"
        );
    }
2630
    // Verifies that an incoming <ack> resolves a pending response waiter:
    // handle_ack_response must report it as handled, deliver the node through
    // the oneshot channel, and remove the waiter entry.
    #[tokio::test]
    async fn test_ack_waiter_resolves() {
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Register a waiter keyed by the stanza id we'll ack.
        let test_id = "ack-test-123".to_string();
        let (tx, rx) = oneshot::channel();
        client
            .response_waiters
            .lock()
            .await
            .insert(test_id.clone(), tx);
        assert!(
            client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be inserted before handling ack"
        );

        let ack_node = NodeBuilder::new("ack")
            .attr("id", test_id.clone())
            .attr("from", SERVER_JID)
            .build();

        let handled = client.handle_ack_response(ack_node).await;
        assert!(
            handled,
            "handle_ack_response should return true when waiter exists"
        );

        // The ack node itself must arrive through the oneshot channel.
        match tokio::time::timeout(Duration::from_secs(1), rx).await {
            Ok(Ok(response_node)) => {
                assert_eq!(
                    response_node.attrs.get("id").and_then(|v| v.as_str()),
                    Some(test_id.as_str()),
                    "Response node should have correct ID"
                );
            }
            Ok(Err(_)) => panic!("Receiver was dropped without being sent a value"),
            Err(_) => panic!("Test timed out waiting for ack response"),
        }

        assert!(
            !client.response_waiters.lock().await.contains_key(&test_id),
            "Waiter should be removed after handling"
        );

        info!(
            "✅ test_ack_waiter_resolves passed: ACK response correctly resolves pending waiters"
        );
    }
2696
    // Verifies that an <ack> whose id matches no registered waiter is
    // reported as unhandled instead of panicking or blocking.
    #[tokio::test]
    async fn test_ack_without_matching_waiter() {
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let ack_node = NodeBuilder::new("ack")
            .attr("id", "non-existent-id")
            .attr("from", SERVER_JID)
            .build();

        let handled = client.handle_ack_response(ack_node).await;
        assert!(
            !handled,
            "handle_ack_response should return false when no waiter exists"
        );

        info!(
            "✅ test_ack_without_matching_waiter passed: ACK without matching waiter handled gracefully"
        );
    }
2730
    // Verifies basic LID<->PN cache behavior: empty before insert, forward
    // (phone -> LID) and reverse (LID -> phone) lookups after insert, and no
    // false hits for an unrelated phone number.
    #[tokio::test]
    async fn test_lid_pn_cache_basic_operations() {
        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_lid_cache_basic?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let phone = "559980000001";
        let lid = "100000012345678";

        assert!(
            client.lid_pn_cache.get_current_lid(phone).await.is_none(),
            "Cache should be empty initially"
        );

        client
            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        // Forward lookup: phone -> LID.
        let cached_lid = client.lid_pn_cache.get_current_lid(phone).await;
        assert!(cached_lid.is_some(), "Cache should contain the mapping");
        assert_eq!(
            cached_lid.expect("cache should have LID"),
            lid,
            "Cached LID should match what we inserted"
        );

        // Reverse lookup: LID -> phone.
        let cached_phone = client.lid_pn_cache.get_phone_number(lid).await;
        assert!(cached_phone.is_some(), "Reverse lookup should work");
        assert_eq!(
            cached_phone.expect("reverse lookup should return phone"),
            phone,
            "Cached phone should match what we inserted"
        );

        assert!(
            client
                .lid_pn_cache
                .get_current_lid("559980000002")
                .await
                .is_none(),
            "Different phone number should not have a mapping"
        );

        info!("✅ test_lid_pn_cache_basic_operations passed: LID-PN cache works correctly");
    }
2801
    // Verifies timestamp-based conflict resolution: when two LIDs are learned
    // for the same phone, the more recently learned one wins the forward
    // lookup, while both LIDs keep resolving back to the phone.
    #[tokio::test]
    async fn test_lid_pn_cache_timestamp_resolution() {
        let backend = Arc::new(
            crate::store::SqliteStore::new(
                "file:memdb_lid_cache_timestamp?mode=memory&cache=shared",
            )
            .await
            .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let phone = "559980000001";
        let lid_old = "100000012345678";
        let lid_new = "100000087654321";

        client
            .add_lid_pn_mapping(lid_old, phone, LearningSource::Usync)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        assert_eq!(
            client
                .lid_pn_cache
                .get_current_lid(phone)
                .await
                .expect("cache should have LID"),
            lid_old,
            "Initial LID should be stored"
        );

        // Ensure the second mapping gets a strictly later timestamp.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        client
            .add_lid_pn_mapping(lid_new, phone, LearningSource::PeerPnMessage)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        assert_eq!(
            client
                .lid_pn_cache
                .get_current_lid(phone)
                .await
                .expect("cache should have newer LID"),
            lid_new,
            "Newer LID should be returned for phone lookup"
        );

        // Reverse lookups keep working for BOTH LIDs.
        assert_eq!(
            client
                .lid_pn_cache
                .get_phone_number(lid_old)
                .await
                .expect("reverse lookup should return phone"),
            phone,
            "Old LID should still map to phone"
        );
        assert_eq!(
            client
                .lid_pn_cache
                .get_phone_number(lid_new)
                .await
                .expect("reverse lookup should return phone"),
            phone,
            "New LID should also map to phone"
        );

        info!(
            "✅ test_lid_pn_cache_timestamp_resolution passed: Timestamp-based resolution works correctly"
        );
    }
2890
    // Verifies the SendContextResolver trait surface: get_lid_for_phone
    // returns None before any mapping exists and the cached LID afterwards.
    #[tokio::test]
    async fn test_get_lid_for_phone_via_send_context_resolver() {
        // Brings the trait method get_lid_for_phone into scope.
        use wacore::client::context::SendContextResolver;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_get_lid_for_phone?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let phone = "559980000001";
        let lid = "100000012345678";

        assert!(
            client.get_lid_for_phone(phone).await.is_none(),
            "get_lid_for_phone should return None before caching"
        );

        client
            .add_lid_pn_mapping(lid, phone, LearningSource::Usync)
            .await
            .expect("Failed to persist LID-PN mapping in tests");

        let result = client.get_lid_for_phone(phone).await;
        assert!(
            result.is_some(),
            "get_lid_for_phone should return Some after caching"
        );
        assert_eq!(
            result.expect("get_lid_for_phone should return Some"),
            lid,
            "get_lid_for_phone should return the cached LID"
        );

        info!(
            "✅ test_get_lid_for_phone_via_send_context_resolver passed: SendContextResolver correctly returns cached LID"
        );
    }
2947
    // Verifies the fast path: when offline_sync_completed is already true,
    // wait_for_offline_delivery_end must return without blocking.
    #[tokio::test]
    async fn test_wait_for_offline_delivery_end_returns_immediately_when_flag_set() {
        let backend = Arc::new(
            crate::store::SqliteStore::new(
                "file:memdb_offline_sync_flag_set?mode=memory&cache=shared",
            )
            .await
            .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // Pre-set the flag so the waiter should short-circuit.
        client
            .offline_sync_completed
            .store(true, std::sync::atomic::Ordering::Relaxed);

        let start = std::time::Instant::now();
        client.wait_for_offline_delivery_end().await;
        let elapsed = start.elapsed();

        assert!(
            elapsed.as_millis() < 100,
            "wait_for_offline_delivery_end should return immediately when flag is set, took {:?}",
            elapsed
        );

        info!("✅ test_wait_for_offline_delivery_end_returns_immediately_when_flag_set passed");
    }
2990
    // Verifies the blocking path: with the flag unset and nobody notifying,
    // wait_for_offline_delivery_end must still be pending when an external
    // 100ms timeout fires.
    #[tokio::test]
    async fn test_wait_for_offline_delivery_end_times_out_when_flag_not_set() {
        let backend = Arc::new(
            crate::store::SqliteStore::new(
                "file:memdb_offline_sync_timeout?mode=memory&cache=shared",
            )
            .await
            .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let start = std::time::Instant::now();

        let result = tokio::time::timeout(
            std::time::Duration::from_millis(100),
            client.wait_for_offline_delivery_end(),
        )
        .await;

        let elapsed = start.elapsed();

        assert!(
            result.is_err(),
            "wait_for_offline_delivery_end should not return immediately when flag is false"
        );
        // 95ms (not 100) leaves slack for timer coarseness.
        assert!(
            elapsed.as_millis() >= 95, "Should have waited for the timeout duration, took {:?}",
            elapsed
        );

        info!("✅ test_wait_for_offline_delivery_end_times_out_when_flag_not_set passed");
    }
3042
    // Verifies the wakeup path: a background task fires
    // offline_sync_notifier after ~50ms and the waiter must resume promptly
    // (bounded below by the notify delay, above by a generous margin).
    #[tokio::test]
    async fn test_wait_for_offline_delivery_end_returns_on_notify() {
        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_offline_notify?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let client_clone = client.clone();

        // Notify waiters after a short delay from a background task.
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            client_clone.offline_sync_notifier.notify_waiters();
        });

        let start = std::time::Instant::now();
        client.wait_for_offline_delivery_end().await;
        let elapsed = start.elapsed();

        assert!(
            elapsed.as_millis() < 200,
            "wait_for_offline_delivery_end should return when notified, took {:?}",
            elapsed
        );
        // 45ms (not 50) leaves slack for timer coarseness.
        assert!(
            elapsed.as_millis() >= 45, "Should have waited for the notify, only took {:?}",
            elapsed
        );

        info!("✅ test_wait_for_offline_delivery_end_returns_on_notify passed");
    }
3090
    // Verifies that a freshly constructed Client starts with
    // offline_sync_completed == false.
    #[tokio::test]
    async fn test_offline_sync_flag_initially_false() {
        let backend = Arc::new(
            crate::store::SqliteStore::new(
                "file:memdb_offline_flag_initial?mode=memory&cache=shared",
            )
            .await
            .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        assert!(
            !client
                .offline_sync_completed
                .load(std::sync::atomic::Ordering::Relaxed),
            "offline_sync_completed should be false when Client is first created"
        );

        info!("✅ test_offline_sync_flag_initially_false passed");
    }
3124
    // Full lifecycle: flag starts false, a spawned waiter stays pending,
    // then flag-set + notify_waiters releases it.
    #[tokio::test]
    async fn test_offline_sync_lifecycle() {
        use std::sync::atomic::Ordering;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_offline_lifecycle?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));

        // Park a waiter task on the offline-sync gate.
        let client_waiter = client.clone();
        let waiter_handle = tokio::spawn(async move {
            client_waiter.wait_for_offline_delivery_end().await;
            true });

        // Give the waiter time to start and block.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        assert!(
            !waiter_handle.is_finished(),
            "Waiter should still be waiting"
        );

        // Flip the flag THEN notify — the order other code uses.
        client.offline_sync_completed.store(true, Ordering::Relaxed);
        client.offline_sync_notifier.notify_waiters();

        let result = tokio::time::timeout(std::time::Duration::from_millis(100), waiter_handle)
            .await
            .expect("Waiter should complete after notify")
            .expect("Waiter task should not panic");

        assert!(result, "Waiter should have completed successfully");
        assert!(client.offline_sync_completed.load(Ordering::Relaxed));

        info!("✅ test_offline_sync_lifecycle passed");
    }
3185
    // Verifies that establish_primary_phone_session_immediate fails with a
    // "Not logged in" error when the device has no PN configured.
    #[tokio::test]
    async fn test_establish_primary_phone_session_fails_without_pn() {
        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_no_pn?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let result = client.establish_primary_phone_session_immediate().await;

        assert!(
            result.is_err(),
            "establish_primary_phone_session_immediate should fail when no PN is set"
        );

        let error_msg = result.unwrap_err().to_string();
        assert!(
            error_msg.contains("Not logged in"),
            "Error should mention 'Not logged in', got: {}",
            error_msg
        );

        info!("✅ test_establish_primary_phone_session_fails_without_pn passed");
    }
3225
    // Verifies two behaviors of ensure_e2e_sessions: an empty JID list
    // returns immediately (no offline-sync gate), while a non-empty list
    // blocks until the offline sync flag is set and waiters are notified.
    #[tokio::test]
    async fn test_ensure_e2e_sessions_waits_for_offline_sync() {
        use std::sync::atomic::Ordering;
        use wacore_binary::jid::Jid;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_ensure_e2e_waits?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));

        // Case 1: empty list — must not hit the offline-sync gate.
        let client_clone = client.clone();
        let ensure_handle = tokio::spawn(async move {
            client_clone.ensure_e2e_sessions(vec![]).await
        });

        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        assert!(
            ensure_handle.is_finished(),
            "ensure_e2e_sessions should return immediately for empty JID list"
        );

        // Case 2: non-empty list — must block until offline sync completes.
        let client_clone = client.clone();
        let test_jid = Jid::pn("559999999999");
        let ensure_handle = tokio::spawn(async move {
            let start = std::time::Instant::now();
            let _ = client_clone.ensure_e2e_sessions(vec![test_jid]).await;
            start.elapsed()
        });

        tokio::time::sleep(std::time::Duration::from_millis(20)).await;

        assert!(
            !ensure_handle.is_finished(),
            "ensure_e2e_sessions should be waiting for offline sync"
        );

        // Release the gate: set flag, then wake waiters.
        client.offline_sync_completed.store(true, Ordering::Relaxed);
        client.offline_sync_notifier.notify_waiters();

        let result = tokio::time::timeout(std::time::Duration::from_secs(2), ensure_handle).await;

        assert!(
            result.is_ok(),
            "ensure_e2e_sessions should complete after offline sync"
        );

        info!("✅ test_ensure_e2e_sessions_waits_for_offline_sync passed");
    }
3304
    // Verifies that establish_primary_phone_session_immediate does NOT wait
    // on the offline-sync gate: with the flag still false it must finish
    // (pass or fail) well inside the 500ms budget.
    #[tokio::test]
    async fn test_immediate_session_does_not_wait_for_offline_sync() {
        use std::sync::atomic::Ordering;
        use wacore_binary::jid::Jid;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_immediate_no_wait?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend.clone())
                .await
                .expect("persistence manager should initialize"),
        );

        // Give the device a PN so the call does not short-circuit on
        // "Not logged in".
        pm.modify_device(|device| {
            device.pn = Some(Jid::pn("559999999999"));
        })
        .await;

        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        assert!(!client.offline_sync_completed.load(Ordering::Relaxed));

        let start = std::time::Instant::now();

        let result = tokio::time::timeout(
            std::time::Duration::from_millis(500),
            client.establish_primary_phone_session_immediate(),
        )
        .await;

        let elapsed = start.elapsed();

        // Outer Ok means the call returned before the timeout — its inner
        // success/failure is informational only.
        assert!(
            result.is_ok(),
            "establish_primary_phone_session_immediate should not wait for offline sync, timed out"
        );

        assert!(
            elapsed.as_millis() < 500,
            "establish_primary_phone_session_immediate should not wait, took {:?}",
            elapsed
        );

        info!(
            "establish_primary_phone_session_immediate completed in {:?} (result: {:?})",
            elapsed,
            result.unwrap().is_ok()
        );

        info!("✅ test_immediate_session_does_not_wait_for_offline_sync passed");
    }
3383
    // Verifies that establish_primary_phone_session_immediate is a no-op
    // when a Signal session for the primary phone (device 0) already exists:
    // the call succeeds and the pre-stored session survives.
    #[tokio::test]
    async fn test_establish_session_skips_when_exists() {
        use wacore::libsignal::protocol::SessionRecord;
        use wacore::libsignal::store::SessionStore;
        use wacore::types::jid::JidExt;
        use wacore_binary::jid::Jid;

        let backend = Arc::new(
            crate::store::SqliteStore::new("file:memdb_skip_existing?mode=memory&cache=shared")
                .await
                .expect("Failed to create in-memory backend for test"),
        );
        let pm = Arc::new(
            PersistenceManager::new(backend.clone())
                .await
                .expect("persistence manager should initialize"),
        );

        let own_pn = Jid::pn("559999999999");
        pm.modify_device(|device| {
            device.pn = Some(own_pn.clone());
        })
        .await;

        // The primary phone is our own PN at device index 0.
        let primary_phone_jid = own_pn.with_device(0);
        let signal_addr = primary_phone_jid.to_protocol_address();

        // Pre-store a fresh session so the establish call has something to skip.
        let dummy_session = SessionRecord::new_fresh();
        {
            let device_arc = pm.get_device_arc().await;
            let device = device_arc.read().await;
            device
                .store_session(&signal_addr, &dummy_session)
                .await
                .expect("Failed to store test session");

            let exists = device
                .contains_session(&signal_addr)
                .await
                .expect("Failed to check session");
            assert!(exists, "Session should exist after store");
        }

        let (client, _rx) = Client::new(
            pm.clone(),
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        let result = client.establish_primary_phone_session_immediate().await;

        assert!(
            result.is_ok(),
            "establish_primary_phone_session_immediate should succeed when session exists"
        );

        // The pre-existing session must not have been clobbered.
        {
            let device_arc = pm.get_device_arc().await;
            let device = device_arc.read().await;
            let exists = device
                .contains_session(&signal_addr)
                .await
                .expect("Failed to check session");
            assert!(exists, "Session should still exist after the call");
        }

        info!("✅ test_establish_session_skips_when_exists passed");
    }
3469
3470 #[test]
3473 fn test_mac_failure_prevention_flow_documentation() {
3474 fn should_establish_session(
3476 check_result: Result<bool, &'static str>,
3477 ) -> Result<bool, String> {
3478 match check_result {
3479 Ok(true) => Ok(false), Ok(false) => Ok(true), Err(e) => Err(format!("Cannot verify session: {}", e)), }
3483 }
3484
3485 let result = should_establish_session(Ok(true));
3487 assert_eq!(result, Ok(false), "Should skip when session exists");
3488
3489 let result = should_establish_session(Ok(false));
3491 assert_eq!(result, Ok(true), "Should establish when no session");
3492
3493 let result = should_establish_session(Err("database error"));
3495 assert!(result.is_err(), "Should fail when check fails");
3496
3497 info!("✅ test_mac_failure_prevention_flow_documentation passed");
3498 }
3499
3500 #[test]
3501 fn test_unified_session_id_calculation() {
3502 const DAY_MS: i64 = 24 * 60 * 60 * 1000;
3506 const WEEK_MS: i64 = 7 * DAY_MS;
3507 const OFFSET_MS: i64 = 3 * DAY_MS;
3508
3509 fn calculate_session_id(now_ms: i64, server_offset_ms: i64) -> i64 {
3511 let adjusted_now = now_ms + server_offset_ms;
3512 (adjusted_now + OFFSET_MS) % WEEK_MS
3513 }
3514
3515 let now_ms = 1706000000000_i64; let id = calculate_session_id(now_ms, 0);
3518 assert!(
3519 (0..WEEK_MS).contains(&id),
3520 "Session ID should be in [0, WEEK_MS)"
3521 );
3522
3523 let id_with_positive_offset = calculate_session_id(now_ms, 5000);
3525 assert!(
3526 (0..WEEK_MS).contains(&id_with_positive_offset),
3527 "Session ID should be in [0, WEEK_MS)"
3528 );
3529 let id_with_negative_offset = calculate_session_id(now_ms, -5000);
3534 assert!(
3535 (0..WEEK_MS).contains(&id_with_negative_offset),
3536 "Session ID should be in [0, WEEK_MS)"
3537 );
3538
3539 let wrap_test_now = WEEK_MS - OFFSET_MS + 1000; let wrapped_id = calculate_session_id(wrap_test_now, 0);
3543 assert_eq!(wrapped_id, 1000, "Should wrap around correctly");
3544
3545 let boundary_now = WEEK_MS - OFFSET_MS;
3547 let boundary_id = calculate_session_id(boundary_now, 0);
3548 assert_eq!(boundary_id, 0, "At exact boundary should be 0");
3549 }
3550
3551 #[tokio::test]
3552 async fn test_server_time_offset_extraction() {
3553 use wacore_binary::builder::NodeBuilder;
3554
3555 let backend = crate::test_utils::create_test_backend().await;
3556 let pm = Arc::new(
3557 PersistenceManager::new(backend)
3558 .await
3559 .expect("persistence manager should initialize"),
3560 );
3561 let (client, _rx) = Client::new(
3562 pm,
3563 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3564 Arc::new(MockHttpClient),
3565 None,
3566 )
3567 .await;
3568
3569 assert_eq!(
3571 client.unified_session.server_time_offset_ms(),
3572 0,
3573 "Initial offset should be 0"
3574 );
3575
3576 let server_time = chrono::Utc::now().timestamp() + 10; let node = NodeBuilder::new("success")
3579 .attr("t", server_time.to_string())
3580 .build();
3581
3582 client.update_server_time_offset(&node);
3584
3585 let offset = client.unified_session.server_time_offset_ms();
3588 assert!(
3589 (offset - 10000).abs() < 1000, "Offset should be approximately 10000ms, got {}",
3591 offset
3592 );
3593
3594 let node_no_t = NodeBuilder::new("success").build();
3596 client.update_server_time_offset(&node_no_t);
3597 let offset_after = client.unified_session.server_time_offset_ms();
3598 assert!(
3599 (offset_after - offset).abs() < 100, "Offset should not change when 't' is missing"
3601 );
3602
3603 let node_invalid = NodeBuilder::new("success")
3605 .attr("t", "not_a_number")
3606 .build();
3607 client.update_server_time_offset(&node_invalid);
3608 let offset_after_invalid = client.unified_session.server_time_offset_ms();
3609 assert!(
3610 (offset_after_invalid - offset).abs() < 100,
3611 "Offset should not change when 't' is invalid"
3612 );
3613
3614 let node_zero = NodeBuilder::new("success").attr("t", "0").build();
3616 client.update_server_time_offset(&node_zero);
3617 let offset_after_zero = client.unified_session.server_time_offset_ms();
3618 assert!(
3619 (offset_after_zero - offset).abs() < 100,
3620 "Offset should not change when 't' is 0"
3621 );
3622
3623 info!("✅ test_server_time_offset_extraction passed");
3624 }
3625
    #[tokio::test]
    async fn test_unified_session_manager_integration() {
        // End-to-end check of the unified-session send bookkeeping:
        // sequence starts at 0, prepare_send emits an <ib> stanza and
        // bumps it to 1, and a repeat send in the same window is deduped.
        let backend = crate::test_utils::create_test_backend().await;
        let pm = Arc::new(
            PersistenceManager::new(backend)
                .await
                .expect("persistence manager should initialize"),
        );
        let (client, _rx) = Client::new(
            pm,
            Arc::new(crate::transport::mock::MockTransportFactory::new()),
            Arc::new(MockHttpClient),
            None,
        )
        .await;

        // A freshly-constructed client has never prepared a send.
        assert_eq!(
            client.unified_session.sequence(),
            0,
            "Initial sequence should be 0"
        );

        // Retry until we observe the dedup path: the second prepare_send
        // within the same session window should return None and leave the
        // sequence at 1. NOTE(review): when the second call unexpectedly
        // succeeds the loop resets and retries — presumably the session
        // window rolled over between the two calls; confirm against the
        // unified-session manager's windowing rules.
        loop {
            client.unified_session.reset().await;

            let result = client.unified_session.prepare_send().await;
            assert!(result.is_some(), "First send should succeed");
            let (node, seq) = result.unwrap();
            assert_eq!(node.tag, "ib", "Should be an IB stanza");
            assert_eq!(seq, 1, "First sequence should be 1 (pre-increment)");
            assert_eq!(client.unified_session.sequence(), 1);

            let result2 = client.unified_session.prepare_send().await;
            if result2.is_none() {
                // Dedup kicked in: the sequence is untouched.
                assert_eq!(client.unified_session.sequence(), 1);
                break;
            }
            // Give other tasks a chance before retrying.
            tokio::task::yield_now().await;
        }

        // Clearing the last-sent marker permits a fresh send; the sequence
        // restarts at 1 because the session ID changed.
        client.unified_session.clear_last_sent().await;
        let result3 = client.unified_session.prepare_send().await;
        assert!(result3.is_some(), "Should succeed after clearing");
        let (_, seq3) = result3.unwrap();
        assert_eq!(seq3, 1, "Sequence resets when session ID changes");
        assert_eq!(client.unified_session.sequence(), 1);

        info!("✅ test_unified_session_manager_integration passed");
    }
3684
3685 #[test]
3686 fn test_unified_session_protocol_node() {
3687 use wacore::ib::{IbStanza, UnifiedSession};
3689 use wacore::protocol::ProtocolNode;
3690
3691 let session = UnifiedSession::new("123456789");
3693 assert_eq!(session.id, "123456789");
3694 assert_eq!(session.tag(), "unified_session");
3695
3696 let node = session.into_node();
3698 assert_eq!(node.tag, "unified_session");
3699 assert_eq!(
3700 node.attrs.get("id").and_then(|v| v.as_str()),
3701 Some("123456789")
3702 );
3703
3704 let stanza = IbStanza::unified_session(UnifiedSession::new("987654321"));
3706 assert_eq!(stanza.tag(), "ib");
3707
3708 let ib_node = stanza.into_node();
3710 assert_eq!(ib_node.tag, "ib");
3711 let children = ib_node.children().expect("IB stanza should have children");
3712 assert_eq!(children.len(), 1);
3713 assert_eq!(children[0].tag, "unified_session");
3714 assert_eq!(
3715 children[0].attrs.get("id").and_then(|v| v.as_str()),
3716 Some("987654321")
3717 );
3718
3719 info!("✅ test_unified_session_protocol_node passed");
3720 }
3721
3722 async fn create_offline_sync_test_client() -> Arc<Client> {
3724 let backend = crate::test_utils::create_test_backend().await;
3725 let pm = Arc::new(
3726 PersistenceManager::new(backend)
3727 .await
3728 .expect("persistence manager should initialize"),
3729 );
3730 let (client, _rx) = Client::new(
3731 pm,
3732 Arc::new(crate::transport::mock::MockTransportFactory::new()),
3733 Arc::new(MockHttpClient),
3734 None,
3735 )
3736 .await;
3737 client
3738 }
3739
3740 #[tokio::test]
3741 async fn test_ib_thread_metadata_does_not_end_sync() {
3742 let client = create_offline_sync_test_client().await;
3743 client
3744 .offline_sync_metrics
3745 .active
3746 .store(true, Ordering::Release);
3747
3748 let node = NodeBuilder::new("ib")
3749 .children([NodeBuilder::new("thread_metadata")
3750 .children([NodeBuilder::new("item").build()])
3751 .build()])
3752 .build();
3753
3754 client.process_node(Arc::new(node)).await;
3755 assert!(
3756 client.offline_sync_metrics.active.load(Ordering::Acquire),
3757 "<ib><thread_metadata> should NOT end offline sync"
3758 );
3759 }
3760
3761 #[tokio::test]
3762 async fn test_ib_edge_routing_does_not_end_sync() {
3763 let client = create_offline_sync_test_client().await;
3764 client
3765 .offline_sync_metrics
3766 .active
3767 .store(true, Ordering::Release);
3768
3769 let node = NodeBuilder::new("ib")
3770 .children([NodeBuilder::new("edge_routing")
3771 .children([NodeBuilder::new("routing_info")
3772 .bytes(vec![1, 2, 3])
3773 .build()])
3774 .build()])
3775 .build();
3776
3777 client.process_node(Arc::new(node)).await;
3778 assert!(
3779 client.offline_sync_metrics.active.load(Ordering::Acquire),
3780 "<ib><edge_routing> should NOT end offline sync"
3781 );
3782 }
3783
3784 #[tokio::test]
3785 async fn test_ib_dirty_does_not_end_sync() {
3786 let client = create_offline_sync_test_client().await;
3787 client
3788 .offline_sync_metrics
3789 .active
3790 .store(true, Ordering::Release);
3791
3792 let node = NodeBuilder::new("ib")
3793 .children([NodeBuilder::new("dirty")
3794 .attr("type", "groups")
3795 .attr("timestamp", "1234")
3796 .build()])
3797 .build();
3798
3799 client.process_node(Arc::new(node)).await;
3800 assert!(
3801 client.offline_sync_metrics.active.load(Ordering::Acquire),
3802 "<ib><dirty> should NOT end offline sync"
3803 );
3804 }
3805
3806 #[tokio::test]
3807 async fn test_ib_offline_child_ends_sync() {
3808 let client = create_offline_sync_test_client().await;
3809 client
3810 .offline_sync_metrics
3811 .active
3812 .store(true, Ordering::Release);
3813 client
3814 .offline_sync_metrics
3815 .total_messages
3816 .store(301, Ordering::Release);
3817
3818 let node = NodeBuilder::new("ib")
3819 .children([NodeBuilder::new("offline").attr("count", "301").build()])
3820 .build();
3821
3822 client.process_node(Arc::new(node)).await;
3823 assert!(
3824 !client.offline_sync_metrics.active.load(Ordering::Acquire),
3825 "<ib><offline count='301'/> should end offline sync"
3826 );
3827 }
3828
3829 #[tokio::test]
3830 async fn test_ib_offline_preview_starts_sync() {
3831 let client = create_offline_sync_test_client().await;
3832
3833 let node = NodeBuilder::new("ib")
3834 .children([NodeBuilder::new("offline_preview")
3835 .attr("count", "301")
3836 .attr("message", "168")
3837 .attr("notification", "62")
3838 .attr("receipt", "68")
3839 .attr("appdata", "0")
3840 .build()])
3841 .build();
3842
3843 client.process_node(Arc::new(node)).await;
3844 assert!(
3845 client.offline_sync_metrics.active.load(Ordering::Acquire),
3846 "offline_preview with count>0 should activate sync"
3847 );
3848 assert_eq!(
3849 client
3850 .offline_sync_metrics
3851 .total_messages
3852 .load(Ordering::Acquire),
3853 301
3854 );
3855 }
3856
3857 #[tokio::test]
3858 async fn test_offline_message_increments_processed() {
3859 let client = create_offline_sync_test_client().await;
3860 client
3861 .offline_sync_metrics
3862 .active
3863 .store(true, Ordering::Release);
3864 client
3865 .offline_sync_metrics
3866 .total_messages
3867 .store(100, Ordering::Release);
3868
3869 let node = NodeBuilder::new("message")
3870 .attr("offline", "1")
3871 .attr("from", "5551234567@s.whatsapp.net")
3872 .attr("id", "TEST123")
3873 .attr("t", "1772884671")
3874 .attr("type", "text")
3875 .build();
3876
3877 client.process_node(Arc::new(node)).await;
3878 assert_eq!(
3879 client
3880 .offline_sync_metrics
3881 .processed_messages
3882 .load(Ordering::Acquire),
3883 1,
3884 "offline message should increment processed count"
3885 );
3886 }
3887}