1use anyhow::Result;
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6use wacore::libsignal::store::SessionStore;
7use wacore::types::jid::JidExt;
8use wacore_binary::jid::Jid;
9
10use super::Client;
11use crate::types::events::{Event, OfflineSyncCompleted};
12
13impl Client {
/// Timeout (60 seconds) used by `wait_for_offline_delivery_end` when the
/// caller does not supply one; once it elapses the offline sync is
/// force-completed by `wait_for_offline_delivery_end_with_timeout`.
pub(crate) const DEFAULT_OFFLINE_SYNC_TIMEOUT: Duration = Duration::from_secs(60);
16
17 pub(crate) fn complete_offline_sync(&self, count: i32) {
18 self.offline_sync_metrics
19 .active
20 .store(false, Ordering::Release);
21 match self.offline_sync_metrics.start_time.lock() {
22 Ok(mut guard) => *guard = None,
23 Err(poison) => *poison.into_inner() = None,
24 }
25
26 if self
32 .offline_sync_completed
33 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
34 .is_ok()
35 {
36 *self
41 .message_processing_semaphore
42 .lock()
43 .expect("message_processing_semaphore poisoned") =
44 std::sync::Arc::new(async_lock::Semaphore::new(64));
45
46 self.offline_sync_notifier.notify(usize::MAX);
47
48 self.core
49 .event_bus
50 .dispatch(&Event::OfflineSyncCompleted(OfflineSyncCompleted { count }));
51 }
52 }
53
54 pub(crate) async fn wait_for_offline_delivery_end(&self) {
56 self.wait_for_offline_delivery_end_with_timeout(Self::DEFAULT_OFFLINE_SYNC_TIMEOUT)
57 .await;
58 }
59
60 pub(crate) async fn wait_for_offline_delivery_end_with_timeout(&self, timeout: Duration) {
61 let wait_generation = self.connection_generation.load(Ordering::Acquire);
62 let offline_fut = self.offline_sync_notifier.listen();
63 if self.offline_sync_completed.load(Ordering::Relaxed) {
64 return;
65 }
66
67 if wacore::runtime::timeout(&*self.runtime, timeout, offline_fut)
68 .await
69 .is_err()
70 {
71 if self.connection_generation.load(Ordering::Acquire) != wait_generation
75 || self.expected_disconnect.load(Ordering::Relaxed)
76 {
77 log::debug!(
78 target: "Client/OfflineSync",
79 "Offline sync timeout ignored: connection generation changed or disconnected",
80 );
81 return;
82 }
83
84 let processed = self
85 .offline_sync_metrics
86 .processed_messages
87 .load(Ordering::Acquire);
88 let expected = self
89 .offline_sync_metrics
90 .total_messages
91 .load(Ordering::Acquire);
92 log::warn!(
93 target: "Client/OfflineSync",
94 "Offline sync timed out after {:?} (processed {} of {} items); marking sync complete",
95 timeout,
96 processed,
97 expected,
98 );
99 self.complete_offline_sync(i32::try_from(processed).unwrap_or(i32::MAX));
100 }
101 }
102
103 pub(crate) fn begin_history_sync_task(&self) {
104 self.history_sync_tasks_in_flight
105 .fetch_add(1, Ordering::Relaxed);
106 }
107
108 pub(crate) fn finish_history_sync_task(&self) {
109 let previous = self
110 .history_sync_tasks_in_flight
111 .fetch_sub(1, Ordering::Relaxed);
112 if previous <= 1 {
113 self.history_sync_tasks_in_flight
114 .store(0, Ordering::Relaxed);
115 self.history_sync_idle_notifier.notify(usize::MAX);
116 }
117 }
118
119 pub async fn wait_for_startup_sync(&self, timeout: std::time::Duration) -> Result<()> {
120 use anyhow::anyhow;
121 use wacore::time::Instant;
122
123 let deadline = Instant::now() + timeout;
124
125 let offline_fut = self.offline_sync_notifier.listen();
128 if !self.offline_sync_completed.load(Ordering::Relaxed) {
129 let remaining = deadline.saturating_duration_since(Instant::now());
130 wacore::runtime::timeout(&*self.runtime, remaining, offline_fut)
131 .await
132 .map_err(|_| anyhow!("Timeout waiting for offline sync completion"))?;
133 }
134
135 loop {
136 let history_fut = self.history_sync_idle_notifier.listen();
137 if self.history_sync_tasks_in_flight.load(Ordering::Relaxed) == 0 {
138 return Ok(());
139 }
140
141 let remaining = deadline.saturating_duration_since(Instant::now());
142 wacore::runtime::timeout(&*self.runtime, remaining, history_fut)
143 .await
144 .map_err(|_| anyhow!("Timeout waiting for history sync tasks to become idle"))?;
145 }
146 }
147
148 pub(crate) async fn ensure_e2e_sessions(&self, device_jids: Vec<Jid>) -> Result<()> {
151 use wacore::types::jid::JidExt;
152
153 if device_jids.is_empty() {
154 return Ok(());
155 }
156
157 self.wait_for_offline_delivery_end().await;
158 let resolved_jids = self.resolve_lid_mappings(&device_jids).await;
159
160 let device_store = self.persistence_manager.get_device_arc().await;
161 let mut jids_needing_sessions = Vec::with_capacity(resolved_jids.len());
162
163 {
164 let device_guard = device_store.read().await;
165 for jid in resolved_jids {
166 let signal_addr = jid.to_protocol_address();
167 let addr_str = signal_addr.to_string();
168 match self
170 .signal_cache
171 .has_session(&addr_str, &*device_guard.backend)
172 .await
173 {
174 Ok(true) => {}
175 Ok(false) => jids_needing_sessions.push(jid),
176 Err(e) => log::warn!("Failed to check session for {}: {}", jid, e),
177 }
178 }
179 }
180
181 if jids_needing_sessions.is_empty() {
182 return Ok(());
183 }
184
185 for batch in jids_needing_sessions.chunks(crate::session::SESSION_CHECK_BATCH_SIZE) {
186 self.fetch_and_establish_sessions(batch).await?;
187 }
188
189 Ok(())
190 }
191
192 async fn fetch_and_establish_sessions(&self, jids: &[Jid]) -> Result<usize, anyhow::Error> {
195 use wacore::libsignal::protocol::{UsePQRatchet, process_prekey_bundle};
196 use wacore::types::jid::JidExt;
197
198 if jids.is_empty() {
199 return Ok(0);
200 }
201
202 let prekey_bundles = self.fetch_pre_keys(jids, Some("identity")).await?;
203
204 let device_store = self.persistence_manager.get_device_arc().await;
205 let mut adapter = crate::store::signal_adapter::SignalProtocolStoreAdapter::new(
206 device_store,
207 self.signal_cache.clone(),
208 );
209
210 let mut success_count = 0;
211 let mut missing_count = 0;
212 let mut failed_count = 0;
213
214 for jid in jids {
215 if let Some(bundle) = prekey_bundles.get(&jid.normalize_for_prekey_bundle()) {
216 let signal_addr = jid.to_protocol_address();
217
218 let signal_addr_str = signal_addr.to_string();
220 let session_mutex = self
221 .session_locks
222 .get_with(signal_addr_str.clone(), async {
223 std::sync::Arc::new(async_lock::Mutex::new(()))
224 })
225 .await;
226 let _session_guard = session_mutex.lock().await;
227
228 match process_prekey_bundle(
229 &signal_addr,
230 &mut adapter.session_store,
231 &mut adapter.identity_store,
232 bundle,
233 &mut rand::make_rng::<rand::rngs::StdRng>(),
234 UsePQRatchet::No,
235 )
236 .await
237 {
238 Ok(_) => {
239 success_count += 1;
240 log::debug!("Successfully established session with {}", jid);
241 }
242 Err(e) => {
243 failed_count += 1;
244 log::warn!("Failed to establish session with {}: {}", jid, e);
245 }
246 }
247 } else {
248 missing_count += 1;
249 if jid.device == 0 {
250 log::warn!("Server did not return prekeys for primary phone {}", jid);
251 } else {
252 log::debug!("Server did not return prekeys for {}", jid);
253 }
254 }
255 }
256
257 if missing_count > 0 || failed_count > 0 {
258 log::debug!(
259 "Session establishment: {} succeeded, {} missing prekeys, {} failed (of {} requested)",
260 success_count,
261 missing_count,
262 failed_count,
263 jids.len()
264 );
265 }
266
267 if success_count > 0 {
269 self.flush_signal_cache().await?;
270 }
271
272 Ok(success_count)
273 }
274
275 pub(crate) async fn establish_primary_phone_session_immediate(&self) -> Result<()> {
285 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
286
287 let own_pn = device_snapshot
288 .pn
289 .clone()
290 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
291
292 let primary_phone_pn = own_pn.with_device(0);
293 let primary_phone_lid = device_snapshot.lid.as_ref().map(|lid| lid.with_device(0));
294
295 let pn_session_exists =
296 self.check_session_exists(&primary_phone_pn)
297 .await
298 .map_err(|e| {
299 anyhow::anyhow!(
300 "Cannot verify PN session existence for primary phone {}: {}. \
301 Refusing to establish session to prevent potential MAC failures.",
302 primary_phone_pn,
303 e
304 )
305 })?;
306
307 if pn_session_exists {
312 log::debug!(
313 "PN session with primary phone {} already exists",
314 primary_phone_pn
315 );
316 } else {
317 log::debug!(
318 "No PN session with primary phone {} - will be established via LID pkmsg",
319 primary_phone_pn
320 );
321 }
322
323 if let Some(ref lid_jid) = primary_phone_lid {
325 match self.check_session_exists(lid_jid).await {
326 Ok(true) => log::debug!("LID session with {} already exists", lid_jid),
327 Ok(false) => log::debug!(
328 "No LID session with {} - established on first message",
329 lid_jid
330 ),
331 Err(e) => log::debug!("Could not check LID session for {}: {}", lid_jid, e),
332 }
333 }
334
335 Ok(())
336 }
337
338 async fn check_session_exists(&self, jid: &Jid) -> Result<bool, anyhow::Error> {
340 let device_store = self.persistence_manager.get_device_arc().await;
341 let device_guard = device_store.read().await;
342 let signal_addr = jid.to_protocol_address();
343
344 device_guard
345 .contains_session(&signal_addr)
346 .await
347 .map_err(|e| anyhow::anyhow!("Failed to check session for {}: {}", jid, e))
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use super::*;
354 use wacore_binary::jid::{DEFAULT_USER_SERVER, HIDDEN_USER_SERVER, JidExt};
355
/// Device 0 derived from a bare phone-number JID keeps user and server, has
/// agent 0 and renders without a device suffix.
#[test]
fn test_primary_phone_jid_creation_from_pn() {
    let primary = Jid::pn("559999999999").with_device(0);

    assert_eq!(primary.user, "559999999999");
    assert_eq!(primary.server, DEFAULT_USER_SERVER);
    assert_eq!(primary.device, 0);
    assert_eq!(primary.agent, 0);

    let rendered = primary.to_string();
    assert_eq!(rendered, "559999999999@s.whatsapp.net");
}
367
/// `with_device(0)` replaces a non-zero device id while keeping user and
/// server.
#[test]
fn test_primary_phone_jid_overwrites_existing_device() {
    let linked = Jid::pn_device("559999999999", 33);

    let primary = linked.with_device(0);

    assert_eq!(primary.device, 0);
    assert_eq!(primary.user, "559999999999");
    assert_eq!(primary.server, DEFAULT_USER_SERVER);
}
378
/// A phone-number JID with device 0 is not reported as an AD JID.
#[test]
fn test_primary_phone_jid_is_not_ad() {
    let is_ad = Jid::pn("559999999999").with_device(0).is_ad();
    assert!(!is_ad);
}
384
/// A phone-number JID with device 33 is reported as an AD JID.
#[test]
fn test_linked_device_is_ad() {
    let is_ad = Jid::pn_device("559999999999", 33).is_ad();
    assert!(is_ad);
}
390
/// Device 0 derived from a LID keeps the LID user and the hidden-user server,
/// and is not an AD JID.
#[test]
fn test_primary_phone_jid_from_lid() {
    let primary = Jid::lid("100000000000001").with_device(0);

    assert_eq!(primary.user, "100000000000001");
    assert_eq!(primary.server, HIDDEN_USER_SERVER);
    assert_eq!(primary.device, 0);

    let is_ad = primary.is_ad();
    assert!(!is_ad);
}
401
/// Rendering a device-0 phone-number JID and parsing it back yields the same
/// user, server and device.
#[test]
fn test_primary_phone_jid_roundtrip() {
    let rendered = Jid::pn("559999999999").with_device(0).to_string();
    assert_eq!(rendered, "559999999999@s.whatsapp.net");

    let reparsed = rendered.parse::<Jid>().expect("JID should be parseable");
    assert_eq!(reparsed.user, "559999999999");
    assert_eq!(reparsed.server, DEFAULT_USER_SERVER);
    assert_eq!(reparsed.device, 0);
}
415
/// `with_device` changes only the device id, for both phone-number and LID
/// JIDs.
#[test]
fn test_with_device_preserves_identity() {
    // Phone-number addressing.
    let phone_base = Jid::pn("1234567890");
    let (phone_primary, phone_fifth) = (phone_base.with_device(0), phone_base.with_device(5));

    assert_eq!(phone_primary.user, phone_fifth.user);
    assert_eq!(phone_primary.server, phone_fifth.server);
    assert_eq!(phone_primary.device, 0);
    assert_eq!(phone_fifth.device, 5);

    // LID addressing.
    let lid_base = Jid::lid("100000012345678");
    let (lid_primary, lid_linked) = (lid_base.with_device(0), lid_base.with_device(33));

    assert_eq!(lid_primary.user, lid_linked.user);
    assert_eq!(lid_primary.server, lid_linked.server);
    assert_eq!(lid_primary.device, 0);
    assert_eq!(lid_linked.device, 33);
}
436
/// The primary phone and its companions share a user; only the companions
/// (non-zero device ids) are AD JIDs.
#[test]
fn test_primary_phone_vs_companion_devices() {
    let number = "559999999999";
    let phone = Jid::pn(number).with_device(0);
    let web = Jid::pn_device(number, 33);
    let desktop = Jid::pn_device(number, 34);

    for companion in [&web, &desktop] {
        assert_eq!(phone.user, companion.user);
    }

    assert_eq!(phone.device, 0);
    assert_eq!(web.device, 33);
    assert_eq!(desktop.device, 34);

    assert!(!phone.is_ad());
    for companion in [&web, &desktop] {
        assert!(companion.is_ad());
    }
}
458
/// Documents the intended decision table: establish only when the check says
/// "no session", and fail safe when the check itself fails.
#[test]
fn test_session_check_behavior_documentation() {
    enum SessionCheckResult {
        Exists,
        NotExists,
        CheckFailed,
    }

    fn should_establish_session(
        check_result: SessionCheckResult,
    ) -> Result<bool, &'static str> {
        let session_present = match check_result {
            SessionCheckResult::CheckFailed => return Err("Cannot verify - fail safe"),
            SessionCheckResult::Exists => true,
            SessionCheckResult::NotExists => false,
        };
        Ok(!session_present)
    }

    let when_present = should_establish_session(SessionCheckResult::Exists);
    assert_eq!(when_present, Ok(false));

    let when_absent = should_establish_session(SessionCheckResult::NotExists);
    assert_eq!(when_absent, Ok(true));

    let when_unknown = should_establish_session(SessionCheckResult::CheckFailed);
    assert!(when_unknown.is_err());
}
490
/// Pins the protocol-address name and string form produced for PN and LID
/// JIDs, with and without a companion device id.
#[test]
fn test_protocol_address_format_for_session_lookup() {
    use wacore::types::jid::JidExt;

    // Phone number, primary device.
    let pn_primary = Jid::pn("559999999999").with_device(0).to_protocol_address();
    assert_eq!(pn_primary.name(), "559999999999@c.us");
    assert_eq!(u32::from(pn_primary.device_id()), 0);
    assert_eq!(pn_primary.to_string(), "559999999999@c.us.0");

    // Phone number, companion device.
    let pn_companion = Jid::pn_device("559999999999", 33).to_protocol_address();
    assert_eq!(pn_companion.name(), "559999999999:33@c.us");
    assert_eq!(pn_companion.to_string(), "559999999999:33@c.us.0");

    // LID, primary device.
    let lid_primary = Jid::lid("100000000000001")
        .with_device(0)
        .to_protocol_address();
    assert_eq!(lid_primary.name(), "100000000000001@lid");
    assert_eq!(u32::from(lid_primary.device_id()), 0);
    assert_eq!(lid_primary.to_string(), "100000000000001@lid.0");

    // LID, companion device.
    let lid_companion = Jid::lid_device("100000000000001", 33).to_protocol_address();
    assert_eq!(lid_companion.name(), "100000000000001:33@lid");
    assert_eq!(lid_companion.to_string(), "100000000000001:33@lid.0");
}
518
/// Only JIDs whose session check returns `Ok(false)` are selected; existing
/// sessions and failed checks are both left out.
#[test]
fn test_filter_logic_for_session_establishment() {
    let jids = vec![
        Jid::pn_device("111", 0),
        Jid::pn_device("222", 0),
        Jid::pn_device("333", 0),
    ];

    // Stand-in for the real lookup: "111" has a session, "222" has none,
    // "333" fails.
    let session_exists = |jid: &Jid| -> Result<bool, &'static str> {
        match jid.user.as_str() {
            "111" => Ok(true),
            "333" => Err("DB error"),
            _ => Ok(false),
        }
    };

    let jids_needing_sessions: Vec<Jid> = jids
        .iter()
        .filter(|&jid| match session_exists(jid) {
            Ok(has_session) => !has_session,
            Err(e) => {
                eprintln!("Warning: failed to check {}: {}", jid, e);
                false
            }
        })
        .cloned()
        .collect();

    assert_eq!(jids_needing_sessions.len(), 1);
    assert_eq!(jids_needing_sessions[0].user, "222");
}
551
/// PN and LID addresses for device 0 differ in user, server and protocol
/// address name.
#[test]
fn test_dual_addressing_pn_and_lid_are_independent() {
    use wacore::types::jid::JidExt;

    let by_phone = Jid::pn("551199887766").with_device(0);
    let by_lid = Jid::lid("236395184570386").with_device(0);

    assert_ne!(by_phone.user, by_lid.user);
    assert_ne!(by_phone.server, by_lid.server);

    let phone_addr = by_phone.to_protocol_address();
    let lid_addr = by_lid.to_protocol_address();

    assert_ne!(phone_addr.name(), lid_addr.name());
    assert_eq!(phone_addr.name(), "551199887766@c.us");
    assert_eq!(lid_addr.name(), "236395184570386@lid");

    assert_eq!(by_phone.device, 0);
    assert_eq!(by_lid.device, 0);
}
572
/// Deriving device 0 from the account's own LID (device 61) keeps the user
/// and yields a non-AD JID.
#[test]
fn test_lid_extraction_from_own_device() {
    let primary = Jid::lid_device("236395184570386", 61).with_device(0);

    assert_eq!(primary.user, "236395184570386");
    assert_eq!(primary.device, 0);

    let is_ad = primary.is_ad();
    assert!(!is_ad);
}
582
/// Documents the policy modelled here: a PN session is wanted only when none
/// exists, and a LID session is never established proactively.
#[test]
fn test_stale_session_scenario_documentation() {
    fn should_establish_pn_session(pn_exists: bool) -> bool {
        !pn_exists
    }

    fn should_establish_lid_session(_lid_exists: bool) -> bool {
        false
    }

    for &already_exists in &[true, false] {
        assert_eq!(should_establish_pn_session(already_exists), !already_exists);
    }

    for &already_exists in &[true, false] {
        assert!(!should_establish_lid_session(already_exists));
    }
}
602
/// Documents which recovery action is associated with each retry error code
/// modelled in this test.
#[test]
fn test_retry_mechanism_for_stale_sessions() {
    const RETRY_ERROR_NO_SESSION: u8 = 1;
    const RETRY_ERROR_INVALID_MESSAGE: u8 = 4;

    fn action_for_error(error_code: u8) -> &'static str {
        if error_code == RETRY_ERROR_NO_SESSION {
            "Establish new session via prekey"
        } else if error_code == RETRY_ERROR_INVALID_MESSAGE {
            "Delete stale session, resend message"
        } else {
            "Unknown error"
        }
    }

    let expectations = [
        (RETRY_ERROR_NO_SESSION, "Establish new session via prekey"),
        (
            RETRY_ERROR_INVALID_MESSAGE,
            "Delete stale session, resend message",
        ),
    ];

    for &(code, expected_action) in &expectations {
        assert_eq!(action_for_error(code), expected_action);
    }
}
626
/// A JID with a non-zero agent misses a map keyed by the plain JID, while its
/// `normalize_for_prekey_bundle()` form hits and equals the stored key.
#[test]
fn test_session_establishment_lookup_normalization() {
    use std::collections::HashMap;
    use wacore_binary::jid::Jid;

    let stored_key = Jid::lid("123456789");
    let mut prekey_bundles: HashMap<Jid, ()> = HashMap::new();
    prekey_bundles.insert(stored_key.clone(), ());

    let requested = {
        let mut jid = Jid::lid("123456789");
        jid.agent = 1;
        jid
    };

    let direct_hit = prekey_bundles.contains_key(&requested);
    assert!(
        !direct_hit,
        "Direct lookup of non-normalized JID should fail"
    );

    let lookup_key = requested.normalize_for_prekey_bundle();
    let normalized_hit = prekey_bundles.contains_key(&lookup_key);
    assert!(normalized_hit, "Normalized lookup should succeed");

    assert_eq!(lookup_key, stored_key);
}
661}