matrix_sdk_base/
client.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19    collections::{BTreeMap, BTreeSet, HashMap},
20    fmt,
21    ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29    store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, DecryptionSettings,
30    EncryptionSettings, OlmError, OlmMachine, TrustRequirement,
31};
32#[cfg(feature = "e2e-encryption")]
33use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
34#[cfg(doc)]
35use ruma::DeviceId;
36use ruma::{
37    api::client::{self as api, sync::sync_events::v5},
38    events::{
39        ignored_user_list::IgnoredUserListEventContent,
40        push_rules::{PushRulesEvent, PushRulesEventContent},
41        room::member::SyncRoomMemberEvent,
42        StateEvent, StateEventType,
43    },
44    push::Ruleset,
45    time::Instant,
46    OwnedRoomId, OwnedUserId, RoomId, UserId,
47};
48use tokio::sync::{broadcast, Mutex};
49#[cfg(feature = "e2e-encryption")]
50use tokio::sync::{RwLock, RwLockReadGuard};
51use tracing::{debug, enabled, info, instrument, warn, Level};
52
53#[cfg(feature = "e2e-encryption")]
54use crate::RoomMemberships;
55use crate::{
56    deserialized_responses::DisplayName,
57    error::{Error, Result},
58    event_cache::store::EventCacheStoreLock,
59    response_processors::{self as processors, Context},
60    room::{
61        Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
62    },
63    store::{
64        ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore,
65        Result as StoreResult, RoomLoadSettings, StateChanges, StateStoreDataKey,
66        StateStoreDataValue, StateStoreExt, StoreConfig,
67    },
68    sync::{RoomUpdates, SyncResponse},
69    RoomStateFilter, SessionMeta,
70};
71
/// A no (network) IO client implementation.
///
/// This client is a state machine that receives responses and events and
/// accordingly updates its state. It is not designed to be used directly, but
/// rather through `matrix_sdk::Client`.
///
/// The type is `Clone`; the clone is field-wise, so `Arc`-wrapped fields (the
/// crypto store and the `OlmMachine` slot) are shared between clones rather
/// than duplicated.
///
/// ```rust
/// use matrix_sdk_base::{store::StoreConfig, BaseClient, ThreadingSupport};
///
/// let client = BaseClient::new(
///     StoreConfig::new("cross-process-holder-name".to_owned()),
///     ThreadingSupport::Disabled,
/// );
/// ```
#[derive(Clone)]
pub struct BaseClient {
    /// The state store.
    ///
    /// Besides persisted state, it also gives access to the session meta, the
    /// sync token and the sync lock used by this client.
    pub(crate) state_store: BaseStateStore,

    /// The store used by the event cache.
    event_cache_store: EventCacheStoreLock,

    /// The store used for encryption.
    ///
    /// This field is only meant to be used for `OlmMachine` initialization.
    /// All operations on it happen inside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    crypto_store: Arc<DynCryptoStore>,

    /// The olm-machine that is created once the
    /// [`SessionMeta`][crate::session::SessionMeta] is set via
    /// [`BaseClient::activate`]
    ///
    /// It is `None` until then, and is replaced wholesale by
    /// [`BaseClient::regenerate_olm`].
    #[cfg(feature = "e2e-encryption")]
    olm_machine: Arc<RwLock<Option<OlmMachine>>>,

    /// Observable of when a user is ignored/unignored.
    ///
    /// NOTE(review): the observed value is a `Vec<String>`; presumably the
    /// list of ignored user IDs — confirm against the code that updates it.
    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,

    /// A sender that is used to communicate changes to room information. Each
    /// tick contains the room ID and the reasons that have generated this tick.
    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

    /// The strategy to use for picking recipient devices, when sending an
    /// encrypted message.
    #[cfg(feature = "e2e-encryption")]
    pub room_key_recipient_strategy: CollectStrategy,

    /// The settings to use for decrypting events.
    #[cfg(feature = "e2e-encryption")]
    pub decryption_settings: DecryptionSettings,

    /// If the client should handle verification events received when syncing.
    #[cfg(feature = "e2e-encryption")]
    pub handle_verification_events: bool,

