matrix_sdk_base/
client.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19    collections::{BTreeMap, BTreeSet, HashMap},
20    fmt, iter,
21    ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29    store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, EncryptionSettings,
30    OlmError, OlmMachine, TrustRequirement,
31};
32#[cfg(feature = "e2e-encryption")]
33use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
34#[cfg(doc)]
35use ruma::DeviceId;
36use ruma::{
37    api::client::{self as api, sync::sync_events::v5},
38    events::{
39        push_rules::{PushRulesEvent, PushRulesEventContent},
40        room::member::SyncRoomMemberEvent,
41        AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, StateEvent, StateEventType,
42    },
43    push::{Action, Ruleset},
44    serde::Raw,
45    time::Instant,
46    OwnedRoomId, OwnedUserId, RoomId,
47};
48use tokio::sync::{broadcast, Mutex};
49#[cfg(feature = "e2e-encryption")]
50use tokio::sync::{RwLock, RwLockReadGuard};
51use tracing::{debug, info, instrument};
52
53#[cfg(feature = "e2e-encryption")]
54use crate::RoomMemberships;
55use crate::{
56    deserialized_responses::{DisplayName, RawAnySyncOrStrippedTimelineEvent},
57    error::{Error, Result},
58    event_cache::store::EventCacheStoreLock,
59    response_processors::{self as processors, Context},
60    rooms::{
61        normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate},
62        Room, RoomInfo, RoomState,
63    },
64    store::{
65        ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore,
66        Result as StoreResult, RoomLoadSettings, StateChanges, StateStoreDataKey,
67        StateStoreDataValue, StateStoreExt, StoreConfig,
68    },
69    sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse},
70    RoomStateFilter, SessionMeta,
71};
72
73/// A no (network) IO client implementation.
74///
75/// This client is a state machine that receives responses and events and
76/// accordingly updates its state. It is not designed to be used directly, but
77/// rather through `matrix_sdk::Client`.
78///
79/// ```rust
80/// use matrix_sdk_base::{store::StoreConfig, BaseClient};
81///
82/// let client = BaseClient::new(StoreConfig::new(
83///     "cross-process-holder-name".to_owned(),
84/// ));
85/// ```
86#[derive(Clone)]
87pub struct BaseClient {
88    /// The state store.
89    pub(crate) state_store: BaseStateStore,
90
91    /// The store used by the event cache.
92    event_cache_store: EventCacheStoreLock,
93
94    /// The store used for encryption.
95    ///
96    /// This field is only meant to be used for `OlmMachine` initialization.
97    /// All operations on it happen inside the `OlmMachine`.
98    #[cfg(feature = "e2e-encryption")]
99    crypto_store: Arc<DynCryptoStore>,
100
101    /// The olm-machine that is created once the
102    /// [`SessionMeta`][crate::session::SessionMeta] is set via
103    /// [`BaseClient::activate`]
104    #[cfg(feature = "e2e-encryption")]
105    olm_machine: Arc<RwLock<Option<OlmMachine>>>,
106
107    /// Observable of when a user is ignored/unignored.
108    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
109
110    /// A sender that is used to communicate changes to room information. Each
111    /// tick contains the room ID and the reasons that have generated this tick.
112    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
113
114    /// The strategy to use for picking recipient devices, when sending an
115    /// encrypted message.
116    #[cfg(feature = "e2e-encryption")]
117    pub room_key_recipient_strategy: CollectStrategy,
118
119    /// The trust requirement to use for decrypting events.
120    #[cfg(feature = "e2e-encryption")]
121    pub decryption_trust_requirement: TrustRequirement,
122
123    /// If the client should handle verification events received when syncing.
124    #[cfg(feature = "e2e-encryption")]
125    pub handle_verification_events: bool,
126}
127
128#[cfg(not(tarpaulin_include))]
129impl fmt::Debug for BaseClient {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        f.debug_struct("BaseClient")
132            .field("session_meta", &self.state_store.session_meta())
133            .field("sync_token", &self.state_store.sync_token)
134            .finish_non_exhaustive()
135    }
136}
137
138impl BaseClient {
139    /// Create a new client.
140    ///
141    /// # Arguments
142    ///
143    /// * `config` - the configuration for the stores (state store, event cache
144    ///   store and crypto store).
145    pub fn new(config: StoreConfig) -> Self {
146        let store = BaseStateStore::new(config.state_store);
147
148        // Create the channel to receive `RoomInfoNotableUpdate`.
149        //
150        // Let's consider the channel will receive 5 updates for 100 rooms maximum. This
151        // is unrealistic in practise, as the sync mechanism is pretty unlikely to
152        // trigger such amount of updates, it's a safe value.
153        //
154        // Also, note that it must not be
155        // zero, because (i) it will panic, (ii) a new user has no room, but can create
156        // rooms; remember that the channel's capacity is immutable.
157        let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
158            broadcast::channel(500);
159
160        BaseClient {
161            state_store: store,
162            event_cache_store: config.event_cache_store,
163            #[cfg(feature = "e2e-encryption")]
164            crypto_store: config.crypto_store,
165            #[cfg(feature = "e2e-encryption")]
166            olm_machine: Default::default(),
167            ignore_user_list_changes: Default::default(),
168            room_info_notable_update_sender,
169            #[cfg(feature = "e2e-encryption")]
170            room_key_recipient_strategy: Default::default(),
171            #[cfg(feature = "e2e-encryption")]
172            decryption_trust_requirement: TrustRequirement::Untrusted,
173            #[cfg(feature = "e2e-encryption")]
174            handle_verification_events: true,
175        }
176    }
177
178    /// Clones the current base client to use the same crypto store but a
179    /// different, in-memory store config, and resets transient state.
180    #[cfg(feature = "e2e-encryption")]
181    pub async fn clone_with_in_memory_state_store(
182        &self,
183        cross_process_store_locks_holder_name: &str,
184        handle_verification_events: bool,
185    ) -> Result<Self> {
186        let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
187            .state_store(MemoryStore::new());
188        let config = config.crypto_store(self.crypto_store.clone());
189
190        let copy = Self {
191            state_store: BaseStateStore::new(config.state_store),
192            event_cache_store: config.event_cache_store,
193            // We copy the crypto store as well as the `OlmMachine` for two reasons:
194            // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
195            // 2. We need to ensure that the parent and child use the same data and caches inside
196            //    the `OlmMachine` so the various ratchets and places where new randomness gets
197            //    introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
198            //    or Olm sessions when they encrypt or decrypt messages.
199            crypto_store: self.crypto_store.clone(),
200            olm_machine: self.olm_machine.clone(),
201            ignore_user_list_changes: Default::default(),
202            room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
203            room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
204            decryption_trust_requirement: self.decryption_trust_requirement,
205            handle_verification_events,
206        };
207
208        copy.state_store
209            .derive_from_other(&self.state_store, &copy.room_info_notable_update_sender)
210            .await?;
211
212        Ok(copy)
213    }
214
215    /// Clones the current base client to use the same crypto store but a
216    /// different, in-memory store config, and resets transient state.
217    #[cfg(not(feature = "e2e-encryption"))]
218    #[allow(clippy::unused_async)]
219    pub async fn clone_with_in_memory_state_store(
220        &self,
221        cross_process_store_locks_holder: &str,
222        _handle_verification_events: bool,
223    ) -> Result<Self> {
224        let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
225            .state_store(MemoryStore::new());
226        Ok(Self::new(config))
227    }
228
229    /// Get the session meta information.
230    ///
231    /// If the client is currently logged in, this will return a
232    /// [`SessionMeta`] object which contains the user ID and device ID.
233    /// Otherwise it returns `None`.
234    pub fn session_meta(&self) -> Option<&SessionMeta> {
235        self.state_store.session_meta()
236    }
237
238    /// Get all the rooms this client knows about.
239    pub fn rooms(&self) -> Vec<Room> {
240        self.state_store.rooms()
241    }
242
243    /// Get all the rooms this client knows about, filtered by room state.
244    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
245        self.state_store.rooms_filtered(filter)
246    }
247
248    /// Get a stream of all the rooms changes, in addition to the existing
249    /// rooms.
250    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
251        self.state_store.rooms_stream()
252    }
253
254    /// Lookup the Room for the given RoomId, or create one, if it didn't exist
255    /// yet in the store
256    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
257        self.state_store.get_or_create_room(
258            room_id,
259            room_state,
260            self.room_info_notable_update_sender.clone(),
261        )
262    }
263
264    /// Get a reference to the state store.
265    pub fn state_store(&self) -> &DynStateStore {
266        self.state_store.deref()
267    }
268
269    /// Get a reference to the event cache store.
270    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
271        &self.event_cache_store
272    }
273
274    /// Check whether the client has been activated.
275    ///
276    /// See [`BaseClient::activate`] to know what it means.
277    pub fn is_active(&self) -> bool {
278        self.state_store.session_meta().is_some()
279    }
280
281    /// Activate the client.
282    ///
283    /// A client is considered active when:
284    ///
285    /// 1. It has a `SessionMeta` (user ID, device ID and access token),
286    /// 2. Has loaded cached data from storage,
287    /// 3. If encryption is enabled, it also initialized or restored its
288    ///    `OlmMachine`.
289    ///
290    /// # Arguments
291    ///
292    /// * `session_meta` - The meta of a session that the user already has from
293    ///   a previous login call.
294    ///
295    /// * `custom_account` - A custom
296    ///   [`matrix_sdk_crypto::vodozemac::olm::Account`] to be used for the
297    ///   identity and one-time keys of this [`BaseClient`]. If no account is
298    ///   provided, a new default one or one from the store will be used. If an
299    ///   account is provided and one already exists in the store for this
300    ///   [`UserId`]/[`DeviceId`] combination, an error will be raised. This is
301    ///   useful if one wishes to create identity keys before knowing the
302    ///   user/device IDs, e.g., to use the identity key as the device ID.
303    ///
304    /// * `room_load_settings` — Specify how many rooms must be restored; use
305    ///   `::default()` if you don't know which value to pick.
306    ///
307    /// # Panics
308    ///
309    /// This method panics if it is called twice.
310    ///
311    /// [`UserId`]: ruma::UserId
312    pub async fn activate(
313        &self,
314        session_meta: SessionMeta,
315        room_load_settings: RoomLoadSettings,
316        #[cfg(feature = "e2e-encryption")] custom_account: Option<
317            crate::crypto::vodozemac::olm::Account,
318        >,
319    ) -> Result<()> {
320        debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
321
322        self.state_store
323            .load_rooms(
324                &session_meta.user_id,
325                room_load_settings,
326                &self.room_info_notable_update_sender,
327            )
328            .await?;
329        self.state_store.load_sync_token().await?;
330        self.state_store.set_session_meta(session_meta);
331
332        #[cfg(feature = "e2e-encryption")]
333        self.regenerate_olm(custom_account).await?;
334
335        Ok(())
336    }
337
338    /// Recreate an `OlmMachine` from scratch.
339    ///
340    /// In particular, this will clear all its caches.
341    #[cfg(feature = "e2e-encryption")]
342    pub async fn regenerate_olm(
343        &self,
344        custom_account: Option<crate::crypto::vodozemac::olm::Account>,
345    ) -> Result<()> {
346        tracing::debug!("regenerating OlmMachine");
347        let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
348
349        // Recreate the `OlmMachine` and wipe the in-memory cache in the store
350        // because we suspect it has stale data.
351        let olm_machine = OlmMachine::with_store(
352            &session_meta.user_id,
353            &session_meta.device_id,
354            self.crypto_store.clone(),
355            custom_account,
356        )
357        .await
358        .map_err(OlmError::from)?;
359
360        *self.olm_machine.write().await = Some(olm_machine);
361        Ok(())
362    }
363
364    /// Get the current, if any, sync token of the client.
365    /// This will be None if the client didn't sync at least once.
366    pub async fn sync_token(&self) -> Option<String> {
367        self.state_store.sync_token.read().await.clone()
368    }
369
370    /// Handles the stripped state events in `invite_state`, modifying the
371    /// room's info and posting notifications as needed.
372    ///
373    /// * `room` - The [`Room`] to modify.
374    /// * `events` - The contents of `invite_state` in the form of list of pairs
375    ///   of raw stripped state events with their deserialized counterpart.
376    /// * `push_rules` - The push rules for this room.
377    /// * `room_info` - The current room's info.
378    /// * `changes` - The accumulated list of changes to apply once the
379    ///   processing is finished.
380    /// * `notifications` - Notifications to post for the current room.
381    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
382    pub(crate) async fn handle_invited_state(
383        &self,
384        context: &mut Context,
385        room: &Room,
386        events: (Vec<Raw<AnyStrippedStateEvent>>, Vec<AnyStrippedStateEvent>),
387        push_rules: &Ruleset,
388        room_info: &mut RoomInfo,
389        notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
390    ) -> Result<()> {
391        let mut state_events = BTreeMap::new();
392
393        for (raw_event, event) in iter::zip(events.0, events.1) {
394            room_info.handle_stripped_state_event(&event);
395            state_events
396                .entry(event.event_type())
397                .or_insert_with(BTreeMap::new)
398                .insert(event.state_key().to_owned(), raw_event);
399        }
400
401        context
402            .state_changes
403            .stripped_state
404            .insert(room_info.room_id().to_owned(), state_events.clone());
405
406        // We need to check for notifications after we have handled all state
407        // events, to make sure we have the full push context.
408        if let Some(push_context) =
409            processors::timeline::get_push_room_context(context, room, room_info, &self.state_store)
410                .await?
411        {
412            // Check every event again for notification.
413            for event in state_events.values().flat_map(|map| map.values()) {
414                let actions = push_rules.get_actions(event, &push_context);
415                if actions.iter().any(Action::should_notify) {
416                    notifications.entry(room.room_id().to_owned()).or_default().push(
417                        Notification {
418                            actions: actions.to_owned(),
419                            event: RawAnySyncOrStrippedTimelineEvent::Stripped(event.clone()),
420                        },
421                    );
422                }
423            }
424        }
425
426        Ok(())
427    }
428
429    /// User has knocked on a room.
430    ///
431    /// Update the internal and cached state accordingly. Return the final Room.
432    pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
433        let room = self.state_store.get_or_create_room(
434            room_id,
435            RoomState::Knocked,
436            self.room_info_notable_update_sender.clone(),
437        );
438
439        if room.state() != RoomState::Knocked {
440            let _sync_lock = self.sync_lock().lock().await;
441
442            let mut room_info = room.clone_info();
443            room_info.mark_as_knocked();
444            room_info.mark_state_partially_synced();
445            room_info.mark_members_missing(); // the own member event changed
446            let mut changes = StateChanges::default();
447            changes.add_room(room_info.clone());
448            self.state_store.save_changes(&changes).await?; // Update the store
449            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
450        }
451
452        Ok(room)
453    }
454
455    /// User has joined a room.
456    ///
457    /// Update the internal and cached state accordingly. Return the final Room.
458    pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
459        let room = self.state_store.get_or_create_room(
460            room_id,
461            RoomState::Joined,
462            self.room_info_notable_update_sender.clone(),
463        );
464
465        if room.state() != RoomState::Joined {
466            let _sync_lock = self.sync_lock().lock().await;
467
468            let mut room_info = room.clone_info();
469            room_info.mark_as_joined();
470            room_info.mark_state_partially_synced();
471            room_info.mark_members_missing(); // the own member event changed
472            let mut changes = StateChanges::default();
473            changes.add_room(room_info.clone());
474            self.state_store.save_changes(&changes).await?; // Update the store
475            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
476        }
477
478        Ok(room)
479    }
480
481    /// User has left a room.
482    ///
483    /// Update the internal and cached state accordingly.
484    pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
485        let room = self.state_store.get_or_create_room(
486            room_id,
487            RoomState::Left,
488            self.room_info_notable_update_sender.clone(),
489        );
490
491        if room.state() != RoomState::Left {
492            let _sync_lock = self.sync_lock().lock().await;
493
494            let mut room_info = room.clone_info();
495            room_info.mark_as_left();
496            room_info.mark_state_partially_synced();
497            room_info.mark_members_missing(); // the own member event changed
498            let mut changes = StateChanges::default();
499            changes.add_room(room_info.clone());
500            self.state_store.save_changes(&changes).await?; // Update the store
501            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
502        }
503
504        Ok(())
505    }
506
507    /// Get access to the store's sync lock.
508    pub fn sync_lock(&self) -> &Mutex<()> {
509        self.state_store.sync_lock()
510    }
511
512    /// Receive a response from a sync call.
513    ///
514    /// # Arguments
515    ///
516    /// * `response` - The response that we received after a successful sync.
517    #[instrument(skip_all)]
518    pub async fn receive_sync_response(
519        &self,
520        response: api::sync::sync_events::v3::Response,
521    ) -> Result<SyncResponse> {
522        self.receive_sync_response_with_requested_required_states(
523            response,
524            &RequestedRequiredStates::default(),
525        )
526        .await
527    }
528
529    /// Receive a response from a sync call, with the requested required state
530    /// events.
531    ///
532    /// # Arguments
533    ///
534    /// * `response` - The response that we received after a successful sync.
535    /// * `requested_required_states` - The requested required state events.
536    pub async fn receive_sync_response_with_requested_required_states(
537        &self,
538        response: api::sync::sync_events::v3::Response,
539        requested_required_states: &RequestedRequiredStates,
540    ) -> Result<SyncResponse> {
541        // The server might respond multiple times with the same sync token, in
542        // that case we already received this response and there's nothing to
543        // do.
544        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
545            info!("Got the same sync response twice");
546            return Ok(SyncResponse::default());
547        }
548
549        let now = Instant::now();
550
551        #[cfg(feature = "e2e-encryption")]
552        let olm_machine = self.olm_machine().await;
553
554        let mut context =
555            Context::new(StateChanges::new(response.next_batch.clone()), Default::default());
556
557        #[cfg(feature = "e2e-encryption")]
558        let to_device = {
559            let processors::e2ee::to_device::Output {
560                decrypted_to_device_events: to_device,
561                room_key_updates,
562            } = processors::e2ee::to_device::from_sync_v2(
563                &mut context,
564                &response,
565                olm_machine.as_ref(),
566            )
567            .await?;
568
569            processors::latest_event::decrypt_from_rooms(
570                &mut context,
571                room_key_updates
572                    .into_iter()
573                    .flatten()
574                    .filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
575                    .collect(),
576                olm_machine.as_ref(),
577                self.decryption_trust_requirement,
578                self.handle_verification_events,
579            )
580            .await?;
581
582            to_device
583        };
584
585        #[cfg(not(feature = "e2e-encryption"))]
586        let to_device = response.to_device.events;
587
588        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
589
590        let global_account_data_processor =
591            processors::account_data::global(&response.account_data.events);
592
593        let push_rules = self.get_push_rules(&global_account_data_processor).await?;
594
595        let mut new_rooms = RoomUpdates::default();
596        let mut notifications = Default::default();
597
598        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
599            BTreeMap::new();
600
601        for (room_id, new_info) in response.rooms.join {
602            let room = self.state_store.get_or_create_room(
603                &room_id,
604                RoomState::Joined,
605                self.room_info_notable_update_sender.clone(),
606            );
607
608            let mut room_info = room.clone_info();
609
610            room_info.mark_as_joined();
611            room_info.update_from_ruma_summary(&new_info.summary);
612            room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
613            room_info.mark_state_fully_synced();
614            room_info.handle_encryption_state(requested_required_states.for_room(&room_id));
615
616            let (raw_state_events, state_events) =
617                processors::state_events::sync::collect(&mut context, &new_info.state.events);
618
619            let mut new_user_ids = processors::state_events::dispatch_and_get_new_users(
620                &mut context,
621                (&raw_state_events, &state_events),
622                &mut room_info,
623                &mut ambiguity_cache,
624            )
625            .await?;
626
627            for raw in &new_info.ephemeral.events {
628                match raw.deserialize() {
629                    Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => {
630                        context.state_changes.add_receipts(&room_id, event.content);
631                    }
632                    Ok(_) => {}
633                    Err(e) => {
634                        let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
635                        #[rustfmt::skip]
636                        info!(
637                            ?room_id, event_id,
638                            "Failed to deserialize ephemeral room event: {e}"
639                        );
640                    }
641                }
642            }
643
644            if new_info.timeline.limited {
645                room_info.mark_members_missing();
646            }
647
648            let (raw_state_events_from_timeline, state_events_from_timeline) =
649                processors::state_events::sync::collect_from_timeline(
650                    &mut context,
651                    &new_info.timeline.events,
652                );
653
654            let mut other_new_user_ids = processors::state_events::dispatch_and_get_new_users(
655                &mut context,
656                (&raw_state_events_from_timeline, &state_events_from_timeline),
657                &mut room_info,
658                &mut ambiguity_cache,
659            )
660            .await?;
661            new_user_ids.append(&mut other_new_user_ids);
662            updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone());
663
664            let timeline = processors::timeline::build(
665                &mut context,
666                &room,
667                &mut room_info,
668                processors::timeline::builder::Timeline::from(new_info.timeline),
669                processors::timeline::builder::Notification::new(
670                    &push_rules,
671                    &mut notifications,
672                    &self.state_store,
673                ),
674                #[cfg(feature = "e2e-encryption")]
675                processors::timeline::builder::E2EE::new(
676                    olm_machine.as_ref(),
677                    self.decryption_trust_requirement,
678                    self.handle_verification_events,
679                ),
680            )
681            .await?;
682
683            // Save the new `RoomInfo`.
684            context.state_changes.add_room(room_info);
685
686            processors::account_data::for_room(
687                &mut context,
688                &room_id,
689                &new_info.account_data.events,
690                &self.state_store,
691            )
692            .await;
693
694            // `Self::handle_room_account_data` might have updated the `RoomInfo`. Let's
695            // fetch it again.
696            //
697            // SAFETY: `unwrap` is safe because the `RoomInfo` has been inserted 2 lines
698            // above.
699            let mut room_info = context.state_changes.room_infos.get(&room_id).unwrap().clone();
700
701            #[cfg(feature = "e2e-encryption")]
702            processors::e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
703                &mut context,
704                olm_machine.as_ref(),
705                &new_user_ids,
706                room_info.encryption_state(),
707                room.encryption_state(),
708                &room_id,
709                &self.state_store,
710            )
711            .await?;
712
713            let notification_count = new_info.unread_notifications.into();
714            room_info.update_notification_count(notification_count);
715
716            let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
717
718            new_rooms.join.insert(
719                room_id,
720                JoinedRoomUpdate::new(
721                    timeline,
722                    new_info.state.events,
723                    new_info.account_data.events,
724                    new_info.ephemeral.events,
725                    notification_count,
726                    ambiguity_changes,
727                ),
728            );
729
730            context.state_changes.add_room(room_info);
731        }
732
733        for (room_id, new_info) in response.rooms.leave {
734            let room = self.state_store.get_or_create_room(
735                &room_id,
736                RoomState::Left,
737                self.room_info_notable_update_sender.clone(),
738            );
739
740            let mut room_info = room.clone_info();
741            room_info.mark_as_left();
742            room_info.mark_state_partially_synced();
743            room_info.handle_encryption_state(requested_required_states.for_room(&room_id));
744
745            let (raw_state_events, state_events) =
746                processors::state_events::sync::collect(&mut context, &new_info.state.events);
747
748            let mut new_user_ids = processors::state_events::dispatch_and_get_new_users(
749                &mut context,
750                (&raw_state_events, &state_events),
751                &mut room_info,
752                &mut ambiguity_cache,
753            )
754            .await?;
755
756            let (raw_state_events_from_timeline, state_events_from_timeline) =
757                processors::state_events::sync::collect_from_timeline(
758                    &mut context,
759                    &new_info.timeline.events,
760                );
761
762            let mut other_new_user_ids = processors::state_events::dispatch_and_get_new_users(
763                &mut context,
764                (&raw_state_events_from_timeline, &state_events_from_timeline),
765                &mut room_info,
766                &mut ambiguity_cache,
767            )
768            .await?;
769            new_user_ids.append(&mut other_new_user_ids);
770
771            let timeline = processors::timeline::build(
772                &mut context,
773                &room,
774                &mut room_info,
775                processors::timeline::builder::Timeline::from(new_info.timeline),
776                processors::timeline::builder::Notification::new(
777                    &push_rules,
778                    &mut notifications,
779                    &self.state_store,
780                ),
781                #[cfg(feature = "e2e-encryption")]
782                processors::timeline::builder::E2EE::new(
783                    olm_machine.as_ref(),
784                    self.decryption_trust_requirement,
785                    self.handle_verification_events,
786                ),
787            )
788            .await?;
789
790            // Save the new `RoomInfo`.
791            context.state_changes.add_room(room_info);
792
793            processors::account_data::for_room(
794                &mut context,
795                &room_id,
796                &new_info.account_data.events,
797                &self.state_store,
798            )
799            .await;
800
801            let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
802
803            new_rooms.leave.insert(
804                room_id,
805                LeftRoomUpdate::new(
806                    timeline,
807                    new_info.state.events,
808                    new_info.account_data.events,
809                    ambiguity_changes,
810                ),
811            );
812        }
813
814        for (room_id, new_info) in response.rooms.invite {
815            let room = self.state_store.get_or_create_room(
816                &room_id,
817                RoomState::Invited,
818                self.room_info_notable_update_sender.clone(),
819            );
820
821            let invite_state = processors::state_events::stripped::collect(
822                &mut context,
823                &new_info.invite_state.events,
824            );
825
826            let mut room_info = room.clone_info();
827            room_info.mark_as_invited();
828            room_info.mark_state_fully_synced();
829
830            self.handle_invited_state(
831                &mut context,
832                &room,
833                invite_state,
834                &push_rules,
835                &mut room_info,
836                &mut notifications,
837            )
838            .await?;
839
840            context.state_changes.add_room(room_info);
841
842            new_rooms.invite.insert(room_id, new_info);
843        }
844
845        for (room_id, new_info) in response.rooms.knock {
846            let room = self.state_store.get_or_create_room(
847                &room_id,
848                RoomState::Knocked,
849                self.room_info_notable_update_sender.clone(),
850            );
851
852            let knock_state = processors::state_events::stripped::collect(
853                &mut context,
854                &new_info.knock_state.events,
855            );
856
857            let mut room_info = room.clone_info();
858            room_info.mark_as_knocked();
859            room_info.mark_state_fully_synced();
860
861            self.handle_invited_state(
862                &mut context,
863                &room,
864                knock_state,
865                &push_rules,
866                &mut room_info,
867                &mut notifications,
868            )
869            .await?;
870
871            context.state_changes.add_room(room_info);
872
873            new_rooms.knocked.insert(room_id, new_info);
874        }
875
876        global_account_data_processor.apply(&mut context, &self.state_store).await;
877
878        context.state_changes.presence = response
879            .presence
880            .events
881            .iter()
882            .filter_map(|e| {
883                let event = e.deserialize().ok()?;
884                Some((event.sender, e.clone()))
885            })
886            .collect();
887
888        context.state_changes.ambiguity_maps = ambiguity_cache.cache;
889
890        {
891            let _sync_lock = self.sync_lock().lock().await;
892
893            processors::changes::save_and_apply(
894                context,
895                &self.state_store,
896                &self.ignore_user_list_changes,
897                Some(response.next_batch.clone()),
898            )
899            .await?;
900        }
901
        // Now that all the room information has been saved, update the display name
        // cache (which relies on information stored in the database). This will
        // live in memory, until the next sync which will save the room info to
        // disk; we do this to avoid a save that would be redundant with the
        // above. Oh well.
