Skip to main content

whatsapp_rust/client/
sessions.rs

1//! E2E Session management for Client.
2
3use anyhow::Result;
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6use wacore::libsignal::store::SessionStore;
7use wacore::types::jid::JidExt;
8use wacore_binary::jid::Jid;
9
10use super::Client;
11use crate::types::events::{Event, OfflineSyncCompleted};
12
13impl Client {
14    /// WA Web: `WAWebOfflineResumeConst.OFFLINE_STANZA_TIMEOUT_MS = 60000`
15    pub(crate) const DEFAULT_OFFLINE_SYNC_TIMEOUT: Duration = Duration::from_secs(60);
16
17    pub(crate) fn complete_offline_sync(&self, count: i32) {
18        self.offline_sync_metrics
19            .active
20            .store(false, Ordering::Release);
21        match self.offline_sync_metrics.start_time.lock() {
22            Ok(mut guard) => *guard = None,
23            Err(poison) => *poison.into_inner() = None,
24        }
25
26        // Signal that offline sync is complete - post-login tasks are waiting for this.
27        // This mimics WhatsApp Web's offlineDeliveryEnd event.
28        // Use compare_exchange to ensure we only run this once (add_permits is NOT idempotent).
29        // Install the wider semaphore BEFORE flipping the flag so that any thread
30        // observing offline_sync_completed=true already sees the 64-permit semaphore.
31        if self
32            .offline_sync_completed
33            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
34            .is_ok()
35        {
36            // Allow parallel message processing now that offline sync is done.
37            // During offline sync, permits=1 serialized all message processing.
38            // Replace with a new semaphore with 64 permits for concurrent processing.
39            // Old workers holding the previous semaphore Arc will finish normally.
40            self.swap_message_semaphore(64);
41
42            self.offline_sync_notifier.notify(usize::MAX);
43
44            self.core
45                .event_bus
46                .dispatch(&Event::OfflineSyncCompleted(OfflineSyncCompleted { count }));
47        }
48    }
49
50    /// Wait for offline message delivery to complete (with timeout).
51    pub(crate) async fn wait_for_offline_delivery_end(&self) {
52        self.wait_for_offline_delivery_end_with_timeout(Self::DEFAULT_OFFLINE_SYNC_TIMEOUT)
53            .await;
54    }
55
56    pub(crate) async fn wait_for_offline_delivery_end_with_timeout(&self, timeout: Duration) {
57        let wait_generation = self.connection_generation.load(Ordering::Acquire);
58        let offline_fut = self.offline_sync_notifier.listen();
59        if self.offline_sync_completed.load(Ordering::Relaxed) {
60            return;
61        }
62
63        if wacore::runtime::timeout(&*self.runtime, timeout, offline_fut)
64            .await
65            .is_err()
66        {
67            // Guard: don't complete sync for a stale connection generation.
68            // A reconnect may have happened while we were waiting, making this
69            // timeout belong to the old connection.
70            if self.connection_generation.load(Ordering::Acquire) != wait_generation
71                || self.expected_disconnect.load(Ordering::Relaxed)
72            {
73                log::debug!(
74                    target: "Client/OfflineSync",
75                    "Offline sync timeout ignored: connection generation changed or disconnected",
76                );
77                return;
78            }
79
80            let processed = self
81                .offline_sync_metrics
82                .processed_messages
83                .load(Ordering::Acquire);
84            let expected = self
85                .offline_sync_metrics
86                .total_messages
87                .load(Ordering::Acquire);
88            log::warn!(
89                target: "Client/OfflineSync",
90                "Offline sync timed out after {:?} (processed {} of {} items); marking sync complete",
91                timeout,
92                processed,
93                expected,
94            );
95            self.complete_offline_sync(i32::try_from(processed).unwrap_or(i32::MAX));
96        }
97    }
98
99    pub(crate) fn begin_history_sync_task(&self) {
100        self.history_sync_tasks_in_flight
101            .fetch_add(1, Ordering::Relaxed);
102    }
103
104    pub(crate) fn finish_history_sync_task(&self) {
105        let previous = self
106            .history_sync_tasks_in_flight
107            .fetch_sub(1, Ordering::Relaxed);
108        if previous <= 1 {
109            self.history_sync_tasks_in_flight
110                .store(0, Ordering::Relaxed);
111            self.history_sync_idle_notifier.notify(usize::MAX);
112        }
113    }
114
115    pub async fn wait_for_startup_sync(&self, timeout: std::time::Duration) -> Result<()> {
116        use anyhow::anyhow;
117        use wacore::time::Instant;
118
119        let deadline = Instant::now() + timeout;
120
121        // Register the notified future *before* checking state to avoid missing
122        // a notify_waiters() that fires between the check and the await.
123        let offline_fut = self.offline_sync_notifier.listen();
124        if !self.offline_sync_completed.load(Ordering::Relaxed) {
125            let remaining = deadline.saturating_duration_since(Instant::now());
126            wacore::runtime::timeout(&*self.runtime, remaining, offline_fut)
127                .await
128                .map_err(|_| anyhow!("Timeout waiting for offline sync completion"))?;
129        }
130
131        loop {
132            let history_fut = self.history_sync_idle_notifier.listen();
133            if self.history_sync_tasks_in_flight.load(Ordering::Relaxed) == 0 {
134                return Ok(());
135            }
136
137            let remaining = deadline.saturating_duration_since(Instant::now());
138            wacore::runtime::timeout(&*self.runtime, remaining, history_fut)
139                .await
140                .map_err(|_| anyhow!("Timeout waiting for history sync tasks to become idle"))?;
141        }
142    }
143
144    /// Ensure E2E sessions exist for the given device JIDs.
145    /// Waits for offline delivery, resolves LID mappings, then batches prekey fetches.
146    pub(crate) async fn ensure_e2e_sessions(&self, device_jids: Vec<Jid>) -> Result<()> {
147        use wacore::types::jid::JidExt;
148
149        if device_jids.is_empty() {
150            return Ok(());
151        }
152
153        self.wait_for_offline_delivery_end().await;
154        let resolved_jids = self.resolve_lid_mappings(&device_jids).await;
155
156        let device_store = self.persistence_manager.get_device_arc().await;
157        let mut jids_needing_sessions = Vec::with_capacity(resolved_jids.len());
158
159        {
160            let device_guard = device_store.read().await;
161            for jid in resolved_jids {
162                let signal_addr = jid.to_protocol_address();
163                let addr_str = signal_addr.to_string();
164                // Check cache first (includes unflushed sessions), fall back to backend
165                match self
166                    .signal_cache
167                    .has_session(&addr_str, &*device_guard.backend)
168                    .await
169                {
170                    Ok(true) => {}
171                    Ok(false) => jids_needing_sessions.push(jid),
172                    Err(e) => log::warn!("Failed to check session for {}: {}", jid, e),
173                }
174            }
175        }
176
177        if jids_needing_sessions.is_empty() {
178            return Ok(());
179        }
180
181        for batch in jids_needing_sessions.chunks(crate::session::SESSION_CHECK_BATCH_SIZE) {
182            self.fetch_and_establish_sessions(batch).await?;
183        }
184
185        Ok(())
186    }
187
188    /// Fetch prekeys and establish sessions for a batch of JIDs.
189    /// Returns the number of sessions successfully established.
190    async fn fetch_and_establish_sessions(&self, jids: &[Jid]) -> Result<usize, anyhow::Error> {
191        use wacore::libsignal::protocol::{UsePQRatchet, process_prekey_bundle};
192        use wacore::types::jid::JidExt;
193
194        if jids.is_empty() {
195            return Ok(0);
196        }
197
198        let prekey_bundles = self.fetch_pre_keys(jids, Some("identity")).await?;
199
200        let device_store = self.persistence_manager.get_device_arc().await;
201        let mut adapter = crate::store::signal_adapter::SignalProtocolStoreAdapter::new(
202            device_store,
203            self.signal_cache.clone(),
204        );
205
206        let mut success_count = 0;
207        let mut missing_count = 0;
208        let mut failed_count = 0;
209
210        for jid in jids {
211            if let Some(bundle) = prekey_bundles.get(&jid.normalize_for_prekey_bundle()) {
212                let signal_addr = jid.to_protocol_address();
213
214                // Acquire per-sender session lock to prevent race with concurrent message decryption.
215                let signal_addr_str = signal_addr.to_string();
216                let session_mutex = self
217                    .session_locks
218                    .get_with(signal_addr_str.clone(), async {
219                        std::sync::Arc::new(async_lock::Mutex::new(()))
220                    })
221                    .await;
222                let _session_guard = session_mutex.lock().await;
223
224                match process_prekey_bundle(
225                    &signal_addr,
226                    &mut adapter.session_store,
227                    &mut adapter.identity_store,
228                    bundle,
229                    &mut rand::make_rng::<rand::rngs::StdRng>(),
230                    UsePQRatchet::No,
231                )
232                .await
233                {
234                    Ok(_) => {
235                        success_count += 1;
236                        log::debug!("Successfully established session with {}", jid);
237                    }
238                    Err(e) => {
239                        failed_count += 1;
240                        log::warn!("Failed to establish session with {}: {}", jid, e);
241                    }
242                }
243            } else {
244                missing_count += 1;
245                if jid.device == 0 {
246                    log::warn!("Server did not return prekeys for primary phone {}", jid);
247                } else {
248                    log::debug!("Server did not return prekeys for {}", jid);
249                }
250            }
251        }
252
253        if missing_count > 0 || failed_count > 0 {
254            log::debug!(
255                "Session establishment: {} succeeded, {} missing prekeys, {} failed (of {} requested)",
256                success_count,
257                missing_count,
258                failed_count,
259                jids.len()
260            );
261        }
262
263        // Flush after all sessions established
264        if success_count > 0 {
265            self.flush_signal_cache().await?;
266        }
267
268        Ok(success_count)
269    }
270
271    /// Establish session with primary phone (device 0) immediately for PDO.
272    ///
273    /// Called during login BEFORE offline messages arrive. Checks both PN and LID
274    /// sessions but does NOT establish PN sessions proactively. The primary phone's
275    /// PN session will be established via LID pkmsg when needed, which prevents
276    /// dual-session conflicts where both PN and LID sessions exist for the same user.
277    /// This matches WhatsApp Web's `prekey_fetch_iq_pnh_lid_enabled: false` behavior.
278    ///
279    /// Returns error if session check fails (fail-safe to prevent replacing existing sessions).
280    pub(crate) async fn establish_primary_phone_session_immediate(&self) -> Result<()> {
281        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
282
283        let own_pn = device_snapshot
284            .pn
285            .clone()
286            .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
287
288        let primary_phone_pn = own_pn.with_device(0);
289        let primary_phone_lid = device_snapshot.lid.as_ref().map(|lid| lid.with_device(0));
290
291        let pn_session_exists =
292            self.check_session_exists(&primary_phone_pn)
293                .await
294                .map_err(|e| {
295                    anyhow::anyhow!(
296                        "Cannot verify PN session existence for primary phone {}: {}. \
297                     Refusing to establish session to prevent potential MAC failures.",
298                        primary_phone_pn,
299                        e
300                    )
301                })?;
302
303        // Don't proactively establish PN session - matches WhatsApp Web's
304        // prekey_fetch_iq_pnh_lid_enabled: false behavior. The primary phone will
305        // establish the session via pkmsg from LID address, which prevents dual-session
306        // conflicts where both PN and LID sessions exist for the same user.
307        if pn_session_exists {
308            log::debug!(
309                "PN session with primary phone {} already exists",
310                primary_phone_pn
311            );
312        } else {
313            log::debug!(
314                "No PN session with primary phone {} - will be established via LID pkmsg",
315                primary_phone_pn
316            );
317        }
318
319        // Check LID session existence (don't establish - primary phone does that via pkmsg)
320        if let Some(ref lid_jid) = primary_phone_lid {
321            match self.check_session_exists(lid_jid).await {
322                Ok(true) => log::debug!("LID session with {} already exists", lid_jid),
323                Ok(false) => log::debug!(
324                    "No LID session with {} - established on first message",
325                    lid_jid
326                ),
327                Err(e) => log::debug!("Could not check LID session for {}: {}", lid_jid, e),
328            }
329        }
330
331        Ok(())
332    }
333
334    /// Check if a session exists for the given JID.
335    async fn check_session_exists(&self, jid: &Jid) -> Result<bool, anyhow::Error> {
336        let device_store = self.persistence_manager.get_device_arc().await;
337        let device_guard = device_store.read().await;
338        let signal_addr = jid.to_protocol_address();
339
340        device_guard
341            .contains_session(&signal_addr)
342            .await
343            .map_err(|e| anyhow::anyhow!("Failed to check session for {}: {}", jid, e))
344    }
345}
346
347#[cfg(test)]
348mod tests {
349    use super::*;
350    use wacore_binary::jid::{DEFAULT_USER_SERVER, HIDDEN_USER_SERVER, JidExt};
351
352    #[test]
353    fn test_primary_phone_jid_creation_from_pn() {
354        let own_pn = Jid::pn("559999999999");
355        let primary_phone_jid = own_pn.with_device(0);
356
357        assert_eq!(primary_phone_jid.user, "559999999999");
358        assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
359        assert_eq!(primary_phone_jid.device, 0);
360        assert_eq!(primary_phone_jid.agent, 0);
361        assert_eq!(primary_phone_jid.to_string(), "559999999999@s.whatsapp.net");
362    }
363
364    #[test]
365    fn test_primary_phone_jid_overwrites_existing_device() {
366        // Edge case: pn with device ID should still produce device 0
367        let own_pn = Jid::pn_device("559999999999", 33);
368        let primary_phone_jid = own_pn.with_device(0);
369
370        assert_eq!(primary_phone_jid.user, "559999999999");
371        assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
372        assert_eq!(primary_phone_jid.device, 0);
373    }
374
375    #[test]
376    fn test_primary_phone_jid_is_not_ad() {
377        let primary_phone_jid = Jid::pn("559999999999").with_device(0);
378        assert!(!primary_phone_jid.is_ad()); // device 0 is NOT an additional device
379    }
380
381    #[test]
382    fn test_linked_device_is_ad() {
383        let linked_device_jid = Jid::pn_device("559999999999", 33);
384        assert!(linked_device_jid.is_ad()); // device > 0 IS an additional device
385    }
386
387    #[test]
388    fn test_primary_phone_jid_from_lid() {
389        let own_lid = Jid::lid("100000000000001");
390        let primary_phone_jid = own_lid.with_device(0);
391
392        assert_eq!(primary_phone_jid.user, "100000000000001");
393        assert_eq!(primary_phone_jid.server, HIDDEN_USER_SERVER);
394        assert_eq!(primary_phone_jid.device, 0);
395        assert!(!primary_phone_jid.is_ad());
396    }
397
398    #[test]
399    fn test_primary_phone_jid_roundtrip() {
400        let own_pn = Jid::pn("559999999999");
401        let primary_phone_jid = own_pn.with_device(0);
402
403        let jid_string = primary_phone_jid.to_string();
404        assert_eq!(jid_string, "559999999999@s.whatsapp.net");
405
406        let parsed: Jid = jid_string.parse().expect("JID should be parseable");
407        assert_eq!(parsed.user, "559999999999");
408        assert_eq!(parsed.server, DEFAULT_USER_SERVER);
409        assert_eq!(parsed.device, 0);
410    }
411
412    #[test]
413    fn test_with_device_preserves_identity() {
414        let pn = Jid::pn("1234567890");
415        let pn_device_0 = pn.with_device(0);
416        let pn_device_5 = pn.with_device(5);
417
418        assert_eq!(pn_device_0.user, pn_device_5.user);
419        assert_eq!(pn_device_0.server, pn_device_5.server);
420        assert_eq!(pn_device_0.device, 0);
421        assert_eq!(pn_device_5.device, 5);
422
423        let lid = Jid::lid("100000012345678");
424        let lid_device_0 = lid.with_device(0);
425        let lid_device_33 = lid.with_device(33);
426
427        assert_eq!(lid_device_0.user, lid_device_33.user);
428        assert_eq!(lid_device_0.server, lid_device_33.server);
429        assert_eq!(lid_device_0.device, 0);
430        assert_eq!(lid_device_33.device, 33);
431    }
432
433    #[test]
434    fn test_primary_phone_vs_companion_devices() {
435        let user = "559999999999";
436        let primary = Jid::pn(user).with_device(0);
437        let companion_web = Jid::pn_device(user, 33);
438        let companion_desktop = Jid::pn_device(user, 34);
439
440        // All share the same user
441        assert_eq!(primary.user, companion_web.user);
442        assert_eq!(primary.user, companion_desktop.user);
443
444        // But have different device IDs
445        assert_eq!(primary.device, 0);
446        assert_eq!(companion_web.device, 33);
447        assert_eq!(companion_desktop.device, 34);
448
449        // Primary is NOT AD, companions ARE AD
450        assert!(!primary.is_ad());
451        assert!(companion_web.is_ad());
452        assert!(companion_desktop.is_ad());
453    }
454
455    /// Session check must succeed before establishment (fail-safe behavior).
456    #[test]
457    fn test_session_check_behavior_documentation() {
458        // Ok(true) -> skip, Ok(false) -> establish, Err -> fail-safe
459        enum SessionCheckResult {
460            Exists,
461            NotExists,
462            CheckFailed,
463        }
464
465        fn should_establish_session(
466            check_result: SessionCheckResult,
467        ) -> Result<bool, &'static str> {
468            match check_result {
469                SessionCheckResult::Exists => Ok(false),   // Don't establish
470                SessionCheckResult::NotExists => Ok(true), // Do establish
471                SessionCheckResult::CheckFailed => Err("Cannot verify - fail safe"),
472            }
473        }
474
475        // Test cases
476        assert_eq!(
477            should_establish_session(SessionCheckResult::Exists),
478            Ok(false)
479        );
480        assert_eq!(
481            should_establish_session(SessionCheckResult::NotExists),
482            Ok(true)
483        );
484        assert!(should_establish_session(SessionCheckResult::CheckFailed).is_err());
485    }
486
487    /// Protocol address format: {user}[:device]@{server}.0
488    #[test]
489    fn test_protocol_address_format_for_session_lookup() {
490        use wacore::types::jid::JidExt;
491
492        let pn = Jid::pn("559999999999").with_device(0);
493        let addr = pn.to_protocol_address();
494        assert_eq!(addr.name(), "559999999999@c.us");
495        assert_eq!(u32::from(addr.device_id()), 0);
496        assert_eq!(addr.to_string(), "559999999999@c.us.0");
497
498        let companion = Jid::pn_device("559999999999", 33);
499        let companion_addr = companion.to_protocol_address();
500        assert_eq!(companion_addr.name(), "559999999999:33@c.us");
501        assert_eq!(companion_addr.to_string(), "559999999999:33@c.us.0");
502
503        let lid = Jid::lid("100000000000001").with_device(0);
504        let lid_addr = lid.to_protocol_address();
505        assert_eq!(lid_addr.name(), "100000000000001@lid");
506        assert_eq!(u32::from(lid_addr.device_id()), 0);
507        assert_eq!(lid_addr.to_string(), "100000000000001@lid.0");
508
509        let lid_device = Jid::lid_device("100000000000001", 33);
510        let lid_device_addr = lid_device.to_protocol_address();
511        assert_eq!(lid_device_addr.name(), "100000000000001:33@lid");
512        assert_eq!(lid_device_addr.to_string(), "100000000000001:33@lid.0");
513    }
514
515    #[test]
516    fn test_filter_logic_for_session_establishment() {
517        let jids = vec![
518            Jid::pn_device("111", 0),
519            Jid::pn_device("222", 0),
520            Jid::pn_device("333", 0),
521        ];
522
523        // Simulate contains_session results
524        let session_exists = |jid: &Jid| -> Result<bool, &'static str> {
525            match jid.user.as_str() {
526                "111" => Ok(true),        // Session exists
527                "222" => Ok(false),       // No session
528                "333" => Err("DB error"), // Error
529                _ => Ok(false),
530            }
531        };
532
533        // Apply filter logic (matching ensure_e2e_sessions behavior)
534        let mut jids_needing_sessions = Vec::with_capacity(jids.len());
535        for jid in &jids {
536            match session_exists(jid) {
537                Ok(true) => {}                                        // Skip - session exists
538                Ok(false) => jids_needing_sessions.push(jid.clone()), // Needs session
539                Err(e) => eprintln!("Warning: failed to check {}: {}", jid, e), // Skip on error
540            }
541        }
542
543        // Only "222" should need a session
544        assert_eq!(jids_needing_sessions.len(), 1);
545        assert_eq!(jids_needing_sessions[0].user, "222");
546    }
547
548    // PN and LID have independent Signal sessions
549
550    #[test]
551    fn test_dual_addressing_pn_and_lid_are_independent() {
552        let pn_address = Jid::pn("551199887766").with_device(0);
553        let lid_address = Jid::lid("236395184570386").with_device(0);
554
555        assert_ne!(pn_address.user, lid_address.user);
556        assert_ne!(pn_address.server, lid_address.server);
557
558        use wacore::types::jid::JidExt;
559        let pn_signal_addr = pn_address.to_protocol_address();
560        let lid_signal_addr = lid_address.to_protocol_address();
561
562        assert_ne!(pn_signal_addr.name(), lid_signal_addr.name());
563        assert_eq!(pn_signal_addr.name(), "551199887766@c.us");
564        assert_eq!(lid_signal_addr.name(), "236395184570386@lid");
565        assert_eq!(pn_address.device, 0);
566        assert_eq!(lid_address.device, 0);
567    }
568
569    #[test]
570    fn test_lid_extraction_from_own_device() {
571        let own_lid_with_device = Jid::lid_device("236395184570386", 61);
572        let primary_lid = own_lid_with_device.with_device(0);
573
574        assert_eq!(primary_lid.user, "236395184570386");
575        assert_eq!(primary_lid.device, 0);
576        assert!(!primary_lid.is_ad());
577    }
578
579    /// PN sessions established proactively, LID sessions established by primary phone.
580    #[test]
581    fn test_stale_session_scenario_documentation() {
582        fn should_establish_pn_session(pn_exists: bool) -> bool {
583            !pn_exists
584        }
585
586        fn should_establish_lid_session(_lid_exists: bool) -> bool {
587            false // Primary phone establishes LID sessions via pkmsg
588        }
589
590        // PN exists -> don't establish
591        assert!(!should_establish_pn_session(true));
592        // PN doesn't exist -> establish
593        assert!(should_establish_pn_session(false));
594        // LID never established proactively
595        assert!(!should_establish_lid_session(true));
596        assert!(!should_establish_lid_session(false));
597    }
598
599    /// Retry mechanism: error=1 (NoSession), error=4 (InvalidMessage/MAC failure)
600    #[test]
601    fn test_retry_mechanism_for_stale_sessions() {
602        const RETRY_ERROR_NO_SESSION: u8 = 1;
603        const RETRY_ERROR_INVALID_MESSAGE: u8 = 4;
604
605        fn action_for_error(error_code: u8) -> &'static str {
606            match error_code {
607                RETRY_ERROR_NO_SESSION => "Establish new session via prekey",
608                RETRY_ERROR_INVALID_MESSAGE => "Delete stale session, resend message",
609                _ => "Unknown error",
610            }
611        }
612
613        assert_eq!(
614            action_for_error(RETRY_ERROR_NO_SESSION),
615            "Establish new session via prekey"
616        );
617        assert_eq!(
618            action_for_error(RETRY_ERROR_INVALID_MESSAGE),
619            "Delete stale session, resend message"
620        );
621    }
622
623    #[test]
624    fn test_session_establishment_lookup_normalization() {
625        use std::collections::HashMap;
626        use wacore_binary::jid::Jid;
627
628        // Represents the bundle map returned by fetch_pre_keys
629        // (keys are normalized by parsing logic as verified in wacore/src/prekeys.rs)
630        let mut prekey_bundles: HashMap<Jid, ()> = HashMap::new(); // Using () as mock bundle placeholder
631
632        let normalized_jid = Jid::lid("123456789"); // agent=0
633        prekey_bundles.insert(normalized_jid.clone(), ());
634
635        // Represents the JID from the device list (e.g. from ensure_e2e_sessions)
636        // which might have agent=1 due to some upstream source or parsing quirk
637        let mut requested_jid = Jid::lid("123456789");
638        requested_jid.agent = 1;
639
640        // 1. Verify direct lookup fails (This is the bug)
641        assert!(
642            !prekey_bundles.contains_key(&requested_jid),
643            "Direct lookup of non-normalized JID should fail"
644        );
645
646        // 2. Verify normalized lookup succeeds (This is the fix)
647        // This mirrors the logic change in fetch_and_establish_sessions
648        let normalized_lookup = requested_jid.normalize_for_prekey_bundle();
649        assert!(
650            prekey_bundles.contains_key(&normalized_lookup),
651            "Normalized lookup should succeed"
652        );
653
654        // Ensure the normalization actually produced the key we stored
655        assert_eq!(normalized_lookup, normalized_jid);
656    }
657}