    /// Whether the client supports threads or not.
    pub threading_support: ThreadingSupport,
}
130
131#[cfg(not(tarpaulin_include))]
132impl fmt::Debug for BaseClient {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        f.debug_struct("BaseClient")
135            .field("session_meta", &self.state_store.session_meta())
136            .field("sync_token", &self.state_store.sync_token)
137            .finish_non_exhaustive()
138    }
139}
140
/// Whether this client instance supports threading or not. Currently used to
/// determine how the client handles read receipts and unread count computations
/// on the base SDK level.
///
/// Timelines on the other hand have a separate `TimelineFocus`
/// `hide_threaded_events` associated value that can be used to hide threaded
/// events but also to enable threaded read receipt sending. This is because
/// certain timeline instances should ignore threading no matter what's defined
/// at the client level. One such example are media filtered timelines which
/// should contain all the room's media no matter what thread it's in (unless
/// explicitly opted into).
///
/// The type is a plain, field-less enum: it is `Copy` and can be compared
/// with `==`/`!=`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ThreadingSupport {
    /// Threading enabled
    Enabled,
    /// Threading disabled
    Disabled,
}
159
160impl BaseClient {
161    /// Create a new client.
162    ///
163    /// # Arguments
164    ///
165    /// * `config` - the configuration for the stores (state store, event cache
166    ///   store and crypto store).
167    pub fn new(config: StoreConfig, threading_support: ThreadingSupport) -> Self {
168        let store = BaseStateStore::new(config.state_store);
169
170        // Create the channel to receive `RoomInfoNotableUpdate`.
171        //
172        // Let's consider the channel will receive 5 updates for 100 rooms maximum. This
173        // is unrealistic in practise, as the sync mechanism is pretty unlikely to
174        // trigger such amount of updates, it's a safe value.
175        //
176        // Also, note that it must not be
177        // zero, because (i) it will panic, (ii) a new user has no room, but can create
178        // rooms; remember that the channel's capacity is immutable.
179        let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
180            broadcast::channel(500);
181
182        BaseClient {
183            state_store: store,
184            event_cache_store: config.event_cache_store,
185            #[cfg(feature = "e2e-encryption")]
186            crypto_store: config.crypto_store,
187            #[cfg(feature = "e2e-encryption")]
188            olm_machine: Default::default(),
189            ignore_user_list_changes: Default::default(),
190            room_info_notable_update_sender,
191            #[cfg(feature = "e2e-encryption")]
192            room_key_recipient_strategy: Default::default(),
193            #[cfg(feature = "e2e-encryption")]
194            decryption_settings: DecryptionSettings {
195                sender_device_trust_requirement: TrustRequirement::Untrusted,
196            },
197            #[cfg(feature = "e2e-encryption")]
198            handle_verification_events: true,
199            threading_support,
200        }
201    }
202
203    /// Clones the current base client to use the same crypto store but a
204    /// different, in-memory store config, and resets transient state.
205    #[cfg(feature = "e2e-encryption")]
206    pub async fn clone_with_in_memory_state_store(
207        &self,
208        cross_process_store_locks_holder_name: &str,
209        handle_verification_events: bool,
210    ) -> Result<Self> {
211        let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
212            .state_store(MemoryStore::new());
213        let config = config.crypto_store(self.crypto_store.clone());
214
215        let copy = Self {
216            state_store: BaseStateStore::new(config.state_store),
217            event_cache_store: config.event_cache_store,
218            // We copy the crypto store as well as the `OlmMachine` for two reasons:
219            // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
220            // 2. We need to ensure that the parent and child use the same data and caches inside
221            //    the `OlmMachine` so the various ratchets and places where new randomness gets
222            //    introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
223            //    or Olm sessions when they encrypt or decrypt messages.
224            crypto_store: self.crypto_store.clone(),
225            olm_machine: self.olm_machine.clone(),
226            ignore_user_list_changes: Default::default(),
227            room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
228            room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
229            decryption_settings: self.decryption_settings.clone(),
230            handle_verification_events,
231            threading_support: self.threading_support,
232        };
233
234        copy.state_store
235            .derive_from_other(&self.state_store, &copy.room_info_notable_update_sender)
236            .await?;
237
238        Ok(copy)
239    }
240
241    /// Clones the current base client to use the same crypto store but a
242    /// different, in-memory store config, and resets transient state.
243    #[cfg(not(feature = "e2e-encryption"))]
244    #[allow(clippy::unused_async)]
245    pub async fn clone_with_in_memory_state_store(
246        &self,
247        cross_process_store_locks_holder: &str,
248        _handle_verification_events: bool,
249    ) -> Result<Self> {
250        let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
251            .state_store(MemoryStore::new());
252        Ok(Self::new(config, ThreadingSupport::Disabled))
253    }
254
255    /// Get the session meta information.
256    ///
257    /// If the client is currently logged in, this will return a
258    /// [`SessionMeta`] object which contains the user ID and device ID.
259    /// Otherwise it returns `None`.
260    pub fn session_meta(&self) -> Option<&SessionMeta> {
261        self.state_store.session_meta()
262    }
263
264    /// Get all the rooms this client knows about.
265    pub fn rooms(&self) -> Vec<Room> {
266        self.state_store.rooms()
267    }
268
269    /// Get all the rooms this client knows about, filtered by room state.
270    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
271        self.state_store.rooms_filtered(filter)
272    }
273
274    /// Get a stream of all the rooms changes, in addition to the existing
275    /// rooms.
276    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
277        self.state_store.rooms_stream()
278    }
279
280    /// Lookup the Room for the given RoomId, or create one, if it didn't exist
281    /// yet in the store
282    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
283        self.state_store.get_or_create_room(
284            room_id,
285            room_state,
286            self.room_info_notable_update_sender.clone(),
287        )
288    }
289
290    /// Get a reference to the state store.
291    pub fn state_store(&self) -> &DynStateStore {
292        self.state_store.deref()
293    }
294
295    /// Get a reference to the event cache store.
296    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
297        &self.event_cache_store
298    }
299
300    /// Check whether the client has been activated.
301    ///
302    /// See [`BaseClient::activate`] to know what it means.
303    pub fn is_active(&self) -> bool {
304        self.state_store.session_meta().is_some()
305    }
306
307    /// Activate the client.
308    ///
309    /// A client is considered active when:
310    ///
311    /// 1. It has a `SessionMeta` (user ID, device ID and access token),
312    /// 2. Has loaded cached data from storage,
313    /// 3. If encryption is enabled, it also initialized or restored its
314    ///    `OlmMachine`.
315    ///
316    /// # Arguments
317    ///
318    /// * `session_meta` - The meta of a session that the user already has from
319    ///   a previous login call.
320    ///
321    /// * `custom_account` - A custom
322    ///   [`matrix_sdk_crypto::vodozemac::olm::Account`] to be used for the
323    ///   identity and one-time keys of this [`BaseClient`]. If no account is
324    ///   provided, a new default one or one from the store will be used. If an
325    ///   account is provided and one already exists in the store for this
326    ///   [`UserId`]/[`DeviceId`] combination, an error will be raised. This is
327    ///   useful if one wishes to create identity keys before knowing the
328    ///   user/device IDs, e.g., to use the identity key as the device ID.
329    ///
330    /// * `room_load_settings` — Specify how many rooms must be restored; use
331    ///   `::default()` if you don't know which value to pick.
332    ///
333    /// # Panics
334    ///
335    /// This method panics if it is called twice.
336    ///
337    /// [`UserId`]: ruma::UserId
338    pub async fn activate(
339        &self,
340        session_meta: SessionMeta,
341        room_load_settings: RoomLoadSettings,
342        #[cfg(feature = "e2e-encryption")] custom_account: Option<
343            crate::crypto::vodozemac::olm::Account,
344        >,
345    ) -> Result<()> {
346        debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
347
348        self.state_store
349            .load_rooms(
350                &session_meta.user_id,
351                room_load_settings,
352                &self.room_info_notable_update_sender,
353            )
354            .await?;
355        self.state_store.load_sync_token().await?;
356        self.state_store.set_session_meta(session_meta);
357
358        #[cfg(feature = "e2e-encryption")]
359        self.regenerate_olm(custom_account).await?;
360
361        Ok(())
362    }
363
364    /// Recreate an `OlmMachine` from scratch.
365    ///
366    /// In particular, this will clear all its caches.
367    #[cfg(feature = "e2e-encryption")]
368    pub async fn regenerate_olm(
369        &self,
370        custom_account: Option<crate::crypto::vodozemac::olm::Account>,
371    ) -> Result<()> {
372        tracing::debug!("regenerating OlmMachine");
373        let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
374
375        // Recreate the `OlmMachine` and wipe the in-memory cache in the store
376        // because we suspect it has stale data.
377        let olm_machine = OlmMachine::with_store(
378            &session_meta.user_id,
379            &session_meta.device_id,
380            self.crypto_store.clone(),
381            custom_account,
382        )
383        .await
384        .map_err(OlmError::from)?;
385
386        *self.olm_machine.write().await = Some(olm_machine);
387        Ok(())
388    }
389
390    /// Get the current, if any, sync token of the client.
391    /// This will be None if the client didn't sync at least once.
392    pub async fn sync_token(&self) -> Option<String> {
393        self.state_store.sync_token.read().await.clone()
394    }
395
396    /// User has knocked on a room.
397    ///
398    /// Update the internal and cached state accordingly. Return the final Room.
399    pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
400        let room = self.state_store.get_or_create_room(
401            room_id,
402            RoomState::Knocked,
403            self.room_info_notable_update_sender.clone(),
404        );
405
406        if room.state() != RoomState::Knocked {
407            let _sync_lock = self.sync_lock().lock().await;
408
409            let mut room_info = room.clone_info();
410            room_info.mark_as_knocked();
411            room_info.mark_state_partially_synced();
412            room_info.mark_members_missing(); // the own member event changed
413            let mut changes = StateChanges::default();
414            changes.add_room(room_info.clone());
415            self.state_store.save_changes(&changes).await?; // Update the store
416            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
417        }
418
419        Ok(room)
420    }
421
422    /// The user has joined a room using this specific client.
423    ///
424    /// This method should be called if the user accepts an invite or if they
425    /// join a public room.
426    ///
427    /// The method will create a [`Room`] object if one does not exist yet and
428    /// set the state of the [`Room`] to [`RoomState::Joined`]. The [`Room`]
429    /// object will be persisted in the cache. Please note that the [`Room`]
430    /// will be a stub until a sync has been received with the full room
431    /// state using [`BaseClient::receive_sync_response`].
432    ///
433    /// Update the internal and cached state accordingly. Return the final Room.
434    ///
435    /// # Examples
436    ///
437    /// ```rust
438    /// # use matrix_sdk_base::{BaseClient, store::StoreConfig, RoomState, ThreadingSupport};
439    /// # use ruma::OwnedRoomId;
440    /// # async {
441    /// # let client = BaseClient::new(StoreConfig::new("example".to_owned()), ThreadingSupport::Disabled);
442    /// # async fn send_join_request() -> anyhow::Result<OwnedRoomId> { todo!() }
443    /// let room_id = send_join_request().await?;
444    /// let room = client.room_joined(&room_id).await?;
445    ///
446    /// assert_eq!(room.state(), RoomState::Joined);
447    /// # anyhow::Ok(()) };
448    /// ```
449    pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
450        let room = self.state_store.get_or_create_room(
451            room_id,
452            RoomState::Joined,
453            self.room_info_notable_update_sender.clone(),
454        );
455
456        // If the state isn't `RoomState::Joined` then this means that we knew about
457        // this room before. Let's modify the existing state now.
458        if room.state() != RoomState::Joined {
459            let _sync_lock = self.sync_lock().lock().await;
460
461            let mut room_info = room.clone_info();
462
463            // If our previous state was an invite and we're now in the joined state, this
464            // means that the user has explicitly accepted the invite. Let's
465            // remember when this has happened.
466            //
467            // This is somewhat of a workaround for our lack of cryptographic membership.
468            // Later on we will decide if historic room keys should be accepted
469            // based on this info. If a user has accepted an invite and we receive a room
470            // key bundle shortly after, we might accept it. If we don't do
471            // this, the homeserver could trick us into accepting any historic room key
472            // bundle.
473            if room.state() == RoomState::Invited {
474                room_info.set_invite_accepted_now();
475            }
476
477            room_info.mark_as_joined();
478            room_info.mark_state_partially_synced();
479            room_info.mark_members_missing(); // the own member event changed
480
481            let mut changes = StateChanges::default();
482            changes.add_room(room_info.clone());
483
484            self.state_store.save_changes(&changes).await?; // Update the store
485
486            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
487        }
488
489        Ok(room)
490    }
491
492    /// User has left a room.
493    ///
494    /// Update the internal and cached state accordingly.
495    pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
496        let room = self.state_store.get_or_create_room(
497            room_id,
498            RoomState::Left,
499            self.room_info_notable_update_sender.clone(),
500        );
501
502        if room.state() != RoomState::Left {
503            let _sync_lock = self.sync_lock().lock().await;
504
505            let mut room_info = room.clone_info();
506            room_info.mark_as_left();
507            room_info.mark_state_partially_synced();
508            room_info.mark_members_missing(); // the own member event changed
509            let mut changes = StateChanges::default();
510            changes.add_room(room_info.clone());
511            self.state_store.save_changes(&changes).await?; // Update the store
512            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
513        }
514
515        Ok(())
516    }
517
518    /// Get access to the store's sync lock.
519    pub fn sync_lock(&self) -> &Mutex<()> {
520        self.state_store.sync_lock()
521    }
522
523    /// Receive a response from a sync call.
524    ///
525    /// # Arguments
526    ///
527    /// * `response` - The response that we received after a successful sync.
528    #[instrument(skip_all)]
529    pub async fn receive_sync_response(
530        &self,
531        response: api::sync::sync_events::v3::Response,
532    ) -> Result<SyncResponse> {
533        self.receive_sync_response_with_requested_required_states(
534            response,
535            &RequestedRequiredStates::default(),
536        )
537        .await
538    }
539
    /// Receive a response from a sync call, with the requested required state
    /// events.
    ///
    /// The response is processed in several steps: to-device events, global
    /// account data, then the joined, left, invited and knocked rooms, and
    /// finally presence. The accumulated changes are saved and applied while
    /// holding the sync lock, after which the display names of the updated
    /// rooms are recomputed and saved.
    ///
    /// If the response carries the sync token that is already stored, it is
    /// considered a duplicate and a default [`SyncResponse`] is returned
    /// without any processing.
    ///
    /// # Arguments
    ///
    /// * `response` - The response that we received after a successful sync.
    /// * `requested_required_states` - The requested required state events.
    ///
    /// # Errors
    ///
    /// Any error returned by one of the processing steps, or by saving the
    /// changes, is propagated to the caller.
    pub async fn receive_sync_response_with_requested_required_states(
        &self,
        response: api::sync::sync_events::v3::Response,
        requested_required_states: &RequestedRequiredStates,
    ) -> Result<SyncResponse> {
        // The server might respond multiple times with the same sync token, in
        // that case we already received this response and there's nothing to
        // do.
        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
            info!("Got the same sync response twice");
            return Ok(SyncResponse::default());
        }

        // Only measure the processing time if it is going to be logged.
        let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };

        #[cfg(feature = "e2e-encryption")]
        let olm_machine = self.olm_machine().await;

        // Accumulates all the state changes produced while processing this
        // response.
        let mut context = Context::new(StateChanges::new(response.next_batch.clone()));

        #[cfg(feature = "e2e-encryption")]
        let to_device = {
            let processors::e2ee::to_device::Output {
                processed_to_device_events: to_device,
                room_key_updates,
            } = processors::e2ee::to_device::from_sync_v2(&response, olm_machine.as_ref()).await?;

            // Hand the known rooms that received room key updates over to the
            // latest event processor.
            processors::latest_event::decrypt_from_rooms(
                &mut context,
                room_key_updates
                    .into_iter()
                    .flatten()
                    .filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
                    .collect(),
                processors::e2ee::E2EE::new(
                    olm_machine.as_ref(),
                    &self.decryption_settings,
                    self.handle_verification_events,
                ),
            )
            .await?;

            to_device
        };

        // Without encryption support, to-device events are only classified
        // based on their `type` field.
        #[cfg(not(feature = "e2e-encryption"))]
        let to_device = response
            .to_device
            .events
            .into_iter()
            .map(|raw| {
                if let Ok(Some(event_type)) = raw.get_field::<String>("type") {
                    if event_type == "m.room.encrypted" {
                        matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent::UnableToDecrypt(raw)
                    } else {
                        matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent::PlainText(raw)
                    }
                } else {
                    matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent::Invalid(raw) // Events without a readable `type` are kept, flagged as invalid
                }
            })
            .collect();

        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());

        let global_account_data_processor =
            processors::account_data::global(&response.account_data.events);

        let push_rules = self.get_push_rules(&global_account_data_processor).await?;

        let mut room_updates = RoomUpdates::default();
        let mut notifications = Default::default();

        // Filled while processing the joined rooms; used at the very end to
        // notify the rooms about their updated members.
        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
            BTreeMap::new();

        // Joined rooms.
        for (room_id, joined_room) in response.rooms.join {
            let joined_room_update = processors::room::sync_v2::update_joined_room(
                &mut context,
                processors::room::RoomCreationData::new(
                    &room_id,
                    self.room_info_notable_update_sender.clone(),
                    requested_required_states,
                    &mut ambiguity_cache,
                ),
                joined_room,
                &mut updated_members_in_room,
                processors::notification::Notification::new(
                    &push_rules,
                    &mut notifications,
                    &self.state_store,
                ),
                #[cfg(feature = "e2e-encryption")]
                processors::e2ee::E2EE::new(
                    olm_machine.as_ref(),
                    &self.decryption_settings,
                    self.handle_verification_events,
                ),
            )
            .await?;

            room_updates.joined.insert(room_id, joined_room_update);
        }

        // Left rooms.
        for (room_id, left_room) in response.rooms.leave {
            let left_room_update = processors::room::sync_v2::update_left_room(
                &mut context,
                processors::room::RoomCreationData::new(
                    &room_id,
                    self.room_info_notable_update_sender.clone(),
                    requested_required_states,
                    &mut ambiguity_cache,
                ),
                left_room,
                processors::notification::Notification::new(
                    &push_rules,
                    &mut notifications,
                    &self.state_store,
                ),
                #[cfg(feature = "e2e-encryption")]
                processors::e2ee::E2EE::new(
                    olm_machine.as_ref(),
                    &self.decryption_settings,
                    self.handle_verification_events,
                ),
            )
            .await?;

            room_updates.left.insert(room_id, left_room_update);
        }

        // Invited rooms.
        for (room_id, invited_room) in response.rooms.invite {
            let invited_room_update = processors::room::sync_v2::update_invited_room(
                &mut context,
                &room_id,
                invited_room,
                self.room_info_notable_update_sender.clone(),
                processors::notification::Notification::new(
                    &push_rules,
                    &mut notifications,
                    &self.state_store,
                ),
            )
            .await?;

            room_updates.invited.insert(room_id, invited_room_update);
        }

        // Knocked rooms.
        for (room_id, knocked_room) in response.rooms.knock {
            let knocked_room_update = processors::room::sync_v2::update_knocked_room(
                &mut context,
                &room_id,
                knocked_room,
                self.room_info_notable_update_sender.clone(),
                processors::notification::Notification::new(
                    &push_rules,
                    &mut notifications,
                    &self.state_store,
                ),
            )
            .await?;

            room_updates.knocked.insert(room_id, knocked_room_update);
        }

        global_account_data_processor.apply(&mut context, &self.state_store).await;

        // Presence events that fail to deserialize are skipped; the others are
        // recorded in their raw form, keyed by their sender.
        context.state_changes.presence = response
            .presence
            .events
            .iter()
            .filter_map(|e| {
                let event = e.deserialize().ok()?;
                Some((event.sender, e.clone()))
            })
            .collect();

        context.state_changes.ambiguity_maps = ambiguity_cache.cache;

        // The sync lock is only held while the changes are saved and applied.
        {
            let _sync_lock = self.sync_lock().lock().await;

            processors::changes::save_and_apply(
                context,
                &self.state_store,
                &self.ignore_user_list_changes,
                Some(response.next_batch.clone()),
            )
            .await?;
        }

        // The previous context has been consumed above: start from a fresh one
        // for the display name updates.
        let mut context = Context::default();

        // Now that all the rooms information have been saved, update the display name
        // of the updated rooms (which relies on information stored in the database).
        processors::room::display_name::update_for_rooms(
            &mut context,
            &room_updates,
            &self.state_store,
        )
        .await;

        // Save the new display name updates if any.
        processors::changes::save_only(context, &self.state_store).await?;

        // Tell each known room which of its members have been updated. The
        // result of `send` is deliberately ignored.
        // NOTE(review): presumably `send` only fails when nobody is listening —
        // confirm against the channel type of `room_member_updates_sender`.
        for (room_id, member_ids) in updated_members_in_room {
            if let Some(room) = self.get_room(&room_id) {
                let _ =
                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
            }
        }

        if enabled!(Level::INFO) {
            info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
        }

        let response = SyncResponse {
            rooms: room_updates,
            presence: response.presence.events,
            account_data: response.account_data.events,
            to_device,
            notifications,
        };

        Ok(response)
    }
