Skip to main content

whatsapp_rust/client/
sessions.rs

1//! E2E Session management for Client.
2
3use anyhow::Result;
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6use wacore::libsignal::store::SessionStore;
7use wacore::types::jid::JidExt;
8use wacore_binary::jid::Jid;
9
10use super::Client;
11use crate::types::events::{Event, OfflineSyncCompleted};
12
13impl Client {
14    /// WA Web: `WAWebOfflineResumeConst.OFFLINE_STANZA_TIMEOUT_MS = 60000`
15    pub(crate) const DEFAULT_OFFLINE_SYNC_TIMEOUT: Duration = Duration::from_secs(60);
16
17    pub(crate) fn complete_offline_sync(&self, count: i32) {
18        self.offline_sync_metrics
19            .active
20            .store(false, Ordering::Release);
21        match self.offline_sync_metrics.start_time.lock() {
22            Ok(mut guard) => *guard = None,
23            Err(poison) => *poison.into_inner() = None,
24        }
25
26        // Signal that offline sync is complete - post-login tasks are waiting for this.
27        // This mimics WhatsApp Web's offlineDeliveryEnd event.
28        // Use compare_exchange to ensure we only run this once (add_permits is NOT idempotent).
29        // Install the wider semaphore BEFORE flipping the flag so that any thread
30        // observing offline_sync_completed=true already sees the 64-permit semaphore.
31        if self
32            .offline_sync_completed
33            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
34            .is_ok()
35        {
36            // Allow parallel message processing now that offline sync is done.
37            // During offline sync, permits=1 serialized all message processing.
38            // Replace with a new semaphore with 64 permits for concurrent processing.
39            // Old workers holding the previous semaphore Arc will finish normally.
40            self.swap_message_semaphore(64);
41
42            self.offline_sync_notifier.notify(usize::MAX);
43
44            self.core
45                .event_bus
46                .dispatch(&Event::OfflineSyncCompleted(OfflineSyncCompleted { count }));
47        }
48    }
49
50    /// Wait for offline message delivery to complete (with timeout).
51    pub(crate) async fn wait_for_offline_delivery_end(&self) {
52        self.wait_for_offline_delivery_end_with_timeout(Self::DEFAULT_OFFLINE_SYNC_TIMEOUT)
53            .await;
54    }
55
56    pub(crate) async fn wait_for_offline_delivery_end_with_timeout(&self, timeout: Duration) {
57        let wait_generation = self.connection_generation.load(Ordering::Acquire);
58        let offline_fut = self.offline_sync_notifier.listen();
59        if self.offline_sync_completed.load(Ordering::Relaxed) {
60            return;
61        }
62
63        if wacore::runtime::timeout(&*self.runtime, timeout, offline_fut)
64            .await
65            .is_err()
66        {
67            // Guard: don't complete sync for a stale connection generation.
68            // A reconnect may have happened while we were waiting, making this
69            // timeout belong to the old connection.
70            if self.connection_generation.load(Ordering::Acquire) != wait_generation
71                || self.expected_disconnect.load(Ordering::Relaxed)
72            {
73                log::debug!(
74                    target: "Client/OfflineSync",
75                    "Offline sync timeout ignored: connection generation changed or disconnected",
76                );
77                return;
78            }
79
80            let processed = self
81                .offline_sync_metrics
82                .processed_messages
83                .load(Ordering::Acquire);
84            let expected = self
85                .offline_sync_metrics
86                .total_messages
87                .load(Ordering::Acquire);
88            log::warn!(
89                target: "Client/OfflineSync",
90                "Offline sync timed out after {:?} (processed {} of {} items); marking sync complete",
91                timeout,
92                processed,
93                expected,
94            );
95            self.complete_offline_sync(i32::try_from(processed).unwrap_or(i32::MAX));
96        }
97    }
98
99    pub(crate) fn begin_history_sync_task(&self) {
100        self.history_sync_tasks_in_flight
101            .fetch_add(1, Ordering::Relaxed);
102    }
103
104    pub(crate) fn finish_history_sync_task(&self) {
105        let previous = self
106            .history_sync_tasks_in_flight
107            .fetch_sub(1, Ordering::Relaxed);
108        if previous <= 1 {
109            self.history_sync_tasks_in_flight
110                .store(0, Ordering::Relaxed);
111            self.history_sync_idle_notifier.notify(usize::MAX);
112        }
113    }
114
115    pub async fn wait_for_startup_sync(&self, timeout: std::time::Duration) -> Result<()> {
116        use anyhow::anyhow;
117        use wacore::time::Instant;
118
119        let deadline = Instant::now() + timeout;
120
121        // Register the notified future *before* checking state to avoid missing
122        // a notify_waiters() that fires between the check and the await.
123        let offline_fut = self.offline_sync_notifier.listen();
124        if !self.offline_sync_completed.load(Ordering::Relaxed) {
125            let remaining = deadline.saturating_duration_since(Instant::now());
126            wacore::runtime::timeout(&*self.runtime, remaining, offline_fut)
127                .await
128                .map_err(|_| anyhow!("Timeout waiting for offline sync completion"))?;
129        }
130
131        loop {
132            let history_fut = self.history_sync_idle_notifier.listen();
133            if self.history_sync_tasks_in_flight.load(Ordering::Relaxed) == 0 {
134                return Ok(());
135            }
136
137            let remaining = deadline.saturating_duration_since(Instant::now());
138            wacore::runtime::timeout(&*self.runtime, remaining, history_fut)
139                .await
140                .map_err(|_| anyhow!("Timeout waiting for history sync tasks to become idle"))?;
141        }
142    }
143
144    /// Ensure E2E sessions exist for the given device JIDs.
145    /// Waits for offline delivery, resolves LID mappings, then batches prekey fetches.
146    pub(crate) async fn ensure_e2e_sessions(&self, device_jids: Vec<Jid>) -> Result<()> {
147        use wacore::types::jid::JidExt;
148
149        if device_jids.is_empty() {
150            return Ok(());
151        }
152
153        self.wait_for_offline_delivery_end().await;
154        let resolved_jids = self.resolve_lid_mappings(&device_jids).await;
155
156        let device_store = self.persistence_manager.get_device_arc().await;
157        let mut jids_needing_sessions = Vec::with_capacity(resolved_jids.len());
158
159        {
160            let device_guard = device_store.read().await;
161            for jid in resolved_jids {
162                let signal_addr = jid.to_protocol_address();
163                // Check cache first (includes unflushed sessions), fall back to backend
164                match self
165                    .signal_cache
166                    .has_session(&signal_addr, &*device_guard.backend)
167                    .await
168                {
169                    Ok(true) => {}
170                    Ok(false) => jids_needing_sessions.push(jid),
171                    Err(e) => log::warn!("Failed to check session for {}: {}", jid, e),
172                }
173            }
174        }
175
176        if jids_needing_sessions.is_empty() {
177            return Ok(());
178        }
179
180        for batch in jids_needing_sessions.chunks(crate::session::SESSION_CHECK_BATCH_SIZE) {
181            self.fetch_and_establish_sessions(batch).await?;
182        }
183
184        Ok(())
185    }
186
187    /// Fetch prekeys and establish sessions for a batch of JIDs.
188    /// Returns the number of sessions successfully established.
189    async fn fetch_and_establish_sessions(&self, jids: &[Jid]) -> Result<usize, anyhow::Error> {
190        use wacore::libsignal::protocol::{UsePQRatchet, process_prekey_bundle};
191        use wacore::types::jid::JidExt;
192
193        if jids.is_empty() {
194            return Ok(0);
195        }
196
197        let prekey_bundles = self.fetch_pre_keys(jids, Some("identity")).await?;
198
199        let device_store = self.persistence_manager.get_device_arc().await;
200        let mut adapter = crate::store::signal_adapter::SignalProtocolStoreAdapter::new(
201            device_store,
202            self.signal_cache.clone(),
203        );
204
205        let mut success_count = 0;
206        let mut missing_count = 0;
207        let mut failed_count = 0;
208
209        for jid in jids {
210            if let Some(bundle) = prekey_bundles.get(&jid.normalize_for_prekey_bundle()) {
211                let signal_addr = jid.to_protocol_address();
212
213                // Acquire per-sender session lock to prevent race with concurrent message decryption.
214                let session_mutex = self
215                    .session_locks
216                    .get_with_by_ref(signal_addr.as_str(), async {
217                        std::sync::Arc::new(async_lock::Mutex::new(()))
218                    })
219                    .await;
220                let _session_guard = session_mutex.lock().await;
221
222                match process_prekey_bundle(
223                    &signal_addr,
224                    &mut adapter.session_store,
225                    &mut adapter.identity_store,
226                    bundle,
227                    &mut rand::make_rng::<rand::rngs::StdRng>(),
228                    UsePQRatchet::No,
229                )
230                .await
231                {
232                    Ok(_) => {
233                        success_count += 1;
234                        log::debug!("Successfully established session with {}", jid);
235                    }
236                    Err(e) => {
237                        failed_count += 1;
238                        log::warn!("Failed to establish session with {}: {}", jid, e);
239                    }
240                }
241            } else {
242                missing_count += 1;
243                if jid.device == 0 {
244                    log::warn!("Server did not return prekeys for primary phone {}", jid);
245                } else {
246                    log::debug!("Server did not return prekeys for {}", jid);
247                }
248            }
249        }
250
251        if missing_count > 0 || failed_count > 0 {
252            log::debug!(
253                "Session establishment: {} succeeded, {} missing prekeys, {} failed (of {} requested)",
254                success_count,
255                missing_count,
256                failed_count,
257                jids.len()
258            );
259        }
260
261        // Flush after all sessions established
262        if success_count > 0 {
263            self.flush_signal_cache().await?;
264        }
265
266        Ok(success_count)
267    }
268
269    /// Establish session with primary phone (device 0) immediately for PDO.
270    ///
271    /// Called during login BEFORE offline messages arrive. Checks both PN and LID
272    /// sessions but does NOT establish PN sessions proactively. The primary phone's
273    /// PN session will be established via LID pkmsg when needed, which prevents
274    /// dual-session conflicts where both PN and LID sessions exist for the same user.
275    /// This matches WhatsApp Web's `prekey_fetch_iq_pnh_lid_enabled: false` behavior.
276    ///
277    /// Returns error if session check fails (fail-safe to prevent replacing existing sessions).
278    pub(crate) async fn establish_primary_phone_session_immediate(&self) -> Result<()> {
279        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
280
281        let own_pn = device_snapshot
282            .pn
283            .clone()
284            .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
285
286        let primary_phone_pn = own_pn.with_device(0);
287        let primary_phone_lid = device_snapshot.lid.as_ref().map(|lid| lid.with_device(0));
288
289        let pn_session_exists =
290            self.check_session_exists(&primary_phone_pn)
291                .await
292                .map_err(|e| {
293                    anyhow::anyhow!(
294                        "Cannot verify PN session existence for primary phone {}: {}. \
295                     Refusing to establish session to prevent potential MAC failures.",
296                        primary_phone_pn,
297                        e
298                    )
299                })?;
300
301        // Don't proactively establish PN session - matches WhatsApp Web's
302        // prekey_fetch_iq_pnh_lid_enabled: false behavior. The primary phone will
303        // establish the session via pkmsg from LID address, which prevents dual-session
304        // conflicts where both PN and LID sessions exist for the same user.
305        if pn_session_exists {
306            log::debug!(
307                "PN session with primary phone {} already exists",
308                primary_phone_pn
309            );
310        } else {
311            log::debug!(
312                "No PN session with primary phone {} - will be established via LID pkmsg",
313                primary_phone_pn
314            );
315        }
316
317        // Check LID session existence (don't establish - primary phone does that via pkmsg)
318        if let Some(ref lid_jid) = primary_phone_lid {
319            match self.check_session_exists(lid_jid).await {
320                Ok(true) => log::debug!("LID session with {} already exists", lid_jid),
321                Ok(false) => log::debug!(
322                    "No LID session with {} - established on first message",
323                    lid_jid
324                ),
325                Err(e) => log::debug!("Could not check LID session for {}: {}", lid_jid, e),
326            }
327        }
328
329        Ok(())
330    }
331
332    /// Check if a session exists for the given JID.
333    async fn check_session_exists(&self, jid: &Jid) -> Result<bool, anyhow::Error> {
334        let device_store = self.persistence_manager.get_device_arc().await;
335        let device_guard = device_store.read().await;
336        let signal_addr = jid.to_protocol_address();
337
338        device_guard
339            .contains_session(&signal_addr)
340            .await
341            .map_err(|e| anyhow::anyhow!("Failed to check session for {}: {}", jid, e))
342    }
343}
344
345#[cfg(test)]
346mod tests {
347    use super::*;
348    use wacore_binary::jid::{DEFAULT_USER_SERVER, HIDDEN_USER_SERVER, JidExt};
349
350    #[test]
351    fn test_primary_phone_jid_creation_from_pn() {
352        let own_pn = Jid::pn("559999999999");
353        let primary_phone_jid = own_pn.with_device(0);
354
355        assert_eq!(primary_phone_jid.user, "559999999999");
356        assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
357        assert_eq!(primary_phone_jid.device, 0);
358        assert_eq!(primary_phone_jid.agent, 0);
359        assert_eq!(primary_phone_jid.to_string(), "559999999999@s.whatsapp.net");
360    }
361
362    #[test]
363    fn test_primary_phone_jid_overwrites_existing_device() {
364        // Edge case: pn with device ID should still produce device 0
365        let own_pn = Jid::pn_device("559999999999", 33);
366        let primary_phone_jid = own_pn.with_device(0);
367
368        assert_eq!(primary_phone_jid.user, "559999999999");
369        assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
370        assert_eq!(primary_phone_jid.device, 0);
371    }
372
373    #[test]
374    fn test_primary_phone_jid_is_not_ad() {
375        let primary_phone_jid = Jid::pn("559999999999").with_device(0);
376        assert!(!primary_phone_jid.is_ad()); // device 0 is NOT an additional device
377    }
378
379    #[test]
380    fn test_linked_device_is_ad() {
381        let linked_device_jid = Jid::pn_device("559999999999", 33);
382        assert!(linked_device_jid.is_ad()); // device > 0 IS an additional device
383    }
384
385    #[test]
386    fn test_primary_phone_jid_from_lid() {
387        let own_lid = Jid::lid("100000000000001");
388        let primary_phone_jid = own_lid.with_device(0);
389
390        assert_eq!(primary_phone_jid.user, "100000000000001");
391        assert_eq!(primary_phone_jid.server, HIDDEN_USER_SERVER);
392        assert_eq!(primary_phone_jid.device, 0);
393        assert!(!primary_phone_jid.is_ad());
394    }
395
396    #[test]
397    fn test_primary_phone_jid_roundtrip() {
398        let own_pn = Jid::pn("559999999999");
399        let primary_phone_jid = own_pn.with_device(0);
400
401        let jid_string = primary_phone_jid.to_string();
402        assert_eq!(jid_string, "559999999999@s.whatsapp.net");
403
404        let parsed: Jid = jid_string.parse().expect("JID should be parseable");
405        assert_eq!(parsed.user, "559999999999");
406        assert_eq!(parsed.server, DEFAULT_USER_SERVER);
407        assert_eq!(parsed.device, 0);
408    }
409
410    #[test]
411    fn test_with_device_preserves_identity() {
412        let pn = Jid::pn("1234567890");
413        let pn_device_0 = pn.with_device(0);
414        let pn_device_5 = pn.with_device(5);
415
416        assert_eq!(pn_device_0.user, pn_device_5.user);
417        assert_eq!(pn_device_0.server, pn_device_5.server);
418        assert_eq!(pn_device_0.device, 0);
419        assert_eq!(pn_device_5.device, 5);
420
421        let lid = Jid::lid("100000012345678");
422        let lid_device_0 = lid.with_device(0);
423        let lid_device_33 = lid.with_device(33);
424
425        assert_eq!(lid_device_0.user, lid_device_33.user);
426        assert_eq!(lid_device_0.server, lid_device_33.server);
427        assert_eq!(lid_device_0.device, 0);
428        assert_eq!(lid_device_33.device, 33);
429    }
430
431    #[test]
432    fn test_primary_phone_vs_companion_devices() {
433        let user = "559999999999";
434        let primary = Jid::pn(user).with_device(0);
435        let companion_web = Jid::pn_device(user, 33);
436        let companion_desktop = Jid::pn_device(user, 34);
437
438        // All share the same user
439        assert_eq!(primary.user, companion_web.user);
440        assert_eq!(primary.user, companion_desktop.user);
441
442        // But have different device IDs
443        assert_eq!(primary.device, 0);
444        assert_eq!(companion_web.device, 33);
445        assert_eq!(companion_desktop.device, 34);
446
447        // Primary is NOT AD, companions ARE AD
448        assert!(!primary.is_ad());
449        assert!(companion_web.is_ad());
450        assert!(companion_desktop.is_ad());
451    }
452
453    /// Session check must succeed before establishment (fail-safe behavior).
454    #[test]
455    fn test_session_check_behavior_documentation() {
456        // Ok(true) -> skip, Ok(false) -> establish, Err -> fail-safe
457        enum SessionCheckResult {
458            Exists,
459            NotExists,
460            CheckFailed,
461        }
462
463        fn should_establish_session(
464            check_result: SessionCheckResult,
465        ) -> Result<bool, &'static str> {
466            match check_result {
467                SessionCheckResult::Exists => Ok(false),   // Don't establish
468                SessionCheckResult::NotExists => Ok(true), // Do establish
469                SessionCheckResult::CheckFailed => Err("Cannot verify - fail safe"),
470            }
471        }
472
473        // Test cases
474        assert_eq!(
475            should_establish_session(SessionCheckResult::Exists),
476            Ok(false)
477        );
478        assert_eq!(
479            should_establish_session(SessionCheckResult::NotExists),
480            Ok(true)
481        );
482        assert!(should_establish_session(SessionCheckResult::CheckFailed).is_err());
483    }
484
485    /// Protocol address format: {user}[:device]@{server}.0
486    #[test]
487    fn test_protocol_address_format_for_session_lookup() {
488        use wacore::types::jid::JidExt;
489
490        let pn = Jid::pn("559999999999").with_device(0);
491        let addr = pn.to_protocol_address();
492        assert_eq!(addr.name(), "559999999999@c.us");
493        assert_eq!(u32::from(addr.device_id()), 0);
494        assert_eq!(addr.to_string(), "559999999999@c.us.0");
495
496        let companion = Jid::pn_device("559999999999", 33);
497        let companion_addr = companion.to_protocol_address();
498        assert_eq!(companion_addr.name(), "559999999999:33@c.us");
499        assert_eq!(companion_addr.to_string(), "559999999999:33@c.us.0");
500
501        let lid = Jid::lid("100000000000001").with_device(0);
502        let lid_addr = lid.to_protocol_address();
503        assert_eq!(lid_addr.name(), "100000000000001@lid");
504        assert_eq!(u32::from(lid_addr.device_id()), 0);
505        assert_eq!(lid_addr.to_string(), "100000000000001@lid.0");
506
507        let lid_device = Jid::lid_device("100000000000001", 33);
508        let lid_device_addr = lid_device.to_protocol_address();
509        assert_eq!(lid_device_addr.name(), "100000000000001:33@lid");
510        assert_eq!(lid_device_addr.to_string(), "100000000000001:33@lid.0");
511    }
512
513    #[test]
514    fn test_filter_logic_for_session_establishment() {
515        let jids = vec![
516            Jid::pn_device("111", 0),
517            Jid::pn_device("222", 0),
518            Jid::pn_device("333", 0),
519        ];
520
521        // Simulate contains_session results
522        let session_exists = |jid: &Jid| -> Result<bool, &'static str> {
523            match jid.user.as_str() {
524                "111" => Ok(true),        // Session exists
525                "222" => Ok(false),       // No session
526                "333" => Err("DB error"), // Error
527                _ => Ok(false),
528            }
529        };
530
531        // Apply filter logic (matching ensure_e2e_sessions behavior)
532        let mut jids_needing_sessions = Vec::with_capacity(jids.len());
533        for jid in &jids {
534            match session_exists(jid) {
535                Ok(true) => {}                                        // Skip - session exists
536                Ok(false) => jids_needing_sessions.push(jid.clone()), // Needs session
537                Err(e) => eprintln!("Warning: failed to check {}: {}", jid, e), // Skip on error
538            }
539        }
540
541        // Only "222" should need a session
542        assert_eq!(jids_needing_sessions.len(), 1);
543        assert_eq!(jids_needing_sessions[0].user, "222");
544    }
545
546    // PN and LID have independent Signal sessions
547
548    #[test]
549    fn test_dual_addressing_pn_and_lid_are_independent() {
550        let pn_address = Jid::pn("551199887766").with_device(0);
551        let lid_address = Jid::lid("236395184570386").with_device(0);
552
553        assert_ne!(pn_address.user, lid_address.user);
554        assert_ne!(pn_address.server, lid_address.server);
555
556        use wacore::types::jid::JidExt;
557        let pn_signal_addr = pn_address.to_protocol_address();
558        let lid_signal_addr = lid_address.to_protocol_address();
559
560        assert_ne!(pn_signal_addr.name(), lid_signal_addr.name());
561        assert_eq!(pn_signal_addr.name(), "551199887766@c.us");
562        assert_eq!(lid_signal_addr.name(), "236395184570386@lid");
563        assert_eq!(pn_address.device, 0);
564        assert_eq!(lid_address.device, 0);
565    }
566
567    #[test]
568    fn test_lid_extraction_from_own_device() {
569        let own_lid_with_device = Jid::lid_device("236395184570386", 61);
570        let primary_lid = own_lid_with_device.with_device(0);
571
572        assert_eq!(primary_lid.user, "236395184570386");
573        assert_eq!(primary_lid.device, 0);
574        assert!(!primary_lid.is_ad());
575    }
576
577    /// PN sessions established proactively, LID sessions established by primary phone.
578    #[test]
579    fn test_stale_session_scenario_documentation() {
580        fn should_establish_pn_session(pn_exists: bool) -> bool {
581            !pn_exists
582        }
583
584        fn should_establish_lid_session(_lid_exists: bool) -> bool {
585            false // Primary phone establishes LID sessions via pkmsg
586        }
587
588        // PN exists -> don't establish
589        assert!(!should_establish_pn_session(true));
590        // PN doesn't exist -> establish
591        assert!(should_establish_pn_session(false));
592        // LID never established proactively
593        assert!(!should_establish_lid_session(true));
594        assert!(!should_establish_lid_session(false));
595    }
596
597    /// Retry mechanism: error=1 (NoSession), error=4 (InvalidMessage/MAC failure)
598    #[test]
599    fn test_retry_mechanism_for_stale_sessions() {
600        const RETRY_ERROR_NO_SESSION: u8 = 1;
601        const RETRY_ERROR_INVALID_MESSAGE: u8 = 4;
602
603        fn action_for_error(error_code: u8) -> &'static str {
604            match error_code {
605                RETRY_ERROR_NO_SESSION => "Establish new session via prekey",
606                RETRY_ERROR_INVALID_MESSAGE => "Delete stale session, resend message",
607                _ => "Unknown error",
608            }
609        }
610
611        assert_eq!(
612            action_for_error(RETRY_ERROR_NO_SESSION),
613            "Establish new session via prekey"
614        );
615        assert_eq!(
616            action_for_error(RETRY_ERROR_INVALID_MESSAGE),
617            "Delete stale session, resend message"
618        );
619    }
620
621    #[test]
622    fn test_session_establishment_lookup_normalization() {
623        use std::collections::HashMap;
624        use wacore_binary::jid::Jid;
625
626        // Represents the bundle map returned by fetch_pre_keys
627        // (keys are normalized by parsing logic as verified in wacore/src/prekeys.rs)
628        let mut prekey_bundles: HashMap<Jid, ()> = HashMap::new(); // Using () as mock bundle placeholder
629
630        let normalized_jid = Jid::lid("123456789"); // agent=0
631        prekey_bundles.insert(normalized_jid.clone(), ());
632
633        // Represents the JID from the device list (e.g. from ensure_e2e_sessions)
634        // which might have agent=1 due to some upstream source or parsing quirk
635        let mut requested_jid = Jid::lid("123456789");
636        requested_jid.agent = 1;
637
638        // 1. Verify direct lookup fails (This is the bug)
639        assert!(
640            !prekey_bundles.contains_key(&requested_jid),
641            "Direct lookup of non-normalized JID should fail"
642        );
643
644        // 2. Verify normalized lookup succeeds (This is the fix)
645        // This mirrors the logic change in fetch_and_establish_sessions
646        let normalized_lookup = requested_jid.normalize_for_prekey_bundle();
647        assert!(
648            prekey_bundles.contains_key(&normalized_lookup),
649            "Normalized lookup should succeed"
650        );
651
652        // Ensure the normalization actually produced the key we stored
653        assert_eq!(normalized_lookup, normalized_jid);
654    }
655}