907        new_rooms.update_in_memory_caches(&self.state_store).await;
908
909        for (room_id, member_ids) in updated_members_in_room {
910            if let Some(room) = self.get_room(&room_id) {
911                let _ =
912                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
913            }
914        }
915
916        info!("Processed a sync response in {:?}", now.elapsed());
917
918        let response = SyncResponse {
919            rooms: new_rooms,
920            presence: response.presence.events,
921            account_data: response.account_data.events,
922            to_device,
923            notifications,
924        };
925
926        Ok(response)
927    }
928
929    /// Receive a get member events response and convert it to a deserialized
930    /// `MembersResponse`
931    ///
932    /// This client-server request must be made without filters to make sure all
933    /// members are received. Otherwise, an error is returned.
934    ///
935    /// # Arguments
936    ///
937    /// * `room_id` - The room id this response belongs to.
938    ///
939    /// * `response` - The raw response that was received from the server.
940    #[instrument(skip_all, fields(?room_id))]
941    pub async fn receive_all_members(
942        &self,
943        room_id: &RoomId,
944        request: &api::membership::get_member_events::v3::Request,
945        response: &api::membership::get_member_events::v3::Response,
946    ) -> Result<()> {
947        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
948        {
949            // This function assumes all members are loaded at once to optimise how display
950            // name disambiguation works. Using it with partial member list results
951            // would produce incorrect disambiguated display name entries
952            return Err(Error::InvalidReceiveMembersParameters);
953        }
954
955        let Some(room) = self.state_store.room(room_id) else {
956            // The room is unknown to us: leave early.
957            return Ok(());
958        };
959
960        let mut chunk = Vec::with_capacity(response.chunk.len());
961        let mut context = Context::new(StateChanges::default(), Default::default());
962
963        #[cfg(feature = "e2e-encryption")]
964        let mut user_ids = BTreeSet::new();
965
966        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
967
968        for raw_event in &response.chunk {
969            let member = match raw_event.deserialize() {
970                Ok(ev) => ev,
971                Err(e) => {
972                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
973                    debug!(event_id, "Failed to deserialize member event: {e}");
974                    continue;
975                }
976            };
977
978            // TODO: All the actions in this loop used to be done only when the membership
979            // event was not in the store before. This was changed with the new room API,
980            // because e.g. leaving a room makes members events outdated and they need to be
981            // fetched by `members`. Therefore, they need to be overwritten here, even
982            // if they exist.
983            // However, this makes a new problem occur where setting the member events here
984            // potentially races with the sync.
985            // See <https://github.com/matrix-org/matrix-rust-sdk/issues/1205>.
986
987            #[cfg(feature = "e2e-encryption")]
988            match member.membership() {
989                MembershipState::Join | MembershipState::Invite => {
990                    user_ids.insert(member.state_key().to_owned());
991                }
992                _ => (),
993            }
994
995            if let StateEvent::Original(e) = &member {
996                if let Some(d) = &e.content.displayname {
997                    let display_name = DisplayName::new(d);
998                    ambiguity_map
999                        .entry(display_name)
1000                        .or_default()
1001                        .insert(member.state_key().clone());
1002                }
1003            }
1004
1005            let sync_member: SyncRoomMemberEvent = member.clone().into();
1006            processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
1007
1008            context
1009                .state_changes
1010                .state
1011                .entry(room_id.to_owned())
1012                .or_default()
1013                .entry(member.event_type())
1014                .or_default()
1015                .insert(member.state_key().to_string(), raw_event.clone().cast());
1016            chunk.push(member);
1017        }
1018
1019        #[cfg(feature = "e2e-encryption")]
1020        processors::e2ee::tracked_users::update(
1021            &mut context,
1022            self.olm_machine().await.as_ref(),
1023            room.encryption_state(),
1024            &user_ids,
1025        )
1026        .await?;
1027
1028        context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
1029
1030        let _sync_lock = self.sync_lock().lock().await;
1031        let mut room_info = room.clone_info();
1032        room_info.mark_members_synced();
1033        context.state_changes.add_room(room_info);
1034
1035        processors::changes::save_and_apply(
1036            context,
1037            &self.state_store,
1038            &self.ignore_user_list_changes,
1039            None,
1040        )
1041        .await?;
1042
1043        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
1044
1045        Ok(())
1046    }
1047
1048    /// Receive a successful filter upload response, the filter id will be
1049    /// stored under the given name in the store.
1050    ///
1051    /// The filter id can later be retrieved with the [`get_filter`] method.
1052    ///
1053    ///
1054    /// # Arguments
1055    ///
1056    /// * `filter_name` - The name that should be used to persist the filter id
1057    ///   in the store.
1058    ///
1059    /// * `response` - The successful filter upload response containing the
1060    ///   filter id.
1061    ///
1062    /// [`get_filter`]: #method.get_filter
1063    pub async fn receive_filter_upload(
1064        &self,
1065        filter_name: &str,
1066        response: &api::filter::create_filter::v3::Response,
1067    ) -> Result<()> {
1068        Ok(self
1069            .state_store
1070            .set_kv_data(
1071                StateStoreDataKey::Filter(filter_name),
1072                StateStoreDataValue::Filter(response.filter_id.clone()),
1073            )
1074            .await?)
1075    }
1076
1077    /// Get the filter id of a previously uploaded filter.
1078    ///
1079    /// *Note*: A filter will first need to be uploaded and persisted using
1080    /// [`receive_filter_upload`].
1081    ///
1082    /// # Arguments
1083    ///
1084    /// * `filter_name` - The name of the filter that was previously used to
1085    ///   persist the filter.
1086    ///
1087    /// [`receive_filter_upload`]: #method.receive_filter_upload
1088    pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
1089        let filter = self
1090            .state_store
1091            .get_kv_data(StateStoreDataKey::Filter(filter_name))
1092            .await?
1093            .map(|d| d.into_filter().expect("State store data not a filter"));
1094
1095        Ok(filter)
1096    }
1097
1098    /// Get a to-device request that will share a room key with users in a room.
1099    #[cfg(feature = "e2e-encryption")]
1100    pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
1101        match self.olm_machine().await.as_ref() {
1102            Some(o) => {
1103                let Some(room) = self.get_room(room_id) else {
1104                    return Err(Error::InsufficientData);
1105                };
1106
1107                let history_visibility = room.history_visibility_or_default();
1108                let Some(room_encryption_event) = room.encryption_settings() else {
1109                    return Err(Error::EncryptionNotEnabled);
1110                };
1111
1112                // Don't share the group session with members that are invited
1113                // if the history visibility is set to `Joined`
1114                let filter = if history_visibility == HistoryVisibility::Joined {
1115                    RoomMemberships::JOIN
1116                } else {
1117                    RoomMemberships::ACTIVE
1118                };
1119
1120                let members = self.state_store.get_user_ids(room_id, filter).await?;
1121
1122                let settings = EncryptionSettings::new(
1123                    room_encryption_event,
1124                    history_visibility,
1125                    self.room_key_recipient_strategy.clone(),
1126                );
1127
1128                Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
1129            }
1130            None => panic!("Olm machine wasn't started"),
1131        }
1132    }
1133
1134    /// Get the room with the given room id.
1135    ///
1136    /// # Arguments
1137    ///
1138    /// * `room_id` - The id of the room that should be fetched.
1139    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1140        self.state_store.room(room_id)
1141    }
1142
1143    /// Forget the room with the given room ID.
1144    ///
1145    /// The room will be dropped from the room list and the store.
1146    ///
1147    /// # Arguments
1148    ///
1149    /// * `room_id` - The id of the room that should be forgotten.
1150    pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1151        // Forget the room in the state store.
1152        self.state_store.forget_room(room_id).await?;
1153
1154        // Remove the room in the event cache store too.
1155        self.event_cache_store().lock().await?.remove_room(room_id).await?;
1156
1157        Ok(())
1158    }
1159
1160    /// Get the olm machine.
1161    #[cfg(feature = "e2e-encryption")]
1162    pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
1163        self.olm_machine.read().await
1164    }
1165
1166    /// Get the push rules.
1167    ///
1168    /// Gets the push rules previously processed, otherwise get them from the
1169    /// store. As a fallback, uses [`Ruleset::server_default`] if the user
1170    /// is logged in.
1171    pub(crate) async fn get_push_rules(
1172        &self,
1173        global_account_data_processor: &processors::account_data::Global,
1174    ) -> Result<Ruleset> {
1175        if let Some(event) = global_account_data_processor
1176            .push_rules()
1177            .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
1178        {
1179            Ok(event.content.global)
1180        } else if let Some(event) = self
1181            .state_store
1182            .get_account_data_event_static::<PushRulesEventContent>()
1183            .await?
1184            .and_then(|ev| ev.deserialize().ok())
1185        {
1186            Ok(event.content.global)
1187        } else if let Some(session_meta) = self.state_store.session_meta() {
1188            Ok(Ruleset::server_default(&session_meta.user_id))
1189        } else {
1190            Ok(Ruleset::new())
1191        }
1192    }
1193
1194    /// Returns a subscriber that publishes an event every time the ignore user
1195    /// list changes
1196    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
1197        self.ignore_user_list_changes.subscribe()
1198    }
1199
1200    /// Returns a new receiver that gets future room info notable updates.
1201    ///
1202    /// Learn more by reading the [`RoomInfoNotableUpdate`] type.
1203    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
1204        self.room_info_notable_update_sender.subscribe()
1205    }
1206}
1207
1208/// Represent the `required_state` values sent by a sync request.
1209///
1210/// This is useful to track what state events have been requested when handling
1211/// a response.
1212///
1213/// For example, if a sync requests the `m.room.encryption` state event, and the
1214/// server replies with nothing, if means the room **is not** encrypted. Without
1215/// knowing which state event was required by the sync, it is impossible to
1216/// interpret the absence of state event from the server as _the room's
1217/// encryption state is **not encrypted**_ or _the room's encryption state is
1218/// **unknown**_.
1219#[derive(Debug, Default)]
1220pub struct RequestedRequiredStates {
1221    default: Vec<(StateEventType, String)>,
1222    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1223}
1224
1225impl RequestedRequiredStates {
1226    /// Create a new `RequestedRequiredStates`.
1227    ///
1228    /// `default` represents the `required_state` value for all rooms.
1229    /// `for_rooms` is the `required_state` per room.
1230    pub fn new(
1231        default: Vec<(StateEventType, String)>,
1232        for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1233    ) -> Self {
1234        Self { default, for_rooms }
1235    }
1236
1237    /// Get the `required_state` value for a specific room.
1238    pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1239        self.for_rooms.get(room_id).unwrap_or(&self.default)
1240    }
1241}
1242
1243impl From<&v5::Request> for RequestedRequiredStates {
1244    fn from(request: &v5::Request) -> Self {
1245        // The following information is missing in the MSC4186 at the time of writing
1246        // (2025-03-12) but: the `required_state`s from all lists and from all room
1247        // subscriptions are combined by doing an union.
1248        //
1249        // Thus, we can do the same here, put the union in `default` and keep
1250        // `for_rooms` empty. The `Self::for_room` will automatically do the fallback.
1251        let mut default = BTreeSet::new();
1252
1253        for list in request.lists.values() {
1254            default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1255        }
1256
1257        for room_subscription in request.room_subscriptions.values() {
1258            default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1259        }
1260
1261        Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1262    }
1263}
1264
1265#[cfg(test)]
1266mod tests {
1267    use std::collections::HashMap;
1268
1269    use assert_matches2::assert_let;
1270    use futures_util::FutureExt as _;
1271    use matrix_sdk_test::{
1272        async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1273        LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder, BOB,
1274    };
1275    use ruma::{
1276        api::client::{self as api, sync::sync_events::v5},
1277        event_id,
1278        events::{room::member::MembershipState, StateEventType},
1279        room_id,
1280        serde::Raw,
1281        user_id,
1282    };
1283    use serde_json::{json, value::to_raw_value};
1284
1285    use super::{BaseClient, RequestedRequiredStates};
1286    use crate::{
1287        store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1288        test_utils::logged_in_base_client,
1289        RoomDisplayName, RoomState, SessionMeta,
1290    };
1291
1292    #[test]
1293    fn test_requested_required_states() {
1294        let room_id_0 = room_id!("!r0");
1295        let room_id_1 = room_id!("!r1");
1296
1297        let requested_required_states = RequestedRequiredStates::new(
1298            vec![(StateEventType::RoomAvatar, "".to_owned())],
1299            HashMap::from([(
1300                room_id_0.to_owned(),
1301                vec![
1302                    (StateEventType::RoomMember, "foo".to_owned()),
1303                    (StateEventType::RoomEncryption, "".to_owned()),
1304                ],
1305            )]),
1306        );
1307
1308        // A special set of state events exists for `room_id_0`.
1309        assert_eq!(
1310            requested_required_states.for_room(room_id_0),
1311            &[
1312                (StateEventType::RoomMember, "foo".to_owned()),
1313                (StateEventType::RoomEncryption, "".to_owned()),
1314            ]
1315        );
1316
1317        // No special list for `room_id_1`, it should return the defaults.
1318        assert_eq!(
1319            requested_required_states.for_room(room_id_1),
1320            &[(StateEventType::RoomAvatar, "".to_owned()),]
1321        );
1322    }
1323
1324    #[test]
1325    fn test_requested_required_states_from_sync_v5_request() {
1326        let room_id_0 = room_id!("!r0");
1327        let room_id_1 = room_id!("!r1");
1328
1329        // Empty request.
1330        let mut request = v5::Request::new();
1331
1332        {
1333            let requested_required_states = RequestedRequiredStates::from(&request);
1334
1335            assert!(requested_required_states.default.is_empty());
1336            assert!(requested_required_states.for_rooms.is_empty());
1337        }
1338
1339        // One list.
1340        request.lists.insert("foo".to_owned(), {
1341            let mut list = v5::request::List::default();
1342            list.room_details.required_state = vec![
1343                (StateEventType::RoomAvatar, "".to_owned()),
1344                (StateEventType::RoomEncryption, "".to_owned()),
1345            ];
1346
1347            list
1348        });
1349
1350        {
1351            let requested_required_states = RequestedRequiredStates::from(&request);
1352
1353            assert_eq!(
1354                requested_required_states.default,
1355                &[
1356                    (StateEventType::RoomAvatar, "".to_owned()),
1357                    (StateEventType::RoomEncryption, "".to_owned())
1358                ]
1359            );
1360            assert!(requested_required_states.for_rooms.is_empty());
1361        }
1362
1363        // Two lists.
1364        request.lists.insert("bar".to_owned(), {
1365            let mut list = v5::request::List::default();
1366            list.room_details.required_state = vec![
1367                (StateEventType::RoomEncryption, "".to_owned()),
1368                (StateEventType::RoomName, "".to_owned()),
1369            ];
1370
1371            list
1372        });
1373
1374        {
1375            let requested_required_states = RequestedRequiredStates::from(&request);
1376
1377            // Union of the state events.
1378            assert_eq!(
1379                requested_required_states.default,
1380                &[
1381                    (StateEventType::RoomAvatar, "".to_owned()),
1382                    (StateEventType::RoomEncryption, "".to_owned()),
1383                    (StateEventType::RoomName, "".to_owned()),
1384                ]
1385            );
1386            assert!(requested_required_states.for_rooms.is_empty());
1387        }
1388
1389        // One room subscription.
1390        request.room_subscriptions.insert(room_id_0.to_owned(), {
1391            let mut room_subscription = v5::request::RoomSubscription::default();
1392
1393            room_subscription.required_state = vec![
1394                (StateEventType::RoomJoinRules, "".to_owned()),
1395                (StateEventType::RoomEncryption, "".to_owned()),
1396            ];
1397
1398            room_subscription
1399        });
1400
1401        {
1402            let requested_required_states = RequestedRequiredStates::from(&request);
1403
1404            // Union of state events, all in `default`, still nothing in `for_rooms`.
1405            assert_eq!(
1406                requested_required_states.default,
1407                &[
1408                    (StateEventType::RoomAvatar, "".to_owned()),
1409                    (StateEventType::RoomEncryption, "".to_owned()),
1410                    (StateEventType::RoomJoinRules, "".to_owned()),
1411                    (StateEventType::RoomName, "".to_owned()),
1412                ]
1413            );
1414            assert!(requested_required_states.for_rooms.is_empty());
1415        }
1416
1417        // Two room subscriptions.
1418        request.room_subscriptions.insert(room_id_1.to_owned(), {
1419            let mut room_subscription = v5::request::RoomSubscription::default();
1420
1421            room_subscription.required_state = vec![
1422                (StateEventType::RoomName, "".to_owned()),
1423                (StateEventType::RoomTopic, "".to_owned()),
1424            ];
1425
1426            room_subscription
1427        });
1428
1429        {
1430            let requested_required_states = RequestedRequiredStates::from(&request);
1431
1432            // Union of state events, all in `default`, still nothing in `for_rooms`.
1433            assert_eq!(
1434                requested_required_states.default,
1435                &[
1436                    (StateEventType::RoomAvatar, "".to_owned()),
1437                    (StateEventType::RoomEncryption, "".to_owned()),
1438                    (StateEventType::RoomJoinRules, "".to_owned()),
1439                    (StateEventType::RoomName, "".to_owned()),
1440                    (StateEventType::RoomTopic, "".to_owned()),
1441                ]
1442            );
1443        }
1444    }
1445
1446    #[async_test]
1447    async fn test_invite_after_leaving() {
1448        let user_id = user_id!("@alice:example.org");
1449        let room_id = room_id!("!test:example.org");
1450
1451        let client = logged_in_base_client(Some(user_id)).await;
1452
1453        let mut sync_builder = SyncResponseBuilder::new();
1454
1455        let response = sync_builder
1456            .add_left_room(
1457                LeftRoomBuilder::new(room_id).add_timeline_event(
1458                    EventFactory::new()
1459                        .member(user_id)
1460                        .membership(MembershipState::Leave)
1461                        .display_name("Alice")
1462                        .event_id(event_id!("$994173582443PhrSn:example.org")),
1463                ),
1464            )
1465            .build_sync_response();
1466        client.receive_sync_response(response).await.unwrap();
1467        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1468
1469        let response = sync_builder
1470            .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
1471                StrippedStateTestEvent::Custom(json!({
1472                    "content": {
1473                        "displayname": "Alice",
1474                        "membership": "invite",
1475                    },
1476                    "event_id": "$143273582443PhrSn:example.org",
1477                    "origin_server_ts": 1432735824653u64,
1478                    "sender": "@example:example.org",
1479                    "state_key": user_id,
1480                    "type": "m.room.member",
1481                })),
1482            ))
1483            .build_sync_response();
1484        client.receive_sync_response(response).await.unwrap();
1485        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1486    }
1487
1488    #[async_test]
1489    async fn test_invite_displayname() {
1490        let user_id = user_id!("@alice:example.org");
1491        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1492
1493        let client = logged_in_base_client(Some(user_id)).await;
1494
1495        let response = ruma_response_from_json(&json!({
1496            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1497            "device_one_time_keys_count": {
1498                "signed_curve25519": 50u64
1499            },
1500            "device_unused_fallback_key_types": [
1501                "signed_curve25519"
1502            ],
1503            "rooms": {
1504                "invite": {
1505                    "!ithpyNKDtmhneaTQja:example.org": {
1506                        "invite_state": {
1507                            "events": [
1508                                {
1509                                    "content": {
1510                                        "creator": "@test:example.org",
1511                                        "room_version": "9"
1512                                    },
1513                                    "sender": "@test:example.org",
1514                                    "state_key": "",
1515                                    "type": "m.room.create"
1516                                },
1517                                {
1518                                    "content": {
1519                                        "join_rule": "invite"
1520                                    },
1521                                    "sender": "@test:example.org",
1522                                    "state_key": "",
1523                                    "type": "m.room.join_rules"
1524                                },
1525                                {
1526                                    "content": {
1527                                        "algorithm": "m.megolm.v1.aes-sha2"
1528                                    },
1529                                    "sender": "@test:example.org",
1530                                    "state_key": "",
1531                                    "type": "m.room.encryption"
1532                                },
1533                                {
1534                                    "content": {
1535                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1536                                        "displayname": "Kyra",
1537                                        "membership": "join"
1538                                    },
1539                                    "sender": "@test:example.org",
1540                                    "state_key": "@test:example.org",
1541                                    "type": "m.room.member"
1542                                },
1543                                {
1544                                    "content": {
1545                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1546                                        "displayname": "alice",
1547                                        "is_direct": true,
1548                                        "membership": "invite"
1549                                    },
1550                                    "origin_server_ts": 1650878657984u64,
1551                                    "sender": "@test:example.org",
1552                                    "state_key": "@alice:example.org",
1553                                    "type": "m.room.member",
1554                                    "unsigned": {
1555                                        "age": 14u64
1556                                    },
1557                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1558                                }
1559                            ]
1560                        }
1561                    }
1562                }
1563            }
1564        }));
1565
1566        client.receive_sync_response(response).await.unwrap();
1567
1568        let room = client.get_room(room_id).expect("Room not found");
1569        assert_eq!(room.state(), RoomState::Invited);
1570        assert_eq!(
1571            room.compute_display_name().await.expect("fetching display name failed"),
1572            RoomDisplayName::Calculated("Kyra".to_owned())
1573        );
1574    }
1575
1576    #[async_test]
1577    async fn test_deserialization_failure() {
1578        let user_id = user_id!("@alice:example.org");
1579        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1580
1581        let client =
1582            BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1583        client
1584            .activate(
1585                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1586                RoomLoadSettings::default(),
1587                #[cfg(feature = "e2e-encryption")]
1588                None,
1589            )
1590            .await
1591            .unwrap();
1592
1593        let response = ruma_response_from_json(&json!({
1594            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1595            "rooms": {
1596                "join": {
1597                    "!ithpyNKDtmhneaTQja:example.org": {
1598                        "state": {
1599                            "events": [
1600                                {
1601                                    "invalid": "invalid",
1602                                },
1603                                {
1604                                    "content": {
1605                                        "name": "The room name"
1606                                    },
1607                                    "event_id": "$143273582443PhrSn:example.org",
1608                                    "origin_server_ts": 1432735824653u64,
1609                                    "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1610                                    "sender": "@example:example.org",
1611                                    "state_key": "",
1612                                    "type": "m.room.name",
1613                                    "unsigned": {
1614                                        "age": 1234
1615                                    }
1616                                },
1617                            ]
1618                        }
1619                    }
1620                }
1621            }
1622        }));
1623
1624        client.receive_sync_response(response).await.unwrap();
1625        client
1626            .state_store()
1627            .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1628            .await
1629            .expect("Failed to fetch state event")
1630            .expect("State event not found")
1631            .deserialize()
1632            .expect("Failed to deserialize state event");
1633    }
1634
1635    #[async_test]
1636    async fn test_invited_members_arent_ignored() {
1637        let user_id = user_id!("@alice:example.org");
1638        let inviter_user_id = user_id!("@bob:example.org");
1639        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1640
1641        let client =
1642            BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1643        client
1644            .activate(
1645                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1646                RoomLoadSettings::default(),
1647                #[cfg(feature = "e2e-encryption")]
1648                None,
1649            )
1650            .await
1651            .unwrap();
1652
1653        // Preamble: let the SDK know about the room.
1654        let mut sync_builder = SyncResponseBuilder::new();
1655        let response = sync_builder
1656            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1657            .build_sync_response();
1658        client.receive_sync_response(response).await.unwrap();
1659
1660        // When I process the result of a /members request that only contains an invited
1661        // member,
1662        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1663
1664        let raw_member_event = json!({
1665            "content": {
1666                "avatar_url": "mxc://localhost/fewjilfewjil42",
1667                "displayname": "Invited Alice",
1668                "membership": "invite"
1669            },
1670            "event_id": "$151800140517rfvjc:localhost",
1671            "origin_server_ts": 151800140,
1672            "room_id": room_id,
1673            "sender": inviter_user_id,
1674            "state_key": user_id,
1675            "type": "m.room.member",
1676            "unsigned": {
1677                "age": 13374242,
1678            }
1679        });
1680        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1681            to_raw_value(&raw_member_event).unwrap(),
1682        )]);
1683
1684        // It's correctly processed,
1685        client.receive_all_members(room_id, &request, &response).await.unwrap();
1686
1687        let room = client.get_room(room_id).unwrap();
1688
1689        // And I can get the invited member display name and avatar.
1690        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1691
1692        assert_eq!(member.user_id(), user_id);
1693        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1694        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1695    }
1696
1697    #[async_test]
1698    async fn test_reinvited_members_get_a_display_name() {
1699        let user_id = user_id!("@alice:example.org");
1700        let inviter_user_id = user_id!("@bob:example.org");
1701        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1702
1703        let client =
1704            BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1705        client
1706            .activate(
1707                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1708                RoomLoadSettings::default(),
1709                #[cfg(feature = "e2e-encryption")]
1710                None,
1711            )
1712            .await
1713            .unwrap();
1714
1715        // Preamble: let the SDK know about the room, and that the invited user left it.
1716        let mut sync_builder = SyncResponseBuilder::new();
1717        let response = sync_builder
1718            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
1719                StateTestEvent::Custom(json!({
1720                    "content": {
1721                        "avatar_url": null,
1722                        "displayname": null,
1723                        "membership": "leave"
1724                    },
1725                    "event_id": "$151803140217rkvjc:localhost",
1726                    "origin_server_ts": 151800139,
1727                    "room_id": room_id,
1728                    "sender": user_id,
1729                    "state_key": user_id,
1730                    "type": "m.room.member",
1731                })),
1732            ))
1733            .build_sync_response();
1734        client.receive_sync_response(response).await.unwrap();
1735
1736        // Now, say that the user has been re-invited.
1737        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1738
1739        let raw_member_event = json!({
1740            "content": {
1741                "avatar_url": "mxc://localhost/fewjilfewjil42",
1742                "displayname": "Invited Alice",
1743                "membership": "invite"
1744            },
1745            "event_id": "$151800140517rfvjc:localhost",
1746            "origin_server_ts": 151800140,
1747            "room_id": room_id,
1748            "sender": inviter_user_id,
1749            "state_key": user_id,
1750            "type": "m.room.member",
1751            "unsigned": {
1752                "age": 13374242,
1753            }
1754        });
1755        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1756            to_raw_value(&raw_member_event).unwrap(),
1757        )]);
1758
1759        // It's correctly processed,
1760        client.receive_all_members(room_id, &request, &response).await.unwrap();
1761
1762        let room = client.get_room(room_id).unwrap();
1763
1764        // And I can get the invited member display name and avatar.
1765        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1766
1767        assert_eq!(member.user_id(), user_id);
1768        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1769        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1770    }
1771
1772    #[async_test]
1773    async fn test_ignored_user_list_changes() {
1774        let user_id = user_id!("@alice:example.org");
1775        let client =
1776            BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1777
1778        client
1779            .activate(
1780                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1781                RoomLoadSettings::default(),
1782                #[cfg(feature = "e2e-encryption")]
1783                None,
1784            )
1785            .await
1786            .unwrap();
1787
1788        let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1789        assert!(subscriber.next().now_or_never().is_none());
1790
1791        let mut sync_builder = SyncResponseBuilder::new();
1792        let response = sync_builder
1793            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1794                json!({
1795                    "content": {
1796                        "ignored_users": {
1797                            *BOB: {}
1798                        }
1799                    },
1800                    "type": "m.ignored_user_list",
1801                }),
1802            ))
1803            .build_sync_response();
1804        client.receive_sync_response(response).await.unwrap();
1805
1806        assert_let!(Some(ignored) = subscriber.next().await);
1807        assert_eq!(ignored, [BOB.to_string()]);
1808
1809        // Receive the same response.
1810        let response = sync_builder
1811            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1812                json!({
1813                    "content": {
1814                        "ignored_users": {
1815                            *BOB: {}
1816                        }
1817                    },
1818                    "type": "m.ignored_user_list",
1819                }),
1820            ))
1821            .build_sync_response();
1822        client.receive_sync_response(response).await.unwrap();
1823
1824        // No changes in the ignored list.
1825        assert!(subscriber.next().now_or_never().is_none());
1826
1827        // Now remove Bob from the ignored list.
1828        let response = sync_builder
1829            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1830                json!({
1831                    "content": {
1832                        "ignored_users": {}
1833                    },
1834                    "type": "m.ignored_user_list",
1835                }),
1836            ))
1837            .build_sync_response();
1838        client.receive_sync_response(response).await.unwrap();
1839
1840        assert_let!(Some(ignored) = subscriber.next().await);
1841        assert!(ignored.is_empty());
1842    }
1843}