773
774    /// Receive a get member events response and convert it to a deserialized
775    /// `MembersResponse`
776    ///
777    /// This client-server request must be made without filters to make sure all
778    /// members are received. Otherwise, an error is returned.
779    ///
780    /// # Arguments
781    ///
782    /// * `room_id` - The room id this response belongs to.
783    ///
784    /// * `response` - The raw response that was received from the server.
785    #[instrument(skip_all, fields(?room_id))]
786    pub async fn receive_all_members(
787        &self,
788        room_id: &RoomId,
789        request: &api::membership::get_member_events::v3::Request,
790        response: &api::membership::get_member_events::v3::Response,
791    ) -> Result<()> {
792        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
793        {
794            // This function assumes all members are loaded at once to optimise how display
795            // name disambiguation works. Using it with partial member list results
796            // would produce incorrect disambiguated display name entries
797            return Err(Error::InvalidReceiveMembersParameters);
798        }
799
800        let Some(room) = self.state_store.room(room_id) else {
801            // The room is unknown to us: leave early.
802            return Ok(());
803        };
804
805        let mut chunk = Vec::with_capacity(response.chunk.len());
806        let mut context = Context::default();
807
808        #[cfg(feature = "e2e-encryption")]
809        let mut user_ids = BTreeSet::new();
810
811        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
812
813        for raw_event in &response.chunk {
814            let member = match raw_event.deserialize() {
815                Ok(ev) => ev,
816                Err(e) => {
817                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
818                    debug!(event_id, "Failed to deserialize member event: {e}");
819                    continue;
820                }
821            };
822
823            // TODO: All the actions in this loop used to be done only when the membership
824            // event was not in the store before. This was changed with the new room API,
825            // because e.g. leaving a room makes members events outdated and they need to be
826            // fetched by `members`. Therefore, they need to be overwritten here, even
827            // if they exist.
828            // However, this makes a new problem occur where setting the member events here
829            // potentially races with the sync.
830            // See <https://github.com/matrix-org/matrix-rust-sdk/issues/1205>.
831
832            #[cfg(feature = "e2e-encryption")]
833            match member.membership() {
834                MembershipState::Join | MembershipState::Invite => {
835                    user_ids.insert(member.state_key().to_owned());
836                }
837                _ => (),
838            }
839
840            if let StateEvent::Original(e) = &member {
841                if let Some(d) = &e.content.displayname {
842                    let display_name = DisplayName::new(d);
843                    ambiguity_map
844                        .entry(display_name)
845                        .or_default()
846                        .insert(member.state_key().clone());
847                }
848            }
849
850            let sync_member: SyncRoomMemberEvent = member.clone().into();
851            processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
852
853            context
854                .state_changes
855                .state
856                .entry(room_id.to_owned())
857                .or_default()
858                .entry(member.event_type())
859                .or_default()
860                .insert(member.state_key().to_string(), raw_event.clone().cast());
861            chunk.push(member);
862        }
863
864        #[cfg(feature = "e2e-encryption")]
865        processors::e2ee::tracked_users::update(
866            self.olm_machine().await.as_ref(),
867            room.encryption_state(),
868            &user_ids,
869        )
870        .await?;
871
872        context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
873
874        let _sync_lock = self.sync_lock().lock().await;
875        let mut room_info = room.clone_info();
876        room_info.mark_members_synced();
877        context.state_changes.add_room(room_info);
878
879        processors::changes::save_and_apply(
880            context,
881            &self.state_store,
882            &self.ignore_user_list_changes,
883            None,
884        )
885        .await?;
886
887        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
888
889        Ok(())
890    }
891
892    /// Receive a successful filter upload response, the filter id will be
893    /// stored under the given name in the store.
894    ///
895    /// The filter id can later be retrieved with the [`get_filter`] method.
896    ///
897    ///
898    /// # Arguments
899    ///
900    /// * `filter_name` - The name that should be used to persist the filter id
901    ///   in the store.
902    ///
903    /// * `response` - The successful filter upload response containing the
904    ///   filter id.
905    ///
906    /// [`get_filter`]: #method.get_filter
907    pub async fn receive_filter_upload(
908        &self,
909        filter_name: &str,
910        response: &api::filter::create_filter::v3::Response,
911    ) -> Result<()> {
912        Ok(self
913            .state_store
914            .set_kv_data(
915                StateStoreDataKey::Filter(filter_name),
916                StateStoreDataValue::Filter(response.filter_id.clone()),
917            )
918            .await?)
919    }
920
921    /// Get the filter id of a previously uploaded filter.
922    ///
923    /// *Note*: A filter will first need to be uploaded and persisted using
924    /// [`receive_filter_upload`].
925    ///
926    /// # Arguments
927    ///
928    /// * `filter_name` - The name of the filter that was previously used to
929    ///   persist the filter.
930    ///
931    /// [`receive_filter_upload`]: #method.receive_filter_upload
932    pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
933        let filter = self
934            .state_store
935            .get_kv_data(StateStoreDataKey::Filter(filter_name))
936            .await?
937            .map(|d| d.into_filter().expect("State store data not a filter"));
938
939        Ok(filter)
940    }
941
/// Get a to-device request that will share a room key with users in a room.
///
/// The recipients are the room's joined members when the history visibility
/// is `Joined`, and its active members otherwise.
///
/// # Errors
///
/// Returns `Error::InsufficientData` if the room is unknown and
/// `Error::EncryptionNotEnabled` if the room has no encryption settings.
/// Store and Olm machine failures are propagated.
///
/// # Panics
///
/// Panics if the Olm machine wasn't started.
#[cfg(feature = "e2e-encryption")]
pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
    let olm_machine = self.olm_machine().await;
    let Some(olm) = olm_machine.as_ref() else {
        panic!("Olm machine wasn't started");
    };

    let room = self.get_room(room_id).ok_or(Error::InsufficientData)?;

    let history_visibility = room.history_visibility_or_default();
    let room_encryption_event =
        room.encryption_settings().ok_or(Error::EncryptionNotEnabled)?;

    // Don't share the group session with members that are invited
    // if the history visibility is set to `Joined`
    let only_joined_members = history_visibility == HistoryVisibility::Joined;
    let filter =
        if only_joined_members { RoomMemberships::JOIN } else { RoomMemberships::ACTIVE };

    let members = self.state_store.get_user_ids(room_id, filter).await?;

    let settings = EncryptionSettings::new(
        room_encryption_event,
        history_visibility,
        self.room_key_recipient_strategy.clone(),
    );

    let requests =
        olm.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?;

    Ok(requests)
}
977
978    /// Get the room with the given room id.
979    ///
980    /// # Arguments
981    ///
982    /// * `room_id` - The id of the room that should be fetched.
983    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
984        self.state_store.room(room_id)
985    }
986
987    /// Forget the room with the given room ID.
988    ///
989    /// The room will be dropped from the room list and the store.
990    ///
991    /// # Arguments
992    ///
993    /// * `room_id` - The id of the room that should be forgotten.
994    pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
995        // Forget the room in the state store.
996        self.state_store.forget_room(room_id).await?;
997
998        // Remove the room in the event cache store too.
999        self.event_cache_store().lock().await?.remove_room(room_id).await?;
1000
1001        Ok(())
1002    }
1003
/// Acquire a read guard on the (optional) Olm machine.
///
/// The guard derefs to `None` when no Olm machine is set.
#[cfg(feature = "e2e-encryption")]
pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
    self.olm_machine.read().await
}
1009
1010    /// Get the push rules.
1011    ///
1012    /// Gets the push rules previously processed, otherwise get them from the
1013    /// store. As a fallback, uses [`Ruleset::server_default`] if the user
1014    /// is logged in.
1015    pub(crate) async fn get_push_rules(
1016        &self,
1017        global_account_data_processor: &processors::account_data::Global,
1018    ) -> Result<Ruleset> {
1019        if let Some(event) = global_account_data_processor
1020            .push_rules()
1021            .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
1022        {
1023            Ok(event.content.global)
1024        } else if let Some(event) = self
1025            .state_store
1026            .get_account_data_event_static::<PushRulesEventContent>()
1027            .await?
1028            .and_then(|ev| ev.deserialize().ok())
1029        {
1030            Ok(event.content.global)
1031        } else if let Some(session_meta) = self.state_store.session_meta() {
1032            Ok(Ruleset::server_default(&session_meta.user_id))
1033        } else {
1034            Ok(Ruleset::new())
1035        }
1036    }
1037
1038    /// Returns a subscriber that publishes an event every time the ignore user
1039    /// list changes
1040    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
1041        self.ignore_user_list_changes.subscribe()
1042    }
1043
1044    /// Returns a new receiver that gets future room info notable updates.
1045    ///
1046    /// Learn more by reading the [`RoomInfoNotableUpdate`] type.
1047    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
1048        self.room_info_notable_update_sender.subscribe()
1049    }
1050
1051    /// Checks whether the provided `user_id` belongs to an ignored user.
1052    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
1053        match self.state_store.get_account_data_event_static::<IgnoredUserListEventContent>().await
1054        {
1055            Ok(Some(raw_ignored_user_list)) => match raw_ignored_user_list.deserialize() {
1056                Ok(current_ignored_user_list) => {
1057                    current_ignored_user_list.content.ignored_users.contains_key(user_id)
1058                }
1059                Err(error) => {
1060                    warn!(?error, "Failed to deserialize the ignored user list event");
1061                    false
1062                }
1063            },
1064            Ok(None) => false,
1065            Err(error) => {
1066                warn!(?error, "Could not get the ignored user list from the state store");
1067                false
1068            }
1069        }
1070    }
1071}
1072
/// Represent the `required_state` values sent by a sync request.
///
/// This is useful to track what state events have been requested when handling
/// a response.
///
/// For example, if a sync requests the `m.room.encryption` state event, and the
/// server replies with nothing, it means the room **is not** encrypted. Without
/// knowing which state event was required by the sync, it is impossible to
/// interpret the absence of state event from the server as _the room's
/// encryption state is **not encrypted**_ or _the room's encryption state is
/// **unknown**_.
#[derive(Debug, Default)]
pub struct RequestedRequiredStates {
    /// The `required_state` value used for every room that has no entry in
    /// `for_rooms`.
    default: Vec<(StateEventType, String)>,
    /// The `required_state` values of specific rooms, keyed by room id.
    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
}
1089
1090impl RequestedRequiredStates {
1091    /// Create a new `RequestedRequiredStates`.
1092    ///
1093    /// `default` represents the `required_state` value for all rooms.
1094    /// `for_rooms` is the `required_state` per room.
1095    pub fn new(
1096        default: Vec<(StateEventType, String)>,
1097        for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1098    ) -> Self {
1099        Self { default, for_rooms }
1100    }
1101
1102    /// Get the `required_state` value for a specific room.
1103    pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1104        self.for_rooms.get(room_id).unwrap_or(&self.default)
1105    }
1106}
1107
1108impl From<&v5::Request> for RequestedRequiredStates {
1109    fn from(request: &v5::Request) -> Self {
1110        // The following information is missing in the MSC4186 at the time of writing
1111        // (2025-03-12) but: the `required_state`s from all lists and from all room
1112        // subscriptions are combined by doing an union.
1113        //
1114        // Thus, we can do the same here, put the union in `default` and keep
1115        // `for_rooms` empty. The `Self::for_room` will automatically do the fallback.
1116        let mut default = BTreeSet::new();
1117
1118        for list in request.lists.values() {
1119            default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1120        }
1121
1122        for room_subscription in request.room_subscriptions.values() {
1123            default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1124        }
1125
1126        Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1127    }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132    use std::collections::HashMap;
1133
1134    use assert_matches2::assert_let;
1135    use futures_util::FutureExt as _;
1136    use matrix_sdk_test::{
1137        async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1138        LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder, BOB,
1139    };
1140    use ruma::{
1141        api::client::{self as api, sync::sync_events::v5},
1142        event_id,
1143        events::{room::member::MembershipState, StateEventType},
1144        room_id,
1145        serde::Raw,
1146        user_id,
1147    };
1148    use serde_json::{json, value::to_raw_value};
1149
1150    use super::{BaseClient, RequestedRequiredStates};
1151    use crate::{
1152        client::ThreadingSupport,
1153        store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1154        test_utils::logged_in_base_client,
1155        RoomDisplayName, RoomState, SessionMeta,
1156    };
1157
/// `for_room` returns the dedicated entry of a room when there is one, and
/// the defaults otherwise.
#[test]
fn test_requested_required_states() {
    let room_with_entry = room_id!("!r0");
    let room_without_entry = room_id!("!r1");

    let default_states = vec![(StateEventType::RoomAvatar, "".to_owned())];
    let dedicated_states = vec![
        (StateEventType::RoomMember, "foo".to_owned()),
        (StateEventType::RoomEncryption, "".to_owned()),
    ];

    let requested_required_states = RequestedRequiredStates::new(
        default_states.clone(),
        HashMap::from([(room_with_entry.to_owned(), dedicated_states.clone())]),
    );

    // A special set of state events exists for the first room.
    assert_eq!(
        requested_required_states.for_room(room_with_entry),
        dedicated_states.as_slice()
    );

    // No special list for the second room, it should return the defaults.
    assert_eq!(
        requested_required_states.for_room(room_without_entry),
        default_states.as_slice()
    );
}
1189
/// Converting a sync `v5` request puts the union of all the `required_state`
/// entries (lists and room subscriptions) in `default`, and never fills
/// `for_rooms`.
#[test]
fn test_requested_required_states_from_sync_v5_request() {
    let room_id_0 = room_id!("!r0");
    let room_id_1 = room_id!("!r1");

    // Empty request.
    let mut request = v5::Request::new();

    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        assert!(requested_required_states.default.is_empty());
        assert!(requested_required_states.for_rooms.is_empty());
    }

    // One list.
    request.lists.insert("foo".to_owned(), {
        let mut list = v5::request::List::default();
        list.room_details.required_state = vec![
            (StateEventType::RoomAvatar, "".to_owned()),
            (StateEventType::RoomEncryption, "".to_owned()),
        ];

        list
    });

    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        assert_eq!(
            requested_required_states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned())
            ]
        );
        assert!(requested_required_states.for_rooms.is_empty());
    }

    // Two lists.
    request.lists.insert("bar".to_owned(), {
        let mut list = v5::request::List::default();
        list.room_details.required_state = vec![
            (StateEventType::RoomEncryption, "".to_owned()),
            (StateEventType::RoomName, "".to_owned()),
        ];

        list
    });

    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        // Union of the state events.
        assert_eq!(
            requested_required_states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
            ]
        );
        assert!(requested_required_states.for_rooms.is_empty());
    }

    // One room subscription.
    request.room_subscriptions.insert(room_id_0.to_owned(), {
        let mut room_subscription = v5::request::RoomSubscription::default();

        room_subscription.required_state = vec![
            (StateEventType::RoomJoinRules, "".to_owned()),
            (StateEventType::RoomEncryption, "".to_owned()),
        ];

        room_subscription
    });

    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        // Union of state events, all in `default`, still nothing in `for_rooms`.
        assert_eq!(
            requested_required_states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomJoinRules, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
            ]
        );
        assert!(requested_required_states.for_rooms.is_empty());
    }

    // Two room subscriptions.
    request.room_subscriptions.insert(room_id_1.to_owned(), {
        let mut room_subscription = v5::request::RoomSubscription::default();

        room_subscription.required_state = vec![
            (StateEventType::RoomName, "".to_owned()),
            (StateEventType::RoomTopic, "".to_owned()),
        ];

        room_subscription
    });

    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        // Union of state events, all in `default`, still nothing in `for_rooms`.
        assert_eq!(
            requested_required_states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomJoinRules, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
                (StateEventType::RoomTopic, "".to_owned()),
            ]
        );
        // Checked here too, like in every other scenario of this test.
        assert!(requested_required_states.for_rooms.is_empty());
    }
}
1311
1312    #[async_test]
1313    async fn test_invite_after_leaving() {
1314        let user_id = user_id!("@alice:example.org");
1315        let room_id = room_id!("!test:example.org");
1316
1317        let client = logged_in_base_client(Some(user_id)).await;
1318
1319        let mut sync_builder = SyncResponseBuilder::new();
1320
1321        let response = sync_builder
1322            .add_left_room(
1323                LeftRoomBuilder::new(room_id).add_timeline_event(
1324                    EventFactory::new()
1325                        .member(user_id)
1326                        .membership(MembershipState::Leave)
1327                        .display_name("Alice")
1328                        .event_id(event_id!("$994173582443PhrSn:example.org")),
1329                ),
1330            )
1331            .build_sync_response();
1332        client.receive_sync_response(response).await.unwrap();
1333        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1334
1335        let response = sync_builder
1336            .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
1337                StrippedStateTestEvent::Custom(json!({
1338                    "content": {
1339                        "displayname": "Alice",
1340                        "membership": "invite",
1341                    },
1342                    "event_id": "$143273582443PhrSn:example.org",
1343                    "origin_server_ts": 1432735824653u64,
1344                    "sender": "@example:example.org",
1345                    "state_key": user_id,
1346                    "type": "m.room.member",
1347                })),
1348            ))
1349            .build_sync_response();
1350        client.receive_sync_response(response).await.unwrap();
1351        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1352    }
1353
1354    #[async_test]
1355    async fn test_invite_displayname() {
1356        let user_id = user_id!("@alice:example.org");
1357        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1358
1359        let client = logged_in_base_client(Some(user_id)).await;
1360
1361        let response = ruma_response_from_json(&json!({
1362            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1363            "device_one_time_keys_count": {
1364                "signed_curve25519": 50u64
1365            },
1366            "device_unused_fallback_key_types": [
1367                "signed_curve25519"
1368            ],
1369            "rooms": {
1370                "invite": {
1371                    "!ithpyNKDtmhneaTQja:example.org": {
1372                        "invite_state": {
1373                            "events": [
1374                                {
1375                                    "content": {
1376                                        "creator": "@test:example.org",
1377                                        "room_version": "9"
1378                                    },
1379                                    "sender": "@test:example.org",
1380                                    "state_key": "",
1381                                    "type": "m.room.create"
1382                                },
1383                                {
1384                                    "content": {
1385                                        "join_rule": "invite"
1386                                    },
1387                                    "sender": "@test:example.org",
1388                                    "state_key": "",
1389                                    "type": "m.room.join_rules"
1390                                },
1391                                {
1392                                    "content": {
1393                                        "algorithm": "m.megolm.v1.aes-sha2"
1394                                    },
1395                                    "sender": "@test:example.org",
1396                                    "state_key": "",
1397                                    "type": "m.room.encryption"
1398                                },
1399                                {
1400                                    "content": {
1401                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1402                                        "displayname": "Kyra",
1403                                        "membership": "join"
1404                                    },
1405                                    "sender": "@test:example.org",
1406                                    "state_key": "@test:example.org",
1407                                    "type": "m.room.member"
1408                                },
1409                                {
1410                                    "content": {
1411                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1412                                        "displayname": "alice",
1413                                        "is_direct": true,
1414                                        "membership": "invite"
1415                                    },
1416                                    "origin_server_ts": 1650878657984u64,
1417                                    "sender": "@test:example.org",
1418                                    "state_key": "@alice:example.org",
1419                                    "type": "m.room.member",
1420                                    "unsigned": {
1421                                        "age": 14u64
1422                                    },
1423                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1424                                }
1425                            ]
1426                        }
1427                    }
1428                }
1429            }
1430        }));
1431
1432        client.receive_sync_response(response).await.unwrap();
1433
1434        let room = client.get_room(room_id).expect("Room not found");
1435        assert_eq!(room.state(), RoomState::Invited);
1436        assert_eq!(
1437            room.compute_display_name().await.expect("fetching display name failed").into_inner(),
1438            RoomDisplayName::Calculated("Kyra".to_owned())
1439        );
1440    }
1441
1442    #[async_test]
1443    async fn test_deserialization_failure() {
1444        let user_id = user_id!("@alice:example.org");
1445        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1446
1447        let client = BaseClient::new(
1448            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1449            ThreadingSupport::Disabled,
1450        );
1451        client
1452            .activate(
1453                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1454                RoomLoadSettings::default(),
1455                #[cfg(feature = "e2e-encryption")]
1456                None,
1457            )
1458            .await
1459            .unwrap();
1460
1461        let response = ruma_response_from_json(&json!({
1462            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1463            "rooms": {
1464                "join": {
1465                    "!ithpyNKDtmhneaTQja:example.org": {
1466                        "state": {
1467                            "events": [
1468                                {
1469                                    "invalid": "invalid",
1470                                },
1471                                {
1472                                    "content": {
1473                                        "name": "The room name"
1474                                    },
1475                                    "event_id": "$143273582443PhrSn:example.org",
1476                                    "origin_server_ts": 1432735824653u64,
1477                                    "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1478                                    "sender": "@example:example.org",
1479                                    "state_key": "",
1480                                    "type": "m.room.name",
1481                                    "unsigned": {
1482                                        "age": 1234
1483                                    }
1484                                },
1485                            ]
1486                        }
1487                    }
1488                }
1489            }
1490        }));
1491
1492        client.receive_sync_response(response).await.unwrap();
1493        client
1494            .state_store()
1495            .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1496            .await
1497            .expect("Failed to fetch state event")
1498            .expect("State event not found")
1499            .deserialize()
1500            .expect("Failed to deserialize state event");
1501    }
1502
1503    #[async_test]
1504    async fn test_invited_members_arent_ignored() {
1505        let user_id = user_id!("@alice:example.org");
1506        let inviter_user_id = user_id!("@bob:example.org");
1507        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1508
1509        let client = BaseClient::new(
1510            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1511            ThreadingSupport::Disabled,
1512        );
1513        client
1514            .activate(
1515                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1516                RoomLoadSettings::default(),
1517                #[cfg(feature = "e2e-encryption")]
1518                None,
1519            )
1520            .await
1521            .unwrap();
1522
1523        // Preamble: let the SDK know about the room.
1524        let mut sync_builder = SyncResponseBuilder::new();
1525        let response = sync_builder
1526            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1527            .build_sync_response();
1528        client.receive_sync_response(response).await.unwrap();
1529
1530        // When I process the result of a /members request that only contains an invited
1531        // member,
1532        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1533
1534        let raw_member_event = json!({
1535            "content": {
1536                "avatar_url": "mxc://localhost/fewjilfewjil42",
1537                "displayname": "Invited Alice",
1538                "membership": "invite"
1539            },
1540            "event_id": "$151800140517rfvjc:localhost",
1541            "origin_server_ts": 151800140,
1542            "room_id": room_id,
1543            "sender": inviter_user_id,
1544            "state_key": user_id,
1545            "type": "m.room.member",
1546            "unsigned": {
1547                "age": 13374242,
1548            }
1549        });
1550        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1551            to_raw_value(&raw_member_event).unwrap(),
1552        )]);
1553
1554        // It's correctly processed,
1555        client.receive_all_members(room_id, &request, &response).await.unwrap();
1556
1557        let room = client.get_room(room_id).unwrap();
1558
1559        // And I can get the invited member display name and avatar.
1560        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1561
1562        assert_eq!(member.user_id(), user_id);
1563        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1564        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1565    }
1566
1567    #[async_test]
1568    async fn test_reinvited_members_get_a_display_name() {
1569        let user_id = user_id!("@alice:example.org");
1570        let inviter_user_id = user_id!("@bob:example.org");
1571        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1572
1573        let client = BaseClient::new(
1574            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1575            ThreadingSupport::Disabled,
1576        );
1577        client
1578            .activate(
1579                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1580                RoomLoadSettings::default(),
1581                #[cfg(feature = "e2e-encryption")]
1582                None,
1583            )
1584            .await
1585            .unwrap();
1586
1587        // Preamble: let the SDK know about the room, and that the invited user left it.
1588        let mut sync_builder = SyncResponseBuilder::new();
1589        let response = sync_builder
1590            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
1591                StateTestEvent::Custom(json!({
1592                    "content": {
1593                        "avatar_url": null,
1594                        "displayname": null,
1595                        "membership": "leave"
1596                    },
1597                    "event_id": "$151803140217rkvjc:localhost",
1598                    "origin_server_ts": 151800139,
1599                    "room_id": room_id,
1600                    "sender": user_id,
1601                    "state_key": user_id,
1602                    "type": "m.room.member",
1603                })),
1604            ))
1605            .build_sync_response();
1606        client.receive_sync_response(response).await.unwrap();
1607
1608        // Now, say that the user has been re-invited.
1609        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1610
1611        let raw_member_event = json!({
1612            "content": {
1613                "avatar_url": "mxc://localhost/fewjilfewjil42",
1614                "displayname": "Invited Alice",
1615                "membership": "invite"
1616            },
1617            "event_id": "$151800140517rfvjc:localhost",
1618            "origin_server_ts": 151800140,
1619            "room_id": room_id,
1620            "sender": inviter_user_id,
1621            "state_key": user_id,
1622            "type": "m.room.member",
1623            "unsigned": {
1624                "age": 13374242,
1625            }
1626        });
1627        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1628            to_raw_value(&raw_member_event).unwrap(),
1629        )]);
1630
1631        // It's correctly processed,
1632        client.receive_all_members(room_id, &request, &response).await.unwrap();
1633
1634        let room = client.get_room(room_id).unwrap();
1635
1636        // And I can get the invited member display name and avatar.
1637        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1638
1639        assert_eq!(member.user_id(), user_id);
1640        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1641        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1642    }
1643
1644    #[async_test]
1645    async fn test_ignored_user_list_changes() {
1646        let user_id = user_id!("@alice:example.org");
1647        let client = BaseClient::new(
1648            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1649            ThreadingSupport::Disabled,
1650        );
1651
1652        client
1653            .activate(
1654                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1655                RoomLoadSettings::default(),
1656                #[cfg(feature = "e2e-encryption")]
1657                None,
1658            )
1659            .await
1660            .unwrap();
1661
1662        let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1663        assert!(subscriber.next().now_or_never().is_none());
1664
1665        let mut sync_builder = SyncResponseBuilder::new();
1666        let response = sync_builder
1667            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1668                json!({
1669                    "content": {
1670                        "ignored_users": {
1671                            *BOB: {}
1672                        }
1673                    },
1674                    "type": "m.ignored_user_list",
1675                }),
1676            ))
1677            .build_sync_response();
1678        client.receive_sync_response(response).await.unwrap();
1679
1680        assert_let!(Some(ignored) = subscriber.next().await);
1681        assert_eq!(ignored, [BOB.to_string()]);
1682
1683        // Receive the same response.
1684        let response = sync_builder
1685            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1686                json!({
1687                    "content": {
1688                        "ignored_users": {
1689                            *BOB: {}
1690                        }
1691                    },
1692                    "type": "m.ignored_user_list",
1693                }),
1694            ))
1695            .build_sync_response();
1696        client.receive_sync_response(response).await.unwrap();
1697
1698        // No changes in the ignored list.
1699        assert!(subscriber.next().now_or_never().is_none());
1700
1701        // Now remove Bob from the ignored list.
1702        let response = sync_builder
1703            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1704                json!({
1705                    "content": {
1706                        "ignored_users": {}
1707                    },
1708                    "type": "m.ignored_user_list",
1709                }),
1710            ))
1711            .build_sync_response();
1712        client.receive_sync_response(response).await.unwrap();
1713
1714        assert_let!(Some(ignored) = subscriber.next().await);
1715        assert!(ignored.is_empty());
1716    }
1717
1718    #[async_test]
1719    async fn test_is_user_ignored() {
1720        let ignored_user_id = user_id!("@alice:example.org");
1721        let client = logged_in_base_client(None).await;
1722
1723        let mut sync_builder = SyncResponseBuilder::new();
1724        let response = sync_builder
1725            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1726                json!({
1727                    "content": {
1728                        "ignored_users": {
1729                            ignored_user_id: {}
1730                        }
1731                    },
1732                    "type": "m.ignored_user_list",
1733                }),
1734            ))
1735            .build_sync_response();
1736        client.receive_sync_response(response).await.unwrap();
1737
1738        assert!(client.is_user_ignored(ignored_user_id).await);
1739    }
1740
1741    #[async_test]
1742    async fn test_joined_at_timestamp_is_set() {
1743        let client = logged_in_base_client(None).await;
1744        let invited_room_id = room_id!("!invited:localhost");
1745        let unknown_room_id = room_id!("!unknown:localhost");
1746
1747        let mut sync_builder = SyncResponseBuilder::new();
1748        let response = sync_builder
1749            .add_invited_room(InvitedRoomBuilder::new(invited_room_id))
1750            .build_sync_response();
1751        client.receive_sync_response(response).await.unwrap();
1752
1753        // Let us first check the initial state, we should have a room in the invite
1754        // state.
1755        let invited_room = client
1756            .get_room(invited_room_id)
1757            .expect("The sync should have created a room in the invited state");
1758
1759        assert_eq!(invited_room.state(), RoomState::Invited);
1760        assert!(invited_room.inner.get().invite_accepted_at().is_none());
1761
1762        // Now we join the room.
1763        let joined_room = client
1764            .room_joined(invited_room_id)
1765            .await
1766            .expect("We should be able to mark a room as joined");
1767
1768        // Yup, there's a timestamp now.
1769        assert_eq!(joined_room.state(), RoomState::Joined);
1770        assert!(joined_room.inner.get().invite_accepted_at().is_some());
1771
1772        // If we didn't know about the room before the join, we assume that there wasn't
1773        // an invite and we don't record the timestamp.
1774        assert!(client.get_room(unknown_room_id).is_none());
1775        let unknown_room = client
1776            .room_joined(unknown_room_id)
1777            .await
1778            .expect("We should be able to mark a room as joined");
1779
1780        assert_eq!(unknown_room.state(), RoomState::Joined);
1781        assert!(unknown_room.inner.get().invite_accepted_at().is_none());
1782    }
1783}