Skip to main content

whatsapp_rust/client/
sessions.rs

1//! E2E Session management for Client.
2
3use anyhow::Result;
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6use wacore::libsignal::store::SessionStore;
7use wacore::types::jid::JidExt;
8use wacore_binary::jid::Jid;
9
10use super::Client;
11use crate::types::events::{Event, OfflineSyncCompleted};
12
13impl Client {
    /// Default time limit used by `wait_for_offline_delivery_end` before the
    /// offline sync is force-completed.
    ///
    /// WA Web: `WAWebOfflineResumeConst.OFFLINE_STANZA_TIMEOUT_MS = 60000`
    pub(crate) const DEFAULT_OFFLINE_SYNC_TIMEOUT: Duration = Duration::from_secs(60);
16
17    pub(crate) fn complete_offline_sync(&self, count: i32) {
18        self.offline_sync_metrics
19            .active
20            .store(false, Ordering::Release);
21        match self.offline_sync_metrics.start_time.lock() {
22            Ok(mut guard) => *guard = None,
23            Err(poison) => *poison.into_inner() = None,
24        }
25
26        // Signal that offline sync is complete - post-login tasks are waiting for this.
27        // This mimics WhatsApp Web's offlineDeliveryEnd event.
28        // Use compare_exchange to ensure we only run this once (add_permits is NOT idempotent).
29        // Install the wider semaphore BEFORE flipping the flag so that any thread
30        // observing offline_sync_completed=true already sees the 64-permit semaphore.
31        if self
32            .offline_sync_completed
33            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
34            .is_ok()
35        {
36            // Allow parallel message processing now that offline sync is done.
37            // During offline sync, permits=1 serialized all message processing.
38            // Replace with a new semaphore with 64 permits for concurrent processing.
39            // Old workers holding the previous semaphore Arc will finish normally.
40            *self
41                .message_processing_semaphore
42                .lock()
43                .expect("message_processing_semaphore poisoned") =
44                std::sync::Arc::new(async_lock::Semaphore::new(64));
45
46            self.offline_sync_notifier.notify(usize::MAX);
47
48            self.core
49                .event_bus
50                .dispatch(&Event::OfflineSyncCompleted(OfflineSyncCompleted { count }));
51        }
52    }
53
54    /// Wait for offline message delivery to complete (with timeout).
55    pub(crate) async fn wait_for_offline_delivery_end(&self) {
56        self.wait_for_offline_delivery_end_with_timeout(Self::DEFAULT_OFFLINE_SYNC_TIMEOUT)
57            .await;
58    }
59
60    pub(crate) async fn wait_for_offline_delivery_end_with_timeout(&self, timeout: Duration) {
61        let wait_generation = self.connection_generation.load(Ordering::Acquire);
62        let offline_fut = self.offline_sync_notifier.listen();
63        if self.offline_sync_completed.load(Ordering::Relaxed) {
64            return;
65        }
66
67        if wacore::runtime::timeout(&*self.runtime, timeout, offline_fut)
68            .await
69            .is_err()
70        {
71            // Guard: don't complete sync for a stale connection generation.
72            // A reconnect may have happened while we were waiting, making this
73            // timeout belong to the old connection.
74            if self.connection_generation.load(Ordering::Acquire) != wait_generation
75                || self.expected_disconnect.load(Ordering::Relaxed)
76            {
77                log::debug!(
78                    target: "Client/OfflineSync",
79                    "Offline sync timeout ignored: connection generation changed or disconnected",
80                );
81                return;
82            }
83
84            let processed = self
85                .offline_sync_metrics
86                .processed_messages
87                .load(Ordering::Acquire);
88            let expected = self
89                .offline_sync_metrics
90                .total_messages
91                .load(Ordering::Acquire);
92            log::warn!(
93                target: "Client/OfflineSync",
94                "Offline sync timed out after {:?} (processed {} of {} items); marking sync complete",
95                timeout,
96                processed,
97                expected,
98            );
99            self.complete_offline_sync(i32::try_from(processed).unwrap_or(i32::MAX));
100        }
101    }
102
103    pub(crate) fn begin_history_sync_task(&self) {
104        self.history_sync_tasks_in_flight
105            .fetch_add(1, Ordering::Relaxed);
106    }
107
108    pub(crate) fn finish_history_sync_task(&self) {
109        let previous = self
110            .history_sync_tasks_in_flight
111            .fetch_sub(1, Ordering::Relaxed);
112        if previous <= 1 {
113            self.history_sync_tasks_in_flight
114                .store(0, Ordering::Relaxed);
115            self.history_sync_idle_notifier.notify(usize::MAX);
116        }
117    }
118
119    pub async fn wait_for_startup_sync(&self, timeout: std::time::Duration) -> Result<()> {
120        use anyhow::anyhow;
121        use wacore::time::Instant;
122
123        let deadline = Instant::now() + timeout;
124
125        // Register the notified future *before* checking state to avoid missing
126        // a notify_waiters() that fires between the check and the await.
127        let offline_fut = self.offline_sync_notifier.listen();
128        if !self.offline_sync_completed.load(Ordering::Relaxed) {
129            let remaining = deadline.saturating_duration_since(Instant::now());
130            wacore::runtime::timeout(&*self.runtime, remaining, offline_fut)
131                .await
132                .map_err(|_| anyhow!("Timeout waiting for offline sync completion"))?;
133        }
134
135        loop {
136            let history_fut = self.history_sync_idle_notifier.listen();
137            if self.history_sync_tasks_in_flight.load(Ordering::Relaxed) == 0 {
138                return Ok(());
139            }
140
141            let remaining = deadline.saturating_duration_since(Instant::now());
142            wacore::runtime::timeout(&*self.runtime, remaining, history_fut)
143                .await
144                .map_err(|_| anyhow!("Timeout waiting for history sync tasks to become idle"))?;
145        }
146    }
147
148    /// Ensure E2E sessions exist for the given device JIDs.
149    /// Waits for offline delivery, resolves LID mappings, then batches prekey fetches.
150    pub(crate) async fn ensure_e2e_sessions(&self, device_jids: Vec<Jid>) -> Result<()> {
151        use wacore::types::jid::JidExt;
152
153        if device_jids.is_empty() {
154            return Ok(());
155        }
156
157        self.wait_for_offline_delivery_end().await;
158        let resolved_jids = self.resolve_lid_mappings(&device_jids).await;
159
160        let device_store = self.persistence_manager.get_device_arc().await;
161        let mut jids_needing_sessions = Vec::with_capacity(resolved_jids.len());
162
163        {
164            let device_guard = device_store.read().await;
165            for jid in resolved_jids {
166                let signal_addr = jid.to_protocol_address();
167                let addr_str = signal_addr.to_string();
168                // Check cache first (includes unflushed sessions), fall back to backend
169                match self
170                    .signal_cache
171                    .has_session(&addr_str, &*device_guard.backend)
172                    .await
173                {
174                    Ok(true) => {}
175                    Ok(false) => jids_needing_sessions.push(jid),
176                    Err(e) => log::warn!("Failed to check session for {}: {}", jid, e),
177                }
178            }
179        }
180
181        if jids_needing_sessions.is_empty() {
182            return Ok(());
183        }
184
185        for batch in jids_needing_sessions.chunks(crate::session::SESSION_CHECK_BATCH_SIZE) {
186            self.fetch_and_establish_sessions(batch).await?;
187        }
188
189        Ok(())
190    }
191
192    /// Fetch prekeys and establish sessions for a batch of JIDs.
193    /// Returns the number of sessions successfully established.
194    async fn fetch_and_establish_sessions(&self, jids: &[Jid]) -> Result<usize, anyhow::Error> {
195        use wacore::libsignal::protocol::{UsePQRatchet, process_prekey_bundle};
196        use wacore::types::jid::JidExt;
197
198        if jids.is_empty() {
199            return Ok(0);
200        }
201
202        let prekey_bundles = self.fetch_pre_keys(jids, Some("identity")).await?;
203
204        let device_store = self.persistence_manager.get_device_arc().await;
205        let mut adapter = crate::store::signal_adapter::SignalProtocolStoreAdapter::new(
206            device_store,
207            self.signal_cache.clone(),
208        );
209
210        let mut success_count = 0;
211        let mut missing_count = 0;
212        let mut failed_count = 0;
213
214        for jid in jids {
215            if let Some(bundle) = prekey_bundles.get(&jid.normalize_for_prekey_bundle()) {
216                let signal_addr = jid.to_protocol_address();
217
218                // Acquire per-sender session lock to prevent race with concurrent message decryption.
219                let signal_addr_str = signal_addr.to_string();
220                let session_mutex = self
221                    .session_locks
222                    .get_with(signal_addr_str.clone(), async {
223                        std::sync::Arc::new(async_lock::Mutex::new(()))
224                    })
225                    .await;
226                let _session_guard = session_mutex.lock().await;
227
228                match process_prekey_bundle(
229                    &signal_addr,
230                    &mut adapter.session_store,
231                    &mut adapter.identity_store,
232                    bundle,
233                    &mut rand::make_rng::<rand::rngs::StdRng>(),
234                    UsePQRatchet::No,
235                )
236                .await
237                {
238                    Ok(_) => {
239                        success_count += 1;
240                        log::debug!("Successfully established session with {}", jid);
241                    }
242                    Err(e) => {
243                        failed_count += 1;
244                        log::warn!("Failed to establish session with {}: {}", jid, e);
245                    }
246                }
247            } else {
248                missing_count += 1;
249                if jid.device == 0 {
250                    log::warn!("Server did not return prekeys for primary phone {}", jid);
251                } else {
252                    log::debug!("Server did not return prekeys for {}", jid);
253                }
254            }
255        }
256
257        if missing_count > 0 || failed_count > 0 {
258            log::debug!(
259                "Session establishment: {} succeeded, {} missing prekeys, {} failed (of {} requested)",
260                success_count,
261                missing_count,
262                failed_count,
263                jids.len()
264            );
265        }
266
267        // Flush after all sessions established
268        if success_count > 0 {
269            self.flush_signal_cache().await?;
270        }
271
272        Ok(success_count)
273    }
274
275    /// Establish session with primary phone (device 0) immediately for PDO.
276    ///
277    /// Called during login BEFORE offline messages arrive. Checks both PN and LID
278    /// sessions but does NOT establish PN sessions proactively. The primary phone's
279    /// PN session will be established via LID pkmsg when needed, which prevents
280    /// dual-session conflicts where both PN and LID sessions exist for the same user.
281    /// This matches WhatsApp Web's `prekey_fetch_iq_pnh_lid_enabled: false` behavior.
282    ///
283    /// Returns error if session check fails (fail-safe to prevent replacing existing sessions).
284    pub(crate) async fn establish_primary_phone_session_immediate(&self) -> Result<()> {
285        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
286
287        let own_pn = device_snapshot
288            .pn
289            .clone()
290            .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
291
292        let primary_phone_pn = own_pn.with_device(0);
293        let primary_phone_lid = device_snapshot.lid.as_ref().map(|lid| lid.with_device(0));
294
295        let pn_session_exists =
296            self.check_session_exists(&primary_phone_pn)
297                .await
298                .map_err(|e| {
299                    anyhow::anyhow!(
300                        "Cannot verify PN session existence for primary phone {}: {}. \
301                     Refusing to establish session to prevent potential MAC failures.",
302                        primary_phone_pn,
303                        e
304                    )
305                })?;
306
307        // Don't proactively establish PN session - matches WhatsApp Web's
308        // prekey_fetch_iq_pnh_lid_enabled: false behavior. The primary phone will
309        // establish the session via pkmsg from LID address, which prevents dual-session
310        // conflicts where both PN and LID sessions exist for the same user.
311        if pn_session_exists {
312            log::debug!(
313                "PN session with primary phone {} already exists",
314                primary_phone_pn
315            );
316        } else {
317            log::debug!(
318                "No PN session with primary phone {} - will be established via LID pkmsg",
319                primary_phone_pn
320            );
321        }
322
323        // Check LID session existence (don't establish - primary phone does that via pkmsg)
324        if let Some(ref lid_jid) = primary_phone_lid {
325            match self.check_session_exists(lid_jid).await {
326                Ok(true) => log::debug!("LID session with {} already exists", lid_jid),
327                Ok(false) => log::debug!(
328                    "No LID session with {} - established on first message",
329                    lid_jid
330                ),
331                Err(e) => log::debug!("Could not check LID session for {}: {}", lid_jid, e),
332            }
333        }
334
335        Ok(())
336    }
337
338    /// Check if a session exists for the given JID.
339    async fn check_session_exists(&self, jid: &Jid) -> Result<bool, anyhow::Error> {
340        let device_store = self.persistence_manager.get_device_arc().await;
341        let device_guard = device_store.read().await;
342        let signal_addr = jid.to_protocol_address();
343
344        device_guard
345            .contains_session(&signal_addr)
346            .await
347            .map_err(|e| anyhow::anyhow!("Failed to check session for {}: {}", jid, e))
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use super::*;
354    use wacore_binary::jid::{DEFAULT_USER_SERVER, HIDDEN_USER_SERVER, JidExt};
355
356    #[test]
357    fn test_primary_phone_jid_creation_from_pn() {
358        let own_pn = Jid::pn("559999999999");
359        let primary_phone_jid = own_pn.with_device(0);
360
361        assert_eq!(primary_phone_jid.user, "559999999999");
362        assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
363        assert_eq!(primary_phone_jid.device, 0);
364        assert_eq!(primary_phone_jid.agent, 0);
365        assert_eq!(primary_phone_jid.to_string(), "559999999999@s.whatsapp.net");
366    }
367
368    #[test]
369    fn test_primary_phone_jid_overwrites_existing_device() {
370        // Edge case: pn with device ID should still produce device 0
371        let own_pn = Jid::pn_device("559999999999", 33);
372        let primary_phone_jid = own_pn.with_device(0);
373
374        assert_eq!(primary_phone_jid.user, "559999999999");
375        assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
376        assert_eq!(primary_phone_jid.device, 0);
377    }
378
379    #[test]
380    fn test_primary_phone_jid_is_not_ad() {
381        let primary_phone_jid = Jid::pn("559999999999").with_device(0);
382        assert!(!primary_phone_jid.is_ad()); // device 0 is NOT an additional device
383    }
384
385    #[test]
386    fn test_linked_device_is_ad() {
387        let linked_device_jid = Jid::pn_device("559999999999", 33);
388        assert!(linked_device_jid.is_ad()); // device > 0 IS an additional device
389    }
390
391    #[test]
392    fn test_primary_phone_jid_from_lid() {
393        let own_lid = Jid::lid("100000000000001");
394        let primary_phone_jid = own_lid.with_device(0);
395
396        assert_eq!(primary_phone_jid.user, "100000000000001");
397        assert_eq!(primary_phone_jid.server, HIDDEN_USER_SERVER);
398        assert_eq!(primary_phone_jid.device, 0);
399        assert!(!primary_phone_jid.is_ad());
400    }
401
402    #[test]
403    fn test_primary_phone_jid_roundtrip() {
404        let own_pn = Jid::pn("559999999999");
405        let primary_phone_jid = own_pn.with_device(0);
406
407        let jid_string = primary_phone_jid.to_string();
408        assert_eq!(jid_string, "559999999999@s.whatsapp.net");
409
410        let parsed: Jid = jid_string.parse().expect("JID should be parseable");
411        assert_eq!(parsed.user, "559999999999");
412        assert_eq!(parsed.server, DEFAULT_USER_SERVER);
413        assert_eq!(parsed.device, 0);
414    }
415
416    #[test]
417    fn test_with_device_preserves_identity() {
418        let pn = Jid::pn("1234567890");
419        let pn_device_0 = pn.with_device(0);
420        let pn_device_5 = pn.with_device(5);
421
422        assert_eq!(pn_device_0.user, pn_device_5.user);
423        assert_eq!(pn_device_0.server, pn_device_5.server);
424        assert_eq!(pn_device_0.device, 0);
425        assert_eq!(pn_device_5.device, 5);
426
427        let lid = Jid::lid("100000012345678");
428        let lid_device_0 = lid.with_device(0);
429        let lid_device_33 = lid.with_device(33);
430
431        assert_eq!(lid_device_0.user, lid_device_33.user);
432        assert_eq!(lid_device_0.server, lid_device_33.server);
433        assert_eq!(lid_device_0.device, 0);
434        assert_eq!(lid_device_33.device, 33);
435    }
436
437    #[test]
438    fn test_primary_phone_vs_companion_devices() {
439        let user = "559999999999";
440        let primary = Jid::pn(user).with_device(0);
441        let companion_web = Jid::pn_device(user, 33);
442        let companion_desktop = Jid::pn_device(user, 34);
443
444        // All share the same user
445        assert_eq!(primary.user, companion_web.user);
446        assert_eq!(primary.user, companion_desktop.user);
447
448        // But have different device IDs
449        assert_eq!(primary.device, 0);
450        assert_eq!(companion_web.device, 33);
451        assert_eq!(companion_desktop.device, 34);
452
453        // Primary is NOT AD, companions ARE AD
454        assert!(!primary.is_ad());
455        assert!(companion_web.is_ad());
456        assert!(companion_desktop.is_ad());
457    }
458
459    /// Session check must succeed before establishment (fail-safe behavior).
460    #[test]
461    fn test_session_check_behavior_documentation() {
462        // Ok(true) -> skip, Ok(false) -> establish, Err -> fail-safe
463        enum SessionCheckResult {
464            Exists,
465            NotExists,
466            CheckFailed,
467        }
468
469        fn should_establish_session(
470            check_result: SessionCheckResult,
471        ) -> Result<bool, &'static str> {
472            match check_result {
473                SessionCheckResult::Exists => Ok(false),   // Don't establish
474                SessionCheckResult::NotExists => Ok(true), // Do establish
475                SessionCheckResult::CheckFailed => Err("Cannot verify - fail safe"),
476            }
477        }
478
479        // Test cases
480        assert_eq!(
481            should_establish_session(SessionCheckResult::Exists),
482            Ok(false)
483        );
484        assert_eq!(
485            should_establish_session(SessionCheckResult::NotExists),
486            Ok(true)
487        );
488        assert!(should_establish_session(SessionCheckResult::CheckFailed).is_err());
489    }
490
491    /// Protocol address format: {user}[:device]@{server}.0
492    #[test]
493    fn test_protocol_address_format_for_session_lookup() {
494        use wacore::types::jid::JidExt;
495
496        let pn = Jid::pn("559999999999").with_device(0);
497        let addr = pn.to_protocol_address();
498        assert_eq!(addr.name(), "559999999999@c.us");
499        assert_eq!(u32::from(addr.device_id()), 0);
500        assert_eq!(addr.to_string(), "559999999999@c.us.0");
501
502        let companion = Jid::pn_device("559999999999", 33);
503        let companion_addr = companion.to_protocol_address();
504        assert_eq!(companion_addr.name(), "559999999999:33@c.us");
505        assert_eq!(companion_addr.to_string(), "559999999999:33@c.us.0");
506
507        let lid = Jid::lid("100000000000001").with_device(0);
508        let lid_addr = lid.to_protocol_address();
509        assert_eq!(lid_addr.name(), "100000000000001@lid");
510        assert_eq!(u32::from(lid_addr.device_id()), 0);
511        assert_eq!(lid_addr.to_string(), "100000000000001@lid.0");
512
513        let lid_device = Jid::lid_device("100000000000001", 33);
514        let lid_device_addr = lid_device.to_protocol_address();
515        assert_eq!(lid_device_addr.name(), "100000000000001:33@lid");
516        assert_eq!(lid_device_addr.to_string(), "100000000000001:33@lid.0");
517    }
518
519    #[test]
520    fn test_filter_logic_for_session_establishment() {
521        let jids = vec![
522            Jid::pn_device("111", 0),
523            Jid::pn_device("222", 0),
524            Jid::pn_device("333", 0),
525        ];
526
527        // Simulate contains_session results
528        let session_exists = |jid: &Jid| -> Result<bool, &'static str> {
529            match jid.user.as_str() {
530                "111" => Ok(true),        // Session exists
531                "222" => Ok(false),       // No session
532                "333" => Err("DB error"), // Error
533                _ => Ok(false),
534            }
535        };
536
537        // Apply filter logic (matching ensure_e2e_sessions behavior)
538        let mut jids_needing_sessions = Vec::with_capacity(jids.len());
539        for jid in &jids {
540            match session_exists(jid) {
541                Ok(true) => {}                                        // Skip - session exists
542                Ok(false) => jids_needing_sessions.push(jid.clone()), // Needs session
543                Err(e) => eprintln!("Warning: failed to check {}: {}", jid, e), // Skip on error
544            }
545        }
546
547        // Only "222" should need a session
548        assert_eq!(jids_needing_sessions.len(), 1);
549        assert_eq!(jids_needing_sessions[0].user, "222");
550    }
551
552    // PN and LID have independent Signal sessions
553
554    #[test]
555    fn test_dual_addressing_pn_and_lid_are_independent() {
556        let pn_address = Jid::pn("551199887766").with_device(0);
557        let lid_address = Jid::lid("236395184570386").with_device(0);
558
559        assert_ne!(pn_address.user, lid_address.user);
560        assert_ne!(pn_address.server, lid_address.server);
561
562        use wacore::types::jid::JidExt;
563        let pn_signal_addr = pn_address.to_protocol_address();
564        let lid_signal_addr = lid_address.to_protocol_address();
565
566        assert_ne!(pn_signal_addr.name(), lid_signal_addr.name());
567        assert_eq!(pn_signal_addr.name(), "551199887766@c.us");
568        assert_eq!(lid_signal_addr.name(), "236395184570386@lid");
569        assert_eq!(pn_address.device, 0);
570        assert_eq!(lid_address.device, 0);
571    }
572
573    #[test]
574    fn test_lid_extraction_from_own_device() {
575        let own_lid_with_device = Jid::lid_device("236395184570386", 61);
576        let primary_lid = own_lid_with_device.with_device(0);
577
578        assert_eq!(primary_lid.user, "236395184570386");
579        assert_eq!(primary_lid.device, 0);
580        assert!(!primary_lid.is_ad());
581    }
582
583    /// PN sessions established proactively, LID sessions established by primary phone.
584    #[test]
585    fn test_stale_session_scenario_documentation() {
586        fn should_establish_pn_session(pn_exists: bool) -> bool {
587            !pn_exists
588        }
589
590        fn should_establish_lid_session(_lid_exists: bool) -> bool {
591            false // Primary phone establishes LID sessions via pkmsg
592        }
593
594        // PN exists -> don't establish
595        assert!(!should_establish_pn_session(true));
596        // PN doesn't exist -> establish
597        assert!(should_establish_pn_session(false));
598        // LID never established proactively
599        assert!(!should_establish_lid_session(true));
600        assert!(!should_establish_lid_session(false));
601    }
602
603    /// Retry mechanism: error=1 (NoSession), error=4 (InvalidMessage/MAC failure)
604    #[test]
605    fn test_retry_mechanism_for_stale_sessions() {
606        const RETRY_ERROR_NO_SESSION: u8 = 1;
607        const RETRY_ERROR_INVALID_MESSAGE: u8 = 4;
608
609        fn action_for_error(error_code: u8) -> &'static str {
610            match error_code {
611                RETRY_ERROR_NO_SESSION => "Establish new session via prekey",
612                RETRY_ERROR_INVALID_MESSAGE => "Delete stale session, resend message",
613                _ => "Unknown error",
614            }
615        }
616
617        assert_eq!(
618            action_for_error(RETRY_ERROR_NO_SESSION),
619            "Establish new session via prekey"
620        );
621        assert_eq!(
622            action_for_error(RETRY_ERROR_INVALID_MESSAGE),
623            "Delete stale session, resend message"
624        );
625    }
626
627    #[test]
628    fn test_session_establishment_lookup_normalization() {
629        use std::collections::HashMap;
630        use wacore_binary::jid::Jid;
631
632        // Represents the bundle map returned by fetch_pre_keys
633        // (keys are normalized by parsing logic as verified in wacore/src/prekeys.rs)
634        let mut prekey_bundles: HashMap<Jid, ()> = HashMap::new(); // Using () as mock bundle placeholder
635
636        let normalized_jid = Jid::lid("123456789"); // agent=0
637        prekey_bundles.insert(normalized_jid.clone(), ());
638
639        // Represents the JID from the device list (e.g. from ensure_e2e_sessions)
640        // which might have agent=1 due to some upstream source or parsing quirk
641        let mut requested_jid = Jid::lid("123456789");
642        requested_jid.agent = 1;
643
644        // 1. Verify direct lookup fails (This is the bug)
645        assert!(
646            !prekey_bundles.contains_key(&requested_jid),
647            "Direct lookup of non-normalized JID should fail"
648        );
649
650        // 2. Verify normalized lookup succeeds (This is the fix)
651        // This mirrors the logic change in fetch_and_establish_sessions
652        let normalized_lookup = requested_jid.normalize_for_prekey_bundle();
653        assert!(
654            prekey_bundles.contains_key(&normalized_lookup),
655            "Normalized lookup should succeed"
656        );
657
658        // Ensure the normalization actually produced the key we stored
659        assert_eq!(normalized_lookup, normalized_jid);
660    }
661}