1use anyhow::Result;
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6use wacore::libsignal::store::SessionStore;
7use wacore::types::jid::JidExt;
8use wacore_binary::jid::Jid;
9
10use super::Client;
11use crate::types::events::{Event, OfflineSyncCompleted};
12
13impl Client {
14 pub(crate) const DEFAULT_OFFLINE_SYNC_TIMEOUT: Duration = Duration::from_secs(60);
16
17 pub(crate) fn complete_offline_sync(&self, count: i32) {
18 self.offline_sync_metrics
19 .active
20 .store(false, Ordering::Release);
21 match self.offline_sync_metrics.start_time.lock() {
22 Ok(mut guard) => *guard = None,
23 Err(poison) => *poison.into_inner() = None,
24 }
25
26 if self
32 .offline_sync_completed
33 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
34 .is_ok()
35 {
36 self.swap_message_semaphore(64);
41
42 self.offline_sync_notifier.notify(usize::MAX);
43
44 self.core
45 .event_bus
46 .dispatch(&Event::OfflineSyncCompleted(OfflineSyncCompleted { count }));
47 }
48 }
49
50 pub(crate) async fn wait_for_offline_delivery_end(&self) {
52 self.wait_for_offline_delivery_end_with_timeout(Self::DEFAULT_OFFLINE_SYNC_TIMEOUT)
53 .await;
54 }
55
56 pub(crate) async fn wait_for_offline_delivery_end_with_timeout(&self, timeout: Duration) {
57 let wait_generation = self.connection_generation.load(Ordering::Acquire);
58 let offline_fut = self.offline_sync_notifier.listen();
59 if self.offline_sync_completed.load(Ordering::Relaxed) {
60 return;
61 }
62
63 if wacore::runtime::timeout(&*self.runtime, timeout, offline_fut)
64 .await
65 .is_err()
66 {
67 if self.connection_generation.load(Ordering::Acquire) != wait_generation
71 || self.expected_disconnect.load(Ordering::Relaxed)
72 {
73 log::debug!(
74 target: "Client/OfflineSync",
75 "Offline sync timeout ignored: connection generation changed or disconnected",
76 );
77 return;
78 }
79
80 let processed = self
81 .offline_sync_metrics
82 .processed_messages
83 .load(Ordering::Acquire);
84 let expected = self
85 .offline_sync_metrics
86 .total_messages
87 .load(Ordering::Acquire);
88 log::warn!(
89 target: "Client/OfflineSync",
90 "Offline sync timed out after {:?} (processed {} of {} items); marking sync complete",
91 timeout,
92 processed,
93 expected,
94 );
95 self.complete_offline_sync(i32::try_from(processed).unwrap_or(i32::MAX));
96 }
97 }
98
99 pub(crate) fn begin_history_sync_task(&self) {
100 self.history_sync_tasks_in_flight
101 .fetch_add(1, Ordering::Relaxed);
102 }
103
104 pub(crate) fn finish_history_sync_task(&self) {
105 let previous = self
106 .history_sync_tasks_in_flight
107 .fetch_sub(1, Ordering::Relaxed);
108 if previous <= 1 {
109 self.history_sync_tasks_in_flight
110 .store(0, Ordering::Relaxed);
111 self.history_sync_idle_notifier.notify(usize::MAX);
112 }
113 }
114
115 pub async fn wait_for_startup_sync(&self, timeout: std::time::Duration) -> Result<()> {
116 use anyhow::anyhow;
117 use wacore::time::Instant;
118
119 let deadline = Instant::now() + timeout;
120
121 let offline_fut = self.offline_sync_notifier.listen();
124 if !self.offline_sync_completed.load(Ordering::Relaxed) {
125 let remaining = deadline.saturating_duration_since(Instant::now());
126 wacore::runtime::timeout(&*self.runtime, remaining, offline_fut)
127 .await
128 .map_err(|_| anyhow!("Timeout waiting for offline sync completion"))?;
129 }
130
131 loop {
132 let history_fut = self.history_sync_idle_notifier.listen();
133 if self.history_sync_tasks_in_flight.load(Ordering::Relaxed) == 0 {
134 return Ok(());
135 }
136
137 let remaining = deadline.saturating_duration_since(Instant::now());
138 wacore::runtime::timeout(&*self.runtime, remaining, history_fut)
139 .await
140 .map_err(|_| anyhow!("Timeout waiting for history sync tasks to become idle"))?;
141 }
142 }
143
144 pub(crate) async fn ensure_e2e_sessions(&self, device_jids: Vec<Jid>) -> Result<()> {
147 use wacore::types::jid::JidExt;
148
149 if device_jids.is_empty() {
150 return Ok(());
151 }
152
153 self.wait_for_offline_delivery_end().await;
154 let resolved_jids = self.resolve_lid_mappings(&device_jids).await;
155
156 let device_store = self.persistence_manager.get_device_arc().await;
157 let mut jids_needing_sessions = Vec::with_capacity(resolved_jids.len());
158
159 {
160 let device_guard = device_store.read().await;
161 for jid in resolved_jids {
162 let signal_addr = jid.to_protocol_address();
163 match self
165 .signal_cache
166 .has_session(&signal_addr, &*device_guard.backend)
167 .await
168 {
169 Ok(true) => {}
170 Ok(false) => jids_needing_sessions.push(jid),
171 Err(e) => log::warn!("Failed to check session for {}: {}", jid, e),
172 }
173 }
174 }
175
176 if jids_needing_sessions.is_empty() {
177 return Ok(());
178 }
179
180 for batch in jids_needing_sessions.chunks(crate::session::SESSION_CHECK_BATCH_SIZE) {
181 self.fetch_and_establish_sessions(batch).await?;
182 }
183
184 Ok(())
185 }
186
187 async fn fetch_and_establish_sessions(&self, jids: &[Jid]) -> Result<usize, anyhow::Error> {
190 use wacore::libsignal::protocol::{UsePQRatchet, process_prekey_bundle};
191 use wacore::types::jid::JidExt;
192
193 if jids.is_empty() {
194 return Ok(0);
195 }
196
197 let prekey_bundles = self.fetch_pre_keys(jids, Some("identity")).await?;
198
199 let device_store = self.persistence_manager.get_device_arc().await;
200 let mut adapter = crate::store::signal_adapter::SignalProtocolStoreAdapter::new(
201 device_store,
202 self.signal_cache.clone(),
203 );
204
205 let mut success_count = 0;
206 let mut missing_count = 0;
207 let mut failed_count = 0;
208
209 for jid in jids {
210 if let Some(bundle) = prekey_bundles.get(&jid.normalize_for_prekey_bundle()) {
211 let signal_addr = jid.to_protocol_address();
212
213 let session_mutex = self
215 .session_locks
216 .get_with_by_ref(signal_addr.as_str(), async {
217 std::sync::Arc::new(async_lock::Mutex::new(()))
218 })
219 .await;
220 let _session_guard = session_mutex.lock().await;
221
222 match process_prekey_bundle(
223 &signal_addr,
224 &mut adapter.session_store,
225 &mut adapter.identity_store,
226 bundle,
227 &mut rand::make_rng::<rand::rngs::StdRng>(),
228 UsePQRatchet::No,
229 )
230 .await
231 {
232 Ok(_) => {
233 success_count += 1;
234 log::debug!("Successfully established session with {}", jid);
235 }
236 Err(e) => {
237 failed_count += 1;
238 log::warn!("Failed to establish session with {}: {}", jid, e);
239 }
240 }
241 } else {
242 missing_count += 1;
243 if jid.device == 0 {
244 log::warn!("Server did not return prekeys for primary phone {}", jid);
245 } else {
246 log::debug!("Server did not return prekeys for {}", jid);
247 }
248 }
249 }
250
251 if missing_count > 0 || failed_count > 0 {
252 log::debug!(
253 "Session establishment: {} succeeded, {} missing prekeys, {} failed (of {} requested)",
254 success_count,
255 missing_count,
256 failed_count,
257 jids.len()
258 );
259 }
260
261 if success_count > 0 {
263 self.flush_signal_cache().await?;
264 }
265
266 Ok(success_count)
267 }
268
269 pub(crate) async fn establish_primary_phone_session_immediate(&self) -> Result<()> {
279 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
280
281 let own_pn = device_snapshot
282 .pn
283 .clone()
284 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
285
286 let primary_phone_pn = own_pn.with_device(0);
287 let primary_phone_lid = device_snapshot.lid.as_ref().map(|lid| lid.with_device(0));
288
289 let pn_session_exists =
290 self.check_session_exists(&primary_phone_pn)
291 .await
292 .map_err(|e| {
293 anyhow::anyhow!(
294 "Cannot verify PN session existence for primary phone {}: {}. \
295 Refusing to establish session to prevent potential MAC failures.",
296 primary_phone_pn,
297 e
298 )
299 })?;
300
301 if pn_session_exists {
306 log::debug!(
307 "PN session with primary phone {} already exists",
308 primary_phone_pn
309 );
310 } else {
311 log::debug!(
312 "No PN session with primary phone {} - will be established via LID pkmsg",
313 primary_phone_pn
314 );
315 }
316
317 if let Some(ref lid_jid) = primary_phone_lid {
319 match self.check_session_exists(lid_jid).await {
320 Ok(true) => log::debug!("LID session with {} already exists", lid_jid),
321 Ok(false) => log::debug!(
322 "No LID session with {} - established on first message",
323 lid_jid
324 ),
325 Err(e) => log::debug!("Could not check LID session for {}: {}", lid_jid, e),
326 }
327 }
328
329 Ok(())
330 }
331
332 async fn check_session_exists(&self, jid: &Jid) -> Result<bool, anyhow::Error> {
334 let device_store = self.persistence_manager.get_device_arc().await;
335 let device_guard = device_store.read().await;
336 let signal_addr = jid.to_protocol_address();
337
338 device_guard
339 .contains_session(&signal_addr)
340 .await
341 .map_err(|e| anyhow::anyhow!("Failed to check session for {}: {}", jid, e))
342 }
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348 use wacore_binary::jid::{DEFAULT_USER_SERVER, HIDDEN_USER_SERVER, JidExt};
349
350 #[test]
351 fn test_primary_phone_jid_creation_from_pn() {
352 let own_pn = Jid::pn("559999999999");
353 let primary_phone_jid = own_pn.with_device(0);
354
355 assert_eq!(primary_phone_jid.user, "559999999999");
356 assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
357 assert_eq!(primary_phone_jid.device, 0);
358 assert_eq!(primary_phone_jid.agent, 0);
359 assert_eq!(primary_phone_jid.to_string(), "559999999999@s.whatsapp.net");
360 }
361
362 #[test]
363 fn test_primary_phone_jid_overwrites_existing_device() {
364 let own_pn = Jid::pn_device("559999999999", 33);
366 let primary_phone_jid = own_pn.with_device(0);
367
368 assert_eq!(primary_phone_jid.user, "559999999999");
369 assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
370 assert_eq!(primary_phone_jid.device, 0);
371 }
372
373 #[test]
374 fn test_primary_phone_jid_is_not_ad() {
375 let primary_phone_jid = Jid::pn("559999999999").with_device(0);
376 assert!(!primary_phone_jid.is_ad()); }
378
379 #[test]
380 fn test_linked_device_is_ad() {
381 let linked_device_jid = Jid::pn_device("559999999999", 33);
382 assert!(linked_device_jid.is_ad()); }
384
385 #[test]
386 fn test_primary_phone_jid_from_lid() {
387 let own_lid = Jid::lid("100000000000001");
388 let primary_phone_jid = own_lid.with_device(0);
389
390 assert_eq!(primary_phone_jid.user, "100000000000001");
391 assert_eq!(primary_phone_jid.server, HIDDEN_USER_SERVER);
392 assert_eq!(primary_phone_jid.device, 0);
393 assert!(!primary_phone_jid.is_ad());
394 }
395
396 #[test]
397 fn test_primary_phone_jid_roundtrip() {
398 let own_pn = Jid::pn("559999999999");
399 let primary_phone_jid = own_pn.with_device(0);
400
401 let jid_string = primary_phone_jid.to_string();
402 assert_eq!(jid_string, "559999999999@s.whatsapp.net");
403
404 let parsed: Jid = jid_string.parse().expect("JID should be parseable");
405 assert_eq!(parsed.user, "559999999999");
406 assert_eq!(parsed.server, DEFAULT_USER_SERVER);
407 assert_eq!(parsed.device, 0);
408 }
409
410 #[test]
411 fn test_with_device_preserves_identity() {
412 let pn = Jid::pn("1234567890");
413 let pn_device_0 = pn.with_device(0);
414 let pn_device_5 = pn.with_device(5);
415
416 assert_eq!(pn_device_0.user, pn_device_5.user);
417 assert_eq!(pn_device_0.server, pn_device_5.server);
418 assert_eq!(pn_device_0.device, 0);
419 assert_eq!(pn_device_5.device, 5);
420
421 let lid = Jid::lid("100000012345678");
422 let lid_device_0 = lid.with_device(0);
423 let lid_device_33 = lid.with_device(33);
424
425 assert_eq!(lid_device_0.user, lid_device_33.user);
426 assert_eq!(lid_device_0.server, lid_device_33.server);
427 assert_eq!(lid_device_0.device, 0);
428 assert_eq!(lid_device_33.device, 33);
429 }
430
431 #[test]
432 fn test_primary_phone_vs_companion_devices() {
433 let user = "559999999999";
434 let primary = Jid::pn(user).with_device(0);
435 let companion_web = Jid::pn_device(user, 33);
436 let companion_desktop = Jid::pn_device(user, 34);
437
438 assert_eq!(primary.user, companion_web.user);
440 assert_eq!(primary.user, companion_desktop.user);
441
442 assert_eq!(primary.device, 0);
444 assert_eq!(companion_web.device, 33);
445 assert_eq!(companion_desktop.device, 34);
446
447 assert!(!primary.is_ad());
449 assert!(companion_web.is_ad());
450 assert!(companion_desktop.is_ad());
451 }
452
453 #[test]
455 fn test_session_check_behavior_documentation() {
456 enum SessionCheckResult {
458 Exists,
459 NotExists,
460 CheckFailed,
461 }
462
463 fn should_establish_session(
464 check_result: SessionCheckResult,
465 ) -> Result<bool, &'static str> {
466 match check_result {
467 SessionCheckResult::Exists => Ok(false), SessionCheckResult::NotExists => Ok(true), SessionCheckResult::CheckFailed => Err("Cannot verify - fail safe"),
470 }
471 }
472
473 assert_eq!(
475 should_establish_session(SessionCheckResult::Exists),
476 Ok(false)
477 );
478 assert_eq!(
479 should_establish_session(SessionCheckResult::NotExists),
480 Ok(true)
481 );
482 assert!(should_establish_session(SessionCheckResult::CheckFailed).is_err());
483 }
484
485 #[test]
487 fn test_protocol_address_format_for_session_lookup() {
488 use wacore::types::jid::JidExt;
489
490 let pn = Jid::pn("559999999999").with_device(0);
491 let addr = pn.to_protocol_address();
492 assert_eq!(addr.name(), "559999999999@c.us");
493 assert_eq!(u32::from(addr.device_id()), 0);
494 assert_eq!(addr.to_string(), "559999999999@c.us.0");
495
496 let companion = Jid::pn_device("559999999999", 33);
497 let companion_addr = companion.to_protocol_address();
498 assert_eq!(companion_addr.name(), "559999999999:33@c.us");
499 assert_eq!(companion_addr.to_string(), "559999999999:33@c.us.0");
500
501 let lid = Jid::lid("100000000000001").with_device(0);
502 let lid_addr = lid.to_protocol_address();
503 assert_eq!(lid_addr.name(), "100000000000001@lid");
504 assert_eq!(u32::from(lid_addr.device_id()), 0);
505 assert_eq!(lid_addr.to_string(), "100000000000001@lid.0");
506
507 let lid_device = Jid::lid_device("100000000000001", 33);
508 let lid_device_addr = lid_device.to_protocol_address();
509 assert_eq!(lid_device_addr.name(), "100000000000001:33@lid");
510 assert_eq!(lid_device_addr.to_string(), "100000000000001:33@lid.0");
511 }
512
513 #[test]
514 fn test_filter_logic_for_session_establishment() {
515 let jids = vec![
516 Jid::pn_device("111", 0),
517 Jid::pn_device("222", 0),
518 Jid::pn_device("333", 0),
519 ];
520
521 let session_exists = |jid: &Jid| -> Result<bool, &'static str> {
523 match jid.user.as_str() {
524 "111" => Ok(true), "222" => Ok(false), "333" => Err("DB error"), _ => Ok(false),
528 }
529 };
530
531 let mut jids_needing_sessions = Vec::with_capacity(jids.len());
533 for jid in &jids {
534 match session_exists(jid) {
535 Ok(true) => {} Ok(false) => jids_needing_sessions.push(jid.clone()), Err(e) => eprintln!("Warning: failed to check {}: {}", jid, e), }
539 }
540
541 assert_eq!(jids_needing_sessions.len(), 1);
543 assert_eq!(jids_needing_sessions[0].user, "222");
544 }
545
546 #[test]
549 fn test_dual_addressing_pn_and_lid_are_independent() {
550 let pn_address = Jid::pn("551199887766").with_device(0);
551 let lid_address = Jid::lid("236395184570386").with_device(0);
552
553 assert_ne!(pn_address.user, lid_address.user);
554 assert_ne!(pn_address.server, lid_address.server);
555
556 use wacore::types::jid::JidExt;
557 let pn_signal_addr = pn_address.to_protocol_address();
558 let lid_signal_addr = lid_address.to_protocol_address();
559
560 assert_ne!(pn_signal_addr.name(), lid_signal_addr.name());
561 assert_eq!(pn_signal_addr.name(), "551199887766@c.us");
562 assert_eq!(lid_signal_addr.name(), "236395184570386@lid");
563 assert_eq!(pn_address.device, 0);
564 assert_eq!(lid_address.device, 0);
565 }
566
567 #[test]
568 fn test_lid_extraction_from_own_device() {
569 let own_lid_with_device = Jid::lid_device("236395184570386", 61);
570 let primary_lid = own_lid_with_device.with_device(0);
571
572 assert_eq!(primary_lid.user, "236395184570386");
573 assert_eq!(primary_lid.device, 0);
574 assert!(!primary_lid.is_ad());
575 }
576
577 #[test]
579 fn test_stale_session_scenario_documentation() {
580 fn should_establish_pn_session(pn_exists: bool) -> bool {
581 !pn_exists
582 }
583
584 fn should_establish_lid_session(_lid_exists: bool) -> bool {
585 false }
587
588 assert!(!should_establish_pn_session(true));
590 assert!(should_establish_pn_session(false));
592 assert!(!should_establish_lid_session(true));
594 assert!(!should_establish_lid_session(false));
595 }
596
597 #[test]
599 fn test_retry_mechanism_for_stale_sessions() {
600 const RETRY_ERROR_NO_SESSION: u8 = 1;
601 const RETRY_ERROR_INVALID_MESSAGE: u8 = 4;
602
603 fn action_for_error(error_code: u8) -> &'static str {
604 match error_code {
605 RETRY_ERROR_NO_SESSION => "Establish new session via prekey",
606 RETRY_ERROR_INVALID_MESSAGE => "Delete stale session, resend message",
607 _ => "Unknown error",
608 }
609 }
610
611 assert_eq!(
612 action_for_error(RETRY_ERROR_NO_SESSION),
613 "Establish new session via prekey"
614 );
615 assert_eq!(
616 action_for_error(RETRY_ERROR_INVALID_MESSAGE),
617 "Delete stale session, resend message"
618 );
619 }
620
621 #[test]
622 fn test_session_establishment_lookup_normalization() {
623 use std::collections::HashMap;
624 use wacore_binary::jid::Jid;
625
626 let mut prekey_bundles: HashMap<Jid, ()> = HashMap::new(); let normalized_jid = Jid::lid("123456789"); prekey_bundles.insert(normalized_jid.clone(), ());
632
633 let mut requested_jid = Jid::lid("123456789");
636 requested_jid.agent = 1;
637
638 assert!(
640 !prekey_bundles.contains_key(&requested_jid),
641 "Direct lookup of non-normalized JID should fail"
642 );
643
644 let normalized_lookup = requested_jid.normalize_for_prekey_bundle();
647 assert!(
648 prekey_bundles.contains_key(&normalized_lookup),
649 "Normalized lookup should succeed"
650 );
651
652 assert_eq!(normalized_lookup, normalized_jid);
654 }
655}