1use anyhow::Result;
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6use wacore::libsignal::store::SessionStore;
7use wacore::types::jid::JidExt;
8use wacore_binary::jid::Jid;
9
10use super::Client;
11use crate::types::events::{Event, OfflineSyncCompleted};
12
13impl Client {
14 pub(crate) const DEFAULT_OFFLINE_SYNC_TIMEOUT: Duration = Duration::from_secs(60);
16
17 pub(crate) fn complete_offline_sync(&self, count: i32) {
18 self.offline_sync_metrics
19 .active
20 .store(false, Ordering::Release);
21 match self.offline_sync_metrics.start_time.lock() {
22 Ok(mut guard) => *guard = None,
23 Err(poison) => *poison.into_inner() = None,
24 }
25
26 if self
32 .offline_sync_completed
33 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
34 .is_ok()
35 {
36 self.swap_message_semaphore(64);
41
42 self.offline_sync_notifier.notify(usize::MAX);
43
44 self.core
45 .event_bus
46 .dispatch(&Event::OfflineSyncCompleted(OfflineSyncCompleted { count }));
47 }
48 }
49
50 pub(crate) async fn wait_for_offline_delivery_end(&self) {
52 self.wait_for_offline_delivery_end_with_timeout(Self::DEFAULT_OFFLINE_SYNC_TIMEOUT)
53 .await;
54 }
55
56 pub(crate) async fn wait_for_offline_delivery_end_with_timeout(&self, timeout: Duration) {
57 let wait_generation = self.connection_generation.load(Ordering::Acquire);
58 let offline_fut = self.offline_sync_notifier.listen();
59 if self.offline_sync_completed.load(Ordering::Relaxed) {
60 return;
61 }
62
63 if wacore::runtime::timeout(&*self.runtime, timeout, offline_fut)
64 .await
65 .is_err()
66 {
67 if self.connection_generation.load(Ordering::Acquire) != wait_generation
71 || self.expected_disconnect.load(Ordering::Relaxed)
72 {
73 log::debug!(
74 target: "Client/OfflineSync",
75 "Offline sync timeout ignored: connection generation changed or disconnected",
76 );
77 return;
78 }
79
80 let processed = self
81 .offline_sync_metrics
82 .processed_messages
83 .load(Ordering::Acquire);
84 let expected = self
85 .offline_sync_metrics
86 .total_messages
87 .load(Ordering::Acquire);
88 log::warn!(
89 target: "Client/OfflineSync",
90 "Offline sync timed out after {:?} (processed {} of {} items); marking sync complete",
91 timeout,
92 processed,
93 expected,
94 );
95 self.complete_offline_sync(i32::try_from(processed).unwrap_or(i32::MAX));
96 }
97 }
98
99 pub(crate) fn begin_history_sync_task(&self) {
100 self.history_sync_tasks_in_flight
101 .fetch_add(1, Ordering::Relaxed);
102 }
103
104 pub(crate) fn finish_history_sync_task(&self) {
105 let previous = self
106 .history_sync_tasks_in_flight
107 .fetch_sub(1, Ordering::Relaxed);
108 if previous <= 1 {
109 self.history_sync_tasks_in_flight
110 .store(0, Ordering::Relaxed);
111 self.history_sync_idle_notifier.notify(usize::MAX);
112 }
113 }
114
115 pub async fn wait_for_startup_sync(&self, timeout: std::time::Duration) -> Result<()> {
116 use anyhow::anyhow;
117 use wacore::time::Instant;
118
119 let deadline = Instant::now() + timeout;
120
121 let offline_fut = self.offline_sync_notifier.listen();
124 if !self.offline_sync_completed.load(Ordering::Relaxed) {
125 let remaining = deadline.saturating_duration_since(Instant::now());
126 wacore::runtime::timeout(&*self.runtime, remaining, offline_fut)
127 .await
128 .map_err(|_| anyhow!("Timeout waiting for offline sync completion"))?;
129 }
130
131 loop {
132 let history_fut = self.history_sync_idle_notifier.listen();
133 if self.history_sync_tasks_in_flight.load(Ordering::Relaxed) == 0 {
134 return Ok(());
135 }
136
137 let remaining = deadline.saturating_duration_since(Instant::now());
138 wacore::runtime::timeout(&*self.runtime, remaining, history_fut)
139 .await
140 .map_err(|_| anyhow!("Timeout waiting for history sync tasks to become idle"))?;
141 }
142 }
143
144 pub(crate) async fn ensure_e2e_sessions(&self, device_jids: Vec<Jid>) -> Result<()> {
147 use wacore::types::jid::JidExt;
148
149 if device_jids.is_empty() {
150 return Ok(());
151 }
152
153 self.wait_for_offline_delivery_end().await;
154 let resolved_jids = self.resolve_lid_mappings(&device_jids).await;
155
156 let device_store = self.persistence_manager.get_device_arc().await;
157 let mut jids_needing_sessions = Vec::with_capacity(resolved_jids.len());
158
159 {
160 let device_guard = device_store.read().await;
161 for jid in resolved_jids {
162 let signal_addr = jid.to_protocol_address();
163 let addr_str = signal_addr.to_string();
164 match self
166 .signal_cache
167 .has_session(&addr_str, &*device_guard.backend)
168 .await
169 {
170 Ok(true) => {}
171 Ok(false) => jids_needing_sessions.push(jid),
172 Err(e) => log::warn!("Failed to check session for {}: {}", jid, e),
173 }
174 }
175 }
176
177 if jids_needing_sessions.is_empty() {
178 return Ok(());
179 }
180
181 for batch in jids_needing_sessions.chunks(crate::session::SESSION_CHECK_BATCH_SIZE) {
182 self.fetch_and_establish_sessions(batch).await?;
183 }
184
185 Ok(())
186 }
187
188 async fn fetch_and_establish_sessions(&self, jids: &[Jid]) -> Result<usize, anyhow::Error> {
191 use wacore::libsignal::protocol::{UsePQRatchet, process_prekey_bundle};
192 use wacore::types::jid::JidExt;
193
194 if jids.is_empty() {
195 return Ok(0);
196 }
197
198 let prekey_bundles = self.fetch_pre_keys(jids, Some("identity")).await?;
199
200 let device_store = self.persistence_manager.get_device_arc().await;
201 let mut adapter = crate::store::signal_adapter::SignalProtocolStoreAdapter::new(
202 device_store,
203 self.signal_cache.clone(),
204 );
205
206 let mut success_count = 0;
207 let mut missing_count = 0;
208 let mut failed_count = 0;
209
210 for jid in jids {
211 if let Some(bundle) = prekey_bundles.get(&jid.normalize_for_prekey_bundle()) {
212 let signal_addr = jid.to_protocol_address();
213
214 let signal_addr_str = signal_addr.to_string();
216 let session_mutex = self
217 .session_locks
218 .get_with(signal_addr_str.clone(), async {
219 std::sync::Arc::new(async_lock::Mutex::new(()))
220 })
221 .await;
222 let _session_guard = session_mutex.lock().await;
223
224 match process_prekey_bundle(
225 &signal_addr,
226 &mut adapter.session_store,
227 &mut adapter.identity_store,
228 bundle,
229 &mut rand::make_rng::<rand::rngs::StdRng>(),
230 UsePQRatchet::No,
231 )
232 .await
233 {
234 Ok(_) => {
235 success_count += 1;
236 log::debug!("Successfully established session with {}", jid);
237 }
238 Err(e) => {
239 failed_count += 1;
240 log::warn!("Failed to establish session with {}: {}", jid, e);
241 }
242 }
243 } else {
244 missing_count += 1;
245 if jid.device == 0 {
246 log::warn!("Server did not return prekeys for primary phone {}", jid);
247 } else {
248 log::debug!("Server did not return prekeys for {}", jid);
249 }
250 }
251 }
252
253 if missing_count > 0 || failed_count > 0 {
254 log::debug!(
255 "Session establishment: {} succeeded, {} missing prekeys, {} failed (of {} requested)",
256 success_count,
257 missing_count,
258 failed_count,
259 jids.len()
260 );
261 }
262
263 if success_count > 0 {
265 self.flush_signal_cache().await?;
266 }
267
268 Ok(success_count)
269 }
270
271 pub(crate) async fn establish_primary_phone_session_immediate(&self) -> Result<()> {
281 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
282
283 let own_pn = device_snapshot
284 .pn
285 .clone()
286 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
287
288 let primary_phone_pn = own_pn.with_device(0);
289 let primary_phone_lid = device_snapshot.lid.as_ref().map(|lid| lid.with_device(0));
290
291 let pn_session_exists =
292 self.check_session_exists(&primary_phone_pn)
293 .await
294 .map_err(|e| {
295 anyhow::anyhow!(
296 "Cannot verify PN session existence for primary phone {}: {}. \
297 Refusing to establish session to prevent potential MAC failures.",
298 primary_phone_pn,
299 e
300 )
301 })?;
302
303 if pn_session_exists {
308 log::debug!(
309 "PN session with primary phone {} already exists",
310 primary_phone_pn
311 );
312 } else {
313 log::debug!(
314 "No PN session with primary phone {} - will be established via LID pkmsg",
315 primary_phone_pn
316 );
317 }
318
319 if let Some(ref lid_jid) = primary_phone_lid {
321 match self.check_session_exists(lid_jid).await {
322 Ok(true) => log::debug!("LID session with {} already exists", lid_jid),
323 Ok(false) => log::debug!(
324 "No LID session with {} - established on first message",
325 lid_jid
326 ),
327 Err(e) => log::debug!("Could not check LID session for {}: {}", lid_jid, e),
328 }
329 }
330
331 Ok(())
332 }
333
334 async fn check_session_exists(&self, jid: &Jid) -> Result<bool, anyhow::Error> {
336 let device_store = self.persistence_manager.get_device_arc().await;
337 let device_guard = device_store.read().await;
338 let signal_addr = jid.to_protocol_address();
339
340 device_guard
341 .contains_session(&signal_addr)
342 .await
343 .map_err(|e| anyhow::anyhow!("Failed to check session for {}: {}", jid, e))
344 }
345}
346
347#[cfg(test)]
348mod tests {
349 use super::*;
350 use wacore_binary::jid::{DEFAULT_USER_SERVER, HIDDEN_USER_SERVER, JidExt};
351
352 #[test]
353 fn test_primary_phone_jid_creation_from_pn() {
354 let own_pn = Jid::pn("559999999999");
355 let primary_phone_jid = own_pn.with_device(0);
356
357 assert_eq!(primary_phone_jid.user, "559999999999");
358 assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
359 assert_eq!(primary_phone_jid.device, 0);
360 assert_eq!(primary_phone_jid.agent, 0);
361 assert_eq!(primary_phone_jid.to_string(), "559999999999@s.whatsapp.net");
362 }
363
364 #[test]
365 fn test_primary_phone_jid_overwrites_existing_device() {
366 let own_pn = Jid::pn_device("559999999999", 33);
368 let primary_phone_jid = own_pn.with_device(0);
369
370 assert_eq!(primary_phone_jid.user, "559999999999");
371 assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
372 assert_eq!(primary_phone_jid.device, 0);
373 }
374
375 #[test]
376 fn test_primary_phone_jid_is_not_ad() {
377 let primary_phone_jid = Jid::pn("559999999999").with_device(0);
378 assert!(!primary_phone_jid.is_ad()); }
380
381 #[test]
382 fn test_linked_device_is_ad() {
383 let linked_device_jid = Jid::pn_device("559999999999", 33);
384 assert!(linked_device_jid.is_ad()); }
386
387 #[test]
388 fn test_primary_phone_jid_from_lid() {
389 let own_lid = Jid::lid("100000000000001");
390 let primary_phone_jid = own_lid.with_device(0);
391
392 assert_eq!(primary_phone_jid.user, "100000000000001");
393 assert_eq!(primary_phone_jid.server, HIDDEN_USER_SERVER);
394 assert_eq!(primary_phone_jid.device, 0);
395 assert!(!primary_phone_jid.is_ad());
396 }
397
398 #[test]
399 fn test_primary_phone_jid_roundtrip() {
400 let own_pn = Jid::pn("559999999999");
401 let primary_phone_jid = own_pn.with_device(0);
402
403 let jid_string = primary_phone_jid.to_string();
404 assert_eq!(jid_string, "559999999999@s.whatsapp.net");
405
406 let parsed: Jid = jid_string.parse().expect("JID should be parseable");
407 assert_eq!(parsed.user, "559999999999");
408 assert_eq!(parsed.server, DEFAULT_USER_SERVER);
409 assert_eq!(parsed.device, 0);
410 }
411
412 #[test]
413 fn test_with_device_preserves_identity() {
414 let pn = Jid::pn("1234567890");
415 let pn_device_0 = pn.with_device(0);
416 let pn_device_5 = pn.with_device(5);
417
418 assert_eq!(pn_device_0.user, pn_device_5.user);
419 assert_eq!(pn_device_0.server, pn_device_5.server);
420 assert_eq!(pn_device_0.device, 0);
421 assert_eq!(pn_device_5.device, 5);
422
423 let lid = Jid::lid("100000012345678");
424 let lid_device_0 = lid.with_device(0);
425 let lid_device_33 = lid.with_device(33);
426
427 assert_eq!(lid_device_0.user, lid_device_33.user);
428 assert_eq!(lid_device_0.server, lid_device_33.server);
429 assert_eq!(lid_device_0.device, 0);
430 assert_eq!(lid_device_33.device, 33);
431 }
432
433 #[test]
434 fn test_primary_phone_vs_companion_devices() {
435 let user = "559999999999";
436 let primary = Jid::pn(user).with_device(0);
437 let companion_web = Jid::pn_device(user, 33);
438 let companion_desktop = Jid::pn_device(user, 34);
439
440 assert_eq!(primary.user, companion_web.user);
442 assert_eq!(primary.user, companion_desktop.user);
443
444 assert_eq!(primary.device, 0);
446 assert_eq!(companion_web.device, 33);
447 assert_eq!(companion_desktop.device, 34);
448
449 assert!(!primary.is_ad());
451 assert!(companion_web.is_ad());
452 assert!(companion_desktop.is_ad());
453 }
454
455 #[test]
457 fn test_session_check_behavior_documentation() {
458 enum SessionCheckResult {
460 Exists,
461 NotExists,
462 CheckFailed,
463 }
464
465 fn should_establish_session(
466 check_result: SessionCheckResult,
467 ) -> Result<bool, &'static str> {
468 match check_result {
469 SessionCheckResult::Exists => Ok(false), SessionCheckResult::NotExists => Ok(true), SessionCheckResult::CheckFailed => Err("Cannot verify - fail safe"),
472 }
473 }
474
475 assert_eq!(
477 should_establish_session(SessionCheckResult::Exists),
478 Ok(false)
479 );
480 assert_eq!(
481 should_establish_session(SessionCheckResult::NotExists),
482 Ok(true)
483 );
484 assert!(should_establish_session(SessionCheckResult::CheckFailed).is_err());
485 }
486
487 #[test]
489 fn test_protocol_address_format_for_session_lookup() {
490 use wacore::types::jid::JidExt;
491
492 let pn = Jid::pn("559999999999").with_device(0);
493 let addr = pn.to_protocol_address();
494 assert_eq!(addr.name(), "559999999999@c.us");
495 assert_eq!(u32::from(addr.device_id()), 0);
496 assert_eq!(addr.to_string(), "559999999999@c.us.0");
497
498 let companion = Jid::pn_device("559999999999", 33);
499 let companion_addr = companion.to_protocol_address();
500 assert_eq!(companion_addr.name(), "559999999999:33@c.us");
501 assert_eq!(companion_addr.to_string(), "559999999999:33@c.us.0");
502
503 let lid = Jid::lid("100000000000001").with_device(0);
504 let lid_addr = lid.to_protocol_address();
505 assert_eq!(lid_addr.name(), "100000000000001@lid");
506 assert_eq!(u32::from(lid_addr.device_id()), 0);
507 assert_eq!(lid_addr.to_string(), "100000000000001@lid.0");
508
509 let lid_device = Jid::lid_device("100000000000001", 33);
510 let lid_device_addr = lid_device.to_protocol_address();
511 assert_eq!(lid_device_addr.name(), "100000000000001:33@lid");
512 assert_eq!(lid_device_addr.to_string(), "100000000000001:33@lid.0");
513 }
514
515 #[test]
516 fn test_filter_logic_for_session_establishment() {
517 let jids = vec![
518 Jid::pn_device("111", 0),
519 Jid::pn_device("222", 0),
520 Jid::pn_device("333", 0),
521 ];
522
523 let session_exists = |jid: &Jid| -> Result<bool, &'static str> {
525 match jid.user.as_str() {
526 "111" => Ok(true), "222" => Ok(false), "333" => Err("DB error"), _ => Ok(false),
530 }
531 };
532
533 let mut jids_needing_sessions = Vec::with_capacity(jids.len());
535 for jid in &jids {
536 match session_exists(jid) {
537 Ok(true) => {} Ok(false) => jids_needing_sessions.push(jid.clone()), Err(e) => eprintln!("Warning: failed to check {}: {}", jid, e), }
541 }
542
543 assert_eq!(jids_needing_sessions.len(), 1);
545 assert_eq!(jids_needing_sessions[0].user, "222");
546 }
547
548 #[test]
551 fn test_dual_addressing_pn_and_lid_are_independent() {
552 let pn_address = Jid::pn("551199887766").with_device(0);
553 let lid_address = Jid::lid("236395184570386").with_device(0);
554
555 assert_ne!(pn_address.user, lid_address.user);
556 assert_ne!(pn_address.server, lid_address.server);
557
558 use wacore::types::jid::JidExt;
559 let pn_signal_addr = pn_address.to_protocol_address();
560 let lid_signal_addr = lid_address.to_protocol_address();
561
562 assert_ne!(pn_signal_addr.name(), lid_signal_addr.name());
563 assert_eq!(pn_signal_addr.name(), "551199887766@c.us");
564 assert_eq!(lid_signal_addr.name(), "236395184570386@lid");
565 assert_eq!(pn_address.device, 0);
566 assert_eq!(lid_address.device, 0);
567 }
568
569 #[test]
570 fn test_lid_extraction_from_own_device() {
571 let own_lid_with_device = Jid::lid_device("236395184570386", 61);
572 let primary_lid = own_lid_with_device.with_device(0);
573
574 assert_eq!(primary_lid.user, "236395184570386");
575 assert_eq!(primary_lid.device, 0);
576 assert!(!primary_lid.is_ad());
577 }
578
579 #[test]
581 fn test_stale_session_scenario_documentation() {
582 fn should_establish_pn_session(pn_exists: bool) -> bool {
583 !pn_exists
584 }
585
586 fn should_establish_lid_session(_lid_exists: bool) -> bool {
587 false }
589
590 assert!(!should_establish_pn_session(true));
592 assert!(should_establish_pn_session(false));
594 assert!(!should_establish_lid_session(true));
596 assert!(!should_establish_lid_session(false));
597 }
598
599 #[test]
601 fn test_retry_mechanism_for_stale_sessions() {
602 const RETRY_ERROR_NO_SESSION: u8 = 1;
603 const RETRY_ERROR_INVALID_MESSAGE: u8 = 4;
604
605 fn action_for_error(error_code: u8) -> &'static str {
606 match error_code {
607 RETRY_ERROR_NO_SESSION => "Establish new session via prekey",
608 RETRY_ERROR_INVALID_MESSAGE => "Delete stale session, resend message",
609 _ => "Unknown error",
610 }
611 }
612
613 assert_eq!(
614 action_for_error(RETRY_ERROR_NO_SESSION),
615 "Establish new session via prekey"
616 );
617 assert_eq!(
618 action_for_error(RETRY_ERROR_INVALID_MESSAGE),
619 "Delete stale session, resend message"
620 );
621 }
622
623 #[test]
624 fn test_session_establishment_lookup_normalization() {
625 use std::collections::HashMap;
626 use wacore_binary::jid::Jid;
627
628 let mut prekey_bundles: HashMap<Jid, ()> = HashMap::new(); let normalized_jid = Jid::lid("123456789"); prekey_bundles.insert(normalized_jid.clone(), ());
634
635 let mut requested_jid = Jid::lid("123456789");
638 requested_jid.agent = 1;
639
640 assert!(
642 !prekey_bundles.contains_key(&requested_jid),
643 "Direct lookup of non-normalized JID should fail"
644 );
645
646 let normalized_lookup = requested_jid.normalize_for_prekey_bundle();
649 assert!(
650 prekey_bundles.contains_key(&normalized_lookup),
651 "Normalized lookup should succeed"
652 );
653
654 assert_eq!(normalized_lookup, normalized_jid);
656 }
657}